Skip to main content

motosan_agent_loop/
session_store.rs

1use std::any::Any;
2use std::collections::HashMap;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use tokio::sync::Mutex;
8
9use crate::{Message, Result};
10
/// Metadata header written at the top of a persisted session file.
///
/// Stores the session ID, creation timestamp, agent-loop version, and
/// message count so that a [`SessionStore`] can verify compatibility
/// when loading a previously saved session.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SessionMeta {
    /// Identifier of the session this file belongs to.
    pub session_id: String,
    /// Timestamp in milliseconds since the Unix epoch, as recorded by the
    /// writer when it produced this header.
    pub created_at_ms: u64,
    /// Version string of the agent-loop crate that wrote the file.
    pub agent_loop_version: String,
    /// Number of message lines that follow this header in the file.
    pub message_count: usize,
}
23
/// Pluggable persistence backend for [`AgentSession`](crate::AgentSession) history.
///
/// Implementations must be `Send + Sync` so they can be shared across
/// async tasks.  Two built-in implementations are provided:
///
/// * [`MemorySessionStore`] — in-process, for tests and short-lived sessions.
/// * [`FileSessionStore`] — JSONL-based, for durable persistence across restarts.
#[async_trait]
pub trait SessionStore: Send + Sync {
    /// Records `message` as the next entry in the history of `session_id`.
    ///
    /// Implementations may buffer the message instead of persisting it
    /// immediately; call [`flush`](SessionStore::flush) to force it out.
    async fn append(&self, session_id: &str, message: &Message) -> Result<()>;

    /// Persists any messages still buffered for `session_id`.
    ///
    /// A no-op for stores that do not buffer.
    async fn flush(&self, session_id: &str) -> Result<()>;

    /// Returns the stored messages of `session_id`, in append order.
    ///
    /// The built-in stores return an empty vector for an unknown session.
    async fn load(&self, session_id: &str) -> Result<Vec<Message>>;

    /// Removes everything stored for `session_id`.
    ///
    /// The built-in stores treat deleting an unknown session as success.
    async fn delete(&self, session_id: &str) -> Result<()>;

    /// Returns the IDs of the sessions known to the store.
    ///
    /// The built-in stores return them sorted.
    async fn list(&self) -> Result<Vec<String>>;

    /// Exposes the store as [`Any`] so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any;
}
40
/// In-process [`SessionStore`] that keeps every session's history in a map.
///
/// Nothing is written to disk; the contents live only as long as the store.
#[derive(Default)]
pub struct MemorySessionStore {
    /// Message history per session ID, guarded by an async mutex.
    sessions: Arc<Mutex<HashMap<String, Vec<Message>>>>,
}
45
46impl MemorySessionStore {
47    pub fn new() -> Self {
48        Self::default()
49    }
50}
51
52#[async_trait]
53impl SessionStore for MemorySessionStore {
54    async fn append(&self, session_id: &str, message: &Message) -> Result<()> {
55        let mut sessions = self.sessions.lock().await;
56        sessions
57            .entry(session_id.to_string())
58            .or_default()
59            .push(message.clone());
60        Ok(())
61    }
62
63    async fn flush(&self, _session_id: &str) -> Result<()> {
64        Ok(())
65    }
66
67    async fn load(&self, session_id: &str) -> Result<Vec<Message>> {
68        let sessions = self.sessions.lock().await;
69        Ok(sessions.get(session_id).cloned().unwrap_or_default())
70    }
71
72    async fn delete(&self, session_id: &str) -> Result<()> {
73        let mut sessions = self.sessions.lock().await;
74        sessions.remove(session_id);
75        Ok(())
76    }
77
78    async fn list(&self) -> Result<Vec<String>> {
79        let sessions = self.sessions.lock().await;
80        let mut ids: Vec<String> = sessions.keys().cloned().collect();
81        ids.sort();
82        Ok(ids)
83    }
84
85    fn as_any(&self) -> &dyn Any {
86        self
87    }
88}
89
/// Mutable state shared between a [`FileSessionStore`] and its background
/// flush task.
#[derive(Default)]
struct FileStoreState {
    /// Messages appended but not yet written to disk, keyed by session ID.
    buffers: HashMap<String, Vec<Message>>,
}
94
/// [`SessionStore`] that persists each session as `<base_dir>/<session_id>.jsonl`.
///
/// Appended messages are buffered in memory and written out by `flush`, by
/// `append` once a session's buffer is large enough, and by the optional
/// background task started in [`FileSessionStore::new`].
pub struct FileSessionStore {
    /// Directory that holds one `.jsonl` file per session.
    base_dir: PathBuf,
    /// Unflushed messages, shared with the background flush task.
    state: Arc<Mutex<FileStoreState>>,
    /// Handle of the periodic flush task; `None` when no Tokio runtime was
    /// available at construction time. Aborted on drop.
    flush_task: Option<tokio::task::JoinHandle<()>>,
}
100
101impl FileSessionStore {
102    /// Create a file-backed session store.
103    ///
104    /// If a Tokio runtime is available, a background flush task is started
105    /// (every 5s). Without a runtime, automatic periodic flushing is disabled;
106    /// callers should invoke `flush()` explicitly for durability.
107    pub fn new(base_dir: impl Into<PathBuf>) -> Self {
108        let base_dir = base_dir.into();
109        let state = Arc::new(Mutex::new(FileStoreState::default()));
110        let state_clone = state.clone();
111        let base_dir_clone = base_dir.clone();
112
113        let flush_task = tokio::runtime::Handle::try_current().ok().map(|handle| {
114            handle.spawn(async move {
115                let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
116                loop {
117                    interval.tick().await;
118                    let session_ids = {
119                        let state = state_clone.lock().await;
120                        state
121                            .buffers
122                            .iter()
123                            .filter(|(_, messages)| !messages.is_empty())
124                            .map(|(id, _)| id.clone())
125                            .collect::<Vec<_>>()
126                    };
127                    for session_id in session_ids {
128                        let _ = flush_session(&base_dir_clone, &state_clone, &session_id).await;
129                    }
130                }
131            })
132        });
133
134        Self {
135            base_dir,
136            state,
137            flush_task,
138        }
139    }
140
141    fn session_path(&self, session_id: &str) -> PathBuf {
142        self.base_dir.join(format!("{session_id}.jsonl"))
143    }
144
145    pub async fn load_meta(&self, session_id: &str) -> Result<Option<SessionMeta>> {
146        let path = self.session_path(session_id);
147        load_meta_from_path(&path).await
148    }
149}
150
151impl Drop for FileSessionStore {
152    fn drop(&mut self) {
153        if let Some(task) = &self.flush_task {
154            task.abort();
155        }
156    }
157}
158
159#[async_trait]
160impl SessionStore for FileSessionStore {
161    async fn append(&self, session_id: &str, message: &Message) -> Result<()> {
162        let should_flush = {
163            let mut state = self.state.lock().await;
164            let buffer = state.buffers.entry(session_id.to_string()).or_default();
165            buffer.push(message.clone());
166            buffer.len() >= 20
167        };
168
169        if should_flush {
170            self.flush(session_id).await?;
171        }
172
173        Ok(())
174    }
175
176    async fn flush(&self, session_id: &str) -> Result<()> {
177        flush_session(&self.base_dir, &self.state, session_id).await
178    }
179
180    async fn load(&self, session_id: &str) -> Result<Vec<Message>> {
181        let path = self.session_path(session_id);
182        load_messages_from_path(&path).await
183    }
184
185    async fn delete(&self, session_id: &str) -> Result<()> {
186        {
187            let mut state = self.state.lock().await;
188            state.buffers.remove(session_id);
189        }
190
191        let path = self.session_path(session_id);
192        match tokio::fs::remove_file(path).await {
193            Ok(()) => Ok(()),
194            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
195            Err(err) => Err(err.into()),
196        }
197    }
198
199    async fn list(&self) -> Result<Vec<String>> {
200        if !self.base_dir.exists() {
201            return Ok(Vec::new());
202        }
203
204        let mut out = Vec::new();
205        let mut entries = tokio::fs::read_dir(&self.base_dir).await?;
206        while let Some(entry) = entries.next_entry().await? {
207            let path = entry.path();
208            if path.extension().and_then(|ext| ext.to_str()) != Some("jsonl") {
209                continue;
210            }
211            if let Some(stem) = path.file_stem().and_then(|stem| stem.to_str()) {
212                out.push(stem.to_string());
213            }
214        }
215        out.sort();
216        Ok(out)
217    }
218
219    fn as_any(&self) -> &dyn Any {
220        self
221    }
222}
223
224async fn flush_session(
225    base_dir: &Path,
226    state: &Arc<Mutex<FileStoreState>>,
227    session_id: &str,
228) -> Result<()> {
229    let pending = {
230        let mut state = state.lock().await;
231        state.buffers.remove(session_id).unwrap_or_default()
232    };
233
234    if pending.is_empty() {
235        return Ok(());
236    }
237
238    tokio::fs::create_dir_all(base_dir).await?;
239    let path = base_dir.join(format!("{session_id}.jsonl"));
240    let mut existing = load_messages_from_path(&path).await?;
241    existing.extend(pending);
242
243    let meta = SessionMeta {
244        session_id: session_id.to_string(),
245        created_at_ms: now_ms(),
246        agent_loop_version: env!("CARGO_PKG_VERSION").to_string(),
247        message_count: existing.len(),
248    };
249
250    let mut lines = Vec::with_capacity(existing.len() + 1);
251    lines.push(serde_json::to_string(&meta)?);
252    for message in existing {
253        lines.push(serde_json::to_string(&message)?);
254    }
255
256    let payload = format!("{}\n", lines.join("\n"));
257    tokio::fs::write(path, payload).await?;
258    Ok(())
259}
260
261async fn load_meta_from_path(path: &Path) -> Result<Option<SessionMeta>> {
262    if !path.exists() {
263        return Ok(None);
264    }
265    let content = tokio::fs::read_to_string(path).await?;
266    let mut lines = content.lines();
267    let Some(first) = lines.next() else {
268        return Ok(None);
269    };
270    let meta = serde_json::from_str(first)?;
271    Ok(Some(meta))
272}
273
274async fn load_messages_from_path(path: &Path) -> Result<Vec<Message>> {
275    if !path.exists() {
276        return Ok(Vec::new());
277    }
278
279    let content = tokio::fs::read_to_string(path).await?;
280    let mut messages = Vec::new();
281
282    for (idx, line) in content.lines().enumerate() {
283        if line.trim().is_empty() {
284            continue;
285        }
286        if idx == 0 && serde_json::from_str::<SessionMeta>(line).is_ok() {
287            continue;
288        }
289        messages.push(serde_json::from_str::<Message>(line)?);
290    }
291
292    Ok(messages)
293}
294
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock is set before the epoch and saturates
/// at `u64::MAX` instead of silently truncating the 128-bit millisecond count.
fn now_ms() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|elapsed| u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX))
        .unwrap_or_default()
}