1use crate::error::PersistenceError;
5use crate::error::Result;
6use crate::types::ConversationSnapshot;
7use crate::types::SessionIndex;
8use crate::types::SessionMetadata;
9use crate::types::SessionState;
10use chrono::DateTime;
11use chrono::Utc;
12use rmp_serde;
13use serde::Deserialize;
14use serde::Serialize;
15use std::collections::HashMap;
16use std::fs::File;
17use std::fs::{self};
18use std::io::BufReader;
19use std::io::BufWriter;
20use std::io::Read;
21use std::io::Write;
22use std::path::Path;
23use std::path::PathBuf;
24use std::sync::Arc;
25use tokio::sync::RwLock;
26use uuid::Uuid;
27use zstd;
28
/// Magic bytes written at the very start of every session and checkpoint file;
/// readers reject files that do not begin with them.
const AGCX_MAGIC: &[u8] = b"AGCX";

/// Format version stored as a little-endian `u32` right after the magic bytes.
/// Readers refuse any file whose stored version differs from this value.
const FORMAT_VERSION: u32 = 2;
34
/// Header record describing a stored session.
///
/// NOTE(review): nothing in the visible part of this file constructs or reads this
/// struct — `save_session` writes raw magic/version/length bytes instead. Confirm
/// whether this type is still needed.
#[derive(Debug, Serialize, Deserialize)]
struct SessionHeader {
    /// Presumably the `AGCX` magic bytes — TODO confirm.
    magic: [u8; 4],
    /// Presumably the on-disk format version — TODO confirm.
    version: u32,
    /// Identifier of the session this header belongs to.
    session_id: Uuid,
    /// Creation timestamp (UTC).
    created_at: DateTime<Utc>,
    /// Presumably the zstd level used for the payload — TODO confirm.
    compression_level: i32,
}
44
/// Payload of a session file: `save_session` encodes it with `rmp_serde`,
/// compresses the bytes with zstd and writes them after the file header.
#[derive(Debug, Serialize, Deserialize)]
struct SessionData {
    /// Metadata as supplied by the caller at save time.
    metadata: SessionMetadata,
    /// Snapshot of the conversation.
    conversation: ConversationSnapshot,
    /// Session state stored alongside the conversation.
    state: SessionState,
    /// Optional checksum. `save_session` always stores `None`, and no visible
    /// reader verifies it.
    checksum: Option<u32>,
}
53
/// Configuration for a [`SessionStore`].
#[derive(Debug, Clone)]
pub struct SessionStoreConfig {
    /// Root directory of the store. `SessionStore::new` creates `sessions/`,
    /// `checkpoints/` and `metadata/` beneath it; the index lives in `sessions.idx`.
    pub base_path: PathBuf,
    /// zstd level passed to `zstd::encode_all` when saving sessions (default 3).
    pub compression_level: i32,
    /// NOTE(review): not read anywhere in the visible code; presumably toggles
    /// memory-mapped reads — confirm. Defaults to `true`.
    pub enable_mmap: bool,
    /// NOTE(review): not read anywhere in the visible code; presumably caps the
    /// number of sessions kept in the index — confirm. Defaults to 1000.
    pub max_indexed_sessions: usize,
    /// NOTE(review): not read anywhere in the visible code; presumably an
    /// interval in seconds — confirm. Defaults to 300.
    pub auto_save_interval: u64,
}
68
69impl Default for SessionStoreConfig {
70 fn default() -> Self {
71 let base_path = dirs::home_dir()
72 .map(|p| p.join(".agcodex/history"))
73 .unwrap_or_else(|| PathBuf::from(".agcodex/history"));
74
75 Self {
76 base_path,
77 compression_level: 3,
78 enable_mmap: true,
79 max_indexed_sessions: 1000,
80 auto_save_interval: 300, }
82 }
83}
84
/// File-backed store for sessions, their metadata and checkpoints.
pub struct SessionStore {
    /// Settings the store was opened with (paths, compression level, ...).
    config: SessionStoreConfig,
    /// Index of known sessions; written to `sessions.idx` by `save_index`.
    index: Arc<RwLock<SessionIndex>>,
    /// In-memory copy of the metadata of sessions saved through this store.
    metadata_cache: Arc<RwLock<HashMap<Uuid, SessionMetadata>>>,
    /// Sessions flagged via `mark_dirty`, mapped to the time they were flagged.
    dirty_sessions: Arc<RwLock<HashMap<Uuid, DateTime<Utc>>>>,
}
92
93impl SessionStore {
94 pub async fn new(config: SessionStoreConfig) -> Result<Self> {
96 fs::create_dir_all(&config.base_path)?;
98 fs::create_dir_all(config.base_path.join("sessions"))?;
99 fs::create_dir_all(config.base_path.join("checkpoints"))?;
100 fs::create_dir_all(config.base_path.join("metadata"))?;
101
102 let index = Self::load_or_create_index(&config.base_path).await?;
104
105 Ok(Self {
106 config,
107 index: Arc::new(RwLock::new(index)),
108 metadata_cache: Arc::new(RwLock::new(HashMap::new())),
109 dirty_sessions: Arc::new(RwLock::new(HashMap::new())),
110 })
111 }
112
113 async fn load_or_create_index(base_path: &Path) -> Result<SessionIndex> {
115 let index_path = base_path.join("sessions.idx");
116
117 if index_path.exists() {
118 let file = File::open(&index_path)?;
119 let reader = BufReader::new(file);
120
121 match rmp_serde::from_read(reader) {
123 Ok(index) => Ok(index),
124 Err(_) => {
125 eprintln!("Warning: Session index corrupted, rebuilding...");
127 Self::rebuild_index(base_path).await
128 }
129 }
130 } else {
131 Ok(SessionIndex::new())
132 }
133 }
134
135 async fn rebuild_index(base_path: &Path) -> Result<SessionIndex> {
137 let mut index = SessionIndex::new();
138 let sessions_dir = base_path.join("sessions");
139
140 if sessions_dir.exists() {
141 for entry in fs::read_dir(sessions_dir)? {
142 let entry = entry?;
143 let path = entry.path();
144
145 if path.extension().and_then(|s| s.to_str()) == Some("agcx") {
146 if let Ok(metadata) = Self::load_metadata_from_file(&path).await {
148 index.add_session(metadata);
149 }
150 }
151 }
152 }
153
154 Ok(index)
155 }
156
157 async fn load_metadata_from_file(path: &Path) -> Result<SessionMetadata> {
159 let file = File::open(path)?;
160 let mut reader = BufReader::new(file);
161
162 let mut magic = [0u8; 4];
164 reader.read_exact(&mut magic)?;
165 if magic != AGCX_MAGIC {
166 return Err(PersistenceError::InvalidFormat(
167 "Invalid magic bytes".to_string(),
168 ));
169 }
170
171 let mut version_bytes = [0u8; 4];
172 reader.read_exact(&mut version_bytes)?;
173 let version = u32::from_le_bytes(version_bytes);
174
175 if version != FORMAT_VERSION {
176 return Err(PersistenceError::IncompatibleVersion {
177 expected: FORMAT_VERSION as u16,
178 actual: version as u16,
179 });
180 }
181
182 let mut len_bytes = [0u8; 8];
184 reader.read_exact(&mut len_bytes)?;
185 let compressed_len = u64::from_le_bytes(len_bytes) as usize;
186
187 let mut compressed_data = vec![0u8; compressed_len];
189 reader.read_exact(&mut compressed_data)?;
190
191 let decompressed = zstd::decode_all(&compressed_data[..])
193 .map_err(|e| PersistenceError::Compression(e.to_string()))?;
194
195 let session_data: SessionData = rmp_serde::from_slice(&decompressed)?;
197
198 Ok(session_data.metadata)
199 }
200
201 pub async fn save_session(
203 &self,
204 id: Uuid,
205 metadata: &SessionMetadata,
206 conversation: &ConversationSnapshot,
207 state: &SessionState,
208 ) -> Result<()> {
209 let session_path = self
210 .config
211 .base_path
212 .join("sessions")
213 .join(format!("{}.agcx", id));
214
215 let session_data = SessionData {
217 metadata: metadata.clone(),
218 conversation: conversation.clone(),
219 state: state.clone(),
220 checksum: None, };
222
223 let serialized = rmp_serde::to_vec(&session_data)?;
225
226 let compressed = zstd::encode_all(&serialized[..], self.config.compression_level)
228 .map_err(|e| PersistenceError::Compression(e.to_string()))?;
229
230 let compression_ratio = compressed.len() as f64 / serialized.len() as f64;
232
233 let file = File::create(&session_path)?;
235 let mut writer = BufWriter::new(file);
236
237 writer.write_all(AGCX_MAGIC)?;
239 writer.write_all(&FORMAT_VERSION.to_le_bytes())?;
240 writer.write_all(&(compressed.len() as u64).to_le_bytes())?;
241
242 writer.write_all(&compressed)?;
244 writer.flush()?;
245
246 let mut updated_metadata = metadata.clone();
248 updated_metadata.file_size = compressed.len() as u64;
249 updated_metadata.compression_ratio = compression_ratio as f32;
250
251 self.index
253 .write()
254 .await
255 .add_session(updated_metadata.clone());
256
257 self.metadata_cache
259 .write()
260 .await
261 .insert(id, updated_metadata);
262
263 self.dirty_sessions.write().await.remove(&id);
265
266 self.save_metadata_cache(id, metadata).await?;
268
269 Ok(())
270 }
271
272 async fn save_metadata_cache(&self, id: Uuid, metadata: &SessionMetadata) -> Result<()> {
274 let metadata_path = self
275 .config
276 .base_path
277 .join("metadata")
278 .join(format!("{}.meta", id));
279
280 let file = File::create(&metadata_path)?;
281 let mut writer = BufWriter::new(file);
282
283 bincode::serde::encode_into_std_write(metadata, &mut writer, bincode::config::standard())?;
285
286 Ok(())
287 }
288
289 pub async fn load_session(
291 &self,
292 id: Uuid,
293 ) -> Result<(SessionMetadata, ConversationSnapshot, SessionState)> {
294 let session_path = self
295 .config
296 .base_path
297 .join("sessions")
298 .join(format!("{}.agcx", id));
299
300 if !session_path.exists() {
301 return Err(PersistenceError::SessionNotFound(id));
302 }
303
304 let file = File::open(&session_path)?;
305 let mut reader = BufReader::new(file);
306
307 let mut magic = [0u8; 4];
309 reader.read_exact(&mut magic)?;
310 if magic != AGCX_MAGIC {
311 return Err(PersistenceError::InvalidFormat(
312 "Invalid magic bytes".to_string(),
313 ));
314 }
315
316 let mut version_bytes = [0u8; 4];
317 reader.read_exact(&mut version_bytes)?;
318 let version = u32::from_le_bytes(version_bytes);
319
320 if version != FORMAT_VERSION {
321 return Err(PersistenceError::IncompatibleVersion {
322 expected: FORMAT_VERSION as u16,
323 actual: version as u16,
324 });
325 }
326
327 let mut len_bytes = [0u8; 8];
329 reader.read_exact(&mut len_bytes)?;
330 let compressed_len = u64::from_le_bytes(len_bytes) as usize;
331
332 let mut compressed_data = vec![0u8; compressed_len];
334 reader.read_exact(&mut compressed_data)?;
335
336 let decompressed = zstd::decode_all(&compressed_data[..])
338 .map_err(|e| PersistenceError::Compression(e.to_string()))?;
339
340 let session_data: SessionData = rmp_serde::from_slice(&decompressed)?;
342
343 Ok((
346 session_data.metadata,
347 session_data.conversation,
348 session_data.state,
349 ))
350 }
351
352 pub async fn delete_session(&self, id: Uuid) -> Result<()> {
354 let session_path = self
356 .config
357 .base_path
358 .join("sessions")
359 .join(format!("{}.agcx", id));
360 if session_path.exists() {
361 fs::remove_file(session_path)?;
362 }
363
364 let metadata_path = self
366 .config
367 .base_path
368 .join("metadata")
369 .join(format!("{}.meta", id));
370 if metadata_path.exists() {
371 fs::remove_file(metadata_path)?;
372 }
373
374 let checkpoint_dir = self.config.base_path.join("checkpoints");
376 if checkpoint_dir.exists() {
377 let pattern = format!("{}_", id);
378 for entry in fs::read_dir(checkpoint_dir)? {
379 let entry = entry?;
380 let path = entry.path();
381 if let Some(name) = path.file_name()
382 && name.to_string_lossy().starts_with(&pattern)
383 {
384 fs::remove_file(path)?;
385 }
386 }
387 }
388
389 self.index.write().await.remove_session(&id);
391
392 self.metadata_cache.write().await.remove(&id);
394 self.dirty_sessions.write().await.remove(&id);
395
396 Ok(())
397 }
398
399 pub async fn list_sessions(&self) -> Result<Vec<SessionMetadata>> {
401 let index = self.index.read().await;
402 let mut sessions: Vec<SessionMetadata> = index.sessions.values().cloned().collect();
403
404 sessions.sort_by(|a, b| b.last_accessed.cmp(&a.last_accessed));
406
407 Ok(sessions)
408 }
409
410 pub async fn search_sessions(&self, query: &str) -> Result<Vec<SessionMetadata>> {
412 let index = self.index.read().await;
413 let results = index.search(query);
414 Ok(results.into_iter().cloned().collect())
415 }
416
417 pub async fn mark_dirty(&self, id: Uuid) {
419 self.dirty_sessions.write().await.insert(id, Utc::now());
420 }
421
422 pub async fn get_dirty_sessions(&self) -> Vec<Uuid> {
424 self.dirty_sessions.read().await.keys().cloned().collect()
425 }
426
427 pub async fn save_dirty_sessions(&self) -> Result<()> {
429 let dirty_ids = self.get_dirty_sessions().await;
430
431 for id in dirty_ids {
432 if self.metadata_cache.read().await.contains_key(&id) {
436 self.dirty_sessions.write().await.remove(&id);
437 }
438 }
439
440 self.save_index().await?;
442
443 Ok(())
444 }
445
446 pub async fn save_index(&self) -> Result<()> {
448 let index_path = self.config.base_path.join("sessions.idx");
449 let index = self.index.read().await;
450
451 let file = File::create(&index_path)?;
452 let mut writer = BufWriter::new(file);
453
454 rmp_serde::encode::write(&mut writer, &*index)?;
456
457 Ok(())
458 }
459
460 pub async fn create_checkpoint(
462 &self,
463 session_id: Uuid,
464 checkpoint_id: Uuid,
465 conversation: &ConversationSnapshot,
466 state: &SessionState,
467 ) -> Result<()> {
468 let checkpoint_path = self
469 .config
470 .base_path
471 .join("checkpoints")
472 .join(format!("{}_{}.ckpt", session_id, checkpoint_id));
473
474 let checkpoint_data = (conversation, state);
476 let serialized = rmp_serde::to_vec(&checkpoint_data)?;
477
478 let compressed = zstd::encode_all(&serialized[..], 6)
480 .map_err(|e| PersistenceError::Compression(e.to_string()))?;
481
482 let file = File::create(&checkpoint_path)?;
484 let mut writer = BufWriter::new(file);
485
486 writer.write_all(AGCX_MAGIC)?;
488 writer.write_all(&FORMAT_VERSION.to_le_bytes())?;
489 writer.write_all(&(compressed.len() as u64).to_le_bytes())?;
490
491 writer.write_all(&compressed)?;
493 writer.flush()?;
494
495 Ok(())
496 }
497
498 pub async fn load_checkpoint(
500 &self,
501 session_id: Uuid,
502 checkpoint_id: Uuid,
503 ) -> Result<(ConversationSnapshot, SessionState)> {
504 let checkpoint_path = self
505 .config
506 .base_path
507 .join("checkpoints")
508 .join(format!("{}_{}.ckpt", session_id, checkpoint_id));
509
510 if !checkpoint_path.exists() {
511 return Err(PersistenceError::InvalidCheckpoint(format!(
512 "Checkpoint {} not found",
513 checkpoint_id
514 )));
515 }
516
517 let file = File::open(&checkpoint_path)?;
518 let mut reader = BufReader::new(file);
519
520 let mut magic = [0u8; 4];
522 reader.read_exact(&mut magic)?;
523 if magic != AGCX_MAGIC {
524 return Err(PersistenceError::InvalidFormat(
525 "Invalid checkpoint magic bytes".to_string(),
526 ));
527 }
528
529 let mut version_bytes = [0u8; 4];
530 reader.read_exact(&mut version_bytes)?;
531 let version = u32::from_le_bytes(version_bytes);
532
533 if version != FORMAT_VERSION {
534 return Err(PersistenceError::IncompatibleVersion {
535 expected: FORMAT_VERSION as u16,
536 actual: version as u16,
537 });
538 }
539
540 let mut len_bytes = [0u8; 8];
542 reader.read_exact(&mut len_bytes)?;
543 let compressed_len = u64::from_le_bytes(len_bytes) as usize;
544
545 let mut compressed_data = vec![0u8; compressed_len];
546 reader.read_exact(&mut compressed_data)?;
547
548 let decompressed = zstd::decode_all(&compressed_data[..])
550 .map_err(|e| PersistenceError::Compression(e.to_string()))?;
551
552 let checkpoint_data: (ConversationSnapshot, SessionState) =
554 rmp_serde::from_slice(&decompressed)?;
555
556 Ok(checkpoint_data)
557 }
558
559 pub async fn get_statistics(&self) -> Result<StorageStatistics> {
561 let index = self.index.read().await;
562 let sessions_dir = self.config.base_path.join("sessions");
563 let checkpoints_dir = self.config.base_path.join("checkpoints");
564
565 let mut total_size = 0u64;
566 let mut session_count = 0usize;
567 let mut checkpoint_count = 0usize;
568
569 if sessions_dir.exists() {
571 for entry in fs::read_dir(sessions_dir)? {
572 let entry = entry?;
573 let metadata = entry.metadata()?;
574 if metadata.is_file() {
575 total_size += metadata.len();
576 session_count += 1;
577 }
578 }
579 }
580
581 if checkpoints_dir.exists() {
583 for entry in fs::read_dir(checkpoints_dir)? {
584 let entry = entry?;
585 if entry.metadata()?.is_file() {
586 checkpoint_count += 1;
587 }
588 }
589 }
590
591 Ok(StorageStatistics {
592 total_sessions: session_count,
593 total_checkpoints: checkpoint_count,
594 total_size_bytes: total_size,
595 indexed_sessions: index.sessions.len(),
596 oldest_session: index
597 .sessions
598 .values()
599 .min_by_key(|s| s.created_at)
600 .map(|s| s.created_at),
601 newest_session: index
602 .sessions
603 .values()
604 .max_by_key(|s| s.created_at)
605 .map(|s| s.created_at),
606 })
607 }
608
609 pub async fn cleanup_old_sessions(
611 &self,
612 max_age_days: Option<i64>,
613 max_count: Option<usize>,
614 ) -> Result<Vec<Uuid>> {
615 let mut deleted = Vec::new();
616 let index = self.index.read().await.clone();
617
618 let mut sessions: Vec<_> = index.sessions.values().collect();
620 sessions.sort_by(|a, b| a.last_accessed.cmp(&b.last_accessed));
621
622 if let Some(max_age) = max_age_days {
624 let cutoff = Utc::now() - chrono::Duration::days(max_age);
625 for session in &sessions {
626 if session.last_accessed < cutoff && !session.is_favorite {
627 self.delete_session(session.id).await?;
628 deleted.push(session.id);
629 }
630 }
631 }
632
633 if let Some(max) = max_count {
635 let remaining = sessions.len().saturating_sub(deleted.len());
636 if remaining > max {
637 let to_delete = remaining - max;
638 for session in sessions.iter().take(to_delete) {
639 if !deleted.contains(&session.id) && !session.is_favorite {
640 self.delete_session(session.id).await?;
641 deleted.push(session.id);
642 }
643 }
644 }
645 }
646
647 Ok(deleted)
648 }
649}
650
/// Summary of what the store currently holds, as produced by
/// `SessionStore::get_statistics`.
#[derive(Debug, Clone)]
pub struct StorageStatistics {
    /// Number of regular files found in the `sessions/` directory.
    pub total_sessions: usize,
    /// Number of regular files found in the `checkpoints/` directory.
    pub total_checkpoints: usize,
    /// Combined size in bytes of the files in `sessions/` (checkpoint files are
    /// not included).
    pub total_size_bytes: u64,
    /// Number of sessions present in the in-memory index.
    pub indexed_sessions: usize,
    /// Earliest `created_at` among indexed sessions; `None` if the index is empty.
    pub oldest_session: Option<DateTime<Utc>>,
    /// Latest `created_at` among indexed sessions; `None` if the index is empty.
    pub newest_session: Option<DateTime<Utc>>,
}
661
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::ConversationContext;
    use crate::types::OperatingMode;
    use tempfile::TempDir;

    /// Opens a store rooted in `dir`, with every other setting left at its default.
    async fn open_store(dir: &TempDir) -> SessionStore {
        let config = SessionStoreConfig {
            base_path: dir.path().to_path_buf(),
            ..Default::default()
        };
        SessionStore::new(config).await.unwrap()
    }

    /// Metadata fixture with zeroed counters; callers override what they need
    /// via struct-update syntax.
    fn base_metadata(id: Uuid, title: String, tags: Vec<String>) -> SessionMetadata {
        SessionMetadata {
            id,
            title,
            created_at: Utc::now(),
            updated_at: Utc::now(),
            last_accessed: Utc::now(),
            message_count: 0,
            turn_count: 0,
            current_mode: OperatingMode::Build,
            model: "gpt-4".to_string(),
            tags,
            is_favorite: false,
            file_size: 0,
            compression_ratio: 0.0,
            format_version: FORMAT_VERSION as u16,
            checkpoints: vec![],
        }
    }

    /// Conversation fixture with no messages and an empty context.
    fn empty_conversation(
        id: Uuid,
        mode_history: Vec<(OperatingMode, DateTime<Utc>)>,
    ) -> ConversationSnapshot {
        ConversationSnapshot {
            id,
            messages: vec![],
            context: ConversationContext {
                working_directory: PathBuf::from("."),
                environment_variables: HashMap::new(),
                open_files: Vec::new(),
                ast_index_state: None,
                embedding_cache: None,
            },
            mode_history,
        }
    }

    /// Opening a store creates the expected directory layout.
    #[tokio::test]
    async fn test_session_store_creation() {
        let temp_dir = TempDir::new().unwrap();
        let _store = open_store(&temp_dir).await;

        for subdir in ["sessions", "checkpoints", "metadata"] {
            assert!(temp_dir.path().join(subdir).exists());
        }
    }

    /// A saved session can be loaded back with the same id and title.
    #[tokio::test]
    async fn test_save_and_load_session() {
        let temp_dir = TempDir::new().unwrap();
        let store = open_store(&temp_dir).await;

        let id = Uuid::new_v4();
        let metadata = SessionMetadata {
            message_count: 1,
            turn_count: 1,
            ..base_metadata(id, "Test Session".to_string(), vec!["test".to_string()])
        };
        let conversation = empty_conversation(id, vec![(OperatingMode::Build, Utc::now())]);
        let state = SessionState::default();

        store
            .save_session(id, &metadata, &conversation, &state)
            .await
            .unwrap();

        let (loaded_meta, loaded_conv, _loaded_state) = store.load_session(id).await.unwrap();

        assert_eq!(loaded_meta.id, id);
        assert_eq!(loaded_meta.title, "Test Session");
        assert_eq!(loaded_conv.id, id);
    }

    /// Saved sessions show up in listings and can be found by title.
    #[tokio::test]
    async fn test_list_and_search_sessions() {
        let temp_dir = TempDir::new().unwrap();
        let store = open_store(&temp_dir).await;

        for i in 0..3 {
            let id = Uuid::new_v4();
            let metadata = SessionMetadata {
                message_count: i,
                turn_count: i,
                ..base_metadata(id, format!("Test Session {}", i), vec![])
            };
            let conversation = empty_conversation(id, vec![]);
            let state = SessionState::default();

            store
                .save_session(id, &metadata, &conversation, &state)
                .await
                .unwrap();
        }

        let sessions = store.list_sessions().await.unwrap();
        assert_eq!(sessions.len(), 3);

        let results = store.search_sessions("Test Session 1").await.unwrap();
        assert_eq!(results.len(), 1);
        assert!(results[0].title.contains("Test Session 1"));
    }
}