agent_orchestrator/persistence/repository/
session.rs1use crate::async_database::{AsyncDatabase, flatten_err};
2use crate::session_store::{self, OwnedNewSession, SessionRow};
3use anyhow::Result;
4use async_trait::async_trait;
5use std::sync::Arc;
6
7#[async_trait]
8pub trait SessionRepository: Send + Sync {
10 async fn insert_session(&self, session: OwnedNewSession) -> Result<()>;
12 async fn update_session_state(
14 &self,
15 session_id: &str,
16 state: &str,
17 exit_code: Option<i64>,
18 ended: bool,
19 ) -> Result<()>;
20 async fn update_session_pid(&self, session_id: &str, pid: i64) -> Result<()>;
22 async fn load_session(&self, session_id: &str) -> Result<Option<SessionRow>>;
24 async fn load_active_session_for_task_step(
26 &self,
27 task_id: &str,
28 step_id: &str,
29 ) -> Result<Option<SessionRow>>;
30 async fn list_task_sessions(&self, task_id: &str) -> Result<Vec<SessionRow>>;
32 async fn acquire_writer(&self, session_id: &str, client_id: &str) -> Result<bool>;
34 async fn attach_reader(&self, session_id: &str, client_id: &str) -> Result<()>;
36 async fn cleanup_stale_sessions(&self, max_age_hours: u64) -> Result<usize>;
38 async fn release_attachment(
40 &self,
41 session_id: &str,
42 client_id: &str,
43 reason: &str,
44 ) -> Result<()>;
45}
46
47pub struct SqliteSessionRepository {
49 async_db: Arc<AsyncDatabase>,
50}
51
52impl SqliteSessionRepository {
53 pub fn new(async_db: Arc<AsyncDatabase>) -> Self {
55 Self { async_db }
56 }
57}
58
59#[async_trait]
60impl SessionRepository for SqliteSessionRepository {
61 async fn insert_session(&self, session: OwnedNewSession) -> Result<()> {
62 self.async_db
63 .writer()
64 .call(move |conn| {
65 let session = session_store::NewSession {
66 id: &session.id,
67 task_id: &session.task_id,
68 task_item_id: session.task_item_id.as_deref(),
69 step_id: &session.step_id,
70 phase: &session.phase,
71 agent_id: &session.agent_id,
72 state: &session.state,
73 pid: session.pid,
74 pty_backend: &session.pty_backend,
75 cwd: &session.cwd,
76 command: &session.command,
77 input_fifo_path: &session.input_fifo_path,
78 stdout_path: &session.stdout_path,
79 stderr_path: &session.stderr_path,
80 transcript_path: &session.transcript_path,
81 output_json_path: session.output_json_path.as_deref(),
82 };
83 session_store::insert_session(conn, &session)
84 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
85 })
86 .await
87 .map_err(flatten_err)
88 }
89
90 async fn update_session_state(
91 &self,
92 session_id: &str,
93 state: &str,
94 exit_code: Option<i64>,
95 ended: bool,
96 ) -> Result<()> {
97 let session_id = session_id.to_owned();
98 let state = state.to_owned();
99 self.async_db
100 .writer()
101 .call(move |conn| {
102 session_store::update_session_state(conn, &session_id, &state, exit_code, ended)
103 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
104 })
105 .await
106 .map_err(flatten_err)
107 }
108
109 async fn update_session_pid(&self, session_id: &str, pid: i64) -> Result<()> {
110 let session_id = session_id.to_owned();
111 self.async_db
112 .writer()
113 .call(move |conn| {
114 session_store::update_session_pid(conn, &session_id, pid)
115 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
116 })
117 .await
118 .map_err(flatten_err)
119 }
120
121 async fn load_session(&self, session_id: &str) -> Result<Option<SessionRow>> {
122 let session_id = session_id.to_owned();
123 self.async_db
124 .reader()
125 .call(move |conn| {
126 session_store::load_session(conn, &session_id)
127 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
128 })
129 .await
130 .map_err(flatten_err)
131 }
132
133 async fn load_active_session_for_task_step(
134 &self,
135 task_id: &str,
136 step_id: &str,
137 ) -> Result<Option<SessionRow>> {
138 let task_id = task_id.to_owned();
139 let step_id = step_id.to_owned();
140 self.async_db
141 .reader()
142 .call(move |conn| {
143 session_store::load_active_session_for_task_step(conn, &task_id, &step_id)
144 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
145 })
146 .await
147 .map_err(flatten_err)
148 }
149
150 async fn list_task_sessions(&self, task_id: &str) -> Result<Vec<SessionRow>> {
151 let task_id = task_id.to_owned();
152 self.async_db
153 .reader()
154 .call(move |conn| {
155 session_store::list_task_sessions(conn, &task_id)
156 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
157 })
158 .await
159 .map_err(flatten_err)
160 }
161
162 async fn acquire_writer(&self, session_id: &str, client_id: &str) -> Result<bool> {
163 let session_id = session_id.to_owned();
164 let client_id = client_id.to_owned();
165 self.async_db
166 .writer()
167 .call(move |conn| {
168 session_store::acquire_writer(conn, &session_id, &client_id)
169 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
170 })
171 .await
172 .map_err(flatten_err)
173 }
174
175 async fn attach_reader(&self, session_id: &str, client_id: &str) -> Result<()> {
176 let session_id = session_id.to_owned();
177 let client_id = client_id.to_owned();
178 self.async_db
179 .writer()
180 .call(move |conn| {
181 session_store::attach_reader(conn, &session_id, &client_id)
182 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
183 })
184 .await
185 .map_err(flatten_err)
186 }
187
188 async fn cleanup_stale_sessions(&self, max_age_hours: u64) -> Result<usize> {
189 self.async_db
190 .writer()
191 .call(move |conn| {
192 session_store::cleanup_stale_sessions(conn, max_age_hours)
193 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
194 })
195 .await
196 .map_err(flatten_err)
197 }
198
199 async fn release_attachment(
200 &self,
201 session_id: &str,
202 client_id: &str,
203 reason: &str,
204 ) -> Result<()> {
205 let session_id = session_id.to_owned();
206 let client_id = client_id.to_owned();
207 let reason = reason.to_owned();
208 self.async_db
209 .writer()
210 .call(move |conn| {
211 session_store::release_attachment(conn, &session_id, &client_id, &reason)
212 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
213 })
214 .await
215 .map_err(flatten_err)
216 }
217}