ruvswarm_persistence/
lib.rs

1//! Persistence layer for ruvswarm with SQLite and ORM support
2//!
3//! This crate provides a flexible persistence layer with support for:
4//! - SQLite backend for native platforms
5//! - IndexedDB backend for WASM targets
6//! - In-memory storage for testing
7//! - Repository pattern with type-safe queries
8//! - Transaction support and connection pooling
9
10pub mod memory;
11pub mod migrations;
12pub mod models;
13#[cfg(not(target_arch = "wasm32"))]
14pub mod sqlite;
15#[cfg(target_arch = "wasm32")]
16pub mod wasm;
17
18use async_trait::async_trait;
19use std::error::Error as StdError;
20use thiserror::Error;
21
22pub use models::{AgentModel, EventModel, MessageModel, MetricModel, TaskModel};
23
24pub use memory::MemoryStorage;
25#[cfg(not(target_arch = "wasm32"))]
26pub use sqlite::SqliteStorage;
27#[cfg(target_arch = "wasm32")]
28pub use wasm::WasmStorage;
29
30/// Storage error types
31#[derive(Error, Debug)]
32pub enum StorageError {
33    #[error("Database error: {0}")]
34    Database(String),
35
36    #[error("Serialization error: {0}")]
37    Serialization(#[from] serde_json::Error),
38
39    #[error("Not found: {0}")]
40    NotFound(String),
41
42    #[error("Transaction error: {0}")]
43    Transaction(String),
44
45    #[error("Migration error: {0}")]
46    Migration(String),
47
48    #[error("Connection pool error: {0}")]
49    Pool(String),
50
51    #[error("Other error: {0}")]
52    Other(String),
53}
54
55impl From<Box<dyn StdError + Send + Sync>> for StorageError {
56    fn from(err: Box<dyn StdError + Send + Sync>) -> Self {
57        StorageError::Other(err.to_string())
58    }
59}
60
61/// Storage trait for persistence operations
62#[async_trait]
63pub trait Storage: Send + Sync {
64    type Error: StdError + Send + Sync + 'static;
65
66    // Agent operations
67    async fn store_agent(&self, agent: &AgentModel) -> Result<(), Self::Error>;
68    async fn get_agent(&self, id: &str) -> Result<Option<AgentModel>, Self::Error>;
69    async fn update_agent(&self, agent: &AgentModel) -> Result<(), Self::Error>;
70    async fn delete_agent(&self, id: &str) -> Result<(), Self::Error>;
71    async fn list_agents(&self) -> Result<Vec<AgentModel>, Self::Error>;
72    async fn list_agents_by_status(&self, status: &str) -> Result<Vec<AgentModel>, Self::Error>;
73
74    // Task operations
75    async fn store_task(&self, task: &TaskModel) -> Result<(), Self::Error>;
76    async fn get_task(&self, id: &str) -> Result<Option<TaskModel>, Self::Error>;
77    async fn update_task(&self, task: &TaskModel) -> Result<(), Self::Error>;
78    async fn get_pending_tasks(&self) -> Result<Vec<TaskModel>, Self::Error>;
79    async fn get_tasks_by_agent(&self, agent_id: &str) -> Result<Vec<TaskModel>, Self::Error>;
80    async fn claim_task(&self, task_id: &str, agent_id: &str) -> Result<bool, Self::Error>;
81
82    // Event operations
83    async fn store_event(&self, event: &EventModel) -> Result<(), Self::Error>;
84    async fn get_events_by_agent(
85        &self,
86        agent_id: &str,
87        limit: usize,
88    ) -> Result<Vec<EventModel>, Self::Error>;
89    async fn get_events_by_type(
90        &self,
91        event_type: &str,
92        limit: usize,
93    ) -> Result<Vec<EventModel>, Self::Error>;
94    async fn get_events_since(&self, timestamp: i64) -> Result<Vec<EventModel>, Self::Error>;
95
96    // Message operations
97    async fn store_message(&self, message: &MessageModel) -> Result<(), Self::Error>;
98    async fn get_messages_between(
99        &self,
100        agent1: &str,
101        agent2: &str,
102        limit: usize,
103    ) -> Result<Vec<MessageModel>, Self::Error>;
104    async fn get_unread_messages(&self, agent_id: &str) -> Result<Vec<MessageModel>, Self::Error>;
105    async fn mark_message_read(&self, message_id: &str) -> Result<(), Self::Error>;
106
107    // Metric operations
108    async fn store_metric(&self, metric: &MetricModel) -> Result<(), Self::Error>;
109    async fn get_metrics_by_agent(
110        &self,
111        agent_id: &str,
112        metric_type: &str,
113    ) -> Result<Vec<MetricModel>, Self::Error>;
114    async fn get_aggregated_metrics(
115        &self,
116        metric_type: &str,
117        start_time: i64,
118        end_time: i64,
119    ) -> Result<Vec<MetricModel>, Self::Error>;
120
121    // Transaction support
122    async fn begin_transaction(&self) -> Result<Box<dyn Transaction>, Self::Error>;
123
124    // Maintenance operations
125    async fn vacuum(&self) -> Result<(), Self::Error>;
126    async fn checkpoint(&self) -> Result<(), Self::Error>;
127    async fn get_storage_size(&self) -> Result<u64, Self::Error>;
128}
129
130/// Transaction trait for atomic operations
131#[async_trait]
132pub trait Transaction: Send + Sync {
133    async fn commit(self: Box<Self>) -> Result<(), StorageError>;
134    async fn rollback(self: Box<Self>) -> Result<(), StorageError>;
135}
136
/// Builder for `SELECT * FROM <table>` statements with bound string values.
///
/// `T` only tags the builder with the entity type it queries; no `T` value is
/// stored.
///
/// Values given to [`QueryBuilder::where_eq`] and [`QueryBuilder::where_like`]
/// are emitted as `?` placeholders and returned separately by
/// [`QueryBuilder::build`], in the order the conditions were added.
///
/// # Security
///
/// Table and field names are interpolated into the SQL text verbatim. Only
/// pass trusted, hard-coded identifiers; never forward user input as a table
/// or field name.
pub struct QueryBuilder<T> {
    table: String,
    conditions: Vec<String>,
    parameters: Vec<String>,
    order_by: Option<String>,
    limit: Option<usize>,
    offset: Option<usize>,
    _phantom: std::marker::PhantomData<T>,
}

impl<T> QueryBuilder<T> {
    /// Starts a query against `table` with no conditions, ordering, limit or
    /// offset.
    #[must_use]
    pub fn new(table: &str) -> Self {
        Self {
            table: table.to_string(),
            conditions: Vec::new(),
            parameters: Vec::new(),
            order_by: None,
            limit: None,
            offset: None,
            _phantom: std::marker::PhantomData,
        }
    }

    /// Adds a `field = ?` condition and records `value` as its bound parameter.
    #[must_use]
    pub fn where_eq(mut self, field: &str, value: &str) -> Self {
        // The value travels as a bound parameter, never as SQL text.
        self.conditions.push(format!("{} = ?", field));
        self.parameters.push(value.to_string());
        self
    }

    /// Adds a `field LIKE ?` condition and records `pattern` as its bound
    /// parameter.
    #[must_use]
    pub fn where_like(mut self, field: &str, pattern: &str) -> Self {
        // The pattern travels as a bound parameter, never as SQL text.
        self.conditions.push(format!("{} LIKE ?", field));
        self.parameters.push(pattern.to_string());
        self
    }

    /// Adds a `field > value` condition.
    ///
    /// The integer is written into the SQL text directly. That is safe because
    /// an `i64` cannot carry SQL syntax, and it keeps the parameter list
    /// string-only.
    #[must_use]
    pub fn where_gt(mut self, field: &str, value: i64) -> Self {
        self.conditions.push(format!("{} > {}", field, value));
        self
    }

    /// Sets the `ORDER BY` clause, replacing any previous one. `desc` selects
    /// `DESC`, otherwise `ASC`.
    #[must_use]
    pub fn order_by(mut self, field: &str, desc: bool) -> Self {
        let direction = if desc { "DESC" } else { "ASC" };
        self.order_by = Some(format!("{} {}", field, direction));
        self
    }

    /// Sets the maximum number of rows (`LIMIT`).
    #[must_use]
    pub fn limit(mut self, limit: usize) -> Self {
        self.limit = Some(limit);
        self
    }

    /// Sets the number of rows to skip (`OFFSET`).
    #[must_use]
    pub fn offset(mut self, offset: usize) -> Self {
        self.offset = Some(offset);
        self
    }

    /// Renders the SQL text and returns it with the bound parameters.
    ///
    /// Conditions are joined with `AND`. The returned parameters line up with
    /// the `?` placeholders from left to right.
    ///
    /// When an offset is set without a limit, `LIMIT -1` is emitted before
    /// `OFFSET`: SQLite rejects a bare `OFFSET`, and treats a negative limit
    /// as "no upper bound".
    #[must_use]
    pub fn build(&self) -> (String, Vec<String>) {
        let mut query = format!("SELECT * FROM {}", self.table);

        if !self.conditions.is_empty() {
            query.push_str(" WHERE ");
            query.push_str(&self.conditions.join(" AND "));
        }

        if let Some(order) = &self.order_by {
            query.push_str(" ORDER BY ");
            query.push_str(order);
        }

        let paging = match (self.limit, self.offset) {
            (Some(limit), Some(offset)) => format!(" LIMIT {} OFFSET {}", limit, offset),
            (Some(limit), None) => format!(" LIMIT {}", limit),
            // OFFSET is only valid after LIMIT; -1 means "unbounded" in SQLite.
            (None, Some(offset)) => format!(" LIMIT -1 OFFSET {}", offset),
            (None, None) => String::new(),
        };
        query.push_str(&paging);

        (query, self.parameters.clone())
    }
}
219
220/// Repository pattern implementation
221pub trait Repository<T> {
222    type Error: StdError + Send + Sync + 'static;
223
224    fn find_by_id(&self, id: &str) -> Result<Option<T>, Self::Error>;
225    fn find_all(&self) -> Result<Vec<T>, Self::Error>;
226    fn save(&self, entity: &T) -> Result<(), Self::Error>;
227    fn update(&self, entity: &T) -> Result<(), Self::Error>;
228    fn delete(&self, id: &str) -> Result<(), Self::Error>;
229    fn query(&self, builder: QueryBuilder<T>) -> Result<Vec<T>, Self::Error>;
230}
231
232/// Initialize storage based on target platform
233pub async fn init_storage(
234    path: Option<&str>,
235) -> Result<Box<dyn Storage<Error = StorageError>>, StorageError> {
236    #[cfg(not(target_arch = "wasm32"))]
237    {
238        let path = path.unwrap_or("swarm.db");
239        Ok(Box::new(SqliteStorage::new(path).await?))
240    }
241
242    #[cfg(target_arch = "wasm32")]
243    {
244        Ok(Box::new(WasmStorage::new().await?))
245    }
246}
247
248#[cfg(test)]
249mod tests;