agcodex_persistence/
storage.rs

1//! Storage backend for session persistence
2
3use crate::AGCX_MAGIC;
4use crate::FORMAT_VERSION;
5use crate::compression::CompressionLevel;
6use crate::compression::Compressor;
7use crate::error::PersistenceError;
8use crate::error::Result;
9use crate::types::ConversationSnapshot;
10use crate::types::MessageSnapshot;
11use crate::types::SessionIndex;
12use crate::types::SessionMetadata;
13use crate::types::SessionState;
14use memmap2::MmapOptions;
15use std::fs::File;
16use std::fs::{self};
17use std::io::BufReader;
18use std::io::BufWriter;
19use std::io::Read;
20use std::io::Write;
21use std::path::Path;
22use std::path::PathBuf;
23use tokio::fs as async_fs;
24use uuid::Uuid;
25
26/// Storage backend trait for different storage implementations
27pub trait StorageBackend: Send + Sync {
28    /// Save session to storage
29    fn save_session(
30        &self,
31        id: Uuid,
32        metadata: &SessionMetadata,
33        conversation: &ConversationSnapshot,
34        state: &SessionState,
35    ) -> impl std::future::Future<Output = Result<()>> + Send;
36
37    /// Load session from storage
38    fn load_session(
39        &self,
40        id: Uuid,
41    ) -> impl std::future::Future<
42        Output = Result<(SessionMetadata, ConversationSnapshot, SessionState)>,
43    > + Send;
44
45    /// Delete session from storage
46    fn delete_session(&self, id: Uuid) -> impl std::future::Future<Output = Result<()>> + Send;
47
48    /// List all sessions
49    fn list_sessions(
50        &self,
51    ) -> impl std::future::Future<Output = Result<Vec<SessionMetadata>>> + Send;
52
53    /// Load session index
54    fn load_index(&self) -> impl std::future::Future<Output = Result<SessionIndex>> + Send;
55
56    /// Save session index
57    fn save_index(
58        &self,
59        index: &SessionIndex,
60    ) -> impl std::future::Future<Output = Result<()>> + Send;
61}
62
63/// File-based storage implementation
64pub struct SessionStorage {
65    base_path: PathBuf,
66    compressor: Compressor,
67    use_mmap: bool,
68}
69
70impl SessionStorage {
71    /// Create a new session storage
72    pub fn new(base_path: PathBuf, compression_level: CompressionLevel) -> Result<Self> {
73        // Ensure base path exists
74        fs::create_dir_all(&base_path)?;
75
76        Ok(Self {
77            base_path,
78            compressor: Compressor::new(compression_level),
79            use_mmap: true,
80        })
81    }
82
83    /// Get the path for a session file
84    fn session_path(&self, id: Uuid) -> PathBuf {
85        self.base_path.join(format!("{}.agcx", id))
86    }
87
88    /// Get the path for the session index
89    fn index_path(&self) -> PathBuf {
90        self.base_path.join("sessions.idx")
91    }
92
93    /// Get the checkpoint directory
94    fn _checkpoint_dir(&self) -> PathBuf {
95        self.base_path.join("checkpoints")
96    }
97
98    /// Write session file with header
99    fn write_session_file(
100        &self,
101        path: &Path,
102        metadata: &SessionMetadata,
103        conversation: &ConversationSnapshot,
104        state: &SessionState,
105    ) -> Result<()> {
106        let file = File::create(path)?;
107        let mut writer = BufWriter::new(file);
108
109        // Write magic bytes and version
110        writer.write_all(AGCX_MAGIC)?;
111        writer.write_all(&FORMAT_VERSION.to_le_bytes())?;
112
113        // Serialize metadata with bincode
114        let metadata_bytes = bincode::serde::encode_to_vec(metadata, bincode::config::standard())?;
115        let metadata_len = metadata_bytes.len() as u32;
116        writer.write_all(&metadata_len.to_le_bytes())?;
117        writer.write_all(&metadata_bytes)?;
118
119        // Serialize messages with MessagePack and compress
120        let messages_bytes = rmp_serde::to_vec(&conversation.messages)?;
121        let compressed_messages = self.compressor.compress(&messages_bytes)?;
122        let messages_len = compressed_messages.len() as u32;
123        writer.write_all(&messages_len.to_le_bytes())?;
124        writer.write_all(&compressed_messages)?;
125
126        // Serialize context with MessagePack and compress
127        let context_bytes = rmp_serde::to_vec(&conversation.context)?;
128        let compressed_context = self.compressor.compress(&context_bytes)?;
129        let context_len = compressed_context.len() as u32;
130        writer.write_all(&context_len.to_le_bytes())?;
131        writer.write_all(&compressed_context)?;
132
133        // Serialize state with bincode
134        let state_bytes = bincode::serde::encode_to_vec(state, bincode::config::standard())?;
135        writer.write_all(&(state_bytes.len() as u32).to_le_bytes())?;
136        writer.write_all(&state_bytes)?;
137
138        writer.flush()?;
139        Ok(())
140    }
141
142    /// Read session file with header validation
143    fn read_session_file(
144        &self,
145        path: &Path,
146    ) -> Result<(SessionMetadata, ConversationSnapshot, SessionState)> {
147        let file = File::open(path)?;
148        let mut reader = BufReader::new(file);
149
150        // Validate magic bytes
151        let mut magic = [0u8; 4];
152        reader.read_exact(&mut magic)?;
153        if magic != AGCX_MAGIC {
154            return Err(PersistenceError::InvalidMagic);
155        }
156
157        // Validate version
158        let mut version_bytes = [0u8; 2];
159        reader.read_exact(&mut version_bytes)?;
160        let version = u16::from_le_bytes(version_bytes);
161        if version != FORMAT_VERSION {
162            return Err(PersistenceError::UnsupportedVersion(
163                version,
164                FORMAT_VERSION,
165            ));
166        }
167
168        // Read metadata
169        let mut metadata_len_bytes = [0u8; 4];
170        reader.read_exact(&mut metadata_len_bytes)?;
171        let metadata_len = u32::from_le_bytes(metadata_len_bytes) as usize;
172        let mut metadata_bytes = vec![0u8; metadata_len];
173        reader.read_exact(&mut metadata_bytes)?;
174        let (metadata, _): (SessionMetadata, _) =
175            bincode::serde::decode_from_slice(&metadata_bytes, bincode::config::standard())?;
176
177        // Read and decompress messages
178        let mut messages_len_bytes = [0u8; 4];
179        reader.read_exact(&mut messages_len_bytes)?;
180        let messages_len = u32::from_le_bytes(messages_len_bytes) as usize;
181        let mut compressed_messages = vec![0u8; messages_len];
182        reader.read_exact(&mut compressed_messages)?;
183        let messages_bytes = self.compressor.decompress(&compressed_messages)?;
184        let messages: Vec<MessageSnapshot> = rmp_serde::from_slice(&messages_bytes)?;
185
186        // Read and decompress context
187        let mut context_len_bytes = [0u8; 4];
188        reader.read_exact(&mut context_len_bytes)?;
189        let context_len = u32::from_le_bytes(context_len_bytes) as usize;
190        let mut compressed_context = vec![0u8; context_len];
191        reader.read_exact(&mut compressed_context)?;
192        let context_bytes = self.compressor.decompress(&compressed_context)?;
193        let context = rmp_serde::from_slice(&context_bytes)?;
194
195        // Read state
196        let mut state_len_bytes = [0u8; 4];
197        reader.read_exact(&mut state_len_bytes)?;
198        let state_len = u32::from_le_bytes(state_len_bytes) as usize;
199        let mut state_bytes = vec![0u8; state_len];
200        reader.read_exact(&mut state_bytes)?;
201        let (state, _): (SessionState, _) =
202            bincode::serde::decode_from_slice(&state_bytes, bincode::config::standard())?;
203
204        let conversation = ConversationSnapshot {
205            id: metadata.id,
206            messages,
207            context,
208            mode_history: vec![(metadata.current_mode, metadata.created_at)],
209        };
210
211        Ok((metadata, conversation, state))
212    }
213
214    /// Load metadata using memory mapping for fast access
215    pub fn load_metadata_mmap(&self, path: &Path) -> Result<SessionMetadata> {
216        if !self.use_mmap {
217            let (metadata, _, _) = self.read_session_file(path)?;
218            return Ok(metadata);
219        }
220
221        let file = File::open(path)?;
222        let mmap = unsafe { MmapOptions::new().map(&file)? };
223
224        // Validate magic and version
225        if mmap.len() < 6 {
226            return Err(PersistenceError::CorruptData("File too small".to_string()));
227        }
228
229        if &mmap[0..4] != AGCX_MAGIC {
230            return Err(PersistenceError::InvalidMagic);
231        }
232
233        let version = u16::from_le_bytes([mmap[4], mmap[5]]);
234        if version != FORMAT_VERSION {
235            return Err(PersistenceError::UnsupportedVersion(
236                version,
237                FORMAT_VERSION,
238            ));
239        }
240
241        // Read metadata length and deserialize
242        let metadata_len = u32::from_le_bytes([mmap[6], mmap[7], mmap[8], mmap[9]]) as usize;
243        let metadata_end = 10 + metadata_len;
244
245        if mmap.len() < metadata_end {
246            return Err(PersistenceError::CorruptData(
247                "Incomplete metadata".to_string(),
248            ));
249        }
250
251        let (metadata, _): (SessionMetadata, _) = bincode::serde::decode_from_slice(
252            &mmap[10..metadata_end],
253            bincode::config::standard(),
254        )?;
255        Ok(metadata)
256    }
257}
258
259impl StorageBackend for SessionStorage {
260    async fn save_session(
261        &self,
262        id: Uuid,
263        metadata: &SessionMetadata,
264        conversation: &ConversationSnapshot,
265        state: &SessionState,
266    ) -> Result<()> {
267        let path = self.session_path(id);
268
269        // Write to temporary file first
270        let temp_path = path.with_extension("tmp");
271        self.write_session_file(&temp_path, metadata, conversation, state)?;
272
273        // Atomic rename
274        async_fs::rename(&temp_path, &path).await?;
275
276        Ok(())
277    }
278
279    async fn load_session(
280        &self,
281        id: Uuid,
282    ) -> Result<(SessionMetadata, ConversationSnapshot, SessionState)> {
283        let path = self.session_path(id);
284
285        if !path.exists() {
286            return Err(PersistenceError::SessionNotFound(id));
287        }
288
289        // Use blocking I/O in a spawn_blocking task
290        let path_clone = path.clone();
291        let compressor = self.compressor.clone();
292
293        tokio::task::spawn_blocking(move || {
294            let storage = SessionStorage {
295                base_path: PathBuf::new(),
296                compressor,
297                use_mmap: false,
298            };
299            storage.read_session_file(&path_clone)
300        })
301        .await
302        .map_err(|e| PersistenceError::Io(std::io::Error::other(e)))?
303    }
304
305    async fn delete_session(&self, id: Uuid) -> Result<()> {
306        let path = self.session_path(id);
307
308        if path.exists() {
309            async_fs::remove_file(&path).await?;
310        }
311
312        Ok(())
313    }
314
315    async fn list_sessions(&self) -> Result<Vec<SessionMetadata>> {
316        let mut sessions = Vec::new();
317
318        let mut entries = async_fs::read_dir(&self.base_path).await?;
319
320        while let Some(entry) = entries.next_entry().await? {
321            let path = entry.path();
322
323            if path.extension().and_then(|ext| ext.to_str()) == Some("agcx") {
324                match self.load_metadata_mmap(&path) {
325                    Ok(metadata) => sessions.push(metadata),
326                    Err(e) => {
327                        tracing::warn!("Failed to load session metadata from {:?}: {}", path, e);
328                    }
329                }
330            }
331        }
332
333        // Sort by last accessed time
334        sessions.sort_by(|a, b| b.last_accessed.cmp(&a.last_accessed));
335
336        Ok(sessions)
337    }
338
339    async fn load_index(&self) -> Result<SessionIndex> {
340        let path = self.index_path();
341
342        if !path.exists() {
343            return Ok(SessionIndex::new());
344        }
345
346        let bytes = async_fs::read(&path).await?;
347        let (index, _) = bincode::serde::decode_from_slice(&bytes, bincode::config::standard())?;
348        Ok(index)
349    }
350
351    async fn save_index(&self, index: &SessionIndex) -> Result<()> {
352        let path = self.index_path();
353        let bytes = bincode::serde::encode_to_vec(index, bincode::config::standard())?;
354
355        // Write to temporary file first
356        let temp_path = path.with_extension("tmp");
357        async_fs::write(&temp_path, &bytes).await?;
358
359        // Atomic rename
360        async_fs::rename(&temp_path, &path).await?;
361
362        Ok(())
363    }
364}
365
366// Clone implementation for Compressor to use in async contexts
367impl Clone for Compressor {
368    fn clone(&self) -> Self {
369        Self::new(CompressionLevel::Balanced)
370    }
371}