1use crate::async_database::AsyncDatabase;
2use crate::config_load::now_ts;
3use crate::persistence::repository::{SessionRepository, SqliteSessionRepository};
4use anyhow::{Context, Result};
5use rusqlite::{params, Connection, OptionalExtension};
6use serde::{Deserialize, Serialize};
7use std::sync::Arc;
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct SessionRow {
12 pub id: String,
14 pub task_id: String,
16 pub task_item_id: Option<String>,
18 pub step_id: String,
20 pub phase: String,
22 pub agent_id: String,
24 pub state: String,
26 pub pid: i64,
28 pub pty_backend: String,
30 pub cwd: String,
32 pub command: String,
34 pub input_fifo_path: String,
36 pub stdout_path: String,
38 pub stderr_path: String,
40 pub transcript_path: String,
42 pub output_json_path: Option<String>,
44 pub writer_client_id: Option<String>,
46 pub created_at: String,
48 pub updated_at: String,
50 pub ended_at: Option<String>,
52 pub exit_code: Option<i64>,
54}
55
56pub struct NewSession<'a> {
58 pub id: &'a str,
60 pub task_id: &'a str,
62 pub task_item_id: Option<&'a str>,
64 pub step_id: &'a str,
66 pub phase: &'a str,
68 pub agent_id: &'a str,
70 pub state: &'a str,
72 pub pid: i64,
74 pub pty_backend: &'a str,
76 pub cwd: &'a str,
78 pub command: &'a str,
80 pub input_fifo_path: &'a str,
82 pub stdout_path: &'a str,
84 pub stderr_path: &'a str,
86 pub transcript_path: &'a str,
88 pub output_json_path: Option<&'a str>,
90}
91
92pub fn insert_session(conn: &Connection, s: &NewSession<'_>) -> Result<()> {
94 let now = now_ts();
95 conn.execute(
96 "INSERT INTO agent_sessions (id, task_id, task_item_id, step_id, phase, agent_id, state, pid, pty_backend, cwd, command, input_fifo_path, stdout_path, stderr_path, transcript_path, output_json_path, writer_client_id, created_at, updated_at, ended_at, exit_code) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, NULL, ?17, ?17, NULL, NULL)",
97 params![
98 s.id,
99 s.task_id,
100 s.task_item_id,
101 s.step_id,
102 s.phase,
103 s.agent_id,
104 s.state,
105 s.pid,
106 s.pty_backend,
107 s.cwd,
108 s.command,
109 s.input_fifo_path,
110 s.stdout_path,
111 s.stderr_path,
112 s.transcript_path,
113 s.output_json_path,
114 now
115 ],
116 )?;
117 Ok(())
118}
119
120pub fn update_session_state(
122 conn: &Connection,
123 session_id: &str,
124 state: &str,
125 exit_code: Option<i64>,
126 ended: bool,
127) -> Result<()> {
128 let now = now_ts();
129 let ended_at = if ended { Some(now.clone()) } else { None };
130 conn.execute(
131 "UPDATE agent_sessions SET state = ?2, updated_at = ?3, ended_at = COALESCE(?4, ended_at), exit_code = COALESCE(?5, exit_code) WHERE id = ?1",
132 params![session_id, state, now, ended_at, exit_code],
133 )?;
134 Ok(())
135}
136
137pub fn update_session_pid(conn: &Connection, session_id: &str, pid: i64) -> Result<()> {
139 conn.execute(
140 "UPDATE agent_sessions SET pid = ?2, updated_at = ?3 WHERE id = ?1",
141 params![session_id, pid, now_ts()],
142 )?;
143 Ok(())
144}
145
146fn row_to_session(r: &rusqlite::Row<'_>) -> rusqlite::Result<SessionRow> {
147 Ok(SessionRow {
148 id: r.get(0)?,
149 task_id: r.get(1)?,
150 task_item_id: r.get(2)?,
151 step_id: r.get(3)?,
152 phase: r.get(4)?,
153 agent_id: r.get(5)?,
154 state: r.get(6)?,
155 pid: r.get(7)?,
156 pty_backend: r.get(8)?,
157 cwd: r.get(9)?,
158 command: r.get(10)?,
159 input_fifo_path: r.get(11)?,
160 stdout_path: r.get(12)?,
161 stderr_path: r.get(13)?,
162 transcript_path: r.get(14)?,
163 output_json_path: r.get(15)?,
164 writer_client_id: r.get(16)?,
165 created_at: r.get(17)?,
166 updated_at: r.get(18)?,
167 ended_at: r.get(19)?,
168 exit_code: r.get(20)?,
169 })
170}
171
172const SESSION_COLUMNS: &str = "id, task_id, task_item_id, step_id, phase, agent_id, state, pid, pty_backend, cwd, command, input_fifo_path, stdout_path, stderr_path, transcript_path, output_json_path, writer_client_id, created_at, updated_at, ended_at, exit_code";
173
174pub fn load_session(conn: &Connection, session_id: &str) -> Result<Option<SessionRow>> {
176 conn.query_row(
177 &format!(
178 "SELECT {} FROM agent_sessions WHERE id = ?1",
179 SESSION_COLUMNS
180 ),
181 params![session_id],
182 row_to_session,
183 )
184 .optional()
185 .context("load session")
186}
187
188pub fn load_active_session_for_task_step(
190 conn: &Connection,
191 task_id: &str,
192 step_id: &str,
193) -> Result<Option<SessionRow>> {
194 conn.query_row(
195 &format!(
196 "SELECT {}
197 FROM agent_sessions
198 WHERE task_id = ?1 AND step_id = ?2 AND state IN ('active','detached')
199 ORDER BY created_at DESC
200 LIMIT 1",
201 SESSION_COLUMNS
202 ),
203 params![task_id, step_id],
204 row_to_session,
205 )
206 .optional()
207 .context("load active session for task step")
208}
209
210pub fn list_task_sessions(conn: &Connection, task_id: &str) -> Result<Vec<SessionRow>> {
212 let mut stmt = conn.prepare(&format!(
213 "SELECT {}
214 FROM agent_sessions
215 WHERE task_id = ?1
216 ORDER BY created_at DESC",
217 SESSION_COLUMNS
218 ))?;
219 let rows = stmt
220 .query_map(params![task_id], row_to_session)?
221 .collect::<std::result::Result<Vec<_>, _>>()?;
222 Ok(rows)
223}
224
225pub fn acquire_writer(conn: &Connection, session_id: &str, client_id: &str) -> Result<bool> {
227 let existing: Option<String> = conn
228 .query_row(
229 "SELECT writer_client_id FROM agent_sessions WHERE id = ?1",
230 params![session_id],
231 |r| r.get::<_, Option<String>>(0),
232 )
233 .optional()?
234 .flatten();
235 if let Some(owner) = existing {
236 if !owner.is_empty() && owner != client_id {
237 return Ok(false);
238 }
239 }
240 conn.execute(
241 "UPDATE agent_sessions SET writer_client_id = ?2, updated_at = ?3 WHERE id = ?1",
242 params![session_id, client_id, now_ts()],
243 )?;
244 conn.execute(
245 "INSERT INTO session_attachments (session_id, client_id, mode, attached_at, detached_at, reason) VALUES (?1, ?2, 'writer', ?3, NULL, NULL)",
246 params![session_id, client_id, now_ts()],
247 )?;
248 Ok(true)
249}
250
251pub fn attach_reader(conn: &Connection, session_id: &str, client_id: &str) -> Result<()> {
253 conn.execute(
254 "INSERT INTO session_attachments (session_id, client_id, mode, attached_at, detached_at, reason) VALUES (?1, ?2, 'reader', ?3, NULL, NULL)",
255 params![session_id, client_id, now_ts()],
256 )?;
257 Ok(())
258}
259
260pub fn cleanup_stale_sessions(conn: &Connection, max_age_hours: u64) -> Result<usize> {
262 let cutoff = chrono::Utc::now() - chrono::Duration::hours(max_age_hours as i64);
263 let deleted = conn.execute(
264 "DELETE FROM agent_sessions WHERE state IN ('exited', 'failed') AND updated_at < ?1",
265 params![cutoff.to_rfc3339()],
266 )?;
267 Ok(deleted)
268}
269
270pub fn release_attachment(
272 conn: &Connection,
273 session_id: &str,
274 client_id: &str,
275 reason: &str,
276) -> Result<()> {
277 conn.execute(
278 "UPDATE session_attachments SET detached_at = ?3, reason = ?4 WHERE session_id = ?1 AND client_id = ?2 AND detached_at IS NULL",
279 params![session_id, client_id, now_ts(), reason],
280 )?;
281 conn.execute(
282 "UPDATE agent_sessions SET writer_client_id = NULL, updated_at = ?2 WHERE id = ?1 AND writer_client_id = ?3",
283 params![session_id, now_ts(), client_id],
284 )?;
285 Ok(())
286}
287
288pub struct OwnedNewSession {
290 pub id: String,
292 pub task_id: String,
294 pub task_item_id: Option<String>,
296 pub step_id: String,
298 pub phase: String,
300 pub agent_id: String,
302 pub state: String,
304 pub pid: i64,
306 pub pty_backend: String,
308 pub cwd: String,
310 pub command: String,
312 pub input_fifo_path: String,
314 pub stdout_path: String,
316 pub stderr_path: String,
318 pub transcript_path: String,
320 pub output_json_path: Option<String>,
322}
323
324impl<'a> From<&NewSession<'a>> for OwnedNewSession {
325 fn from(s: &NewSession<'a>) -> Self {
326 Self {
327 id: s.id.to_owned(),
328 task_id: s.task_id.to_owned(),
329 task_item_id: s.task_item_id.map(|v| v.to_owned()),
330 step_id: s.step_id.to_owned(),
331 phase: s.phase.to_owned(),
332 agent_id: s.agent_id.to_owned(),
333 state: s.state.to_owned(),
334 pid: s.pid,
335 pty_backend: s.pty_backend.to_owned(),
336 cwd: s.cwd.to_owned(),
337 command: s.command.to_owned(),
338 input_fifo_path: s.input_fifo_path.to_owned(),
339 stdout_path: s.stdout_path.to_owned(),
340 stderr_path: s.stderr_path.to_owned(),
341 transcript_path: s.transcript_path.to_owned(),
342 output_json_path: s.output_json_path.map(|v| v.to_owned()),
343 }
344 }
345}
346
347pub struct AsyncSessionStore {
349 repository: Arc<dyn SessionRepository>,
350}
351
352impl AsyncSessionStore {
353 pub fn new(async_db: Arc<AsyncDatabase>) -> Self {
355 Self::with_repository(Arc::new(SqliteSessionRepository::new(async_db)))
356 }
357
358 pub fn with_repository(repository: Arc<dyn SessionRepository>) -> Self {
360 Self { repository }
361 }
362
363 pub async fn insert_session(&self, s: OwnedNewSession) -> Result<()> {
365 self.repository.insert_session(s).await
366 }
367
368 pub async fn update_session_state(
370 &self,
371 session_id: &str,
372 state: &str,
373 exit_code: Option<i64>,
374 ended: bool,
375 ) -> Result<()> {
376 self.repository
377 .update_session_state(session_id, state, exit_code, ended)
378 .await
379 }
380
381 pub async fn update_session_pid(&self, session_id: &str, pid: i64) -> Result<()> {
383 self.repository.update_session_pid(session_id, pid).await
384 }
385
386 pub async fn load_session(&self, session_id: &str) -> Result<Option<SessionRow>> {
388 self.repository.load_session(session_id).await
389 }
390
391 pub async fn load_active_session_for_task_step(
393 &self,
394 task_id: &str,
395 step_id: &str,
396 ) -> Result<Option<SessionRow>> {
397 self.repository
398 .load_active_session_for_task_step(task_id, step_id)
399 .await
400 }
401
402 pub async fn list_task_sessions(&self, task_id: &str) -> Result<Vec<SessionRow>> {
404 self.repository.list_task_sessions(task_id).await
405 }
406
407 pub async fn acquire_writer(&self, session_id: &str, client_id: &str) -> Result<bool> {
409 self.repository.acquire_writer(session_id, client_id).await
410 }
411
412 pub async fn attach_reader(&self, session_id: &str, client_id: &str) -> Result<()> {
414 self.repository.attach_reader(session_id, client_id).await
415 }
416
417 pub async fn cleanup_stale_sessions(&self, max_age_hours: u64) -> Result<usize> {
419 self.repository.cleanup_stale_sessions(max_age_hours).await
420 }
421
422 pub async fn release_attachment(
424 &self,
425 session_id: &str,
426 client_id: &str,
427 reason: &str,
428 ) -> Result<()> {
429 self.repository
430 .release_attachment(session_id, client_id, reason)
431 .await
432 }
433}
434
435#[cfg(test)]
436mod tests {
437 use super::*;
438 use crate::db::{init_schema, open_conn};
439 use tempfile::TempDir;
440
441 fn make_db() -> (TempDir, std::path::PathBuf) {
442 let dir = tempfile::tempdir().expect("create tempdir");
443 let db_path = dir.path().join("sessions.db");
444 init_schema(&db_path).expect("init schema");
445 (dir, db_path)
446 }
447
448 fn make_session<'a>(
449 id: &'a str,
450 task_id: &'a str,
451 step_id: &'a str,
452 state: &'a str,
453 ) -> NewSession<'a> {
454 NewSession {
455 id,
456 task_id,
457 task_item_id: Some("item-1"),
458 step_id,
459 phase: "qa",
460 agent_id: "agent-a",
461 state,
462 pid: 100,
463 pty_backend: "pty",
464 cwd: "/tmp",
465 command: "echo hi",
466 input_fifo_path: "/tmp/in.fifo",
467 stdout_path: "/tmp/stdout.log",
468 stderr_path: "/tmp/stderr.log",
469 transcript_path: "/tmp/transcript.log",
470 output_json_path: Some("/tmp/output.json"),
471 }
472 }
473
474 #[test]
475 fn insert_load_and_update_session_lifecycle() {
476 let (_dir, db_path) = make_db();
477 let conn = open_conn(&db_path).expect("open conn");
478 let session = make_session("sess-1", "task-1", "qa", "active");
479 insert_session(&conn, &session).expect("insert session");
480
481 let inserted = load_session(&conn, "sess-1")
482 .expect("load session")
483 .expect("session should exist");
484 assert_eq!(inserted.task_item_id.as_deref(), Some("item-1"));
485 assert_eq!(
486 inserted.output_json_path.as_deref(),
487 Some("/tmp/output.json")
488 );
489 assert_eq!(inserted.state, "active");
490 assert_eq!(inserted.pid, 100);
491 assert_eq!(inserted.ended_at, None);
492 assert_eq!(inserted.exit_code, None);
493
494 update_session_pid(&conn, "sess-1", 4242).expect("update pid");
495 update_session_state(&conn, "sess-1", "detached", Some(7), false).expect("detach session");
496
497 let detached = load_session(&conn, "sess-1")
498 .expect("reload session")
499 .expect("session should still exist");
500 assert_eq!(detached.pid, 4242);
501 assert_eq!(detached.state, "detached");
502 assert_eq!(detached.exit_code, Some(7));
503 assert_eq!(detached.ended_at, None);
504
505 update_session_state(&conn, "sess-1", "exited", None, true).expect("exit session");
506 let exited = load_session(&conn, "sess-1")
507 .expect("reload exited session")
508 .expect("session should still exist");
509 assert_eq!(exited.state, "exited");
510 assert_eq!(exited.exit_code, Some(7));
511 assert!(exited.ended_at.is_some());
512
513 assert!(load_session(&conn, "missing")
514 .expect("load missing session")
515 .is_none());
516 }
517
518 #[test]
519 fn active_session_lookup_and_listing_filter_by_task() {
520 let (_dir, db_path) = make_db();
521 let conn = open_conn(&db_path).expect("open conn");
522 insert_session(&conn, &make_session("sess-old", "task-1", "qa", "exited"))
523 .expect("insert exited session");
524 std::thread::sleep(std::time::Duration::from_millis(2));
525 insert_session(
526 &conn,
527 &make_session("sess-active", "task-1", "qa", "active"),
528 )
529 .expect("insert active session");
530 std::thread::sleep(std::time::Duration::from_millis(2));
531 insert_session(
532 &conn,
533 &make_session("sess-detached", "task-1", "qa", "detached"),
534 )
535 .expect("insert detached session");
536 insert_session(&conn, &make_session("sess-other", "task-2", "qa", "active"))
537 .expect("insert other task session");
538
539 let active = load_active_session_for_task_step(&conn, "task-1", "qa")
540 .expect("query active session")
541 .expect("task should have an active session");
542 assert_eq!(active.id, "sess-detached");
543 assert_eq!(active.state, "detached");
544
545 let task_1_sessions = list_task_sessions(&conn, "task-1").expect("list sessions");
546 let task_1_ids: Vec<&str> = task_1_sessions.iter().map(|row| row.id.as_str()).collect();
547 assert_eq!(task_1_ids.len(), 3);
548 assert!(task_1_ids.contains(&"sess-old"));
549 assert!(task_1_ids.contains(&"sess-active"));
550 assert!(task_1_ids.contains(&"sess-detached"));
551
552 assert!(
553 load_active_session_for_task_step(&conn, "task-1", "missing-step")
554 .expect("query missing step")
555 .is_none()
556 );
557 }
558
559 #[test]
560 fn cleanup_stale_sessions_removes_old_exited_keeps_recent() {
561 let (_dir, db_path) = make_db();
562 let conn = open_conn(&db_path).expect("open conn");
563
564 insert_session(&conn, &make_session("old-exited", "task-1", "qa", "exited"))
566 .expect("insert old exited");
567 let old_ts = (chrono::Utc::now() - chrono::Duration::hours(100)).to_rfc3339();
568 conn.execute(
569 "UPDATE agent_sessions SET updated_at = ?2 WHERE id = ?1",
570 params!["old-exited", old_ts],
571 )
572 .expect("backdate old session");
573
574 insert_session(&conn, &make_session("old-active", "task-1", "qa", "active"))
576 .expect("insert old active");
577 conn.execute(
578 "UPDATE agent_sessions SET updated_at = ?2 WHERE id = ?1",
579 params!["old-active", old_ts],
580 )
581 .expect("backdate active session");
582
583 insert_session(&conn, &make_session("new-exited", "task-1", "qa", "exited"))
585 .expect("insert new exited");
586
587 let deleted = cleanup_stale_sessions(&conn, 72).expect("cleanup");
588 assert_eq!(deleted, 1);
589
590 assert!(load_session(&conn, "old-exited").expect("load").is_none());
592 assert!(load_session(&conn, "old-active").expect("load").is_some());
593 assert!(load_session(&conn, "new-exited").expect("load").is_some());
594 }
595
596 #[test]
597 fn writer_and_reader_attachments_round_trip() {
598 let (_dir, db_path) = make_db();
599 let conn = open_conn(&db_path).expect("open conn");
600 insert_session(&conn, &make_session("sess-1", "task-1", "qa", "active"))
601 .expect("insert session");
602
603 assert!(acquire_writer(&conn, "sess-1", "writer-1").expect("acquire initial writer"));
604 assert!(acquire_writer(&conn, "sess-1", "writer-1").expect("re-acquire same writer"));
605 assert!(!acquire_writer(&conn, "sess-1", "writer-2").expect("reject second writer"));
606
607 attach_reader(&conn, "sess-1", "reader-1").expect("attach reader");
608 release_attachment(&conn, "sess-1", "reader-1", "done").expect("detach reader");
609 release_attachment(&conn, "sess-1", "writer-1", "handoff").expect("detach writer");
610
611 let session = load_session(&conn, "sess-1")
612 .expect("reload session")
613 .expect("session should exist");
614 assert_eq!(session.writer_client_id, None);
615
616 let writer_attachments: i64 = conn
617 .query_row(
618 "SELECT COUNT(*) FROM session_attachments WHERE session_id = ?1 AND mode = 'writer'",
619 params!["sess-1"],
620 |row| row.get(0),
621 )
622 .expect("count writer attachments");
623 let detached_attachments: i64 = conn
624 .query_row(
625 "SELECT COUNT(*) FROM session_attachments WHERE session_id = ?1 AND detached_at IS NOT NULL",
626 params!["sess-1"],
627 |row| row.get(0),
628 )
629 .expect("count detached attachments");
630
631 assert_eq!(writer_attachments, 2);
632 assert_eq!(detached_attachments, 3);
633 }
634
635 #[tokio::test]
636 async fn async_session_store_exercises_all_wrapper_methods() {
637 let (_dir, db_path) = make_db();
638 let async_db = Arc::new(AsyncDatabase::open(&db_path).await.expect("open async db"));
639 let store = AsyncSessionStore::new(async_db);
640
641 let session = make_session("sess-async", "task-1", "qa", "active");
642 store
643 .insert_session(OwnedNewSession::from(&session))
644 .await
645 .expect("insert session");
646
647 let loaded = store
648 .load_session("sess-async")
649 .await
650 .expect("load session")
651 .expect("session exists");
652 assert_eq!(loaded.id, "sess-async");
653 assert_eq!(loaded.state, "active");
654
655 let active = store
656 .load_active_session_for_task_step("task-1", "qa")
657 .await
658 .expect("load active session")
659 .expect("active session exists");
660 assert_eq!(active.id, "sess-async");
661
662 let listed = store
663 .list_task_sessions("task-1")
664 .await
665 .expect("list sessions");
666 assert_eq!(listed.len(), 1);
667
668 assert!(store
669 .acquire_writer("sess-async", "writer-1")
670 .await
671 .expect("acquire writer"));
672 assert!(!store
673 .acquire_writer("sess-async", "writer-2")
674 .await
675 .expect("reject second writer"));
676
677 store
678 .attach_reader("sess-async", "reader-1")
679 .await
680 .expect("attach reader");
681 store
682 .update_session_pid("sess-async", 5150)
683 .await
684 .expect("update pid");
685 store
686 .update_session_state("sess-async", "failed", Some(9), true)
687 .await
688 .expect("update session state");
689 store
690 .release_attachment("sess-async", "reader-1", "done")
691 .await
692 .expect("release reader");
693 store
694 .release_attachment("sess-async", "writer-1", "done")
695 .await
696 .expect("release writer");
697
698 let exited = store
699 .load_session("sess-async")
700 .await
701 .expect("reload exited session")
702 .expect("session still exists");
703 assert_eq!(exited.pid, 5150);
704 assert_eq!(exited.state, "failed");
705 assert_eq!(exited.exit_code, Some(9));
706 assert!(exited.ended_at.is_some());
707 assert!(exited.writer_client_id.is_none());
708
709 let conn = open_conn(&db_path).expect("open sync conn");
710 let old_ts = (chrono::Utc::now() - chrono::Duration::hours(100)).to_rfc3339();
711 conn.execute(
712 "UPDATE agent_sessions SET updated_at = ?2 WHERE id = ?1",
713 params!["sess-async", old_ts],
714 )
715 .expect("backdate session");
716
717 let deleted = store
718 .cleanup_stale_sessions(72)
719 .await
720 .expect("cleanup stale sessions");
721 assert_eq!(deleted, 1);
722 assert!(store
723 .load_session("sess-async")
724 .await
725 .expect("load deleted session")
726 .is_none());
727 assert!(store
728 .load_session("missing")
729 .await
730 .expect("load missing session")
731 .is_none());
732 }
733}