// claw_core/store/session_lifecycle.rs

//! Session lifecycle store for claw-core.
//!
//! The `sessions` table records when agent sessions start and end. Unlike the
//! `session_state` store (per-session key-value pairs), this store tracks the
//! lifecycle of sessions themselves.
6
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use sqlx::SqlitePool;
10
11use crate::error::{ClawError, ClawResult};
12use crate::store::memory::{ListOptions, ListPage};
13
14/// A session lifecycle record stored in the `sessions` table.
15///
16/// # Example
17///
18/// ```rust,no_run
19/// # use claw_core::ClawEngine;
20/// # async fn example() -> claw_core::ClawResult<()> {
21/// # let engine = ClawEngine::open_default().await?;
22/// let sid = engine.start_session().await?;
23/// let session = engine.get_session(&sid).await?;
24/// assert!(session.ended_at.is_none());
25/// # Ok(())
26/// # }
27/// ```
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct Session {
30    /// Unique session identifier (a UUID string).
31    pub id: String,
32    /// Timestamp when the session was started.
33    pub started_at: DateTime<Utc>,
34    /// Timestamp when the session was ended, or `None` if still active.
35    pub ended_at: Option<DateTime<Utc>>,
36}
37
38/// Data-access object for the `sessions` table.
39///
40/// Obtain an instance via [`SessionLifecycleStore::new`].
41#[derive(Debug)]
42pub struct SessionLifecycleStore<'a> {
43    pool: &'a SqlitePool,
44}
45
46impl<'a> SessionLifecycleStore<'a> {
47    /// Create a new [`SessionLifecycleStore`] bound to `pool`.
48    pub fn new(pool: &'a SqlitePool) -> Self {
49        SessionLifecycleStore { pool }
50    }
51
52    /// Start a new session and return its unique ID.
53    ///
54    /// # Errors
55    ///
56    /// Returns a [`ClawError`] if the SQL execution fails.
57    pub async fn start(&self) -> ClawResult<String> {
58        let id = uuid::Uuid::new_v4().to_string();
59        let now = Utc::now().to_rfc3339();
60        sqlx::query("INSERT INTO sessions (id, started_at) VALUES (?, ?)")
61            .bind(&id)
62            .bind(&now)
63            .execute(self.pool)
64            .await?;
65        Ok(id)
66    }
67
68    /// Mark a session as ended.
69    ///
70    /// # Errors
71    ///
72    /// Returns [`ClawError::NotFound`] if the session does not exist.
73    pub async fn end(&self, session_id: &str) -> ClawResult<()> {
74        let now = Utc::now().to_rfc3339();
75        let affected = sqlx::query("UPDATE sessions SET ended_at = ? WHERE id = ?")
76            .bind(&now)
77            .bind(session_id)
78            .execute(self.pool)
79            .await?
80            .rows_affected();
81
82        if affected == 0 {
83            return Err(ClawError::NotFound {
84                entity: "Session".to_string(),
85                id: session_id.to_string(),
86            });
87        }
88        Ok(())
89    }
90
91    /// Fetch a [`Session`] by its ID.
92    ///
93    /// # Errors
94    ///
95    /// Returns [`ClawError::NotFound`] if the session does not exist.
96    pub async fn get(&self, session_id: &str) -> ClawResult<Session> {
97        let row: Option<(String, String, Option<String>)> =
98            sqlx::query_as("SELECT id, started_at, ended_at FROM sessions WHERE id = ?")
99                .bind(session_id)
100                .fetch_optional(self.pool)
101                .await?;
102
103        let (id, started_at_str, ended_at_str) = row.ok_or_else(|| ClawError::NotFound {
104            entity: "Session".to_string(),
105            id: session_id.to_string(),
106        })?;
107
108        Ok(Session {
109            id,
110            started_at: DateTime::parse_from_rfc3339(&started_at_str)
111                .map_err(|e| ClawError::Store(e.to_string()))?
112                .with_timezone(&Utc),
113            ended_at: ended_at_str
114                .map(|s| {
115                    DateTime::parse_from_rfc3339(&s)
116                        .map(|dt| dt.with_timezone(&Utc))
117                        .map_err(|e| ClawError::Store(e.to_string()))
118                })
119                .transpose()?,
120        })
121    }
122
123    /// List all sessions, ordered by `started_at` descending (newest first).
124    ///
125    /// # Errors
126    ///
127    /// Returns a [`ClawError`] if the query fails.
128    pub async fn list(&self) -> ClawResult<Vec<Session>> {
129        let rows: Vec<(String, String, Option<String>)> = sqlx::query_as(
130            "SELECT id, started_at, ended_at FROM sessions ORDER BY started_at DESC",
131        )
132        .fetch_all(self.pool)
133        .await?;
134
135        rows.into_iter()
136            .map(|(id, started_at_str, ended_at_str)| {
137                Ok(Session {
138                    id,
139                    started_at: DateTime::parse_from_rfc3339(&started_at_str)
140                        .map_err(|e| ClawError::Store(e.to_string()))?
141                        .with_timezone(&Utc),
142                    ended_at: ended_at_str
143                        .map(|s| {
144                            DateTime::parse_from_rfc3339(&s)
145                                .map(|dt| dt.with_timezone(&Utc))
146                                .map_err(|e| ClawError::Store(e.to_string()))
147                        })
148                        .transpose()?,
149                })
150            })
151            .collect()
152    }
153
154    /// List sessions with keyset pagination.
155    ///
156    /// Sessions are ordered by `started_at` descending (newest first).  The
157    /// cursor is the `id` of the last session on the previous page.  Returns
158    /// `(records, next_cursor)` where `next_cursor` is `None` on the last
159    /// page.
160    ///
161    /// # Errors
162    ///
163    /// Returns a [`ClawError`] if the query fails.
164    pub async fn list_paginated(&self, opts: &ListOptions) -> ClawResult<ListPage<Session>> {
165        let limit = opts.validated_limit() as i64;
166        let fetch = limit.saturating_add(1);
167
168        let rows: Vec<(String, String, Option<String>)> = match &opts.cursor {
169            None => {
170                sqlx::query_as(
171                    "SELECT id, started_at, ended_at FROM sessions \
172                 ORDER BY started_at DESC, id DESC \
173                 LIMIT ?",
174                )
175                .bind(fetch)
176                .fetch_all(self.pool)
177                .await?
178            }
179            Some(cursor) => {
180                sqlx::query_as(
181                    "SELECT id, started_at, ended_at FROM sessions \
182                 WHERE (started_at, id) < \
183                     (SELECT started_at, id FROM sessions WHERE id = ?) \
184                 ORDER BY started_at DESC, id DESC \
185                 LIMIT ?",
186                )
187                .bind(cursor)
188                .bind(fetch)
189                .fetch_all(self.pool)
190                .await?
191            }
192        };
193
194        let has_more = rows.len() as i64 > limit;
195        let page = if has_more {
196            &rows[..limit as usize]
197        } else {
198            rows.as_slice()
199        };
200
201        let sessions: Vec<Session> = page
202            .iter()
203            .map(|(id, started_at_str, ended_at_str)| {
204                Ok(Session {
205                    id: id.clone(),
206                    started_at: DateTime::parse_from_rfc3339(started_at_str)
207                        .map_err(|e| ClawError::Store(e.to_string()))?
208                        .with_timezone(&Utc),
209                    ended_at: ended_at_str
210                        .as_deref()
211                        .map(|s| {
212                            DateTime::parse_from_rfc3339(s)
213                                .map(|dt| dt.with_timezone(&Utc))
214                                .map_err(|e| ClawError::Store(e.to_string()))
215                        })
216                        .transpose()?,
217                })
218            })
219            .collect::<ClawResult<_>>()?;
220
221        let next_cursor = if has_more {
222            sessions.last().map(|s| s.id.clone())
223        } else {
224            None
225        };
226        Ok(ListPage {
227            items: sessions,
228            next_cursor,
229        })
230    }
231}