ruvswarm_persistence/sqlite.rs

//! SQLite backend implementation for native platforms
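//!
//! # Example
//!
//! A minimal usage sketch (marked `ignore`, so it is not compiled as a doctest). It
//! assumes the crate is exposed as `ruvswarm_persistence` and re-exports the items
//! used below; adjust the paths to match the actual crate layout.
//!
//! ```ignore
//! use ruvswarm_persistence::{SqliteStorage, Storage};
//! use ruvswarm_persistence::models::AgentModel;
//!
//! async fn demo() -> Result<(), Box<dyn std::error::Error>> {
//!     // Open (or create) the database file, run migrations, and apply PRAGMA settings.
//!     let storage = SqliteStorage::new("swarm.db").await?;
//!
//!     // Persist an agent and read it back through the `Storage` trait.
//!     let agent = AgentModel::new(
//!         "worker-1".to_string(),
//!         "worker".to_string(),
//!         vec!["compute".to_string()],
//!     );
//!     storage.store_agent(&agent).await?;
//!     assert!(storage.get_agent(&agent.id).await?.is_some());
//!     Ok(())
//! }
//! ```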

use crate::{models::*, Storage, StorageError, Transaction as TransactionTrait};
use async_trait::async_trait;
use parking_lot::Mutex;
use r2d2::{Pool, PooledConnection};
use r2d2_sqlite::SqliteConnectionManager;
use rusqlite::{params, OptionalExtension};
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, info};

#[cfg(test)]
use num_cpus;

type SqlitePool = Pool<SqliteConnectionManager>;
type SqliteConn = PooledConnection<SqliteConnectionManager>;

/// SQLite storage implementation
pub struct SqliteStorage {
    pool: Arc<SqlitePool>,
    path: String,
}

impl SqliteStorage {
    /// Create new SQLite storage instance
    pub async fn new(path: &str) -> Result<Self, StorageError> {
        let manager = SqliteConnectionManager::file(path);

        // Use larger pool size for tests to handle concurrent operations
        #[cfg(test)]
        let pool_size = (4 * num_cpus::get()).min(100) as u32;
        #[cfg(not(test))]
        let pool_size = 16;

        // Shorter timeout for tests
        #[cfg(test)]
        let connection_timeout = Duration::from_secs(5);
        #[cfg(not(test))]
        let connection_timeout = Duration::from_secs(30);

        let pool = Pool::builder()
            .max_size(pool_size)
            .min_idle(Some(2))
            .connection_timeout(connection_timeout)
            .idle_timeout(Some(Duration::from_secs(300)))
            .build(manager)
            .map_err(|e| StorageError::Pool(e.to_string()))?;

        let storage = Self {
            pool: Arc::new(pool),
            path: path.to_string(),
        };

        // Initialize schema using the migration system
        storage.init_schema_with_migrations().await?;

        // Configure SQLite settings after schema initialization
        storage.configure_sqlite().await?;

        info!("SQLite storage initialized at: {}", path);
        Ok(storage)
    }

    /// Create SQLite storage from an existing pool (for testing)
    #[cfg(test)]
    pub async fn from_pool(pool: SqlitePool) -> Result<Self, StorageError> {
        let storage = Self {
            pool: Arc::new(pool),
            path: ":memory:".to_string(),
        };

        // Schema and configuration are expected to be handled by the caller
        Ok(storage)
    }

    /// Get a connection from the pool
    fn get_conn(&self) -> Result<SqliteConn, StorageError> {
        self.pool
            .get()
            .map_err(|e| StorageError::Pool(e.to_string()))
    }

    /// Get a connection from the pool (for testing)
    #[cfg(test)]
    pub fn get_conn_test(&self) -> Result<SqliteConn, StorageError> {
        self.get_conn()
    }

    /// Execute a database operation with retry logic for handling locks
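    ///
    /// Backoff illustration (informal): with `BASE_DELAY_MS = 5`, the delay before
    /// retry `n` is `5 * 2^min(n, 5)` milliseconds, i.e. 10, 20, 40, 80, 160 ms and
    /// then 160 ms for every further attempt, plus up to 50% random jitter each time.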
    async fn with_retry<F, T>(&self, operation: F) -> Result<T, StorageError>
    where
        F: Fn(&SqliteConn) -> Result<T, rusqlite::Error> + Send,
        T: Send,
    {
        const MAX_RETRIES: u32 = 10;
        const BASE_DELAY_MS: u64 = 5;

        let mut retries = 0;
        loop {
            // Borrow a pooled connection only for the duration of the closure so it
            // returns to the pool immediately.
            let result = {
                let conn = self.get_conn()?;
                operation(&conn)
            };

            match result {
                Ok(result) => return Ok(result),
                Err(e) => {
                    let err_str = e.to_string();
                    if (err_str.contains("database is locked")
                        || err_str.contains("database table is locked")
                        || err_str.contains("SQLITE_BUSY"))
                        && retries < MAX_RETRIES {
                        retries += 1;
                        // Randomized exponential backoff with jitter
                        let base_delay = BASE_DELAY_MS * (1 << retries.min(5)); // Cap at 32x base
                        let jitter = fastrand::u64(0..base_delay / 2); // Add up to 50% jitter
                        let delay = base_delay + jitter;
                        debug!("Database locked, retry {} of {} with {}ms delay", retries, MAX_RETRIES, delay);
                        tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
                        continue;
                    }
                    return Err(StorageError::Database(err_str));
                }
            }
        }
    }

    /// Execute a blocking database operation on `spawn_blocking` to avoid saturating the async runtime's worker threads
    async fn exec_blocking<F, R>(&self, operation: F) -> Result<R, StorageError>
    where
        F: FnOnce(&SqliteConn) -> Result<R, rusqlite::Error> + Send + 'static,
        R: Send + 'static,
    {
        let pool = self.pool.clone();
        tokio::task::spawn_blocking(move || {
            let conn = pool.get().map_err(|e| StorageError::Pool(e.to_string()))?;
            operation(&conn).map_err(|e| StorageError::Database(e.to_string()))
        })
        .await
        .map_err(|e| StorageError::Other(format!("Join error: {}", e)))?
    }

    /// Execute a blocking database operation with retry logic
    async fn exec_blocking_with_retry<F, R>(&self, operation: F) -> Result<R, StorageError>
    where
        F: Fn(&SqliteConn) -> Result<R, rusqlite::Error> + Send + Clone + 'static,
        R: Send + 'static,
    {
        const MAX_RETRIES: u32 = 10;
        const BASE_DELAY_MS: u64 = 5;

        let mut retries = 0;
        loop {
            let result = {
                let pool = self.pool.clone();
                let op = operation.clone();
                tokio::task::spawn_blocking(move || {
                    let conn = pool.get().map_err(|e| StorageError::Pool(e.to_string()))?;
                    op(&conn).map_err(|e| StorageError::Database(e.to_string()))
                })
                .await
                .map_err(|e| StorageError::Other(format!("Join error: {}", e)))?
            };

            match result {
                Ok(result) => return Ok(result),
                Err(StorageError::Database(err_str)) => {
                    if (err_str.contains("database is locked")
                        || err_str.contains("database table is locked")
                        || err_str.contains("SQLITE_BUSY"))
                        && retries < MAX_RETRIES {
                        retries += 1;
                        let base_delay = BASE_DELAY_MS * (1 << retries.min(5));
                        let jitter = fastrand::u64(0..base_delay / 2);
                        let delay = base_delay + jitter;
                        debug!("Database locked, retry {} of {} with {}ms delay", retries, MAX_RETRIES, delay);
                        tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
                        continue;
                    }
                    return Err(StorageError::Database(err_str));
                }
                Err(e) => return Err(e),
            }
        }
    }

    /// Initialize the database schema using the migration system
    async fn init_schema_with_migrations(&self) -> Result<(), StorageError> {
        self.exec_blocking(move |conn| {
            let manager = crate::migrations::MigrationManager::new();
            manager.migrate(conn).map_err(|e| {
                match e {
                    StorageError::Database(msg) => rusqlite::Error::SqliteFailure(
                        rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_ERROR),
                        Some(msg)
                    ),
                    _ => rusqlite::Error::SqliteFailure(
                        rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_ERROR),
                        Some(e.to_string())
                    ),
                }
            })?;
            debug!("Schema initialized via migrations");
            Ok(())
        }).await
    }

    /// Legacy schema initialization (deprecated)
    #[allow(dead_code)]
    async fn init_schema(&self) -> Result<(), StorageError> {
        let conn = self.get_conn()?;

        conn.execute_batch(include_str!("../sql/schema.sql"))
            .map_err(|e| StorageError::Migration(format!("Schema initialization failed: {}", e)))?;

        Ok(())
    }

    /// Configure SQLite settings after schema initialization
    async fn configure_sqlite(&self) -> Result<(), StorageError> {
        self.exec_blocking(move |conn| {
            // Configure SQLite settings for better concurrency
            conn.execute_batch(
                r#"
                PRAGMA journal_mode = WAL;
                PRAGMA synchronous = NORMAL;
                PRAGMA busy_timeout = 30000;
                PRAGMA foreign_keys = ON;
                PRAGMA wal_autocheckpoint = 1000;
                PRAGMA temp_store = MEMORY;
                PRAGMA mmap_size = 268435456;
                "#
            )?;

            debug!("SQLite configuration complete: WAL mode, busy_timeout=30s, optimized for concurrency");
            Ok(())
        }).await
    }

    /// Helper to deserialize JSON data with proper error handling
    fn deserialize_rows<T, I>(&self, rows: I) -> Result<Vec<T>, StorageError>
    where
        T: serde::de::DeserializeOwned,
        I: Iterator<Item = Result<String, rusqlite::Error>>,
    {
        let mut results = Vec::new();
        let mut errors = Vec::new();

        for (idx, row_result) in rows.enumerate() {
            match row_result {
                Ok(json) => {
                    match serde_json::from_str::<T>(&json) {
                        Ok(item) => results.push(item),
                        Err(e) => {
                            errors.push(format!("Row {}: JSON parse error: {}", idx, e));
                            debug!("Failed to parse JSON at row {}: {}", idx, e);
                        }
                    }
                }
                Err(e) => {
                    errors.push(format!("Row {}: Database error: {}", idx, e));
                    debug!("Failed to read row {}: {}", idx, e);
                }
            }
        }

        // Log errors but still return the rows that deserialized successfully; a
        // stricter policy could turn a high error rate into a hard failure.
        if !errors.is_empty() {
            debug!("Encountered {} errors during deserialization", errors.len());
        }

        Ok(results)
    }
}

#[async_trait]
impl Storage for SqliteStorage {
    type Error = StorageError;

    // Agent operations
    async fn store_agent(&self, agent: &AgentModel) -> Result<(), Self::Error> {
        let json = serde_json::to_string(agent)?;
        let capabilities_json = serde_json::to_string(&agent.capabilities)?;
        let metadata_json = serde_json::to_string(&agent.metadata)?;

        let agent_id = agent.id.clone();
        let agent_name = agent.name.clone();
        let agent_type = agent.agent_type.clone();
        let status = agent.status.to_string();
        let heartbeat = agent.heartbeat.timestamp();
        let created_at = agent.created_at.timestamp();
        let updated_at = agent.updated_at.timestamp();

        self.exec_blocking_with_retry(move |conn| {
            conn.execute(
                "INSERT INTO agents (id, name, agent_type, status, capabilities, metadata, heartbeat, created_at, updated_at, data)
                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
                params![
                    &agent_id,
                    &agent_name,
                    &agent_type,
                    &status,
                    &capabilities_json,
                    &metadata_json,
                    heartbeat,
                    created_at,
                    updated_at,
                    &json
                ],
            )
        }).await?;

        debug!("Stored agent: {}", agent.id);
        Ok(())
    }

    async fn get_agent(&self, id: &str) -> Result<Option<AgentModel>, Self::Error> {
        let id = id.to_string();
        let result = self.exec_blocking(move |conn| {
            conn.query_row(
                "SELECT data FROM agents WHERE id = ?1",
                params![id],
                |row| row.get::<_, String>(0),
            )
            .optional()
        }).await?;

        match result {
            Some(json) => Ok(Some(serde_json::from_str(&json)?)),
            None => Ok(None),
        }
    }

    async fn update_agent(&self, agent: &AgentModel) -> Result<(), Self::Error> {
        let json = serde_json::to_string(agent)?;
        let capabilities_json = serde_json::to_string(&agent.capabilities)?;
        let metadata_json = serde_json::to_string(&agent.metadata)?;

        let agent_id = agent.id.clone();
        let agent_name = agent.name.clone();
        let agent_type = agent.agent_type.clone();
        let status = agent.status.to_string();
        let heartbeat = agent.heartbeat.timestamp();
        let updated_at = agent.updated_at.timestamp();

        let rows = self.exec_blocking_with_retry(move |conn| {
            conn.execute(
                "UPDATE agents
                 SET name = ?2, agent_type = ?3, status = ?4, capabilities = ?5,
                     metadata = ?6, heartbeat = ?7, updated_at = ?8, data = ?9
                 WHERE id = ?1",
                params![
                    &agent_id,
                    &agent_name,
                    &agent_type,
                    &status,
                    &capabilities_json,
                    &metadata_json,
                    heartbeat,
                    updated_at,
                    &json
                ],
            )
        }).await?;

        if rows == 0 {
            return Err(StorageError::NotFound(format!(
                "Agent {} not found",
                agent.id
            )));
        }

        debug!("Updated agent: {}", agent.id);
        Ok(())
    }

    async fn delete_agent(&self, id: &str) -> Result<(), Self::Error> {
        let id = id.to_string();
        let id_for_debug = id.clone();
        let rows = self.exec_blocking_with_retry(move |conn| {
            conn.execute("DELETE FROM agents WHERE id = ?1", params![&id])
        }).await?;

        if rows > 0 {
            debug!("Deleted agent: {}", id_for_debug);
        } else {
            debug!("Agent {} not found, delete is idempotent", id_for_debug);
        }
        Ok(())
    }

    async fn list_agents(&self) -> Result<Vec<AgentModel>, Self::Error> {
        let json_results = self.exec_blocking(move |conn| {
            let mut stmt = conn
                .prepare("SELECT data FROM agents ORDER BY created_at DESC")?;

            let agents: Result<Vec<String>, _> = stmt
                .query_map([], |row| row.get::<_, String>(0))?
                .collect();

            agents
        }).await?;

        let agents = json_results
            .into_iter()
            .filter_map(|json| serde_json::from_str(&json).ok())
            .collect();

        Ok(agents)
    }

    async fn list_agents_by_status(&self, status: &str) -> Result<Vec<AgentModel>, Self::Error> {
        let conn = self.get_conn()?;

        let mut stmt = conn
            .prepare("SELECT data FROM agents WHERE status = ?1 ORDER BY created_at DESC")
            .map_err(|e| StorageError::Database(e.to_string()))?;

        let rows = stmt
            .query_map(params![status], |row| row.get::<_, String>(0))
            .map_err(|e| StorageError::Database(e.to_string()))?;

        self.deserialize_rows(rows)
    }

    // Task operations
    async fn store_task(&self, task: &TaskModel) -> Result<(), Self::Error> {
        let conn = self.get_conn()?;
        let json = serde_json::to_string(task)?;

        conn.execute(
            "INSERT INTO tasks (id, task_type, priority, status, assigned_to, payload,
                                created_at, updated_at, data)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
            params![
                task.id,
                task.task_type,
                task.priority as i32,
                serde_json::to_value(&task.status)?.as_str().unwrap(),
                task.assigned_to,
                serde_json::to_string(&task.payload)?,
                task.created_at.timestamp(),
                task.updated_at.timestamp(),
                json
            ],
        )
        .map_err(|e| StorageError::Database(e.to_string()))?;

        debug!("Stored task: {}", task.id);
        Ok(())
    }

    async fn get_task(&self, id: &str) -> Result<Option<TaskModel>, Self::Error> {
        let conn = self.get_conn()?;

        let result = conn
            .query_row("SELECT data FROM tasks WHERE id = ?1", params![id], |row| {
                row.get::<_, String>(0)
            })
            .optional()
            .map_err(|e| StorageError::Database(e.to_string()))?;

        match result {
            Some(json) => Ok(Some(serde_json::from_str(&json)?)),
            None => Ok(None),
        }
    }

    async fn update_task(&self, task: &TaskModel) -> Result<(), Self::Error> {
        let conn = self.get_conn()?;
        let json = serde_json::to_string(task)?;

        let rows = conn
            .execute(
                "UPDATE tasks
                 SET task_type = ?2, priority = ?3, status = ?4, assigned_to = ?5,
                     payload = ?6, updated_at = ?7, data = ?8
                 WHERE id = ?1",
                params![
                    task.id,
                    task.task_type,
                    task.priority as i32,
                    serde_json::to_value(&task.status)?.as_str().unwrap(),
                    task.assigned_to,
                    serde_json::to_string(&task.payload)?,
                    task.updated_at.timestamp(),
                    json
                ],
            )
            .map_err(|e| StorageError::Database(e.to_string()))?;

        if rows == 0 {
            return Err(StorageError::NotFound(format!(
                "Task {} not found",
                task.id
            )));
        }

        debug!("Updated task: {}", task.id);
        Ok(())
    }

    async fn get_pending_tasks(&self) -> Result<Vec<TaskModel>, Self::Error> {
        let conn = self.get_conn()?;

        let mut stmt = conn
            .prepare(
                "SELECT data FROM tasks
                 WHERE status = 'pending'
                 ORDER BY priority DESC, created_at ASC",
            )
            .map_err(|e| StorageError::Database(e.to_string()))?;

        let tasks = stmt
            .query_map([], |row| row.get::<_, String>(0))
            .map_err(|e| StorageError::Database(e.to_string()))?
            .filter_map(|r| r.ok())
            .filter_map(|json| serde_json::from_str(&json).ok())
            .collect();

        Ok(tasks)
    }

    async fn get_tasks_by_agent(&self, agent_id: &str) -> Result<Vec<TaskModel>, Self::Error> {
        let conn = self.get_conn()?;

        let mut stmt = conn
            .prepare(
                "SELECT data FROM tasks
                 WHERE assigned_to = ?1
                 ORDER BY priority DESC, created_at ASC",
            )
            .map_err(|e| StorageError::Database(e.to_string()))?;

        let tasks = stmt
            .query_map(params![agent_id], |row| row.get::<_, String>(0))
            .map_err(|e| StorageError::Database(e.to_string()))?
            .filter_map(|r| r.ok())
            .filter_map(|json| serde_json::from_str(&json).ok())
            .collect();

        Ok(tasks)
    }

    async fn claim_task(&self, task_id: &str, agent_id: &str) -> Result<bool, Self::Error> {
        let task_id = task_id.to_owned();
        let agent_id = agent_id.to_owned();

        // The conditional UPDATE makes the claim atomic: rows > 0 means this call
        // won the race for the pending task.
        self.with_retry(move |conn| {
            let timestamp = chrono::Utc::now().timestamp();
            conn.execute(
                "UPDATE tasks
                 SET assigned_to = ?2, status = 'assigned', updated_at = ?3
                 WHERE id = ?1 AND status = 'pending'",
                params![&task_id, &agent_id, timestamp],
            )
        }).await.map(|rows| rows > 0)
    }

    // Event operations
    async fn store_event(&self, event: &EventModel) -> Result<(), Self::Error> {
        let conn = self.get_conn()?;
        let json = serde_json::to_string(event)?;

        conn.execute(
            "INSERT INTO events (id, event_type, agent_id, task_id, payload, metadata,
                                 timestamp, sequence, data)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
            params![
                event.id,
                event.event_type,
                event.agent_id,
                event.task_id,
                serde_json::to_string(&event.payload)?,
                serde_json::to_string(&event.metadata)?,
                event.timestamp.timestamp(),
                event.sequence as i64,
                json
            ],
        )
        .map_err(|e| StorageError::Database(e.to_string()))?;

        debug!("Stored event: {}", event.id);
        Ok(())
    }

    async fn get_events_by_agent(
        &self,
        agent_id: &str,
        limit: usize,
    ) -> Result<Vec<EventModel>, Self::Error> {
        let conn = self.get_conn()?;

        let mut stmt = conn
            .prepare(
                "SELECT data FROM events
                 WHERE agent_id = ?1
                 ORDER BY timestamp DESC, id DESC
                 LIMIT ?2",
            )
            .map_err(|e| StorageError::Database(e.to_string()))?;

        let events = stmt
            .query_map(params![agent_id, limit], |row| row.get::<_, String>(0))
            .map_err(|e| StorageError::Database(e.to_string()))?
            .filter_map(|r| r.ok())
            .filter_map(|json| serde_json::from_str(&json).ok())
            .collect();

        Ok(events)
    }

    async fn get_events_by_type(
        &self,
        event_type: &str,
        limit: usize,
    ) -> Result<Vec<EventModel>, Self::Error> {
        let conn = self.get_conn()?;

        let mut stmt = conn
            .prepare(
                "SELECT data FROM events
                 WHERE event_type = ?1
                 ORDER BY timestamp DESC, id DESC
                 LIMIT ?2",
            )
            .map_err(|e| StorageError::Database(e.to_string()))?;

        let events = stmt
            .query_map(params![event_type, limit], |row| {
                row.get::<_, String>(0)
            })
            .map_err(|e| StorageError::Database(e.to_string()))?
            .filter_map(|r| r.ok())
            .filter_map(|json| serde_json::from_str(&json).ok())
            .collect();

        Ok(events)
    }

    async fn get_events_since(&self, timestamp: i64) -> Result<Vec<EventModel>, Self::Error> {
        let conn = self.get_conn()?;

        let mut stmt = conn
            .prepare(
                "SELECT data FROM events
                 WHERE timestamp >= ?1
                 ORDER BY timestamp ASC, id ASC",
            )
            .map_err(|e| StorageError::Database(e.to_string()))?;

        let events = stmt
            .query_map(params![timestamp], |row| row.get::<_, String>(0))
            .map_err(|e| StorageError::Database(e.to_string()))?
            .filter_map(|r| r.ok())
            .filter_map(|json| serde_json::from_str(&json).ok())
            .collect();

        Ok(events)
    }

    // Message operations
    async fn store_message(&self, message: &MessageModel) -> Result<(), Self::Error> {
        let conn = self.get_conn()?;
        let json = serde_json::to_string(message)?;

        conn.execute(
            "INSERT INTO messages (id, from_agent, to_agent, message_type, content,
                                   priority, read, created_at, data)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
            params![
                message.id,
                message.from_agent,
                message.to_agent,
                message.message_type,
                serde_json::to_string(&message.content)?,
                serde_json::to_string(&message.priority)?,
                message.read as i32,
                message.created_at.timestamp(),
                json
            ],
        )
        .map_err(|e| StorageError::Database(e.to_string()))?;

        debug!("Stored message: {}", message.id);
        Ok(())
    }

    async fn get_messages_between(
        &self,
        agent1: &str,
        agent2: &str,
        limit: usize,
    ) -> Result<Vec<MessageModel>, Self::Error> {
        let conn = self.get_conn()?;

        let mut stmt = conn
            .prepare(
                "SELECT data FROM messages
                 WHERE (from_agent = ?1 AND to_agent = ?2) OR (from_agent = ?2 AND to_agent = ?1)
                 ORDER BY created_at DESC
                 LIMIT ?3",
            )
            .map_err(|e| StorageError::Database(e.to_string()))?;

        let messages = stmt
            .query_map(params![agent1, agent2, limit], |row| {
                row.get::<_, String>(0)
            })
            .map_err(|e| StorageError::Database(e.to_string()))?
            .filter_map(|r| r.ok())
            .filter_map(|json| serde_json::from_str(&json).ok())
            .collect();

        Ok(messages)
    }

    async fn get_unread_messages(&self, agent_id: &str) -> Result<Vec<MessageModel>, Self::Error> {
        let conn = self.get_conn()?;

        let mut stmt = conn
            .prepare(
                "SELECT data FROM messages
                 WHERE to_agent = ?1 AND read = 0
                 ORDER BY created_at ASC",
            )
            .map_err(|e| StorageError::Database(e.to_string()))?;

        let messages = stmt
            .query_map(params![agent_id], |row| row.get::<_, String>(0))
            .map_err(|e| StorageError::Database(e.to_string()))?
            .filter_map(|r| r.ok())
            .filter_map(|json| serde_json::from_str(&json).ok())
            .collect();

        Ok(messages)
    }

    async fn mark_message_read(&self, message_id: &str) -> Result<(), Self::Error> {
        let conn = self.get_conn()?;

        let rows = conn
            .execute(
                "UPDATE messages SET read = 1, read_at = ?2 WHERE id = ?1",
                params![message_id, chrono::Utc::now().timestamp()],
            )
            .map_err(|e| StorageError::Database(e.to_string()))?;

        if rows == 0 {
            return Err(StorageError::NotFound(format!(
                "Message {} not found",
                message_id
            )));
        }

        Ok(())
    }

    // Metric operations
    async fn store_metric(&self, metric: &MetricModel) -> Result<(), Self::Error> {
        let conn = self.get_conn()?;
        let json = serde_json::to_string(metric)?;

        conn.execute(
            "INSERT INTO metrics (id, metric_type, agent_id, value, unit, tags, timestamp, data)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
            params![
                metric.id,
                metric.metric_type,
                metric.agent_id,
                metric.value,
                metric.unit,
                serde_json::to_string(&metric.tags)?,
                metric.timestamp.timestamp(),
                json
            ],
        )
        .map_err(|e| StorageError::Database(e.to_string()))?;

        debug!("Stored metric: {}", metric.id);
        Ok(())
    }

    async fn get_metrics_by_agent(
        &self,
        agent_id: &str,
        metric_type: &str,
    ) -> Result<Vec<MetricModel>, Self::Error> {
        let conn = self.get_conn()?;

        let mut stmt = conn
            .prepare(
                "SELECT data FROM metrics
                 WHERE agent_id = ?1 AND metric_type = ?2
                 ORDER BY timestamp DESC, id DESC",
            )
            .map_err(|e| StorageError::Database(e.to_string()))?;

        let metrics = stmt
            .query_map(params![agent_id, metric_type], |row| {
                row.get::<_, String>(0)
            })
            .map_err(|e| StorageError::Database(e.to_string()))?
            .filter_map(|r| r.ok())
            .filter_map(|json| serde_json::from_str(&json).ok())
            .collect();

        Ok(metrics)
    }

    async fn get_aggregated_metrics(
        &self,
        metric_type: &str,
        start_time: i64,
        end_time: i64,
    ) -> Result<Vec<MetricModel>, Self::Error> {
        let conn = self.get_conn()?;

        let mut stmt = conn
            .prepare(
                "SELECT metric_type, agent_id, AVG(value) as value, unit,
                        MIN(timestamp) as timestamp, COUNT(*) as count
                 FROM metrics
                 WHERE metric_type = ?1 AND timestamp >= ?2 AND timestamp <= ?3
                 GROUP BY metric_type, agent_id",
            )
            .map_err(|e| StorageError::Database(e.to_string()))?;

        let metrics = stmt
            .query_map(params![metric_type, start_time, end_time], |row| {
                let mut metric = MetricModel::new(
                    row.get::<_, String>(0)?,
                    row.get::<_, f64>(2)?,
                    row.get::<_, String>(3)?,
                );
                metric.agent_id = row.get::<_, Option<String>>(1)?;
                metric
                    .tags
                    .insert("count".to_string(), row.get::<_, i64>(5)?.to_string());
                Ok(metric)
            })
            .map_err(|e| StorageError::Database(e.to_string()))?
            .filter_map(|r| r.ok())
            .collect();

        Ok(metrics)
    }

    // Transaction support
    async fn begin_transaction(&self) -> Result<Box<dyn TransactionTrait>, Self::Error> {
        let conn = self.get_conn()?;
        let transaction = SqliteTransaction::new(conn)?;
        Ok(Box::new(transaction))
    }

    // Maintenance operations
    async fn vacuum(&self) -> Result<(), Self::Error> {
        let conn = self.get_conn()?;
        conn.execute("VACUUM", [])
            .map_err(|e| StorageError::Database(e.to_string()))?;
        info!("Database vacuumed");
        Ok(())
    }

    async fn checkpoint(&self) -> Result<(), Self::Error> {
        // Skip checkpoints during testing to avoid blocking
        #[cfg(test)]
        {
            debug!("Skipping checkpoint in test mode");
            return Ok(());
        }

        #[cfg(not(test))]
        {
            self.exec_blocking(move |conn| {
                conn.execute("PRAGMA wal_checkpoint(TRUNCATE)", [])?;
                info!("Database checkpoint completed");
                Ok(())
            }).await
        }
    }

    async fn get_storage_size(&self) -> Result<u64, Self::Error> {
        let metadata =
            std::fs::metadata(&self.path).map_err(|e| StorageError::Other(e.to_string()))?;
        Ok(metadata.len())
    }
}

/// SQLite transaction wrapper with real ACID guarantees
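///
/// Lifecycle sketch (marked `ignore`, not compiled as a doctest; `storage` is assumed
/// to be a [`SqliteStorage`] handle):
///
/// ```ignore
/// // `begin_transaction` issues BEGIN DEFERRED on a pooled connection.
/// let tx = storage.begin_transaction().await?;
/// // `commit` and `rollback` both consume the boxed transaction; a transaction that
/// // is dropped without either call is rolled back automatically (see `Drop` below).
/// tx.commit().await?;
/// ```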
struct SqliteTransaction {
    conn: Mutex<Option<SqliteConn>>,
    committed: Arc<Mutex<bool>>,
}

impl SqliteTransaction {
    fn new(conn: SqliteConn) -> Result<Self, StorageError> {
        // Use DEFERRED mode to avoid holding write locks until actually needed.
        // This prevents convoy effects and thread pool saturation under high concurrency.
        conn.execute("BEGIN DEFERRED", [])
            .map_err(|e| {
                debug!("Failed to begin transaction: {}", e);
                StorageError::Transaction(format!("Failed to begin transaction: {}", e))
            })?;

        Ok(Self {
            conn: Mutex::new(Some(conn)),
            committed: Arc::new(Mutex::new(false)),
        })
    }

    /// Execute an operation within this transaction
    fn execute_in_transaction<F, T>(&self, operation: F) -> Result<T, StorageError>
    where
        F: FnOnce(&SqliteConn) -> Result<T, rusqlite::Error>,
    {
        let conn_guard = self.conn.lock();
        if let Some(conn) = conn_guard.as_ref() {
            operation(conn).map_err(|e| StorageError::Database(e.to_string()))
        } else {
            Err(StorageError::Transaction("Transaction already consumed".to_string()))
        }
    }
}

impl Drop for SqliteTransaction {
    fn drop(&mut self) {
        let committed = self.committed.lock();
        if !*committed {
            // Automatically rollback if not committed
            if let Some(conn) = self.conn.lock().take() {
                let _ = conn.execute("ROLLBACK", []);
                debug!("Transaction automatically rolled back on drop");
            }
        }
    }
}

#[async_trait]
impl TransactionTrait for SqliteTransaction {
    async fn commit(self: Box<Self>) -> Result<(), StorageError> {
        let mut committed = self.committed.lock();
        if *committed {
            return Err(StorageError::Transaction("Transaction already committed".to_string()));
        }

        if let Some(conn) = self.conn.lock().take() {
            conn.execute("COMMIT", [])
                .map_err(|e| StorageError::Transaction(format!("Failed to commit transaction: {}", e)))?;
            *committed = true;
            drop(conn);
            Ok(())
        } else {
            Err(StorageError::Transaction("Transaction already consumed".to_string()))
        }
    }

    async fn rollback(self: Box<Self>) -> Result<(), StorageError> {
        let committed = self.committed.lock();
        if *committed {
            return Err(StorageError::Transaction("Cannot rollback committed transaction".to_string()));
        }
        drop(committed);

        if let Some(conn) = self.conn.lock().take() {
            conn.execute("ROLLBACK", [])
                .map_err(|e| StorageError::Transaction(format!("Failed to rollback transaction: {}", e)))?;
            drop(conn);
            Ok(())
        } else {
            Err(StorageError::Transaction("Transaction already consumed".to_string()))
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::NamedTempFile;

    #[tokio::test]
    async fn test_sqlite_storage() {
        let temp_file = NamedTempFile::new().unwrap();
        let storage = SqliteStorage::new(temp_file.path().to_str().unwrap())
            .await
            .unwrap();

        // Test agent operations
        let agent = AgentModel::new(
            "test-agent".to_string(),
            "worker".to_string(),
            vec!["compute".to_string()],
        );

        storage.store_agent(&agent).await.unwrap();
        let retrieved = storage.get_agent(&agent.id).await.unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().name, "test-agent");

        // Test task operations
        let task = TaskModel::new(
            "process".to_string(),
            serde_json::json!({"data": "test"}),
            TaskPriority::High,
        );

        storage.store_task(&task).await.unwrap();
        let pending = storage.get_pending_tasks().await.unwrap();
        assert_eq!(pending.len(), 1);
    }
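
    // Additional sketch of a claim-task round trip. Like the test above, it assumes
    // a freshly created TaskModel is stored with status 'pending', which is what the
    // conditional UPDATE in `claim_task` matches.
    #[tokio::test]
    async fn test_claim_task_sketch() {
        let temp_file = NamedTempFile::new().unwrap();
        let storage = SqliteStorage::new(temp_file.path().to_str().unwrap())
            .await
            .unwrap();

        let agent = AgentModel::new(
            "claimer".to_string(),
            "worker".to_string(),
            vec!["compute".to_string()],
        );
        storage.store_agent(&agent).await.unwrap();

        let task = TaskModel::new(
            "process".to_string(),
            serde_json::json!({"data": "claim"}),
            TaskPriority::High,
        );
        storage.store_task(&task).await.unwrap();

        // The first claim wins the pending row; a second claim finds nothing to update.
        assert!(storage.claim_task(&task.id, &agent.id).await.unwrap());
        assert!(!storage.claim_task(&task.id, &agent.id).await.unwrap());
    }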
}