agcodex_persistence/
session_manager.rs

1//! High-level session management with auto-save and checkpointing
2
3use crate::compression::CompressionLevel;
4use crate::error::PersistenceError;
5use crate::error::Result;
6use crate::migration::MigrationManager;
7use crate::storage::SessionStorage;
8use crate::storage::StorageBackend;
9use crate::types::Checkpoint;
10use crate::types::CheckpointMetadata;
11use crate::types::ConversationContext;
12use crate::types::ConversationSnapshot;
13use crate::types::MessageMetadata;
14use crate::types::MessageSnapshot;
15use crate::types::SessionIndex;
16use crate::types::SessionMetadata;
17use crate::types::SessionState;
18use chrono::DateTime;
19use chrono::Utc;
20// Temporarily use local types until core is fixed
21// use agcodex_core::models::ResponseItem;
22// use agcodex_core::modes::OperatingMode;
23use crate::types::OperatingMode;
24use crate::types::ResponseItem;
25use dirs;
26use std::collections::HashMap;
27use std::path::PathBuf;
28use std::sync::Arc;
29use std::time::Duration;
30use tokio::sync::Mutex;
31use tokio::sync::RwLock;
32use tokio::time::interval;
33use tracing::error;
34use tracing::info;
35use uuid::Uuid;
36
/// Configuration for SessionManager
///
/// Construct with [`Default::default`] and override individual fields as
/// needed; the whole struct is cloned into the manager at construction time.
#[derive(Debug, Clone)]
pub struct SessionManagerConfig {
    /// Base path for storage (defaults to ~/.agcodex/history/)
    ///
    /// Checkpoint files are written to a `checkpoints` sub-directory of this path.
    pub storage_path: PathBuf,
    /// Auto-save interval (defaults to 5 minutes)
    ///
    /// Period of the background task that persists dirty sessions and the index.
    pub auto_save_interval: Duration,
    /// Maximum number of sessions to keep (defaults to 100)
    ///
    /// Consulted by `SessionManager::cleanup_old_sessions`.
    pub max_sessions: usize,
    /// Maximum size of all sessions in bytes (defaults to 1GB)
    ///
    /// Compared against the index's `total_size_bytes` during cleanup.
    pub max_total_size: u64,
    /// Compression level (defaults to Balanced)
    ///
    /// Passed to `SessionStorage::new` when the manager is created.
    pub compression_level: CompressionLevel,
    /// Enable auto-save (defaults to true)
    ///
    /// When `true`, `SessionManager::new` spawns the background auto-save task.
    pub enable_auto_save: bool,
    /// Enable memory mapping for metadata (defaults to true)
    // NOTE(review): this flag is not read anywhere in this file — confirm where it is consumed.
    pub enable_mmap: bool,
    /// Maximum checkpoints per session (defaults to 10)
    ///
    /// Checked by `SessionManager::create_checkpoint` after a new entry is added.
    pub max_checkpoints: usize,
}
57
58impl Default for SessionManagerConfig {
59    fn default() -> Self {
60        let storage_path = dirs::home_dir()
61            .map(|p| p.join(".agcodex/history"))
62            .unwrap_or_else(|| PathBuf::from(".agcodex/history"));
63
64        Self {
65            storage_path,
66            auto_save_interval: Duration::from_secs(300), // 5 minutes
67            max_sessions: 100,
68            max_total_size: 1_073_741_824, // 1GB
69            compression_level: CompressionLevel::Balanced,
70            enable_auto_save: true,
71            enable_mmap: true,
72            max_checkpoints: 10,
73        }
74    }
75}
76
/// Main session manager for AGCodex
///
/// Owns the storage backend, the in-memory session index and the set of
/// sessions currently loaded in memory. The shared fields are wrapped in
/// `Arc` so the background auto-save task can hold its own handles to them.
pub struct SessionManager {
    /// Configuration captured when the manager was created.
    config: SessionManagerConfig,
    /// Backend used to load and save sessions and the index.
    storage: Arc<SessionStorage>,
    /// In-memory session index; written back through `storage.save_index`.
    index: Arc<RwLock<SessionIndex>>,
    /// Sessions currently held in memory, keyed by session id.
    active_sessions: Arc<RwLock<HashMap<Uuid, ActiveSession>>>,
    /// Join handle of the background auto-save task, `None` when not running.
    auto_save_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
}
85
/// Active session being tracked
///
/// In-memory working copy of one session: the three parts handed to
/// `SessionStorage::save_session` plus bookkeeping about unsaved changes.
struct ActiveSession {
    /// Session metadata (title, timestamps, counters, checkpoint list, ...).
    metadata: SessionMetadata,
    /// Messages, context and mode history of the conversation.
    conversation: ConversationSnapshot,
    /// UI state (cursor, scroll, panels, ...).
    state: SessionState,
    /// `true` when the in-memory copy has changes that have not been saved.
    dirty: bool,
    /// Time the session was created, loaded, or last saved by the manager.
    last_saved: DateTime<Utc>,
}
94
95impl SessionManager {
96    /// Create a new session manager
97    pub async fn new(config: SessionManagerConfig) -> Result<Self> {
98        let storage = Arc::new(SessionStorage::new(
99            config.storage_path.clone(),
100            config.compression_level,
101        )?);
102
103        // Load or create index
104        let index = match storage.load_index().await {
105            Ok(idx) => idx,
106            Err(_) => {
107                info!("Creating new session index");
108                SessionIndex::new()
109            }
110        };
111
112        let manager = Self {
113            config: config.clone(),
114            storage: storage.clone(),
115            index: Arc::new(RwLock::new(index)),
116            active_sessions: Arc::new(RwLock::new(HashMap::new())),
117            auto_save_handle: Arc::new(Mutex::new(None)),
118        };
119
120        // Check for needed migrations
121        let migration_manager = MigrationManager::new(config.storage_path.clone());
122        if let Some(plan) = migration_manager.check_migration_needed()? {
123            info!("Migration needed: {:?}", plan);
124            match migration_manager.migrate(plan).await {
125                Ok(report) => {
126                    info!("Migration completed: {:?}", report);
127                    // Reload index after migration
128                    if let Ok(idx) = storage.load_index().await {
129                        *manager.index.write().await = idx;
130                    }
131                }
132                Err(e) => {
133                    error!("Migration failed: {}", e);
134                    return Err(e);
135                }
136            }
137        }
138
139        // Start auto-save task if enabled
140        if config.enable_auto_save {
141            manager.start_auto_save().await;
142        }
143
144        Ok(manager)
145    }
146
147    /// Start auto-save task
148    async fn start_auto_save(&self) {
149        let storage = self.storage.clone();
150        let active_sessions = self.active_sessions.clone();
151        let index = self.index.clone();
152        let interval_duration = self.config.auto_save_interval;
153
154        let handle = tokio::spawn(async move {
155            let mut ticker = interval(interval_duration);
156            ticker.tick().await; // Skip first immediate tick
157
158            loop {
159                ticker.tick().await;
160
161                // Save all dirty sessions
162                let sessions = active_sessions.read().await;
163                for (id, session) in sessions.iter() {
164                    if session.dirty {
165                        if let Err(e) = storage
166                            .save_session(
167                                *id,
168                                &session.metadata,
169                                &session.conversation,
170                                &session.state,
171                            )
172                            .await
173                        {
174                            error!("Auto-save failed for session {}: {}", id, e);
175                        } else {
176                            info!("Auto-saved session {}", id);
177                        }
178                    }
179                }
180
181                // Save index
182                if let Err(e) = storage.save_index(&*index.read().await).await {
183                    error!("Failed to save index: {}", e);
184                }
185            }
186        });
187
188        *self.auto_save_handle.lock().await = Some(handle);
189    }
190
191    /// Create a new session
192    pub async fn create_session(
193        &self,
194        title: String,
195        model: String,
196        mode: OperatingMode,
197    ) -> Result<Uuid> {
198        let id = Uuid::new_v4();
199        let now = Utc::now();
200
201        let metadata = SessionMetadata {
202            id,
203            title,
204            created_at: now,
205            updated_at: now,
206            last_accessed: now,
207            message_count: 0,
208            turn_count: 0,
209            current_mode: mode,
210            model,
211            tags: Vec::new(),
212            is_favorite: false,
213            file_size: 0,
214            compression_ratio: 0.0,
215            format_version: crate::FORMAT_VERSION,
216            checkpoints: Vec::new(),
217        };
218
219        let conversation = ConversationSnapshot {
220            id,
221            messages: Vec::new(),
222            context: ConversationContext {
223                working_directory: std::env::current_dir().unwrap_or_default(),
224                environment_variables: HashMap::new(),
225                open_files: Vec::new(),
226                ast_index_state: None,
227                embedding_cache: None,
228            },
229            mode_history: vec![(mode, now)],
230        };
231
232        let state = SessionState {
233            cursor_position: 0,
234            scroll_offset: 0,
235            selected_message: None,
236            expanded_messages: Vec::new(),
237            active_panel: "main".to_string(),
238            panel_sizes: HashMap::new(),
239            search_query: None,
240            filter_settings: Default::default(),
241        };
242
243        // Add to active sessions
244        self.active_sessions.write().await.insert(
245            id,
246            ActiveSession {
247                metadata: metadata.clone(),
248                conversation,
249                state,
250                dirty: true,
251                last_saved: now,
252            },
253        );
254
255        // Update index
256        self.index.write().await.add_session(metadata);
257
258        info!("Created new session: {}", id);
259        Ok(id)
260    }
261
262    /// Load a session from storage
263    pub async fn load_session(&self, id: Uuid) -> Result<()> {
264        // Check if already loaded
265        if self.active_sessions.read().await.contains_key(&id) {
266            return Ok(());
267        }
268
269        // Load from storage
270        let (mut metadata, conversation, state) = self.storage.load_session(id).await?;
271
272        // Update last accessed time
273        metadata.last_accessed = Utc::now();
274
275        // Add to active sessions
276        self.active_sessions.write().await.insert(
277            id,
278            ActiveSession {
279                metadata: metadata.clone(),
280                conversation,
281                state,
282                dirty: false,
283                last_saved: Utc::now(),
284            },
285        );
286
287        // Update index
288        self.index.write().await.add_session(metadata);
289
290        info!("Loaded session: {}", id);
291        Ok(())
292    }
293
294    /// Save a session to storage
295    pub async fn save_session(&self, id: Uuid) -> Result<()> {
296        let mut sessions = self.active_sessions.write().await;
297
298        let session = sessions
299            .get_mut(&id)
300            .ok_or(PersistenceError::SessionNotFound(id))?;
301
302        // Update metadata
303        session.metadata.updated_at = Utc::now();
304        session.metadata.message_count = session.conversation.messages.len();
305
306        // Calculate file size and compression ratio (approximation)
307        let uncompressed_size =
308            bincode::serde::encode_to_vec(&session.conversation, bincode::config::standard())?
309                .len();
310        session.metadata.file_size = uncompressed_size as u64;
311        session.metadata.compression_ratio = 0.7; // Estimate
312
313        // Save to storage
314        self.storage
315            .save_session(id, &session.metadata, &session.conversation, &session.state)
316            .await?;
317
318        // Mark as clean
319        session.dirty = false;
320        session.last_saved = Utc::now();
321
322        // Update index
323        self.index
324            .write()
325            .await
326            .add_session(session.metadata.clone());
327
328        info!("Saved session: {}", id);
329        Ok(())
330    }
331
332    /// Add a message to a session
333    pub async fn add_message(
334        &self,
335        session_id: Uuid,
336        item: ResponseItem,
337        metadata: Option<MessageMetadata>,
338    ) -> Result<()> {
339        let mut sessions = self.active_sessions.write().await;
340
341        let session = sessions
342            .get_mut(&session_id)
343            .ok_or(PersistenceError::SessionNotFound(session_id))?;
344
345        let message = MessageSnapshot {
346            item,
347            timestamp: Utc::now(),
348            turn_index: session.conversation.messages.len(),
349            metadata: metadata.unwrap_or_default(),
350        };
351
352        session.conversation.messages.push(message);
353        session.metadata.message_count += 1;
354        session.metadata.updated_at = Utc::now();
355        session.dirty = true;
356
357        Ok(())
358    }
359
360    /// Create a checkpoint
361    pub async fn create_checkpoint(
362        &self,
363        session_id: Uuid,
364        name: String,
365        description: Option<String>,
366    ) -> Result<Uuid> {
367        let mut sessions = self.active_sessions.write().await;
368
369        let session = sessions
370            .get_mut(&session_id)
371            .ok_or(PersistenceError::SessionNotFound(session_id))?;
372
373        let checkpoint_id = Uuid::new_v4();
374        let checkpoint_metadata = CheckpointMetadata {
375            id: checkpoint_id,
376            name,
377            created_at: Utc::now(),
378            message_index: session.conversation.messages.len(),
379            description,
380        };
381
382        // Add to session metadata
383        session
384            .metadata
385            .checkpoints
386            .push(checkpoint_metadata.clone());
387
388        // Limit number of checkpoints
389        if session.metadata.checkpoints.len() > self.config.max_checkpoints {
390            session.metadata.checkpoints.remove(0);
391        }
392
393        session.dirty = true;
394
395        // Save checkpoint data
396        let checkpoint = Checkpoint {
397            metadata: checkpoint_metadata,
398            conversation: session.conversation.clone(),
399            state: session.state.clone(),
400        };
401
402        // Save checkpoint to disk
403        let checkpoint_path = self
404            .config
405            .storage_path
406            .join("checkpoints")
407            .join(format!("{}_{}.ckpt", session_id, checkpoint_id));
408
409        tokio::fs::create_dir_all(checkpoint_path.parent().unwrap()).await?;
410
411        let checkpoint_bytes =
412            bincode::serde::encode_to_vec(&checkpoint, bincode::config::standard())?;
413        let compressed = zstd::encode_all(&checkpoint_bytes[..], 3)
414            .map_err(|e| PersistenceError::Compression(e.to_string()))?;
415
416        tokio::fs::write(&checkpoint_path, &compressed).await?;
417
418        info!(
419            "Created checkpoint {} for session {}",
420            checkpoint_id, session_id
421        );
422        Ok(checkpoint_id)
423    }
424
425    /// Restore from a checkpoint
426    pub async fn restore_checkpoint(&self, session_id: Uuid, checkpoint_id: Uuid) -> Result<()> {
427        let checkpoint_path = self
428            .config
429            .storage_path
430            .join("checkpoints")
431            .join(format!("{}_{}.ckpt", session_id, checkpoint_id));
432
433        if !checkpoint_path.exists() {
434            return Err(PersistenceError::InvalidCheckpoint(format!(
435                "Checkpoint {} not found",
436                checkpoint_id
437            )));
438        }
439
440        // Load checkpoint
441        let compressed = tokio::fs::read(&checkpoint_path).await?;
442        let checkpoint_bytes = zstd::decode_all(&compressed[..])
443            .map_err(|e| PersistenceError::Compression(e.to_string()))?;
444        let (checkpoint, _): (Checkpoint, _) =
445            bincode::serde::decode_from_slice(&checkpoint_bytes, bincode::config::standard())?;
446
447        // Restore to active session
448        let mut sessions = self.active_sessions.write().await;
449
450        if let Some(session) = sessions.get_mut(&session_id) {
451            session.conversation = checkpoint.conversation;
452            session.state = checkpoint.state;
453            session.dirty = true;
454            info!(
455                "Restored checkpoint {} for session {}",
456                checkpoint_id, session_id
457            );
458        } else {
459            return Err(PersistenceError::SessionNotFound(session_id));
460        }
461
462        Ok(())
463    }
464
465    /// Delete a session
466    pub async fn delete_session(&self, id: Uuid) -> Result<()> {
467        // Remove from active sessions
468        self.active_sessions.write().await.remove(&id);
469
470        // Remove from index
471        self.index.write().await.remove_session(&id);
472
473        // Delete from storage
474        self.storage.delete_session(id).await?;
475
476        // Delete checkpoints
477        let checkpoint_dir = self.config.storage_path.join("checkpoints");
478        if checkpoint_dir.exists() {
479            let pattern = format!("{}_", id);
480            let mut entries = tokio::fs::read_dir(&checkpoint_dir).await?;
481
482            while let Some(entry) = entries.next_entry().await? {
483                let path = entry.path();
484                if let Some(name) = path.file_name()
485                    && name.to_string_lossy().starts_with(&pattern)
486                {
487                    tokio::fs::remove_file(&path).await?;
488                }
489            }
490        }
491
492        info!("Deleted session: {}", id);
493        Ok(())
494    }
495
496    /// List all sessions
497    pub async fn list_sessions(&self) -> Result<Vec<SessionMetadata>> {
498        let sessions = self.storage.list_sessions().await?;
499        Ok(sessions)
500    }
501
502    /// Search sessions
503    pub async fn search_sessions(&self, query: &str) -> Vec<SessionMetadata> {
504        let index = self.index.read().await;
505        index.search(query).into_iter().cloned().collect()
506    }
507
508    /// Get session metadata
509    pub async fn get_session_metadata(&self, id: Uuid) -> Result<SessionMetadata> {
510        // Check active sessions first
511        if let Some(session) = self.active_sessions.read().await.get(&id) {
512            return Ok(session.metadata.clone());
513        }
514
515        // Check index
516        if let Some(metadata) = self.index.read().await.sessions.get(&id) {
517            return Ok(metadata.clone());
518        }
519
520        Err(PersistenceError::SessionNotFound(id))
521    }
522
523    /// Update session state (cursor, scroll, etc.)
524    pub async fn update_session_state(&self, session_id: Uuid, state: SessionState) -> Result<()> {
525        let mut sessions = self.active_sessions.write().await;
526
527        let session = sessions
528            .get_mut(&session_id)
529            .ok_or(PersistenceError::SessionNotFound(session_id))?;
530
531        session.state = state;
532        session.dirty = true;
533
534        Ok(())
535    }
536
537    /// Switch operating mode for a session
538    pub async fn switch_mode(&self, session_id: Uuid, new_mode: OperatingMode) -> Result<()> {
539        let mut sessions = self.active_sessions.write().await;
540
541        let session = sessions
542            .get_mut(&session_id)
543            .ok_or(PersistenceError::SessionNotFound(session_id))?;
544
545        session.metadata.current_mode = new_mode;
546        session
547            .conversation
548            .mode_history
549            .push((new_mode, Utc::now()));
550        session.dirty = true;
551
552        Ok(())
553    }
554
555    /// Clean up old sessions based on max_sessions and max_total_size
556    pub async fn cleanup_old_sessions(&self) -> Result<()> {
557        let index = self.index.read().await;
558
559        // Check if cleanup is needed
560        if index.sessions.len() <= self.config.max_sessions
561            && index.total_size_bytes <= self.config.max_total_size
562        {
563            return Ok(());
564        }
565
566        // Sort sessions by last accessed time
567        let mut sessions: Vec<_> = index.sessions.values().collect();
568        sessions.sort_by(|a, b| a.last_accessed.cmp(&b.last_accessed));
569
570        // Calculate how many to remove
571        let to_remove = if index.sessions.len() > self.config.max_sessions {
572            index.sessions.len() - self.config.max_sessions
573        } else {
574            0
575        };
576
577        // Remove oldest sessions
578        for metadata in sessions.iter().take(to_remove) {
579            if !metadata.is_favorite {
580                self.delete_session(metadata.id).await?;
581                info!("Cleaned up old session: {}", metadata.id);
582            }
583        }
584
585        Ok(())
586    }
587
588    /// Shutdown the session manager
589    pub async fn shutdown(&self) -> Result<()> {
590        info!("Shutting down SessionManager");
591
592        // Stop auto-save task
593        if let Some(handle) = self.auto_save_handle.lock().await.take() {
594            handle.abort();
595        }
596
597        // Save all dirty sessions
598        let sessions = self.active_sessions.read().await;
599        for (id, session) in sessions.iter() {
600            if session.dirty {
601                self.storage
602                    .save_session(
603                        *id,
604                        &session.metadata,
605                        &session.conversation,
606                        &session.state,
607                    )
608                    .await?;
609            }
610        }
611
612        // Save index
613        self.storage.save_index(&*self.index.read().await).await?;
614
615        info!("SessionManager shutdown complete");
616        Ok(())
617    }
618}
619
620impl Drop for SessionManager {
621    fn drop(&mut self) {
622        // Try to save on drop (best effort)
623        let storage = self.storage.clone();
624        let sessions = self.active_sessions.clone();
625        let index = self.index.clone();
626
627        tokio::spawn(async move {
628            let sessions = sessions.read().await;
629            for (id, session) in sessions.iter() {
630                if session.dirty {
631                    let _ = storage
632                        .save_session(
633                            *id,
634                            &session.metadata,
635                            &session.conversation,
636                            &session.state,
637                        )
638                        .await;
639                }
640            }
641            let _ = storage.save_index(&*index.read().await).await;
642        });
643    }
644}