Skip to main content

thulp_workspace/
session_manager.rs

1//! Session manager for persisting and managing sessions.
2//!
3//! This module provides the `SessionManager` for creating, loading,
4//! saving, and querying sessions with file-based persistence.
5
6use crate::filter::SessionFilter;
7use crate::session::{
8    EntryType, Session, SessionEntry, SessionId, SessionMetadata, SessionStatus, SessionType,
9    Timestamp,
10};
11use crate::{Result, Workspace, WorkspaceError};
12use serde_json::Value;
13use std::collections::HashMap;
14use std::path::PathBuf;
15use std::sync::Arc;
16use tokio::fs;
17use tokio::sync::RwLock;
18use tracing::{debug, info, warn};
19
20/// Manager for session persistence and lifecycle.
21///
22/// The `SessionManager` provides file-based persistence for sessions,
23/// storing them in `{workspace}/.thulp/sessions/` as JSON files.
24///
25/// # Example
26///
27/// ```ignore
28/// use thulp_workspace::{Workspace, SessionManager, SessionType};
29///
30/// let workspace = Workspace::new("my-workspace", "My Workspace", PathBuf::from("."));
31/// let manager = SessionManager::new(&workspace).await?;
32///
33/// let session = manager.create_session("My Session", SessionType::Conversation {
34///     purpose: "Testing".to_string(),
35/// }).await?;
36///
37/// manager.add_entry(&session.id(), EntryType::UserMessage, json!({"text": "Hello"})).await?;
38/// ```
39pub struct SessionManager {
40    /// Directory where sessions are stored.
41    sessions_dir: PathBuf,
42    /// In-memory cache of active sessions.
43    active_sessions: Arc<RwLock<HashMap<SessionId, Session>>>,
44}
45
46impl SessionManager {
47    /// Create a new session manager for the given workspace.
48    ///
49    /// This will create the sessions directory if it doesn't exist.
50    pub async fn new(workspace: &Workspace) -> Result<Self> {
51        let sessions_dir = workspace.root.join(".thulp").join("sessions");
52        fs::create_dir_all(&sessions_dir).await?;
53
54        debug!(?sessions_dir, "Initialized session manager");
55
56        Ok(Self {
57            sessions_dir,
58            active_sessions: Arc::new(RwLock::new(HashMap::new())),
59        })
60    }
61
62    /// Create a new session manager with a custom sessions directory.
63    pub async fn with_sessions_dir(sessions_dir: PathBuf) -> Result<Self> {
64        fs::create_dir_all(&sessions_dir).await?;
65
66        Ok(Self {
67            sessions_dir,
68            active_sessions: Arc::new(RwLock::new(HashMap::new())),
69        })
70    }
71
72    /// Get the path to a session file.
73    fn session_path(&self, id: &SessionId) -> PathBuf {
74        self.sessions_dir.join(format!("{}.json", id))
75    }
76
77    /// Create a new session.
78    ///
79    /// The session is automatically persisted to disk and cached in memory.
80    pub async fn create_session(
81        &self,
82        name: impl Into<String>,
83        session_type: SessionType,
84    ) -> Result<Session> {
85        let session = Session::new(name, session_type);
86        let id = session.id().clone();
87
88        // Persist to disk
89        self.save_session_internal(&session).await?;
90
91        // Cache in memory
92        {
93            let mut sessions = self.active_sessions.write().await;
94            sessions.insert(id.clone(), session.clone());
95        }
96
97        info!(session_id = %id, "Created new session");
98        Ok(session)
99    }
100
101    /// Load a session from disk.
102    ///
103    /// If the session is already cached in memory, returns the cached version.
104    pub async fn load_session(&self, id: &SessionId) -> Result<Session> {
105        // Check cache first
106        {
107            let sessions = self.active_sessions.read().await;
108            if let Some(session) = sessions.get(id) {
109                return Ok(session.clone());
110            }
111        }
112
113        // Load from disk
114        let path = self.session_path(id);
115        let content = fs::read_to_string(&path).await.map_err(|e| {
116            if e.kind() == std::io::ErrorKind::NotFound {
117                WorkspaceError::NotFound(format!("Session {} not found", id))
118            } else {
119                WorkspaceError::Io(e)
120            }
121        })?;
122
123        let session: Session = serde_json::from_str(&content)
124            .map_err(|e| WorkspaceError::Serialization(e.to_string()))?;
125
126        // Cache for future access
127        {
128            let mut sessions = self.active_sessions.write().await;
129            sessions.insert(id.clone(), session.clone());
130        }
131
132        debug!(session_id = %id, "Loaded session from disk");
133        Ok(session)
134    }
135
136    /// Save a session to disk.
137    pub async fn save_session(&self, session: &Session) -> Result<()> {
138        self.save_session_internal(session).await?;
139
140        // Update cache
141        {
142            let mut sessions = self.active_sessions.write().await;
143            sessions.insert(session.id().clone(), session.clone());
144        }
145
146        Ok(())
147    }
148
149    /// Internal save without updating cache (to avoid deadlocks).
150    async fn save_session_internal(&self, session: &Session) -> Result<()> {
151        let path = self.session_path(session.id());
152        let content = serde_json::to_string_pretty(session)
153            .map_err(|e| WorkspaceError::Serialization(e.to_string()))?;
154
155        fs::write(&path, content).await?;
156        debug!(session_id = %session.id(), "Saved session to disk");
157        Ok(())
158    }
159
160    /// Add an entry to a session.
161    ///
162    /// The session is automatically saved after adding the entry.
163    pub async fn add_entry(
164        &self,
165        session_id: &SessionId,
166        entry_type: EntryType,
167        content: Value,
168    ) -> Result<SessionEntry> {
169        let entry = SessionEntry::new(entry_type, content);
170
171        {
172            let mut sessions = self.active_sessions.write().await;
173            if let Some(session) = sessions.get_mut(session_id) {
174                session.add_entry(entry.clone());
175                // Save to disk
176                self.save_session_internal(session).await?;
177            } else {
178                // Load from disk, add entry, save
179                drop(sessions); // Release lock before loading
180                let mut session = self.load_session(session_id).await?;
181                session.add_entry(entry.clone());
182                self.save_session(&session).await?;
183            }
184        }
185
186        debug!(session_id = %session_id, entry_id = %entry.id, "Added entry to session");
187        Ok(entry)
188    }
189
190    /// Complete a session.
191    ///
192    /// Marks the session as completed and saves it.
193    pub async fn complete_session(&self, session_id: &SessionId) -> Result<()> {
194        self.update_status(session_id, SessionStatus::Completed)
195            .await
196    }
197
198    /// Fail a session.
199    ///
200    /// Marks the session as failed and saves it.
201    pub async fn fail_session(&self, session_id: &SessionId) -> Result<()> {
202        self.update_status(session_id, SessionStatus::Failed).await
203    }
204
205    /// Cancel a session.
206    ///
207    /// Marks the session as cancelled and saves it.
208    pub async fn cancel_session(&self, session_id: &SessionId) -> Result<()> {
209        self.update_status(session_id, SessionStatus::Cancelled)
210            .await
211    }
212
213    /// Pause a session.
214    ///
215    /// Marks the session as paused and saves it.
216    pub async fn pause_session(&self, session_id: &SessionId) -> Result<()> {
217        self.update_status(session_id, SessionStatus::Paused).await
218    }
219
220    /// Resume a paused session.
221    ///
222    /// Marks the session as active and saves it.
223    pub async fn resume_session(&self, session_id: &SessionId) -> Result<()> {
224        self.update_status(session_id, SessionStatus::Active).await
225    }
226
227    /// Update session status.
228    async fn update_status(&self, session_id: &SessionId, status: SessionStatus) -> Result<()> {
229        let mut session = self.load_session(session_id).await?;
230        session.set_status(status);
231        self.save_session(&session).await?;
232
233        info!(session_id = %session_id, ?status, "Updated session status");
234        Ok(())
235    }
236
237    /// List all sessions, optionally filtered.
238    pub async fn list_sessions(
239        &self,
240        filter: Option<&SessionFilter>,
241    ) -> Result<Vec<SessionMetadata>> {
242        let mut metadata_list = Vec::new();
243
244        let mut entries = fs::read_dir(&self.sessions_dir).await?;
245        while let Some(entry) = entries.next_entry().await? {
246            let path = entry.path();
247            if path.extension().and_then(|s| s.to_str()) != Some("json") {
248                continue;
249            }
250
251            match fs::read_to_string(&path).await {
252                Ok(content) => {
253                    match serde_json::from_str::<Session>(&content) {
254                        Ok(session) => {
255                            let metadata = session.metadata.clone();
256
257                            // Apply filter if provided
258                            if let Some(filter) = filter {
259                                if filter.matches(&session) {
260                                    metadata_list.push(metadata);
261                                }
262                            } else {
263                                metadata_list.push(metadata);
264                            }
265                        }
266                        Err(e) => {
267                            warn!(path = ?path, error = %e, "Failed to parse session file");
268                        }
269                    }
270                }
271                Err(e) => {
272                    warn!(path = ?path, error = %e, "Failed to read session file");
273                }
274            }
275        }
276
277        // Sort by updated_at descending (most recent first)
278        metadata_list.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
279
280        Ok(metadata_list)
281    }
282
283    /// Delete a session.
284    ///
285    /// Removes the session from disk and cache.
286    pub async fn delete_session(&self, session_id: &SessionId) -> Result<()> {
287        // Remove from cache
288        {
289            let mut sessions = self.active_sessions.write().await;
290            sessions.remove(session_id);
291        }
292
293        // Remove from disk
294        let path = self.session_path(session_id);
295        if path.exists() {
296            fs::remove_file(&path).await?;
297            info!(session_id = %session_id, "Deleted session");
298        }
299
300        Ok(())
301    }
302
303    /// Check if a session exists.
304    pub async fn session_exists(&self, session_id: &SessionId) -> bool {
305        // Check cache
306        {
307            let sessions = self.active_sessions.read().await;
308            if sessions.contains_key(session_id) {
309                return true;
310            }
311        }
312
313        // Check disk
314        self.session_path(session_id).exists()
315    }
316
317    /// Get a session without loading it into cache.
318    ///
319    /// Useful for one-off reads where caching isn't beneficial.
320    pub async fn peek_session(&self, session_id: &SessionId) -> Result<Session> {
321        let path = self.session_path(session_id);
322        let content = fs::read_to_string(&path).await.map_err(|e| {
323            if e.kind() == std::io::ErrorKind::NotFound {
324                WorkspaceError::NotFound(format!("Session {} not found", session_id))
325            } else {
326                WorkspaceError::Io(e)
327            }
328        })?;
329
330        serde_json::from_str(&content).map_err(|e| WorkspaceError::Serialization(e.to_string()))
331    }
332
333    /// Evict a session from the in-memory cache.
334    ///
335    /// The session remains on disk but is removed from memory.
336    pub async fn evict_from_cache(&self, session_id: &SessionId) {
337        let mut sessions = self.active_sessions.write().await;
338        sessions.remove(session_id);
339    }
340
341    /// Clear all sessions from the in-memory cache.
342    pub async fn clear_cache(&self) {
343        let mut sessions = self.active_sessions.write().await;
344        sessions.clear();
345    }
346
347    /// Get the number of cached sessions.
348    pub async fn cached_session_count(&self) -> usize {
349        let sessions = self.active_sessions.read().await;
350        sessions.len()
351    }
352
353    /// Get all active sessions (in-memory only).
354    pub async fn active_sessions(&self) -> Vec<Session> {
355        let sessions = self.active_sessions.read().await;
356        sessions.values().cloned().collect()
357    }
358
359    /// Find sessions by tag.
360    pub async fn find_by_tag(&self, tag: &str) -> Result<Vec<SessionMetadata>> {
361        self.list_sessions(Some(&SessionFilter::HasTag(tag.to_string())))
362            .await
363    }
364
365    /// Find sessions by type.
366    pub async fn find_by_type(&self, session_type_name: &str) -> Result<Vec<SessionMetadata>> {
367        self.list_sessions(Some(&SessionFilter::ByTypeName(
368            session_type_name.to_string(),
369        )))
370        .await
371    }
372
373    /// Find sessions by status.
374    pub async fn find_by_status(&self, status: SessionStatus) -> Result<Vec<SessionMetadata>> {
375        self.list_sessions(Some(&SessionFilter::ByStatus(status)))
376            .await
377    }
378
379    /// Find sessions created after a timestamp.
380    pub async fn find_created_after(&self, timestamp: Timestamp) -> Result<Vec<SessionMetadata>> {
381        self.list_sessions(Some(&SessionFilter::CreatedAfter(timestamp)))
382            .await
383    }
384
385    /// Find sessions updated after a timestamp.
386    pub async fn find_updated_after(&self, timestamp: Timestamp) -> Result<Vec<SessionMetadata>> {
387        self.list_sessions(Some(&SessionFilter::UpdatedAfter(timestamp)))
388            .await
389    }
390
391    /// Get session count on disk.
392    pub async fn session_count(&self) -> Result<usize> {
393        let mut count = 0;
394        let mut entries = fs::read_dir(&self.sessions_dir).await?;
395        while let Some(entry) = entries.next_entry().await? {
396            if entry.path().extension().and_then(|s| s.to_str()) == Some("json") {
397                count += 1;
398            }
399        }
400        Ok(count)
401    }
402}
403
404impl std::fmt::Debug for SessionManager {
405    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
406        f.debug_struct("SessionManager")
407            .field("sessions_dir", &self.sessions_dir)
408            .finish_non_exhaustive()
409    }
410}
411
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Build a manager backed by a fresh temporary directory.
    ///
    /// The `TempDir` is returned so the directory outlives the test body.
    async fn create_test_manager() -> (SessionManager, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let sessions_dir = temp_dir.path().join("sessions");
        let manager = SessionManager::with_sessions_dir(sessions_dir)
            .await
            .unwrap();
        (manager, temp_dir)
    }

    /// Create a conversation-type session with the given name and purpose.
    async fn create_conversation(manager: &SessionManager, name: &str, purpose: &str) -> Session {
        let session_type = SessionType::Conversation {
            purpose: purpose.to_string(),
        };
        manager.create_session(name, session_type).await.unwrap()
    }

    #[tokio::test]
    async fn test_create_session() {
        let (manager, _temp) = create_test_manager().await;

        let session = create_conversation(&manager, "Test Session", "Testing").await;

        assert_eq!(session.name(), "Test Session");
        assert_eq!(session.status(), SessionStatus::Active);
    }

    #[tokio::test]
    async fn test_load_session() {
        let (manager, _temp) = create_test_manager().await;

        let created = create_conversation(&manager, "Test Session", "Testing").await;

        // Clear cache to force disk load
        manager.clear_cache().await;

        let loaded = manager.load_session(created.id()).await.unwrap();
        assert_eq!(loaded.name(), created.name());
        assert_eq!(loaded.id(), created.id());
    }

    #[tokio::test]
    async fn test_add_entry() {
        let (manager, _temp) = create_test_manager().await;

        let session = create_conversation(&manager, "Test Session", "Testing").await;

        let entry = manager
            .add_entry(
                session.id(),
                EntryType::UserMessage,
                serde_json::json!({"text": "Hello"}),
            )
            .await
            .unwrap();

        assert!(matches!(entry.entry_type, EntryType::UserMessage));

        // Verify entry was persisted
        manager.clear_cache().await;
        let loaded = manager.load_session(session.id()).await.unwrap();
        assert_eq!(loaded.entries.len(), 1);
    }

    #[tokio::test]
    async fn test_complete_session() {
        let (manager, _temp) = create_test_manager().await;

        let session = create_conversation(&manager, "Test Session", "Testing").await;

        manager.complete_session(session.id()).await.unwrap();

        let loaded = manager.load_session(session.id()).await.unwrap();
        assert_eq!(loaded.status(), SessionStatus::Completed);
    }

    #[tokio::test]
    async fn test_list_sessions() {
        let (manager, _temp) = create_test_manager().await;

        // Create multiple sessions
        create_conversation(&manager, "Session 1", "Test 1").await;
        create_conversation(&manager, "Session 2", "Test 2").await;

        let sessions = manager.list_sessions(None).await.unwrap();
        assert_eq!(sessions.len(), 2);
    }

    #[tokio::test]
    async fn test_delete_session() {
        let (manager, _temp) = create_test_manager().await;

        let session = create_conversation(&manager, "Test Session", "Testing").await;

        manager.delete_session(session.id()).await.unwrap();

        assert!(!manager.session_exists(session.id()).await);
    }

    #[tokio::test]
    async fn test_session_exists() {
        let (manager, _temp) = create_test_manager().await;

        let session = create_conversation(&manager, "Test Session", "Testing").await;

        assert!(manager.session_exists(session.id()).await);

        let fake_id = SessionId::new();
        assert!(!manager.session_exists(&fake_id).await);
    }

    #[tokio::test]
    async fn test_filter_by_status() {
        let (manager, _temp) = create_test_manager().await;

        let session1 = create_conversation(&manager, "Active Session", "Test").await;
        let session2 = create_conversation(&manager, "Completed Session", "Test").await;

        manager.complete_session(session2.id()).await.unwrap();

        let active = manager.find_by_status(SessionStatus::Active).await.unwrap();
        assert_eq!(active.len(), 1);
        assert_eq!(active[0].id, session1.metadata.id);

        let completed = manager
            .find_by_status(SessionStatus::Completed)
            .await
            .unwrap();
        assert_eq!(completed.len(), 1);
        assert_eq!(completed[0].id, session2.metadata.id);
    }

    #[tokio::test]
    async fn test_cache_operations() {
        let (manager, _temp) = create_test_manager().await;

        let session = create_conversation(&manager, "Test Session", "Testing").await;

        assert_eq!(manager.cached_session_count().await, 1);

        manager.evict_from_cache(session.id()).await;
        assert_eq!(manager.cached_session_count().await, 0);

        // Session should still exist on disk
        assert!(manager.session_exists(session.id()).await);
    }

    #[tokio::test]
    async fn test_session_count() {
        let (manager, _temp) = create_test_manager().await;

        assert_eq!(manager.session_count().await.unwrap(), 0);

        create_conversation(&manager, "Session 1", "Test").await;
        create_conversation(&manager, "Session 2", "Test").await;

        assert_eq!(manager.session_count().await.unwrap(), 2);
    }
}