1use serde::{Deserialize, Serialize};
7#[allow(unused_imports)]
8use zeph_db::sql;
9
10use super::SqliteStore;
11use crate::error::MemoryError;
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
15#[serde(rename_all = "lowercase")]
16#[non_exhaustive]
17pub enum SessionKind {
18 Interactive,
20 Autonomous,
22 Acp,
24}
25
26impl SessionKind {
27 fn as_str(self) -> &'static str {
28 match self {
29 Self::Interactive => "interactive",
30 Self::Autonomous => "autonomous",
31 Self::Acp => "acp",
32 }
33 }
34}
35
36impl std::fmt::Display for SessionKind {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 f.write_str(self.as_str())
39 }
40}
41
42impl std::str::FromStr for SessionKind {
43 type Err = String;
44
45 fn from_str(s: &str) -> Result<Self, Self::Err> {
46 match s {
47 "interactive" => Ok(Self::Interactive),
48 "autonomous" => Ok(Self::Autonomous),
49 "acp" => Ok(Self::Acp),
50 other => Err(format!("unknown session kind: {other}")),
51 }
52 }
53}
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
57#[serde(rename_all = "lowercase")]
58#[non_exhaustive]
59pub enum SessionStatus {
60 Active,
62 Completed,
64 Failed,
66 Cancelled,
68 Unknown,
70}
71
72impl SessionStatus {
73 fn as_str(self) -> &'static str {
74 match self {
75 Self::Active => "active",
76 Self::Completed => "completed",
77 Self::Failed => "failed",
78 Self::Cancelled => "cancelled",
79 Self::Unknown => "unknown",
80 }
81 }
82}
83
84impl std::fmt::Display for SessionStatus {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 f.write_str(self.as_str())
87 }
88}
89
90impl std::str::FromStr for SessionStatus {
91 type Err = String;
92
93 fn from_str(s: &str) -> Result<Self, Self::Err> {
94 match s {
95 "active" => Ok(Self::Active),
96 "completed" => Ok(Self::Completed),
97 "failed" => Ok(Self::Failed),
98 "cancelled" => Ok(Self::Cancelled),
99 "unknown" => Ok(Self::Unknown),
100 other => Err(format!("unknown session status: {other}")),
101 }
102 }
103}
104
105#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
107#[serde(rename_all = "lowercase")]
108#[non_exhaustive]
109pub enum SessionChannel {
110 Cli,
111 Tui,
112 Telegram,
113 Discord,
114 Slack,
115 Acp,
116}
117
118impl SessionChannel {
119 fn as_str(self) -> &'static str {
120 match self {
121 Self::Cli => "cli",
122 Self::Tui => "tui",
123 Self::Telegram => "telegram",
124 Self::Discord => "discord",
125 Self::Slack => "slack",
126 Self::Acp => "acp",
127 }
128 }
129}
130
131impl std::fmt::Display for SessionChannel {
132 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133 f.write_str(self.as_str())
134 }
135}
136
137impl std::str::FromStr for SessionChannel {
138 type Err = String;
139
140 fn from_str(s: &str) -> Result<Self, Self::Err> {
141 match s {
142 "cli" => Ok(Self::Cli),
143 "tui" => Ok(Self::Tui),
144 "telegram" => Ok(Self::Telegram),
145 "discord" => Ok(Self::Discord),
146 "slack" => Ok(Self::Slack),
147 "acp" => Ok(Self::Acp),
148 other => Err(format!("unknown session channel: {other}")),
149 }
150 }
151}
152
153#[derive(Debug, Clone)]
158pub struct AgentSessionRow {
159 pub id: String,
161 pub kind: SessionKind,
163 pub status: SessionStatus,
165 pub channel: SessionChannel,
167 pub model: String,
169 pub created_at: String,
171 pub last_active_at: String,
173 pub turns: u32,
175 pub prompt_tokens: u64,
177 pub completion_tokens: u64,
179 pub reasoning_tokens: u64,
181 pub cost_cents: f64,
183 pub goal_text: Option<String>,
185}
186
187impl SqliteStore {
188 #[tracing::instrument(name = "memory.fleet.upsert_session", skip_all, level = "debug", err)]
196 pub async fn upsert_agent_session(&self, s: &AgentSessionRow) -> Result<(), MemoryError> {
197 zeph_db::query(
198 "INSERT INTO agent_sessions \
199 (id, kind, status, channel, model, created_at, last_active_at, \
200 turns, prompt_tokens, completion_tokens, reasoning_tokens, cost_cents, goal_text) \
201 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) \
202 ON CONFLICT(id) DO UPDATE SET \
203 kind = excluded.kind, \
204 status = excluded.status, \
205 channel = excluded.channel, \
206 model = excluded.model, \
207 last_active_at = excluded.last_active_at, \
208 turns = excluded.turns, \
209 prompt_tokens = excluded.prompt_tokens, \
210 completion_tokens = excluded.completion_tokens, \
211 reasoning_tokens = excluded.reasoning_tokens, \
212 cost_cents = excluded.cost_cents, \
213 goal_text = excluded.goal_text",
214 )
215 .bind(&s.id)
216 .bind(s.kind.as_str())
217 .bind(s.status.as_str())
218 .bind(s.channel.as_str())
219 .bind(&s.model)
220 .bind(&s.created_at)
221 .bind(&s.last_active_at)
222 .bind(s.turns)
223 .bind(s.prompt_tokens.cast_signed())
224 .bind(s.completion_tokens.cast_signed())
225 .bind(s.reasoning_tokens.cast_signed())
226 .bind(s.cost_cents)
227 .bind(&s.goal_text)
228 .execute(&self.pool)
229 .await?;
230 Ok(())
231 }
232
233 #[tracing::instrument(
239 name = "memory.fleet.update_agent_session_status",
240 skip_all,
241 level = "debug",
242 err
243 )]
244 pub async fn update_agent_session_status(
245 &self,
246 id: &str,
247 status: SessionStatus,
248 ) -> Result<(), MemoryError> {
249 zeph_db::query(
250 "UPDATE agent_sessions SET status = ?, last_active_at = datetime('now') WHERE id = ?",
251 )
252 .bind(status.as_str())
253 .bind(id)
254 .execute(&self.pool)
255 .await?;
256 Ok(())
257 }
258
259 #[tracing::instrument(
268 name = "memory.fleet.reconcile_stale_sessions",
269 skip_all,
270 level = "debug",
271 err
272 )]
273 pub async fn reconcile_stale_sessions(
274 &self,
275 current_session_id: &str,
276 ) -> Result<u64, MemoryError> {
277 let result = zeph_db::query(
278 "UPDATE agent_sessions SET status = 'unknown' \
279 WHERE status = 'active' AND id != ?",
280 )
281 .bind(current_session_id)
282 .execute(&self.pool)
283 .await?;
284 Ok(result.rows_affected())
285 }
286
287 #[tracing::instrument(name = "memory.fleet.list_sessions", skip_all, level = "debug", err)]
296 pub async fn list_agent_sessions(
297 &self,
298 limit: u32,
299 status_filter: Option<SessionStatus>,
300 ) -> Result<Vec<AgentSessionRow>, MemoryError> {
301 #[allow(clippy::cast_possible_wrap)]
302 let sql_limit: i64 = if limit == 0 { -1 } else { i64::from(limit) };
303
304 type SessionRow = (
305 String,
306 String,
307 String,
308 String,
309 String,
310 String,
311 String,
312 i64,
313 i64,
314 i64,
315 i64,
316 f64,
317 Option<String>,
318 );
319 let rows: Vec<SessionRow> = if let Some(sf) = status_filter {
320 zeph_db::query_as(
321 "SELECT id, kind, status, channel, model, created_at, last_active_at, \
322 turns, prompt_tokens, completion_tokens, reasoning_tokens, cost_cents, goal_text \
323 FROM agent_sessions WHERE status = ? \
324 ORDER BY last_active_at DESC LIMIT ?",
325 )
326 .bind(sf.as_str())
327 .bind(sql_limit)
328 .fetch_all(&self.pool)
329 .await?
330 } else {
331 zeph_db::query_as(
332 "SELECT id, kind, status, channel, model, created_at, last_active_at, \
333 turns, prompt_tokens, completion_tokens, reasoning_tokens, cost_cents, goal_text \
334 FROM agent_sessions \
335 ORDER BY last_active_at DESC LIMIT ?",
336 )
337 .bind(sql_limit)
338 .fetch_all(&self.pool)
339 .await?
340 };
341
342 Ok(rows
343 .into_iter()
344 .map(
345 |(
346 id,
347 kind_s,
348 status_s,
349 channel_s,
350 model,
351 created_at,
352 last_active_at,
353 turns,
354 prompt_tokens,
355 completion_tokens,
356 reasoning_tokens,
357 cost_cents,
358 goal_text,
359 )| {
360 AgentSessionRow {
361 id,
362 kind: kind_s.parse().unwrap_or(SessionKind::Interactive),
363 status: status_s.parse().unwrap_or(SessionStatus::Unknown),
364 channel: channel_s.parse().unwrap_or(SessionChannel::Cli),
365 model,
366 created_at,
367 last_active_at,
368 turns: u32::try_from(turns).unwrap_or(0),
369 prompt_tokens: u64::try_from(prompt_tokens).unwrap_or(0),
370 completion_tokens: u64::try_from(completion_tokens).unwrap_or(0),
371 reasoning_tokens: u64::try_from(reasoning_tokens).unwrap_or(0),
372 cost_cents,
373 goal_text,
374 }
375 },
376 )
377 .collect())
378 }
379}
380
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a fresh in-memory store so every test starts from an empty table.
    async fn make_store() -> SqliteStore {
        SqliteStore::new(":memory:")
            .await
            .expect("SqliteStore::new")
    }

    /// Builds an active, interactive CLI session row with zeroed counters.
    fn sample(id: &str) -> AgentSessionRow {
        AgentSessionRow {
            id: id.to_owned(),
            kind: SessionKind::Interactive,
            status: SessionStatus::Active,
            channel: SessionChannel::Cli,
            model: "claude-sonnet-4-6".to_owned(),
            created_at: "2026-01-01T00:00:00".to_owned(),
            last_active_at: "2026-01-01T00:00:00".to_owned(),
            turns: 0,
            prompt_tokens: 0,
            completion_tokens: 0,
            reasoning_tokens: 0,
            cost_cents: 0.0,
            goal_text: None,
        }
    }

    #[tokio::test]
    async fn upsert_and_list() {
        let store = make_store().await;
        store.upsert_agent_session(&sample("s1")).await.unwrap();
        let rows = store.list_agent_sessions(10, None).await.unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].id, "s1");
        assert_eq!(rows[0].kind, SessionKind::Interactive);
        assert_eq!(rows[0].status, SessionStatus::Active);
    }

    #[tokio::test]
    async fn upsert_updates_existing() {
        let store = make_store().await;
        store.upsert_agent_session(&sample("s1")).await.unwrap();
        let mut updated = sample("s1");
        updated.turns = 5;
        updated.status = SessionStatus::Completed;
        store.upsert_agent_session(&updated).await.unwrap();

        let rows = store.list_agent_sessions(10, None).await.unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].turns, 5);
        assert_eq!(rows[0].status, SessionStatus::Completed);
    }

    #[tokio::test]
    async fn update_status() {
        let store = make_store().await;
        store.upsert_agent_session(&sample("s1")).await.unwrap();
        store
            .update_agent_session_status("s1", SessionStatus::Failed)
            .await
            .unwrap();
        let rows = store.list_agent_sessions(10, None).await.unwrap();
        assert_eq!(rows[0].status, SessionStatus::Failed);
    }

    #[tokio::test]
    async fn reconcile_stale_sessions() {
        let store = make_store().await;
        store.upsert_agent_session(&sample("s1")).await.unwrap();
        store.upsert_agent_session(&sample("s2")).await.unwrap();
        let affected = store.reconcile_stale_sessions("s2").await.unwrap();
        assert_eq!(affected, 1);

        let rows = store.list_agent_sessions(10, None).await.unwrap();
        let s1 = rows.iter().find(|r| r.id == "s1").unwrap();
        let s2 = rows.iter().find(|r| r.id == "s2").unwrap();
        assert_eq!(s1.status, SessionStatus::Unknown);
        assert_eq!(s2.status, SessionStatus::Active);
    }

    #[tokio::test]
    async fn list_with_status_filter() {
        let store = make_store().await;
        store.upsert_agent_session(&sample("s1")).await.unwrap();
        let mut s2 = sample("s2");
        s2.status = SessionStatus::Completed;
        store.upsert_agent_session(&s2).await.unwrap();

        let active = store
            .list_agent_sessions(10, Some(SessionStatus::Active))
            .await
            .unwrap();
        assert_eq!(active.len(), 1);
        assert_eq!(active[0].id, "s1");
    }

    #[tokio::test]
    async fn list_respects_limit() {
        let store = make_store().await;
        for i in 0..5u8 {
            store
                .upsert_agent_session(&sample(&format!("s{i}")))
                .await
                .unwrap();
        }
        let rows = store.list_agent_sessions(3, None).await.unwrap();
        assert_eq!(rows.len(), 3);
    }

    // A limit of 0 is mapped to SQL `LIMIT -1`, which SQLite treats as unbounded.
    #[tokio::test]
    async fn list_zero_limit_returns_all() {
        let store = make_store().await;
        for i in 0..5u8 {
            store
                .upsert_agent_session(&sample(&format!("s{i}")))
                .await
                .unwrap();
        }
        let rows = store.list_agent_sessions(0, None).await.unwrap();
        assert_eq!(rows.len(), 5);
    }

    // The label tests below are synchronous, so they use plain `#[test]`
    // rather than spinning up an async runtime.

    #[test]
    fn session_kind_roundtrip() {
        for kind in [
            SessionKind::Interactive,
            SessionKind::Autonomous,
            SessionKind::Acp,
        ] {
            assert_eq!(kind.to_string().parse::<SessionKind>(), Ok(kind));
        }
        assert_eq!(
            "autonomous".parse::<SessionKind>(),
            Ok(SessionKind::Autonomous)
        );
        assert!("bad".parse::<SessionKind>().is_err());
    }

    #[test]
    fn session_status_unknown_roundtrip() {
        for status in [
            SessionStatus::Active,
            SessionStatus::Completed,
            SessionStatus::Failed,
            SessionStatus::Cancelled,
            SessionStatus::Unknown,
        ] {
            assert_eq!(status.to_string().parse::<SessionStatus>(), Ok(status));
        }
        assert_eq!(
            "unknown".parse::<SessionStatus>(),
            Ok(SessionStatus::Unknown)
        );
        assert!("bad".parse::<SessionStatus>().is_err());
    }

    #[test]
    fn session_channel_roundtrip() {
        for channel in [
            SessionChannel::Cli,
            SessionChannel::Tui,
            SessionChannel::Telegram,
            SessionChannel::Discord,
            SessionChannel::Slack,
            SessionChannel::Acp,
        ] {
            assert_eq!(channel.to_string().parse::<SessionChannel>(), Ok(channel));
        }
        assert!("bad".parse::<SessionChannel>().is_err());
    }
}
509}