avocado_core/storage/
sqlite.rs

1//! SQLite storage backend implementation
2//!
3//! Wraps the existing Database struct with async interface using spawn_blocking.
4
5use async_trait::async_trait;
6use std::path::Path;
7use std::sync::Arc;
8use tokio::task;
9
10use crate::db::Database;
11use crate::index::VectorIndex;
12use crate::storage::traits::StorageBackend;
13use crate::storage::vector::{VectorSearchProvider, VectorSearchResult};
14use crate::types::*;
15
16/// SQLite storage backend
17///
18/// Wraps the existing Database implementation with async interface.
19/// Uses `tokio::task::spawn_blocking` for rusqlite operations.
20pub struct SqliteBackend {
21    db: Database,
22}
23
24impl SqliteBackend {
25    /// Create new SQLite backend
26    ///
27    /// # Arguments
28    /// * `path` - Path to SQLite database file
29    pub async fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
30        let path = path.as_ref().to_path_buf();
31        let db = task::spawn_blocking(move || Database::new(&path))
32            .await
33            .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))??;
34
35        Ok(Self { db })
36    }
37
38    /// Get reference to underlying Database (for backward compatibility)
39    pub fn database(&self) -> &Database {
40        &self.db
41    }
42}
43
44#[async_trait]
45impl StorageBackend for SqliteBackend {
46    // ========== Lifecycle ==========
47
48    async fn get_stats(&self) -> Result<(usize, usize, usize)> {
49        let db = self.db.clone();
50        task::spawn_blocking(move || db.get_stats())
51            .await
52            .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
53    }
54
55    async fn clear(&self) -> Result<()> {
56        let db = self.db.clone();
57        task::spawn_blocking(move || db.clear())
58            .await
59            .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
60    }
61
62    // ========== Artifacts ==========
63
64    async fn insert_artifact(&self, artifact: &Artifact) -> Result<()> {
65        let db = self.db.clone();
66        let artifact = artifact.clone();
67        task::spawn_blocking(move || db.insert_artifact(&artifact))
68            .await
69            .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
70    }
71
72    async fn get_artifact(&self, artifact_id: &str) -> Result<Option<Artifact>> {
73        let db = self.db.clone();
74        let id = artifact_id.to_string();
75        task::spawn_blocking(move || db.get_artifact(&id))
76            .await
77            .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
78    }
79
80    async fn get_artifact_by_path(&self, path: &str) -> Result<Option<Artifact>> {
81        let db = self.db.clone();
82        let p = path.to_string();
83        task::spawn_blocking(move || db.get_artifact_by_path(&p))
84            .await
85            .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
86    }
87
88    async fn delete_artifact(&self, artifact_id: &str) -> Result<usize> {
89        let db = self.db.clone();
90        let id = artifact_id.to_string();
91        task::spawn_blocking(move || db.delete_artifact(&id))
92            .await
93            .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
94    }
95
96    async fn determine_ingest_action(
97        &self,
98        path: &str,
99        content_hash: &str,
100    ) -> Result<IngestAction> {
101        let db = self.db.clone();
102        let p = path.to_string();
103        let h = content_hash.to_string();
104        task::spawn_blocking(move || db.determine_ingest_action(&p, &h))
105            .await
106            .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
107    }
108
109    // ========== Spans ==========
110
111    async fn insert_spans(&self, spans: &[Span]) -> Result<()> {
112        let db = self.db.clone();
113        let spans = spans.to_vec();
114        task::spawn_blocking(move || db.insert_spans(&spans))
115            .await
116            .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
117    }
118
119    async fn get_all_spans(&self) -> Result<Vec<Span>> {
120        let db = self.db.clone();
121        task::spawn_blocking(move || db.get_all_spans())
122            .await
123            .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
124    }
125
126    async fn search_spans(&self, query: &str, limit: usize) -> Result<Vec<Span>> {
127        let db = self.db.clone();
128        let q = query.to_string();
129        task::spawn_blocking(move || db.search_spans(&q, limit))
130            .await
131            .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
132    }
133
134    // ========== Vector Search ==========
135
136    async fn get_vector_search(&self) -> Result<Arc<dyn VectorSearchProvider>> {
137        let db = self.db.clone();
138        let index = task::spawn_blocking(move || db.get_vector_index())
139            .await
140            .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))??;
141
142        Ok(Arc::new(HnswVectorSearch::new(index)))
143    }
144
145    async fn invalidate_vector_index(&self) {
146        // The existing Database handles this internally via index_dirty flag
147        // This is called after data changes to force index rebuild
148    }
149
150    // ========== Sessions ==========
151
152    async fn create_session(
153        &self,
154        user_id: Option<&str>,
155        title: Option<&str>,
156    ) -> Result<Session> {
157        let db = self.db.clone();
158        let uid = user_id.map(|s| s.to_string());
159        let t = title.map(|s| s.to_string());
160        task::spawn_blocking(move || db.create_session(uid.as_deref(), t.as_deref()))
161            .await
162            .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
163    }
164
165    async fn get_session(&self, session_id: &str) -> Result<Option<Session>> {
166        let db = self.db.clone();
167        let id = session_id.to_string();
168        task::spawn_blocking(move || db.get_session(&id))
169            .await
170            .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
171    }
172
173    async fn list_sessions(
174        &self,
175        user_id: Option<&str>,
176        limit: Option<usize>,
177    ) -> Result<Vec<Session>> {
178        let db = self.db.clone();
179        let uid = user_id.map(|s| s.to_string());
180        task::spawn_blocking(move || db.list_sessions(uid.as_deref(), limit))
181            .await
182            .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
183    }
184
185    async fn update_session(
186        &self,
187        session_id: &str,
188        title: Option<&str>,
189        metadata: Option<&serde_json::Value>,
190    ) -> Result<()> {
191        let db = self.db.clone();
192        let id = session_id.to_string();
193        let t = title.map(|s| s.to_string());
194        let m = metadata.cloned();
195        task::spawn_blocking(move || db.update_session(&id, t.as_deref(), m.as_ref()))
196            .await
197            .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
198    }
199
200    async fn delete_session(&self, session_id: &str) -> Result<()> {
201        let db = self.db.clone();
202        let id = session_id.to_string();
203        task::spawn_blocking(move || db.delete_session(&id))
204            .await
205            .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
206    }
207
208    // ========== Messages ==========
209
210    async fn add_message(
211        &self,
212        session_id: &str,
213        role: MessageRole,
214        content: &str,
215        metadata: Option<&serde_json::Value>,
216    ) -> Result<Message> {
217        let db = self.db.clone();
218        let sid = session_id.to_string();
219        let c = content.to_string();
220        let m = metadata.cloned();
221        task::spawn_blocking(move || db.add_message(&sid, role, &c, m.as_ref()))
222            .await
223            .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
224    }
225
226    async fn get_messages(
227        &self,
228        session_id: &str,
229        limit: Option<usize>,
230    ) -> Result<Vec<Message>> {
231        let db = self.db.clone();
232        let sid = session_id.to_string();
233        task::spawn_blocking(move || db.get_messages(&sid, limit))
234            .await
235            .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
236    }
237
238    // ========== Working Sets ==========
239
240    async fn associate_working_set(
241        &self,
242        session_id: &str,
243        message_id: Option<&str>,
244        working_set: &WorkingSet,
245        query: &str,
246        config: &CompilerConfig,
247    ) -> Result<SessionWorkingSet> {
248        let db = self.db.clone();
249        let sid = session_id.to_string();
250        let mid = message_id.map(|s| s.to_string());
251        let ws = working_set.clone();
252        let q = query.to_string();
253        let cfg = config.clone();
254        task::spawn_blocking(move || {
255            db.associate_working_set(&sid, mid.as_deref(), &ws, &q, &cfg)
256        })
257        .await
258        .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
259    }
260
261    async fn get_session_full(&self, session_id: &str) -> Result<Option<SessionWithMessages>> {
262        let db = self.db.clone();
263        let sid = session_id.to_string();
264        task::spawn_blocking(move || db.get_session_full(&sid))
265            .await
266            .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
267    }
268
269    // ========== Agents ==========
270
271    async fn register_agent(&self, agent: &Agent) -> Result<Agent> {
272        let db = self.db.clone();
273        let a = agent.clone();
274        task::spawn_blocking(move || db.register_agent(&a))
275            .await
276            .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
277    }
278
279    async fn get_agent(&self, agent_id: &str) -> Result<Option<Agent>> {
280        let db = self.db.clone();
281        let id = agent_id.to_string();
282        task::spawn_blocking(move || db.get_agent(&id))
283            .await
284            .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
285    }
286
287    async fn get_agent_by_name(&self, name: &str) -> Result<Option<Agent>> {
288        let db = self.db.clone();
289        let n = name.to_string();
290        task::spawn_blocking(move || db.get_agent_by_name(&n))
291            .await
292            .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
293    }
294
295    async fn list_agents(&self) -> Result<Vec<Agent>> {
296        let db = self.db.clone();
297        task::spawn_blocking(move || db.list_agents())
298            .await
299            .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
300    }
301
302    // ========== Agent Relations ==========
303
304    async fn add_agent_relation(
305        &self,
306        session_id: &str,
307        message_id: &str,
308        from_agent_id: &str,
309        target_message_id: &str,
310        stance: Stance,
311    ) -> Result<AgentRelation> {
312        let db = self.db.clone();
313        let sid = session_id.to_string();
314        let mid = message_id.to_string();
315        let fid = from_agent_id.to_string();
316        let tmid = target_message_id.to_string();
317        task::spawn_blocking(move || db.add_agent_relation(&sid, &mid, &fid, &tmid, stance))
318            .await
319            .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
320    }
321
322    async fn get_agent_relations(&self, session_id: &str) -> Result<AgentRelationSummary> {
323        let db = self.db.clone();
324        let sid = session_id.to_string();
325        task::spawn_blocking(move || db.get_agent_relations(&sid))
326            .await
327            .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
328    }
329
330    async fn get_session_agents(&self, session_id: &str) -> Result<Vec<Agent>> {
331        let db = self.db.clone();
332        let sid = session_id.to_string();
333        task::spawn_blocking(move || db.get_session_agents(&sid))
334            .await
335            .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
336    }
337}
338
339// ========== HNSW Vector Search Provider ==========
340
341/// HNSW-based vector search for SQLite backend
342struct HnswVectorSearch {
343    index: Arc<VectorIndex>,
344}
345
346impl HnswVectorSearch {
347    fn new(index: Arc<VectorIndex>) -> Self {
348        Self { index }
349    }
350}
351
352#[async_trait]
353impl VectorSearchProvider for HnswVectorSearch {
354    async fn search(&self, query_embedding: &[f32], k: usize) -> Result<Vec<VectorSearchResult>> {
355        let index = self.index.clone();
356        let query = query_embedding.to_vec();
357
358        let results = task::spawn_blocking(move || index.search(&query, k))
359            .await
360            .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))??;
361
362        Ok(results.into_iter().map(VectorSearchResult::from).collect())
363    }
364
365    fn len(&self) -> usize {
366        self.index.len()
367    }
368
369    fn dimension(&self) -> usize {
370        self.index
371            .spans()
372            .first()
373            .and_then(|s| s.embedding.as_ref().map(|e| e.len()))
374            .unwrap_or(384) // Default for all-MiniLM-L6-v2
375    }
376}