1use std::path::Path;
23use std::sync::{Arc, Mutex};
24
25use aonyx_core::{AonyxError, Result};
26use async_trait::async_trait;
27use chrono::{DateTime, Utc};
28use rusqlite::{params, Connection};
29use serde::{Deserialize, Serialize};
30use serde_json::Value as JsonValue;
31use uuid::Uuid;
32
/// Identifier of a stored [`Chunk`]; a UUID (random v4 when created via [`Chunk::new`]).
pub type ChunkId = Uuid;
35
/// A piece of text stored for retrieval, tagged with the project and source it came from.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Chunk {
    /// Unique identifier; [`Chunk::new`] assigns a random v4 UUID.
    pub id: ChunkId,
    /// Project the chunk belongs to; store searches and counts can be scoped by it.
    pub project: String,
    /// Origin of the content. NOTE(review): tests use file paths such as `src/lib.rs`;
    /// confirm whether other kinds of origin are expected.
    pub source: String,
    /// The text itself; this is what full-text search matches against.
    pub content: String,
    /// Timestamp in UTC; [`Chunk::new`] sets it to the current time.
    pub ts: DateTime<Utc>,
    /// Optional free-form label set via [`Chunk::with_kind`]. No vocabulary is enforced
    /// here — NOTE(review): confirm expected values with callers.
    pub kind: Option<String>,
    /// Arbitrary JSON metadata; deserializes to `null` when the field is absent.
    #[serde(default)]
    pub metadata: JsonValue,
}
55
56impl Chunk {
57 pub fn new(
59 project: impl Into<String>,
60 source: impl Into<String>,
61 content: impl Into<String>,
62 ) -> Self {
63 Self {
64 id: Uuid::new_v4(),
65 project: project.into(),
66 source: source.into(),
67 content: content.into(),
68 ts: Utc::now(),
69 kind: None,
70 metadata: JsonValue::Null,
71 }
72 }
73
74 pub fn with_kind(mut self, kind: impl Into<String>) -> Self {
76 self.kind = Some(kind.into());
77 self
78 }
79
80 pub fn with_metadata(mut self, metadata: JsonValue) -> Self {
82 self.metadata = metadata;
83 self
84 }
85}
86
/// A search hit: a [`Chunk`] together with its relevance score.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ScoredChunk {
    /// The matching chunk.
    pub chunk: Chunk,
    /// Relevance score. `SqliteChunksStore` stores the negated FTS5 `bm25()` value here,
    /// so a larger score means a better match.
    pub score: f32,
}
95
/// Async storage for [`Chunk`]s with BM25 full-text search.
///
/// Implementors must be `Send + Sync` so the store can be shared across tasks.
#[async_trait]
pub trait ChunksStore: Send + Sync {
    /// Stores `chunk` and returns a [`ChunkId`] for it.
    ///
    /// NOTE(review): `SqliteChunksStore` returns `chunk.id` and does a plain insert with no
    /// uniqueness check on the id; confirm whether other implementations must match.
    async fn append(&self, chunk: Chunk) -> Result<ChunkId>;

    /// Returns at most `k` chunks matching `query`.
    ///
    /// `project = Some(p)` restricts results to project `p`; `None` searches every project.
    /// NOTE(review): in `SqliteChunksStore`, `query` is handed directly to FTS5 `MATCH`
    /// (so FTS5 query syntax such as `a OR b` applies), hits are ordered best match first,
    /// and `score` is the negated `bm25()` value. Confirm whether this is the trait contract.
    async fn search_bm25(
        &self,
        project: Option<&str>,
        query: &str,
        k: usize,
    ) -> Result<Vec<ScoredChunk>>;

    /// Counts stored chunks, either in project `p` (`Some(p)`) or across all projects (`None`).
    async fn count(&self, project: Option<&str>) -> Result<usize>;
}
116
/// [`ChunksStore`] backed by a single SQLite FTS5 table named `chunks_fts`.
///
/// Clones share one connection through `Arc<Mutex<_>>`, so cloning is cheap. Every
/// operation locks that mutex, which serializes all database access.
#[derive(Clone)]
pub struct SqliteChunksStore {
    conn: Arc<Mutex<Connection>>,
}
122
123impl SqliteChunksStore {
124 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
126 let conn = Connection::open(path.as_ref())
127 .map_err(|e| AonyxError::Memory(format!("open chunks db: {e}")))?;
128 Self::migrate(&conn)?;
129 Ok(Self {
130 conn: Arc::new(Mutex::new(conn)),
131 })
132 }
133
134 pub fn open_in_memory() -> Result<Self> {
136 let conn = Connection::open_in_memory()
137 .map_err(|e| AonyxError::Memory(format!("open in-memory chunks: {e}")))?;
138 Self::migrate(&conn)?;
139 Ok(Self {
140 conn: Arc::new(Mutex::new(conn)),
141 })
142 }
143
144 fn migrate(conn: &Connection) -> Result<()> {
145 conn.execute_batch(MIGRATION_V1)
146 .map_err(|e| AonyxError::Memory(format!("migrate chunks schema: {e}")))?;
147 Ok(())
148 }
149}
150
/// Schema for the chunks store, executed by `SqliteChunksStore::migrate` on every open.
///
/// Creates a single FTS5 virtual table `chunks_fts`, but only if it does not exist yet.
/// Only `content` is full-text indexed, using the `unicode61` tokenizer with
/// `remove_diacritics 2`. Every other column is declared `UNINDEXED`, so it is stored with
/// the row but not tokenized. As written by `append`, `ts` holds RFC 3339 text and
/// `metadata_json` holds serialized JSON.
const MIGRATION_V1: &str = r#"
CREATE VIRTUAL TABLE IF NOT EXISTS chunks_fts USING fts5(
    uuid UNINDEXED,
    project UNINDEXED,
    source UNINDEXED,
    ts UNINDEXED,
    kind UNINDEXED,
    metadata_json UNINDEXED,
    content,
    tokenize = 'unicode61 remove_diacritics 2'
);
"#;
163
164#[async_trait]
165impl ChunksStore for SqliteChunksStore {
166 async fn append(&self, chunk: Chunk) -> Result<ChunkId> {
167 let conn = self.conn.clone();
168 let id = chunk.id;
169 tokio::task::spawn_blocking(move || -> Result<()> {
170 let lock = conn.lock().expect("chunks mutex poisoned");
171 lock.execute(
172 r#"
173 INSERT INTO chunks_fts (uuid, project, source, ts, kind, metadata_json, content)
174 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
175 "#,
176 params![
177 chunk.id.to_string(),
178 chunk.project,
179 chunk.source,
180 chunk.ts.to_rfc3339(),
181 chunk.kind,
182 serde_json::to_string(&chunk.metadata).ok(),
183 chunk.content,
184 ],
185 )
186 .map_err(|e| AonyxError::Memory(format!("chunks append: {e}")))?;
187 Ok(())
188 })
189 .await
190 .map_err(|e| AonyxError::Memory(format!("chunks append join: {e}")))??;
191 Ok(id)
192 }
193
194 async fn search_bm25(
195 &self,
196 project: Option<&str>,
197 query: &str,
198 k: usize,
199 ) -> Result<Vec<ScoredChunk>> {
200 let conn = self.conn.clone();
201 let query = query.to_string();
202 let project = project.map(str::to_string);
203 let limit = k as i64;
204 tokio::task::spawn_blocking(move || -> Result<Vec<ScoredChunk>> {
205 let lock = conn.lock().expect("chunks mutex poisoned");
206 let (sql, with_project) = if project.is_some() {
207 (
208 "SELECT uuid, project, source, ts, kind, metadata_json, content, bm25(chunks_fts) AS score
209 FROM chunks_fts
210 WHERE chunks_fts MATCH ?1 AND project = ?2
211 ORDER BY score ASC
212 LIMIT ?3",
213 true,
214 )
215 } else {
216 (
217 "SELECT uuid, project, source, ts, kind, metadata_json, content, bm25(chunks_fts) AS score
218 FROM chunks_fts
219 WHERE chunks_fts MATCH ?1
220 ORDER BY score ASC
221 LIMIT ?2",
222 false,
223 )
224 };
225 let mut stmt = lock
226 .prepare(sql)
227 .map_err(|e| AonyxError::Memory(format!("prepare search_bm25: {e}")))?;
228 let row_iter = if with_project {
229 stmt.query_map(
230 params![query, project.as_ref().expect("project guarded above"), limit],
231 decode_row,
232 )
233 } else {
234 stmt.query_map(params![query, limit], decode_row)
235 }
236 .map_err(|e| AonyxError::Memory(format!("query search_bm25: {e}")))?;
237
238 let mut out = Vec::new();
239 for r in row_iter {
240 out.push(r.map_err(|e| AonyxError::Memory(format!("row decode: {e}")))?);
241 }
242 Ok(out)
243 })
244 .await
245 .map_err(|e| AonyxError::Memory(format!("chunks search join: {e}")))?
246 }
247
248 async fn count(&self, project: Option<&str>) -> Result<usize> {
249 let conn = self.conn.clone();
250 let project = project.map(str::to_string);
251 tokio::task::spawn_blocking(move || -> Result<usize> {
252 let lock = conn.lock().expect("chunks mutex poisoned");
253 let n: i64 = match project {
254 Some(p) => lock
255 .query_row(
256 "SELECT COUNT(*) FROM chunks_fts WHERE project = ?1",
257 params![p],
258 |r| r.get(0),
259 )
260 .map_err(|e| AonyxError::Memory(format!("count: {e}")))?,
261 None => lock
262 .query_row("SELECT COUNT(*) FROM chunks_fts", [], |r| r.get(0))
263 .map_err(|e| AonyxError::Memory(format!("count: {e}")))?,
264 };
265 Ok(n.max(0) as usize)
266 })
267 .await
268 .map_err(|e| AonyxError::Memory(format!("chunks count join: {e}")))?
269 }
270}
271
272fn decode_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<ScoredChunk> {
273 let uuid_str: String = row.get(0)?;
274 let project: String = row.get(1)?;
275 let source: String = row.get(2)?;
276 let ts_raw: String = row.get(3)?;
277 let kind: Option<String> = row.get(4)?;
278 let metadata_raw: Option<String> = row.get(5)?;
279 let content: String = row.get(6)?;
280 let raw_score: f64 = row.get(7)?;
281
282 let id = Uuid::parse_str(&uuid_str).map_err(|e| {
283 rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
284 })?;
285 let ts = DateTime::parse_from_rfc3339(&ts_raw)
286 .map(|d| d.with_timezone(&Utc))
287 .unwrap_or_else(|_| Utc::now());
288 let metadata = metadata_raw
289 .and_then(|s| serde_json::from_str(&s).ok())
290 .unwrap_or(JsonValue::Null);
291
292 Ok(ScoredChunk {
293 chunk: Chunk {
294 id,
295 project,
296 source,
297 content,
298 ts,
299 kind,
300 metadata,
301 },
302 score: -(raw_score as f32),
304 })
305}
306
#[cfg(test)]
mod tests {
    use super::*;

    /// In-memory store with two `demo` chunks and one `other` chunk.
    async fn seeded_store() -> SqliteChunksStore {
        let store = SqliteChunksStore::open_in_memory().unwrap();
        store
            .append(Chunk::new(
                "demo",
                "src/lib.rs",
                "the agent loops over tool calls",
            ))
            .await
            .unwrap();
        store
            .append(Chunk::new(
                "demo",
                "src/runner.rs",
                "compaction kicks in at fifty percent",
            ))
            .await
            .unwrap();
        store
            .append(Chunk::new("other", "README.md", "another project entirely"))
            .await
            .unwrap();
        store
    }

    #[tokio::test]
    async fn append_then_count() {
        let store = SqliteChunksStore::open_in_memory().unwrap();
        store
            .append(Chunk::new("demo", "a.txt", "hello aonyx"))
            .await
            .unwrap();
        assert_eq!(store.count(None).await.unwrap(), 1);
        assert_eq!(store.count(Some("demo")).await.unwrap(), 1);
        assert_eq!(store.count(Some("other")).await.unwrap(), 0);
    }

    #[tokio::test]
    async fn count_is_per_project_and_global() {
        let store = seeded_store().await;
        assert_eq!(store.count(None).await.unwrap(), 3);
        assert_eq!(store.count(Some("demo")).await.unwrap(), 2);
        assert_eq!(store.count(Some("other")).await.unwrap(), 1);
    }

    #[tokio::test]
    async fn search_bm25_returns_relevant_chunks() {
        let store = seeded_store().await;
        let hits = store.search_bm25(None, "compaction", 10).await.unwrap();
        assert_eq!(hits.len(), 1);
        assert!(hits[0].chunk.content.contains("compaction"));
        assert!(hits[0].score > 0.0);
    }

    #[tokio::test]
    async fn search_bm25_can_scope_to_project() {
        let store = seeded_store().await;
        let in_demo = store
            .search_bm25(Some("demo"), "project OR agent", 10)
            .await
            .unwrap();
        let in_other = store
            .search_bm25(Some("other"), "project OR agent", 10)
            .await
            .unwrap();
        // Each project holds exactly one matching chunk ("agent" in demo, "project" in
        // other). Pinning the counts keeps the `all` checks from passing vacuously.
        assert_eq!(in_demo.len(), 1);
        assert_eq!(in_other.len(), 1);
        assert!(in_demo.iter().all(|h| h.chunk.project == "demo"));
        assert!(in_other.iter().all(|h| h.chunk.project == "other"));
    }

    #[tokio::test]
    async fn search_bm25_unscoped_spans_projects() {
        let store = seeded_store().await;
        let hits = store
            .search_bm25(None, "project OR agent", 10)
            .await
            .unwrap();
        assert_eq!(hits.len(), 2);
        assert!(hits.iter().any(|h| h.chunk.project == "demo"));
        assert!(hits.iter().any(|h| h.chunk.project == "other"));
    }

    #[tokio::test]
    async fn search_bm25_returns_empty_when_no_match() {
        let store = seeded_store().await;
        let hits = store
            .search_bm25(None, "nothing_should_match_this", 10)
            .await
            .unwrap();
        assert!(hits.is_empty());
    }

    #[tokio::test]
    async fn search_bm25_honours_limit() {
        let store = SqliteChunksStore::open_in_memory().unwrap();
        for i in 0..5 {
            store
                .append(Chunk::new("demo", "x", format!("repeat token {i}")))
                .await
                .unwrap();
        }
        let hits = store.search_bm25(None, "repeat", 2).await.unwrap();
        assert_eq!(hits.len(), 2);
    }

    #[tokio::test]
    async fn search_bm25_orders_best_match_first() {
        let store = SqliteChunksStore::open_in_memory().unwrap();
        for text in ["repeat", "repeat repeat repeat", "repeat once more here"] {
            store.append(Chunk::new("demo", "x", text)).await.unwrap();
        }
        let hits = store.search_bm25(None, "repeat", 10).await.unwrap();
        assert_eq!(hits.len(), 3);
        assert!(hits.windows(2).all(|w| w[0].score >= w[1].score));
    }

    #[tokio::test]
    async fn search_bm25_round_trips_every_field() {
        let store = SqliteChunksStore::open_in_memory().unwrap();
        let chunk = Chunk::new("demo", "notes.md", "roundtrip payload")
            .with_kind("note")
            .with_metadata(serde_json::json!({ "lang": "rust", "lines": 3 }));
        store.append(chunk.clone()).await.unwrap();
        let hits = store
            .search_bm25(Some("demo"), "roundtrip", 1)
            .await
            .unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].chunk, chunk);
    }
}