1use crate::types::AgentStorage;
2use microagents_events::{
3 AgentEventAny, SessionInitEvent,
4 types::{AgentEvent, JsonRpcNotification},
5};
6use std::{
7 path::PathBuf,
8 sync::{Arc, OnceLock},
9 time::SystemTime,
10};
11use tokio::sync::Mutex;
12use tokio_rusqlite::Connection;
13use tokio_rusqlite::rusqlite;
14
/// Process-wide cache for the default SQLite database path.
///
/// Filled at most once; [`sqlite_session_storage`] initializes it on first use.
pub static SQLITE_SESSION_STORAGE: OnceLock<PathBuf> = OnceLock::new();
17
18pub fn sqlite_session_storage() -> &'static PathBuf {
20 SQLITE_SESSION_STORAGE.get_or_init(|| {
21 dirs::home_dir()
22 .expect("could not determine home directory")
23 .join(".microagents")
24 .join("sessions.db")
25 })
26}
27
/// [`AgentStorage`] implementation that persists session events in a SQLite database.
///
/// Clones share the same underlying connection through the `Arc`.
#[derive(Debug, Clone)]
pub struct SqliteAgentStorage {
    /// Shared database connection; the async mutex is locked around each database call.
    connection: Arc<Mutex<Connection>>,
}
33
34impl SqliteAgentStorage {
35 pub async fn new(db_path: Option<&PathBuf>) -> anyhow::Result<Self> {
39 let path = db_path.unwrap_or(sqlite_session_storage());
40 if let Some(parent) = std::path::Path::new(&path).parent()
41 && !parent.exists()
42 {
43 std::fs::create_dir_all(parent)?;
44 }
45 let connection = Arc::new(Mutex::new(Connection::open(path).await?));
46 let storage = Self { connection };
47 storage.ensure_table_and_idx().await?;
48 Ok(storage)
49 }
50
51 async fn ensure_table_and_idx(&self) -> anyhow::Result<()> {
52 self.connection
53 .lock()
54 .await
55 .call(|conn| -> Result<(), tokio_rusqlite::rusqlite::Error> {
56 conn.execute_batch(
57 r#"
58 PRAGMA journal_mode = WAL;
59 PRAGMA synchronous = NORMAL;
60 "#,
61 )?;
62
63 let version: i64 = conn.query_row("PRAGMA user_version", [], |r| r.get(0))?;
65
66 if version < 1 {
67 conn.execute_batch(
68 r#"
69 CREATE TABLE IF NOT EXISTS events (
70 id INTEGER PRIMARY KEY AUTOINCREMENT,
71 session_id TEXT NOT NULL,
72 payload TEXT NOT NULL,
73 created_at INTEGER NOT NULL
74 );
75 CREATE INDEX IF NOT EXISTS idx_events_session_id ON events(session_id);
76 PRAGMA user_version = 1;
77 "#,
78 )?;
79 }
80 Ok(())
81 })
82 .await?;
83 Ok(())
84 }
85}
86
87#[async_trait::async_trait]
88impl AgentStorage for SqliteAgentStorage {
89 async fn create_session(&self, event: SessionInitEvent) -> anyhow::Result<()> {
90 let session_id = event.session_id.clone();
91 let json_event = serde_json::to_string(&event.to_jsonrpc())?;
92 let now = now_millis()?;
93
94 self.connection
95 .lock()
96 .await
97 .call(move |conn| -> Result<(), tokio_rusqlite::rusqlite::Error> {
98 conn.execute(
99 "INSERT INTO events (session_id, payload, created_at) VALUES (?1, ?2, ?3)",
100 rusqlite::params![session_id, json_event, now],
101 )?;
102 Ok(())
103 })
104 .await?;
105 Ok(())
106 }
107
108 async fn update_session(&self, event: AgentEventAny) -> anyhow::Result<()> {
109 let session_id = event.session_id();
110 let json_event = serde_json::to_string(&event.to_jsonrpc())?;
111 let now = now_millis()?;
112
113 self.connection
114 .lock()
115 .await
116 .call(move |conn| -> Result<(), tokio_rusqlite::rusqlite::Error> {
117 conn.execute(
118 "INSERT INTO events (session_id, payload, created_at) VALUES (?1, ?2, ?3)",
119 rusqlite::params![session_id, json_event, now],
120 )?;
121 Ok(())
122 })
123 .await?;
124 Ok(())
125 }
126
127 async fn get_session(&self, session_id: &str) -> anyhow::Result<Vec<AgentEventAny>> {
128 let session_id = session_id.to_string();
129
130 let rows =
131 self.connection
132 .lock()
133 .await
134 .call(
135 move |conn| -> Result<Vec<(isize, String)>, tokio_rusqlite::rusqlite::Error> {
136 let mut stmt = conn.prepare(
137 "SELECT id, payload FROM events WHERE session_id = ?1 ORDER BY id ASC",
138 )?;
139 let rows = stmt
140 .query_map([&session_id], |row| {
141 Ok((row.get::<_, isize>(0)?, row.get::<_, String>(1)?))
142 })?
143 .collect::<Result<Vec<(isize, String)>, tokio_rusqlite::rusqlite::Error>>()?;
144 Ok(rows)
145 },
146 )
147 .await?;
148
149 let mut events: Vec<AgentEventAny> = Vec::with_capacity(rows.len());
150 for (_, payload) in rows {
151 let jrpc: JsonRpcNotification = serde_json::from_str(&payload)
152 .map_err(|e| anyhow::anyhow!("Invalid JSON payload in events table: {e}"))?;
153 let event = AgentEventAny::try_from(jrpc)
154 .map_err(|e| anyhow::anyhow!("Invalid event payload in events table: {e}"))?;
155 events.push(event);
156 }
157 events.sort_by_key(|a| a.timestamp());
158 Ok(events)
159 }
160}
161
162fn now_millis() -> anyhow::Result<i64> {
163 Ok(SystemTime::now()
164 .duration_since(SystemTime::UNIX_EPOCH)?
165 .as_millis()
166 .try_into()?)
167}
168
#[cfg(test)]
mod tests {
    use chrono::Utc;
    use microagents_events::{
        AssistantResponseEvent, SessionStopEvent, Usage, UserPromptSubmitEvent,
    };

    use super::*;

    /// Builds the `session.init` event for session "1" shared by the tests below.
    fn init_event() -> SessionInitEvent {
        SessionInitEvent {
            session_id: "1".to_string(),
            model: "gpt-5.5".into(),
            provider: "openai".into(),
            system: "you are a helpful assistant".into(),
            init_type: microagents_events::SessionInitType::Start,
            timestamp: Utc::now(),
        }
    }

    /// Opening a store on a fresh database file succeeds.
    #[tokio::test]
    async fn test_default_init() {
        let scratch = tempfile::tempdir().unwrap();
        let db_file = scratch.path().join("sessions.db");
        let opened = SqliteAgentStorage::new(Some(&db_file)).await;
        assert!(opened.is_ok());
    }

    /// `create_session` writes exactly one decodable `session.init` row.
    #[tokio::test]
    async fn test_create_session() {
        let scratch = tempfile::tempdir().unwrap();
        let db_file = scratch.path().join("test.db");
        let store = SqliteAgentStorage::new(Some(&db_file))
            .await
            .expect("Should be able to open agent store");

        store
            .create_session(init_event())
            .await
            .expect("Should be able to create a session");

        // Read the table directly so the check does not depend on `get_session`.
        let stored = store
            .connection
            .lock()
            .await
            .call(
                move |conn| -> Result<Vec<(isize, String)>, tokio_rusqlite::rusqlite::Error> {
                    let mut query = conn.prepare(
                        "SELECT id, payload FROM events WHERE session_id = ?1 ORDER BY id ASC",
                    )?;
                    let found = query
                        .query_map(["1"], |row| {
                            let id = row.get::<_, isize>(0)?;
                            let payload = row.get::<_, String>(1)?;
                            Ok((id, payload))
                        })?
                        .collect::<Result<Vec<(isize, String)>, tokio_rusqlite::rusqlite::Error>>(
                        )?;
                    Ok(found)
                },
            )
            .await
            .expect("Should be able to perform sql operation");

        let mut events: Vec<AgentEventAny> = Vec::new();
        for (_, payload) in stored {
            let jrpc: JsonRpcNotification = serde_json::from_str(&payload).unwrap();
            events.push(AgentEventAny::try_from(jrpc).unwrap());
        }

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].to_jsonrpc().method, "session.init".to_string());
    }

    /// A full init / prompt / response / stop sequence reads back in order.
    #[tokio::test]
    async fn test_create_update_get_session() {
        let scratch = tempfile::tempdir().unwrap();
        let db_file = scratch.path().join("test.db");
        let store = SqliteAgentStorage::new(Some(&db_file))
            .await
            .expect("Should be able to create sqlite store");

        store
            .create_session(init_event())
            .await
            .expect("Should be able to create a session");

        let prompt = AgentEventAny::UserPromptSubmit(UserPromptSubmitEvent {
            prompt: "hello".to_string(),
            session_id: "1".to_string(),
            turn_id: "t1".to_string(),
            timestamp: Utc::now(),
        });
        store
            .update_session(prompt)
            .await
            .expect("Should be able to update memory");

        let response = AgentEventAny::AssistantResponse(AssistantResponseEvent {
            session_id: "1".to_string(),
            turn_id: "t1".to_string(),
            full_text: "hello".to_string(),
            tool_calls: None,
            timestamp: Utc::now(),
        });
        store
            .update_session(response)
            .await
            .expect("Should be able to update memory");

        let stop = AgentEventAny::SessionStop(SessionStopEvent {
            session_id: "1".to_string(),
            result: Some("hello".to_string()),
            error: None,
            success: true,
            timestamp: Utc::now(),
            usage: Usage::default(),
        });
        store
            .update_session(stop)
            .await
            .expect("Should be able to update memory");

        let events = store
            .get_session("1")
            .await
            .expect("Should be able to get the session");

        let expected_methods = [
            "session.init",
            "user.prompt.submit",
            "assistant.response",
            "session.stop",
        ];
        assert_eq!(events.len(), 4);
        for (event, method) in events.iter().zip(expected_methods) {
            assert_eq!(event.to_jsonrpc().method, method.to_string());
        }
    }
}