Skip to main content

zeph_memory/store/
agent_sessions.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! CRUD for the `agent_sessions` table used by the fleet dashboard (#3884).
5
6use serde::{Deserialize, Serialize};
7#[allow(unused_imports)]
8use zeph_db::sql;
9
10use super::SqliteStore;
11use crate::error::MemoryError;
12
13/// Discriminant for the type of agent session.
14#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
15#[serde(rename_all = "lowercase")]
16#[non_exhaustive]
17pub enum SessionKind {
18    /// An interactive conversation session (CLI or TUI).
19    Interactive,
20    /// An autonomous goal-directed session.
21    Autonomous,
22    /// An ACP (agent-to-agent communication protocol) session.
23    Acp,
24}
25
26impl SessionKind {
27    fn as_str(self) -> &'static str {
28        match self {
29            Self::Interactive => "interactive",
30            Self::Autonomous => "autonomous",
31            Self::Acp => "acp",
32        }
33    }
34}
35
36impl std::fmt::Display for SessionKind {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        f.write_str(self.as_str())
39    }
40}
41
42impl std::str::FromStr for SessionKind {
43    type Err = String;
44
45    fn from_str(s: &str) -> Result<Self, Self::Err> {
46        match s {
47            "interactive" => Ok(Self::Interactive),
48            "autonomous" => Ok(Self::Autonomous),
49            "acp" => Ok(Self::Acp),
50            other => Err(format!("unknown session kind: {other}")),
51        }
52    }
53}
54
55/// Lifecycle status of an agent session.
56#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
57#[serde(rename_all = "lowercase")]
58#[non_exhaustive]
59pub enum SessionStatus {
60    /// Session is currently running.
61    Active,
62    /// Session ended normally.
63    Completed,
64    /// Session ended due to an unrecoverable error.
65    Failed,
66    /// Session was cancelled by the user.
67    Cancelled,
68    /// Session was active at a previous startup and presumed crashed (no clean shutdown recorded).
69    Unknown,
70}
71
72impl SessionStatus {
73    fn as_str(self) -> &'static str {
74        match self {
75            Self::Active => "active",
76            Self::Completed => "completed",
77            Self::Failed => "failed",
78            Self::Cancelled => "cancelled",
79            Self::Unknown => "unknown",
80        }
81    }
82}
83
84impl std::fmt::Display for SessionStatus {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        f.write_str(self.as_str())
87    }
88}
89
90impl std::str::FromStr for SessionStatus {
91    type Err = String;
92
93    fn from_str(s: &str) -> Result<Self, Self::Err> {
94        match s {
95            "active" => Ok(Self::Active),
96            "completed" => Ok(Self::Completed),
97            "failed" => Ok(Self::Failed),
98            "cancelled" => Ok(Self::Cancelled),
99            "unknown" => Ok(Self::Unknown),
100            other => Err(format!("unknown session status: {other}")),
101        }
102    }
103}
104
105/// Channel over which the session was initiated.
106#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
107#[serde(rename_all = "lowercase")]
108#[non_exhaustive]
109pub enum SessionChannel {
110    Cli,
111    Tui,
112    Telegram,
113    Discord,
114    Slack,
115    Acp,
116}
117
118impl SessionChannel {
119    fn as_str(self) -> &'static str {
120        match self {
121            Self::Cli => "cli",
122            Self::Tui => "tui",
123            Self::Telegram => "telegram",
124            Self::Discord => "discord",
125            Self::Slack => "slack",
126            Self::Acp => "acp",
127        }
128    }
129}
130
131impl std::fmt::Display for SessionChannel {
132    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133        f.write_str(self.as_str())
134    }
135}
136
137impl std::str::FromStr for SessionChannel {
138    type Err = String;
139
140    fn from_str(s: &str) -> Result<Self, Self::Err> {
141        match s {
142            "cli" => Ok(Self::Cli),
143            "tui" => Ok(Self::Tui),
144            "telegram" => Ok(Self::Telegram),
145            "discord" => Ok(Self::Discord),
146            "slack" => Ok(Self::Slack),
147            "acp" => Ok(Self::Acp),
148            other => Err(format!("unknown session channel: {other}")),
149        }
150    }
151}
152
153/// A row from the `agent_sessions` table.
154///
155/// Each field maps directly to a column. Token fields are informational; `reasoning_tokens`
156/// is a subset of `completion_tokens` and must **not** be added separately to cost.
157#[derive(Debug, Clone)]
158pub struct AgentSessionRow {
159    /// Stable session identifier (UUID).
160    pub id: String,
161    /// Session type discriminant.
162    pub kind: SessionKind,
163    /// Current lifecycle state.
164    pub status: SessionStatus,
165    /// Channel over which the session runs.
166    pub channel: SessionChannel,
167    /// LLM model name (e.g. `claude-sonnet-4-6`).
168    pub model: String,
169    /// ISO-8601 creation timestamp (UTC).
170    pub created_at: String,
171    /// ISO-8601 timestamp of the most recent agent turn (UTC).
172    pub last_active_at: String,
173    /// Number of completed agent turns.
174    pub turns: u32,
175    /// Prompt (input) tokens consumed across all turns.
176    pub prompt_tokens: u64,
177    /// Completion (output) tokens generated across all turns.
178    pub completion_tokens: u64,
179    /// Reasoning tokens (subset of `completion_tokens`; `OpenAI` only, others are 0).
180    pub reasoning_tokens: u64,
181    /// Estimated session cost in US cents.
182    pub cost_cents: f64,
183    /// Goal description for autonomous sessions; `None` for interactive sessions.
184    pub goal_text: Option<String>,
185}
186
187impl SqliteStore {
188    /// Insert or update an agent session row.
189    ///
190    /// On conflict on `id` the row is updated with the latest field values.
191    ///
192    /// # Errors
193    ///
194    /// Returns an error if the database write fails.
195    #[tracing::instrument(name = "memory.fleet.upsert_session", skip_all, level = "debug", err)]
196    pub async fn upsert_agent_session(&self, s: &AgentSessionRow) -> Result<(), MemoryError> {
197        zeph_db::query(
198            "INSERT INTO agent_sessions \
199             (id, kind, status, channel, model, created_at, last_active_at, \
200              turns, prompt_tokens, completion_tokens, reasoning_tokens, cost_cents, goal_text) \
201             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) \
202             ON CONFLICT(id) DO UPDATE SET \
203               kind = excluded.kind, \
204               status = excluded.status, \
205               channel = excluded.channel, \
206               model = excluded.model, \
207               last_active_at = excluded.last_active_at, \
208               turns = excluded.turns, \
209               prompt_tokens = excluded.prompt_tokens, \
210               completion_tokens = excluded.completion_tokens, \
211               reasoning_tokens = excluded.reasoning_tokens, \
212               cost_cents = excluded.cost_cents, \
213               goal_text = excluded.goal_text",
214        )
215        .bind(&s.id)
216        .bind(s.kind.as_str())
217        .bind(s.status.as_str())
218        .bind(s.channel.as_str())
219        .bind(&s.model)
220        .bind(&s.created_at)
221        .bind(&s.last_active_at)
222        .bind(s.turns)
223        .bind(s.prompt_tokens.cast_signed())
224        .bind(s.completion_tokens.cast_signed())
225        .bind(s.reasoning_tokens.cast_signed())
226        .bind(s.cost_cents)
227        .bind(&s.goal_text)
228        .execute(&self.pool)
229        .await?;
230        Ok(())
231    }
232
233    /// Update the status of a session by its ID.
234    ///
235    /// # Errors
236    ///
237    /// Returns an error if the database write fails.
238    #[tracing::instrument(
239        name = "memory.fleet.update_agent_session_status",
240        skip_all,
241        level = "debug",
242        err
243    )]
244    pub async fn update_agent_session_status(
245        &self,
246        id: &str,
247        status: SessionStatus,
248    ) -> Result<(), MemoryError> {
249        zeph_db::query(
250            "UPDATE agent_sessions SET status = ?, last_active_at = datetime('now') WHERE id = ?",
251        )
252        .bind(status.as_str())
253        .bind(id)
254        .execute(&self.pool)
255        .await?;
256        Ok(())
257    }
258
259    /// Mark all sessions that are still `active` as `unknown` (crashed), except
260    /// for the current session.
261    ///
262    /// Called on startup to reconcile stale sessions from a previous unclean shutdown.
263    ///
264    /// # Errors
265    ///
266    /// Returns an error if the database write fails.
267    #[tracing::instrument(
268        name = "memory.fleet.reconcile_stale_sessions",
269        skip_all,
270        level = "debug",
271        err
272    )]
273    pub async fn reconcile_stale_sessions(
274        &self,
275        current_session_id: &str,
276    ) -> Result<u64, MemoryError> {
277        let result = zeph_db::query(
278            "UPDATE agent_sessions SET status = 'unknown' \
279             WHERE status = 'active' AND id != ?",
280        )
281        .bind(current_session_id)
282        .execute(&self.pool)
283        .await?;
284        Ok(result.rows_affected())
285    }
286
287    /// List agent sessions ordered by `last_active_at` descending.
288    ///
289    /// Pass `status_filter = Some(status)` to restrict results to that lifecycle state.
290    /// Pass `limit = 0` for unlimited results.
291    ///
292    /// # Errors
293    ///
294    /// Returns an error if the database query fails.
295    #[tracing::instrument(name = "memory.fleet.list_sessions", skip_all, level = "debug", err)]
296    pub async fn list_agent_sessions(
297        &self,
298        limit: u32,
299        status_filter: Option<SessionStatus>,
300    ) -> Result<Vec<AgentSessionRow>, MemoryError> {
301        #[allow(clippy::cast_possible_wrap)]
302        let sql_limit: i64 = if limit == 0 { -1 } else { i64::from(limit) };
303
304        type SessionRow = (
305            String,
306            String,
307            String,
308            String,
309            String,
310            String,
311            String,
312            i64,
313            i64,
314            i64,
315            i64,
316            f64,
317            Option<String>,
318        );
319        let rows: Vec<SessionRow> = if let Some(sf) = status_filter {
320            zeph_db::query_as(
321                "SELECT id, kind, status, channel, model, created_at, last_active_at, \
322                 turns, prompt_tokens, completion_tokens, reasoning_tokens, cost_cents, goal_text \
323                 FROM agent_sessions WHERE status = ? \
324                 ORDER BY last_active_at DESC LIMIT ?",
325            )
326            .bind(sf.as_str())
327            .bind(sql_limit)
328            .fetch_all(&self.pool)
329            .await?
330        } else {
331            zeph_db::query_as(
332                "SELECT id, kind, status, channel, model, created_at, last_active_at, \
333                 turns, prompt_tokens, completion_tokens, reasoning_tokens, cost_cents, goal_text \
334                 FROM agent_sessions \
335                 ORDER BY last_active_at DESC LIMIT ?",
336            )
337            .bind(sql_limit)
338            .fetch_all(&self.pool)
339            .await?
340        };
341
342        Ok(rows
343            .into_iter()
344            .map(
345                |(
346                    id,
347                    kind_s,
348                    status_s,
349                    channel_s,
350                    model,
351                    created_at,
352                    last_active_at,
353                    turns,
354                    prompt_tokens,
355                    completion_tokens,
356                    reasoning_tokens,
357                    cost_cents,
358                    goal_text,
359                )| {
360                    AgentSessionRow {
361                        id,
362                        kind: kind_s.parse().unwrap_or(SessionKind::Interactive),
363                        status: status_s.parse().unwrap_or(SessionStatus::Unknown),
364                        channel: channel_s.parse().unwrap_or(SessionChannel::Cli),
365                        model,
366                        created_at,
367                        last_active_at,
368                        turns: u32::try_from(turns).unwrap_or(0),
369                        prompt_tokens: u64::try_from(prompt_tokens).unwrap_or(0),
370                        completion_tokens: u64::try_from(completion_tokens).unwrap_or(0),
371                        reasoning_tokens: u64::try_from(reasoning_tokens).unwrap_or(0),
372                        cost_cents,
373                        goal_text,
374                    }
375                },
376            )
377            .collect())
378    }
379}
380
381#[cfg(test)]
382mod tests {
383    use super::*;
384
385    async fn make_store() -> SqliteStore {
386        SqliteStore::new(":memory:")
387            .await
388            .expect("SqliteStore::new")
389    }
390
391    fn sample(id: &str) -> AgentSessionRow {
392        AgentSessionRow {
393            id: id.to_owned(),
394            kind: SessionKind::Interactive,
395            status: SessionStatus::Active,
396            channel: SessionChannel::Cli,
397            model: "claude-sonnet-4-6".to_owned(),
398            created_at: "2026-01-01T00:00:00".to_owned(),
399            last_active_at: "2026-01-01T00:00:00".to_owned(),
400            turns: 0,
401            prompt_tokens: 0,
402            completion_tokens: 0,
403            reasoning_tokens: 0,
404            cost_cents: 0.0,
405            goal_text: None,
406        }
407    }
408
409    #[tokio::test]
410    async fn upsert_and_list() {
411        let store = make_store().await;
412        store.upsert_agent_session(&sample("s1")).await.unwrap();
413        let rows = store.list_agent_sessions(10, None).await.unwrap();
414        assert_eq!(rows.len(), 1);
415        assert_eq!(rows[0].id, "s1");
416        assert_eq!(rows[0].kind, SessionKind::Interactive);
417        assert_eq!(rows[0].status, SessionStatus::Active);
418    }
419
420    #[tokio::test]
421    async fn upsert_updates_existing() {
422        let store = make_store().await;
423        store.upsert_agent_session(&sample("s1")).await.unwrap();
424        let mut updated = sample("s1");
425        updated.turns = 5;
426        updated.status = SessionStatus::Completed;
427        store.upsert_agent_session(&updated).await.unwrap();
428
429        let rows = store.list_agent_sessions(10, None).await.unwrap();
430        assert_eq!(rows.len(), 1);
431        assert_eq!(rows[0].turns, 5);
432        assert_eq!(rows[0].status, SessionStatus::Completed);
433    }
434
435    #[tokio::test]
436    async fn update_status() {
437        let store = make_store().await;
438        store.upsert_agent_session(&sample("s1")).await.unwrap();
439        store
440            .update_agent_session_status("s1", SessionStatus::Failed)
441            .await
442            .unwrap();
443        let rows = store.list_agent_sessions(10, None).await.unwrap();
444        assert_eq!(rows[0].status, SessionStatus::Failed);
445    }
446
447    #[tokio::test]
448    async fn reconcile_stale_sessions() {
449        let store = make_store().await;
450        store.upsert_agent_session(&sample("s1")).await.unwrap();
451        store.upsert_agent_session(&sample("s2")).await.unwrap();
452        let affected = store.reconcile_stale_sessions("s2").await.unwrap();
453        assert_eq!(affected, 1);
454
455        let rows = store.list_agent_sessions(10, None).await.unwrap();
456        let s1 = rows.iter().find(|r| r.id == "s1").unwrap();
457        let s2 = rows.iter().find(|r| r.id == "s2").unwrap();
458        assert_eq!(s1.status, SessionStatus::Unknown);
459        assert_eq!(s2.status, SessionStatus::Active);
460    }
461
462    #[tokio::test]
463    async fn list_with_status_filter() {
464        let store = make_store().await;
465        store.upsert_agent_session(&sample("s1")).await.unwrap();
466        let mut s2 = sample("s2");
467        s2.status = SessionStatus::Completed;
468        store.upsert_agent_session(&s2).await.unwrap();
469
470        let active = store
471            .list_agent_sessions(10, Some(SessionStatus::Active))
472            .await
473            .unwrap();
474        assert_eq!(active.len(), 1);
475        assert_eq!(active[0].id, "s1");
476    }
477
478    #[tokio::test]
479    async fn list_respects_limit() {
480        let store = make_store().await;
481        for i in 0..5u8 {
482            store
483                .upsert_agent_session(&sample(&format!("s{i}")))
484                .await
485                .unwrap();
486        }
487        let rows = store.list_agent_sessions(3, None).await.unwrap();
488        assert_eq!(rows.len(), 3);
489    }
490
491    #[tokio::test]
492    async fn session_kind_roundtrip() {
493        use std::str::FromStr as _;
494        assert_eq!(
495            SessionKind::from_str("autonomous").unwrap(),
496            SessionKind::Autonomous
497        );
498        assert!(SessionKind::from_str("bad").is_err());
499    }
500
501    #[tokio::test]
502    async fn session_status_unknown_roundtrip() {
503        use std::str::FromStr as _;
504        assert_eq!(
505            SessionStatus::from_str("unknown").unwrap(),
506            SessionStatus::Unknown
507        );
508    }
509}