1use async_trait::async_trait;
6use std::path::Path;
7use std::sync::Arc;
8use tokio::task;
9
10use crate::db::Database;
11use crate::index::VectorIndex;
12use crate::storage::traits::StorageBackend;
13use crate::storage::vector::{VectorSearchProvider, VectorSearchResult};
14use crate::types::*;
15
16pub struct SqliteBackend {
21 db: Database,
22}
23
24impl SqliteBackend {
25 pub async fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
30 let path = path.as_ref().to_path_buf();
31 let db = task::spawn_blocking(move || Database::new(&path))
32 .await
33 .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))??;
34
35 Ok(Self { db })
36 }
37
38 pub fn database(&self) -> &Database {
40 &self.db
41 }
42}
43
44#[async_trait]
45impl StorageBackend for SqliteBackend {
46 async fn get_stats(&self) -> Result<(usize, usize, usize)> {
49 let db = self.db.clone();
50 task::spawn_blocking(move || db.get_stats())
51 .await
52 .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
53 }
54
55 async fn clear(&self) -> Result<()> {
56 let db = self.db.clone();
57 task::spawn_blocking(move || db.clear())
58 .await
59 .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
60 }
61
62 async fn insert_artifact(&self, artifact: &Artifact) -> Result<()> {
65 let db = self.db.clone();
66 let artifact = artifact.clone();
67 task::spawn_blocking(move || db.insert_artifact(&artifact))
68 .await
69 .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
70 }
71
72 async fn get_artifact(&self, artifact_id: &str) -> Result<Option<Artifact>> {
73 let db = self.db.clone();
74 let id = artifact_id.to_string();
75 task::spawn_blocking(move || db.get_artifact(&id))
76 .await
77 .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
78 }
79
80 async fn get_artifact_by_path(&self, path: &str) -> Result<Option<Artifact>> {
81 let db = self.db.clone();
82 let p = path.to_string();
83 task::spawn_blocking(move || db.get_artifact_by_path(&p))
84 .await
85 .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
86 }
87
88 async fn delete_artifact(&self, artifact_id: &str) -> Result<usize> {
89 let db = self.db.clone();
90 let id = artifact_id.to_string();
91 task::spawn_blocking(move || db.delete_artifact(&id))
92 .await
93 .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
94 }
95
96 async fn determine_ingest_action(
97 &self,
98 path: &str,
99 content_hash: &str,
100 ) -> Result<IngestAction> {
101 let db = self.db.clone();
102 let p = path.to_string();
103 let h = content_hash.to_string();
104 task::spawn_blocking(move || db.determine_ingest_action(&p, &h))
105 .await
106 .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
107 }
108
109 async fn insert_spans(&self, spans: &[Span]) -> Result<()> {
112 let db = self.db.clone();
113 let spans = spans.to_vec();
114 task::spawn_blocking(move || db.insert_spans(&spans))
115 .await
116 .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
117 }
118
119 async fn get_all_spans(&self) -> Result<Vec<Span>> {
120 let db = self.db.clone();
121 task::spawn_blocking(move || db.get_all_spans())
122 .await
123 .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
124 }
125
126 async fn search_spans(&self, query: &str, limit: usize) -> Result<Vec<Span>> {
127 let db = self.db.clone();
128 let q = query.to_string();
129 task::spawn_blocking(move || db.search_spans(&q, limit))
130 .await
131 .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
132 }
133
134 async fn get_vector_search(&self) -> Result<Arc<dyn VectorSearchProvider>> {
137 let db = self.db.clone();
138 let index = task::spawn_blocking(move || db.get_vector_index())
139 .await
140 .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))??;
141
142 Ok(Arc::new(HnswVectorSearch::new(index)))
143 }
144
145 async fn invalidate_vector_index(&self) {
146 }
149
150 async fn create_session(
153 &self,
154 user_id: Option<&str>,
155 title: Option<&str>,
156 ) -> Result<Session> {
157 let db = self.db.clone();
158 let uid = user_id.map(|s| s.to_string());
159 let t = title.map(|s| s.to_string());
160 task::spawn_blocking(move || db.create_session(uid.as_deref(), t.as_deref()))
161 .await
162 .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
163 }
164
165 async fn get_session(&self, session_id: &str) -> Result<Option<Session>> {
166 let db = self.db.clone();
167 let id = session_id.to_string();
168 task::spawn_blocking(move || db.get_session(&id))
169 .await
170 .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
171 }
172
173 async fn list_sessions(
174 &self,
175 user_id: Option<&str>,
176 limit: Option<usize>,
177 ) -> Result<Vec<Session>> {
178 let db = self.db.clone();
179 let uid = user_id.map(|s| s.to_string());
180 task::spawn_blocking(move || db.list_sessions(uid.as_deref(), limit))
181 .await
182 .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
183 }
184
185 async fn update_session(
186 &self,
187 session_id: &str,
188 title: Option<&str>,
189 metadata: Option<&serde_json::Value>,
190 ) -> Result<()> {
191 let db = self.db.clone();
192 let id = session_id.to_string();
193 let t = title.map(|s| s.to_string());
194 let m = metadata.cloned();
195 task::spawn_blocking(move || db.update_session(&id, t.as_deref(), m.as_ref()))
196 .await
197 .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
198 }
199
200 async fn delete_session(&self, session_id: &str) -> Result<()> {
201 let db = self.db.clone();
202 let id = session_id.to_string();
203 task::spawn_blocking(move || db.delete_session(&id))
204 .await
205 .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
206 }
207
208 async fn add_message(
211 &self,
212 session_id: &str,
213 role: MessageRole,
214 content: &str,
215 metadata: Option<&serde_json::Value>,
216 ) -> Result<Message> {
217 let db = self.db.clone();
218 let sid = session_id.to_string();
219 let c = content.to_string();
220 let m = metadata.cloned();
221 task::spawn_blocking(move || db.add_message(&sid, role, &c, m.as_ref()))
222 .await
223 .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
224 }
225
226 async fn get_messages(
227 &self,
228 session_id: &str,
229 limit: Option<usize>,
230 ) -> Result<Vec<Message>> {
231 let db = self.db.clone();
232 let sid = session_id.to_string();
233 task::spawn_blocking(move || db.get_messages(&sid, limit))
234 .await
235 .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
236 }
237
238 async fn associate_working_set(
241 &self,
242 session_id: &str,
243 message_id: Option<&str>,
244 working_set: &WorkingSet,
245 query: &str,
246 config: &CompilerConfig,
247 ) -> Result<SessionWorkingSet> {
248 let db = self.db.clone();
249 let sid = session_id.to_string();
250 let mid = message_id.map(|s| s.to_string());
251 let ws = working_set.clone();
252 let q = query.to_string();
253 let cfg = config.clone();
254 task::spawn_blocking(move || {
255 db.associate_working_set(&sid, mid.as_deref(), &ws, &q, &cfg)
256 })
257 .await
258 .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
259 }
260
261 async fn get_session_full(&self, session_id: &str) -> Result<Option<SessionWithMessages>> {
262 let db = self.db.clone();
263 let sid = session_id.to_string();
264 task::spawn_blocking(move || db.get_session_full(&sid))
265 .await
266 .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
267 }
268
269 async fn register_agent(&self, agent: &Agent) -> Result<Agent> {
272 let db = self.db.clone();
273 let a = agent.clone();
274 task::spawn_blocking(move || db.register_agent(&a))
275 .await
276 .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
277 }
278
279 async fn get_agent(&self, agent_id: &str) -> Result<Option<Agent>> {
280 let db = self.db.clone();
281 let id = agent_id.to_string();
282 task::spawn_blocking(move || db.get_agent(&id))
283 .await
284 .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
285 }
286
287 async fn get_agent_by_name(&self, name: &str) -> Result<Option<Agent>> {
288 let db = self.db.clone();
289 let n = name.to_string();
290 task::spawn_blocking(move || db.get_agent_by_name(&n))
291 .await
292 .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
293 }
294
295 async fn list_agents(&self) -> Result<Vec<Agent>> {
296 let db = self.db.clone();
297 task::spawn_blocking(move || db.list_agents())
298 .await
299 .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
300 }
301
302 async fn add_agent_relation(
305 &self,
306 session_id: &str,
307 message_id: &str,
308 from_agent_id: &str,
309 target_message_id: &str,
310 stance: Stance,
311 ) -> Result<AgentRelation> {
312 let db = self.db.clone();
313 let sid = session_id.to_string();
314 let mid = message_id.to_string();
315 let fid = from_agent_id.to_string();
316 let tmid = target_message_id.to_string();
317 task::spawn_blocking(move || db.add_agent_relation(&sid, &mid, &fid, &tmid, stance))
318 .await
319 .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
320 }
321
322 async fn get_agent_relations(&self, session_id: &str) -> Result<AgentRelationSummary> {
323 let db = self.db.clone();
324 let sid = session_id.to_string();
325 task::spawn_blocking(move || db.get_agent_relations(&sid))
326 .await
327 .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
328 }
329
330 async fn get_session_agents(&self, session_id: &str) -> Result<Vec<Agent>> {
331 let db = self.db.clone();
332 let sid = session_id.to_string();
333 task::spawn_blocking(move || db.get_session_agents(&sid))
334 .await
335 .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))?
336 }
337}
338
339struct HnswVectorSearch {
343 index: Arc<VectorIndex>,
344}
345
346impl HnswVectorSearch {
347 fn new(index: Arc<VectorIndex>) -> Self {
348 Self { index }
349 }
350}
351
352#[async_trait]
353impl VectorSearchProvider for HnswVectorSearch {
354 async fn search(&self, query_embedding: &[f32], k: usize) -> Result<Vec<VectorSearchResult>> {
355 let index = self.index.clone();
356 let query = query_embedding.to_vec();
357
358 let results = task::spawn_blocking(move || index.search(&query, k))
359 .await
360 .map_err(|e| Error::Other(anyhow::anyhow!("Task join error: {}", e)))??;
361
362 Ok(results.into_iter().map(VectorSearchResult::from).collect())
363 }
364
365 fn len(&self) -> usize {
366 self.index.len()
367 }
368
369 fn dimension(&self) -> usize {
370 self.index
371 .spans()
372 .first()
373 .and_then(|s| s.embedding.as_ref().map(|e| e.len()))
374 .unwrap_or(384) }
376}