Skip to main content

oxios_kernel/
state_store.rs

1//! Filesystem-based state store.
2//!
3//! All state is persisted as markdown or JSON files organized
4//! by category. This is the "filesystem" of Oxios.
5
6use anyhow::{Result, bail};
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Deserializer, Serialize, Serializer, de::DeserializeOwned};
9use std::collections::HashMap;
10use std::path::PathBuf;
11use std::sync::Arc;
12use tokio::fs;
13use tokio::io::AsyncWriteExt;
14
15/// Unique identifier for a session.
16#[derive(Debug, Clone, PartialEq, Eq, Hash)]
17pub struct SessionId(pub String);
18
19impl SessionId {
20    /// Creates a new random session ID.
21    pub fn new() -> Self {
22        Self(uuid::Uuid::new_v4().to_string())
23    }
24}
25
26impl Default for SessionId {
27    fn default() -> Self {
28        Self::new()
29    }
30}
31
32impl std::fmt::Display for SessionId {
33    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34        write!(f, "{}", self.0)
35    }
36}
37
38impl Serialize for SessionId {
39    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
40    where
41        S: Serializer,
42    {
43        serializer.serialize_str(&self.0)
44    }
45}
46
47impl<'de> Deserialize<'de> for SessionId {
48    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
49    where
50        D: Deserializer<'de>,
51    {
52        let s = String::deserialize(deserializer)?;
53        Ok(Self(s))
54    }
55}
56
57/// A user message in a session.
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct UserMessage {
60    /// Message content.
61    pub content: String,
62    /// Timestamp when the message was sent.
63    pub timestamp: DateTime<Utc>,
64}
65
66/// An agent response in a session.
67#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct AgentResponse {
69    /// Response content.
70    pub content: String,
71    /// Session ID associated with this response.
72    pub session_id: Option<String>,
73    /// Seed ID used for this response (if any).
74    pub seed_id: Option<String>,
75    /// Phase reached during orchestration.
76    pub phase_reached: Option<String>,
77    /// Whether evaluation passed.
78    pub evaluation_passed: Option<bool>,
79    /// Timestamp when the response was generated.
80    pub timestamp: DateTime<Utc>,
81    /// Index range into `Session::trajectory_steps` for tool calls that
82    /// occurred during this response. `None` when no tool calls were made.
83    /// Used by the Web UI to render per-turn execution timelines.
84    #[serde(skip_serializing_if = "Option::is_none")]
85    pub trajectory_range: Option<TrajectoryRange>,
86}
87
88/// Index range (exclusive end) into `Session::trajectory_steps`.
89#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct TrajectoryRange {
91    /// Start index (inclusive).
92    pub start: usize,
93    /// End index (exclusive).
94    pub end: usize,
95}
96
97/// A single tool execution step recorded in a session (RFC-015).
98///
99/// Persisted alongside the agent response so that the Web UI can render the
100/// execution timeline (tool calls, durations, errors) when the user
101/// re-opens the session later. Mirrors `memory::sona::TrajectoryStep` but
102/// is duplicated here to avoid a kernel-state → memory dependency cycle.
103#[derive(Debug, Clone, Serialize, Deserialize)]
104pub struct TrajectoryStepRecord {
105    /// Name of the tool that was called.
106    pub tool_name: String,
107    /// Tool input arguments (JSON).
108    pub tool_args: serde_json::Value,
109    /// Truncated output (max ~500 chars).
110    pub output_summary: String,
111    /// Wall-clock duration in milliseconds.
112    pub duration_ms: u64,
113    /// Whether the tool returned an error.
114    pub is_error: bool,
115    /// Provider-specific tool call ID (for start/end correlation).
116    pub tool_call_id: String,
117    /// Timestamp when the step started.
118    pub timestamp: DateTime<Utc>,
119}
120
121/// Arbitrary key-value metadata for a session.
122pub type SessionMetadata = std::collections::HashMap<String, serde_json::Value>;
123
124/// A session represents a single user conversation.
125///
126/// Sessions track the full message history and metadata for
127/// a user conversation. They are created per user interaction
128/// and persisted for later retrieval.
129#[derive(Debug, Clone, Serialize, Deserialize)]
130pub struct Session {
131    /// Unique session identifier.
132    pub id: SessionId,
133    /// User ID who owns this session.
134    pub user_id: String,
135    /// All user messages in this session.
136    #[serde(default)]
137    pub user_messages: Vec<UserMessage>,
138    /// All agent responses in this session.
139    #[serde(default)]
140    pub agent_responses: Vec<AgentResponse>,
141    /// RFC-015: tool execution trajectory accumulated for this session.
142    /// Appended on each orchestrator run; consumed by the Web UI to render
143    /// the execution timeline when the session is re-opened.
144    #[serde(default, skip_serializing_if = "Vec::is_empty")]
145    pub trajectory_steps: Vec<TrajectoryStepRecord>,
146    /// Currently active seed ID (if any).
147    #[serde(skip_serializing_if = "Option::is_none")]
148    pub active_seed_id: Option<String>,
149    /// Currently active persona ID (for future multi-persona support).
150    #[serde(skip_serializing_if = "Option::is_none")]
151    pub active_persona_id: Option<String>,
152    /// RFC-025: Project this session belongs to (singular, grouping only).
153    /// Set by the sidebar/drag-to-reparent; consumed for Project-tree view.
154    #[serde(default, skip_serializing_if = "Option::is_none")]
155    pub project_id: Option<String>,
156    /// Timestamp when the session was created.
157    pub created_at: DateTime<Utc>,
158    /// Timestamp when the session was last updated.
159    pub updated_at: DateTime<Utc>,
160    /// Arbitrary key-value metadata.
161    #[serde(default)]
162    pub metadata: SessionMetadata,
163}
164
165impl Session {
166    /// Creates a new session for a user.
167    pub fn new(user_id: impl Into<String>) -> Self {
168        let now = Utc::now();
169        Self {
170            id: SessionId::new(),
171            user_id: user_id.into(),
172            user_messages: Vec::new(),
173            agent_responses: Vec::new(),
174            trajectory_steps: Vec::new(),
175            active_seed_id: None,
176            active_persona_id: None,
177            project_id: None,
178            created_at: now,
179            updated_at: now,
180            metadata: SessionMetadata::new(),
181        }
182    }
183
184    /// Creates a session with a specific ID.
185    pub fn with_id(user_id: impl Into<String>, session_id: SessionId) -> Self {
186        let now = Utc::now();
187        Self {
188            id: session_id,
189            user_id: user_id.into(),
190            user_messages: Vec::new(),
191            agent_responses: Vec::new(),
192            trajectory_steps: Vec::new(),
193            active_seed_id: None,
194            active_persona_id: None,
195            project_id: None,
196            created_at: now,
197            updated_at: now,
198            metadata: SessionMetadata::new(),
199        }
200    }
201
202    /// Adds a user message to the session.
203    pub fn add_user_message(&mut self, content: impl Into<String>) {
204        self.user_messages.push(UserMessage {
205            content: content.into(),
206            timestamp: Utc::now(),
207        });
208        self.updated_at = Utc::now();
209    }
210
211    /// Adds an agent response to the session.
212    pub fn add_agent_response(&mut self, response: AgentResponse) {
213        self.agent_responses.push(response);
214        self.updated_at = Utc::now();
215    }
216
217    /// Appends trajectory steps to the session (RFC-015).
218    ///
219    /// Called by the orchestrator after each run so the Web UI can
220    /// re-render the execution timeline when the user re-opens the session.
221    pub fn extend_trajectory(&mut self, steps: Vec<TrajectoryStepRecord>) {
222        if steps.is_empty() {
223            return;
224        }
225        self.trajectory_steps.extend(steps);
226        self.updated_at = Utc::now();
227    }
228
229    /// Returns the trajectory steps recorded in this session.
230    pub fn trajectory(&self) -> &[TrajectoryStepRecord] {
231        &self.trajectory_steps
232    }
233
234    /// Sets the active seed ID.
235    pub fn set_active_seed(&mut self, seed_id: Option<String>) {
236        self.active_seed_id = seed_id;
237        self.updated_at = Utc::now();
238    }
239
240    /// Sets the active persona ID.
241    pub fn set_active_persona(&mut self, persona_id: Option<String>) {
242        self.active_persona_id = persona_id;
243        self.updated_at = Utc::now();
244    }
245
246    /// Sets a metadata value.
247    pub fn set_metadata(&mut self, key: impl Into<String>, value: serde_json::Value) {
248        self.metadata.insert(key.into(), value);
249        self.updated_at = Utc::now();
250    }
251
252    /// Gets a metadata value.
253    pub fn get_metadata(&self, key: &str) -> Option<&serde_json::Value> {
254        self.metadata.get(key)
255    }
256
257    /// Returns the total number of exchanges in this session.
258    pub fn exchange_count(&self) -> usize {
259        self.user_messages.len().min(self.agent_responses.len())
260    }
261
262    /// Returns true if the session is empty (no messages).
263    pub fn is_empty(&self) -> bool {
264        self.user_messages.is_empty()
265    }
266}
267/// A filesystem-based persistent state store.
268///
269/// Files are organized as `<base_path>/<category>/<name>.md` or
270/// `<base_path>/<category>/<name>.json`.
271#[derive(Clone)]
272pub struct StateStore {
273    /// Root directory for all state files.
274    pub base_path: PathBuf,
275    /// Per-session write locks used by [`StateStore::update_session_with`]
276    /// to serialize concurrent load-modify-save cycles on the same session
277    /// (state-area F1). Legacy callers that do their own
278    /// `load_session` → `save_session` without this primitive remain racy
279    /// and should migrate.
280    session_locks: Arc<parking_lot::RwLock<HashMap<String, Arc<tokio::sync::Mutex<()>>>>>,
281}
282
283impl StateStore {
284    /// Creates a new state store, initializing the directory if needed.
285    ///
286    /// # Example
287    ///
288    /// ```no_run
289    /// use oxios_kernel::state_store::StateStore;
290    /// use std::path::PathBuf;
291    ///
292    /// let store = StateStore::new(PathBuf::from("/tmp/oxios-state")).unwrap();
293    /// ```
294    pub fn new(base_path: PathBuf) -> Result<Self> {
295        Ok(Self {
296            base_path,
297            session_locks: Arc::new(parking_lot::RwLock::new(HashMap::new())),
298        })
299    }
300
301    /// Validate that a category name does not contain path traversal.
302    fn validate_category(category: &str) -> Result<()> {
303        if category.contains("..") || category.contains('\\') {
304            bail!("invalid category name: '{category}'");
305        }
306        if category.is_empty()
307            || category.starts_with('/')
308            || category.ends_with('/')
309            || category.contains("//")
310        {
311            bail!("invalid category name: '{category}'");
312        }
313        Ok(())
314    }
315
316    /// Validate that a file name does not contain path traversal.
317    fn validate_name(name: &str) -> Result<()> {
318        if name.contains("..") || name.contains('/') || name.contains('\\') {
319            bail!("invalid file name: '{name}'");
320        }
321        Ok(())
322    }
323
324    /// Durable atomic write: temp file → fsync(file) → rename → fsync(dir).
325    ///
326    /// `rename(2)` only guarantees *metadata* atomicity, not data
327    /// durability: if the data pages haven't been flushed before the
328    /// rename's metadata commit, a crash can surface the new filename
329    /// with stale or zero contents. fsyncing the temp file before the
330    /// rename, and the parent directory after, closes that window. This
331    /// matters because StateStore is the source of truth for sessions,
332    /// agents, and project metadata. (state-area F9.)
333    async fn durable_write(
334        dir: &std::path::Path,
335        target: &std::path::Path,
336        content: &[u8],
337    ) -> Result<()> {
338        let file_name = target
339            .file_name()
340            .and_then(|n| n.to_str())
341            .unwrap_or("state");
342        let temp_path = dir.join(format!(
343            "{file_name}.{}.{}.tmp",
344            std::process::id(),
345            uuid::Uuid::new_v4()
346        ));
347        {
348            let mut file = fs::File::create(&temp_path).await?;
349            file.write_all(content).await?;
350            file.sync_all().await?;
351        }
352        tokio::fs::rename(&temp_path, target).await?;
353        // Best-effort directory fsync; ignore errors (not all platforms
354        // support it, and the file fsync + rename already did the work).
355        if let Ok(dir_file) = fs::File::open(dir).await {
356            let _ = dir_file.sync_all().await;
357        }
358        Ok(())
359    }
360
361    pub async fn save_markdown(&self, category: &str, name: &str, content: &str) -> Result<()> {
362        Self::validate_category(category)?;
363        Self::validate_name(name)?;
364        let dir = self.base_path.join(category);
365        fs::create_dir_all(&dir).await?;
366        let path = dir.join(format!("{name}.md"));
367        Self::durable_write(&dir, &path, content.as_bytes()).await?;
368        Ok(())
369    }
370
371    /// Load a markdown file from the given category.
372    pub async fn load_markdown(&self, category: &str, name: &str) -> Result<Option<String>> {
373        Self::validate_category(category)?;
374        Self::validate_name(name)?;
375        let path = self.base_path.join(category).join(format!("{name}.md"));
376        match fs::read_to_string(&path).await {
377            Ok(content) => Ok(Some(content)),
378            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
379            Err(e) => Err(e.into()),
380        }
381    }
382
383    /// List all markdown files in a category (names without extension).
384    pub async fn list_category(&self, category: &str) -> Result<Vec<String>> {
385        Self::validate_category(category)?;
386        let dir = self.base_path.join(category);
387        if !dir.exists() {
388            return Ok(Vec::new());
389        }
390        let mut entries = fs::read_dir(&dir).await?;
391        let mut names = Vec::new();
392        while let Some(entry) = entries.next_entry().await? {
393            let path = entry.path();
394            if let Some(ext) = path.extension()
395                && (ext == "md" || ext == "json")
396                && let Some(stem) = path.file_stem()
397            {
398                names.push(stem.to_string_lossy().into_owned());
399            }
400        }
401        names.sort();
402        Ok(names)
403    }
404
405    /// Save a serializable value as JSON under the given category.
406    pub async fn save_json<T: Serialize>(
407        &self,
408        category: &str,
409        name: &str,
410        data: &T,
411    ) -> Result<()> {
412        Self::validate_category(category)?;
413        Self::validate_name(name)?;
414        let dir = self.base_path.join(category);
415        fs::create_dir_all(&dir).await?;
416        let path = dir.join(format!("{name}.json"));
417        let content = serde_json::to_string_pretty(data)?;
418        Self::durable_write(&dir, &path, content.as_bytes()).await?;
419        Ok(())
420    }
421
422    /// Load a deserializable value from JSON in the given category.
423    pub async fn load_json<T: DeserializeOwned>(
424        &self,
425        category: &str,
426        name: &str,
427    ) -> Result<Option<T>> {
428        Self::validate_category(category)?;
429        Self::validate_name(name)?;
430        let path = self.base_path.join(category).join(format!("{name}.json"));
431        match fs::read_to_string(&path).await {
432            Ok(content) => Ok(Some(serde_json::from_str(&content)?)),
433            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
434            Err(e) => Err(e.into()),
435        }
436    }
437
438    /// Delete a file from the given category.
439    pub async fn delete_file(&self, category: &str, name: &str) -> Result<bool> {
440        Self::validate_category(category)?;
441        Self::validate_name(name)?;
442        let path = self.base_path.join(category).join(format!("{name}.json"));
443        if path.exists() {
444            tokio::fs::remove_file(path).await?;
445            Ok(true)
446        } else {
447            let path = self.base_path.join(category).join(format!("{name}.md"));
448            if path.exists() {
449                tokio::fs::remove_file(path).await?;
450                Ok(true)
451            } else {
452                Ok(false)
453            }
454        }
455    }
456}
457
458impl std::fmt::Debug for StateStore {
459    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
460        f.debug_struct("StateStore")
461            .field("base_path", &self.base_path)
462            .finish()
463    }
464}
465
466impl StateStore {
467    /// Saves a session to the sessions category.
468    pub async fn save_session(&self, session: &Session) -> Result<()> {
469        self.save_json("sessions", &session.id.0, session).await
470    }
471
472    /// Atomically load, mutate, and save a session under a per-session lock.
473    ///
474    /// This is the safe primitive for read-modify-write on a session: it
475    /// serializes concurrent modifications to the same session, preventing
476    /// the lost-update race that plain `load_session` → `save_session`
477    /// sequences suffer when two callers read the same starting copy and
478    /// the later save clobbers the earlier one (state-area F1).
479    ///
480    /// Returns `Ok(Some(session))` after the mutation+save, or `Ok(None)`
481    /// if the session did not exist.
482    pub async fn update_session_with<F>(
483        &self,
484        session_id: &SessionId,
485        f: F,
486    ) -> Result<Option<Session>>
487    where
488        F: FnOnce(&mut Session) -> Result<()>,
489    {
490        let lock = Self::session_lock(&self.session_locks, &session_id.0);
491        let _guard = lock.lock().await;
492        let mut session = match self.load_session(session_id).await? {
493            Some(s) => s,
494            None => return Ok(None),
495        };
496        f(&mut session)?;
497        self.save_session(&session).await?;
498        Ok(Some(session))
499    }
500
501    /// Get (or lazily create) the per-session mutex. Double-checked under
502    /// the map's read/write guards so concurrent callers for the same
503    /// session share one `Arc<Mutex<()>>`.
504    fn session_lock(
505        map: &parking_lot::RwLock<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
506        session_id: &str,
507    ) -> Arc<tokio::sync::Mutex<()>> {
508        // Fast path: shared read.
509        if let Some(lock) = map.read().get(session_id) {
510            return Arc::clone(lock);
511        }
512        // Slow path: exclusive write, insert if absent.
513        Arc::clone(
514            map.write()
515                .entry(session_id.to_string())
516                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(()))),
517        )
518    }
519
520    /// Saves a session and then runs pruning if auto_prune is enabled.
521    pub async fn save_session_with_prune(
522        &self,
523        session: &Session,
524        prune_config: &PruneConfig,
525    ) -> Result<()> {
526        self.save_session(session).await?;
527        // Prune in the background — don't block the response
528        let store = self.clone();
529        let config = prune_config.clone();
530        tokio::spawn(async move {
531            if let Err(e) = store.prune_sessions(&config).await {
532                tracing::warn!(error = %e, "Background session pruning failed");
533            }
534        });
535        Ok(())
536    }
537
538    /// Loads a session by ID.
539    pub async fn load_session(&self, session_id: &SessionId) -> Result<Option<Session>> {
540        self.load_json("sessions", &session_id.0).await
541    }
542
543    /// RFC-025 Phase 5: Load all sessions in full (messages + trajectories).
544    ///
545    /// Used by the Mount auto-promotion scanner, which needs trajectory
546    /// tool_args to identify paths the agent actually worked on. Cheaper to
547    /// call once per scan than `load_session` per id.
548    ///
549    /// **Warning:** loads every session on disk. Prefer
550    /// [`load_sessions_for_promotion`](Self::load_sessions_for_promotion)
551    /// for the promotion scanner to bound memory usage.
552    pub async fn load_all_sessions(&self) -> Result<Vec<Session>> {
553        let mut sessions = Vec::new();
554        if let Ok(names) = self.list_category("sessions").await {
555            for name in names {
556                if let Ok(Some(session)) = self.load_json::<Session>("sessions", &name).await {
557                    sessions.push(session);
558                }
559            }
560        }
561        Ok(sessions)
562    }
563
564    /// RFC-025 Phase 5: Load only sessions updated at or after `since`.
565    ///
566    /// Bounds memory usage for the Mount auto-promotion scanner: instead of
567    /// deserializing every session into a single `Vec`, we deserialize each
568    /// one and skip it immediately if its `updated_at` predates the cutoff.
569    /// The promotion window is bounded by `PromotionConfig::window_days`, so
570    /// loading older sessions is pure waste.
571    ///
572    /// Sessions that fail to deserialize are skipped (best effort), mirroring
573    /// [`load_all_sessions`](Self::load_all_sessions).
574    pub async fn load_sessions_for_promotion(&self, since: DateTime<Utc>) -> Result<Vec<Session>> {
575        let mut sessions = Vec::new();
576        if let Ok(names) = self.list_category("sessions").await {
577            for name in names {
578                if let Ok(Some(session)) = self.load_json::<Session>("sessions", &name).await {
579                    // Skip sessions outside the promotion window *before*
580                    // collecting — this is the whole point of the cutoff.
581                    if session.updated_at < since {
582                        continue;
583                    }
584                    sessions.push(session);
585                }
586            }
587        }
588        Ok(sessions)
589    }
590
591    /// Lists all sessions (sorted by updated_at descending).
592    pub async fn list_sessions(&self) -> Result<Vec<SessionSummary>> {
593        let mut sessions = Vec::new();
594
595        if let Ok(names) = self.list_category("sessions").await {
596            for name in names {
597                if let Ok(Some(session)) = self.load_json::<Session>("sessions", &name).await {
598                    sessions.push(SessionSummary {
599                        id: session.id.0.clone(),
600                        user_id: session.user_id.clone(),
601                        message_count: session.user_messages.len(),
602                        title: session
603                            .metadata
604                            .get("title")
605                            .and_then(|v| v.as_str())
606                            .map(String::from)
607                            .or_else(|| {
608                                // Auto-generate from first user message
609                                session.user_messages.first().map(|m| {
610                                    let s = m.content.lines().next().unwrap_or("");
611                                    if s.len() > 60 {
612                                        format!("{}…", &s[..s.ceil_char_boundary(59)])
613                                    } else {
614                                        s.to_string()
615                                    }
616                                })
617                            }),
618                        active_seed_id: session.active_seed_id.clone(),
619                        project_id: session
620                            .project_id
621                            .clone()
622                            // Backward-compat: fall back to legacy metadata keys.
623                            .or_else(|| {
624                                session
625                                    .metadata
626                                    .get("project_id")
627                                    .and_then(|v| v.as_str())
628                                    .map(String::from)
629                            })
630                            .or_else(|| {
631                                session
632                                    .metadata
633                                    .get("project_ids")
634                                    .and_then(|v| v.as_str())
635                                    .and_then(|s| s.split(',').next().map(String::from))
636                            }),
637                        created_at: session.created_at,
638                        updated_at: session.updated_at,
639                    });
640                }
641            }
642        }
643
644        // Sort by updated_at descending (most recent first)
645        sessions.sort_by_key(|b| std::cmp::Reverse(b.updated_at));
646        Ok(sessions)
647    }
648
649    /// Deletes a session by ID.
650    pub async fn delete_session(&self, session_id: &SessionId) -> Result<bool> {
651        let path = self
652            .base_path
653            .join("sessions")
654            .join(format!("{}.json", session_id.0));
655        match fs::remove_file(&path).await {
656            Ok(()) => {
657                tracing::info!(session_id = %session_id, "Session deleted");
658                Ok(true)
659            }
660            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
661            Err(e) => Err(e.into()),
662        }
663    }
664
665    /// Gets or creates a session for a user, initializing with the given session ID.
666    pub async fn get_or_create_session(
667        &self,
668        user_id: &str,
669        session_id: Option<&SessionId>,
670    ) -> Result<Session> {
671        if let Some(sid) = session_id
672            && let Some(existing) = self.load_session(sid).await?
673        {
674            return Ok(existing);
675        }
676
677        // Create new session
678        let session = match session_id {
679            Some(sid) => Session::with_id(user_id, sid.clone()),
680            None => Session::new(user_id),
681        };
682
683        self.save_session(&session).await?;
684        Ok(session)
685    }
686
687    /// Updates an existing session, saving it to disk.
688    pub async fn update_session(&self, session: &Session) -> Result<()> {
689        self.save_session(session).await
690    }
691
692    /// RFC-025: Move a session to a different Project (drag-to-reparent).
693    ///
694    /// Pass `None` to unassign (move to "unfiled").
695    pub async fn move_session_to_project(
696        &self,
697        session_id: &SessionId,
698        project_id: Option<&str>,
699    ) -> Result<bool> {
700        // Use update_session_with so concurrent modifications to the same
701        // session can't lose this project reassignment (state-area F1).
702        let project_id_owned = project_id.map(String::from);
703        let updated = self
704            .update_session_with(session_id, |session| {
705                session.project_id = project_id_owned;
706                session.updated_at = Utc::now();
707                Ok(())
708            })
709            .await?;
710        Ok(updated.is_some())
711    }
712
713    /// Prune sessions based on configuration.
714    ///
715    /// Removes sessions that exceed TTL or exceed the maximum count.
716    /// Returns the number of sessions pruned.
717    pub async fn prune_sessions(&self, config: &PruneConfig) -> Result<usize> {
718        let mut sessions = self.list_sessions().await?;
719        let mut pruned = 0;
720
721        // TTL-based pruning: remove sessions older than ttl_hours
722        if config.ttl_hours > 0 {
723            let cutoff = Utc::now() - chrono::Duration::hours(config.ttl_hours as i64);
724            let to_prune_ttl: std::collections::HashSet<String> = sessions
725                .iter()
726                .filter(|s| s.updated_at < cutoff)
727                .map(|s| s.id.clone())
728                .collect();
729
730            for id in &to_prune_ttl {
731                let sid = SessionId(id.clone());
732                if self.delete_session(&sid).await.is_ok() {
733                    pruned += 1;
734                }
735            }
736
737            // Remove pruned sessions from the list for count-based pruning
738            sessions.retain(|s| !to_prune_ttl.contains(&s.id));
739        }
740
741        // Count-based pruning: keep only the most recent `max_sessions`
742        if config.max_sessions > 0 && sessions.len() > config.max_sessions {
743            // sessions are already sorted by updated_at descending
744            let excess = sessions.len() - config.max_sessions;
745            for session in sessions.into_iter().rev().take(excess) {
746                let sid = SessionId(session.id);
747                if self.delete_session(&sid).await.is_ok() {
748                    pruned += 1;
749                }
750            }
751        }
752
753        if pruned > 0 {
754            tracing::info!(pruned = pruned, "Session pruning completed");
755        }
756
757        Ok(pruned)
758    }
759
760    /// Prune agent records based on config.
761    ///
762    /// 1. TTL-based: delete agents with created_at older than ttl_hours.
763    /// 2. Count-based: if still over max_entries, delete oldest.
764    pub async fn prune_agents_by_config(
765        &self,
766        max_entries: usize,
767        ttl_hours: u64,
768        batch_size: usize,
769    ) -> Result<usize> {
770        let mut pruned = 0usize;
771
772        let names = self.list_category("agents").await?;
773        if names.is_empty() {
774            return Ok(0);
775        }
776
777        let now = Utc::now();
778
779        // 1. TTL-based pruning
780        let mut remaining: Vec<(String, DateTime<Utc>)> = Vec::with_capacity(names.len());
781
782        if ttl_hours > 0 {
783            let cutoff = now - chrono::Duration::hours(ttl_hours as i64);
784            for name in &names {
785                // Load just enough to check created_at
786                if let Ok(Some(info)) = self
787                    .load_json::<crate::types::AgentInfo>("agents", name)
788                    .await
789                {
790                    if info.created_at < cutoff {
791                        if self.delete_file("agents", name).await.unwrap_or(false) {
792                            pruned += 1;
793                        }
794                    } else {
795                        remaining.push((name.clone(), info.created_at));
796                    }
797                }
798            }
799        } else {
800            // Load all created_at for count-based pruning
801            for name in &names {
802                if let Ok(Some(info)) = self
803                    .load_json::<crate::types::AgentInfo>("agents", name)
804                    .await
805                {
806                    remaining.push((name.clone(), info.created_at));
807                }
808            }
809        }
810
811        // 2. Count-based pruning
812        if max_entries > 0 && remaining.len() > max_entries {
813            // Sort by created_at ascending (oldest first)
814            remaining.sort_by_key(|a| a.1);
815
816            let excess = remaining.len() - max_entries;
817            let to_delete = excess.min(batch_size);
818
819            for (name, _) in remaining.iter().take(to_delete) {
820                if self.delete_file("agents", name).await.unwrap_or(false) {
821                    pruned += 1;
822                }
823            }
824        }
825
826        if pruned > 0 {
827            tracing::info!(pruned = pruned, "Agent filesystem pruning completed");
828        }
829
830        Ok(pruned)
831    }
832}
833
/// Lightweight summary of a session, used for listings.
///
/// Carries identifying metadata and a message count but not the message
/// history itself. Optional fields marked `skip_serializing_if` are omitted
/// from the serialized output entirely when they are `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionSummary {
    /// Session ID.
    pub id: String,
    /// ID of the user who owns this session.
    pub user_id: String,
    // NOTE(review): whether this counts only user messages or also agent
    // responses is decided by the code that builds the summary — confirm there.
    /// Number of messages in this session.
    pub message_count: usize,
    // NOTE(review): presumably auto-derived from the first user message
    // (truncated to ~60 chars) when not set explicitly; that logic lives
    // outside this struct — confirm before relying on it.
    /// Human-readable title for this session, if one is available.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub title: Option<String>,
    /// ID of the seed currently active in this session, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub active_seed_id: Option<String>,
    /// ID of the project this session belongs to, if any (a single ID, not a list).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub project_id: Option<String>,
    /// When the session was created (UTC).
    pub created_at: DateTime<Utc>,
    /// When the session was last updated (UTC).
    pub updated_at: DateTime<Utc>,
}
858
/// Limits applied when pruning stored sessions.
///
/// Setting either field to `0` disables that particular limit.
#[derive(Debug, Clone)]
pub struct PruneConfig {
    /// Upper bound on how many sessions are retained; `0` means no cap.
    pub max_sessions: usize,
    /// Age limit in hours: sessions not updated within this window are
    /// pruned. `0` disables the age limit.
    pub ttl_hours: u64,
}

impl Default for PruneConfig {
    /// Keeps at most 100 sessions and expires them after one week.
    fn default() -> Self {
        const ONE_WEEK_IN_HOURS: u64 = 7 * 24;
        PruneConfig {
            ttl_hours: ONE_WEEK_IN_HOURS,
            max_sessions: 100,
        }
    }
}
876
877/// Tracks the last time a prune was performed, enabling cooldown.
878pub struct PruneThrottle {
879    /// Instant of the last prune (monotonic).
880    last_prune: std::sync::Mutex<Option<std::time::Instant>>,
881    /// Minimum seconds between prune runs.
882    cooldown_secs: u64,
883}
884
885impl PruneThrottle {
886    /// Create a new throttle with the given cooldown.
887    pub fn new(cooldown_secs: u64) -> Self {
888        Self {
889            last_prune: std::sync::Mutex::new(None),
890            cooldown_secs,
891        }
892    }
893
894    /// Check if enough time has elapsed since the last prune.
895    /// Returns `true` if prune should proceed.
896    pub fn should_prune(&self) -> bool {
897        // std::sync::Mutex can poison if a holder panics; recover here by
898        // taking the inner value so pruning continues.
899        let mut guard = self.last_prune.lock().unwrap_or_else(|e| {
900            tracing::warn!("PruneThrottle mutex poisoned, recovering: {e}");
901            e.into_inner()
902        });
903        let now = std::time::Instant::now();
904        match *guard {
905            Some(last) => {
906                if now.duration_since(last).as_secs() >= self.cooldown_secs {
907                    *guard = Some(now);
908                    true
909                } else {
910                    false
911                }
912            }
913            None => {
914                *guard = Some(now);
915                true
916            }
917        }
918    }
919}
920
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_session_creation_and_persistence() {
        let temp_dir = tempfile::tempdir().unwrap();
        let store = StateStore::new(temp_dir.path().to_path_buf()).unwrap();

        // Create a session
        let mut session = Session::new("user-123");
        session.add_user_message("Hello");

        // Save and load
        store.save_session(&session).await.unwrap();
        let loaded = store.load_session(&session.id).await.unwrap();
        assert!(loaded.is_some());
        let loaded = loaded.unwrap();
        assert_eq!(loaded.user_id, "user-123");
        assert_eq!(loaded.user_messages.len(), 1);
    }

    #[tokio::test]
    async fn test_session_list_sorts_by_updated() {
        let temp_dir = tempfile::tempdir().unwrap();
        let store = StateStore::new(temp_dir.path().to_path_buf()).unwrap();

        // Create multiple sessions, spaced out so their updated_at values differ
        for i in 0..3 {
            let mut session = Session::new(format!("user-{}", i));
            session.add_user_message(format!("Message {}", i));
            store.save_session(&session).await.unwrap();
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }

        let sessions = store.list_sessions().await.unwrap();
        assert_eq!(sessions.len(), 3);
        // Most recently updated should be first
        assert_eq!(sessions[0].user_id, "user-2");
    }

    #[tokio::test]
    async fn test_delete_session() {
        let temp_dir = tempfile::tempdir().unwrap();
        let store = StateStore::new(temp_dir.path().to_path_buf()).unwrap();

        let session = Session::new("user-123");
        store.save_session(&session).await.unwrap();

        // Delete and verify
        let deleted = store.delete_session(&session.id).await.unwrap();
        assert!(deleted);

        let loaded = store.load_session(&session.id).await.unwrap();
        assert!(loaded.is_none());
    }

    #[tokio::test]
    async fn test_get_or_create_session_existing() {
        let temp_dir = tempfile::tempdir().unwrap();
        let store = StateStore::new(temp_dir.path().to_path_buf()).unwrap();

        let mut existing = Session::new("user-123");
        existing.add_user_message("Original message");
        store.save_session(&existing).await.unwrap();

        // Get or create with same ID should return existing
        let retrieved = store
            .get_or_create_session("user-123", Some(&existing.id))
            .await
            .unwrap();
        assert_eq!(retrieved.id, existing.id);
        assert_eq!(retrieved.user_messages.len(), 1);
    }

    #[tokio::test]
    async fn test_get_or_create_session_new() {
        let temp_dir = tempfile::tempdir().unwrap();
        let store = StateStore::new(temp_dir.path().to_path_buf()).unwrap();

        // Get or create without existing session should create new
        let session = store.get_or_create_session("user-456", None).await.unwrap();
        assert_eq!(session.user_id, "user-456");
        assert!(session.user_messages.is_empty());
    }

    #[tokio::test]
    async fn test_prune_sessions_by_count() {
        let temp_dir = tempfile::tempdir().unwrap();
        let store = StateStore::new(temp_dir.path().to_path_buf()).unwrap();

        // Create 5 sessions
        for i in 0..5 {
            let mut session = Session::new(format!("user-{}", i));
            session.add_user_message(format!("Message {}", i));
            store.save_session(&session).await.unwrap();
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }

        // Prune to max 3
        let config = PruneConfig {
            max_sessions: 3,
            ttl_hours: 0,
        };
        let pruned = store.prune_sessions(&config).await.unwrap();
        assert_eq!(pruned, 2);

        let remaining = store.list_sessions().await.unwrap();
        assert_eq!(remaining.len(), 3);
        // Oldest sessions (user-0, user-1) should be pruned
        let remaining_ids: Vec<&str> = remaining.iter().map(|s| s.user_id.as_str()).collect();
        assert!(remaining_ids.contains(&"user-2"));
        assert!(remaining_ids.contains(&"user-3"));
        assert!(remaining_ids.contains(&"user-4"));
    }

    #[tokio::test]
    async fn test_prune_sessions_by_ttl() {
        let temp_dir = tempfile::tempdir().unwrap();
        let store = StateStore::new(temp_dir.path().to_path_buf()).unwrap();

        // Create a session and manually set updated_at to the past
        let mut old_session = Session::new("old-user");
        old_session.updated_at = Utc::now() - chrono::Duration::hours(48);
        store.save_session(&old_session).await.unwrap();

        // Create a recent session
        let mut recent_session = Session::new("recent-user");
        recent_session.add_user_message("Hello");
        store.save_session(&recent_session).await.unwrap();

        // Prune with 24-hour TTL
        let config = PruneConfig {
            max_sessions: 0,
            ttl_hours: 24,
        };
        let pruned = store.prune_sessions(&config).await.unwrap();
        assert_eq!(pruned, 1);

        let remaining = store.list_sessions().await.unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].user_id, "recent-user");
    }

    #[tokio::test]
    async fn test_load_sessions_for_promotion_filters_by_cutoff() {
        // Promo-1: sessions older than the cutoff must be skipped before
        // being collected, bounding memory for the promotion scanner.
        let temp_dir = tempfile::tempdir().unwrap();
        let store = StateStore::new(temp_dir.path().to_path_buf()).unwrap();

        // Old session — updated 48h ago.
        let mut old_session = Session::new("old-user");
        old_session.updated_at = Utc::now() - chrono::Duration::hours(48);
        store.save_session(&old_session).await.unwrap();

        // Recent session — updated now.
        let recent_session = Session::new("recent-user");
        store.save_session(&recent_session).await.unwrap();

        // Cutoff 24h ago: only the recent session should be loaded.
        let cutoff = Utc::now() - chrono::Duration::hours(24);
        let sessions = store.load_sessions_for_promotion(cutoff).await.unwrap();
        assert_eq!(sessions.len(), 1, "old session must be filtered out");
        assert_eq!(sessions[0].user_id, "recent-user");

        // A cutoff a year in the past is older than every session, so both
        // sessions are returned (boundary check).
        let far_cutoff = Utc::now() - chrono::Duration::days(365);
        let all = store.load_sessions_for_promotion(far_cutoff).await.unwrap();
        assert_eq!(all.len(), 2);
    }

    #[test]
    fn test_prune_config_default() {
        let config = PruneConfig::default();
        assert_eq!(config.max_sessions, 100);
        assert_eq!(config.ttl_hours, 168);
    }

    #[test]
    fn test_prune_throttle_first_call_proceeds_then_cools_down() {
        let throttle = PruneThrottle::new(3600);
        assert!(throttle.should_prune(), "first call must always proceed");
        assert!(
            !throttle.should_prune(),
            "a second call within the cooldown must be throttled"
        );
    }

    #[test]
    fn test_prune_throttle_zero_cooldown_always_proceeds() {
        let throttle = PruneThrottle::new(0);
        assert!(throttle.should_prune());
        assert!(throttle.should_prune());
        assert!(throttle.should_prune());
    }
}