// motosan_agent_loop/session_store.rs
use std::any::Any;
2use std::collections::HashMap;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use tokio::sync::Mutex;
8
9use crate::{Message, Result};
10
11#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
17pub struct SessionMeta {
18 pub session_id: String,
19 pub created_at_ms: u64,
20 pub agent_loop_version: String,
21 pub message_count: usize,
22}
23
24#[async_trait]
32pub trait SessionStore: Send + Sync {
33 async fn append(&self, session_id: &str, message: &Message) -> Result<()>;
34 async fn flush(&self, session_id: &str) -> Result<()>;
35 async fn load(&self, session_id: &str) -> Result<Vec<Message>>;
36 async fn delete(&self, session_id: &str) -> Result<()>;
37 async fn list(&self) -> Result<Vec<String>>;
38 fn as_any(&self) -> &dyn Any;
39}
40
41#[derive(Default)]
42pub struct MemorySessionStore {
43 sessions: Arc<Mutex<HashMap<String, Vec<Message>>>>,
44}
45
46impl MemorySessionStore {
47 pub fn new() -> Self {
48 Self::default()
49 }
50}
51
52#[async_trait]
53impl SessionStore for MemorySessionStore {
54 async fn append(&self, session_id: &str, message: &Message) -> Result<()> {
55 let mut sessions = self.sessions.lock().await;
56 sessions
57 .entry(session_id.to_string())
58 .or_default()
59 .push(message.clone());
60 Ok(())
61 }
62
63 async fn flush(&self, _session_id: &str) -> Result<()> {
64 Ok(())
65 }
66
67 async fn load(&self, session_id: &str) -> Result<Vec<Message>> {
68 let sessions = self.sessions.lock().await;
69 Ok(sessions.get(session_id).cloned().unwrap_or_default())
70 }
71
72 async fn delete(&self, session_id: &str) -> Result<()> {
73 let mut sessions = self.sessions.lock().await;
74 sessions.remove(session_id);
75 Ok(())
76 }
77
78 async fn list(&self) -> Result<Vec<String>> {
79 let sessions = self.sessions.lock().await;
80 let mut ids: Vec<String> = sessions.keys().cloned().collect();
81 ids.sort();
82 Ok(ids)
83 }
84
85 fn as_any(&self) -> &dyn Any {
86 self
87 }
88}
89
90#[derive(Default)]
91struct FileStoreState {
92 buffers: HashMap<String, Vec<Message>>,
93}
94
95pub struct FileSessionStore {
96 base_dir: PathBuf,
97 state: Arc<Mutex<FileStoreState>>,
98 flush_task: Option<tokio::task::JoinHandle<()>>,
99}
100
101impl FileSessionStore {
102 pub fn new(base_dir: impl Into<PathBuf>) -> Self {
108 let base_dir = base_dir.into();
109 let state = Arc::new(Mutex::new(FileStoreState::default()));
110 let state_clone = state.clone();
111 let base_dir_clone = base_dir.clone();
112
113 let flush_task = tokio::runtime::Handle::try_current().ok().map(|handle| {
114 handle.spawn(async move {
115 let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
116 loop {
117 interval.tick().await;
118 let session_ids = {
119 let state = state_clone.lock().await;
120 state
121 .buffers
122 .iter()
123 .filter(|(_, messages)| !messages.is_empty())
124 .map(|(id, _)| id.clone())
125 .collect::<Vec<_>>()
126 };
127 for session_id in session_ids {
128 let _ = flush_session(&base_dir_clone, &state_clone, &session_id).await;
129 }
130 }
131 })
132 });
133
134 Self {
135 base_dir,
136 state,
137 flush_task,
138 }
139 }
140
141 fn session_path(&self, session_id: &str) -> PathBuf {
142 self.base_dir.join(format!("{session_id}.jsonl"))
143 }
144
145 pub async fn load_meta(&self, session_id: &str) -> Result<Option<SessionMeta>> {
146 let path = self.session_path(session_id);
147 load_meta_from_path(&path).await
148 }
149}
150
151impl Drop for FileSessionStore {
152 fn drop(&mut self) {
153 if let Some(task) = &self.flush_task {
154 task.abort();
155 }
156 }
157}
158
159#[async_trait]
160impl SessionStore for FileSessionStore {
161 async fn append(&self, session_id: &str, message: &Message) -> Result<()> {
162 let should_flush = {
163 let mut state = self.state.lock().await;
164 let buffer = state.buffers.entry(session_id.to_string()).or_default();
165 buffer.push(message.clone());
166 buffer.len() >= 20
167 };
168
169 if should_flush {
170 self.flush(session_id).await?;
171 }
172
173 Ok(())
174 }
175
176 async fn flush(&self, session_id: &str) -> Result<()> {
177 flush_session(&self.base_dir, &self.state, session_id).await
178 }
179
180 async fn load(&self, session_id: &str) -> Result<Vec<Message>> {
181 let path = self.session_path(session_id);
182 load_messages_from_path(&path).await
183 }
184
185 async fn delete(&self, session_id: &str) -> Result<()> {
186 {
187 let mut state = self.state.lock().await;
188 state.buffers.remove(session_id);
189 }
190
191 let path = self.session_path(session_id);
192 match tokio::fs::remove_file(path).await {
193 Ok(()) => Ok(()),
194 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
195 Err(err) => Err(err.into()),
196 }
197 }
198
199 async fn list(&self) -> Result<Vec<String>> {
200 if !self.base_dir.exists() {
201 return Ok(Vec::new());
202 }
203
204 let mut out = Vec::new();
205 let mut entries = tokio::fs::read_dir(&self.base_dir).await?;
206 while let Some(entry) = entries.next_entry().await? {
207 let path = entry.path();
208 if path.extension().and_then(|ext| ext.to_str()) != Some("jsonl") {
209 continue;
210 }
211 if let Some(stem) = path.file_stem().and_then(|stem| stem.to_str()) {
212 out.push(stem.to_string());
213 }
214 }
215 out.sort();
216 Ok(out)
217 }
218
219 fn as_any(&self) -> &dyn Any {
220 self
221 }
222}
223
224async fn flush_session(
225 base_dir: &Path,
226 state: &Arc<Mutex<FileStoreState>>,
227 session_id: &str,
228) -> Result<()> {
229 let pending = {
230 let mut state = state.lock().await;
231 state.buffers.remove(session_id).unwrap_or_default()
232 };
233
234 if pending.is_empty() {
235 return Ok(());
236 }
237
238 tokio::fs::create_dir_all(base_dir).await?;
239 let path = base_dir.join(format!("{session_id}.jsonl"));
240 let mut existing = load_messages_from_path(&path).await?;
241 existing.extend(pending);
242
243 let meta = SessionMeta {
244 session_id: session_id.to_string(),
245 created_at_ms: now_ms(),
246 agent_loop_version: env!("CARGO_PKG_VERSION").to_string(),
247 message_count: existing.len(),
248 };
249
250 let mut lines = Vec::with_capacity(existing.len() + 1);
251 lines.push(serde_json::to_string(&meta)?);
252 for message in existing {
253 lines.push(serde_json::to_string(&message)?);
254 }
255
256 let payload = format!("{}\n", lines.join("\n"));
257 tokio::fs::write(path, payload).await?;
258 Ok(())
259}
260
261async fn load_meta_from_path(path: &Path) -> Result<Option<SessionMeta>> {
262 if !path.exists() {
263 return Ok(None);
264 }
265 let content = tokio::fs::read_to_string(path).await?;
266 let mut lines = content.lines();
267 let Some(first) = lines.next() else {
268 return Ok(None);
269 };
270 let meta = serde_json::from_str(first)?;
271 Ok(Some(meta))
272}
273
274async fn load_messages_from_path(path: &Path) -> Result<Vec<Message>> {
275 if !path.exists() {
276 return Ok(Vec::new());
277 }
278
279 let content = tokio::fs::read_to_string(path).await?;
280 let mut messages = Vec::new();
281
282 for (idx, line) in content.lines().enumerate() {
283 if line.trim().is_empty() {
284 continue;
285 }
286 if idx == 0 && serde_json::from_str::<SessionMeta>(line).is_ok() {
287 continue;
288 }
289 messages.push(serde_json::from_str::<Message>(line)?);
290 }
291
292 Ok(messages)
293}
294
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch, and
/// saturates at `u64::MAX` instead of silently truncating the 128-bit
/// millisecond count.
fn now_ms() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        // Fully qualified so the call resolves on editions without
        // `TryFrom` in the prelude.
        Ok(elapsed) => std::convert::TryFrom::try_from(elapsed.as_millis()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}