Skip to main content

agent_orchestrator/persistence/repository/
session.rs

1use crate::async_database::{AsyncDatabase, flatten_err};
2use crate::session_store::{self, OwnedNewSession, SessionRow};
3use anyhow::Result;
4use async_trait::async_trait;
5use std::sync::Arc;
6
7#[async_trait]
8/// Async persistence interface for PTY-backed session lifecycle records.
9pub trait SessionRepository: Send + Sync {
10    /// Inserts a newly created session record.
11    async fn insert_session(&self, session: OwnedNewSession) -> Result<()>;
12    /// Updates the session state and optionally stores exit information.
13    async fn update_session_state(
14        &self,
15        session_id: &str,
16        state: &str,
17        exit_code: Option<i64>,
18        ended: bool,
19    ) -> Result<()>;
20    /// Updates the OS process identifier associated with a session.
21    async fn update_session_pid(&self, session_id: &str, pid: i64) -> Result<()>;
22    /// Loads one session by identifier.
23    async fn load_session(&self, session_id: &str) -> Result<Option<SessionRow>>;
24    /// Loads the active session for a task step, if one is attached.
25    async fn load_active_session_for_task_step(
26        &self,
27        task_id: &str,
28        step_id: &str,
29    ) -> Result<Option<SessionRow>>;
30    /// Lists all sessions associated with a task.
31    async fn list_task_sessions(&self, task_id: &str) -> Result<Vec<SessionRow>>;
32    /// Attempts to acquire exclusive writer attachment for a client.
33    async fn acquire_writer(&self, session_id: &str, client_id: &str) -> Result<bool>;
34    /// Attaches a read-only client to a session.
35    async fn attach_reader(&self, session_id: &str, client_id: &str) -> Result<()>;
36    /// Cleans up sessions considered stale according to the given age threshold.
37    async fn cleanup_stale_sessions(&self, max_age_hours: u64) -> Result<usize>;
38    /// Releases a writer or reader attachment from a session.
39    async fn release_attachment(
40        &self,
41        session_id: &str,
42        client_id: &str,
43        reason: &str,
44    ) -> Result<()>;
45}
46
47/// SQLite-backed session repository implementation.
48pub struct SqliteSessionRepository {
49    async_db: Arc<AsyncDatabase>,
50}
51
52impl SqliteSessionRepository {
53    /// Creates a repository backed by the provided async database handle.
54    pub fn new(async_db: Arc<AsyncDatabase>) -> Self {
55        Self { async_db }
56    }
57}
58
59#[async_trait]
60impl SessionRepository for SqliteSessionRepository {
61    async fn insert_session(&self, session: OwnedNewSession) -> Result<()> {
62        self.async_db
63            .writer()
64            .call(move |conn| {
65                let session = session_store::NewSession {
66                    id: &session.id,
67                    task_id: &session.task_id,
68                    task_item_id: session.task_item_id.as_deref(),
69                    step_id: &session.step_id,
70                    phase: &session.phase,
71                    agent_id: &session.agent_id,
72                    state: &session.state,
73                    pid: session.pid,
74                    pty_backend: &session.pty_backend,
75                    cwd: &session.cwd,
76                    command: &session.command,
77                    input_fifo_path: &session.input_fifo_path,
78                    stdout_path: &session.stdout_path,
79                    stderr_path: &session.stderr_path,
80                    transcript_path: &session.transcript_path,
81                    output_json_path: session.output_json_path.as_deref(),
82                };
83                session_store::insert_session(conn, &session)
84                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
85            })
86            .await
87            .map_err(flatten_err)
88    }
89
90    async fn update_session_state(
91        &self,
92        session_id: &str,
93        state: &str,
94        exit_code: Option<i64>,
95        ended: bool,
96    ) -> Result<()> {
97        let session_id = session_id.to_owned();
98        let state = state.to_owned();
99        self.async_db
100            .writer()
101            .call(move |conn| {
102                session_store::update_session_state(conn, &session_id, &state, exit_code, ended)
103                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
104            })
105            .await
106            .map_err(flatten_err)
107    }
108
109    async fn update_session_pid(&self, session_id: &str, pid: i64) -> Result<()> {
110        let session_id = session_id.to_owned();
111        self.async_db
112            .writer()
113            .call(move |conn| {
114                session_store::update_session_pid(conn, &session_id, pid)
115                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
116            })
117            .await
118            .map_err(flatten_err)
119    }
120
121    async fn load_session(&self, session_id: &str) -> Result<Option<SessionRow>> {
122        let session_id = session_id.to_owned();
123        self.async_db
124            .reader()
125            .call(move |conn| {
126                session_store::load_session(conn, &session_id)
127                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
128            })
129            .await
130            .map_err(flatten_err)
131    }
132
133    async fn load_active_session_for_task_step(
134        &self,
135        task_id: &str,
136        step_id: &str,
137    ) -> Result<Option<SessionRow>> {
138        let task_id = task_id.to_owned();
139        let step_id = step_id.to_owned();
140        self.async_db
141            .reader()
142            .call(move |conn| {
143                session_store::load_active_session_for_task_step(conn, &task_id, &step_id)
144                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
145            })
146            .await
147            .map_err(flatten_err)
148    }
149
150    async fn list_task_sessions(&self, task_id: &str) -> Result<Vec<SessionRow>> {
151        let task_id = task_id.to_owned();
152        self.async_db
153            .reader()
154            .call(move |conn| {
155                session_store::list_task_sessions(conn, &task_id)
156                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
157            })
158            .await
159            .map_err(flatten_err)
160    }
161
162    async fn acquire_writer(&self, session_id: &str, client_id: &str) -> Result<bool> {
163        let session_id = session_id.to_owned();
164        let client_id = client_id.to_owned();
165        self.async_db
166            .writer()
167            .call(move |conn| {
168                session_store::acquire_writer(conn, &session_id, &client_id)
169                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
170            })
171            .await
172            .map_err(flatten_err)
173    }
174
175    async fn attach_reader(&self, session_id: &str, client_id: &str) -> Result<()> {
176        let session_id = session_id.to_owned();
177        let client_id = client_id.to_owned();
178        self.async_db
179            .writer()
180            .call(move |conn| {
181                session_store::attach_reader(conn, &session_id, &client_id)
182                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
183            })
184            .await
185            .map_err(flatten_err)
186    }
187
188    async fn cleanup_stale_sessions(&self, max_age_hours: u64) -> Result<usize> {
189        self.async_db
190            .writer()
191            .call(move |conn| {
192                session_store::cleanup_stale_sessions(conn, max_age_hours)
193                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
194            })
195            .await
196            .map_err(flatten_err)
197    }
198
199    async fn release_attachment(
200        &self,
201        session_id: &str,
202        client_id: &str,
203        reason: &str,
204    ) -> Result<()> {
205        let session_id = session_id.to_owned();
206        let client_id = client_id.to_owned();
207        let reason = reason.to_owned();
208        self.async_db
209            .writer()
210            .call(move |conn| {
211                session_store::release_attachment(conn, &session_id, &client_id, &reason)
212                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
213            })
214            .await
215            .map_err(flatten_err)
216    }
217}