1use serde::{Deserialize, Serialize};
7#[allow(unused_imports)]
8use zeph_db::sql;
9
10use super::SqliteStore;
11use crate::error::MemoryError;
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
15#[serde(rename_all = "lowercase")]
16pub enum SessionKind {
17 Interactive,
19 Autonomous,
21 Acp,
23}
24
25impl SessionKind {
26 fn as_str(self) -> &'static str {
27 match self {
28 Self::Interactive => "interactive",
29 Self::Autonomous => "autonomous",
30 Self::Acp => "acp",
31 }
32 }
33}
34
35impl std::fmt::Display for SessionKind {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 f.write_str(self.as_str())
38 }
39}
40
41impl std::str::FromStr for SessionKind {
42 type Err = String;
43
44 fn from_str(s: &str) -> Result<Self, Self::Err> {
45 match s {
46 "interactive" => Ok(Self::Interactive),
47 "autonomous" => Ok(Self::Autonomous),
48 "acp" => Ok(Self::Acp),
49 other => Err(format!("unknown session kind: {other}")),
50 }
51 }
52}
53
54#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
56#[serde(rename_all = "lowercase")]
57pub enum SessionStatus {
58 Active,
60 Completed,
62 Failed,
64 Cancelled,
66 Unknown,
68}
69
70impl SessionStatus {
71 fn as_str(self) -> &'static str {
72 match self {
73 Self::Active => "active",
74 Self::Completed => "completed",
75 Self::Failed => "failed",
76 Self::Cancelled => "cancelled",
77 Self::Unknown => "unknown",
78 }
79 }
80}
81
82impl std::fmt::Display for SessionStatus {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 f.write_str(self.as_str())
85 }
86}
87
88impl std::str::FromStr for SessionStatus {
89 type Err = String;
90
91 fn from_str(s: &str) -> Result<Self, Self::Err> {
92 match s {
93 "active" => Ok(Self::Active),
94 "completed" => Ok(Self::Completed),
95 "failed" => Ok(Self::Failed),
96 "cancelled" => Ok(Self::Cancelled),
97 "unknown" => Ok(Self::Unknown),
98 other => Err(format!("unknown session status: {other}")),
99 }
100 }
101}
102
103#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
105#[serde(rename_all = "lowercase")]
106pub enum SessionChannel {
107 Cli,
108 Tui,
109 Telegram,
110 Discord,
111 Slack,
112 Acp,
113}
114
115impl SessionChannel {
116 fn as_str(self) -> &'static str {
117 match self {
118 Self::Cli => "cli",
119 Self::Tui => "tui",
120 Self::Telegram => "telegram",
121 Self::Discord => "discord",
122 Self::Slack => "slack",
123 Self::Acp => "acp",
124 }
125 }
126}
127
128impl std::fmt::Display for SessionChannel {
129 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130 f.write_str(self.as_str())
131 }
132}
133
134impl std::str::FromStr for SessionChannel {
135 type Err = String;
136
137 fn from_str(s: &str) -> Result<Self, Self::Err> {
138 match s {
139 "cli" => Ok(Self::Cli),
140 "tui" => Ok(Self::Tui),
141 "telegram" => Ok(Self::Telegram),
142 "discord" => Ok(Self::Discord),
143 "slack" => Ok(Self::Slack),
144 "acp" => Ok(Self::Acp),
145 other => Err(format!("unknown session channel: {other}")),
146 }
147 }
148}
149
150#[derive(Debug, Clone)]
155pub struct AgentSessionRow {
156 pub id: String,
158 pub kind: SessionKind,
160 pub status: SessionStatus,
162 pub channel: SessionChannel,
164 pub model: String,
166 pub created_at: String,
168 pub last_active_at: String,
170 pub turns: u32,
172 pub prompt_tokens: u64,
174 pub completion_tokens: u64,
176 pub reasoning_tokens: u64,
178 pub cost_cents: f64,
180 pub goal_text: Option<String>,
182}
183
184impl SqliteStore {
185 #[tracing::instrument(name = "memory.fleet.upsert_session", skip_all, level = "debug", err)]
193 pub async fn upsert_agent_session(&self, s: &AgentSessionRow) -> Result<(), MemoryError> {
194 zeph_db::query(
195 "INSERT INTO agent_sessions \
196 (id, kind, status, channel, model, created_at, last_active_at, \
197 turns, prompt_tokens, completion_tokens, reasoning_tokens, cost_cents, goal_text) \
198 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) \
199 ON CONFLICT(id) DO UPDATE SET \
200 kind = excluded.kind, \
201 status = excluded.status, \
202 channel = excluded.channel, \
203 model = excluded.model, \
204 last_active_at = excluded.last_active_at, \
205 turns = excluded.turns, \
206 prompt_tokens = excluded.prompt_tokens, \
207 completion_tokens = excluded.completion_tokens, \
208 reasoning_tokens = excluded.reasoning_tokens, \
209 cost_cents = excluded.cost_cents, \
210 goal_text = excluded.goal_text",
211 )
212 .bind(&s.id)
213 .bind(s.kind.as_str())
214 .bind(s.status.as_str())
215 .bind(s.channel.as_str())
216 .bind(&s.model)
217 .bind(&s.created_at)
218 .bind(&s.last_active_at)
219 .bind(s.turns)
220 .bind(s.prompt_tokens.cast_signed())
221 .bind(s.completion_tokens.cast_signed())
222 .bind(s.reasoning_tokens.cast_signed())
223 .bind(s.cost_cents)
224 .bind(&s.goal_text)
225 .execute(&self.pool)
226 .await?;
227 Ok(())
228 }
229
230 #[tracing::instrument(
236 name = "memory.fleet.update_agent_session_status",
237 skip_all,
238 level = "debug",
239 err
240 )]
241 pub async fn update_agent_session_status(
242 &self,
243 id: &str,
244 status: SessionStatus,
245 ) -> Result<(), MemoryError> {
246 zeph_db::query(
247 "UPDATE agent_sessions SET status = ?, last_active_at = datetime('now') WHERE id = ?",
248 )
249 .bind(status.as_str())
250 .bind(id)
251 .execute(&self.pool)
252 .await?;
253 Ok(())
254 }
255
256 #[tracing::instrument(
265 name = "memory.fleet.reconcile_stale_sessions",
266 skip_all,
267 level = "debug",
268 err
269 )]
270 pub async fn reconcile_stale_sessions(
271 &self,
272 current_session_id: &str,
273 ) -> Result<u64, MemoryError> {
274 let result = zeph_db::query(
275 "UPDATE agent_sessions SET status = 'unknown' \
276 WHERE status = 'active' AND id != ?",
277 )
278 .bind(current_session_id)
279 .execute(&self.pool)
280 .await?;
281 Ok(result.rows_affected())
282 }
283
284 #[tracing::instrument(name = "memory.fleet.list_sessions", skip_all, level = "debug", err)]
293 pub async fn list_agent_sessions(
294 &self,
295 limit: u32,
296 status_filter: Option<SessionStatus>,
297 ) -> Result<Vec<AgentSessionRow>, MemoryError> {
298 #[allow(clippy::cast_possible_wrap)]
299 let sql_limit: i64 = if limit == 0 { -1 } else { i64::from(limit) };
300
301 type SessionRow = (
302 String,
303 String,
304 String,
305 String,
306 String,
307 String,
308 String,
309 i64,
310 i64,
311 i64,
312 i64,
313 f64,
314 Option<String>,
315 );
316 let rows: Vec<SessionRow> = if let Some(sf) = status_filter {
317 zeph_db::query_as(
318 "SELECT id, kind, status, channel, model, created_at, last_active_at, \
319 turns, prompt_tokens, completion_tokens, reasoning_tokens, cost_cents, goal_text \
320 FROM agent_sessions WHERE status = ? \
321 ORDER BY last_active_at DESC LIMIT ?",
322 )
323 .bind(sf.as_str())
324 .bind(sql_limit)
325 .fetch_all(&self.pool)
326 .await?
327 } else {
328 zeph_db::query_as(
329 "SELECT id, kind, status, channel, model, created_at, last_active_at, \
330 turns, prompt_tokens, completion_tokens, reasoning_tokens, cost_cents, goal_text \
331 FROM agent_sessions \
332 ORDER BY last_active_at DESC LIMIT ?",
333 )
334 .bind(sql_limit)
335 .fetch_all(&self.pool)
336 .await?
337 };
338
339 Ok(rows
340 .into_iter()
341 .map(
342 |(
343 id,
344 kind_s,
345 status_s,
346 channel_s,
347 model,
348 created_at,
349 last_active_at,
350 turns,
351 prompt_tokens,
352 completion_tokens,
353 reasoning_tokens,
354 cost_cents,
355 goal_text,
356 )| {
357 AgentSessionRow {
358 id,
359 kind: kind_s.parse().unwrap_or(SessionKind::Interactive),
360 status: status_s.parse().unwrap_or(SessionStatus::Unknown),
361 channel: channel_s.parse().unwrap_or(SessionChannel::Cli),
362 model,
363 created_at,
364 last_active_at,
365 turns: u32::try_from(turns).unwrap_or(0),
366 prompt_tokens: u64::try_from(prompt_tokens).unwrap_or(0),
367 completion_tokens: u64::try_from(completion_tokens).unwrap_or(0),
368 reasoning_tokens: u64::try_from(reasoning_tokens).unwrap_or(0),
369 cost_cents,
370 goal_text,
371 }
372 },
373 )
374 .collect())
375 }
376}
377
378#[cfg(test)]
379mod tests {
380 use super::*;
381
382 async fn make_store() -> SqliteStore {
383 SqliteStore::new(":memory:")
384 .await
385 .expect("SqliteStore::new")
386 }
387
388 fn sample(id: &str) -> AgentSessionRow {
389 AgentSessionRow {
390 id: id.to_owned(),
391 kind: SessionKind::Interactive,
392 status: SessionStatus::Active,
393 channel: SessionChannel::Cli,
394 model: "claude-sonnet-4-6".to_owned(),
395 created_at: "2026-01-01T00:00:00".to_owned(),
396 last_active_at: "2026-01-01T00:00:00".to_owned(),
397 turns: 0,
398 prompt_tokens: 0,
399 completion_tokens: 0,
400 reasoning_tokens: 0,
401 cost_cents: 0.0,
402 goal_text: None,
403 }
404 }
405
406 #[tokio::test]
407 async fn upsert_and_list() {
408 let store = make_store().await;
409 store.upsert_agent_session(&sample("s1")).await.unwrap();
410 let rows = store.list_agent_sessions(10, None).await.unwrap();
411 assert_eq!(rows.len(), 1);
412 assert_eq!(rows[0].id, "s1");
413 assert_eq!(rows[0].kind, SessionKind::Interactive);
414 assert_eq!(rows[0].status, SessionStatus::Active);
415 }
416
417 #[tokio::test]
418 async fn upsert_updates_existing() {
419 let store = make_store().await;
420 store.upsert_agent_session(&sample("s1")).await.unwrap();
421 let mut updated = sample("s1");
422 updated.turns = 5;
423 updated.status = SessionStatus::Completed;
424 store.upsert_agent_session(&updated).await.unwrap();
425
426 let rows = store.list_agent_sessions(10, None).await.unwrap();
427 assert_eq!(rows.len(), 1);
428 assert_eq!(rows[0].turns, 5);
429 assert_eq!(rows[0].status, SessionStatus::Completed);
430 }
431
432 #[tokio::test]
433 async fn update_status() {
434 let store = make_store().await;
435 store.upsert_agent_session(&sample("s1")).await.unwrap();
436 store
437 .update_agent_session_status("s1", SessionStatus::Failed)
438 .await
439 .unwrap();
440 let rows = store.list_agent_sessions(10, None).await.unwrap();
441 assert_eq!(rows[0].status, SessionStatus::Failed);
442 }
443
444 #[tokio::test]
445 async fn reconcile_stale_sessions() {
446 let store = make_store().await;
447 store.upsert_agent_session(&sample("s1")).await.unwrap();
448 store.upsert_agent_session(&sample("s2")).await.unwrap();
449 let affected = store.reconcile_stale_sessions("s2").await.unwrap();
450 assert_eq!(affected, 1);
451
452 let rows = store.list_agent_sessions(10, None).await.unwrap();
453 let s1 = rows.iter().find(|r| r.id == "s1").unwrap();
454 let s2 = rows.iter().find(|r| r.id == "s2").unwrap();
455 assert_eq!(s1.status, SessionStatus::Unknown);
456 assert_eq!(s2.status, SessionStatus::Active);
457 }
458
459 #[tokio::test]
460 async fn list_with_status_filter() {
461 let store = make_store().await;
462 store.upsert_agent_session(&sample("s1")).await.unwrap();
463 let mut s2 = sample("s2");
464 s2.status = SessionStatus::Completed;
465 store.upsert_agent_session(&s2).await.unwrap();
466
467 let active = store
468 .list_agent_sessions(10, Some(SessionStatus::Active))
469 .await
470 .unwrap();
471 assert_eq!(active.len(), 1);
472 assert_eq!(active[0].id, "s1");
473 }
474
475 #[tokio::test]
476 async fn list_respects_limit() {
477 let store = make_store().await;
478 for i in 0..5u8 {
479 store
480 .upsert_agent_session(&sample(&format!("s{i}")))
481 .await
482 .unwrap();
483 }
484 let rows = store.list_agent_sessions(3, None).await.unwrap();
485 assert_eq!(rows.len(), 3);
486 }
487
488 #[tokio::test]
489 async fn session_kind_roundtrip() {
490 use std::str::FromStr as _;
491 assert_eq!(
492 SessionKind::from_str("autonomous").unwrap(),
493 SessionKind::Autonomous
494 );
495 assert!(SessionKind::from_str("bad").is_err());
496 }
497
498 #[tokio::test]
499 async fn session_status_unknown_roundtrip() {
500 use std::str::FromStr as _;
501 assert_eq!(
502 SessionStatus::from_str("unknown").unwrap(),
503 SessionStatus::Unknown
504 );
505 }
506}