1use std::any::Any;
2use std::collections::HashMap;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use tokio::sync::Mutex;
8
9use crate::error::AgentError;
10use crate::message::{Message, Role};
11use crate::session::entry::{deterministic_entry_id, new_entry_id, EntryId, SessionEntry};
12use crate::Result;
13
14pub use crate::session::meta::SessionMeta;
15
16#[async_trait]
18pub trait SessionStore: Send + Sync {
19 async fn append(&self, session_id: &str, message: &Message) -> Result<()>;
20 async fn flush(&self, session_id: &str) -> Result<()>;
21 async fn load(&self, session_id: &str) -> Result<Vec<Message>>;
22 async fn delete(&self, session_id: &str) -> Result<()>;
23 async fn list(&self) -> Result<Vec<String>>;
24 fn as_any(&self) -> &dyn Any;
25
26 async fn append_entry(&self, session_id: &str, entry: &SessionEntry) -> Result<EntryId> {
27 match entry {
28 SessionEntry::Message { message } => {
29 self.append(session_id, message).await?;
30 Ok(new_entry_id())
31 }
32 SessionEntry::Custom { .. } => Err(AgentError::Session(
33 "custom entries not supported by this SessionStore implementation; override append_entry() to enable".into(),
34 )),
35 }
36 }
37
38 async fn load_entries(&self, session_id: &str) -> Result<Vec<(EntryId, SessionEntry)>> {
39 let messages = self.load(session_id).await?;
40 Ok(messages
41 .into_iter()
42 .enumerate()
43 .map(|(i, m)| {
44 let content = serde_json::to_string(&m).unwrap_or_default();
45 (
46 deterministic_entry_id(i, &content),
47 SessionEntry::message(m),
48 )
49 })
50 .collect())
51 }
52
53 async fn load_meta(&self, _session_id: &str) -> Result<Option<SessionMeta>> {
54 Ok(None)
55 }
56
57 async fn update_meta(&self, _session_id: &str, _meta: &SessionMeta) -> Result<()> {
58 Err(AgentError::Session(
59 "meta updates not supported by this SessionStore implementation".into(),
60 ))
61 }
62
63 async fn list_meta(&self) -> Result<Vec<SessionMeta>> {
64 let ids = self.list().await?;
65 let mut out = Vec::with_capacity(ids.len());
66 for id in ids {
67 if let Some(meta) = self.load_meta(&id).await? {
68 out.push(meta);
69 }
70 }
71 Ok(out)
72 }
73}
74
75type SessionEntryLog = Vec<(EntryId, SessionEntry)>;
76type SessionEntryMap = HashMap<String, SessionEntryLog>;
77
78#[derive(Default)]
79pub struct MemorySessionStore {
80 entries: Arc<Mutex<SessionEntryMap>>,
81 metas: Arc<Mutex<HashMap<String, SessionMeta>>>,
82}
83
84impl MemorySessionStore {
85 pub fn new() -> Self {
86 Self::default()
87 }
88}
89
90#[async_trait]
91impl SessionStore for MemorySessionStore {
92 async fn append(&self, session_id: &str, message: &Message) -> Result<()> {
93 let mut entries = self.entries.lock().await;
94 entries
95 .entry(session_id.to_string())
96 .or_default()
97 .push((new_entry_id(), SessionEntry::message(message.clone())));
98 drop(entries);
99 self.refresh_memory_meta(session_id).await?;
100 Ok(())
101 }
102
103 async fn flush(&self, _session_id: &str) -> Result<()> {
104 Ok(())
105 }
106
107 async fn load(&self, session_id: &str) -> Result<Vec<Message>> {
108 let entries = self.entries.lock().await;
109 Ok(entries
110 .get(session_id)
111 .map(|log| {
112 log.iter()
113 .filter_map(|(_, e)| e.as_message().cloned())
114 .collect()
115 })
116 .unwrap_or_default())
117 }
118
119 async fn delete(&self, session_id: &str) -> Result<()> {
120 let mut entries = self.entries.lock().await;
121 entries.remove(session_id);
122 let mut metas = self.metas.lock().await;
123 metas.remove(session_id);
124 Ok(())
125 }
126
127 async fn list(&self) -> Result<Vec<String>> {
128 let entries = self.entries.lock().await;
129 let metas = self.metas.lock().await;
130 let mut ids: Vec<String> = entries.keys().chain(metas.keys()).cloned().collect();
131 ids.sort();
132 ids.dedup();
133 Ok(ids)
134 }
135
136 fn as_any(&self) -> &dyn Any {
137 self
138 }
139
140 async fn append_entry(&self, session_id: &str, entry: &SessionEntry) -> Result<EntryId> {
141 let mut entries = self.entries.lock().await;
142 let id = new_entry_id();
143 entries
144 .entry(session_id.to_string())
145 .or_default()
146 .push((id.clone(), entry.clone()));
147 drop(entries);
148 self.refresh_memory_meta(session_id).await?;
149 Ok(id)
150 }
151
152 async fn load_entries(&self, session_id: &str) -> Result<Vec<(EntryId, SessionEntry)>> {
153 let entries = self.entries.lock().await;
154 Ok(entries.get(session_id).cloned().unwrap_or_default())
155 }
156
157 async fn load_meta(&self, session_id: &str) -> Result<Option<SessionMeta>> {
158 let metas = self.metas.lock().await;
159 Ok(metas.get(session_id).cloned())
160 }
161
162 async fn update_meta(&self, session_id: &str, meta: &SessionMeta) -> Result<()> {
163 let mut metas = self.metas.lock().await;
164 metas.insert(session_id.to_string(), meta.clone());
165 Ok(())
166 }
167
168 async fn list_meta(&self) -> Result<Vec<SessionMeta>> {
169 let metas = self.metas.lock().await;
170 let mut out: Vec<SessionMeta> = metas.values().cloned().collect();
171 out.sort_by(|a, b| a.session_id.cmp(&b.session_id));
172 Ok(out)
173 }
174}
175
176impl MemorySessionStore {
177 async fn refresh_memory_meta(&self, session_id: &str) -> Result<()> {
178 let entries = self.load_entries(session_id).await?;
179 let mut metas = self.metas.lock().await;
180 let existing = metas.get(session_id).cloned();
181 let now = now_ms();
182 let meta = synthesize_meta(session_id, &entries, existing.as_ref(), now);
183 metas.insert(session_id.to_string(), meta);
184 Ok(())
185 }
186}
187
188#[derive(Default)]
189struct FileStoreState {
190 buffers: HashMap<String, Vec<(EntryId, SessionEntry)>>,
191}
192
193#[derive(Default)]
194struct FileStoreLocks {
195 sessions: HashMap<String, Arc<Mutex<()>>>,
199}
200
201pub struct FileSessionStore {
202 base_dir: PathBuf,
203 state: Arc<Mutex<FileStoreState>>,
204 locks: Arc<Mutex<FileStoreLocks>>,
205 flush_task: Option<tokio::task::JoinHandle<()>>,
206}
207
208impl FileSessionStore {
209 pub fn new(base_dir: impl Into<PathBuf>) -> Self {
211 let base_dir = base_dir.into();
212 let state = Arc::new(Mutex::new(FileStoreState::default()));
213 let locks = Arc::new(Mutex::new(FileStoreLocks::default()));
214 let state_clone = state.clone();
215 let locks_clone = locks.clone();
216 let base_dir_clone = base_dir.clone();
217
218 let flush_task = tokio::runtime::Handle::try_current().ok().map(|handle| {
219 handle.spawn(async move {
220 let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
221 loop {
222 interval.tick().await;
223 let session_ids = {
224 let state = state_clone.lock().await;
225 state
226 .buffers
227 .iter()
228 .filter(|(_, entries)| !entries.is_empty())
229 .map(|(id, _)| id.clone())
230 .collect::<Vec<_>>()
231 };
232 for session_id in session_ids {
233 let _ =
234 flush_session(&base_dir_clone, &state_clone, &locks_clone, &session_id)
235 .await;
236 }
237 }
238 })
239 });
240
241 Self {
242 base_dir,
243 state,
244 locks,
245 flush_task,
246 }
247 }
248
249 fn session_path(&self, session_id: &str) -> Result<PathBuf> {
250 validate_session_id(session_id)?;
251 Ok(self.base_dir.join(format!("{session_id}.jsonl")))
252 }
253
254 pub async fn load_meta(&self, session_id: &str) -> Result<Option<SessionMeta>> {
256 <Self as SessionStore>::load_meta(self, session_id).await
257 }
258}
259
260fn validate_session_id(session_id: &str) -> Result<()> {
261 use std::path::{Component, Path};
262
263 if session_id.is_empty() {
264 return Err(AgentError::Session("session id must not be empty".into()));
265 }
266
267 let mut components = Path::new(session_id).components();
268 match (components.next(), components.next()) {
269 (Some(Component::Normal(_)), None) if session_id != "." && session_id != ".." => Ok(()),
270 _ => Err(AgentError::Session(format!(
271 "invalid session id '{session_id}': must be a single path-safe file stem"
272 ))),
273 }
274}
275
276impl Drop for FileSessionStore {
277 fn drop(&mut self) {
278 if let Some(task) = &self.flush_task {
279 task.abort();
280 }
281 }
282}
283
284#[async_trait]
285impl SessionStore for FileSessionStore {
286 async fn append(&self, session_id: &str, message: &Message) -> Result<()> {
287 let entry = SessionEntry::message(message.clone());
288 let _ = self.append_entry(session_id, &entry).await?;
289 Ok(())
290 }
291
292 async fn flush(&self, session_id: &str) -> Result<()> {
293 validate_session_id(session_id)?;
294 flush_session(&self.base_dir, &self.state, &self.locks, session_id).await
295 }
296
297 async fn load(&self, session_id: &str) -> Result<Vec<Message>> {
298 let path = self.session_path(session_id)?;
299 let entries = load_entries_from_path(&path).await?;
300 Ok(entries
301 .into_iter()
302 .filter_map(|(_, e)| e.as_message().cloned())
303 .collect())
304 }
305
306 async fn delete(&self, session_id: &str) -> Result<()> {
307 let path = self.session_path(session_id)?;
308 let session_lock = session_lock(&self.locks, session_id).await;
309 let _guard = session_lock.lock().await;
310 {
311 let mut state = self.state.lock().await;
312 state.buffers.remove(session_id);
313 }
314 match tokio::fs::remove_file(path).await {
315 Ok(()) => Ok(()),
316 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
317 Err(err) => Err(err.into()),
318 }
319 }
320
321 async fn list(&self) -> Result<Vec<String>> {
322 if !self.base_dir.exists() {
323 return Ok(Vec::new());
324 }
325 let mut out = Vec::new();
326 let mut entries = tokio::fs::read_dir(&self.base_dir).await?;
327 while let Some(entry) = entries.next_entry().await? {
328 let path = entry.path();
329 if path.extension().and_then(|ext| ext.to_str()) != Some("jsonl") {
330 continue;
331 }
332 if let Some(stem) = path.file_stem().and_then(|stem| stem.to_str()) {
333 out.push(stem.to_string());
334 }
335 }
336 out.sort();
337 Ok(out)
338 }
339
340 fn as_any(&self) -> &dyn Any {
341 self
342 }
343
344 async fn append_entry(&self, session_id: &str, entry: &SessionEntry) -> Result<EntryId> {
345 validate_session_id(session_id)?;
346 let id = new_entry_id();
347 let should_flush = {
348 let mut state = self.state.lock().await;
349 let buffer = state.buffers.entry(session_id.to_string()).or_default();
350 buffer.push((id.clone(), entry.clone()));
351 buffer.len() >= 20
352 };
353 if should_flush {
354 self.flush(session_id).await?;
355 }
356 Ok(id)
357 }
358
359 async fn load_entries(&self, session_id: &str) -> Result<Vec<(EntryId, SessionEntry)>> {
360 let path = self.session_path(session_id)?;
361 let mut existing = load_entries_from_path(&path).await?;
362 let pending = {
363 let state = self.state.lock().await;
364 state.buffers.get(session_id).cloned().unwrap_or_default()
365 };
366 existing.extend(pending);
367 Ok(existing)
368 }
369
370 async fn load_meta(&self, session_id: &str) -> Result<Option<SessionMeta>> {
371 let path = self.session_path(session_id)?;
372 let on_disk = load_meta_from_path(&path).await?;
373 let pending = {
374 let state = self.state.lock().await;
375 state.buffers.get(session_id).cloned().unwrap_or_default()
376 };
377 if pending.is_empty() {
378 return Ok(on_disk);
379 }
380 let mut entries = load_entries_from_path(&path).await?;
381 entries.extend(pending);
382 Ok(Some(synthesize_meta(
383 session_id,
384 &entries,
385 on_disk.as_ref(),
386 now_ms(),
387 )))
388 }
389
390 async fn update_meta(&self, session_id: &str, meta: &SessionMeta) -> Result<()> {
391 let path = self.session_path(session_id)?;
392 let session_lock = session_lock(&self.locks, session_id).await;
393 let _guard = session_lock.lock().await;
394 tokio::fs::create_dir_all(&self.base_dir).await?;
395
396 let pending = {
397 let mut state = self.state.lock().await;
398 state.buffers.remove(session_id).unwrap_or_default()
399 };
400
401 let mut existing_entries = load_entries_from_path(&path).await?;
402 existing_entries.extend(pending);
403
404 write_entries_file(&path, meta, &existing_entries).await
405 }
406
407 async fn list_meta(&self) -> Result<Vec<SessionMeta>> {
408 let ids = self.list().await?;
409 let mut out = Vec::with_capacity(ids.len());
410 for id in ids {
411 if let Some(meta) = self.load_meta(&id).await? {
412 out.push(meta);
413 }
414 }
415 out.sort_by_key(|meta| std::cmp::Reverse(meta.updated_at_ms));
416 Ok(out)
417 }
418}
419
420async fn session_lock(locks: &Arc<Mutex<FileStoreLocks>>, session_id: &str) -> Arc<Mutex<()>> {
421 let mut locks = locks.lock().await;
422 locks
423 .sessions
424 .entry(session_id.to_string())
425 .or_insert_with(|| Arc::new(Mutex::new(())))
426 .clone()
427}
428
429async fn flush_session(
430 base_dir: &Path,
431 state: &Arc<Mutex<FileStoreState>>,
432 locks: &Arc<Mutex<FileStoreLocks>>,
433 session_id: &str,
434) -> Result<()> {
435 let session_lock = session_lock(locks, session_id).await;
436 let _guard = session_lock.lock().await;
437
438 let pending = {
439 let mut state = state.lock().await;
440 state.buffers.remove(session_id).unwrap_or_default()
441 };
442
443 if pending.is_empty() {
444 return Ok(());
445 }
446
447 tokio::fs::create_dir_all(base_dir).await?;
448 let path = base_dir.join(format!("{session_id}.jsonl"));
449 let mut existing = load_entries_from_path(&path).await?;
450 existing.extend(pending);
451
452 let existing_meta = load_meta_from_path(&path).await?;
453 let meta = synthesize_meta(session_id, &existing, existing_meta.as_ref(), now_ms());
454 write_entries_file(&path, &meta, &existing).await
455}
456
457async fn write_entries_file(
458 path: &Path,
459 meta: &SessionMeta,
460 entries: &[(EntryId, SessionEntry)],
461) -> Result<()> {
462 let mut lines = Vec::with_capacity(entries.len() + 1);
463 lines.push(serde_json::to_string(meta)?);
464 for (id, entry) in entries {
465 lines.push(serde_json::to_string(&serde_json::json!({
466 "id": id,
467 "entry": entry,
468 }))?);
469 }
470 let payload = format!("{}\n", lines.join("\n"));
471 let tmp_path = path.with_extension("jsonl.tmp");
472 tokio::fs::write(&tmp_path, payload).await?;
473 tokio::fs::rename(&tmp_path, path).await?;
474 Ok(())
475}
476
477fn synthesize_meta(
478 session_id: &str,
479 entries: &[(EntryId, SessionEntry)],
480 existing_meta: Option<&SessionMeta>,
481 now: u64,
482) -> SessionMeta {
483 let first_user = entries
484 .iter()
485 .filter_map(|(_, e)| e.as_message())
486 .find(|m| matches!(m.role(), Role::User))
487 .map(|m| truncate_preview(&m.text()));
488 let entry_count = entries.len();
489 let message_count = entries
490 .iter()
491 .filter(|(_, e)| e.as_message().is_some())
492 .count();
493
494 SessionMeta {
495 session_id: session_id.to_string(),
496 created_at_ms: existing_meta
497 .map(|m| {
498 if m.created_at_ms == 0 {
499 now
500 } else {
501 m.created_at_ms
502 }
503 })
504 .unwrap_or(now),
505 updated_at_ms: now,
506 agent_loop_version: env!("CARGO_PKG_VERSION").to_string(),
507 entry_count,
508 message_count,
509 name: existing_meta.and_then(|m| m.name.clone()),
510 cwd: existing_meta.and_then(|m| m.cwd.clone()),
511 first_user_message: first_user
512 .or_else(|| existing_meta.and_then(|m| m.first_user_message.clone())),
513 extra: existing_meta.map(|m| m.extra.clone()).unwrap_or_default(),
514 }
515}
516
/// Shortens `s` to at most 200 characters for use as a preview.
///
/// Strings of up to 200 characters are returned unchanged. Longer ones are cut
/// to their first 197 characters and suffixed with `...`. Lengths are counted
/// in `char`s, so multi-byte text is never split inside a character.
fn truncate_preview(s: &str) -> String {
    // A 201st character exists exactly when the string is longer than 200.
    if s.chars().nth(200).is_none() {
        return s.to_owned();
    }
    let mut preview: String = s.chars().take(197).collect();
    preview.push_str("...");
    preview
}
526
527async fn load_meta_from_path(path: &Path) -> Result<Option<SessionMeta>> {
528 if !path.exists() {
529 return Ok(None);
530 }
531 let content = tokio::fs::read_to_string(path).await?;
532 let mut lines = content.lines();
533 let Some(first) = lines.next() else {
534 return Ok(None);
535 };
536 let meta = serde_json::from_str(first)?;
537 Ok(Some(meta))
538}
539
540async fn load_entries_from_path(path: &Path) -> Result<Vec<(EntryId, SessionEntry)>> {
541 if !path.exists() {
542 return Ok(Vec::new());
543 }
544
545 let content = tokio::fs::read_to_string(path).await?;
546 let mut out = Vec::new();
547
548 #[derive(serde::Deserialize)]
549 struct EntryLine {
550 id: String,
551 entry: SessionEntry,
552 }
553
554 for (idx, line) in content.lines().enumerate() {
555 if line.trim().is_empty() {
556 continue;
557 }
558 if idx == 0 && serde_json::from_str::<SessionMeta>(line).is_ok() {
559 continue;
560 }
561 if let Ok(parsed) = serde_json::from_str::<EntryLine>(line) {
562 out.push((parsed.id, parsed.entry));
563 continue;
564 }
565 let msg: Message = serde_json::from_str(line)?;
566 out.push((
567 deterministic_entry_id(idx, line),
568 SessionEntry::message(msg),
569 ));
570 }
571
572 Ok(out)
573}
574
/// Returns the current wall-clock time as milliseconds since the Unix epoch,
/// or `0` if the system clock reports a time before the epoch.
pub(crate) fn now_ms() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
581
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::Message;
    use tempfile::tempdir;

    /// Wraps a user message with the given text in a session entry.
    fn user_entry(text: &str) -> SessionEntry {
        SessionEntry::message(Message::user(text))
    }

    /// Meta record for `session_id` named "demo"; all other fields default.
    fn demo_meta(session_id: &str) -> SessionMeta {
        SessionMeta {
            session_id: session_id.into(),
            name: Some("demo".into()),
            ..Default::default()
        }
    }

    #[tokio::test]
    async fn default_append_entry_rejects_custom_for_legacy_impl() {
        // Implements only the required methods, so the trait defaults are used.
        struct LegacyStore;

        #[async_trait]
        impl SessionStore for LegacyStore {
            async fn append(&self, _session_id: &str, _message: &Message) -> Result<()> {
                Ok(())
            }
            async fn flush(&self, _session_id: &str) -> Result<()> {
                Ok(())
            }
            async fn load(&self, _session_id: &str) -> Result<Vec<Message>> {
                Ok(Vec::new())
            }
            async fn delete(&self, _session_id: &str) -> Result<()> {
                Ok(())
            }
            async fn list(&self) -> Result<Vec<String>> {
                Ok(Vec::new())
            }
            fn as_any(&self) -> &dyn Any {
                self
            }
        }

        let store = LegacyStore;
        let custom = SessionEntry::custom("x", serde_json::json!({}));
        let err = store
            .append_entry("s1", &custom)
            .await
            .expect_err("custom should error by default");
        assert!(err.to_string().contains("custom entries not supported"));
    }

    #[tokio::test]
    async fn memory_store_round_trips_custom_entries() {
        let store = MemorySessionStore::new();
        let compaction = SessionEntry::custom("compaction", serde_json::json!({ "summary": "x" }));
        store.append_entry("s1", &user_entry("hi")).await.unwrap();
        store.append_entry("s1", &compaction).await.unwrap();

        let entries = store.load_entries("s1").await.unwrap();
        assert_eq!(entries.len(), 2);

        // `load` only surfaces the message entry.
        let messages = store.load("s1").await.unwrap();
        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0].text(), "hi");
    }

    #[tokio::test]
    async fn file_store_round_trips_custom_entries() {
        let dir = tempdir().unwrap();
        let store = FileSessionStore::new(dir.path());
        let compaction = SessionEntry::custom(
            "compaction",
            serde_json::json!({ "summary": "earlier stuff" }),
        );

        store.append_entry("sess-1", &user_entry("hi")).await.unwrap();
        store.append_entry("sess-1", &compaction).await.unwrap();
        store.flush("sess-1").await.unwrap();

        let entries = store.load_entries("sess-1").await.unwrap();
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].1.as_message().unwrap().text(), "hi");
        assert_eq!(entries[1].1.custom_kind(), Some("compaction"));
    }

    #[tokio::test]
    async fn file_store_loads_0_14_era_format() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("legacy.jsonl");

        // Header line followed by bare messages, without `{id, entry}` wrappers.
        let header = serde_json::json!({
            "session_id": "legacy",
            "created_at_ms": 1_700_000_000_000u64,
            "agent_loop_version": "0.14.0",
            "message_count": 2
        })
        .to_string();
        let lines = [
            header,
            serde_json::to_string(&Message::user("legacy user")).unwrap(),
            serde_json::to_string(&Message::assistant("legacy assistant")).unwrap(),
        ];
        tokio::fs::write(&path, format!("{}\n", lines.join("\n")))
            .await
            .unwrap();

        let store = FileSessionStore::new(dir.path());
        let meta = store.load_meta("legacy").await.unwrap().unwrap();
        assert_eq!(meta.session_id, "legacy");
        assert_eq!(meta.message_count, 2);

        let entries = store.load_entries("legacy").await.unwrap();
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].1.as_message().unwrap().text(), "legacy user");
        assert_eq!(entries[0].0, deterministic_entry_id(1, &lines[1]));
        assert_eq!(entries[1].0, deterministic_entry_id(2, &lines[2]));

        // Ids must be stable across loads.
        let entries_again = store.load_entries("legacy").await.unwrap();
        assert_eq!(entries_again[0].0, entries[0].0);
        assert_eq!(entries_again[1].0, entries[1].0);
    }

    #[tokio::test]
    async fn memory_store_append_keeps_meta_in_sync() {
        let store = MemorySessionStore::new();
        store.append("s1", &Message::user("hi")).await.unwrap();

        let meta = store.load_meta("s1").await.unwrap().unwrap();
        assert_eq!(meta.session_id, "s1");
        assert_eq!(meta.message_count, 1);
        assert_eq!(meta.entry_count, 1);

        let listed = store.list_meta().await.unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].session_id, "s1");
    }

    #[tokio::test]
    async fn file_store_update_meta_does_not_duplicate_pending_entries() {
        let dir = tempdir().unwrap();
        let store = FileSessionStore::new(dir.path());

        store.append_entry("s1", &user_entry("hi")).await.unwrap();
        store.update_meta("s1", &demo_meta("s1")).await.unwrap();
        store.flush("s1").await.unwrap();

        let entries = store.load_entries("s1").await.unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].1.as_message().unwrap().text(), "hi");
    }

    #[tokio::test]
    async fn file_store_update_meta_fails_fast_on_invalid_existing_log() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("broken.jsonl");
        tokio::fs::write(&path, "{not-json}\n").await.unwrap();

        let store = FileSessionStore::new(dir.path());
        let outcome = store.update_meta("broken", &demo_meta("broken")).await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn file_store_rejects_path_traversal_session_id() {
        let dir = tempdir().unwrap();
        let store = FileSessionStore::new(dir.path());

        let err = store
            .append_entry("../escape", &user_entry("hi"))
            .await
            .expect_err("path traversal session id should be rejected");
        assert!(err.to_string().contains("invalid session id"));
    }

    #[tokio::test]
    async fn file_store_concurrent_flushes_do_not_race() {
        let dir = tempdir().unwrap();
        let store = Arc::new(FileSessionStore::new(dir.path()));

        store.append_entry("race", &user_entry("hi")).await.unwrap();

        let first = tokio::spawn({
            let store = Arc::clone(&store);
            async move { store.flush("race").await }
        });
        let second = tokio::spawn({
            let store = Arc::clone(&store);
            async move { store.flush("race").await }
        });

        first.await.unwrap().unwrap();
        second.await.unwrap().unwrap();

        let entries = store.load_entries("race").await.unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].1.as_message().unwrap().text(), "hi");
    }

    #[tokio::test]
    async fn file_store_delete_races_flush_does_not_resurrect() {
        let dir = tempdir().unwrap();
        let store = Arc::new(FileSessionStore::new(dir.path()));

        store
            .append_entry("del-race", &user_entry("hi"))
            .await
            .unwrap();

        let del = tokio::spawn({
            let store = Arc::clone(&store);
            async move { store.delete("del-race").await }
        });
        let flush = tokio::spawn({
            let store = Arc::clone(&store);
            async move { store.flush("del-race").await }
        });

        del.await.unwrap().unwrap();
        flush.await.unwrap().unwrap();

        let path = dir.path().join("del-race.jsonl");
        assert!(
            !path.exists(),
            "session file resurrected after delete via flush race"
        );
        assert!(store.load_entries("del-race").await.unwrap().is_empty());
    }
}