1use crate::SaorsaAgentError;
4use crate::session::{Message, SessionId, SessionMetadata, SessionStorage};
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::sync::{Mutex, RwLock, mpsc};
8use tokio::time::sleep;
9use tracing::{debug, error, warn};
10
/// Tuning knobs for the background auto-save task.
#[derive(Debug, Clone)]
pub struct AutoSaveConfig {
    /// Quiet period the background task waits after a save request before it writes.
    pub save_interval: Duration,
    /// Number of not-yet-saved messages that triggers a write even when the dirty flag is clear.
    pub max_batch_size: usize,
    /// Upper bound on write attempts per auto-save; the first attempt counts towards it.
    pub max_retries: usize,
}

impl Default for AutoSaveConfig {
    /// Returns a 500 ms interval, a batch size of 10 and at most 3 attempts.
    fn default() -> Self {
        const INTERVAL_MS: u64 = 500;
        const BATCH_SIZE: usize = 10;
        const ATTEMPTS: usize = 3;

        Self {
            save_interval: Duration::from_millis(INTERVAL_MS),
            max_batch_size: BATCH_SIZE,
            max_retries: ATTEMPTS,
        }
    }
}
31
/// Holds a session's in-memory state together with the channel used to signal the background
/// save task.
pub struct AutoSaveManager {
    /// Storage backend the session is written to.
    storage: Arc<SessionStorage>,
    /// Identifier of the session this manager persists.
    session_id: SessionId,
    /// Current session metadata, shared with the background task.
    metadata: Arc<RwLock<SessionMetadata>>,
    /// Every message added so far, shared with the background task.
    messages: Arc<RwLock<Vec<Message>>>,
    /// `true` while metadata or messages have changed since the last successful save.
    dirty: Arc<Mutex<bool>>,
    /// Sending half of the channel the background task listens on.
    save_tx: mpsc::UnboundedSender<SaveRequest>,
}
41
/// Control messages sent from the manager to the background save task.
#[derive(Debug)]
enum SaveRequest {
    /// State changed; a save should happen once the configured interval has elapsed.
    Save,
    /// Stop the background task.
    Shutdown,
}
50
51impl AutoSaveManager {
52 pub fn new(
54 storage: SessionStorage,
55 config: AutoSaveConfig,
56 session_id: SessionId,
57 metadata: SessionMetadata,
58 ) -> Self {
59 let storage = Arc::new(storage);
60 let metadata = Arc::new(RwLock::new(metadata));
61 let messages = Arc::new(RwLock::new(Vec::new()));
62 let dirty = Arc::new(Mutex::new(false));
63
64 let (save_tx, save_rx) = mpsc::unbounded_channel();
65
66 let task_storage = Arc::clone(&storage);
68 let task_metadata = Arc::clone(&metadata);
69 let task_messages = Arc::clone(&messages);
70 let task_dirty = Arc::clone(&dirty);
71 let task_config = config.clone();
72 let task_session_id = session_id;
73
74 tokio::spawn(async move {
75 Self::save_task(
76 task_storage,
77 task_config,
78 task_session_id,
79 task_metadata,
80 task_messages,
81 task_dirty,
82 save_rx,
83 )
84 .await;
85 });
86
87 Self {
88 storage,
89 session_id,
90 metadata,
91 messages,
92 dirty,
93 save_tx,
94 }
95 }
96
97 pub async fn add_message(&self, message: Message) {
99 let mut messages = self.messages.write().await;
100 messages.push(message);
101 drop(messages);
102
103 *self.dirty.lock().await = true;
104
105 let _ = self.save_tx.send(SaveRequest::Save);
107 }
108
109 pub async fn messages(&self) -> Vec<Message> {
111 self.messages.read().await.clone()
112 }
113
114 pub async fn update_metadata(&self, metadata: SessionMetadata) {
116 *self.metadata.write().await = metadata;
117 *self.dirty.lock().await = true;
118 let _ = self.save_tx.send(SaveRequest::Save);
119 }
120
121 pub async fn force_save(&self) -> Result<(), SaorsaAgentError> {
123 self.perform_save().await
124 }
125
126 pub fn shutdown(&self) {
128 let _ = self.save_tx.send(SaveRequest::Shutdown);
129 }
130
131 async fn save_task(
133 storage: Arc<SessionStorage>,
134 config: AutoSaveConfig,
135 session_id: SessionId,
136 metadata: Arc<RwLock<SessionMetadata>>,
137 messages: Arc<RwLock<Vec<Message>>>,
138 dirty: Arc<Mutex<bool>>,
139 mut save_rx: mpsc::UnboundedReceiver<SaveRequest>,
140 ) {
141 let mut pending_save = false;
142 let mut last_saved_count = 0;
143
144 loop {
145 tokio::select! {
146 request = save_rx.recv() => {
147 match request {
148 Some(SaveRequest::Save) => {
149 pending_save = true;
150 }
151 Some(SaveRequest::Shutdown) | None => {
152 debug!("Auto-save task shutting down");
153 break;
154 }
155 }
156 }
157 _ = sleep(config.save_interval), if pending_save => {
158 let is_dirty = *dirty.lock().await;
160 let current_count = messages.read().await.len();
161
162 let should_save = is_dirty ||
164 (current_count > last_saved_count &&
165 current_count - last_saved_count >= config.max_batch_size);
166
167 if should_save {
168 debug!(session_id = %session_id, "Performing auto-save");
169
170 let mut attempt = 0;
172 loop {
173 attempt += 1;
174
175 let metadata_clone = metadata.read().await.clone();
176 let messages_clone = messages.read().await.clone();
177
178 match Self::save_with_retry(
179 &storage,
180 session_id,
181 &metadata_clone,
182 &messages_clone,
183 last_saved_count,
184 )
185 .await
186 {
187 Ok(()) => {
188 *dirty.lock().await = false;
189 last_saved_count = messages_clone.len();
190 debug!(session_id = %session_id, messages = last_saved_count, "Auto-save complete");
191 break;
192 }
193 Err(e) => {
194 if attempt >= config.max_retries {
195 error!(
196 session_id = %session_id,
197 error = %e,
198 "Auto-save failed after {} retries",
199 config.max_retries
200 );
201 break;
202 } else {
203 warn!(
204 session_id = %session_id,
205 attempt,
206 error = %e,
207 "Auto-save failed, retrying..."
208 );
209 sleep(Duration::from_millis(100 * attempt as u64)).await;
210 }
211 }
212 }
213 }
214 }
215
216 pending_save = false;
217 }
218 }
219 }
220 }
221
222 async fn perform_save(&self) -> Result<(), SaorsaAgentError> {
224 let metadata = self.metadata.read().await.clone();
225 let messages = self.messages.read().await.clone();
226
227 Self::save_with_retry(&self.storage, self.session_id, &metadata, &messages, 0).await?;
228
229 *self.dirty.lock().await = false;
230 debug!(session_id = %self.session_id, "Force save complete");
231 Ok(())
232 }
233
234 async fn save_with_retry(
236 storage: &SessionStorage,
237 session_id: SessionId,
238 metadata: &SessionMetadata,
239 messages: &[Message],
240 last_saved_count: usize,
241 ) -> Result<(), SaorsaAgentError> {
242 storage.save_manifest(&session_id, metadata)?;
244
245 if last_saved_count < messages.len() {
247 for (idx, message) in messages.iter().enumerate().skip(last_saved_count) {
248 storage.save_message(&session_id, idx, message)?;
249 }
250 }
251
252 Ok(())
253 }
254}
255
256impl Drop for AutoSaveManager {
257 fn drop(&mut self) {
258 self.shutdown();
259 }
260}
261
#[cfg(test)]
mod tests {
    use super::*;
    use crate::session::{Message, SessionId, SessionMetadata};
    use chrono::Utc;
    use std::collections::HashSet;
    use tempfile::TempDir;

    /// Creates session storage rooted in a fresh temporary directory.
    ///
    /// The `TempDir` is returned alongside the storage so the caller can keep it alive for the
    /// whole test (dropping a `TempDir` removes the directory).
    fn temp_storage() -> (TempDir, SessionStorage) {
        let temp_dir = match TempDir::new() {
            Ok(dir) => dir,
            Err(e) => panic!("Failed to create temp dir: {}", e),
        };
        let storage = SessionStorage::with_base_path(temp_dir.path().to_path_buf());
        (temp_dir, storage)
    }

    /// Builds a config with the given interval (in milliseconds) and batch size; retries are
    /// fixed at 3, as in every test below.
    fn test_config(save_interval_ms: u64, max_batch_size: usize) -> AutoSaveConfig {
        AutoSaveConfig {
            save_interval: Duration::from_millis(save_interval_ms),
            max_batch_size,
            max_retries: 3,
        }
    }

    /// Builds metadata stamped with the current time and carrying the given title.
    fn test_metadata(title: &str) -> SessionMetadata {
        let now = Utc::now();
        SessionMetadata {
            created: now,
            modified: now,
            last_active: now,
            title: Some(title.to_string()),
            description: None,
            tags: HashSet::new(),
        }
    }

    // NOTE(review): this only checks the in-memory buffer; it does not observe how many writes
    // reached storage, so coalescing itself is not verified.
    #[tokio::test]
    async fn test_debouncing_coalesces_rapid_saves() {
        let (_temp_dir, storage) = temp_storage();
        let manager = AutoSaveManager::new(
            storage,
            test_config(100, 10),
            SessionId::new(),
            test_metadata("Test Session"),
        );

        for i in 0..5 {
            manager
                .add_message(Message::user(format!("Message {}", i)))
                .await;
            sleep(Duration::from_millis(10)).await;
        }

        // Give the debounce window time to elapse.
        sleep(Duration::from_millis(200)).await;

        let messages = manager.messages().await;
        assert_eq!(messages.len(), 5);
    }

    // NOTE(review): asserts on the in-memory buffer only; what was appended on disk is not
    // inspected.
    #[tokio::test]
    async fn test_incremental_save_appends_only_new_messages() {
        let (_temp_dir, storage) = temp_storage();
        let manager = AutoSaveManager::new(
            storage,
            test_config(50, 10),
            SessionId::new(),
            test_metadata("Incremental Test"),
        );

        manager
            .add_message(Message::user("First".to_string()))
            .await;
        manager
            .add_message(Message::user("Second".to_string()))
            .await;
        sleep(Duration::from_millis(100)).await;

        manager
            .add_message(Message::user("Third".to_string()))
            .await;
        sleep(Duration::from_millis(100)).await;

        let messages = manager.messages().await;
        assert_eq!(messages.len(), 3);
    }

    // NOTE(review): no I/O error is injected here, so the retry path is not exercised.
    #[tokio::test]
    async fn test_retry_logic_on_simulated_io_error() {
        let (_temp_dir, storage) = temp_storage();
        let manager = AutoSaveManager::new(
            storage,
            test_config(50, 10),
            SessionId::new(),
            test_metadata("Retry Test"),
        );

        manager.add_message(Message::user("Test".to_string())).await;
        sleep(Duration::from_millis(150)).await;

        let messages = manager.messages().await;
        assert_eq!(messages.len(), 1);
    }

    /// After the auto-save interval has passed, both the manifest and the message can be read
    /// back from storage.
    #[tokio::test]
    async fn test_session_state_persists_after_autosave() {
        let (_temp_dir, storage) = temp_storage();
        let session_id = SessionId::new();
        let metadata = test_metadata("Persist Test");

        let manager =
            AutoSaveManager::new(storage.clone(), test_config(50, 10), session_id, metadata.clone());

        manager
            .add_message(Message::user("Persisted".to_string()))
            .await;
        sleep(Duration::from_millis(150)).await;

        let loaded_metadata = match storage.load_manifest(&session_id) {
            Ok(meta) => meta,
            Err(e) => panic!("Failed to load manifest: {}", e),
        };
        assert_eq!(loaded_metadata.title, Some("Persist Test".to_string()));

        let loaded_messages = match storage.load_messages(&session_id) {
            Ok(msgs) => msgs,
            Err(e) => panic!("Failed to load messages: {}", e),
        };
        assert_eq!(loaded_messages.len(), 1);
    }

    // NOTE(review): verifies that the in-memory buffer keeps all 20 messages; persistence of
    // those messages is not checked.
    #[tokio::test]
    async fn test_no_data_loss_on_rapid_message_additions() {
        let (_temp_dir, storage) = temp_storage();
        let manager = AutoSaveManager::new(
            storage,
            test_config(100, 5),
            SessionId::new(),
            test_metadata("Rapid Test"),
        );

        for i in 0..20 {
            manager
                .add_message(Message::user(format!("Rapid {}", i)))
                .await;
        }

        sleep(Duration::from_millis(500)).await;

        let messages = manager.messages().await;
        assert_eq!(messages.len(), 20, "All messages should be preserved");
    }
}