//! `microagents_storage/jsonl.rs`

1use microagents_events::types::{AgentEvent, JsonRpcNotification};
2use microagents_events::{AgentEventAny, SessionInitEvent};
3use std::{path::PathBuf, sync::OnceLock};
4use tokio::fs::OpenOptions;
5use tokio::io::{AsyncReadExt, AsyncWriteExt};
6
7use crate::types::AgentStorage;
8
9/// Global default directory for JSONL session files.
10pub static JSONL_SESSION_STORAGE: OnceLock<PathBuf> = OnceLock::new();
11
12/// Return the default JSONL storage directory (`~/.microagents/sessions`).
13pub fn jsonl_session_storage() -> &'static PathBuf {
14    JSONL_SESSION_STORAGE.get_or_init(|| {
15        dirs::home_dir()
16            .expect("could not determine home directory")
17            .join(".microagents")
18            .join("sessions")
19    })
20}
21
22/// JSON Lines file-based implementation of [`AgentStorage`].
23///
24/// Each session is stored as a separate `.jsonl` file under `jsonl_path`.
25#[derive(Debug)]
26pub struct JsonlAgentStorage {
27    /// Directory where session `.jsonl` files are written.
28    pub jsonl_path: PathBuf,
29}
30
31impl Default for JsonlAgentStorage {
32    fn default() -> Self {
33        Self {
34            jsonl_path: jsonl_session_storage().to_owned(),
35        }
36    }
37}
38
39impl JsonlAgentStorage {
40    /// Create a new [`JsonlAgentStorage`].
41    ///
42    /// If `jsonl_path` is `None`, the default directory from [`jsonl_session_storage`] is used.
43    pub fn new(jsonl_path: Option<PathBuf>) -> Self {
44        Self {
45            jsonl_path: jsonl_path.unwrap_or(jsonl_session_storage().to_owned()),
46        }
47    }
48
49    async fn ensure_sessions_dir(&self) -> anyhow::Result<()> {
50        if self.jsonl_path.is_dir() {
51            return Ok(());
52        }
53        tokio::fs::create_dir_all(&self.jsonl_path).await?;
54        Ok(())
55    }
56}
57
58#[async_trait::async_trait]
59impl AgentStorage for JsonlAgentStorage {
60    async fn create_session(&self, event: SessionInitEvent) -> anyhow::Result<()> {
61        self.ensure_sessions_dir().await?;
62        let mut file = OpenOptions::new()
63            .create(true)
64            .append(true)
65            .open(self.jsonl_path.join(format!("{}.jsonl", event.session_id)))
66            .await?;
67        let event_json = serde_json::to_string(&event.to_jsonrpc())?;
68        file.write_all(format!("{}\n", event_json).as_bytes())
69            .await?;
70        Ok(())
71    }
72
73    async fn update_session(&self, event: AgentEventAny) -> anyhow::Result<()> {
74        self.ensure_sessions_dir().await?;
75        let mut file = OpenOptions::new()
76            .create(true)
77            .append(true)
78            .open(
79                self.jsonl_path
80                    .join(format!("{}.jsonl", event.session_id())),
81            )
82            .await?;
83        file.write_all(format!("{}\n", serde_json::to_string(&event.to_jsonrpc())?).as_bytes())
84            .await?;
85        Ok(())
86    }
87
88    async fn get_session(&self, session_id: &str) -> anyhow::Result<Vec<AgentEventAny>> {
89        self.ensure_sessions_dir().await?;
90        let mut file = OpenOptions::new()
91            .read(true)
92            .open(self.jsonl_path.join(format!("{session_id}.jsonl")))
93            .await?;
94        let mut buf = String::new();
95        file.read_to_string(&mut buf).await?;
96        let mut events = vec![];
97        let mut i = 0;
98        for line in buf.lines() {
99            i += 1;
100            let jsrpc: JsonRpcNotification = match serde_json::from_str(line.trim_end_matches("\n"))
101            {
102                Ok(r) => r,
103                Err(e) => {
104                    eprintln!("Corrupted line {:?}. Error detail: {}", i, e);
105                    continue;
106                }
107            };
108            let event = AgentEventAny::try_from(jsrpc)?;
109            events.push(event);
110        }
111
112        events.sort_by_key(|a| a.timestamp());
113
114        Ok(events)
115    }
116}
117
#[cfg(test)]
mod tests {
    use chrono::Utc;
    use microagents_events::{
        AssistantResponseEvent, SessionStopEvent, Usage, UserPromptSubmitEvent,
    };

    use super::*;

    /// Build the `session.init` event (session id `"1"`) shared by the tests below.
    fn sample_init_event() -> SessionInitEvent {
        SessionInitEvent {
            session_id: "1".to_string(),
            model: "gpt-5.5".into(),
            provider: "openai".into(),
            system: "you are a helpful assistant".into(),
            init_type: microagents_events::SessionInitType::Start,
            timestamp: Utc::now(),
        }
    }

    /// `Default` must point at the global default sessions directory.
    #[test]
    fn test_default_init() {
        let storage = JsonlAgentStorage::default();
        assert_eq!(storage.jsonl_path, jsonl_session_storage().to_owned());
    }

    /// Creating a session writes exactly one `session.init` line to `<id>.jsonl`.
    #[tokio::test]
    async fn test_create_session() {
        let tmp = tempfile::tempdir().unwrap();
        let storage = JsonlAgentStorage::new(Some(tmp.path().to_path_buf()));
        storage
            .create_session(sample_init_event())
            .await
            .expect("Should be able to create a session");

        let content = tokio::fs::read_to_string(tmp.path().join("1.jsonl"))
            .await
            .expect("Should be able to read file");
        let events: Vec<AgentEventAny> = content
            .lines()
            .map(|line| {
                let jsrpc: JsonRpcNotification =
                    serde_json::from_str(line).expect("Should serialize correctly");
                AgentEventAny::try_from(jsrpc).expect("Should convert to agent event")
            })
            .collect();

        assert_eq!(events.len(), 1);
        assert_eq!(
            events[0].clone().to_jsonrpc().method,
            "session.init".to_string()
        );
    }

    /// A created session followed by three updates reads back as four ordered events.
    #[tokio::test]
    async fn test_create_update_get_session() {
        let tmp = tempfile::tempdir().unwrap();
        let storage = JsonlAgentStorage::new(Some(tmp.path().to_path_buf()));
        storage
            .create_session(sample_init_event())
            .await
            .expect("Should be able to create a session");

        let prompt = AgentEventAny::UserPromptSubmit(UserPromptSubmitEvent {
            prompt: "hello".to_string(),
            session_id: "1".to_string(),
            turn_id: "t1".to_string(),
            timestamp: Utc::now(),
        });
        storage
            .update_session(prompt)
            .await
            .expect("Should be able to update memory");

        let response = AgentEventAny::AssistantResponse(AssistantResponseEvent {
            session_id: "1".to_string(),
            turn_id: "t1".to_string(),
            full_text: "hello".to_string(),
            tool_calls: None,
            timestamp: Utc::now(),
        });
        storage
            .update_session(response)
            .await
            .expect("Should be able to update memory");

        let stop = AgentEventAny::SessionStop(SessionStopEvent {
            session_id: "1".to_string(),
            result: Some("hello".to_string()),
            error: None,
            success: true,
            timestamp: Utc::now(),
            usage: Usage::default(),
        });
        storage
            .update_session(stop)
            .await
            .expect("Should be able to update memory");

        let events = storage
            .get_session("1")
            .await
            .expect("Should be able to get the session");

        // Events must come back in the order they were written.
        let expected_methods = [
            "session.init",
            "user.prompt.submit",
            "assistant.response",
            "session.stop",
        ];
        assert_eq!(events.len(), expected_methods.len());
        for (event, method) in events.iter().zip(expected_methods) {
            assert_eq!(event.to_jsonrpc().method, method.to_string());
        }
    }
}