microagents_storage/
jsonl.rs1use microagents_events::types::{AgentEvent, JsonRpcNotification};
2use microagents_events::{AgentEventAny, SessionInitEvent};
3use std::{path::PathBuf, sync::OnceLock};
4use tokio::fs::OpenOptions;
5use tokio::io::{AsyncReadExt, AsyncWriteExt};
6
7use crate::types::AgentStorage;
8
9pub static JSONL_SESSION_STORAGE: OnceLock<PathBuf> = OnceLock::new();
11
12pub fn jsonl_session_storage() -> &'static PathBuf {
14 JSONL_SESSION_STORAGE.get_or_init(|| {
15 dirs::home_dir()
16 .expect("could not determine home directory")
17 .join(".microagents")
18 .join("sessions")
19 })
20}
21
22#[derive(Debug)]
26pub struct JsonlAgentStorage {
27 pub jsonl_path: PathBuf,
29}
30
31impl Default for JsonlAgentStorage {
32 fn default() -> Self {
33 Self {
34 jsonl_path: jsonl_session_storage().to_owned(),
35 }
36 }
37}
38
39impl JsonlAgentStorage {
40 pub fn new(jsonl_path: Option<PathBuf>) -> Self {
44 Self {
45 jsonl_path: jsonl_path.unwrap_or(jsonl_session_storage().to_owned()),
46 }
47 }
48
49 async fn ensure_sessions_dir(&self) -> anyhow::Result<()> {
50 if self.jsonl_path.is_dir() {
51 return Ok(());
52 }
53 tokio::fs::create_dir_all(&self.jsonl_path).await?;
54 Ok(())
55 }
56}
57
58#[async_trait::async_trait]
59impl AgentStorage for JsonlAgentStorage {
60 async fn create_session(&self, event: SessionInitEvent) -> anyhow::Result<()> {
61 self.ensure_sessions_dir().await?;
62 let mut file = OpenOptions::new()
63 .create(true)
64 .append(true)
65 .open(self.jsonl_path.join(format!("{}.jsonl", event.session_id)))
66 .await?;
67 let event_json = serde_json::to_string(&event.to_jsonrpc())?;
68 file.write_all(format!("{}\n", event_json).as_bytes())
69 .await?;
70 Ok(())
71 }
72
73 async fn update_session(&self, event: AgentEventAny) -> anyhow::Result<()> {
74 self.ensure_sessions_dir().await?;
75 let mut file = OpenOptions::new()
76 .create(true)
77 .append(true)
78 .open(
79 self.jsonl_path
80 .join(format!("{}.jsonl", event.session_id())),
81 )
82 .await?;
83 file.write_all(format!("{}\n", serde_json::to_string(&event.to_jsonrpc())?).as_bytes())
84 .await?;
85 Ok(())
86 }
87
88 async fn get_session(&self, session_id: &str) -> anyhow::Result<Vec<AgentEventAny>> {
89 self.ensure_sessions_dir().await?;
90 let mut file = OpenOptions::new()
91 .read(true)
92 .open(self.jsonl_path.join(format!("{session_id}.jsonl")))
93 .await?;
94 let mut buf = String::new();
95 file.read_to_string(&mut buf).await?;
96 let mut events = vec![];
97 let mut i = 0;
98 for line in buf.lines() {
99 i += 1;
100 let jsrpc: JsonRpcNotification = match serde_json::from_str(line.trim_end_matches("\n"))
101 {
102 Ok(r) => r,
103 Err(e) => {
104 eprintln!("Corrupted line {:?}. Error detail: {}", i, e);
105 continue;
106 }
107 };
108 let event = AgentEventAny::try_from(jsrpc)?;
109 events.push(event);
110 }
111
112 events.sort_by_key(|a| a.timestamp());
113
114 Ok(events)
115 }
116}
117
118#[cfg(test)]
119mod tests {
120 use chrono::Utc;
121 use microagents_events::{
122 AssistantResponseEvent, SessionStopEvent, Usage, UserPromptSubmitEvent,
123 };
124
125 use super::*;
126
127 #[test]
128 fn test_default_init() {
129 let jsonl = JsonlAgentStorage::default();
130 assert_eq!(jsonl.jsonl_path, jsonl_session_storage().to_owned());
131 }
132
133 #[tokio::test]
134 async fn test_create_session() {
135 let tmp = tempfile::tempdir().unwrap();
136 let jsonl = JsonlAgentStorage::new(Some(tmp.path().to_path_buf()));
137 jsonl
138 .create_session(SessionInitEvent {
139 session_id: "1".to_string(),
140 model: "gpt-5.5".into(),
141 provider: "openai".into(),
142 system: "you are a helpful assistant".into(),
143 init_type: microagents_events::SessionInitType::Start,
144 timestamp: Utc::now(),
145 })
146 .await
147 .expect("Should be able to create a session");
148 let content = tokio::fs::read_to_string(tmp.path().join("1.jsonl"))
149 .await
150 .expect("Should be able to read file");
151 let mut events = vec![];
152 for line in content.lines() {
153 let jsrpc: JsonRpcNotification =
154 serde_json::from_str(line).expect("Should serialize correctly");
155 let event = AgentEventAny::try_from(jsrpc).expect("Should convert to agent event");
156 events.push(event);
157 }
158 assert_eq!(events.len(), 1);
159 assert_eq!(
160 events[0].clone().to_jsonrpc().method,
161 "session.init".to_string()
162 );
163 }
164
165 #[tokio::test]
166 async fn test_create_update_get_session() {
167 let tmp = tempfile::tempdir().unwrap();
168 let jsonl = JsonlAgentStorage::new(Some(tmp.path().to_path_buf()));
169 jsonl
170 .create_session(SessionInitEvent {
171 session_id: "1".to_string(),
172 model: "gpt-5.5".into(),
173 provider: "openai".into(),
174 system: "you are a helpful assistant".into(),
175 init_type: microagents_events::SessionInitType::Start,
176 timestamp: Utc::now(),
177 })
178 .await
179 .expect("Should be able to create a session");
180 jsonl
181 .update_session(AgentEventAny::UserPromptSubmit(UserPromptSubmitEvent {
182 prompt: "hello".to_string(),
183 session_id: "1".to_string(),
184 turn_id: "t1".to_string(),
185 timestamp: Utc::now(),
186 }))
187 .await
188 .expect("Should be able to update memory");
189 jsonl
190 .update_session(AgentEventAny::AssistantResponse(AssistantResponseEvent {
191 session_id: "1".to_string(),
192 turn_id: "t1".to_string(),
193 full_text: "hello".to_string(),
194 tool_calls: None,
195 timestamp: Utc::now(),
196 }))
197 .await
198 .expect("Should be able to update memory");
199 jsonl
200 .update_session(AgentEventAny::SessionStop(SessionStopEvent {
201 session_id: "1".to_string(),
202 result: Some("hello".to_string()),
203 error: None,
204 success: true,
205 timestamp: Utc::now(),
206 usage: Usage::default(),
207 }))
208 .await
209 .expect("Should be able to update memory");
210 let events = jsonl
211 .get_session("1")
212 .await
213 .expect("Should be able to get the session");
214 assert_eq!(events.len(), 4);
215 assert_eq!(events[0].to_jsonrpc().method, "session.init".to_string());
216 assert_eq!(
217 events[1].to_jsonrpc().method,
218 "user.prompt.submit".to_string()
219 );
220 assert_eq!(
221 events[2].to_jsonrpc().method,
222 "assistant.response".to_string()
223 );
224 assert_eq!(events[3].to_jsonrpc().method, "session.stop".to_string());
225 }
226}