Skip to main content

oxios_kernel/
state_store.rs

1//! Filesystem-based state store.
2//!
3//! All state is persisted as markdown or JSON files organized
4//! by category. This is the "filesystem" of Oxios.
5
6use anyhow::{Result, bail};
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Deserializer, Serialize, Serializer, de::DeserializeOwned};
9use std::path::PathBuf;
10use tokio::fs;
11
/// Unique identifier for a session.
///
/// A thin newtype over `String` whose inner value is public. The session
/// helpers on `StateStore` use the inner string verbatim as the file name
/// (without extension) under the `sessions` category.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SessionId(pub String);
15
16impl SessionId {
17    /// Creates a new random session ID.
18    pub fn new() -> Self {
19        Self(uuid::Uuid::new_v4().to_string())
20    }
21}
22
23impl Default for SessionId {
24    fn default() -> Self {
25        Self::new()
26    }
27}
28
29impl std::fmt::Display for SessionId {
30    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31        write!(f, "{}", self.0)
32    }
33}
34
35impl Serialize for SessionId {
36    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
37    where
38        S: Serializer,
39    {
40        serializer.serialize_str(&self.0)
41    }
42}
43
44impl<'de> Deserialize<'de> for SessionId {
45    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
46    where
47        D: Deserializer<'de>,
48    {
49        let s = String::deserialize(deserializer)?;
50        Ok(Self(s))
51    }
52}
53
/// A user message in a session.
///
/// Instances are created by `Session::add_user_message`, which stamps the
/// message with the current UTC time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserMessage {
    /// Message content.
    pub content: String,
    /// Timestamp when the message was sent.
    pub timestamp: DateTime<Utc>,
}
62
/// An agent response in a session.
///
/// Stored in `Session::agent_responses` in the order the responses were
/// added. All orchestration details are optional.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentResponse {
    /// Response content.
    pub content: String,
    /// Session ID associated with this response.
    pub session_id: Option<String>,
    /// Seed ID used for this response (if any).
    pub seed_id: Option<String>,
    /// Phase reached during orchestration.
    pub phase_reached: Option<String>,
    /// Whether evaluation passed.
    pub evaluation_passed: Option<bool>,
    /// Timestamp when the response was generated.
    pub timestamp: DateTime<Utc>,
    /// Index range into `Session::trajectory_steps` for tool calls that
    /// occurred during this response. `None` when no tool calls were made.
    /// Used by the Web UI to render per-turn execution timelines.
    /// The field is left out of the serialized form entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trajectory_range: Option<TrajectoryRange>,
}
84
/// Half-open index range `[start, end)` into `Session::trajectory_steps`.
///
/// Nothing in this type enforces `start <= end` or that the indices are in
/// bounds; producers are expected to supply a valid range.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrajectoryRange {
    /// Start index (inclusive).
    pub start: usize,
    /// End index (exclusive).
    pub end: usize,
}
93
/// A single tool execution step recorded in a session (RFC-015).
///
/// Persisted alongside the agent response so that the Web UI can render the
/// execution timeline (tool calls, durations, errors) when the user
/// re-opens the session later. Mirrors `memory::sona::TrajectoryStep` but
/// is duplicated here to avoid a kernel-state → memory dependency cycle.
///
/// Records are appended to `Session::trajectory_steps` through
/// `Session::extend_trajectory`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrajectoryStepRecord {
    /// Name of the tool that was called.
    pub tool_name: String,
    /// Tool input arguments (JSON).
    pub tool_args: serde_json::Value,
    /// Truncated output (max ~500 chars). The truncation is done by the
    /// producer of the record; this type does not enforce it.
    pub output_summary: String,
    /// Wall-clock duration in milliseconds.
    pub duration_ms: u64,
    /// Whether the tool returned an error.
    pub is_error: bool,
    /// Provider-specific tool call ID (for start/end correlation).
    pub tool_call_id: String,
    /// Timestamp when the step started.
    pub timestamp: DateTime<Utc>,
}
117
/// Arbitrary key-value metadata for a session.
///
/// Values are free-form JSON. `StateStore::list_sessions` reads the `title`,
/// `project_id` and `project_ids` keys when they hold strings.
pub type SessionMetadata = std::collections::HashMap<String, serde_json::Value>;
120
/// A session represents a single user conversation.
///
/// Sessions track the full message history and metadata for
/// a user conversation. They are created per user interaction
/// and persisted for later retrieval.
///
/// User messages and agent responses live in two separate vectors, each in
/// insertion order. Fields marked `#[serde(default)]` may be absent from a
/// stored file and are then filled with their default value on load.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Session {
    /// Unique session identifier.
    pub id: SessionId,
    /// User ID who owns this session.
    pub user_id: String,
    /// All user messages in this session.
    #[serde(default)]
    pub user_messages: Vec<UserMessage>,
    /// All agent responses in this session.
    #[serde(default)]
    pub agent_responses: Vec<AgentResponse>,
    /// RFC-015: tool execution trajectory accumulated for this session.
    /// Appended on each orchestrator run; consumed by the Web UI to render
    /// the execution timeline when the session is re-opened.
    /// Omitted from the serialized form while empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub trajectory_steps: Vec<TrajectoryStepRecord>,
    /// Currently active seed ID (if any).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub active_seed_id: Option<String>,
    /// Currently active persona ID (for future multi-persona support).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub active_persona_id: Option<String>,
    /// RFC-025: Project this session belongs to (singular, grouping only).
    /// Set by the sidebar/drag-to-reparent; consumed for Project-tree view.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub project_id: Option<String>,
    /// Timestamp when the session was created.
    pub created_at: DateTime<Utc>,
    /// Timestamp when the session was last updated. Refreshed by every
    /// mutating method on `Session`.
    pub updated_at: DateTime<Utc>,
    /// Arbitrary key-value metadata.
    #[serde(default)]
    pub metadata: SessionMetadata,
}
161
162impl Session {
163    /// Creates a new session for a user.
164    pub fn new(user_id: impl Into<String>) -> Self {
165        let now = Utc::now();
166        Self {
167            id: SessionId::new(),
168            user_id: user_id.into(),
169            user_messages: Vec::new(),
170            agent_responses: Vec::new(),
171            trajectory_steps: Vec::new(),
172            active_seed_id: None,
173            active_persona_id: None,
174            project_id: None,
175            created_at: now,
176            updated_at: now,
177            metadata: SessionMetadata::new(),
178        }
179    }
180
181    /// Creates a session with a specific ID.
182    pub fn with_id(user_id: impl Into<String>, session_id: SessionId) -> Self {
183        let now = Utc::now();
184        Self {
185            id: session_id,
186            user_id: user_id.into(),
187            user_messages: Vec::new(),
188            agent_responses: Vec::new(),
189            trajectory_steps: Vec::new(),
190            active_seed_id: None,
191            active_persona_id: None,
192            project_id: None,
193            created_at: now,
194            updated_at: now,
195            metadata: SessionMetadata::new(),
196        }
197    }
198
199    /// Adds a user message to the session.
200    pub fn add_user_message(&mut self, content: impl Into<String>) {
201        self.user_messages.push(UserMessage {
202            content: content.into(),
203            timestamp: Utc::now(),
204        });
205        self.updated_at = Utc::now();
206    }
207
208    /// Adds an agent response to the session.
209    pub fn add_agent_response(&mut self, response: AgentResponse) {
210        self.agent_responses.push(response);
211        self.updated_at = Utc::now();
212    }
213
214    /// Appends trajectory steps to the session (RFC-015).
215    ///
216    /// Called by the orchestrator after each run so the Web UI can
217    /// re-render the execution timeline when the user re-opens the session.
218    pub fn extend_trajectory(&mut self, steps: Vec<TrajectoryStepRecord>) {
219        if steps.is_empty() {
220            return;
221        }
222        self.trajectory_steps.extend(steps);
223        self.updated_at = Utc::now();
224    }
225
226    /// Returns the trajectory steps recorded in this session.
227    pub fn trajectory(&self) -> &[TrajectoryStepRecord] {
228        &self.trajectory_steps
229    }
230
231    /// Sets the active seed ID.
232    pub fn set_active_seed(&mut self, seed_id: Option<String>) {
233        self.active_seed_id = seed_id;
234        self.updated_at = Utc::now();
235    }
236
237    /// Sets the active persona ID.
238    pub fn set_active_persona(&mut self, persona_id: Option<String>) {
239        self.active_persona_id = persona_id;
240        self.updated_at = Utc::now();
241    }
242
243    /// Sets a metadata value.
244    pub fn set_metadata(&mut self, key: impl Into<String>, value: serde_json::Value) {
245        self.metadata.insert(key.into(), value);
246        self.updated_at = Utc::now();
247    }
248
249    /// Gets a metadata value.
250    pub fn get_metadata(&self, key: &str) -> Option<&serde_json::Value> {
251        self.metadata.get(key)
252    }
253
254    /// Returns the total number of exchanges in this session.
255    pub fn exchange_count(&self) -> usize {
256        self.user_messages.len().min(self.agent_responses.len())
257    }
258
259    /// Returns true if the session is empty (no messages).
260    pub fn is_empty(&self) -> bool {
261        self.user_messages.is_empty()
262    }
263}
/// A filesystem-based persistent state store.
///
/// Files are organized as `<base_path>/<category>/<name>.md` or
/// `<base_path>/<category>/<name>.json`.
///
/// The store holds nothing but the root path, so cloning it is cheap and all
/// clones address the same files.
#[derive(Clone)]
pub struct StateStore {
    /// Root directory for all state files.
    pub base_path: PathBuf,
}
273
274impl StateStore {
275    /// Creates a new state store, initializing the directory if needed.
276    ///
277    /// # Example
278    ///
279    /// ```no_run
280    /// use oxios_kernel::state_store::StateStore;
281    /// use std::path::PathBuf;
282    ///
283    /// let store = StateStore::new(PathBuf::from("/tmp/oxios-state")).unwrap();
284    /// ```
285    pub fn new(base_path: PathBuf) -> Result<Self> {
286        Ok(Self { base_path })
287    }
288
289    /// Validate that a category name does not contain path traversal.
290    fn validate_category(category: &str) -> Result<()> {
291        if category.contains("..") || category.contains('\\') {
292            bail!("invalid category name: '{category}'");
293        }
294        if category.is_empty()
295            || category.starts_with('/')
296            || category.ends_with('/')
297            || category.contains("//")
298        {
299            bail!("invalid category name: '{category}'");
300        }
301        Ok(())
302    }
303
304    /// Validate that a file name does not contain path traversal.
305    fn validate_name(name: &str) -> Result<()> {
306        if name.contains("..") || name.contains('/') || name.contains('\\') {
307            bail!("invalid file name: '{name}'");
308        }
309        Ok(())
310    }
311
312    /// Save a markdown file under the given category.
313    pub async fn save_markdown(&self, category: &str, name: &str, content: &str) -> Result<()> {
314        Self::validate_category(category)?;
315        Self::validate_name(name)?;
316        let dir = self.base_path.join(category);
317        fs::create_dir_all(&dir).await?;
318        let path = dir.join(format!("{name}.md"));
319
320        // Write to temp file first, then atomic rename
321        let temp_path = dir.join(format!(
322            "{name}.{}.{}.tmp",
323            std::process::id(),
324            uuid::Uuid::new_v4()
325        ));
326        fs::write(&temp_path, content).await?;
327        tokio::fs::rename(&temp_path, &path).await?;
328
329        Ok(())
330    }
331
332    /// Load a markdown file from the given category.
333    pub async fn load_markdown(&self, category: &str, name: &str) -> Result<Option<String>> {
334        Self::validate_category(category)?;
335        Self::validate_name(name)?;
336        let path = self.base_path.join(category).join(format!("{name}.md"));
337        match fs::read_to_string(&path).await {
338            Ok(content) => Ok(Some(content)),
339            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
340            Err(e) => Err(e.into()),
341        }
342    }
343
344    /// List all markdown files in a category (names without extension).
345    pub async fn list_category(&self, category: &str) -> Result<Vec<String>> {
346        Self::validate_category(category)?;
347        let dir = self.base_path.join(category);
348        if !dir.exists() {
349            return Ok(Vec::new());
350        }
351        let mut entries = fs::read_dir(&dir).await?;
352        let mut names = Vec::new();
353        while let Some(entry) = entries.next_entry().await? {
354            let path = entry.path();
355            if let Some(ext) = path.extension()
356                && (ext == "md" || ext == "json")
357                && let Some(stem) = path.file_stem()
358            {
359                names.push(stem.to_string_lossy().into_owned());
360            }
361        }
362        names.sort();
363        Ok(names)
364    }
365
366    /// Save a serializable value as JSON under the given category.
367    pub async fn save_json<T: Serialize>(
368        &self,
369        category: &str,
370        name: &str,
371        data: &T,
372    ) -> Result<()> {
373        Self::validate_category(category)?;
374        Self::validate_name(name)?;
375        let dir = self.base_path.join(category);
376        fs::create_dir_all(&dir).await?;
377        let path = dir.join(format!("{name}.json"));
378
379        let content = serde_json::to_string_pretty(data)?;
380
381        // Write to temp file first, then atomic rename
382        let temp_path = dir.join(format!(
383            "{name}.{}.{}.tmp",
384            std::process::id(),
385            uuid::Uuid::new_v4()
386        ));
387        fs::write(&temp_path, &content).await?;
388        tokio::fs::rename(&temp_path, &path).await?;
389
390        Ok(())
391    }
392
393    /// Load a deserializable value from JSON in the given category.
394    pub async fn load_json<T: DeserializeOwned>(
395        &self,
396        category: &str,
397        name: &str,
398    ) -> Result<Option<T>> {
399        Self::validate_category(category)?;
400        Self::validate_name(name)?;
401        let path = self.base_path.join(category).join(format!("{name}.json"));
402        match fs::read_to_string(&path).await {
403            Ok(content) => Ok(Some(serde_json::from_str(&content)?)),
404            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
405            Err(e) => Err(e.into()),
406        }
407    }
408
409    /// Delete a file from the given category.
410    pub async fn delete_file(&self, category: &str, name: &str) -> Result<bool> {
411        Self::validate_category(category)?;
412        Self::validate_name(name)?;
413        let path = self.base_path.join(category).join(format!("{name}.json"));
414        if path.exists() {
415            tokio::fs::remove_file(path).await?;
416            Ok(true)
417        } else {
418            let path = self.base_path.join(category).join(format!("{name}.md"));
419            if path.exists() {
420                tokio::fs::remove_file(path).await?;
421                Ok(true)
422            } else {
423                Ok(false)
424            }
425        }
426    }
427}
428
429impl std::fmt::Debug for StateStore {
430    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
431        f.debug_struct("StateStore")
432            .field("base_path", &self.base_path)
433            .finish()
434    }
435}
436
437impl StateStore {
438    /// Saves a session to the sessions category.
439    pub async fn save_session(&self, session: &Session) -> Result<()> {
440        self.save_json("sessions", &session.id.0, session).await
441    }
442
443    /// Saves a session and then runs pruning if auto_prune is enabled.
444    pub async fn save_session_with_prune(
445        &self,
446        session: &Session,
447        prune_config: &PruneConfig,
448    ) -> Result<()> {
449        self.save_session(session).await?;
450        // Prune in the background — don't block the response
451        let store = self.clone();
452        let config = prune_config.clone();
453        tokio::spawn(async move {
454            if let Err(e) = store.prune_sessions(&config).await {
455                tracing::warn!(error = %e, "Background session pruning failed");
456            }
457        });
458        Ok(())
459    }
460
461    /// Loads a session by ID.
462    pub async fn load_session(&self, session_id: &SessionId) -> Result<Option<Session>> {
463        self.load_json("sessions", &session_id.0).await
464    }
465
466    /// RFC-025 Phase 5: Load all sessions in full (messages + trajectories).
467    ///
468    /// Used by the Mount auto-promotion scanner, which needs trajectory
469    /// tool_args to identify paths the agent actually worked on. Cheaper to
470    /// call once per scan than `load_session` per id.
471    ///
472    /// **Warning:** loads every session on disk. Prefer
473    /// [`load_sessions_for_promotion`](Self::load_sessions_for_promotion)
474    /// for the promotion scanner to bound memory usage.
475    pub async fn load_all_sessions(&self) -> Result<Vec<Session>> {
476        let mut sessions = Vec::new();
477        if let Ok(names) = self.list_category("sessions").await {
478            for name in names {
479                if let Ok(Some(session)) = self.load_json::<Session>("sessions", &name).await {
480                    sessions.push(session);
481                }
482            }
483        }
484        Ok(sessions)
485    }
486
487    /// RFC-025 Phase 5: Load only sessions updated at or after `since`.
488    ///
489    /// Bounds memory usage for the Mount auto-promotion scanner: instead of
490    /// deserializing every session into a single `Vec`, we deserialize each
491    /// one and skip it immediately if its `updated_at` predates the cutoff.
492    /// The promotion window is bounded by `PromotionConfig::window_days`, so
493    /// loading older sessions is pure waste.
494    ///
495    /// Sessions that fail to deserialize are skipped (best effort), mirroring
496    /// [`load_all_sessions`](Self::load_all_sessions).
497    pub async fn load_sessions_for_promotion(&self, since: DateTime<Utc>) -> Result<Vec<Session>> {
498        let mut sessions = Vec::new();
499        if let Ok(names) = self.list_category("sessions").await {
500            for name in names {
501                if let Ok(Some(session)) = self.load_json::<Session>("sessions", &name).await {
502                    // Skip sessions outside the promotion window *before*
503                    // collecting — this is the whole point of the cutoff.
504                    if session.updated_at < since {
505                        continue;
506                    }
507                    sessions.push(session);
508                }
509            }
510        }
511        Ok(sessions)
512    }
513
514    /// Lists all sessions (sorted by updated_at descending).
515    pub async fn list_sessions(&self) -> Result<Vec<SessionSummary>> {
516        let mut sessions = Vec::new();
517
518        if let Ok(names) = self.list_category("sessions").await {
519            for name in names {
520                if let Ok(Some(session)) = self.load_json::<Session>("sessions", &name).await {
521                    sessions.push(SessionSummary {
522                        id: session.id.0.clone(),
523                        user_id: session.user_id.clone(),
524                        message_count: session.user_messages.len(),
525                        title: session
526                            .metadata
527                            .get("title")
528                            .and_then(|v| v.as_str())
529                            .map(String::from)
530                            .or_else(|| {
531                                // Auto-generate from first user message
532                                session.user_messages.first().map(|m| {
533                                    let s = m.content.lines().next().unwrap_or("");
534                                    if s.len() > 60 {
535                                        format!("{}…", &s[..s.ceil_char_boundary(59)])
536                                    } else {
537                                        s.to_string()
538                                    }
539                                })
540                            }),
541                        active_seed_id: session.active_seed_id.clone(),
542                        project_id: session
543                            .project_id
544                            .clone()
545                            // Backward-compat: fall back to legacy metadata keys.
546                            .or_else(|| {
547                                session
548                                    .metadata
549                                    .get("project_id")
550                                    .and_then(|v| v.as_str())
551                                    .map(String::from)
552                            })
553                            .or_else(|| {
554                                session
555                                    .metadata
556                                    .get("project_ids")
557                                    .and_then(|v| v.as_str())
558                                    .and_then(|s| s.split(',').next().map(String::from))
559                            }),
560                        created_at: session.created_at,
561                        updated_at: session.updated_at,
562                    });
563                }
564            }
565        }
566
567        // Sort by updated_at descending (most recent first)
568        sessions.sort_by_key(|b| std::cmp::Reverse(b.updated_at));
569        Ok(sessions)
570    }
571
572    /// Deletes a session by ID.
573    pub async fn delete_session(&self, session_id: &SessionId) -> Result<bool> {
574        let path = self
575            .base_path
576            .join("sessions")
577            .join(format!("{}.json", session_id.0));
578        match fs::remove_file(&path).await {
579            Ok(()) => {
580                tracing::info!(session_id = %session_id, "Session deleted");
581                Ok(true)
582            }
583            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
584            Err(e) => Err(e.into()),
585        }
586    }
587
588    /// Gets or creates a session for a user, initializing with the given session ID.
589    pub async fn get_or_create_session(
590        &self,
591        user_id: &str,
592        session_id: Option<&SessionId>,
593    ) -> Result<Session> {
594        if let Some(sid) = session_id
595            && let Some(existing) = self.load_session(sid).await?
596        {
597            return Ok(existing);
598        }
599
600        // Create new session
601        let session = match session_id {
602            Some(sid) => Session::with_id(user_id, sid.clone()),
603            None => Session::new(user_id),
604        };
605
606        self.save_session(&session).await?;
607        Ok(session)
608    }
609
610    /// Updates an existing session, saving it to disk.
611    pub async fn update_session(&self, session: &Session) -> Result<()> {
612        self.save_session(session).await
613    }
614
615    /// RFC-025: Move a session to a different Project (drag-to-reparent).
616    ///
617    /// Pass `None` to unassign (move to "unfiled").
618    pub async fn move_session_to_project(
619        &self,
620        session_id: &SessionId,
621        project_id: Option<&str>,
622    ) -> Result<bool> {
623        match self.load_session(session_id).await? {
624            Some(mut session) => {
625                session.project_id = project_id.map(String::from);
626                session.updated_at = Utc::now();
627                self.save_session(&session).await?;
628                Ok(true)
629            }
630            None => Ok(false),
631        }
632    }
633
634    /// Prune sessions based on configuration.
635    ///
636    /// Removes sessions that exceed TTL or exceed the maximum count.
637    /// Returns the number of sessions pruned.
638    pub async fn prune_sessions(&self, config: &PruneConfig) -> Result<usize> {
639        let mut sessions = self.list_sessions().await?;
640        let mut pruned = 0;
641
642        // TTL-based pruning: remove sessions older than ttl_hours
643        if config.ttl_hours > 0 {
644            let cutoff = Utc::now() - chrono::Duration::hours(config.ttl_hours as i64);
645            let to_prune_ttl: Vec<String> = sessions
646                .iter()
647                .filter(|s| s.updated_at < cutoff)
648                .map(|s| s.id.clone())
649                .collect();
650
651            for id in &to_prune_ttl {
652                let sid = SessionId(id.clone());
653                if self.delete_session(&sid).await.is_ok() {
654                    pruned += 1;
655                }
656            }
657
658            // Remove pruned sessions from the list for count-based pruning
659            sessions.retain(|s| !to_prune_ttl.contains(&s.id));
660        }
661
662        // Count-based pruning: keep only the most recent `max_sessions`
663        if config.max_sessions > 0 && sessions.len() > config.max_sessions {
664            // sessions are already sorted by updated_at descending
665            let excess = sessions.len() - config.max_sessions;
666            for session in sessions.into_iter().rev().take(excess) {
667                let sid = SessionId(session.id);
668                if self.delete_session(&sid).await.is_ok() {
669                    pruned += 1;
670                }
671            }
672        }
673
674        if pruned > 0 {
675            tracing::info!(pruned = pruned, "Session pruning completed");
676        }
677
678        Ok(pruned)
679    }
680
681    /// Prune agent records based on config.
682    ///
683    /// 1. TTL-based: delete agents with created_at older than ttl_hours.
684    /// 2. Count-based: if still over max_entries, delete oldest.
685    pub async fn prune_agents_by_config(
686        &self,
687        max_entries: usize,
688        ttl_hours: u64,
689        batch_size: usize,
690    ) -> Result<usize> {
691        let mut pruned = 0usize;
692
693        let names = self.list_category("agents").await?;
694        if names.is_empty() {
695            return Ok(0);
696        }
697
698        let now = Utc::now();
699
700        // 1. TTL-based pruning
701        let mut remaining: Vec<(String, DateTime<Utc>)> = Vec::with_capacity(names.len());
702
703        if ttl_hours > 0 {
704            let cutoff = now - chrono::Duration::hours(ttl_hours as i64);
705            for name in &names {
706                // Load just enough to check created_at
707                if let Ok(Some(info)) = self
708                    .load_json::<crate::types::AgentInfo>("agents", name)
709                    .await
710                {
711                    if info.created_at < cutoff {
712                        if self.delete_file("agents", name).await.unwrap_or(false) {
713                            pruned += 1;
714                        }
715                    } else {
716                        remaining.push((name.clone(), info.created_at));
717                    }
718                }
719            }
720        } else {
721            // Load all created_at for count-based pruning
722            for name in &names {
723                if let Ok(Some(info)) = self
724                    .load_json::<crate::types::AgentInfo>("agents", name)
725                    .await
726                {
727                    remaining.push((name.clone(), info.created_at));
728                }
729            }
730        }
731
732        // 2. Count-based pruning
733        if max_entries > 0 && remaining.len() > max_entries {
734            // Sort by created_at ascending (oldest first)
735            remaining.sort_by_key(|a| a.1);
736
737            let excess = remaining.len() - max_entries;
738            let to_delete = excess.min(batch_size);
739
740            for (name, _) in remaining.iter().take(to_delete) {
741                if self.delete_file("agents", name).await.unwrap_or(false) {
742                    pruned += 1;
743                }
744            }
745        }
746
747        if pruned > 0 {
748            tracing::info!(pruned = pruned, "Agent filesystem pruning completed");
749        }
750
751        Ok(pruned)
752    }
753}
754
/// Summary of a session for listing (without full message history).
///
/// Produced by `StateStore::list_sessions`, one per stored session.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionSummary {
    /// Session ID.
    pub id: String,
    /// User ID who owns this session.
    pub user_id: String,
    /// Number of user messages in this session (agent responses are not
    /// counted).
    pub message_count: usize,
    /// Title for this session: the `title` metadata value when set,
    /// otherwise derived from the first line of the first user message
    /// (truncated to ~60 chars).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub title: Option<String>,
    /// Active seed ID if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub active_seed_id: Option<String>,
    /// Project this session belongs to. Taken from `Session::project_id`,
    /// falling back to the legacy `project_id` metadata key and then to the
    /// first comma-separated entry of the legacy `project_ids` key.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub project_id: Option<String>,
    /// When the session was created.
    pub created_at: DateTime<Utc>,
    /// When the session was last updated.
    pub updated_at: DateTime<Utc>,
}
779
/// Configuration for session pruning.
///
/// Consumed by `StateStore::prune_sessions`, which applies the TTL rule
/// first and the count limit second.
#[derive(Debug, Clone)]
pub struct PruneConfig {
    /// Maximum number of sessions to keep. 0 = unlimited.
    pub max_sessions: usize,
    /// TTL in hours, measured against a session's `updated_at`. Sessions
    /// older than this are pruned. 0 = no TTL.
    pub ttl_hours: u64,
}
788
789impl Default for PruneConfig {
790    fn default() -> Self {
791        Self {
792            max_sessions: 100,
793            ttl_hours: 168, // 7 days
794        }
795    }
796}
797
/// Tracks the last time a prune was performed, enabling cooldown.
///
/// The timestamp sits behind a `std::sync::Mutex`, so a shared reference is
/// enough to call `should_prune`.
pub struct PruneThrottle {
    /// Instant of the last prune (monotonic). `None` until the first call to
    /// `should_prune` that returns `true`.
    last_prune: std::sync::Mutex<Option<std::time::Instant>>,
    /// Minimum seconds between prune runs.
    cooldown_secs: u64,
}
805
806impl PruneThrottle {
807    /// Create a new throttle with the given cooldown.
808    pub fn new(cooldown_secs: u64) -> Self {
809        Self {
810            last_prune: std::sync::Mutex::new(None),
811            cooldown_secs,
812        }
813    }
814
815    /// Check if enough time has elapsed since the last prune.
816    /// Returns `true` if prune should proceed.
817    pub fn should_prune(&self) -> bool {
818        // SAFETY: parking_lot::Mutex never poisons, but std::sync::Mutex does.
819        // Recover from poison by taking the inner value so pruning continues.
820        let mut guard = self.last_prune.lock().unwrap_or_else(|e| {
821            tracing::warn!("PruneThrottle mutex poisoned, recovering: {e}");
822            e.into_inner()
823        });
824        let now = std::time::Instant::now();
825        match *guard {
826            Some(last) => {
827                if now.duration_since(last).as_secs() >= self.cooldown_secs {
828                    *guard = Some(now);
829                    true
830                } else {
831                    false
832                }
833            }
834            None => {
835                *guard = Some(now);
836                true
837            }
838        }
839    }
840}
841
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a `StateStore` rooted in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned together with the store: the
    /// directory is removed when the guard is dropped, so each test must
    /// keep it bound (e.g. as `_temp_dir`) for as long as it uses the store.
    fn temp_store() -> (tempfile::TempDir, StateStore) {
        let dir = tempfile::tempdir().unwrap();
        let store = StateStore::new(dir.path().to_path_buf()).unwrap();
        (dir, store)
    }

    /// Saves `count` sessions for users `user-0`, `user-1`, ... in that
    /// order, each with a single message, sleeping 10 ms after every save so
    /// that later sessions carry later timestamps.
    async fn seed_sessions(store: &StateStore, count: usize) {
        for i in 0..count {
            let mut session = Session::new(&format!("user-{i}"));
            session.add_user_message(&format!("Message {i}"));
            store.save_session(&session).await.unwrap();
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }
    }

    /// A saved session can be loaded back with its user and messages intact.
    #[tokio::test]
    async fn test_session_creation_and_persistence() {
        let (_temp_dir, store) = temp_store();

        let mut session = Session::new("user-123");
        session.add_user_message("Hello");

        // Round-trip through the store.
        store.save_session(&session).await.unwrap();
        let loaded = store
            .load_session(&session.id)
            .await
            .unwrap()
            .expect("saved session should be loadable");

        assert_eq!(loaded.user_id, "user-123");
        assert_eq!(loaded.user_messages.len(), 1);
    }

    /// Listing returns sessions ordered from most to least recently updated.
    #[tokio::test]
    async fn test_session_list_sorts_by_updated() {
        let (_temp_dir, store) = temp_store();

        seed_sessions(&store, 3).await;

        let sessions = store.list_sessions().await.unwrap();
        assert_eq!(sessions.len(), 3);

        // Check the complete ordering, not only the first element, so a
        // partially sorted result cannot slip through.
        let order: Vec<&str> = sessions.iter().map(|s| s.user_id.as_str()).collect();
        assert_eq!(order, ["user-2", "user-1", "user-0"]);
    }

    /// Deleting a session reports success and makes it unloadable.
    #[tokio::test]
    async fn test_delete_session() {
        let (_temp_dir, store) = temp_store();

        let session = Session::new("user-123");
        store.save_session(&session).await.unwrap();

        let deleted = store.delete_session(&session.id).await.unwrap();
        assert!(deleted);

        let loaded = store.load_session(&session.id).await.unwrap();
        assert!(loaded.is_none());
    }

    /// `get_or_create_session` with a known ID returns the stored session.
    #[tokio::test]
    async fn test_get_or_create_session_existing() {
        let (_temp_dir, store) = temp_store();

        let mut existing = Session::new("user-123");
        existing.add_user_message("Original message");
        store.save_session(&existing).await.unwrap();

        let retrieved = store
            .get_or_create_session("user-123", Some(&existing.id))
            .await
            .unwrap();
        assert_eq!(retrieved.id, existing.id);
        assert_eq!(retrieved.user_messages.len(), 1);
    }

    /// `get_or_create_session` without an ID creates an empty session.
    #[tokio::test]
    async fn test_get_or_create_session_new() {
        let (_temp_dir, store) = temp_store();

        let session = store.get_or_create_session("user-456", None).await.unwrap();
        assert_eq!(session.user_id, "user-456");
        assert!(session.user_messages.is_empty());
    }

    /// With only a count limit, pruning removes the oldest sessions first.
    #[tokio::test]
    async fn test_prune_sessions_by_count() {
        let (_temp_dir, store) = temp_store();

        seed_sessions(&store, 5).await;

        // Keep at most 3 sessions; TTL disabled.
        let config = PruneConfig {
            max_sessions: 3,
            ttl_hours: 0,
        };
        let pruned = store.prune_sessions(&config).await.unwrap();
        assert_eq!(pruned, 2);

        let remaining = store.list_sessions().await.unwrap();
        assert_eq!(remaining.len(), 3);

        // The two oldest sessions (user-0, user-1) must be the ones removed.
        let surviving_users: Vec<&str> = remaining.iter().map(|s| s.user_id.as_str()).collect();
        for expected in ["user-2", "user-3", "user-4"] {
            assert!(
                surviving_users.contains(&expected),
                "{expected} should survive pruning"
            );
        }
    }

    /// With only a TTL, pruning removes sessions last updated before it.
    #[tokio::test]
    async fn test_prune_sessions_by_ttl() {
        let (_temp_dir, store) = temp_store();

        // Session whose `updated_at` is backdated to 48h ago.
        let mut old_session = Session::new("old-user");
        old_session.updated_at = Utc::now() - chrono::Duration::hours(48);
        store.save_session(&old_session).await.unwrap();

        // Session updated just now.
        let mut recent_session = Session::new("recent-user");
        recent_session.add_user_message("Hello");
        store.save_session(&recent_session).await.unwrap();

        // 24-hour TTL; count limit disabled.
        let config = PruneConfig {
            max_sessions: 0,
            ttl_hours: 24,
        };
        let pruned = store.prune_sessions(&config).await.unwrap();
        assert_eq!(pruned, 1);

        let remaining = store.list_sessions().await.unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].user_id, "recent-user");
    }

    /// Promo-1: sessions older than the cutoff must be skipped before being
    /// collected, bounding memory for the promotion scanner.
    #[tokio::test]
    async fn test_load_sessions_for_promotion_filters_by_cutoff() {
        let (_temp_dir, store) = temp_store();

        // Old session: updated 48h ago.
        let mut old_session = Session::new("old-user");
        old_session.updated_at = Utc::now() - chrono::Duration::hours(48);
        store.save_session(&old_session).await.unwrap();

        // Recent session: updated now.
        let recent_session = Session::new("recent-user");
        store.save_session(&recent_session).await.unwrap();

        // Cutoff 24h ago: only the recent session should be loaded.
        let cutoff = Utc::now() - chrono::Duration::hours(24);
        let sessions = store.load_sessions_for_promotion(cutoff).await.unwrap();
        assert_eq!(sessions.len(), 1, "old session must be filtered out");
        assert_eq!(sessions[0].user_id, "recent-user");

        // A cutoff far in the past (a year ago, older than both sessions)
        // filters nothing out, so every session is returned.
        let distant_past_cutoff = Utc::now() - chrono::Duration::days(365);
        let all = store
            .load_sessions_for_promotion(distant_past_cutoff)
            .await
            .unwrap();
        assert_eq!(all.len(), 2);
    }
}