Skip to main content

agent_orchestrator/
session_store.rs

1use crate::async_database::AsyncDatabase;
2use crate::config_load::now_ts;
3use crate::persistence::repository::{SessionRepository, SqliteSessionRepository};
4use anyhow::{Context, Result};
5use rusqlite::{params, Connection, OptionalExtension};
6use serde::{Deserialize, Serialize};
7use std::sync::Arc;
8
9/// Persisted interactive session row.
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct SessionRow {
12    /// Session identifier.
13    pub id: String,
14    /// Parent task identifier.
15    pub task_id: String,
16    /// Optional task-item identifier.
17    pub task_item_id: Option<String>,
18    /// Step identifier associated with the session.
19    pub step_id: String,
20    /// Phase name associated with the session.
21    pub phase: String,
22    /// Agent identifier that owns the session.
23    pub agent_id: String,
24    /// Session state string.
25    pub state: String,
26    /// PTY child PID.
27    pub pid: i64,
28    /// PTY backend identifier.
29    pub pty_backend: String,
30    /// Working directory for the child process.
31    pub cwd: String,
32    /// Rendered command line.
33    pub command: String,
34    /// FIFO path used for input streaming.
35    pub input_fifo_path: String,
36    /// Captured stdout path.
37    pub stdout_path: String,
38    /// Captured stderr path.
39    pub stderr_path: String,
40    /// Transcript file path.
41    pub transcript_path: String,
42    /// Optional structured output JSON spill path.
43    pub output_json_path: Option<String>,
44    /// Client currently holding the writer lease.
45    pub writer_client_id: Option<String>,
46    /// Creation timestamp.
47    pub created_at: String,
48    /// Last update timestamp.
49    pub updated_at: String,
50    /// Optional end timestamp.
51    pub ended_at: Option<String>,
52    /// Optional process exit code.
53    pub exit_code: Option<i64>,
54}
55
/// Borrowed insert payload for a new interactive session.
///
/// Every field borrows from the caller, so building the payload allocates nothing.
/// `Debug` and `Clone` are derived to match [`SessionRow`] and make the payload
/// loggable and reusable.
#[derive(Debug, Clone)]
pub struct NewSession<'a> {
    /// Session identifier.
    pub id: &'a str,
    /// Parent task identifier.
    pub task_id: &'a str,
    /// Optional task-item identifier.
    pub task_item_id: Option<&'a str>,
    /// Step identifier associated with the session.
    pub step_id: &'a str,
    /// Phase name associated with the session.
    pub phase: &'a str,
    /// Agent identifier that owns the session.
    pub agent_id: &'a str,
    /// Initial session state.
    pub state: &'a str,
    /// PTY child PID.
    pub pid: i64,
    /// PTY backend identifier.
    pub pty_backend: &'a str,
    /// Working directory for the child process.
    pub cwd: &'a str,
    /// Rendered command line.
    pub command: &'a str,
    /// FIFO path used for input streaming.
    pub input_fifo_path: &'a str,
    /// Captured stdout path.
    pub stdout_path: &'a str,
    /// Captured stderr path.
    pub stderr_path: &'a str,
    /// Transcript file path.
    pub transcript_path: &'a str,
    /// Optional structured output JSON spill path.
    pub output_json_path: Option<&'a str>,
}
91
92/// Inserts a new interactive session row.
93pub fn insert_session(conn: &Connection, s: &NewSession<'_>) -> Result<()> {
94    let now = now_ts();
95    conn.execute(
96        "INSERT INTO agent_sessions (id, task_id, task_item_id, step_id, phase, agent_id, state, pid, pty_backend, cwd, command, input_fifo_path, stdout_path, stderr_path, transcript_path, output_json_path, writer_client_id, created_at, updated_at, ended_at, exit_code) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, NULL, ?17, ?17, NULL, NULL)",
97        params![
98            s.id,
99            s.task_id,
100            s.task_item_id,
101            s.step_id,
102            s.phase,
103            s.agent_id,
104            s.state,
105            s.pid,
106            s.pty_backend,
107            s.cwd,
108            s.command,
109            s.input_fifo_path,
110            s.stdout_path,
111            s.stderr_path,
112            s.transcript_path,
113            s.output_json_path,
114            now
115        ],
116    )?;
117    Ok(())
118}
119
120/// Updates session state, exit code, and optional end time.
121pub fn update_session_state(
122    conn: &Connection,
123    session_id: &str,
124    state: &str,
125    exit_code: Option<i64>,
126    ended: bool,
127) -> Result<()> {
128    let now = now_ts();
129    let ended_at = if ended { Some(now.clone()) } else { None };
130    conn.execute(
131        "UPDATE agent_sessions SET state = ?2, updated_at = ?3, ended_at = COALESCE(?4, ended_at), exit_code = COALESCE(?5, exit_code) WHERE id = ?1",
132        params![session_id, state, now, ended_at, exit_code],
133    )?;
134    Ok(())
135}
136
137/// Updates the PID associated with an existing session.
138pub fn update_session_pid(conn: &Connection, session_id: &str, pid: i64) -> Result<()> {
139    conn.execute(
140        "UPDATE agent_sessions SET pid = ?2, updated_at = ?3 WHERE id = ?1",
141        params![session_id, pid, now_ts()],
142    )?;
143    Ok(())
144}
145
146fn row_to_session(r: &rusqlite::Row<'_>) -> rusqlite::Result<SessionRow> {
147    Ok(SessionRow {
148        id: r.get(0)?,
149        task_id: r.get(1)?,
150        task_item_id: r.get(2)?,
151        step_id: r.get(3)?,
152        phase: r.get(4)?,
153        agent_id: r.get(5)?,
154        state: r.get(6)?,
155        pid: r.get(7)?,
156        pty_backend: r.get(8)?,
157        cwd: r.get(9)?,
158        command: r.get(10)?,
159        input_fifo_path: r.get(11)?,
160        stdout_path: r.get(12)?,
161        stderr_path: r.get(13)?,
162        transcript_path: r.get(14)?,
163        output_json_path: r.get(15)?,
164        writer_client_id: r.get(16)?,
165        created_at: r.get(17)?,
166        updated_at: r.get(18)?,
167        ended_at: r.get(19)?,
168        exit_code: r.get(20)?,
169    })
170}
171
// Column list shared by every `SELECT` on `agent_sessions` in this file.
// `row_to_session` reads these columns by position, so the order here matters.
const SESSION_COLUMNS: &str = concat!(
    "id, task_id, task_item_id, step_id, phase, agent_id, state, ",
    "pid, pty_backend, cwd, command, ",
    "input_fifo_path, stdout_path, stderr_path, transcript_path, output_json_path, ",
    "writer_client_id, created_at, updated_at, ended_at, exit_code",
);
173
174/// Loads a session row by session identifier.
175pub fn load_session(conn: &Connection, session_id: &str) -> Result<Option<SessionRow>> {
176    conn.query_row(
177        &format!(
178            "SELECT {} FROM agent_sessions WHERE id = ?1",
179            SESSION_COLUMNS
180        ),
181        params![session_id],
182        row_to_session,
183    )
184    .optional()
185    .context("load session")
186}
187
188/// Loads the latest active or detached session for a task step.
189pub fn load_active_session_for_task_step(
190    conn: &Connection,
191    task_id: &str,
192    step_id: &str,
193) -> Result<Option<SessionRow>> {
194    conn.query_row(
195        &format!(
196            "SELECT {}
197             FROM agent_sessions
198             WHERE task_id = ?1 AND step_id = ?2 AND state IN ('active','detached')
199             ORDER BY created_at DESC
200             LIMIT 1",
201            SESSION_COLUMNS
202        ),
203        params![task_id, step_id],
204        row_to_session,
205    )
206    .optional()
207    .context("load active session for task step")
208}
209
210/// Lists all sessions for a task ordered from newest to oldest.
211pub fn list_task_sessions(conn: &Connection, task_id: &str) -> Result<Vec<SessionRow>> {
212    let mut stmt = conn.prepare(&format!(
213        "SELECT {}
214             FROM agent_sessions
215             WHERE task_id = ?1
216             ORDER BY created_at DESC",
217        SESSION_COLUMNS
218    ))?;
219    let rows = stmt
220        .query_map(params![task_id], row_to_session)?
221        .collect::<std::result::Result<Vec<_>, _>>()?;
222    Ok(rows)
223}
224
225/// Attempts to acquire the writer lease for a session.
226pub fn acquire_writer(conn: &Connection, session_id: &str, client_id: &str) -> Result<bool> {
227    let existing: Option<String> = conn
228        .query_row(
229            "SELECT writer_client_id FROM agent_sessions WHERE id = ?1",
230            params![session_id],
231            |r| r.get::<_, Option<String>>(0),
232        )
233        .optional()?
234        .flatten();
235    if let Some(owner) = existing {
236        if !owner.is_empty() && owner != client_id {
237            return Ok(false);
238        }
239    }
240    conn.execute(
241        "UPDATE agent_sessions SET writer_client_id = ?2, updated_at = ?3 WHERE id = ?1",
242        params![session_id, client_id, now_ts()],
243    )?;
244    conn.execute(
245        "INSERT INTO session_attachments (session_id, client_id, mode, attached_at, detached_at, reason) VALUES (?1, ?2, 'writer', ?3, NULL, NULL)",
246        params![session_id, client_id, now_ts()],
247    )?;
248    Ok(true)
249}
250
251/// Attaches a read-only client to a session.
252pub fn attach_reader(conn: &Connection, session_id: &str, client_id: &str) -> Result<()> {
253    conn.execute(
254        "INSERT INTO session_attachments (session_id, client_id, mode, attached_at, detached_at, reason) VALUES (?1, ?2, 'reader', ?3, NULL, NULL)",
255        params![session_id, client_id, now_ts()],
256    )?;
257    Ok(())
258}
259
260/// Deletes old terminal sessions and returns the number removed.
261pub fn cleanup_stale_sessions(conn: &Connection, max_age_hours: u64) -> Result<usize> {
262    let cutoff = chrono::Utc::now() - chrono::Duration::hours(max_age_hours as i64);
263    let deleted = conn.execute(
264        "DELETE FROM agent_sessions WHERE state IN ('exited', 'failed') AND updated_at < ?1",
265        params![cutoff.to_rfc3339()],
266    )?;
267    Ok(deleted)
268}
269
270/// Releases a reader or writer attachment for a client.
271pub fn release_attachment(
272    conn: &Connection,
273    session_id: &str,
274    client_id: &str,
275    reason: &str,
276) -> Result<()> {
277    conn.execute(
278        "UPDATE session_attachments SET detached_at = ?3, reason = ?4 WHERE session_id = ?1 AND client_id = ?2 AND detached_at IS NULL",
279        params![session_id, client_id, now_ts(), reason],
280    )?;
281    conn.execute(
282        "UPDATE agent_sessions SET writer_client_id = NULL, updated_at = ?2 WHERE id = ?1 AND writer_client_id = ?3",
283        params![session_id, now_ts(), client_id],
284    )?;
285    Ok(())
286}
287
/// Owned version of `NewSession` for async closures (`'static + Send`).
///
/// Holds the same fields as `NewSession`, with every borrowed string replaced by an
/// owned `String`. `Debug` and `Clone` are derived to match [`SessionRow`] and make
/// the payload loggable and reusable.
#[derive(Debug, Clone)]
pub struct OwnedNewSession {
    /// Session identifier.
    pub id: String,
    /// Parent task identifier.
    pub task_id: String,
    /// Optional task-item identifier.
    pub task_item_id: Option<String>,
    /// Step identifier associated with the session.
    pub step_id: String,
    /// Phase name associated with the session.
    pub phase: String,
    /// Agent identifier that owns the session.
    pub agent_id: String,
    /// Initial session state.
    pub state: String,
    /// PTY child PID.
    pub pid: i64,
    /// PTY backend identifier.
    pub pty_backend: String,
    /// Working directory for the child process.
    pub cwd: String,
    /// Rendered command line.
    pub command: String,
    /// FIFO path used for input streaming.
    pub input_fifo_path: String,
    /// Captured stdout path.
    pub stdout_path: String,
    /// Captured stderr path.
    pub stderr_path: String,
    /// Transcript file path.
    pub transcript_path: String,
    /// Optional structured output JSON spill path.
    pub output_json_path: Option<String>,
}
323
324impl<'a> From<&NewSession<'a>> for OwnedNewSession {
325    fn from(s: &NewSession<'a>) -> Self {
326        Self {
327            id: s.id.to_owned(),
328            task_id: s.task_id.to_owned(),
329            task_item_id: s.task_item_id.map(|v| v.to_owned()),
330            step_id: s.step_id.to_owned(),
331            phase: s.phase.to_owned(),
332            agent_id: s.agent_id.to_owned(),
333            state: s.state.to_owned(),
334            pid: s.pid,
335            pty_backend: s.pty_backend.to_owned(),
336            cwd: s.cwd.to_owned(),
337            command: s.command.to_owned(),
338            input_fifo_path: s.input_fifo_path.to_owned(),
339            stdout_path: s.stdout_path.to_owned(),
340            stderr_path: s.stderr_path.to_owned(),
341            transcript_path: s.transcript_path.to_owned(),
342            output_json_path: s.output_json_path.map(|v| v.to_owned()),
343        }
344    }
345}
346
347/// Async facade around a [`SessionRepository`] implementation.
348pub struct AsyncSessionStore {
349    repository: Arc<dyn SessionRepository>,
350}
351
352impl AsyncSessionStore {
353    /// Creates a SQLite-backed async session store.
354    pub fn new(async_db: Arc<AsyncDatabase>) -> Self {
355        Self::with_repository(Arc::new(SqliteSessionRepository::new(async_db)))
356    }
357
358    /// Creates an async session store from a repository implementation.
359    pub fn with_repository(repository: Arc<dyn SessionRepository>) -> Self {
360        Self { repository }
361    }
362
363    /// Inserts a new session row.
364    pub async fn insert_session(&self, s: OwnedNewSession) -> Result<()> {
365        self.repository.insert_session(s).await
366    }
367
368    /// Updates session state, exit code, and optional end time.
369    pub async fn update_session_state(
370        &self,
371        session_id: &str,
372        state: &str,
373        exit_code: Option<i64>,
374        ended: bool,
375    ) -> Result<()> {
376        self.repository
377            .update_session_state(session_id, state, exit_code, ended)
378            .await
379    }
380
381    /// Updates the PID associated with a session.
382    pub async fn update_session_pid(&self, session_id: &str, pid: i64) -> Result<()> {
383        self.repository.update_session_pid(session_id, pid).await
384    }
385
386    /// Loads a session row by identifier.
387    pub async fn load_session(&self, session_id: &str) -> Result<Option<SessionRow>> {
388        self.repository.load_session(session_id).await
389    }
390
391    /// Loads the latest active or detached session for a task step.
392    pub async fn load_active_session_for_task_step(
393        &self,
394        task_id: &str,
395        step_id: &str,
396    ) -> Result<Option<SessionRow>> {
397        self.repository
398            .load_active_session_for_task_step(task_id, step_id)
399            .await
400    }
401
402    /// Lists all sessions for a task.
403    pub async fn list_task_sessions(&self, task_id: &str) -> Result<Vec<SessionRow>> {
404        self.repository.list_task_sessions(task_id).await
405    }
406
407    /// Attempts to acquire the writer lease for a session.
408    pub async fn acquire_writer(&self, session_id: &str, client_id: &str) -> Result<bool> {
409        self.repository.acquire_writer(session_id, client_id).await
410    }
411
412    /// Attaches a read-only client to a session.
413    pub async fn attach_reader(&self, session_id: &str, client_id: &str) -> Result<()> {
414        self.repository.attach_reader(session_id, client_id).await
415    }
416
417    /// Deletes stale terminal sessions and returns the number removed.
418    pub async fn cleanup_stale_sessions(&self, max_age_hours: u64) -> Result<usize> {
419        self.repository.cleanup_stale_sessions(max_age_hours).await
420    }
421
422    /// Releases a reader or writer attachment for a client.
423    pub async fn release_attachment(
424        &self,
425        session_id: &str,
426        client_id: &str,
427        reason: &str,
428    ) -> Result<()> {
429        self.repository
430            .release_attachment(session_id, client_id, reason)
431            .await
432    }
433}
434
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::{init_schema, open_conn};
    use tempfile::TempDir;

    /// Creates a temporary directory containing a schema-initialised `sessions.db`.
    ///
    /// The `TempDir` is handed back so callers can keep it bound for the whole test.
    fn make_db() -> (TempDir, std::path::PathBuf) {
        let tmp = tempfile::tempdir().expect("create tempdir");
        let path = tmp.path().join("sessions.db");
        init_schema(&path).expect("init schema");
        (tmp, path)
    }

    /// Builds a session payload in which only id, task, step and state vary.
    fn make_session<'a>(
        id: &'a str,
        task_id: &'a str,
        step_id: &'a str,
        state: &'a str,
    ) -> NewSession<'a> {
        NewSession {
            id,
            task_id,
            task_item_id: Some("item-1"),
            step_id,
            phase: "qa",
            agent_id: "agent-a",
            state,
            pid: 100,
            pty_backend: "pty",
            cwd: "/tmp",
            command: "echo hi",
            input_fifo_path: "/tmp/in.fifo",
            stdout_path: "/tmp/stdout.log",
            stderr_path: "/tmp/stderr.log",
            transcript_path: "/tmp/transcript.log",
            output_json_path: Some("/tmp/output.json"),
        }
    }

    #[test]
    fn insert_load_and_update_session_lifecycle() {
        let (_dir, db_path) = make_db();
        let conn = open_conn(&db_path).expect("open conn");

        // Freshly inserted rows carry the payload and no end markers.
        let payload = make_session("sess-1", "task-1", "qa", "active");
        insert_session(&conn, &payload).expect("insert session");

        let fresh = load_session(&conn, "sess-1")
            .expect("load session")
            .expect("session should exist");
        assert_eq!(fresh.task_item_id.as_deref(), Some("item-1"));
        assert_eq!(fresh.output_json_path.as_deref(), Some("/tmp/output.json"));
        assert_eq!(fresh.state, "active");
        assert_eq!(fresh.pid, 100);
        assert_eq!(fresh.ended_at, None);
        assert_eq!(fresh.exit_code, None);

        // A non-ending state change stores the exit code but leaves `ended_at` unset.
        update_session_pid(&conn, "sess-1", 4242).expect("update pid");
        update_session_state(&conn, "sess-1", "detached", Some(7), false).expect("detach session");

        let after_detach = load_session(&conn, "sess-1")
            .expect("reload session")
            .expect("session should still exist");
        assert_eq!(after_detach.pid, 4242);
        assert_eq!(after_detach.state, "detached");
        assert_eq!(after_detach.exit_code, Some(7));
        assert_eq!(after_detach.ended_at, None);

        // Ending without an exit code keeps the one stored earlier.
        update_session_state(&conn, "sess-1", "exited", None, true).expect("exit session");
        let after_exit = load_session(&conn, "sess-1")
            .expect("reload exited session")
            .expect("session should still exist");
        assert_eq!(after_exit.state, "exited");
        assert_eq!(after_exit.exit_code, Some(7));
        assert!(after_exit.ended_at.is_some());

        let missing = load_session(&conn, "missing").expect("load missing session");
        assert!(missing.is_none());
    }

    #[test]
    fn active_session_lookup_and_listing_filter_by_task() {
        let (_dir, db_path) = make_db();
        let conn = open_conn(&db_path).expect("open conn");
        // Short pause between inserts so the task-1 rows get distinct timestamps.
        let pause = std::time::Duration::from_millis(2);

        insert_session(&conn, &make_session("sess-old", "task-1", "qa", "exited"))
            .expect("insert exited session");
        std::thread::sleep(pause);

        let active_payload = make_session("sess-active", "task-1", "qa", "active");
        insert_session(&conn, &active_payload).expect("insert active session");
        std::thread::sleep(pause);

        let detached_payload = make_session("sess-detached", "task-1", "qa", "detached");
        insert_session(&conn, &detached_payload).expect("insert detached session");

        insert_session(&conn, &make_session("sess-other", "task-2", "qa", "active"))
            .expect("insert other task session");

        // The newest active-or-detached row of task-1 wins.
        let newest = load_active_session_for_task_step(&conn, "task-1", "qa")
            .expect("query active session")
            .expect("task should have an active session");
        assert_eq!(newest.id, "sess-detached");
        assert_eq!(newest.state, "detached");

        // Listing returns all task-1 rows and nothing from task-2.
        let listed = list_task_sessions(&conn, "task-1").expect("list sessions");
        let listed_ids: Vec<&str> = listed.iter().map(|row| row.id.as_str()).collect();
        assert_eq!(listed_ids.len(), 3);
        for expected in ["sess-old", "sess-active", "sess-detached"].iter() {
            assert!(listed_ids.contains(expected));
        }

        let unknown_step = load_active_session_for_task_step(&conn, "task-1", "missing-step")
            .expect("query missing step");
        assert!(unknown_step.is_none());
    }

    #[test]
    fn cleanup_stale_sessions_removes_old_exited_keeps_recent() {
        let (_dir, db_path) = make_db();
        let conn = open_conn(&db_path).expect("open conn");
        let backdated = (chrono::Utc::now() - chrono::Duration::hours(100)).to_rfc3339();

        // Old and exited: the only row expected to be removed.
        insert_session(&conn, &make_session("old-exited", "task-1", "qa", "exited"))
            .expect("insert old exited");
        conn.execute(
            "UPDATE agent_sessions SET updated_at = ?2 WHERE id = ?1",
            params!["old-exited", backdated],
        )
        .expect("backdate old session");

        // Equally old but still active: must survive.
        insert_session(&conn, &make_session("old-active", "task-1", "qa", "active"))
            .expect("insert old active");
        conn.execute(
            "UPDATE agent_sessions SET updated_at = ?2 WHERE id = ?1",
            params!["old-active", backdated],
        )
        .expect("backdate active session");

        // Exited but recent: must survive.
        insert_session(&conn, &make_session("new-exited", "task-1", "qa", "exited"))
            .expect("insert new exited");

        let removed = cleanup_stale_sessions(&conn, 72).expect("cleanup");
        assert_eq!(removed, 1);

        assert!(load_session(&conn, "old-exited").expect("load").is_none());
        assert!(load_session(&conn, "old-active").expect("load").is_some());
        assert!(load_session(&conn, "new-exited").expect("load").is_some());
    }

    #[test]
    fn writer_and_reader_attachments_round_trip() {
        let (_dir, db_path) = make_db();
        let conn = open_conn(&db_path).expect("open conn");
        insert_session(&conn, &make_session("sess-1", "task-1", "qa", "active"))
            .expect("insert session");

        // First claim and a repeat claim by the same client succeed; a rival is refused.
        let first = acquire_writer(&conn, "sess-1", "writer-1").expect("acquire initial writer");
        assert!(first);
        let repeat = acquire_writer(&conn, "sess-1", "writer-1").expect("re-acquire same writer");
        assert!(repeat);
        let rival = acquire_writer(&conn, "sess-1", "writer-2").expect("reject second writer");
        assert!(!rival);

        attach_reader(&conn, "sess-1", "reader-1").expect("attach reader");
        release_attachment(&conn, "sess-1", "reader-1", "done").expect("detach reader");
        release_attachment(&conn, "sess-1", "writer-1", "handoff").expect("detach writer");

        let reloaded = load_session(&conn, "sess-1")
            .expect("reload session")
            .expect("session should exist");
        assert_eq!(reloaded.writer_client_id, None);

        let writer_rows: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM session_attachments WHERE session_id = ?1 AND mode = 'writer'",
                params!["sess-1"],
                |row| row.get(0),
            )
            .expect("count writer attachments");
        let detached_rows: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM session_attachments WHERE session_id = ?1 AND detached_at IS NOT NULL",
                params!["sess-1"],
                |row| row.get(0),
            )
            .expect("count detached attachments");

        // Two writer rows (both successful claims) plus one reader row, all detached.
        assert_eq!(writer_rows, 2);
        assert_eq!(detached_rows, 3);
    }

    #[tokio::test]
    async fn async_session_store_exercises_all_wrapper_methods() {
        let (_dir, db_path) = make_db();
        let async_db = Arc::new(AsyncDatabase::open(&db_path).await.expect("open async db"));
        let store = AsyncSessionStore::new(async_db);

        let payload = make_session("sess-async", "task-1", "qa", "active");
        store
            .insert_session(OwnedNewSession::from(&payload))
            .await
            .expect("insert session");

        // Read paths.
        let by_id = store
            .load_session("sess-async")
            .await
            .expect("load session")
            .expect("session exists");
        assert_eq!(by_id.id, "sess-async");
        assert_eq!(by_id.state, "active");

        let by_step = store
            .load_active_session_for_task_step("task-1", "qa")
            .await
            .expect("load active session")
            .expect("active session exists");
        assert_eq!(by_step.id, "sess-async");

        let all_for_task = store
            .list_task_sessions("task-1")
            .await
            .expect("list sessions");
        assert_eq!(all_for_task.len(), 1);

        // Writer lease: the first client wins, the second is refused.
        let granted = store
            .acquire_writer("sess-async", "writer-1")
            .await
            .expect("acquire writer");
        assert!(granted);
        let refused = store
            .acquire_writer("sess-async", "writer-2")
            .await
            .expect("reject second writer");
        assert!(!refused);

        // Mutations.
        store
            .attach_reader("sess-async", "reader-1")
            .await
            .expect("attach reader");
        store
            .update_session_pid("sess-async", 5150)
            .await
            .expect("update pid");
        store
            .update_session_state("sess-async", "failed", Some(9), true)
            .await
            .expect("update session state");
        store
            .release_attachment("sess-async", "reader-1", "done")
            .await
            .expect("release reader");
        store
            .release_attachment("sess-async", "writer-1", "done")
            .await
            .expect("release writer");

        let finished = store
            .load_session("sess-async")
            .await
            .expect("reload exited session")
            .expect("session still exists");
        assert_eq!(finished.pid, 5150);
        assert_eq!(finished.state, "failed");
        assert_eq!(finished.exit_code, Some(9));
        assert!(finished.ended_at.is_some());
        assert!(finished.writer_client_id.is_none());

        // Backdate the row through a plain connection so cleanup considers it stale.
        let conn = open_conn(&db_path).expect("open sync conn");
        let backdated = (chrono::Utc::now() - chrono::Duration::hours(100)).to_rfc3339();
        conn.execute(
            "UPDATE agent_sessions SET updated_at = ?2 WHERE id = ?1",
            params!["sess-async", backdated],
        )
        .expect("backdate session");

        let removed = store
            .cleanup_stale_sessions(72)
            .await
            .expect("cleanup stale sessions");
        assert_eq!(removed, 1);

        let gone = store
            .load_session("sess-async")
            .await
            .expect("load deleted session");
        assert!(gone.is_none());
        let never_existed = store
            .load_session("missing")
            .await
            .expect("load missing session");
        assert!(never_existed.is_none());
    }
}