Skip to main content

microagents_storage/
sqlite.rs

1use crate::types::AgentStorage;
2use microagents_events::{
3    AgentEventAny, SessionInitEvent,
4    types::{AgentEvent, JsonRpcNotification},
5};
6use std::{
7    path::PathBuf,
8    sync::{Arc, OnceLock},
9    time::SystemTime,
10};
11use tokio::sync::Mutex;
12use tokio_rusqlite::Connection;
13use tokio_rusqlite::rusqlite;
14
15/// Global default path for the SQLite sessions database.
16pub static SQLITE_SESSION_STORAGE: OnceLock<PathBuf> = OnceLock::new();
17
18/// Return the default SQLite database path (`~/.microagents/sessions.db`).
19pub fn sqlite_session_storage() -> &'static PathBuf {
20    SQLITE_SESSION_STORAGE.get_or_init(|| {
21        dirs::home_dir()
22            .expect("could not determine home directory")
23            .join(".microagents")
24            .join("sessions.db")
25    })
26}
27
28/// SQLite-backed implementation of [`AgentStorage`].
29#[derive(Debug, Clone)]
30pub struct SqliteAgentStorage {
31    connection: Arc<Mutex<Connection>>,
32}
33
34impl SqliteAgentStorage {
35    /// Open (or create) the SQLite database at `db_path` and ensure the events table exists.
36    ///
37    /// If `db_path` is `None`, the default path from [`sqlite_session_storage`] is used.
38    pub async fn new(db_path: Option<&PathBuf>) -> anyhow::Result<Self> {
39        let path = db_path.unwrap_or(sqlite_session_storage());
40        if let Some(parent) = std::path::Path::new(&path).parent()
41            && !parent.exists()
42        {
43            std::fs::create_dir_all(parent)?;
44        }
45        let connection = Arc::new(Mutex::new(Connection::open(path).await?));
46        let storage = Self { connection };
47        storage.ensure_table_and_idx().await?;
48        Ok(storage)
49    }
50
51    async fn ensure_table_and_idx(&self) -> anyhow::Result<()> {
52        self.connection
53            .lock()
54            .await
55            .call(|conn| -> Result<(), tokio_rusqlite::rusqlite::Error> {
56                conn.execute_batch(
57                    r#"
58                    PRAGMA journal_mode = WAL;
59                    PRAGMA synchronous = NORMAL;
60                    "#,
61                )?;
62
63                // useful for future migrations
64                let version: i64 = conn.query_row("PRAGMA user_version", [], |r| r.get(0))?;
65
66                if version < 1 {
67                    conn.execute_batch(
68                        r#"
69                    CREATE TABLE IF NOT EXISTS events (
70                        id          INTEGER PRIMARY KEY AUTOINCREMENT,
71                        session_id  TEXT    NOT NULL,
72                        payload     TEXT    NOT NULL,
73                        created_at  INTEGER NOT NULL
74                    );
75                    CREATE INDEX IF NOT EXISTS idx_events_session_id ON events(session_id);
76                    PRAGMA user_version = 1;
77                    "#,
78                    )?;
79                }
80                Ok(())
81            })
82            .await?;
83        Ok(())
84    }
85}
86
87#[async_trait::async_trait]
88impl AgentStorage for SqliteAgentStorage {
89    async fn create_session(&self, event: SessionInitEvent) -> anyhow::Result<()> {
90        let session_id = event.session_id.clone();
91        let json_event = serde_json::to_string(&event.to_jsonrpc())?;
92        let now = now_millis()?;
93
94        self.connection
95            .lock()
96            .await
97            .call(move |conn| -> Result<(), tokio_rusqlite::rusqlite::Error> {
98                conn.execute(
99                    "INSERT INTO events (session_id, payload, created_at) VALUES (?1, ?2, ?3)",
100                    rusqlite::params![session_id, json_event, now],
101                )?;
102                Ok(())
103            })
104            .await?;
105        Ok(())
106    }
107
108    async fn update_session(&self, event: AgentEventAny) -> anyhow::Result<()> {
109        let session_id = event.session_id();
110        let json_event = serde_json::to_string(&event.to_jsonrpc())?;
111        let now = now_millis()?;
112
113        self.connection
114            .lock()
115            .await
116            .call(move |conn| -> Result<(), tokio_rusqlite::rusqlite::Error> {
117                conn.execute(
118                    "INSERT INTO events (session_id, payload, created_at) VALUES (?1, ?2, ?3)",
119                    rusqlite::params![session_id, json_event, now],
120                )?;
121                Ok(())
122            })
123            .await?;
124        Ok(())
125    }
126
127    async fn get_session(&self, session_id: &str) -> anyhow::Result<Vec<AgentEventAny>> {
128        let session_id = session_id.to_string();
129
130        let rows =
131            self.connection
132                .lock()
133                .await
134                .call(
135                    move |conn| -> Result<Vec<(isize, String)>, tokio_rusqlite::rusqlite::Error> {
136                        let mut stmt = conn.prepare(
137                            "SELECT id, payload FROM events WHERE session_id = ?1 ORDER BY id ASC",
138                        )?;
139                        let rows = stmt
140                    .query_map([&session_id], |row| {
141                        Ok((row.get::<_, isize>(0)?, row.get::<_, String>(1)?))
142                    })?
143                    .collect::<Result<Vec<(isize, String)>, tokio_rusqlite::rusqlite::Error>>()?;
144                        Ok(rows)
145                    },
146                )
147                .await?;
148
149        let mut events: Vec<AgentEventAny> = Vec::with_capacity(rows.len());
150        for (_, payload) in rows {
151            let jrpc: JsonRpcNotification = serde_json::from_str(&payload)
152                .map_err(|e| anyhow::anyhow!("Invalid JSON payload in events table: {e}"))?;
153            let event = AgentEventAny::try_from(jrpc)
154                .map_err(|e| anyhow::anyhow!("Invalid event payload in events table: {e}"))?;
155            events.push(event);
156        }
157        events.sort_by_key(|a| a.timestamp());
158        Ok(events)
159    }
160}
161
162fn now_millis() -> anyhow::Result<i64> {
163    Ok(SystemTime::now()
164        .duration_since(SystemTime::UNIX_EPOCH)?
165        .as_millis()
166        .try_into()?)
167}
168
169#[cfg(test)]
170mod tests {
171    use chrono::Utc;
172    use microagents_events::{
173        AssistantResponseEvent, SessionStopEvent, Usage, UserPromptSubmitEvent,
174    };
175
176    use super::*;
177
178    #[tokio::test]
179    async fn test_default_init() {
180        let tmp = tempfile::tempdir().unwrap();
181        let db_path = tmp.path().join("sessions.db");
182        let result = SqliteAgentStorage::new(Some(&db_path)).await;
183        assert!(result.is_ok());
184    }
185
186    #[tokio::test]
187    async fn test_create_session() {
188        let tmp = tempfile::tempdir().unwrap();
189        let db_path = tmp.path().join("test.db");
190        let sql = SqliteAgentStorage::new(Some(&db_path))
191            .await
192            .expect("Should be able to open agent store");
193        sql.create_session(SessionInitEvent {
194            session_id: "1".to_string(),
195            model: "gpt-5.5".into(),
196            provider: "openai".into(),
197            system: "you are a helpful assistant".into(),
198            init_type: microagents_events::SessionInitType::Start,
199            timestamp: Utc::now(),
200        })
201        .await
202        .expect("Should be able to create a session");
203        let rows =
204            sql.connection
205                .lock()
206                .await
207                .call(
208                    move |conn| -> Result<Vec<(isize, String)>, tokio_rusqlite::rusqlite::Error> {
209                        let mut stmt = conn.prepare(
210                            "SELECT id, payload FROM events WHERE session_id = ?1 ORDER BY id ASC",
211                        )?;
212                        let rows = stmt
213                    .query_map(["1"], |row| {
214                        Ok((row.get::<_, isize>(0)?, row.get::<_, String>(1)?))
215                    })?
216                    .collect::<Result<Vec<(isize, String)>, tokio_rusqlite::rusqlite::Error>>()?;
217                        Ok(rows)
218                    },
219                )
220                .await
221                .expect("Should be able to perform sql operation");
222
223        let events: Vec<AgentEventAny> = rows
224            .into_iter()
225            .map(|(_, payload)| {
226                let jrpc: JsonRpcNotification = serde_json::from_str(&payload).unwrap();
227                AgentEventAny::try_from(jrpc).unwrap()
228            })
229            .collect();
230        assert_eq!(events.len(), 1);
231        assert_eq!(
232            events[0].clone().to_jsonrpc().method,
233            "session.init".to_string()
234        );
235        // Connection is dropped when `sql` goes out of scope, then tmp dir is cleaned up
236    }
237
238    #[tokio::test]
239    async fn test_create_update_get_session() {
240        let tmp = tempfile::tempdir().unwrap();
241        let db_path = tmp.path().join("test.db");
242        let sql = SqliteAgentStorage::new(Some(&db_path))
243            .await
244            .expect("Should be able to create sqlite store");
245        sql.create_session(SessionInitEvent {
246            session_id: "1".to_string(),
247            model: "gpt-5.5".into(),
248            provider: "openai".into(),
249            system: "you are a helpful assistant".into(),
250            init_type: microagents_events::SessionInitType::Start,
251            timestamp: Utc::now(),
252        })
253        .await
254        .expect("Should be able to create a session");
255        sql.update_session(AgentEventAny::UserPromptSubmit(UserPromptSubmitEvent {
256            prompt: "hello".to_string(),
257            session_id: "1".to_string(),
258            turn_id: "t1".to_string(),
259            timestamp: Utc::now(),
260        }))
261        .await
262        .expect("Should be able to update memory");
263        sql.update_session(AgentEventAny::AssistantResponse(AssistantResponseEvent {
264            session_id: "1".to_string(),
265            turn_id: "t1".to_string(),
266            full_text: "hello".to_string(),
267            tool_calls: None,
268            timestamp: Utc::now(),
269        }))
270        .await
271        .expect("Should be able to update memory");
272        sql.update_session(AgentEventAny::SessionStop(SessionStopEvent {
273            session_id: "1".to_string(),
274            result: Some("hello".to_string()),
275            error: None,
276            success: true,
277            timestamp: Utc::now(),
278            usage: Usage::default(),
279        }))
280        .await
281        .expect("Should be able to update memory");
282        let events = sql
283            .get_session("1")
284            .await
285            .expect("Should be able to get the session");
286        assert_eq!(events.len(), 4);
287        assert_eq!(events[0].to_jsonrpc().method, "session.init".to_string());
288        assert_eq!(
289            events[1].to_jsonrpc().method,
290            "user.prompt.submit".to_string()
291        );
292        assert_eq!(
293            events[2].to_jsonrpc().method,
294            "assistant.response".to_string()
295        );
296        assert_eq!(events[3].to_jsonrpc().method, "session.stop".to_string());
297        // Connection is dropped when `sql` goes out of scope, then tmp dir is cleaned up
298    }
299}