1use crate::compression::CompressionLevel;
4use crate::error::PersistenceError;
5use crate::error::Result;
6use crate::migration::MigrationManager;
7use crate::storage::SessionStorage;
8use crate::storage::StorageBackend;
9use crate::types::Checkpoint;
10use crate::types::CheckpointMetadata;
11use crate::types::ConversationContext;
12use crate::types::ConversationSnapshot;
13use crate::types::MessageMetadata;
14use crate::types::MessageSnapshot;
15use crate::types::SessionIndex;
16use crate::types::SessionMetadata;
17use crate::types::SessionState;
18use chrono::DateTime;
19use chrono::Utc;
20use crate::types::OperatingMode;
24use crate::types::ResponseItem;
25use dirs;
26use std::collections::HashMap;
27use std::path::PathBuf;
28use std::sync::Arc;
29use std::time::Duration;
30use tokio::sync::Mutex;
31use tokio::sync::RwLock;
32use tokio::time::interval;
33use tracing::error;
34use tracing::info;
35use uuid::Uuid;
36
37#[derive(Debug, Clone)]
39pub struct SessionManagerConfig {
40 pub storage_path: PathBuf,
42 pub auto_save_interval: Duration,
44 pub max_sessions: usize,
46 pub max_total_size: u64,
48 pub compression_level: CompressionLevel,
50 pub enable_auto_save: bool,
52 pub enable_mmap: bool,
54 pub max_checkpoints: usize,
56}
57
58impl Default for SessionManagerConfig {
59 fn default() -> Self {
60 let storage_path = dirs::home_dir()
61 .map(|p| p.join(".agcodex/history"))
62 .unwrap_or_else(|| PathBuf::from(".agcodex/history"));
63
64 Self {
65 storage_path,
66 auto_save_interval: Duration::from_secs(300), max_sessions: 100,
68 max_total_size: 1_073_741_824, compression_level: CompressionLevel::Balanced,
70 enable_auto_save: true,
71 enable_mmap: true,
72 max_checkpoints: 10,
73 }
74 }
75}
76
77pub struct SessionManager {
79 config: SessionManagerConfig,
80 storage: Arc<SessionStorage>,
81 index: Arc<RwLock<SessionIndex>>,
82 active_sessions: Arc<RwLock<HashMap<Uuid, ActiveSession>>>,
83 auto_save_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
84}
85
86struct ActiveSession {
88 metadata: SessionMetadata,
89 conversation: ConversationSnapshot,
90 state: SessionState,
91 dirty: bool,
92 last_saved: DateTime<Utc>,
93}
94
95impl SessionManager {
96 pub async fn new(config: SessionManagerConfig) -> Result<Self> {
98 let storage = Arc::new(SessionStorage::new(
99 config.storage_path.clone(),
100 config.compression_level,
101 )?);
102
103 let index = match storage.load_index().await {
105 Ok(idx) => idx,
106 Err(_) => {
107 info!("Creating new session index");
108 SessionIndex::new()
109 }
110 };
111
112 let manager = Self {
113 config: config.clone(),
114 storage: storage.clone(),
115 index: Arc::new(RwLock::new(index)),
116 active_sessions: Arc::new(RwLock::new(HashMap::new())),
117 auto_save_handle: Arc::new(Mutex::new(None)),
118 };
119
120 let migration_manager = MigrationManager::new(config.storage_path.clone());
122 if let Some(plan) = migration_manager.check_migration_needed()? {
123 info!("Migration needed: {:?}", plan);
124 match migration_manager.migrate(plan).await {
125 Ok(report) => {
126 info!("Migration completed: {:?}", report);
127 if let Ok(idx) = storage.load_index().await {
129 *manager.index.write().await = idx;
130 }
131 }
132 Err(e) => {
133 error!("Migration failed: {}", e);
134 return Err(e);
135 }
136 }
137 }
138
139 if config.enable_auto_save {
141 manager.start_auto_save().await;
142 }
143
144 Ok(manager)
145 }
146
147 async fn start_auto_save(&self) {
149 let storage = self.storage.clone();
150 let active_sessions = self.active_sessions.clone();
151 let index = self.index.clone();
152 let interval_duration = self.config.auto_save_interval;
153
154 let handle = tokio::spawn(async move {
155 let mut ticker = interval(interval_duration);
156 ticker.tick().await; loop {
159 ticker.tick().await;
160
161 let sessions = active_sessions.read().await;
163 for (id, session) in sessions.iter() {
164 if session.dirty {
165 if let Err(e) = storage
166 .save_session(
167 *id,
168 &session.metadata,
169 &session.conversation,
170 &session.state,
171 )
172 .await
173 {
174 error!("Auto-save failed for session {}: {}", id, e);
175 } else {
176 info!("Auto-saved session {}", id);
177 }
178 }
179 }
180
181 if let Err(e) = storage.save_index(&*index.read().await).await {
183 error!("Failed to save index: {}", e);
184 }
185 }
186 });
187
188 *self.auto_save_handle.lock().await = Some(handle);
189 }
190
191 pub async fn create_session(
193 &self,
194 title: String,
195 model: String,
196 mode: OperatingMode,
197 ) -> Result<Uuid> {
198 let id = Uuid::new_v4();
199 let now = Utc::now();
200
201 let metadata = SessionMetadata {
202 id,
203 title,
204 created_at: now,
205 updated_at: now,
206 last_accessed: now,
207 message_count: 0,
208 turn_count: 0,
209 current_mode: mode,
210 model,
211 tags: Vec::new(),
212 is_favorite: false,
213 file_size: 0,
214 compression_ratio: 0.0,
215 format_version: crate::FORMAT_VERSION,
216 checkpoints: Vec::new(),
217 };
218
219 let conversation = ConversationSnapshot {
220 id,
221 messages: Vec::new(),
222 context: ConversationContext {
223 working_directory: std::env::current_dir().unwrap_or_default(),
224 environment_variables: HashMap::new(),
225 open_files: Vec::new(),
226 ast_index_state: None,
227 embedding_cache: None,
228 },
229 mode_history: vec![(mode, now)],
230 };
231
232 let state = SessionState {
233 cursor_position: 0,
234 scroll_offset: 0,
235 selected_message: None,
236 expanded_messages: Vec::new(),
237 active_panel: "main".to_string(),
238 panel_sizes: HashMap::new(),
239 search_query: None,
240 filter_settings: Default::default(),
241 };
242
243 self.active_sessions.write().await.insert(
245 id,
246 ActiveSession {
247 metadata: metadata.clone(),
248 conversation,
249 state,
250 dirty: true,
251 last_saved: now,
252 },
253 );
254
255 self.index.write().await.add_session(metadata);
257
258 info!("Created new session: {}", id);
259 Ok(id)
260 }
261
262 pub async fn load_session(&self, id: Uuid) -> Result<()> {
264 if self.active_sessions.read().await.contains_key(&id) {
266 return Ok(());
267 }
268
269 let (mut metadata, conversation, state) = self.storage.load_session(id).await?;
271
272 metadata.last_accessed = Utc::now();
274
275 self.active_sessions.write().await.insert(
277 id,
278 ActiveSession {
279 metadata: metadata.clone(),
280 conversation,
281 state,
282 dirty: false,
283 last_saved: Utc::now(),
284 },
285 );
286
287 self.index.write().await.add_session(metadata);
289
290 info!("Loaded session: {}", id);
291 Ok(())
292 }
293
294 pub async fn save_session(&self, id: Uuid) -> Result<()> {
296 let mut sessions = self.active_sessions.write().await;
297
298 let session = sessions
299 .get_mut(&id)
300 .ok_or(PersistenceError::SessionNotFound(id))?;
301
302 session.metadata.updated_at = Utc::now();
304 session.metadata.message_count = session.conversation.messages.len();
305
306 let uncompressed_size =
308 bincode::serde::encode_to_vec(&session.conversation, bincode::config::standard())?
309 .len();
310 session.metadata.file_size = uncompressed_size as u64;
311 session.metadata.compression_ratio = 0.7; self.storage
315 .save_session(id, &session.metadata, &session.conversation, &session.state)
316 .await?;
317
318 session.dirty = false;
320 session.last_saved = Utc::now();
321
322 self.index
324 .write()
325 .await
326 .add_session(session.metadata.clone());
327
328 info!("Saved session: {}", id);
329 Ok(())
330 }
331
332 pub async fn add_message(
334 &self,
335 session_id: Uuid,
336 item: ResponseItem,
337 metadata: Option<MessageMetadata>,
338 ) -> Result<()> {
339 let mut sessions = self.active_sessions.write().await;
340
341 let session = sessions
342 .get_mut(&session_id)
343 .ok_or(PersistenceError::SessionNotFound(session_id))?;
344
345 let message = MessageSnapshot {
346 item,
347 timestamp: Utc::now(),
348 turn_index: session.conversation.messages.len(),
349 metadata: metadata.unwrap_or_default(),
350 };
351
352 session.conversation.messages.push(message);
353 session.metadata.message_count += 1;
354 session.metadata.updated_at = Utc::now();
355 session.dirty = true;
356
357 Ok(())
358 }
359
360 pub async fn create_checkpoint(
362 &self,
363 session_id: Uuid,
364 name: String,
365 description: Option<String>,
366 ) -> Result<Uuid> {
367 let mut sessions = self.active_sessions.write().await;
368
369 let session = sessions
370 .get_mut(&session_id)
371 .ok_or(PersistenceError::SessionNotFound(session_id))?;
372
373 let checkpoint_id = Uuid::new_v4();
374 let checkpoint_metadata = CheckpointMetadata {
375 id: checkpoint_id,
376 name,
377 created_at: Utc::now(),
378 message_index: session.conversation.messages.len(),
379 description,
380 };
381
382 session
384 .metadata
385 .checkpoints
386 .push(checkpoint_metadata.clone());
387
388 if session.metadata.checkpoints.len() > self.config.max_checkpoints {
390 session.metadata.checkpoints.remove(0);
391 }
392
393 session.dirty = true;
394
395 let checkpoint = Checkpoint {
397 metadata: checkpoint_metadata,
398 conversation: session.conversation.clone(),
399 state: session.state.clone(),
400 };
401
402 let checkpoint_path = self
404 .config
405 .storage_path
406 .join("checkpoints")
407 .join(format!("{}_{}.ckpt", session_id, checkpoint_id));
408
409 tokio::fs::create_dir_all(checkpoint_path.parent().unwrap()).await?;
410
411 let checkpoint_bytes =
412 bincode::serde::encode_to_vec(&checkpoint, bincode::config::standard())?;
413 let compressed = zstd::encode_all(&checkpoint_bytes[..], 3)
414 .map_err(|e| PersistenceError::Compression(e.to_string()))?;
415
416 tokio::fs::write(&checkpoint_path, &compressed).await?;
417
418 info!(
419 "Created checkpoint {} for session {}",
420 checkpoint_id, session_id
421 );
422 Ok(checkpoint_id)
423 }
424
425 pub async fn restore_checkpoint(&self, session_id: Uuid, checkpoint_id: Uuid) -> Result<()> {
427 let checkpoint_path = self
428 .config
429 .storage_path
430 .join("checkpoints")
431 .join(format!("{}_{}.ckpt", session_id, checkpoint_id));
432
433 if !checkpoint_path.exists() {
434 return Err(PersistenceError::InvalidCheckpoint(format!(
435 "Checkpoint {} not found",
436 checkpoint_id
437 )));
438 }
439
440 let compressed = tokio::fs::read(&checkpoint_path).await?;
442 let checkpoint_bytes = zstd::decode_all(&compressed[..])
443 .map_err(|e| PersistenceError::Compression(e.to_string()))?;
444 let (checkpoint, _): (Checkpoint, _) =
445 bincode::serde::decode_from_slice(&checkpoint_bytes, bincode::config::standard())?;
446
447 let mut sessions = self.active_sessions.write().await;
449
450 if let Some(session) = sessions.get_mut(&session_id) {
451 session.conversation = checkpoint.conversation;
452 session.state = checkpoint.state;
453 session.dirty = true;
454 info!(
455 "Restored checkpoint {} for session {}",
456 checkpoint_id, session_id
457 );
458 } else {
459 return Err(PersistenceError::SessionNotFound(session_id));
460 }
461
462 Ok(())
463 }
464
465 pub async fn delete_session(&self, id: Uuid) -> Result<()> {
467 self.active_sessions.write().await.remove(&id);
469
470 self.index.write().await.remove_session(&id);
472
473 self.storage.delete_session(id).await?;
475
476 let checkpoint_dir = self.config.storage_path.join("checkpoints");
478 if checkpoint_dir.exists() {
479 let pattern = format!("{}_", id);
480 let mut entries = tokio::fs::read_dir(&checkpoint_dir).await?;
481
482 while let Some(entry) = entries.next_entry().await? {
483 let path = entry.path();
484 if let Some(name) = path.file_name()
485 && name.to_string_lossy().starts_with(&pattern)
486 {
487 tokio::fs::remove_file(&path).await?;
488 }
489 }
490 }
491
492 info!("Deleted session: {}", id);
493 Ok(())
494 }
495
496 pub async fn list_sessions(&self) -> Result<Vec<SessionMetadata>> {
498 let sessions = self.storage.list_sessions().await?;
499 Ok(sessions)
500 }
501
502 pub async fn search_sessions(&self, query: &str) -> Vec<SessionMetadata> {
504 let index = self.index.read().await;
505 index.search(query).into_iter().cloned().collect()
506 }
507
508 pub async fn get_session_metadata(&self, id: Uuid) -> Result<SessionMetadata> {
510 if let Some(session) = self.active_sessions.read().await.get(&id) {
512 return Ok(session.metadata.clone());
513 }
514
515 if let Some(metadata) = self.index.read().await.sessions.get(&id) {
517 return Ok(metadata.clone());
518 }
519
520 Err(PersistenceError::SessionNotFound(id))
521 }
522
523 pub async fn update_session_state(&self, session_id: Uuid, state: SessionState) -> Result<()> {
525 let mut sessions = self.active_sessions.write().await;
526
527 let session = sessions
528 .get_mut(&session_id)
529 .ok_or(PersistenceError::SessionNotFound(session_id))?;
530
531 session.state = state;
532 session.dirty = true;
533
534 Ok(())
535 }
536
537 pub async fn switch_mode(&self, session_id: Uuid, new_mode: OperatingMode) -> Result<()> {
539 let mut sessions = self.active_sessions.write().await;
540
541 let session = sessions
542 .get_mut(&session_id)
543 .ok_or(PersistenceError::SessionNotFound(session_id))?;
544
545 session.metadata.current_mode = new_mode;
546 session
547 .conversation
548 .mode_history
549 .push((new_mode, Utc::now()));
550 session.dirty = true;
551
552 Ok(())
553 }
554
555 pub async fn cleanup_old_sessions(&self) -> Result<()> {
557 let index = self.index.read().await;
558
559 if index.sessions.len() <= self.config.max_sessions
561 && index.total_size_bytes <= self.config.max_total_size
562 {
563 return Ok(());
564 }
565
566 let mut sessions: Vec<_> = index.sessions.values().collect();
568 sessions.sort_by(|a, b| a.last_accessed.cmp(&b.last_accessed));
569
570 let to_remove = if index.sessions.len() > self.config.max_sessions {
572 index.sessions.len() - self.config.max_sessions
573 } else {
574 0
575 };
576
577 for metadata in sessions.iter().take(to_remove) {
579 if !metadata.is_favorite {
580 self.delete_session(metadata.id).await?;
581 info!("Cleaned up old session: {}", metadata.id);
582 }
583 }
584
585 Ok(())
586 }
587
588 pub async fn shutdown(&self) -> Result<()> {
590 info!("Shutting down SessionManager");
591
592 if let Some(handle) = self.auto_save_handle.lock().await.take() {
594 handle.abort();
595 }
596
597 let sessions = self.active_sessions.read().await;
599 for (id, session) in sessions.iter() {
600 if session.dirty {
601 self.storage
602 .save_session(
603 *id,
604 &session.metadata,
605 &session.conversation,
606 &session.state,
607 )
608 .await?;
609 }
610 }
611
612 self.storage.save_index(&*self.index.read().await).await?;
614
615 info!("SessionManager shutdown complete");
616 Ok(())
617 }
618}
619
620impl Drop for SessionManager {
621 fn drop(&mut self) {
622 let storage = self.storage.clone();
624 let sessions = self.active_sessions.clone();
625 let index = self.index.clone();
626
627 tokio::spawn(async move {
628 let sessions = sessions.read().await;
629 for (id, session) in sessions.iter() {
630 if session.dirty {
631 let _ = storage
632 .save_session(
633 *id,
634 &session.metadata,
635 &session.conversation,
636 &session.state,
637 )
638 .await;
639 }
640 }
641 let _ = storage.save_index(&*index.read().await).await;
642 });
643 }
644}