Skip to main content

motosan_agent_loop/session/
store.rs

1use std::any::Any;
2use std::collections::HashMap;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use tokio::sync::Mutex;
8
9use crate::error::AgentError;
10use crate::message::{Message, Role};
11use crate::session::entry::{deterministic_entry_id, new_entry_id, EntryId, SessionEntry};
12use crate::Result;
13
14pub use crate::session::meta::SessionMeta;
15
16/// Pluggable persistence backend for [`AgentSession`](crate::AgentSession) history.
17#[async_trait]
18pub trait SessionStore: Send + Sync {
19    async fn append(&self, session_id: &str, message: &Message) -> Result<()>;
20    async fn flush(&self, session_id: &str) -> Result<()>;
21    async fn load(&self, session_id: &str) -> Result<Vec<Message>>;
22    async fn delete(&self, session_id: &str) -> Result<()>;
23    async fn list(&self) -> Result<Vec<String>>;
24    fn as_any(&self) -> &dyn Any;
25
26    async fn append_entry(&self, session_id: &str, entry: &SessionEntry) -> Result<EntryId> {
27        match entry {
28            SessionEntry::Message { message } => {
29                self.append(session_id, message).await?;
30                Ok(new_entry_id())
31            }
32            SessionEntry::Custom { .. } => Err(AgentError::Session(
33                "custom entries not supported by this SessionStore implementation; override append_entry() to enable".into(),
34            )),
35        }
36    }
37
38    async fn load_entries(&self, session_id: &str) -> Result<Vec<(EntryId, SessionEntry)>> {
39        let messages = self.load(session_id).await?;
40        Ok(messages
41            .into_iter()
42            .enumerate()
43            .map(|(i, m)| {
44                let content = serde_json::to_string(&m).unwrap_or_default();
45                (
46                    deterministic_entry_id(i, &content),
47                    SessionEntry::message(m),
48                )
49            })
50            .collect())
51    }
52
53    async fn load_meta(&self, _session_id: &str) -> Result<Option<SessionMeta>> {
54        Ok(None)
55    }
56
57    async fn update_meta(&self, _session_id: &str, _meta: &SessionMeta) -> Result<()> {
58        Err(AgentError::Session(
59            "meta updates not supported by this SessionStore implementation".into(),
60        ))
61    }
62
63    async fn list_meta(&self) -> Result<Vec<SessionMeta>> {
64        let ids = self.list().await?;
65        let mut out = Vec::with_capacity(ids.len());
66        for id in ids {
67            if let Some(meta) = self.load_meta(&id).await? {
68                out.push(meta);
69            }
70        }
71        Ok(out)
72    }
73}
74
/// Entries of a single session in insertion order, each paired with its [`EntryId`].
type SessionEntryLog = Vec<(EntryId, SessionEntry)>;
/// Entry logs keyed by session id.
type SessionEntryMap = HashMap<String, SessionEntryLog>;
77
/// Session store that keeps entry logs and session metadata in mutex-guarded in-memory maps.
#[derive(Default)]
pub struct MemorySessionStore {
    // Entry log of every session, keyed by session id.
    entries: Arc<Mutex<SessionEntryMap>>,
    // Metadata of every session, keyed by session id.
    metas: Arc<Mutex<HashMap<String, SessionMeta>>>,
}
83
84impl MemorySessionStore {
85    pub fn new() -> Self {
86        Self::default()
87    }
88}
89
90#[async_trait]
91impl SessionStore for MemorySessionStore {
92    async fn append(&self, session_id: &str, message: &Message) -> Result<()> {
93        let mut entries = self.entries.lock().await;
94        entries
95            .entry(session_id.to_string())
96            .or_default()
97            .push((new_entry_id(), SessionEntry::message(message.clone())));
98        drop(entries);
99        self.refresh_memory_meta(session_id).await?;
100        Ok(())
101    }
102
103    async fn flush(&self, _session_id: &str) -> Result<()> {
104        Ok(())
105    }
106
107    async fn load(&self, session_id: &str) -> Result<Vec<Message>> {
108        let entries = self.entries.lock().await;
109        Ok(entries
110            .get(session_id)
111            .map(|log| {
112                log.iter()
113                    .filter_map(|(_, e)| e.as_message().cloned())
114                    .collect()
115            })
116            .unwrap_or_default())
117    }
118
119    async fn delete(&self, session_id: &str) -> Result<()> {
120        let mut entries = self.entries.lock().await;
121        entries.remove(session_id);
122        let mut metas = self.metas.lock().await;
123        metas.remove(session_id);
124        Ok(())
125    }
126
127    async fn list(&self) -> Result<Vec<String>> {
128        let entries = self.entries.lock().await;
129        let metas = self.metas.lock().await;
130        let mut ids: Vec<String> = entries.keys().chain(metas.keys()).cloned().collect();
131        ids.sort();
132        ids.dedup();
133        Ok(ids)
134    }
135
136    fn as_any(&self) -> &dyn Any {
137        self
138    }
139
140    async fn append_entry(&self, session_id: &str, entry: &SessionEntry) -> Result<EntryId> {
141        let mut entries = self.entries.lock().await;
142        let id = new_entry_id();
143        entries
144            .entry(session_id.to_string())
145            .or_default()
146            .push((id.clone(), entry.clone()));
147        drop(entries);
148        self.refresh_memory_meta(session_id).await?;
149        Ok(id)
150    }
151
152    async fn load_entries(&self, session_id: &str) -> Result<Vec<(EntryId, SessionEntry)>> {
153        let entries = self.entries.lock().await;
154        Ok(entries.get(session_id).cloned().unwrap_or_default())
155    }
156
157    async fn load_meta(&self, session_id: &str) -> Result<Option<SessionMeta>> {
158        let metas = self.metas.lock().await;
159        Ok(metas.get(session_id).cloned())
160    }
161
162    async fn update_meta(&self, session_id: &str, meta: &SessionMeta) -> Result<()> {
163        let mut metas = self.metas.lock().await;
164        metas.insert(session_id.to_string(), meta.clone());
165        Ok(())
166    }
167
168    async fn list_meta(&self) -> Result<Vec<SessionMeta>> {
169        let metas = self.metas.lock().await;
170        let mut out: Vec<SessionMeta> = metas.values().cloned().collect();
171        out.sort_by(|a, b| a.session_id.cmp(&b.session_id));
172        Ok(out)
173    }
174}
175
176impl MemorySessionStore {
177    async fn refresh_memory_meta(&self, session_id: &str) -> Result<()> {
178        let entries = self.load_entries(session_id).await?;
179        let mut metas = self.metas.lock().await;
180        let existing = metas.get(session_id).cloned();
181        let now = now_ms();
182        let meta = synthesize_meta(session_id, &entries, existing.as_ref(), now);
183        metas.insert(session_id.to_string(), meta);
184        Ok(())
185    }
186}
187
/// Mutable state shared between a `FileSessionStore` and its background flush task.
#[derive(Default)]
struct FileStoreState {
    // Entries appended but not yet written to disk, keyed by session id.
    buffers: HashMap<String, Vec<(EntryId, SessionEntry)>>,
}
192
/// Registry of per-session mutexes handed out by `session_lock`.
#[derive(Default)]
struct FileStoreLocks {
    // Keep one stable mutex per session ID for the lifetime of the store.
    // Pruning these eagerly can reintroduce concurrent flushes if older tasks
    // are still waiting on a previously-cloned Arc for the same session.
    sessions: HashMap<String, Arc<Mutex<()>>>,
}
200
/// Session store that persists each session as a `<session_id>.jsonl` file under `base_dir`,
/// buffering appended entries in memory until they are flushed.
pub struct FileSessionStore {
    // Directory that holds the session files.
    base_dir: PathBuf,
    // In-memory buffers of entries that have not been written yet.
    state: Arc<Mutex<FileStoreState>>,
    // Per-session mutexes taken by flush, delete, and meta updates.
    locks: Arc<Mutex<FileStoreLocks>>,
    // Handle of the periodic flush task; `None` when the store was created
    // outside a Tokio runtime. Aborted when the store is dropped.
    flush_task: Option<tokio::task::JoinHandle<()>>,
}
207
208impl FileSessionStore {
209    /// Create a file-backed session store.
210    pub fn new(base_dir: impl Into<PathBuf>) -> Self {
211        let base_dir = base_dir.into();
212        let state = Arc::new(Mutex::new(FileStoreState::default()));
213        let locks = Arc::new(Mutex::new(FileStoreLocks::default()));
214        let state_clone = state.clone();
215        let locks_clone = locks.clone();
216        let base_dir_clone = base_dir.clone();
217
218        let flush_task = tokio::runtime::Handle::try_current().ok().map(|handle| {
219            handle.spawn(async move {
220                let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
221                loop {
222                    interval.tick().await;
223                    let session_ids = {
224                        let state = state_clone.lock().await;
225                        state
226                            .buffers
227                            .iter()
228                            .filter(|(_, entries)| !entries.is_empty())
229                            .map(|(id, _)| id.clone())
230                            .collect::<Vec<_>>()
231                    };
232                    for session_id in session_ids {
233                        let _ =
234                            flush_session(&base_dir_clone, &state_clone, &locks_clone, &session_id)
235                                .await;
236                    }
237                }
238            })
239        });
240
241        Self {
242            base_dir,
243            state,
244            locks,
245            flush_task,
246        }
247    }
248
249    fn session_path(&self, session_id: &str) -> Result<PathBuf> {
250        validate_session_id(session_id)?;
251        Ok(self.base_dir.join(format!("{session_id}.jsonl")))
252    }
253
254    /// Inherent method preserved for 0.14 API compatibility.
255    pub async fn load_meta(&self, session_id: &str) -> Result<Option<SessionMeta>> {
256        <Self as SessionStore>::load_meta(self, session_id).await
257    }
258}
259
260fn validate_session_id(session_id: &str) -> Result<()> {
261    use std::path::{Component, Path};
262
263    if session_id.is_empty() {
264        return Err(AgentError::Session("session id must not be empty".into()));
265    }
266
267    let mut components = Path::new(session_id).components();
268    match (components.next(), components.next()) {
269        (Some(Component::Normal(_)), None) if session_id != "." && session_id != ".." => Ok(()),
270        _ => Err(AgentError::Session(format!(
271            "invalid session id '{session_id}': must be a single path-safe file stem"
272        ))),
273    }
274}
275
276impl Drop for FileSessionStore {
277    fn drop(&mut self) {
278        if let Some(task) = &self.flush_task {
279            task.abort();
280        }
281    }
282}
283
284#[async_trait]
285impl SessionStore for FileSessionStore {
286    async fn append(&self, session_id: &str, message: &Message) -> Result<()> {
287        let entry = SessionEntry::message(message.clone());
288        let _ = self.append_entry(session_id, &entry).await?;
289        Ok(())
290    }
291
292    async fn flush(&self, session_id: &str) -> Result<()> {
293        validate_session_id(session_id)?;
294        flush_session(&self.base_dir, &self.state, &self.locks, session_id).await
295    }
296
297    async fn load(&self, session_id: &str) -> Result<Vec<Message>> {
298        let path = self.session_path(session_id)?;
299        let entries = load_entries_from_path(&path).await?;
300        Ok(entries
301            .into_iter()
302            .filter_map(|(_, e)| e.as_message().cloned())
303            .collect())
304    }
305
306    async fn delete(&self, session_id: &str) -> Result<()> {
307        let path = self.session_path(session_id)?;
308        let session_lock = session_lock(&self.locks, session_id).await;
309        let _guard = session_lock.lock().await;
310        {
311            let mut state = self.state.lock().await;
312            state.buffers.remove(session_id);
313        }
314        match tokio::fs::remove_file(path).await {
315            Ok(()) => Ok(()),
316            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
317            Err(err) => Err(err.into()),
318        }
319    }
320
321    async fn list(&self) -> Result<Vec<String>> {
322        if !self.base_dir.exists() {
323            return Ok(Vec::new());
324        }
325        let mut out = Vec::new();
326        let mut entries = tokio::fs::read_dir(&self.base_dir).await?;
327        while let Some(entry) = entries.next_entry().await? {
328            let path = entry.path();
329            if path.extension().and_then(|ext| ext.to_str()) != Some("jsonl") {
330                continue;
331            }
332            if let Some(stem) = path.file_stem().and_then(|stem| stem.to_str()) {
333                out.push(stem.to_string());
334            }
335        }
336        out.sort();
337        Ok(out)
338    }
339
340    fn as_any(&self) -> &dyn Any {
341        self
342    }
343
344    async fn append_entry(&self, session_id: &str, entry: &SessionEntry) -> Result<EntryId> {
345        validate_session_id(session_id)?;
346        let id = new_entry_id();
347        let should_flush = {
348            let mut state = self.state.lock().await;
349            let buffer = state.buffers.entry(session_id.to_string()).or_default();
350            buffer.push((id.clone(), entry.clone()));
351            buffer.len() >= 20
352        };
353        if should_flush {
354            self.flush(session_id).await?;
355        }
356        Ok(id)
357    }
358
359    async fn load_entries(&self, session_id: &str) -> Result<Vec<(EntryId, SessionEntry)>> {
360        let path = self.session_path(session_id)?;
361        let mut existing = load_entries_from_path(&path).await?;
362        let pending = {
363            let state = self.state.lock().await;
364            state.buffers.get(session_id).cloned().unwrap_or_default()
365        };
366        existing.extend(pending);
367        Ok(existing)
368    }
369
370    async fn load_meta(&self, session_id: &str) -> Result<Option<SessionMeta>> {
371        let path = self.session_path(session_id)?;
372        let on_disk = load_meta_from_path(&path).await?;
373        let pending = {
374            let state = self.state.lock().await;
375            state.buffers.get(session_id).cloned().unwrap_or_default()
376        };
377        if pending.is_empty() {
378            return Ok(on_disk);
379        }
380        let mut entries = load_entries_from_path(&path).await?;
381        entries.extend(pending);
382        Ok(Some(synthesize_meta(
383            session_id,
384            &entries,
385            on_disk.as_ref(),
386            now_ms(),
387        )))
388    }
389
390    async fn update_meta(&self, session_id: &str, meta: &SessionMeta) -> Result<()> {
391        let path = self.session_path(session_id)?;
392        let session_lock = session_lock(&self.locks, session_id).await;
393        let _guard = session_lock.lock().await;
394        tokio::fs::create_dir_all(&self.base_dir).await?;
395
396        let pending = {
397            let mut state = self.state.lock().await;
398            state.buffers.remove(session_id).unwrap_or_default()
399        };
400
401        let mut existing_entries = load_entries_from_path(&path).await?;
402        existing_entries.extend(pending);
403
404        write_entries_file(&path, meta, &existing_entries).await
405    }
406
407    async fn list_meta(&self) -> Result<Vec<SessionMeta>> {
408        let ids = self.list().await?;
409        let mut out = Vec::with_capacity(ids.len());
410        for id in ids {
411            if let Some(meta) = self.load_meta(&id).await? {
412                out.push(meta);
413            }
414        }
415        out.sort_by_key(|meta| std::cmp::Reverse(meta.updated_at_ms));
416        Ok(out)
417    }
418}
419
420async fn session_lock(locks: &Arc<Mutex<FileStoreLocks>>, session_id: &str) -> Arc<Mutex<()>> {
421    let mut locks = locks.lock().await;
422    locks
423        .sessions
424        .entry(session_id.to_string())
425        .or_insert_with(|| Arc::new(Mutex::new(())))
426        .clone()
427}
428
429async fn flush_session(
430    base_dir: &Path,
431    state: &Arc<Mutex<FileStoreState>>,
432    locks: &Arc<Mutex<FileStoreLocks>>,
433    session_id: &str,
434) -> Result<()> {
435    let session_lock = session_lock(locks, session_id).await;
436    let _guard = session_lock.lock().await;
437
438    let pending = {
439        let mut state = state.lock().await;
440        state.buffers.remove(session_id).unwrap_or_default()
441    };
442
443    if pending.is_empty() {
444        return Ok(());
445    }
446
447    tokio::fs::create_dir_all(base_dir).await?;
448    let path = base_dir.join(format!("{session_id}.jsonl"));
449    let mut existing = load_entries_from_path(&path).await?;
450    existing.extend(pending);
451
452    let existing_meta = load_meta_from_path(&path).await?;
453    let meta = synthesize_meta(session_id, &existing, existing_meta.as_ref(), now_ms());
454    write_entries_file(&path, &meta, &existing).await
455}
456
457async fn write_entries_file(
458    path: &Path,
459    meta: &SessionMeta,
460    entries: &[(EntryId, SessionEntry)],
461) -> Result<()> {
462    let mut lines = Vec::with_capacity(entries.len() + 1);
463    lines.push(serde_json::to_string(meta)?);
464    for (id, entry) in entries {
465        lines.push(serde_json::to_string(&serde_json::json!({
466            "id": id,
467            "entry": entry,
468        }))?);
469    }
470    let payload = format!("{}\n", lines.join("\n"));
471    let tmp_path = path.with_extension("jsonl.tmp");
472    tokio::fs::write(&tmp_path, payload).await?;
473    tokio::fs::rename(&tmp_path, path).await?;
474    Ok(())
475}
476
477fn synthesize_meta(
478    session_id: &str,
479    entries: &[(EntryId, SessionEntry)],
480    existing_meta: Option<&SessionMeta>,
481    now: u64,
482) -> SessionMeta {
483    let first_user = entries
484        .iter()
485        .filter_map(|(_, e)| e.as_message())
486        .find(|m| matches!(m.role(), Role::User))
487        .map(|m| truncate_preview(&m.text()));
488    let entry_count = entries.len();
489    let message_count = entries
490        .iter()
491        .filter(|(_, e)| e.as_message().is_some())
492        .count();
493
494    SessionMeta {
495        session_id: session_id.to_string(),
496        created_at_ms: existing_meta
497            .map(|m| {
498                if m.created_at_ms == 0 {
499                    now
500                } else {
501                    m.created_at_ms
502                }
503            })
504            .unwrap_or(now),
505        updated_at_ms: now,
506        agent_loop_version: env!("CARGO_PKG_VERSION").to_string(),
507        entry_count,
508        message_count,
509        name: existing_meta.and_then(|m| m.name.clone()),
510        cwd: existing_meta.and_then(|m| m.cwd.clone()),
511        first_user_message: first_user
512            .or_else(|| existing_meta.and_then(|m| m.first_user_message.clone())),
513        extra: existing_meta.map(|m| m.extra.clone()).unwrap_or_default(),
514    }
515}
516
/// Shortens `s` to a preview of at most 200 characters.
///
/// Strings of up to 200 characters are returned unchanged; longer ones are cut to their
/// first 197 characters followed by `...`. Lengths are counted in `char`s, not bytes.
fn truncate_preview(s: &str) -> String {
    const MAX_CHARS: usize = 200;
    const ELLIPSIS: &str = "...";

    // A character at index 200 exists exactly when the string has more than 200 chars.
    if s.chars().nth(MAX_CHARS).is_none() {
        return s.to_string();
    }

    let kept_chars = MAX_CHARS - ELLIPSIS.len();
    // Byte offset at which the first dropped character starts; always a char boundary.
    let cut = s
        .char_indices()
        .nth(kept_chars)
        .map_or(s.len(), |(offset, _)| offset);

    let mut preview = String::with_capacity(cut + ELLIPSIS.len());
    preview.push_str(&s[..cut]);
    preview.push_str(ELLIPSIS);
    preview
}
526
527async fn load_meta_from_path(path: &Path) -> Result<Option<SessionMeta>> {
528    if !path.exists() {
529        return Ok(None);
530    }
531    let content = tokio::fs::read_to_string(path).await?;
532    let mut lines = content.lines();
533    let Some(first) = lines.next() else {
534        return Ok(None);
535    };
536    let meta = serde_json::from_str(first)?;
537    Ok(Some(meta))
538}
539
540async fn load_entries_from_path(path: &Path) -> Result<Vec<(EntryId, SessionEntry)>> {
541    if !path.exists() {
542        return Ok(Vec::new());
543    }
544
545    let content = tokio::fs::read_to_string(path).await?;
546    let mut out = Vec::new();
547
548    #[derive(serde::Deserialize)]
549    struct EntryLine {
550        id: String,
551        entry: SessionEntry,
552    }
553
554    for (idx, line) in content.lines().enumerate() {
555        if line.trim().is_empty() {
556            continue;
557        }
558        if idx == 0 && serde_json::from_str::<SessionMeta>(line).is_ok() {
559            continue;
560        }
561        if let Ok(parsed) = serde_json::from_str::<EntryLine>(line) {
562            out.push((parsed.id, parsed.entry));
563            continue;
564        }
565        let msg: Message = serde_json::from_str(line)?;
566        out.push((
567            deterministic_entry_id(idx, line),
568            SessionEntry::message(msg),
569        ));
570    }
571
572    Ok(out)
573}
574
/// Returns the current wall-clock time as milliseconds since the Unix epoch, or `0` when
/// the system clock reports a time before the epoch.
pub(crate) fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
581
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::Message;
    use tempfile::tempdir;

    // A store that only implements the required methods must reject custom
    // entries through the trait's default `append_entry`.
    #[tokio::test]
    async fn default_append_entry_rejects_custom_for_legacy_impl() {
        struct LegacyStore;
        #[async_trait]
        impl SessionStore for LegacyStore {
            async fn append(&self, _session_id: &str, _message: &Message) -> Result<()> {
                Ok(())
            }
            async fn flush(&self, _session_id: &str) -> Result<()> {
                Ok(())
            }
            async fn load(&self, _session_id: &str) -> Result<Vec<Message>> {
                Ok(Vec::new())
            }
            async fn delete(&self, _session_id: &str) -> Result<()> {
                Ok(())
            }
            async fn list(&self) -> Result<Vec<String>> {
                Ok(Vec::new())
            }
            fn as_any(&self) -> &dyn Any {
                self
            }
        }

        let store = LegacyStore;
        let err = store
            .append_entry("s1", &SessionEntry::custom("x", serde_json::json!({})))
            .await
            .expect_err("custom should error by default");
        assert!(err.to_string().contains("custom entries not supported"));
    }

    // The memory store keeps custom entries in `load_entries` but hides them
    // from the message-only `load`.
    #[tokio::test]
    async fn memory_store_round_trips_custom_entries() {
        let store = MemorySessionStore::new();
        let _ = store
            .append_entry("s1", &SessionEntry::message(Message::user("hi")))
            .await
            .unwrap();
        let _ = store
            .append_entry(
                "s1",
                &SessionEntry::custom("compaction", serde_json::json!({ "summary": "x" })),
            )
            .await
            .unwrap();
        let entries = store.load_entries("s1").await.unwrap();
        assert_eq!(entries.len(), 2);
        let messages = store.load("s1").await.unwrap();
        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0].text(), "hi");
    }

    // Message and custom entries survive a flush to disk and come back in
    // append order.
    #[tokio::test]
    async fn file_store_round_trips_custom_entries() {
        let dir = tempdir().unwrap();
        let store = FileSessionStore::new(dir.path().to_path_buf());

        let _ = store
            .append_entry("sess-1", &SessionEntry::message(Message::user("hi")))
            .await
            .unwrap();
        let _ = store
            .append_entry(
                "sess-1",
                &SessionEntry::custom(
                    "compaction",
                    serde_json::json!({ "summary": "earlier stuff" }),
                ),
            )
            .await
            .unwrap();
        store.flush("sess-1").await.unwrap();

        let entries = store.load_entries("sess-1").await.unwrap();
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].1.as_message().unwrap().text(), "hi");
        assert_eq!(entries[1].1.custom_kind(), Some("compaction"));
    }

    // A file written in the older layout (meta header followed by bare message
    // lines) still loads, and the ids derived for its messages are stable
    // across repeated loads.
    #[tokio::test]
    async fn file_store_loads_0_14_era_format() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("legacy.jsonl");
        let lines = [
            serde_json::json!({
                "session_id": "legacy",
                "created_at_ms": 1_700_000_000_000u64,
                "agent_loop_version": "0.14.0",
                "message_count": 2
            })
            .to_string(),
            serde_json::to_string(&Message::user("legacy user")).unwrap(),
            serde_json::to_string(&Message::assistant("legacy assistant")).unwrap(),
        ];
        tokio::fs::write(&path, format!("{}\n", lines.join("\n")))
            .await
            .unwrap();

        let store = FileSessionStore::new(dir.path().to_path_buf());
        let meta = store.load_meta("legacy").await.unwrap().unwrap();
        assert_eq!(meta.session_id, "legacy");
        assert_eq!(meta.message_count, 2);
        let entries = store.load_entries("legacy").await.unwrap();
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].1.as_message().unwrap().text(), "legacy user");
        // Ids are derived from the physical line index (the header is line 0).
        assert_eq!(entries[0].0, deterministic_entry_id(1, &lines[1]));
        assert_eq!(entries[1].0, deterministic_entry_id(2, &lines[2]));

        let entries_again = store.load_entries("legacy").await.unwrap();
        assert_eq!(entries_again[0].0, entries[0].0);
        assert_eq!(entries_again[1].0, entries[1].0);
    }

    // Appending to the memory store also creates/updates the session's meta.
    #[tokio::test]
    async fn memory_store_append_keeps_meta_in_sync() {
        let store = MemorySessionStore::new();
        store.append("s1", &Message::user("hi")).await.unwrap();

        let meta = store.load_meta("s1").await.unwrap().unwrap();
        assert_eq!(meta.session_id, "s1");
        assert_eq!(meta.message_count, 1);
        assert_eq!(meta.entry_count, 1);

        let listed = store.list_meta().await.unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].session_id, "s1");
    }

    // `update_meta` drains the buffer while writing, so the flush that follows
    // must not write the same entry a second time.
    #[tokio::test]
    async fn file_store_update_meta_does_not_duplicate_pending_entries() {
        let dir = tempdir().unwrap();
        let store = FileSessionStore::new(dir.path().to_path_buf());

        store
            .append_entry("s1", &SessionEntry::message(Message::user("hi")))
            .await
            .unwrap();

        let meta = SessionMeta {
            session_id: "s1".into(),
            name: Some("demo".into()),
            ..Default::default()
        };
        store.update_meta("s1", &meta).await.unwrap();
        store.flush("s1").await.unwrap();

        let entries = store.load_entries("s1").await.unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].1.as_message().unwrap().text(), "hi");
    }

    // An unparseable existing file makes `update_meta` return an error.
    #[tokio::test]
    async fn file_store_update_meta_fails_fast_on_invalid_existing_log() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("broken.jsonl");
        tokio::fs::write(&path, "{not-json}\n").await.unwrap();

        let store = FileSessionStore::new(dir.path().to_path_buf());
        let meta = SessionMeta {
            session_id: "broken".into(),
            name: Some("demo".into()),
            ..Default::default()
        };
        assert!(store.update_meta("broken", &meta).await.is_err());
    }

    // Session ids that would escape the base directory are rejected.
    #[tokio::test]
    async fn file_store_rejects_path_traversal_session_id() {
        let dir = tempdir().unwrap();
        let store = FileSessionStore::new(dir.path().to_path_buf());

        let err = store
            .append_entry("../escape", &SessionEntry::message(Message::user("hi")))
            .await
            .expect_err("path traversal session id should be rejected");
        assert!(err.to_string().contains("invalid session id"));
    }

    // Two flushes of the same session running concurrently must leave exactly
    // one copy of the buffered entry on disk.
    #[tokio::test]
    async fn file_store_concurrent_flushes_do_not_race() {
        let dir = tempdir().unwrap();
        let store = Arc::new(FileSessionStore::new(dir.path().to_path_buf()));

        store
            .append_entry("race", &SessionEntry::message(Message::user("hi")))
            .await
            .unwrap();

        let a = {
            let store = store.clone();
            tokio::spawn(async move { store.flush("race").await })
        };
        let b = {
            let store = store.clone();
            tokio::spawn(async move { store.flush("race").await })
        };

        a.await.unwrap().unwrap();
        b.await.unwrap().unwrap();

        let entries = store.load_entries("race").await.unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].1.as_message().unwrap().text(), "hi");
    }

    // A flush spawned right after a delete must not bring the session file back.
    #[tokio::test]
    async fn file_store_delete_races_flush_does_not_resurrect() {
        let dir = tempdir().unwrap();
        let store = Arc::new(FileSessionStore::new(dir.path().to_path_buf()));

        store
            .append_entry("del-race", &SessionEntry::message(Message::user("hi")))
            .await
            .unwrap();

        let del = {
            let store = store.clone();
            tokio::spawn(async move { store.delete("del-race").await })
        };
        let flush = {
            let store = store.clone();
            tokio::spawn(async move { store.flush("del-race").await })
        };

        del.await.unwrap().unwrap();
        flush.await.unwrap().unwrap();

        let path = dir.path().join("del-race.jsonl");
        assert!(
            !path.exists(),
            "session file resurrected after delete via flush race"
        );
        assert!(store.load_entries("del-race").await.unwrap().is_empty());
    }
}