claw-core 0.1.2

Embedded local database engine for ClawDB — an agent-native cognitive database
Documentation
//! Session lifecycle store for claw-core.
//!
//! The `sessions` table records when agent sessions start and end. Unlike the
//! `session_state` store (per-session key-value pairs), this store tracks the
//! lifecycle of sessions themselves.

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::SqlitePool;

use crate::error::{ClawError, ClawResult};
use crate::store::memory::{ListOptions, ListPage};

/// A session lifecycle record stored in the `sessions` table.
///
/// # Example
///
/// ```rust,no_run
/// # use claw_core::ClawEngine;
/// # async fn example() -> claw_core::ClawResult<()> {
/// # let engine = ClawEngine::open_default().await?;
/// let sid = engine.start_session().await?;
/// let session = engine.get_session(&sid).await?;
/// assert!(session.ended_at.is_none());
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Session {
    /// Unique session identifier (a UUID string).
    pub id: String,
    /// Timestamp when the session was started.
    pub started_at: DateTime<Utc>,
    /// Timestamp when the session was ended, or `None` if still active.
    pub ended_at: Option<DateTime<Utc>>,
}

/// Data-access object for the `sessions` table.
///
/// Obtain an instance via [`SessionLifecycleStore::new`].
#[derive(Debug)]
pub struct SessionLifecycleStore<'a> {
    pool: &'a SqlitePool,
}

impl<'a> SessionLifecycleStore<'a> {
    /// Create a new [`SessionLifecycleStore`] bound to `pool`.
    pub fn new(pool: &'a SqlitePool) -> Self {
        SessionLifecycleStore { pool }
    }

    /// Start a new session and return its unique ID.
    ///
    /// # Errors
    ///
    /// Returns a [`ClawError`] if the SQL execution fails.
    pub async fn start(&self) -> ClawResult<String> {
        let id = uuid::Uuid::new_v4().to_string();
        let now = Utc::now().to_rfc3339();
        sqlx::query("INSERT INTO sessions (id, started_at) VALUES (?, ?)")
            .bind(&id)
            .bind(&now)
            .execute(self.pool)
            .await?;
        Ok(id)
    }

    /// Mark a session as ended.
    ///
    /// # Errors
    ///
    /// Returns [`ClawError::NotFound`] if the session does not exist.
    pub async fn end(&self, session_id: &str) -> ClawResult<()> {
        let now = Utc::now().to_rfc3339();
        let affected = sqlx::query("UPDATE sessions SET ended_at = ? WHERE id = ?")
            .bind(&now)
            .bind(session_id)
            .execute(self.pool)
            .await?
            .rows_affected();

        if affected == 0 {
            return Err(ClawError::NotFound {
                entity: "Session".to_string(),
                id: session_id.to_string(),
            });
        }
        Ok(())
    }

    /// Fetch a [`Session`] by its ID.
    ///
    /// # Errors
    ///
    /// Returns [`ClawError::NotFound`] if the session does not exist.
    pub async fn get(&self, session_id: &str) -> ClawResult<Session> {
        let row: Option<(String, String, Option<String>)> =
            sqlx::query_as("SELECT id, started_at, ended_at FROM sessions WHERE id = ?")
                .bind(session_id)
                .fetch_optional(self.pool)
                .await?;

        let (id, started_at_str, ended_at_str) = row.ok_or_else(|| ClawError::NotFound {
            entity: "Session".to_string(),
            id: session_id.to_string(),
        })?;

        Ok(Session {
            id,
            started_at: DateTime::parse_from_rfc3339(&started_at_str)
                .map_err(|e| ClawError::Store(e.to_string()))?
                .with_timezone(&Utc),
            ended_at: ended_at_str
                .map(|s| {
                    DateTime::parse_from_rfc3339(&s)
                        .map(|dt| dt.with_timezone(&Utc))
                        .map_err(|e| ClawError::Store(e.to_string()))
                })
                .transpose()?,
        })
    }

    /// List all sessions, ordered by `started_at` descending (newest first).
    ///
    /// # Errors
    ///
    /// Returns a [`ClawError`] if the query fails.
    pub async fn list(&self) -> ClawResult<Vec<Session>> {
        let rows: Vec<(String, String, Option<String>)> = sqlx::query_as(
            "SELECT id, started_at, ended_at FROM sessions ORDER BY started_at DESC",
        )
        .fetch_all(self.pool)
        .await?;

        rows.into_iter()
            .map(|(id, started_at_str, ended_at_str)| {
                Ok(Session {
                    id,
                    started_at: DateTime::parse_from_rfc3339(&started_at_str)
                        .map_err(|e| ClawError::Store(e.to_string()))?
                        .with_timezone(&Utc),
                    ended_at: ended_at_str
                        .map(|s| {
                            DateTime::parse_from_rfc3339(&s)
                                .map(|dt| dt.with_timezone(&Utc))
                                .map_err(|e| ClawError::Store(e.to_string()))
                        })
                        .transpose()?,
                })
            })
            .collect()
    }

    /// List sessions with keyset pagination.
    ///
    /// Sessions are ordered by `started_at` descending (newest first).  The
    /// cursor is the `id` of the last session on the previous page.  Returns
    /// `(records, next_cursor)` where `next_cursor` is `None` on the last
    /// page.
    ///
    /// # Errors
    ///
    /// Returns a [`ClawError`] if the query fails.
    pub async fn list_paginated(&self, opts: &ListOptions) -> ClawResult<ListPage<Session>> {
        let limit = opts.validated_limit() as i64;
        let fetch = limit.saturating_add(1);

        let rows: Vec<(String, String, Option<String>)> = match &opts.cursor {
            None => {
                sqlx::query_as(
                    "SELECT id, started_at, ended_at FROM sessions \
                 ORDER BY started_at DESC, id DESC \
                 LIMIT ?",
                )
                .bind(fetch)
                .fetch_all(self.pool)
                .await?
            }
            Some(cursor) => {
                sqlx::query_as(
                    "SELECT id, started_at, ended_at FROM sessions \
                 WHERE (started_at, id) < \
                     (SELECT started_at, id FROM sessions WHERE id = ?) \
                 ORDER BY started_at DESC, id DESC \
                 LIMIT ?",
                )
                .bind(cursor)
                .bind(fetch)
                .fetch_all(self.pool)
                .await?
            }
        };

        let has_more = rows.len() as i64 > limit;
        let page = if has_more {
            &rows[..limit as usize]
        } else {
            rows.as_slice()
        };

        let sessions: Vec<Session> = page
            .iter()
            .map(|(id, started_at_str, ended_at_str)| {
                Ok(Session {
                    id: id.clone(),
                    started_at: DateTime::parse_from_rfc3339(started_at_str)
                        .map_err(|e| ClawError::Store(e.to_string()))?
                        .with_timezone(&Utc),
                    ended_at: ended_at_str
                        .as_deref()
                        .map(|s| {
                            DateTime::parse_from_rfc3339(s)
                                .map(|dt| dt.with_timezone(&Utc))
                                .map_err(|e| ClawError::Store(e.to_string()))
                        })
                        .transpose()?,
                })
            })
            .collect::<ClawResult<_>>()?;

        let next_cursor = if has_more {
            sessions.last().map(|s| s.id.clone())
        } else {
            None
        };
        Ok(ListPage {
            items: sessions,
            next_cursor,
        })
    }
}