1use crate::{models::*, Storage, StorageError, Transaction as TransactionTrait};
4use async_trait::async_trait;
5use parking_lot::Mutex;
6use r2d2::{Pool, PooledConnection};
7use r2d2_sqlite::SqliteConnectionManager;
8use rusqlite::{params, OptionalExtension};
9use std::sync::Arc;
10use std::time::Duration;
11use tracing::{debug, info};
12
13#[cfg(test)]
14use num_cpus;
15
16type SqlitePool = Pool<SqliteConnectionManager>;
17type SqliteConn = PooledConnection<SqliteConnectionManager>;
18
19pub struct SqliteStorage {
21 pool: Arc<SqlitePool>,
22 path: String,
23}
24
25impl SqliteStorage {
26 pub async fn new(path: &str) -> Result<Self, StorageError> {
28 let manager = SqliteConnectionManager::file(path);
29
30 #[cfg(test)]
32 let pool_size = (4 * num_cpus::get()).min(100) as u32;
33 #[cfg(not(test))]
34 let pool_size = 16;
35
36 #[cfg(test)]
38 let connection_timeout = Duration::from_secs(5);
39 #[cfg(not(test))]
40 let connection_timeout = Duration::from_secs(30);
41
42 let pool = Pool::builder()
43 .max_size(pool_size)
44 .min_idle(Some(2))
45 .connection_timeout(connection_timeout)
46 .idle_timeout(Some(Duration::from_secs(300)))
47 .build(manager)
48 .map_err(|e| StorageError::Pool(e.to_string()))?;
49
50 let storage = Self {
51 pool: Arc::new(pool),
52 path: path.to_string(),
53 };
54
55 storage.init_schema_with_migrations().await?;
57
58 storage.configure_sqlite().await?;
60
61 info!("SQLite storage initialized at: {}", path);
62 Ok(storage)
63 }
64
65 #[cfg(test)]
67 pub async fn from_pool(pool: SqlitePool) -> Result<Self, StorageError> {
68 let storage = Self {
69 pool: Arc::new(pool),
70 path: ":memory:".to_string(),
71 };
72
73 Ok(storage)
75 }
76
77 fn get_conn(&self) -> Result<SqliteConn, StorageError> {
79 self.pool
80 .get()
81 .map_err(|e| StorageError::Pool(e.to_string()))
82 }
83
84 #[cfg(test)]
86 pub fn get_conn_test(&self) -> Result<SqliteConn, StorageError> {
87 self.get_conn()
88 }
89
90 async fn with_retry<F, T>(&self, operation: F) -> Result<T, StorageError>
92 where
93 F: Fn(&SqliteConn) -> Result<T, rusqlite::Error> + Send,
94 T: Send,
95 {
96 const MAX_RETRIES: u32 = 10; const BASE_DELAY_MS: u64 = 5; let mut retries = 0;
100 loop {
101 let result = {
103 let conn = self.get_conn()?;
104 operation(&conn)
105 };
106
107 match result {
108 Ok(result) => return Ok(result),
109 Err(e) => {
110 let err_str = e.to_string();
111 if (err_str.contains("database is locked")
112 || err_str.contains("database table is locked")
113 || err_str.contains("SQLITE_BUSY"))
114 && retries < MAX_RETRIES {
115 retries += 1;
116 let base_delay = BASE_DELAY_MS * (1 << retries.min(5)); let jitter = fastrand::u64(0..base_delay / 2); let delay = base_delay + jitter;
120 debug!("Database locked, retry {} of {} with {}ms delay", retries, MAX_RETRIES, delay);
121 tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
122 continue;
123 }
124 return Err(StorageError::Database(err_str));
125 }
126 }
127 }
128 }
129
130 async fn exec_blocking<F, R>(&self, operation: F) -> Result<R, StorageError>
132 where
133 F: FnOnce(&SqliteConn) -> Result<R, rusqlite::Error> + Send + 'static,
134 R: Send + 'static,
135 {
136 let pool = self.pool.clone();
137 tokio::task::spawn_blocking(move || {
138 let conn = pool.get().map_err(|e| StorageError::Pool(e.to_string()))?;
139 operation(&conn).map_err(|e| StorageError::Database(e.to_string()))
140 })
141 .await
142 .map_err(|e| StorageError::Other(format!("Join error: {}", e)))?
143 }
144
145 async fn exec_blocking_with_retry<F, R>(&self, operation: F) -> Result<R, StorageError>
147 where
148 F: Fn(&SqliteConn) -> Result<R, rusqlite::Error> + Send + Clone + 'static,
149 R: Send + 'static,
150 {
151 const MAX_RETRIES: u32 = 10;
152 const BASE_DELAY_MS: u64 = 5;
153
154 let mut retries = 0;
155 loop {
156 let result = {
157 let pool = self.pool.clone();
158 let op = operation.clone();
159 tokio::task::spawn_blocking(move || {
160 let conn = pool.get().map_err(|e| StorageError::Pool(e.to_string()))?;
161 op(&conn).map_err(|e| StorageError::Database(e.to_string()))
162 })
163 .await
164 .map_err(|e| StorageError::Other(format!("Join error: {}", e)))?
165 };
166
167 match result {
168 Ok(result) => return Ok(result),
169 Err(StorageError::Database(err_str)) => {
170 if (err_str.contains("database is locked")
171 || err_str.contains("database table is locked")
172 || err_str.contains("SQLITE_BUSY"))
173 && retries < MAX_RETRIES {
174 retries += 1;
175 let base_delay = BASE_DELAY_MS * (1 << retries.min(5));
176 let jitter = fastrand::u64(0..base_delay / 2);
177 let delay = base_delay + jitter;
178 debug!("Database locked, retry {} of {} with {}ms delay", retries, MAX_RETRIES, delay);
179 tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
180 continue;
181 }
182 return Err(StorageError::Database(err_str));
183 }
184 Err(e) => return Err(e),
185 }
186 }
187 }
188
189 async fn init_schema_with_migrations(&self) -> Result<(), StorageError> {
191 self.exec_blocking(move |conn| {
192 let manager = crate::migrations::MigrationManager::new();
193 manager.migrate(conn).map_err(|e| {
194 match e {
195 StorageError::Database(msg) => rusqlite::Error::SqliteFailure(
196 rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_ERROR),
197 Some(msg)
198 ),
199 _ => rusqlite::Error::SqliteFailure(
200 rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_ERROR),
201 Some(e.to_string())
202 ),
203 }
204 })?;
205 debug!("Schema initialized via migrations");
206 Ok(())
207 }).await
208 }
209
210 #[allow(dead_code)]
212 async fn init_schema(&self) -> Result<(), StorageError> {
213 let conn = self.get_conn()?;
214
215 conn.execute_batch(include_str!("../sql/schema.sql"))
216 .map_err(|e| StorageError::Migration(format!("Schema initialization failed: {}", e)))?;
217
218 Ok(())
219 }
220
221 async fn configure_sqlite(&self) -> Result<(), StorageError> {
223 self.exec_blocking(move |conn| {
224 conn.execute_batch(
226 r#"
227 PRAGMA journal_mode = WAL;
228 PRAGMA synchronous = NORMAL;
229 PRAGMA busy_timeout = 30000;
230 PRAGMA foreign_keys = ON;
231 PRAGMA wal_autocheckpoint = 1000;
232 PRAGMA temp_store = MEMORY;
233 PRAGMA mmap_size = 268435456;
234 "#
235 )?;
236
237 debug!("SQLite configuration complete: WAL mode, busy_timeout=30s, optimized for concurrency");
238 Ok(())
239 }).await
240 }
241
242 fn deserialize_rows<T, I>(&self, rows: I) -> Result<Vec<T>, StorageError>
244 where
245 T: serde::de::DeserializeOwned,
246 I: Iterator<Item = Result<String, rusqlite::Error>>,
247 {
248 let mut results = Vec::new();
249 let mut errors = Vec::new();
250
251 for (idx, row_result) in rows.enumerate() {
252 match row_result {
253 Ok(json) => {
254 match serde_json::from_str::<T>(&json) {
255 Ok(item) => results.push(item),
256 Err(e) => {
257 errors.push(format!("Row {}: JSON parse error: {}", idx, e));
258 debug!("Failed to parse JSON at row {}: {}", idx, e);
259 }
260 }
261 }
262 Err(e) => {
263 errors.push(format!("Row {}: Database error: {}", idx, e));
264 debug!("Failed to read row {}: {}", idx, e);
265 }
266 }
267 }
268
269 if !errors.is_empty() {
271 debug!("Encountered {} errors during deserialization", errors.len());
272 }
275
276 Ok(results)
277 }
278}
279
280#[async_trait]
281impl Storage for SqliteStorage {
282 type Error = StorageError;
283
284 async fn store_agent(&self, agent: &AgentModel) -> Result<(), Self::Error> {
286 let json = serde_json::to_string(agent)?;
287 let capabilities_json = serde_json::to_string(&agent.capabilities)?;
288 let metadata_json = serde_json::to_string(&agent.metadata)?;
289
290 let agent_id = agent.id.clone();
291 let agent_name = agent.name.clone();
292 let agent_type = agent.agent_type.clone();
293 let status = agent.status.to_string();
294 let heartbeat = agent.heartbeat.timestamp();
295 let created_at = agent.created_at.timestamp();
296 let updated_at = agent.updated_at.timestamp();
297
298 self.exec_blocking_with_retry(move |conn| {
299 conn.execute(
300 "INSERT INTO agents (id, name, agent_type, status, capabilities, metadata, heartbeat, created_at, updated_at, data)
301 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
302 params![
303 &agent_id,
304 &agent_name,
305 &agent_type,
306 &status,
307 &capabilities_json,
308 &metadata_json,
309 heartbeat,
310 created_at,
311 updated_at,
312 &json
313 ],
314 )
315 }).await?;
316
317 debug!("Stored agent: {}", agent.id);
318 Ok(())
319 }
320
321 async fn get_agent(&self, id: &str) -> Result<Option<AgentModel>, Self::Error> {
322 let id = id.to_string();
323 let result = self.exec_blocking(move |conn| {
324 conn.query_row(
325 "SELECT data FROM agents WHERE id = ?1",
326 params![id],
327 |row| row.get::<_, String>(0),
328 )
329 .optional()
330 }).await?;
331
332 match result {
333 Some(json) => Ok(Some(serde_json::from_str(&json)?)),
334 None => Ok(None),
335 }
336 }
337
338 async fn update_agent(&self, agent: &AgentModel) -> Result<(), Self::Error> {
339 let json = serde_json::to_string(agent)?;
340 let capabilities_json = serde_json::to_string(&agent.capabilities)?;
341 let metadata_json = serde_json::to_string(&agent.metadata)?;
342
343 let agent_id = agent.id.clone();
344 let agent_name = agent.name.clone();
345 let agent_type = agent.agent_type.clone();
346 let status = agent.status.to_string();
347 let heartbeat = agent.heartbeat.timestamp();
348 let updated_at = agent.updated_at.timestamp();
349
350 let rows = self.exec_blocking_with_retry(move |conn| {
351 conn.execute(
352 "UPDATE agents
353 SET name = ?2, agent_type = ?3, status = ?4, capabilities = ?5,
354 metadata = ?6, heartbeat = ?7, updated_at = ?8, data = ?9
355 WHERE id = ?1",
356 params![
357 &agent_id,
358 &agent_name,
359 &agent_type,
360 &status,
361 &capabilities_json,
362 &metadata_json,
363 heartbeat,
364 updated_at,
365 &json
366 ],
367 )
368 }).await?;
369
370 if rows == 0 {
371 return Err(StorageError::NotFound(format!(
372 "Agent {} not found",
373 agent.id
374 )));
375 }
376
377 debug!("Updated agent: {}", agent.id);
378 Ok(())
379 }
380
381 async fn delete_agent(&self, id: &str) -> Result<(), Self::Error> {
382 let id = id.to_string();
383 let id_for_debug = id.clone();
384 let rows = self.exec_blocking_with_retry(move |conn| {
385 conn.execute("DELETE FROM agents WHERE id = ?1", params![&id])
386 }).await?;
387
388 if rows > 0 {
389 debug!("Deleted agent: {}", id_for_debug);
390 } else {
391 debug!("Agent {} not found, delete is idempotent", id_for_debug);
392 }
393 Ok(())
394 }
395
396 async fn list_agents(&self) -> Result<Vec<AgentModel>, Self::Error> {
397 let json_results = self.exec_blocking(move |conn| {
398 let mut stmt = conn
399 .prepare("SELECT data FROM agents ORDER BY created_at DESC")?;
400
401 let agents: Result<Vec<String>, _> = stmt
402 .query_map([], |row| row.get::<_, String>(0))?
403 .collect();
404
405 agents
406 }).await?;
407
408 let agents = json_results
409 .into_iter()
410 .filter_map(|json| serde_json::from_str(&json).ok())
411 .collect();
412
413 Ok(agents)
414 }
415
416 async fn list_agents_by_status(&self, status: &str) -> Result<Vec<AgentModel>, Self::Error> {
417 let conn = self.get_conn()?;
418
419 let mut stmt = conn
420 .prepare("SELECT data FROM agents WHERE status = ?1 ORDER BY created_at DESC")
421 .map_err(|e| StorageError::Database(e.to_string()))?;
422
423 let rows = stmt
424 .query_map(params![status], |row| row.get::<_, String>(0))
425 .map_err(|e| StorageError::Database(e.to_string()))?;
426
427 self.deserialize_rows(rows)
428 }
429
430 async fn store_task(&self, task: &TaskModel) -> Result<(), Self::Error> {
432 let conn = self.get_conn()?;
433 let json = serde_json::to_string(task)?;
434
435 conn.execute(
436 "INSERT INTO tasks (id, task_type, priority, status, assigned_to, payload,
437 created_at, updated_at, data)
438 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
439 params![
440 task.id,
441 task.task_type,
442 task.priority as i32,
443 serde_json::to_value(&task.status)?.as_str().unwrap(),
444 task.assigned_to,
445 serde_json::to_string(&task.payload)?,
446 task.created_at.timestamp(),
447 task.updated_at.timestamp(),
448 json
449 ],
450 )
451 .map_err(|e| StorageError::Database(e.to_string()))?;
452
453 debug!("Stored task: {}", task.id);
454 Ok(())
455 }
456
457 async fn get_task(&self, id: &str) -> Result<Option<TaskModel>, Self::Error> {
458 let conn = self.get_conn()?;
459
460 let result = conn
461 .query_row("SELECT data FROM tasks WHERE id = ?1", params![id], |row| {
462 row.get::<_, String>(0)
463 })
464 .optional()
465 .map_err(|e| StorageError::Database(e.to_string()))?;
466
467 match result {
468 Some(json) => Ok(Some(serde_json::from_str(&json)?)),
469 None => Ok(None),
470 }
471 }
472
473 async fn update_task(&self, task: &TaskModel) -> Result<(), Self::Error> {
474 let conn = self.get_conn()?;
475 let json = serde_json::to_string(task)?;
476
477 let rows = conn
478 .execute(
479 "UPDATE tasks
480 SET task_type = ?2, priority = ?3, status = ?4, assigned_to = ?5,
481 payload = ?6, updated_at = ?7, data = ?8
482 WHERE id = ?1",
483 params![
484 task.id,
485 task.task_type,
486 task.priority as i32,
487 serde_json::to_value(&task.status)?.as_str().unwrap(),
488 task.assigned_to,
489 serde_json::to_string(&task.payload)?,
490 task.updated_at.timestamp(),
491 json
492 ],
493 )
494 .map_err(|e| StorageError::Database(e.to_string()))?;
495
496 if rows == 0 {
497 return Err(StorageError::NotFound(format!(
498 "Task {} not found",
499 task.id
500 )));
501 }
502
503 debug!("Updated task: {}", task.id);
504 Ok(())
505 }
506
507 async fn get_pending_tasks(&self) -> Result<Vec<TaskModel>, Self::Error> {
508 let conn = self.get_conn()?;
509
510 let mut stmt = conn
511 .prepare(
512 "SELECT data FROM tasks
513 WHERE status = 'pending'
514 ORDER BY priority DESC, created_at ASC",
515 )
516 .map_err(|e| StorageError::Database(e.to_string()))?;
517
518 let tasks = stmt
519 .query_map([], |row| row.get::<_, String>(0))
520 .map_err(|e| StorageError::Database(e.to_string()))?
521 .filter_map(|r| r.ok())
522 .filter_map(|json| serde_json::from_str(&json).ok())
523 .collect();
524
525 Ok(tasks)
526 }
527
528 async fn get_tasks_by_agent(&self, agent_id: &str) -> Result<Vec<TaskModel>, Self::Error> {
529 let conn = self.get_conn()?;
530
531 let mut stmt = conn
532 .prepare(
533 "SELECT data FROM tasks
534 WHERE assigned_to = ?1
535 ORDER BY priority DESC, created_at ASC",
536 )
537 .map_err(|e| StorageError::Database(e.to_string()))?;
538
539 let tasks = stmt
540 .query_map(params![agent_id], |row| row.get::<_, String>(0))
541 .map_err(|e| StorageError::Database(e.to_string()))?
542 .filter_map(|r| r.ok())
543 .filter_map(|json| serde_json::from_str(&json).ok())
544 .collect();
545
546 Ok(tasks)
547 }
548
549 async fn claim_task(&self, task_id: &str, agent_id: &str) -> Result<bool, Self::Error> {
550 let task_id = task_id.to_owned();
551 let agent_id = agent_id.to_owned();
552
553 self.with_retry(move |conn| {
554 let timestamp = chrono::Utc::now().timestamp();
555 conn.execute(
556 "UPDATE tasks
557 SET assigned_to = ?2, status = 'assigned', updated_at = ?3
558 WHERE id = ?1 AND status = 'pending'",
559 params![&task_id, &agent_id, timestamp],
560 )
561 }).await.map(|rows| rows > 0)
562 }
563
564 async fn store_event(&self, event: &EventModel) -> Result<(), Self::Error> {
566 let conn = self.get_conn()?;
567 let json = serde_json::to_string(event)?;
568
569 conn.execute(
570 "INSERT INTO events (id, event_type, agent_id, task_id, payload, metadata,
571 timestamp, sequence, data)
572 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
573 params![
574 event.id,
575 event.event_type,
576 event.agent_id,
577 event.task_id,
578 serde_json::to_string(&event.payload)?,
579 serde_json::to_string(&event.metadata)?,
580 event.timestamp.timestamp(),
581 event.sequence as i64,
582 json
583 ],
584 )
585 .map_err(|e| StorageError::Database(e.to_string()))?;
586
587 debug!("Stored event: {}", event.id);
588 Ok(())
589 }
590
591 async fn get_events_by_agent(
592 &self,
593 agent_id: &str,
594 limit: usize,
595 ) -> Result<Vec<EventModel>, Self::Error> {
596 let conn = self.get_conn()?;
597
598 let mut stmt = conn
599 .prepare(
600 "SELECT data FROM events
601 WHERE agent_id = ?1
602 ORDER BY timestamp DESC, id DESC
603 LIMIT ?2",
604 )
605 .map_err(|e| StorageError::Database(e.to_string()))?;
606
607 let events = stmt
608 .query_map(params![agent_id, limit], |row| row.get::<_, String>(0))
609 .map_err(|e| StorageError::Database(e.to_string()))?
610 .filter_map(|r| r.ok())
611 .filter_map(|json| serde_json::from_str(&json).ok())
612 .collect();
613
614 Ok(events)
615 }
616
617 async fn get_events_by_type(
618 &self,
619 event_type: &str,
620 limit: usize,
621 ) -> Result<Vec<EventModel>, Self::Error> {
622 let conn = self.get_conn()?;
623
624 let mut stmt = conn
625 .prepare(
626 "SELECT data FROM events
627 WHERE event_type = ?1
628 ORDER BY timestamp DESC, id DESC
629 LIMIT ?2",
630 )
631 .map_err(|e| StorageError::Database(e.to_string()))?;
632
633 let events = stmt
634 .query_map(params![event_type, limit], |row| {
635 row.get::<_, String>(0)
636 })
637 .map_err(|e| StorageError::Database(e.to_string()))?
638 .filter_map(|r| r.ok())
639 .filter_map(|json| serde_json::from_str(&json).ok())
640 .collect();
641
642 Ok(events)
643 }
644
645 async fn get_events_since(&self, timestamp: i64) -> Result<Vec<EventModel>, Self::Error> {
646 let conn = self.get_conn()?;
647
648 let mut stmt = conn
649 .prepare(
650 "SELECT data FROM events
651 WHERE timestamp >= ?1
652 ORDER BY timestamp ASC, id ASC",
653 )
654 .map_err(|e| StorageError::Database(e.to_string()))?;
655
656 let events = stmt
657 .query_map(params![timestamp], |row| row.get::<_, String>(0))
658 .map_err(|e| StorageError::Database(e.to_string()))?
659 .filter_map(|r| r.ok())
660 .filter_map(|json| serde_json::from_str(&json).ok())
661 .collect();
662
663 Ok(events)
664 }
665
666 async fn store_message(&self, message: &MessageModel) -> Result<(), Self::Error> {
668 let conn = self.get_conn()?;
669 let json = serde_json::to_string(message)?;
670
671 conn.execute(
672 "INSERT INTO messages (id, from_agent, to_agent, message_type, content,
673 priority, read, created_at, data)
674 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
675 params![
676 message.id,
677 message.from_agent,
678 message.to_agent,
679 message.message_type,
680 serde_json::to_string(&message.content)?,
681 serde_json::to_string(&message.priority)?,
682 message.read as i32,
683 message.created_at.timestamp(),
684 json
685 ],
686 )
687 .map_err(|e| StorageError::Database(e.to_string()))?;
688
689 debug!("Stored message: {}", message.id);
690 Ok(())
691 }
692
693 async fn get_messages_between(
694 &self,
695 agent1: &str,
696 agent2: &str,
697 limit: usize,
698 ) -> Result<Vec<MessageModel>, Self::Error> {
699 let conn = self.get_conn()?;
700
701 let mut stmt = conn
702 .prepare(
703 "SELECT data FROM messages
704 WHERE (from_agent = ?1 AND to_agent = ?2) OR (from_agent = ?2 AND to_agent = ?1)
705 ORDER BY created_at DESC
706 LIMIT ?3",
707 )
708 .map_err(|e| StorageError::Database(e.to_string()))?;
709
710 let messages = stmt
711 .query_map(params![agent1, agent2, limit], |row| {
712 row.get::<_, String>(0)
713 })
714 .map_err(|e| StorageError::Database(e.to_string()))?
715 .filter_map(|r| r.ok())
716 .filter_map(|json| serde_json::from_str(&json).ok())
717 .collect();
718
719 Ok(messages)
720 }
721
722 async fn get_unread_messages(&self, agent_id: &str) -> Result<Vec<MessageModel>, Self::Error> {
723 let conn = self.get_conn()?;
724
725 let mut stmt = conn
726 .prepare(
727 "SELECT data FROM messages
728 WHERE to_agent = ?1 AND read = 0
729 ORDER BY created_at ASC",
730 )
731 .map_err(|e| StorageError::Database(e.to_string()))?;
732
733 let messages = stmt
734 .query_map(params![agent_id], |row| row.get::<_, String>(0))
735 .map_err(|e| StorageError::Database(e.to_string()))?
736 .filter_map(|r| r.ok())
737 .filter_map(|json| serde_json::from_str(&json).ok())
738 .collect();
739
740 Ok(messages)
741 }
742
743 async fn mark_message_read(&self, message_id: &str) -> Result<(), Self::Error> {
744 let conn = self.get_conn()?;
745
746 let rows = conn
747 .execute(
748 "UPDATE messages SET read = 1, read_at = ?2 WHERE id = ?1",
749 params![message_id, chrono::Utc::now().timestamp()],
750 )
751 .map_err(|e| StorageError::Database(e.to_string()))?;
752
753 if rows == 0 {
754 return Err(StorageError::NotFound(format!(
755 "Message {} not found",
756 message_id
757 )));
758 }
759
760 Ok(())
761 }
762
763 async fn store_metric(&self, metric: &MetricModel) -> Result<(), Self::Error> {
765 let conn = self.get_conn()?;
766 let json = serde_json::to_string(metric)?;
767
768 conn.execute(
769 "INSERT INTO metrics (id, metric_type, agent_id, value, unit, tags, timestamp, data)
770 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
771 params![
772 metric.id,
773 metric.metric_type,
774 metric.agent_id,
775 metric.value,
776 metric.unit,
777 serde_json::to_string(&metric.tags)?,
778 metric.timestamp.timestamp(),
779 json
780 ],
781 )
782 .map_err(|e| StorageError::Database(e.to_string()))?;
783
784 debug!("Stored metric: {}", metric.id);
785 Ok(())
786 }
787
788 async fn get_metrics_by_agent(
789 &self,
790 agent_id: &str,
791 metric_type: &str,
792 ) -> Result<Vec<MetricModel>, Self::Error> {
793 let conn = self.get_conn()?;
794
795 let mut stmt = conn
796 .prepare(
797 "SELECT data FROM metrics
798 WHERE agent_id = ?1 AND metric_type = ?2
799 ORDER BY timestamp DESC, id DESC",
800 )
801 .map_err(|e| StorageError::Database(e.to_string()))?;
802
803 let metrics = stmt
804 .query_map(params![agent_id, metric_type], |row| {
805 row.get::<_, String>(0)
806 })
807 .map_err(|e| StorageError::Database(e.to_string()))?
808 .filter_map(|r| r.ok())
809 .filter_map(|json| serde_json::from_str(&json).ok())
810 .collect();
811
812 Ok(metrics)
813 }
814
815 async fn get_aggregated_metrics(
816 &self,
817 metric_type: &str,
818 start_time: i64,
819 end_time: i64,
820 ) -> Result<Vec<MetricModel>, Self::Error> {
821 let conn = self.get_conn()?;
822
823 let mut stmt = conn
824 .prepare(
825 "SELECT metric_type, agent_id, AVG(value) as value, unit,
826 MIN(timestamp) as timestamp, COUNT(*) as count
827 FROM metrics
828 WHERE metric_type = ?1 AND timestamp >= ?2 AND timestamp <= ?3
829 GROUP BY metric_type, agent_id",
830 )
831 .map_err(|e| StorageError::Database(e.to_string()))?;
832
833 let metrics = stmt
834 .query_map(params![metric_type, start_time, end_time], |row| {
835 let mut metric = MetricModel::new(
836 row.get::<_, String>(0)?,
837 row.get::<_, f64>(2)?,
838 row.get::<_, String>(3)?,
839 );
840 metric.agent_id = row.get::<_, Option<String>>(1)?;
841 metric
842 .tags
843 .insert("count".to_string(), row.get::<_, i64>(5)?.to_string());
844 Ok(metric)
845 })
846 .map_err(|e| StorageError::Database(e.to_string()))?
847 .filter_map(|r| r.ok())
848 .collect();
849
850 Ok(metrics)
851 }
852
853 async fn begin_transaction(&self) -> Result<Box<dyn TransactionTrait>, Self::Error> {
855 let conn = self.get_conn()?;
856 let transaction = SqliteTransaction::new(conn)?;
857 Ok(Box::new(transaction))
858 }
859
860 async fn vacuum(&self) -> Result<(), Self::Error> {
862 let conn = self.get_conn()?;
863 conn.execute("VACUUM", [])
864 .map_err(|e| StorageError::Database(e.to_string()))?;
865 info!("Database vacuumed");
866 Ok(())
867 }
868
869 async fn checkpoint(&self) -> Result<(), Self::Error> {
870 #[cfg(test)]
872 {
873 debug!("Skipping checkpoint in test mode");
874 return Ok(());
875 }
876
877 #[cfg(not(test))]
878 {
879 self.exec_blocking(move |conn| {
880 conn.execute("PRAGMA wal_checkpoint(TRUNCATE)", [])?;
881 info!("Database checkpoint completed");
882 Ok(())
883 }).await
884 }
885 }
886
887 async fn get_storage_size(&self) -> Result<u64, Self::Error> {
888 let metadata =
889 std::fs::metadata(&self.path).map_err(|e| StorageError::Other(e.to_string()))?;
890 Ok(metadata.len())
891 }
892}
893
894struct SqliteTransaction {
896 conn: Mutex<Option<SqliteConn>>,
897 committed: Arc<Mutex<bool>>,
898}
899
900impl SqliteTransaction {
901 fn new(conn: SqliteConn) -> Result<Self, StorageError> {
902 conn.execute("BEGIN DEFERRED", [])
905 .map_err(|e| {
906 debug!("Failed to begin transaction: {}", e);
907 StorageError::Transaction(format!("Failed to begin transaction: {}", e))
908 })?;
909
910 Ok(Self {
911 conn: Mutex::new(Some(conn)),
912 committed: Arc::new(Mutex::new(false)),
913 })
914 }
915
916 fn execute_in_transaction<F, T>(&self, operation: F) -> Result<T, StorageError>
918 where
919 F: FnOnce(&SqliteConn) -> Result<T, rusqlite::Error>,
920 {
921 let conn_guard = self.conn.lock();
922 if let Some(conn) = conn_guard.as_ref() {
923 operation(conn).map_err(|e| StorageError::Database(e.to_string()))
924 } else {
925 Err(StorageError::Transaction("Transaction already consumed".to_string()))
926 }
927 }
928}
929
930impl Drop for SqliteTransaction {
931 fn drop(&mut self) {
932 let committed = self.committed.lock();
933 if !*committed {
934 if let Some(conn) = self.conn.lock().take() {
936 let _ = conn.execute("ROLLBACK", []);
937 debug!("Transaction automatically rolled back on drop");
938 }
939 }
940 }
941}
942
943#[async_trait]
944impl TransactionTrait for SqliteTransaction {
945 async fn commit(self: Box<Self>) -> Result<(), StorageError> {
946 let mut committed = self.committed.lock();
947 if *committed {
948 return Err(StorageError::Transaction("Transaction already committed".to_string()));
949 }
950
951 if let Some(conn) = self.conn.lock().take() {
952 conn.execute("COMMIT", [])
953 .map_err(|e| StorageError::Transaction(format!("Failed to commit transaction: {}", e)))?;
954 *committed = true;
955 drop(conn);
956 Ok(())
957 } else {
958 Err(StorageError::Transaction("Transaction already consumed".to_string()))
959 }
960 }
961
962 async fn rollback(self: Box<Self>) -> Result<(), StorageError> {
963 let committed = self.committed.lock();
964 if *committed {
965 return Err(StorageError::Transaction("Cannot rollback committed transaction".to_string()));
966 }
967 drop(committed);
968
969 if let Some(conn) = self.conn.lock().take() {
970 conn.execute("ROLLBACK", [])
971 .map_err(|e| StorageError::Transaction(format!("Failed to rollback transaction: {}", e)))?;
972 drop(conn);
973 Ok(())
974 } else {
975 Err(StorageError::Transaction("Transaction already consumed".to_string()))
976 }
977 }
978}
979
980#[cfg(test)]
981mod tests {
982 use super::*;
983 use tempfile::NamedTempFile;
984
985 #[tokio::test]
986 async fn test_sqlite_storage() {
987 let temp_file = NamedTempFile::new().unwrap();
988 let storage = SqliteStorage::new(temp_file.path().to_str().unwrap())
989 .await
990 .unwrap();
991
992 let agent = AgentModel::new(
994 "test-agent".to_string(),
995 "worker".to_string(),
996 vec!["compute".to_string()],
997 );
998
999 storage.store_agent(&agent).await.unwrap();
1000 let retrieved = storage.get_agent(&agent.id).await.unwrap();
1001 assert!(retrieved.is_some());
1002 assert_eq!(retrieved.unwrap().name, "test-agent");
1003
1004 let task = TaskModel::new(
1006 "process".to_string(),
1007 serde_json::json!({"data": "test"}),
1008 TaskPriority::High,
1009 );
1010
1011 storage.store_task(&task).await.unwrap();
1012 let pending = storage.get_pending_tasks().await.unwrap();
1013 assert_eq!(pending.len(), 1);
1014 }
1015}