Skip to main content

zeph_memory/store/
agent_sessions.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! CRUD for the `agent_sessions` table used by the fleet dashboard (#3884).
5
6use serde::{Deserialize, Serialize};
7#[allow(unused_imports)]
8use zeph_db::sql;
9
10use super::SqliteStore;
11use crate::error::MemoryError;
12
13/// Discriminant for the type of agent session.
14#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
15#[serde(rename_all = "lowercase")]
16pub enum SessionKind {
17    /// An interactive conversation session (CLI or TUI).
18    Interactive,
19    /// An autonomous goal-directed session.
20    Autonomous,
21    /// An ACP (agent-to-agent communication protocol) session.
22    Acp,
23}
24
25impl SessionKind {
26    fn as_str(self) -> &'static str {
27        match self {
28            Self::Interactive => "interactive",
29            Self::Autonomous => "autonomous",
30            Self::Acp => "acp",
31        }
32    }
33}
34
35impl std::fmt::Display for SessionKind {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        f.write_str(self.as_str())
38    }
39}
40
41impl std::str::FromStr for SessionKind {
42    type Err = String;
43
44    fn from_str(s: &str) -> Result<Self, Self::Err> {
45        match s {
46            "interactive" => Ok(Self::Interactive),
47            "autonomous" => Ok(Self::Autonomous),
48            "acp" => Ok(Self::Acp),
49            other => Err(format!("unknown session kind: {other}")),
50        }
51    }
52}
53
54/// Lifecycle status of an agent session.
55#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
56#[serde(rename_all = "lowercase")]
57pub enum SessionStatus {
58    /// Session is currently running.
59    Active,
60    /// Session ended normally.
61    Completed,
62    /// Session ended due to an unrecoverable error.
63    Failed,
64    /// Session was cancelled by the user.
65    Cancelled,
66    /// Session was active at a previous startup and presumed crashed (no clean shutdown recorded).
67    Unknown,
68}
69
70impl SessionStatus {
71    fn as_str(self) -> &'static str {
72        match self {
73            Self::Active => "active",
74            Self::Completed => "completed",
75            Self::Failed => "failed",
76            Self::Cancelled => "cancelled",
77            Self::Unknown => "unknown",
78        }
79    }
80}
81
82impl std::fmt::Display for SessionStatus {
83    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84        f.write_str(self.as_str())
85    }
86}
87
88impl std::str::FromStr for SessionStatus {
89    type Err = String;
90
91    fn from_str(s: &str) -> Result<Self, Self::Err> {
92        match s {
93            "active" => Ok(Self::Active),
94            "completed" => Ok(Self::Completed),
95            "failed" => Ok(Self::Failed),
96            "cancelled" => Ok(Self::Cancelled),
97            "unknown" => Ok(Self::Unknown),
98            other => Err(format!("unknown session status: {other}")),
99        }
100    }
101}
102
103/// Channel over which the session was initiated.
104#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
105#[serde(rename_all = "lowercase")]
106pub enum SessionChannel {
107    Cli,
108    Tui,
109    Telegram,
110    Discord,
111    Slack,
112    Acp,
113}
114
115impl SessionChannel {
116    fn as_str(self) -> &'static str {
117        match self {
118            Self::Cli => "cli",
119            Self::Tui => "tui",
120            Self::Telegram => "telegram",
121            Self::Discord => "discord",
122            Self::Slack => "slack",
123            Self::Acp => "acp",
124        }
125    }
126}
127
128impl std::fmt::Display for SessionChannel {
129    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130        f.write_str(self.as_str())
131    }
132}
133
134impl std::str::FromStr for SessionChannel {
135    type Err = String;
136
137    fn from_str(s: &str) -> Result<Self, Self::Err> {
138        match s {
139            "cli" => Ok(Self::Cli),
140            "tui" => Ok(Self::Tui),
141            "telegram" => Ok(Self::Telegram),
142            "discord" => Ok(Self::Discord),
143            "slack" => Ok(Self::Slack),
144            "acp" => Ok(Self::Acp),
145            other => Err(format!("unknown session channel: {other}")),
146        }
147    }
148}
149
150/// A row from the `agent_sessions` table.
151///
152/// Each field maps directly to a column. Token fields are informational; `reasoning_tokens`
153/// is a subset of `completion_tokens` and must **not** be added separately to cost.
154#[derive(Debug, Clone)]
155pub struct AgentSessionRow {
156    /// Stable session identifier (UUID).
157    pub id: String,
158    /// Session type discriminant.
159    pub kind: SessionKind,
160    /// Current lifecycle state.
161    pub status: SessionStatus,
162    /// Channel over which the session runs.
163    pub channel: SessionChannel,
164    /// LLM model name (e.g. `claude-sonnet-4-6`).
165    pub model: String,
166    /// ISO-8601 creation timestamp (UTC).
167    pub created_at: String,
168    /// ISO-8601 timestamp of the most recent agent turn (UTC).
169    pub last_active_at: String,
170    /// Number of completed agent turns.
171    pub turns: u32,
172    /// Prompt (input) tokens consumed across all turns.
173    pub prompt_tokens: u64,
174    /// Completion (output) tokens generated across all turns.
175    pub completion_tokens: u64,
176    /// Reasoning tokens (subset of `completion_tokens`; `OpenAI` only, others are 0).
177    pub reasoning_tokens: u64,
178    /// Estimated session cost in US cents.
179    pub cost_cents: f64,
180    /// Goal description for autonomous sessions; `None` for interactive sessions.
181    pub goal_text: Option<String>,
182}
183
184impl SqliteStore {
185    /// Insert or update an agent session row.
186    ///
187    /// On conflict on `id` the row is updated with the latest field values.
188    ///
189    /// # Errors
190    ///
191    /// Returns an error if the database write fails.
192    #[tracing::instrument(name = "memory.fleet.upsert_session", skip_all, level = "debug", err)]
193    pub async fn upsert_agent_session(&self, s: &AgentSessionRow) -> Result<(), MemoryError> {
194        zeph_db::query(
195            "INSERT INTO agent_sessions \
196             (id, kind, status, channel, model, created_at, last_active_at, \
197              turns, prompt_tokens, completion_tokens, reasoning_tokens, cost_cents, goal_text) \
198             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) \
199             ON CONFLICT(id) DO UPDATE SET \
200               kind = excluded.kind, \
201               status = excluded.status, \
202               channel = excluded.channel, \
203               model = excluded.model, \
204               last_active_at = excluded.last_active_at, \
205               turns = excluded.turns, \
206               prompt_tokens = excluded.prompt_tokens, \
207               completion_tokens = excluded.completion_tokens, \
208               reasoning_tokens = excluded.reasoning_tokens, \
209               cost_cents = excluded.cost_cents, \
210               goal_text = excluded.goal_text",
211        )
212        .bind(&s.id)
213        .bind(s.kind.as_str())
214        .bind(s.status.as_str())
215        .bind(s.channel.as_str())
216        .bind(&s.model)
217        .bind(&s.created_at)
218        .bind(&s.last_active_at)
219        .bind(s.turns)
220        .bind(s.prompt_tokens.cast_signed())
221        .bind(s.completion_tokens.cast_signed())
222        .bind(s.reasoning_tokens.cast_signed())
223        .bind(s.cost_cents)
224        .bind(&s.goal_text)
225        .execute(&self.pool)
226        .await?;
227        Ok(())
228    }
229
230    /// Update the status of a session by its ID.
231    ///
232    /// # Errors
233    ///
234    /// Returns an error if the database write fails.
235    #[tracing::instrument(
236        name = "memory.fleet.update_agent_session_status",
237        skip_all,
238        level = "debug",
239        err
240    )]
241    pub async fn update_agent_session_status(
242        &self,
243        id: &str,
244        status: SessionStatus,
245    ) -> Result<(), MemoryError> {
246        zeph_db::query(
247            "UPDATE agent_sessions SET status = ?, last_active_at = datetime('now') WHERE id = ?",
248        )
249        .bind(status.as_str())
250        .bind(id)
251        .execute(&self.pool)
252        .await?;
253        Ok(())
254    }
255
256    /// Mark all sessions that are still `active` as `unknown` (crashed), except
257    /// for the current session.
258    ///
259    /// Called on startup to reconcile stale sessions from a previous unclean shutdown.
260    ///
261    /// # Errors
262    ///
263    /// Returns an error if the database write fails.
264    #[tracing::instrument(
265        name = "memory.fleet.reconcile_stale_sessions",
266        skip_all,
267        level = "debug",
268        err
269    )]
270    pub async fn reconcile_stale_sessions(
271        &self,
272        current_session_id: &str,
273    ) -> Result<u64, MemoryError> {
274        let result = zeph_db::query(
275            "UPDATE agent_sessions SET status = 'unknown' \
276             WHERE status = 'active' AND id != ?",
277        )
278        .bind(current_session_id)
279        .execute(&self.pool)
280        .await?;
281        Ok(result.rows_affected())
282    }
283
284    /// List agent sessions ordered by `last_active_at` descending.
285    ///
286    /// Pass `status_filter = Some(status)` to restrict results to that lifecycle state.
287    /// Pass `limit = 0` for unlimited results.
288    ///
289    /// # Errors
290    ///
291    /// Returns an error if the database query fails.
292    #[tracing::instrument(name = "memory.fleet.list_sessions", skip_all, level = "debug", err)]
293    pub async fn list_agent_sessions(
294        &self,
295        limit: u32,
296        status_filter: Option<SessionStatus>,
297    ) -> Result<Vec<AgentSessionRow>, MemoryError> {
298        #[allow(clippy::cast_possible_wrap)]
299        let sql_limit: i64 = if limit == 0 { -1 } else { i64::from(limit) };
300
301        type SessionRow = (
302            String,
303            String,
304            String,
305            String,
306            String,
307            String,
308            String,
309            i64,
310            i64,
311            i64,
312            i64,
313            f64,
314            Option<String>,
315        );
316        let rows: Vec<SessionRow> = if let Some(sf) = status_filter {
317            zeph_db::query_as(
318                "SELECT id, kind, status, channel, model, created_at, last_active_at, \
319                 turns, prompt_tokens, completion_tokens, reasoning_tokens, cost_cents, goal_text \
320                 FROM agent_sessions WHERE status = ? \
321                 ORDER BY last_active_at DESC LIMIT ?",
322            )
323            .bind(sf.as_str())
324            .bind(sql_limit)
325            .fetch_all(&self.pool)
326            .await?
327        } else {
328            zeph_db::query_as(
329                "SELECT id, kind, status, channel, model, created_at, last_active_at, \
330                 turns, prompt_tokens, completion_tokens, reasoning_tokens, cost_cents, goal_text \
331                 FROM agent_sessions \
332                 ORDER BY last_active_at DESC LIMIT ?",
333            )
334            .bind(sql_limit)
335            .fetch_all(&self.pool)
336            .await?
337        };
338
339        Ok(rows
340            .into_iter()
341            .map(
342                |(
343                    id,
344                    kind_s,
345                    status_s,
346                    channel_s,
347                    model,
348                    created_at,
349                    last_active_at,
350                    turns,
351                    prompt_tokens,
352                    completion_tokens,
353                    reasoning_tokens,
354                    cost_cents,
355                    goal_text,
356                )| {
357                    AgentSessionRow {
358                        id,
359                        kind: kind_s.parse().unwrap_or(SessionKind::Interactive),
360                        status: status_s.parse().unwrap_or(SessionStatus::Unknown),
361                        channel: channel_s.parse().unwrap_or(SessionChannel::Cli),
362                        model,
363                        created_at,
364                        last_active_at,
365                        turns: u32::try_from(turns).unwrap_or(0),
366                        prompt_tokens: u64::try_from(prompt_tokens).unwrap_or(0),
367                        completion_tokens: u64::try_from(completion_tokens).unwrap_or(0),
368                        reasoning_tokens: u64::try_from(reasoning_tokens).unwrap_or(0),
369                        cost_cents,
370                        goal_text,
371                    }
372                },
373            )
374            .collect())
375    }
376}
377
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a fresh in-memory store for a single test.
    async fn make_store() -> SqliteStore {
        SqliteStore::new(":memory:")
            .await
            .expect("SqliteStore::new")
    }

    /// Builds an active, interactive CLI session row with zeroed counters.
    fn sample(id: &str) -> AgentSessionRow {
        AgentSessionRow {
            id: id.to_owned(),
            kind: SessionKind::Interactive,
            status: SessionStatus::Active,
            channel: SessionChannel::Cli,
            model: "claude-sonnet-4-6".to_owned(),
            created_at: "2026-01-01T00:00:00".to_owned(),
            last_active_at: "2026-01-01T00:00:00".to_owned(),
            turns: 0,
            prompt_tokens: 0,
            completion_tokens: 0,
            reasoning_tokens: 0,
            cost_cents: 0.0,
            goal_text: None,
        }
    }

    #[tokio::test]
    async fn upsert_and_list() {
        let store = make_store().await;
        store.upsert_agent_session(&sample("s1")).await.unwrap();
        let rows = store.list_agent_sessions(10, None).await.unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].id, "s1");
        assert_eq!(rows[0].kind, SessionKind::Interactive);
        assert_eq!(rows[0].status, SessionStatus::Active);
    }

    #[tokio::test]
    async fn upsert_updates_existing() {
        let store = make_store().await;
        store.upsert_agent_session(&sample("s1")).await.unwrap();
        let mut updated = sample("s1");
        updated.turns = 5;
        updated.status = SessionStatus::Completed;
        store.upsert_agent_session(&updated).await.unwrap();

        let rows = store.list_agent_sessions(10, None).await.unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].turns, 5);
        assert_eq!(rows[0].status, SessionStatus::Completed);
    }

    #[tokio::test]
    async fn upsert_preserves_created_at() {
        let store = make_store().await;
        store.upsert_agent_session(&sample("s1")).await.unwrap();
        let mut updated = sample("s1");
        updated.created_at = "2030-12-31T23:59:59".to_owned();
        store.upsert_agent_session(&updated).await.unwrap();

        let rows = store.list_agent_sessions(10, None).await.unwrap();
        assert_eq!(rows.len(), 1);
        // The conflict clause does not touch `created_at`.
        assert_eq!(rows[0].created_at, "2026-01-01T00:00:00");
    }

    #[tokio::test]
    async fn update_status() {
        let store = make_store().await;
        store.upsert_agent_session(&sample("s1")).await.unwrap();
        store
            .update_agent_session_status("s1", SessionStatus::Failed)
            .await
            .unwrap();
        let rows = store.list_agent_sessions(10, None).await.unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].status, SessionStatus::Failed);
    }

    #[tokio::test]
    async fn reconcile_stale_sessions() {
        let store = make_store().await;
        store.upsert_agent_session(&sample("s1")).await.unwrap();
        store.upsert_agent_session(&sample("s2")).await.unwrap();
        let affected = store.reconcile_stale_sessions("s2").await.unwrap();
        assert_eq!(affected, 1);

        let rows = store.list_agent_sessions(10, None).await.unwrap();
        let s1 = rows.iter().find(|r| r.id == "s1").unwrap();
        let s2 = rows.iter().find(|r| r.id == "s2").unwrap();
        assert_eq!(s1.status, SessionStatus::Unknown);
        assert_eq!(s2.status, SessionStatus::Active);
    }

    #[tokio::test]
    async fn list_with_status_filter() {
        let store = make_store().await;
        store.upsert_agent_session(&sample("s1")).await.unwrap();
        let mut s2 = sample("s2");
        s2.status = SessionStatus::Completed;
        store.upsert_agent_session(&s2).await.unwrap();

        let active = store
            .list_agent_sessions(10, Some(SessionStatus::Active))
            .await
            .unwrap();
        assert_eq!(active.len(), 1);
        assert_eq!(active[0].id, "s1");
    }

    #[tokio::test]
    async fn list_respects_limit() {
        let store = make_store().await;
        for i in 0..5u8 {
            store
                .upsert_agent_session(&sample(&format!("s{i}")))
                .await
                .unwrap();
        }
        let rows = store.list_agent_sessions(3, None).await.unwrap();
        assert_eq!(rows.len(), 3);
    }

    #[tokio::test]
    async fn list_limit_zero_is_unlimited() {
        let store = make_store().await;
        for i in 0..5u8 {
            store
                .upsert_agent_session(&sample(&format!("s{i}")))
                .await
                .unwrap();
        }
        let rows = store.list_agent_sessions(0, None).await.unwrap();
        assert_eq!(rows.len(), 5);
    }

    // The remaining tests exercise pure string conversions and need no async runtime.

    #[test]
    fn session_kind_roundtrip() {
        use std::str::FromStr as _;
        assert_eq!(
            SessionKind::from_str("autonomous").unwrap(),
            SessionKind::Autonomous
        );
        assert!(SessionKind::from_str("bad").is_err());
    }

    #[test]
    fn session_status_unknown_roundtrip() {
        use std::str::FromStr as _;
        assert_eq!(
            SessionStatus::from_str("unknown").unwrap(),
            SessionStatus::Unknown
        );
    }

    #[test]
    fn every_variant_roundtrips_through_its_name() {
        for kind in [
            SessionKind::Interactive,
            SessionKind::Autonomous,
            SessionKind::Acp,
        ] {
            assert_eq!(kind.to_string().parse::<SessionKind>(), Ok(kind));
        }
        for status in [
            SessionStatus::Active,
            SessionStatus::Completed,
            SessionStatus::Failed,
            SessionStatus::Cancelled,
            SessionStatus::Unknown,
        ] {
            assert_eq!(status.to_string().parse::<SessionStatus>(), Ok(status));
        }
        for channel in [
            SessionChannel::Cli,
            SessionChannel::Tui,
            SessionChannel::Telegram,
            SessionChannel::Discord,
            SessionChannel::Slack,
            SessionChannel::Acp,
        ] {
            assert_eq!(channel.to_string().parse::<SessionChannel>(), Ok(channel));
        }
    }
}
506}