1use crate::async_database::AsyncDatabase;
2use crate::config_load::now_ts;
3use crate::persistence::repository::{SessionRepository, SqliteSessionRepository};
4use anyhow::{Context, Result};
5use rusqlite::{Connection, OptionalExtension, params};
6use serde::{Deserialize, Serialize};
7use std::sync::Arc;
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct SessionRow {
12 pub id: String,
14 pub task_id: String,
16 pub task_item_id: Option<String>,
18 pub step_id: String,
20 pub phase: String,
22 pub agent_id: String,
24 pub state: String,
26 pub pid: i64,
28 pub pty_backend: String,
30 pub cwd: String,
32 pub command: String,
34 pub input_fifo_path: String,
36 pub stdout_path: String,
38 pub stderr_path: String,
40 pub transcript_path: String,
42 pub output_json_path: Option<String>,
44 pub writer_client_id: Option<String>,
46 pub created_at: String,
48 pub updated_at: String,
50 pub ended_at: Option<String>,
52 pub exit_code: Option<i64>,
54}
55
56pub struct NewSession<'a> {
58 pub id: &'a str,
60 pub task_id: &'a str,
62 pub task_item_id: Option<&'a str>,
64 pub step_id: &'a str,
66 pub phase: &'a str,
68 pub agent_id: &'a str,
70 pub state: &'a str,
72 pub pid: i64,
74 pub pty_backend: &'a str,
76 pub cwd: &'a str,
78 pub command: &'a str,
80 pub input_fifo_path: &'a str,
82 pub stdout_path: &'a str,
84 pub stderr_path: &'a str,
86 pub transcript_path: &'a str,
88 pub output_json_path: Option<&'a str>,
90}
91
92pub fn insert_session(conn: &Connection, s: &NewSession<'_>) -> Result<()> {
94 let now = now_ts();
95 conn.execute(
96 "INSERT INTO agent_sessions (id, task_id, task_item_id, step_id, phase, agent_id, state, pid, pty_backend, cwd, command, input_fifo_path, stdout_path, stderr_path, transcript_path, output_json_path, writer_client_id, created_at, updated_at, ended_at, exit_code) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, NULL, ?17, ?17, NULL, NULL)",
97 params![
98 s.id,
99 s.task_id,
100 s.task_item_id,
101 s.step_id,
102 s.phase,
103 s.agent_id,
104 s.state,
105 s.pid,
106 s.pty_backend,
107 s.cwd,
108 s.command,
109 s.input_fifo_path,
110 s.stdout_path,
111 s.stderr_path,
112 s.transcript_path,
113 s.output_json_path,
114 now
115 ],
116 )?;
117 Ok(())
118}
119
120pub fn update_session_state(
122 conn: &Connection,
123 session_id: &str,
124 state: &str,
125 exit_code: Option<i64>,
126 ended: bool,
127) -> Result<()> {
128 let now = now_ts();
129 let ended_at = if ended { Some(now.clone()) } else { None };
130 conn.execute(
131 "UPDATE agent_sessions SET state = ?2, updated_at = ?3, ended_at = COALESCE(?4, ended_at), exit_code = COALESCE(?5, exit_code) WHERE id = ?1",
132 params![session_id, state, now, ended_at, exit_code],
133 )?;
134 Ok(())
135}
136
137pub fn update_session_pid(conn: &Connection, session_id: &str, pid: i64) -> Result<()> {
139 conn.execute(
140 "UPDATE agent_sessions SET pid = ?2, updated_at = ?3 WHERE id = ?1",
141 params![session_id, pid, now_ts()],
142 )?;
143 Ok(())
144}
145
146fn row_to_session(r: &rusqlite::Row<'_>) -> rusqlite::Result<SessionRow> {
147 Ok(SessionRow {
148 id: r.get(0)?,
149 task_id: r.get(1)?,
150 task_item_id: r.get(2)?,
151 step_id: r.get(3)?,
152 phase: r.get(4)?,
153 agent_id: r.get(5)?,
154 state: r.get(6)?,
155 pid: r.get(7)?,
156 pty_backend: r.get(8)?,
157 cwd: r.get(9)?,
158 command: r.get(10)?,
159 input_fifo_path: r.get(11)?,
160 stdout_path: r.get(12)?,
161 stderr_path: r.get(13)?,
162 transcript_path: r.get(14)?,
163 output_json_path: r.get(15)?,
164 writer_client_id: r.get(16)?,
165 created_at: r.get(17)?,
166 updated_at: r.get(18)?,
167 ended_at: r.get(19)?,
168 exit_code: r.get(20)?,
169 })
170}
171
172const SESSION_COLUMNS: &str = "id, task_id, task_item_id, step_id, phase, agent_id, state, pid, pty_backend, cwd, command, input_fifo_path, stdout_path, stderr_path, transcript_path, output_json_path, writer_client_id, created_at, updated_at, ended_at, exit_code";
173
174pub fn load_session(conn: &Connection, session_id: &str) -> Result<Option<SessionRow>> {
176 conn.query_row(
177 &format!(
178 "SELECT {} FROM agent_sessions WHERE id = ?1",
179 SESSION_COLUMNS
180 ),
181 params![session_id],
182 row_to_session,
183 )
184 .optional()
185 .context("load session")
186}
187
188pub fn load_active_session_for_task_step(
190 conn: &Connection,
191 task_id: &str,
192 step_id: &str,
193) -> Result<Option<SessionRow>> {
194 conn.query_row(
195 &format!(
196 "SELECT {}
197 FROM agent_sessions
198 WHERE task_id = ?1 AND step_id = ?2 AND state IN ('active','detached')
199 ORDER BY created_at DESC
200 LIMIT 1",
201 SESSION_COLUMNS
202 ),
203 params![task_id, step_id],
204 row_to_session,
205 )
206 .optional()
207 .context("load active session for task step")
208}
209
210pub fn list_task_sessions(conn: &Connection, task_id: &str) -> Result<Vec<SessionRow>> {
212 let mut stmt = conn.prepare(&format!(
213 "SELECT {}
214 FROM agent_sessions
215 WHERE task_id = ?1
216 ORDER BY created_at DESC",
217 SESSION_COLUMNS
218 ))?;
219 let rows = stmt
220 .query_map(params![task_id], row_to_session)?
221 .collect::<std::result::Result<Vec<_>, _>>()?;
222 Ok(rows)
223}
224
225pub fn acquire_writer(conn: &Connection, session_id: &str, client_id: &str) -> Result<bool> {
227 let existing: Option<String> = conn
228 .query_row(
229 "SELECT writer_client_id FROM agent_sessions WHERE id = ?1",
230 params![session_id],
231 |r| r.get::<_, Option<String>>(0),
232 )
233 .optional()?
234 .flatten();
235 if let Some(owner) = existing {
236 if !owner.is_empty() && owner != client_id {
237 return Ok(false);
238 }
239 }
240 conn.execute(
241 "UPDATE agent_sessions SET writer_client_id = ?2, updated_at = ?3 WHERE id = ?1",
242 params![session_id, client_id, now_ts()],
243 )?;
244 conn.execute(
245 "INSERT INTO session_attachments (session_id, client_id, mode, attached_at, detached_at, reason) VALUES (?1, ?2, 'writer', ?3, NULL, NULL)",
246 params![session_id, client_id, now_ts()],
247 )?;
248 Ok(true)
249}
250
251pub fn attach_reader(conn: &Connection, session_id: &str, client_id: &str) -> Result<()> {
253 conn.execute(
254 "INSERT INTO session_attachments (session_id, client_id, mode, attached_at, detached_at, reason) VALUES (?1, ?2, 'reader', ?3, NULL, NULL)",
255 params![session_id, client_id, now_ts()],
256 )?;
257 Ok(())
258}
259
260pub fn cleanup_stale_sessions(conn: &Connection, max_age_hours: u64) -> Result<usize> {
262 let cutoff = chrono::Utc::now() - chrono::Duration::hours(max_age_hours as i64);
263 let deleted = conn.execute(
264 "DELETE FROM agent_sessions WHERE state IN ('exited', 'failed') AND updated_at < ?1",
265 params![cutoff.to_rfc3339()],
266 )?;
267 Ok(deleted)
268}
269
270pub fn release_attachment(
272 conn: &Connection,
273 session_id: &str,
274 client_id: &str,
275 reason: &str,
276) -> Result<()> {
277 conn.execute(
278 "UPDATE session_attachments SET detached_at = ?3, reason = ?4 WHERE session_id = ?1 AND client_id = ?2 AND detached_at IS NULL",
279 params![session_id, client_id, now_ts(), reason],
280 )?;
281 conn.execute(
282 "UPDATE agent_sessions SET writer_client_id = NULL, updated_at = ?2 WHERE id = ?1 AND writer_client_id = ?3",
283 params![session_id, now_ts(), client_id],
284 )?;
285 Ok(())
286}
287
288pub struct OwnedNewSession {
290 pub id: String,
292 pub task_id: String,
294 pub task_item_id: Option<String>,
296 pub step_id: String,
298 pub phase: String,
300 pub agent_id: String,
302 pub state: String,
304 pub pid: i64,
306 pub pty_backend: String,
308 pub cwd: String,
310 pub command: String,
312 pub input_fifo_path: String,
314 pub stdout_path: String,
316 pub stderr_path: String,
318 pub transcript_path: String,
320 pub output_json_path: Option<String>,
322}
323
324impl<'a> From<&NewSession<'a>> for OwnedNewSession {
325 fn from(s: &NewSession<'a>) -> Self {
326 Self {
327 id: s.id.to_owned(),
328 task_id: s.task_id.to_owned(),
329 task_item_id: s.task_item_id.map(|v| v.to_owned()),
330 step_id: s.step_id.to_owned(),
331 phase: s.phase.to_owned(),
332 agent_id: s.agent_id.to_owned(),
333 state: s.state.to_owned(),
334 pid: s.pid,
335 pty_backend: s.pty_backend.to_owned(),
336 cwd: s.cwd.to_owned(),
337 command: s.command.to_owned(),
338 input_fifo_path: s.input_fifo_path.to_owned(),
339 stdout_path: s.stdout_path.to_owned(),
340 stderr_path: s.stderr_path.to_owned(),
341 transcript_path: s.transcript_path.to_owned(),
342 output_json_path: s.output_json_path.map(|v| v.to_owned()),
343 }
344 }
345}
346
347pub struct AsyncSessionStore {
349 repository: Arc<dyn SessionRepository>,
350}
351
352impl AsyncSessionStore {
353 pub fn new(async_db: Arc<AsyncDatabase>) -> Self {
355 Self::with_repository(Arc::new(SqliteSessionRepository::new(async_db)))
356 }
357
358 pub fn with_repository(repository: Arc<dyn SessionRepository>) -> Self {
360 Self { repository }
361 }
362
363 pub async fn insert_session(&self, s: OwnedNewSession) -> Result<()> {
365 self.repository.insert_session(s).await
366 }
367
368 pub async fn update_session_state(
370 &self,
371 session_id: &str,
372 state: &str,
373 exit_code: Option<i64>,
374 ended: bool,
375 ) -> Result<()> {
376 self.repository
377 .update_session_state(session_id, state, exit_code, ended)
378 .await
379 }
380
381 pub async fn update_session_pid(&self, session_id: &str, pid: i64) -> Result<()> {
383 self.repository.update_session_pid(session_id, pid).await
384 }
385
386 pub async fn load_session(&self, session_id: &str) -> Result<Option<SessionRow>> {
388 self.repository.load_session(session_id).await
389 }
390
391 pub async fn load_active_session_for_task_step(
393 &self,
394 task_id: &str,
395 step_id: &str,
396 ) -> Result<Option<SessionRow>> {
397 self.repository
398 .load_active_session_for_task_step(task_id, step_id)
399 .await
400 }
401
402 pub async fn list_task_sessions(&self, task_id: &str) -> Result<Vec<SessionRow>> {
404 self.repository.list_task_sessions(task_id).await
405 }
406
407 pub async fn acquire_writer(&self, session_id: &str, client_id: &str) -> Result<bool> {
409 self.repository.acquire_writer(session_id, client_id).await
410 }
411
412 pub async fn attach_reader(&self, session_id: &str, client_id: &str) -> Result<()> {
414 self.repository.attach_reader(session_id, client_id).await
415 }
416
417 pub async fn cleanup_stale_sessions(&self, max_age_hours: u64) -> Result<usize> {
419 self.repository.cleanup_stale_sessions(max_age_hours).await
420 }
421
422 pub async fn release_attachment(
424 &self,
425 session_id: &str,
426 client_id: &str,
427 reason: &str,
428 ) -> Result<()> {
429 self.repository
430 .release_attachment(session_id, client_id, reason)
431 .await
432 }
433}
434
435#[cfg(test)]
436mod tests {
437 use super::*;
438 use crate::db::{init_schema, open_conn};
439 use tempfile::TempDir;
440
441 fn make_db() -> (TempDir, std::path::PathBuf) {
442 let dir = tempfile::tempdir().expect("create tempdir");
443 let db_path = dir.path().join("sessions.db");
444 init_schema(&db_path).expect("init schema");
445 (dir, db_path)
446 }
447
448 fn make_session<'a>(
449 id: &'a str,
450 task_id: &'a str,
451 step_id: &'a str,
452 state: &'a str,
453 ) -> NewSession<'a> {
454 NewSession {
455 id,
456 task_id,
457 task_item_id: Some("item-1"),
458 step_id,
459 phase: "qa",
460 agent_id: "agent-a",
461 state,
462 pid: 100,
463 pty_backend: "pty",
464 cwd: "/tmp",
465 command: "echo hi",
466 input_fifo_path: "/tmp/in.fifo",
467 stdout_path: "/tmp/stdout.log",
468 stderr_path: "/tmp/stderr.log",
469 transcript_path: "/tmp/transcript.log",
470 output_json_path: Some("/tmp/output.json"),
471 }
472 }
473
474 #[test]
475 fn insert_load_and_update_session_lifecycle() {
476 let (_dir, db_path) = make_db();
477 let conn = open_conn(&db_path).expect("open conn");
478 let session = make_session("sess-1", "task-1", "qa", "active");
479 insert_session(&conn, &session).expect("insert session");
480
481 let inserted = load_session(&conn, "sess-1")
482 .expect("load session")
483 .expect("session should exist");
484 assert_eq!(inserted.task_item_id.as_deref(), Some("item-1"));
485 assert_eq!(
486 inserted.output_json_path.as_deref(),
487 Some("/tmp/output.json")
488 );
489 assert_eq!(inserted.state, "active");
490 assert_eq!(inserted.pid, 100);
491 assert_eq!(inserted.ended_at, None);
492 assert_eq!(inserted.exit_code, None);
493
494 update_session_pid(&conn, "sess-1", 4242).expect("update pid");
495 update_session_state(&conn, "sess-1", "detached", Some(7), false).expect("detach session");
496
497 let detached = load_session(&conn, "sess-1")
498 .expect("reload session")
499 .expect("session should still exist");
500 assert_eq!(detached.pid, 4242);
501 assert_eq!(detached.state, "detached");
502 assert_eq!(detached.exit_code, Some(7));
503 assert_eq!(detached.ended_at, None);
504
505 update_session_state(&conn, "sess-1", "exited", None, true).expect("exit session");
506 let exited = load_session(&conn, "sess-1")
507 .expect("reload exited session")
508 .expect("session should still exist");
509 assert_eq!(exited.state, "exited");
510 assert_eq!(exited.exit_code, Some(7));
511 assert!(exited.ended_at.is_some());
512
513 assert!(
514 load_session(&conn, "missing")
515 .expect("load missing session")
516 .is_none()
517 );
518 }
519
520 #[test]
521 fn active_session_lookup_and_listing_filter_by_task() {
522 let (_dir, db_path) = make_db();
523 let conn = open_conn(&db_path).expect("open conn");
524 insert_session(&conn, &make_session("sess-old", "task-1", "qa", "exited"))
525 .expect("insert exited session");
526 std::thread::sleep(std::time::Duration::from_millis(2));
527 insert_session(
528 &conn,
529 &make_session("sess-active", "task-1", "qa", "active"),
530 )
531 .expect("insert active session");
532 std::thread::sleep(std::time::Duration::from_millis(2));
533 insert_session(
534 &conn,
535 &make_session("sess-detached", "task-1", "qa", "detached"),
536 )
537 .expect("insert detached session");
538 insert_session(&conn, &make_session("sess-other", "task-2", "qa", "active"))
539 .expect("insert other task session");
540
541 let active = load_active_session_for_task_step(&conn, "task-1", "qa")
542 .expect("query active session")
543 .expect("task should have an active session");
544 assert_eq!(active.id, "sess-detached");
545 assert_eq!(active.state, "detached");
546
547 let task_1_sessions = list_task_sessions(&conn, "task-1").expect("list sessions");
548 let task_1_ids: Vec<&str> = task_1_sessions.iter().map(|row| row.id.as_str()).collect();
549 assert_eq!(task_1_ids.len(), 3);
550 assert!(task_1_ids.contains(&"sess-old"));
551 assert!(task_1_ids.contains(&"sess-active"));
552 assert!(task_1_ids.contains(&"sess-detached"));
553
554 assert!(
555 load_active_session_for_task_step(&conn, "task-1", "missing-step")
556 .expect("query missing step")
557 .is_none()
558 );
559 }
560
561 #[test]
562 fn cleanup_stale_sessions_removes_old_exited_keeps_recent() {
563 let (_dir, db_path) = make_db();
564 let conn = open_conn(&db_path).expect("open conn");
565
566 insert_session(&conn, &make_session("old-exited", "task-1", "qa", "exited"))
568 .expect("insert old exited");
569 let old_ts = (chrono::Utc::now() - chrono::Duration::hours(100)).to_rfc3339();
570 conn.execute(
571 "UPDATE agent_sessions SET updated_at = ?2 WHERE id = ?1",
572 params!["old-exited", old_ts],
573 )
574 .expect("backdate old session");
575
576 insert_session(&conn, &make_session("old-active", "task-1", "qa", "active"))
578 .expect("insert old active");
579 conn.execute(
580 "UPDATE agent_sessions SET updated_at = ?2 WHERE id = ?1",
581 params!["old-active", old_ts],
582 )
583 .expect("backdate active session");
584
585 insert_session(&conn, &make_session("new-exited", "task-1", "qa", "exited"))
587 .expect("insert new exited");
588
589 let deleted = cleanup_stale_sessions(&conn, 72).expect("cleanup");
590 assert_eq!(deleted, 1);
591
592 assert!(load_session(&conn, "old-exited").expect("load").is_none());
594 assert!(load_session(&conn, "old-active").expect("load").is_some());
595 assert!(load_session(&conn, "new-exited").expect("load").is_some());
596 }
597
598 #[test]
599 fn writer_and_reader_attachments_round_trip() {
600 let (_dir, db_path) = make_db();
601 let conn = open_conn(&db_path).expect("open conn");
602 insert_session(&conn, &make_session("sess-1", "task-1", "qa", "active"))
603 .expect("insert session");
604
605 assert!(acquire_writer(&conn, "sess-1", "writer-1").expect("acquire initial writer"));
606 assert!(acquire_writer(&conn, "sess-1", "writer-1").expect("re-acquire same writer"));
607 assert!(!acquire_writer(&conn, "sess-1", "writer-2").expect("reject second writer"));
608
609 attach_reader(&conn, "sess-1", "reader-1").expect("attach reader");
610 release_attachment(&conn, "sess-1", "reader-1", "done").expect("detach reader");
611 release_attachment(&conn, "sess-1", "writer-1", "handoff").expect("detach writer");
612
613 let session = load_session(&conn, "sess-1")
614 .expect("reload session")
615 .expect("session should exist");
616 assert_eq!(session.writer_client_id, None);
617
618 let writer_attachments: i64 = conn
619 .query_row(
620 "SELECT COUNT(*) FROM session_attachments WHERE session_id = ?1 AND mode = 'writer'",
621 params!["sess-1"],
622 |row| row.get(0),
623 )
624 .expect("count writer attachments");
625 let detached_attachments: i64 = conn
626 .query_row(
627 "SELECT COUNT(*) FROM session_attachments WHERE session_id = ?1 AND detached_at IS NOT NULL",
628 params!["sess-1"],
629 |row| row.get(0),
630 )
631 .expect("count detached attachments");
632
633 assert_eq!(writer_attachments, 2);
634 assert_eq!(detached_attachments, 3);
635 }
636
637 #[tokio::test]
638 async fn async_session_store_exercises_all_wrapper_methods() {
639 let (_dir, db_path) = make_db();
640 let async_db = Arc::new(AsyncDatabase::open(&db_path).await.expect("open async db"));
641 let store = AsyncSessionStore::new(async_db);
642
643 let session = make_session("sess-async", "task-1", "qa", "active");
644 store
645 .insert_session(OwnedNewSession::from(&session))
646 .await
647 .expect("insert session");
648
649 let loaded = store
650 .load_session("sess-async")
651 .await
652 .expect("load session")
653 .expect("session exists");
654 assert_eq!(loaded.id, "sess-async");
655 assert_eq!(loaded.state, "active");
656
657 let active = store
658 .load_active_session_for_task_step("task-1", "qa")
659 .await
660 .expect("load active session")
661 .expect("active session exists");
662 assert_eq!(active.id, "sess-async");
663
664 let listed = store
665 .list_task_sessions("task-1")
666 .await
667 .expect("list sessions");
668 assert_eq!(listed.len(), 1);
669
670 assert!(
671 store
672 .acquire_writer("sess-async", "writer-1")
673 .await
674 .expect("acquire writer")
675 );
676 assert!(
677 !store
678 .acquire_writer("sess-async", "writer-2")
679 .await
680 .expect("reject second writer")
681 );
682
683 store
684 .attach_reader("sess-async", "reader-1")
685 .await
686 .expect("attach reader");
687 store
688 .update_session_pid("sess-async", 5150)
689 .await
690 .expect("update pid");
691 store
692 .update_session_state("sess-async", "failed", Some(9), true)
693 .await
694 .expect("update session state");
695 store
696 .release_attachment("sess-async", "reader-1", "done")
697 .await
698 .expect("release reader");
699 store
700 .release_attachment("sess-async", "writer-1", "done")
701 .await
702 .expect("release writer");
703
704 let exited = store
705 .load_session("sess-async")
706 .await
707 .expect("reload exited session")
708 .expect("session still exists");
709 assert_eq!(exited.pid, 5150);
710 assert_eq!(exited.state, "failed");
711 assert_eq!(exited.exit_code, Some(9));
712 assert!(exited.ended_at.is_some());
713 assert!(exited.writer_client_id.is_none());
714
715 let conn = open_conn(&db_path).expect("open sync conn");
716 let old_ts = (chrono::Utc::now() - chrono::Duration::hours(100)).to_rfc3339();
717 conn.execute(
718 "UPDATE agent_sessions SET updated_at = ?2 WHERE id = ?1",
719 params!["sess-async", old_ts],
720 )
721 .expect("backdate session");
722
723 let deleted = store
724 .cleanup_stale_sessions(72)
725 .await
726 .expect("cleanup stale sessions");
727 assert_eq!(deleted, 1);
728 assert!(
729 store
730 .load_session("sess-async")
731 .await
732 .expect("load deleted session")
733 .is_none()
734 );
735 assert!(
736 store
737 .load_session("missing")
738 .await
739 .expect("load missing session")
740 .is_none()
741 );
742 }
743}