Skip to main content

claw_core/store/
session_lifecycle.rs

1//! Session lifecycle store for claw-core.
2//!
3//! The `sessions` table records when agent sessions start and end. Unlike the
4//! `session_state` store (per-session key-value pairs), this store tracks the
5//! lifecycle of sessions themselves.
6
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use sqlx::SqlitePool;
10
11use crate::error::{ClawError, ClawResult};
12
13/// A session lifecycle record stored in the `sessions` table.
14///
15/// # Example
16///
17/// ```rust,no_run
18/// # use claw_core::ClawEngine;
19/// # async fn example() -> claw_core::ClawResult<()> {
20/// # let engine = ClawEngine::open_default().await?;
21/// let sid = engine.start_session().await?;
22/// let session = engine.get_session(&sid).await?;
23/// assert!(session.ended_at.is_none());
24/// # Ok(())
25/// # }
26/// ```
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct Session {
29    /// Unique session identifier (a UUID string).
30    pub id: String,
31    /// Timestamp when the session was started.
32    pub started_at: DateTime<Utc>,
33    /// Timestamp when the session was ended, or `None` if still active.
34    pub ended_at: Option<DateTime<Utc>>,
35}
36
37/// Data-access object for the `sessions` table.
38///
39/// Obtain an instance via [`SessionLifecycleStore::new`].
40#[derive(Debug)]
41pub struct SessionLifecycleStore<'a> {
42    pool: &'a SqlitePool,
43}
44
45impl<'a> SessionLifecycleStore<'a> {
46    /// Create a new [`SessionLifecycleStore`] bound to `pool`.
47    pub fn new(pool: &'a SqlitePool) -> Self {
48        SessionLifecycleStore { pool }
49    }
50
51    /// Start a new session and return its unique ID.
52    ///
53    /// # Errors
54    ///
55    /// Returns a [`ClawError`] if the SQL execution fails.
56    pub async fn start(&self) -> ClawResult<String> {
57        let id = uuid::Uuid::new_v4().to_string();
58        let now = Utc::now().to_rfc3339();
59        sqlx::query("INSERT INTO sessions (id, started_at) VALUES (?, ?)")
60            .bind(&id)
61            .bind(&now)
62            .execute(self.pool)
63            .await?;
64        Ok(id)
65    }
66
67    /// Mark a session as ended.
68    ///
69    /// # Errors
70    ///
71    /// Returns [`ClawError::NotFound`] if the session does not exist.
72    pub async fn end(&self, session_id: &str) -> ClawResult<()> {
73        let now = Utc::now().to_rfc3339();
74        let affected = sqlx::query("UPDATE sessions SET ended_at = ? WHERE id = ?")
75            .bind(&now)
76            .bind(session_id)
77            .execute(self.pool)
78            .await?
79            .rows_affected();
80
81        if affected == 0 {
82            return Err(ClawError::NotFound {
83                entity: "Session".to_string(),
84                id: session_id.to_string(),
85            });
86        }
87        Ok(())
88    }
89
90    /// Fetch a [`Session`] by its ID.
91    ///
92    /// # Errors
93    ///
94    /// Returns [`ClawError::NotFound`] if the session does not exist.
95    pub async fn get(&self, session_id: &str) -> ClawResult<Session> {
96        let row: Option<(String, String, Option<String>)> =
97            sqlx::query_as("SELECT id, started_at, ended_at FROM sessions WHERE id = ?")
98                .bind(session_id)
99                .fetch_optional(self.pool)
100                .await?;
101
102        let (id, started_at_str, ended_at_str) = row.ok_or_else(|| ClawError::NotFound {
103            entity: "Session".to_string(),
104            id: session_id.to_string(),
105        })?;
106
107        Ok(Session {
108            id,
109            started_at: DateTime::parse_from_rfc3339(&started_at_str)
110                .map_err(|e| ClawError::Store(e.to_string()))?
111                .with_timezone(&Utc),
112            ended_at: ended_at_str
113                .map(|s| {
114                    DateTime::parse_from_rfc3339(&s)
115                        .map(|dt| dt.with_timezone(&Utc))
116                        .map_err(|e| ClawError::Store(e.to_string()))
117                })
118                .transpose()?,
119        })
120    }
121
122    /// List all sessions, ordered by `started_at` descending (newest first).
123    ///
124    /// # Errors
125    ///
126    /// Returns a [`ClawError`] if the query fails.
127    pub async fn list(&self) -> ClawResult<Vec<Session>> {
128        let rows: Vec<(String, String, Option<String>)> = sqlx::query_as(
129            "SELECT id, started_at, ended_at FROM sessions ORDER BY started_at DESC",
130        )
131        .fetch_all(self.pool)
132        .await?;
133
134        rows.into_iter()
135            .map(|(id, started_at_str, ended_at_str)| {
136                Ok(Session {
137                    id,
138                    started_at: DateTime::parse_from_rfc3339(&started_at_str)
139                        .map_err(|e| ClawError::Store(e.to_string()))?
140                        .with_timezone(&Utc),
141                    ended_at: ended_at_str
142                        .map(|s| {
143                            DateTime::parse_from_rfc3339(&s)
144                                .map(|dt| dt.with_timezone(&Utc))
145                                .map_err(|e| ClawError::Store(e.to_string()))
146                        })
147                        .transpose()?,
148                })
149            })
150            .collect()
151    }
152
153    /// List sessions with keyset pagination.
154    ///
155    /// Sessions are ordered by `started_at` descending (newest first).  The
156    /// cursor is the `id` of the last session on the previous page.  Returns
157    /// `(records, next_cursor)` where `next_cursor` is `None` on the last
158    /// page.
159    ///
160    /// # Errors
161    ///
162    /// Returns a [`ClawError`] if the query fails.
163    pub async fn list_paginated(
164        &self,
165        opts: &crate::store::memory::ListOptions,
166    ) -> ClawResult<(Vec<Session>, Option<String>)> {
167        let limit = if opts.limit == 0 {
168            i64::MAX
169        } else {
170            opts.limit as i64
171        };
172        let fetch = limit.saturating_add(1);
173
174        let rows: Vec<(String, String, Option<String>)> = match &opts.cursor {
175            None => {
176                sqlx::query_as(
177                    "SELECT id, started_at, ended_at FROM sessions \
178                 ORDER BY started_at DESC, id DESC \
179                 LIMIT ?",
180                )
181                .bind(fetch)
182                .fetch_all(self.pool)
183                .await?
184            }
185            Some(cursor) => {
186                sqlx::query_as(
187                    "SELECT id, started_at, ended_at FROM sessions \
188                 WHERE (started_at, id) < \
189                     (SELECT started_at, id FROM sessions WHERE id = ?) \
190                 ORDER BY started_at DESC, id DESC \
191                 LIMIT ?",
192                )
193                .bind(cursor)
194                .bind(fetch)
195                .fetch_all(self.pool)
196                .await?
197            }
198        };
199
200        let has_more = rows.len() as i64 > limit;
201        let page = if has_more {
202            &rows[..limit as usize]
203        } else {
204            rows.as_slice()
205        };
206
207        let sessions: Vec<Session> = page
208            .iter()
209            .map(|(id, started_at_str, ended_at_str)| {
210                Ok(Session {
211                    id: id.clone(),
212                    started_at: DateTime::parse_from_rfc3339(started_at_str)
213                        .map_err(|e| ClawError::Store(e.to_string()))?
214                        .with_timezone(&Utc),
215                    ended_at: ended_at_str
216                        .as_deref()
217                        .map(|s| {
218                            DateTime::parse_from_rfc3339(s)
219                                .map(|dt| dt.with_timezone(&Utc))
220                                .map_err(|e| ClawError::Store(e.to_string()))
221                        })
222                        .transpose()?,
223                })
224            })
225            .collect::<ClawResult<_>>()?;
226
227        let next_cursor = if has_more {
228            sessions.last().map(|s| s.id.clone())
229        } else {
230            None
231        };
232        Ok((sessions, next_cursor))
233    }
234}