1use crate::AGCX_MAGIC;
4use crate::FORMAT_VERSION;
5use crate::compression::CompressionLevel;
6use crate::compression::Compressor;
7use crate::error::PersistenceError;
8use crate::error::Result;
9use crate::types::ConversationSnapshot;
10use crate::types::MessageSnapshot;
11use crate::types::SessionIndex;
12use crate::types::SessionMetadata;
13use crate::types::SessionState;
14use memmap2::MmapOptions;
15use std::fs::File;
16use std::fs::{self};
17use std::io::BufReader;
18use std::io::BufWriter;
19use std::io::Read;
20use std::io::Write;
21use std::path::Path;
22use std::path::PathBuf;
23use tokio::fs as async_fs;
24use uuid::Uuid;
25
/// Asynchronous persistence interface for sessions and the session index.
///
/// Implementors must be `Send + Sync`, and every returned future must be `Send`.
pub trait StorageBackend: Send + Sync {
    /// Stores `metadata`, `conversation`, and `state` for the session identified by `id`.
    fn save_session(
        &self,
        id: Uuid,
        metadata: &SessionMetadata,
        conversation: &ConversationSnapshot,
        state: &SessionState,
    ) -> impl std::future::Future<Output = Result<()>> + Send;

    /// Loads the metadata, conversation snapshot, and state for the session identified by `id`.
    fn load_session(
        &self,
        id: Uuid,
    ) -> impl std::future::Future<
        Output = Result<(SessionMetadata, ConversationSnapshot, SessionState)>,
    > + Send;

    /// Removes the stored session identified by `id`.
    fn delete_session(&self, id: Uuid) -> impl std::future::Future<Output = Result<()>> + Send;

    /// Returns the metadata of the sessions known to this backend.
    fn list_sessions(
        &self,
    ) -> impl std::future::Future<Output = Result<Vec<SessionMetadata>>> + Send;

    /// Loads the session index.
    fn load_index(&self) -> impl std::future::Future<Output = Result<SessionIndex>> + Send;

    /// Stores `index` as the session index.
    fn save_index(
        &self,
        index: &SessionIndex,
    ) -> impl std::future::Future<Output = Result<()>> + Send;
}
62
/// File-system implementation of session storage.
pub struct SessionStorage {
    /// Directory that holds the `<id>.agcx` session files and the `sessions.idx` index file.
    base_path: PathBuf,
    /// Compressor applied to the message and context sections of a session file.
    compressor: Compressor,
    /// When `true`, `load_metadata_mmap` memory-maps the file instead of reading it in full.
    use_mmap: bool,
}
69
70impl SessionStorage {
71 pub fn new(base_path: PathBuf, compression_level: CompressionLevel) -> Result<Self> {
73 fs::create_dir_all(&base_path)?;
75
76 Ok(Self {
77 base_path,
78 compressor: Compressor::new(compression_level),
79 use_mmap: true,
80 })
81 }
82
83 fn session_path(&self, id: Uuid) -> PathBuf {
85 self.base_path.join(format!("{}.agcx", id))
86 }
87
88 fn index_path(&self) -> PathBuf {
90 self.base_path.join("sessions.idx")
91 }
92
93 fn _checkpoint_dir(&self) -> PathBuf {
95 self.base_path.join("checkpoints")
96 }
97
98 fn write_session_file(
100 &self,
101 path: &Path,
102 metadata: &SessionMetadata,
103 conversation: &ConversationSnapshot,
104 state: &SessionState,
105 ) -> Result<()> {
106 let file = File::create(path)?;
107 let mut writer = BufWriter::new(file);
108
109 writer.write_all(AGCX_MAGIC)?;
111 writer.write_all(&FORMAT_VERSION.to_le_bytes())?;
112
113 let metadata_bytes = bincode::serde::encode_to_vec(metadata, bincode::config::standard())?;
115 let metadata_len = metadata_bytes.len() as u32;
116 writer.write_all(&metadata_len.to_le_bytes())?;
117 writer.write_all(&metadata_bytes)?;
118
119 let messages_bytes = rmp_serde::to_vec(&conversation.messages)?;
121 let compressed_messages = self.compressor.compress(&messages_bytes)?;
122 let messages_len = compressed_messages.len() as u32;
123 writer.write_all(&messages_len.to_le_bytes())?;
124 writer.write_all(&compressed_messages)?;
125
126 let context_bytes = rmp_serde::to_vec(&conversation.context)?;
128 let compressed_context = self.compressor.compress(&context_bytes)?;
129 let context_len = compressed_context.len() as u32;
130 writer.write_all(&context_len.to_le_bytes())?;
131 writer.write_all(&compressed_context)?;
132
133 let state_bytes = bincode::serde::encode_to_vec(state, bincode::config::standard())?;
135 writer.write_all(&(state_bytes.len() as u32).to_le_bytes())?;
136 writer.write_all(&state_bytes)?;
137
138 writer.flush()?;
139 Ok(())
140 }
141
142 fn read_session_file(
144 &self,
145 path: &Path,
146 ) -> Result<(SessionMetadata, ConversationSnapshot, SessionState)> {
147 let file = File::open(path)?;
148 let mut reader = BufReader::new(file);
149
150 let mut magic = [0u8; 4];
152 reader.read_exact(&mut magic)?;
153 if magic != AGCX_MAGIC {
154 return Err(PersistenceError::InvalidMagic);
155 }
156
157 let mut version_bytes = [0u8; 2];
159 reader.read_exact(&mut version_bytes)?;
160 let version = u16::from_le_bytes(version_bytes);
161 if version != FORMAT_VERSION {
162 return Err(PersistenceError::UnsupportedVersion(
163 version,
164 FORMAT_VERSION,
165 ));
166 }
167
168 let mut metadata_len_bytes = [0u8; 4];
170 reader.read_exact(&mut metadata_len_bytes)?;
171 let metadata_len = u32::from_le_bytes(metadata_len_bytes) as usize;
172 let mut metadata_bytes = vec![0u8; metadata_len];
173 reader.read_exact(&mut metadata_bytes)?;
174 let (metadata, _): (SessionMetadata, _) =
175 bincode::serde::decode_from_slice(&metadata_bytes, bincode::config::standard())?;
176
177 let mut messages_len_bytes = [0u8; 4];
179 reader.read_exact(&mut messages_len_bytes)?;
180 let messages_len = u32::from_le_bytes(messages_len_bytes) as usize;
181 let mut compressed_messages = vec![0u8; messages_len];
182 reader.read_exact(&mut compressed_messages)?;
183 let messages_bytes = self.compressor.decompress(&compressed_messages)?;
184 let messages: Vec<MessageSnapshot> = rmp_serde::from_slice(&messages_bytes)?;
185
186 let mut context_len_bytes = [0u8; 4];
188 reader.read_exact(&mut context_len_bytes)?;
189 let context_len = u32::from_le_bytes(context_len_bytes) as usize;
190 let mut compressed_context = vec![0u8; context_len];
191 reader.read_exact(&mut compressed_context)?;
192 let context_bytes = self.compressor.decompress(&compressed_context)?;
193 let context = rmp_serde::from_slice(&context_bytes)?;
194
195 let mut state_len_bytes = [0u8; 4];
197 reader.read_exact(&mut state_len_bytes)?;
198 let state_len = u32::from_le_bytes(state_len_bytes) as usize;
199 let mut state_bytes = vec![0u8; state_len];
200 reader.read_exact(&mut state_bytes)?;
201 let (state, _): (SessionState, _) =
202 bincode::serde::decode_from_slice(&state_bytes, bincode::config::standard())?;
203
204 let conversation = ConversationSnapshot {
205 id: metadata.id,
206 messages,
207 context,
208 mode_history: vec![(metadata.current_mode, metadata.created_at)],
209 };
210
211 Ok((metadata, conversation, state))
212 }
213
214 pub fn load_metadata_mmap(&self, path: &Path) -> Result<SessionMetadata> {
216 if !self.use_mmap {
217 let (metadata, _, _) = self.read_session_file(path)?;
218 return Ok(metadata);
219 }
220
221 let file = File::open(path)?;
222 let mmap = unsafe { MmapOptions::new().map(&file)? };
223
224 if mmap.len() < 6 {
226 return Err(PersistenceError::CorruptData("File too small".to_string()));
227 }
228
229 if &mmap[0..4] != AGCX_MAGIC {
230 return Err(PersistenceError::InvalidMagic);
231 }
232
233 let version = u16::from_le_bytes([mmap[4], mmap[5]]);
234 if version != FORMAT_VERSION {
235 return Err(PersistenceError::UnsupportedVersion(
236 version,
237 FORMAT_VERSION,
238 ));
239 }
240
241 let metadata_len = u32::from_le_bytes([mmap[6], mmap[7], mmap[8], mmap[9]]) as usize;
243 let metadata_end = 10 + metadata_len;
244
245 if mmap.len() < metadata_end {
246 return Err(PersistenceError::CorruptData(
247 "Incomplete metadata".to_string(),
248 ));
249 }
250
251 let (metadata, _): (SessionMetadata, _) = bincode::serde::decode_from_slice(
252 &mmap[10..metadata_end],
253 bincode::config::standard(),
254 )?;
255 Ok(metadata)
256 }
257}
258
259impl StorageBackend for SessionStorage {
260 async fn save_session(
261 &self,
262 id: Uuid,
263 metadata: &SessionMetadata,
264 conversation: &ConversationSnapshot,
265 state: &SessionState,
266 ) -> Result<()> {
267 let path = self.session_path(id);
268
269 let temp_path = path.with_extension("tmp");
271 self.write_session_file(&temp_path, metadata, conversation, state)?;
272
273 async_fs::rename(&temp_path, &path).await?;
275
276 Ok(())
277 }
278
279 async fn load_session(
280 &self,
281 id: Uuid,
282 ) -> Result<(SessionMetadata, ConversationSnapshot, SessionState)> {
283 let path = self.session_path(id);
284
285 if !path.exists() {
286 return Err(PersistenceError::SessionNotFound(id));
287 }
288
289 let path_clone = path.clone();
291 let compressor = self.compressor.clone();
292
293 tokio::task::spawn_blocking(move || {
294 let storage = SessionStorage {
295 base_path: PathBuf::new(),
296 compressor,
297 use_mmap: false,
298 };
299 storage.read_session_file(&path_clone)
300 })
301 .await
302 .map_err(|e| PersistenceError::Io(std::io::Error::other(e)))?
303 }
304
305 async fn delete_session(&self, id: Uuid) -> Result<()> {
306 let path = self.session_path(id);
307
308 if path.exists() {
309 async_fs::remove_file(&path).await?;
310 }
311
312 Ok(())
313 }
314
315 async fn list_sessions(&self) -> Result<Vec<SessionMetadata>> {
316 let mut sessions = Vec::new();
317
318 let mut entries = async_fs::read_dir(&self.base_path).await?;
319
320 while let Some(entry) = entries.next_entry().await? {
321 let path = entry.path();
322
323 if path.extension().and_then(|ext| ext.to_str()) == Some("agcx") {
324 match self.load_metadata_mmap(&path) {
325 Ok(metadata) => sessions.push(metadata),
326 Err(e) => {
327 tracing::warn!("Failed to load session metadata from {:?}: {}", path, e);
328 }
329 }
330 }
331 }
332
333 sessions.sort_by(|a, b| b.last_accessed.cmp(&a.last_accessed));
335
336 Ok(sessions)
337 }
338
339 async fn load_index(&self) -> Result<SessionIndex> {
340 let path = self.index_path();
341
342 if !path.exists() {
343 return Ok(SessionIndex::new());
344 }
345
346 let bytes = async_fs::read(&path).await?;
347 let (index, _) = bincode::serde::decode_from_slice(&bytes, bincode::config::standard())?;
348 Ok(index)
349 }
350
351 async fn save_index(&self, index: &SessionIndex) -> Result<()> {
352 let path = self.index_path();
353 let bytes = bincode::serde::encode_to_vec(index, bincode::config::standard())?;
354
355 let temp_path = path.with_extension("tmp");
357 async_fs::write(&temp_path, &bytes).await?;
358
359 async_fs::rename(&temp_path, &path).await?;
361
362 Ok(())
363 }
364}
365
366impl Clone for Compressor {
368 fn clone(&self) -> Self {
369 Self::new(CompressionLevel::Balanced)
370 }
371}