Skip to main content

aonyx_memory/
chunks.rs

1//! Searchable chunks store backed by SQLite FTS5.
2//!
3//! Port reference: Aonyx RAG `rag_system/utils/bm25_store.py` + `utils/hybrid_search.py`.
4//!
5//! ## V1 scope (this file)
6//! - Chunk = a piece of text + project + source + timestamp + free-form metadata.
//! - SQLite **FTS5** virtual table provides BM25-ranked full-text search out
//!   of the box, with a `unicode61 remove_diacritics 2` tokenizer that folds
//!   accents, so a query for `cafe` also matches `café`.
10//! - `search_bm25(project?, query, k)` returns the top-`k` chunks ordered by
11//!   relevance, with positive `score = -bm25(...)` so larger = better.
12//!
13//! ## V1.1 (deferred)
14//! - Local embeddings via `fastembed-rs` (ONNX, ~30 MB model).
15//! - HNSW index for vector ANN search.
16//! - **RRF** fusion with `k = 60` combining BM25 + vectors.
17//! - Exponential temporal boost on recent chunks.
18//!
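//! Note: [`ChunksStore`] does not take a search `mode` yet. V1.1 will need to
//! add one (for example as a new trait method with a default implementation,
//! so existing implementors and callers keep compiling) to expose vector and
//! hybrid search.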
21
22use std::path::Path;
23use std::sync::{Arc, Mutex};
24
25use aonyx_core::{AonyxError, Result};
26use async_trait::async_trait;
27use chrono::{DateTime, Utc};
28use rusqlite::{params, Connection};
29use serde::{Deserialize, Serialize};
30use serde_json::Value as JsonValue;
31use uuid::Uuid;
32
/// Stable identifier for a [`Chunk`].
///
/// [`Chunk::new`] assigns a random (v4) UUID. [`SqliteChunksStore`] persists it
/// in its string form in the `uuid` column and parses it back on read.
pub type ChunkId = Uuid;
35
/// A piece of indexable text plus the provenance needed to scope and display it.
///
/// Only `content` is full-text indexed by [`SqliteChunksStore`]. The other
/// fields are stored alongside it and returned with every search hit.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Chunk {
    /// Stable id; [`Chunk::new`] generates a random UUID v4.
    pub id: ChunkId,
    /// Project slug this chunk belongs to; searches and counts can be scoped to it.
    pub project: String,
    /// Source identifier (path, url, doc id).
    pub source: String,
    /// Raw chunk text: the only field that is tokenized and BM25-ranked.
    pub content: String,
    /// Creation timestamp ([`Chunk::new`] uses the current UTC time).
    pub ts: DateTime<Utc>,
    /// Optional classifier, e.g. `"code"`, `"note"`, `"diary"`, `"doc"`.
    pub kind: Option<String>,
    /// Free-form JSON metadata (e.g. AST symbol name + line range for code chunks).
    /// Deserializes as JSON `null` when the field is absent.
    #[serde(default)]
    pub metadata: JsonValue,
}
55
56impl Chunk {
57    /// Build a new chunk with sensible defaults.
58    pub fn new(
59        project: impl Into<String>,
60        source: impl Into<String>,
61        content: impl Into<String>,
62    ) -> Self {
63        Self {
64            id: Uuid::new_v4(),
65            project: project.into(),
66            source: source.into(),
67            content: content.into(),
68            ts: Utc::now(),
69            kind: None,
70            metadata: JsonValue::Null,
71        }
72    }
73
74    /// Attach a classifier.
75    pub fn with_kind(mut self, kind: impl Into<String>) -> Self {
76        self.kind = Some(kind.into());
77        self
78    }
79
80    /// Attach JSON metadata.
81    pub fn with_metadata(mut self, metadata: JsonValue) -> Self {
82        self.metadata = metadata;
83        self
84    }
85}
86
/// A search hit: a chunk and its relevance score (larger = more relevant).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ScoredChunk {
    /// The matched chunk, with all stored fields restored.
    pub chunk: Chunk,
    /// Relevance score. SQLite's `bm25()` is negative for matches (more negative
    /// = better), so [`SqliteChunksStore`] reports its negation to make the
    /// score positive and "larger = better".
    pub score: f32,
}
95
/// Async chunks store.
///
/// Implementations must be `Send + Sync` so one store can be shared between
/// tasks.
#[async_trait]
pub trait ChunksStore: Send + Sync {
    /// Append a new chunk and return the id it was stored under.
    async fn append(&self, chunk: Chunk) -> Result<ChunkId>;

    /// BM25 search.
    ///
    /// `project = None` searches across every project; `Some(p)` scopes to one.
    /// `k` caps the number of hits. Hits are expected most relevant first (the
    /// SQLite implementation sorts by BM25).
    ///
    /// For [`SqliteChunksStore`], `query` is passed to FTS5 `MATCH` as-is, so FTS5
    /// query syntax (e.g. `a OR b`) applies and a malformed query yields an error.
    async fn search_bm25(
        &self,
        project: Option<&str>,
        query: &str,
        k: usize,
    ) -> Result<Vec<ScoredChunk>>;

    /// Total chunk count, optionally scoped to a project.
    async fn count(&self, project: Option<&str>) -> Result<usize>;
}
116
/// SQLite-backed [`ChunksStore`] using FTS5 for BM25 ranking.
///
/// Cloning is cheap: clones share one connection behind an `Arc<Mutex<_>>`, so
/// every operation takes that lock and runs serialized on the same connection.
/// The synchronous SQLite calls are dispatched via `tokio::task::spawn_blocking`.
#[derive(Clone)]
pub struct SqliteChunksStore {
    conn: Arc<Mutex<Connection>>,
}
122
123impl SqliteChunksStore {
124    /// Open (or create) the chunks database at `path`.
125    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
126        let conn = Connection::open(path.as_ref())
127            .map_err(|e| AonyxError::Memory(format!("open chunks db: {e}")))?;
128        Self::migrate(&conn)?;
129        Ok(Self {
130            conn: Arc::new(Mutex::new(conn)),
131        })
132    }
133
134    /// Open an in-memory database — convenient for tests.
135    pub fn open_in_memory() -> Result<Self> {
136        let conn = Connection::open_in_memory()
137            .map_err(|e| AonyxError::Memory(format!("open in-memory chunks: {e}")))?;
138        Self::migrate(&conn)?;
139        Ok(Self {
140            conn: Arc::new(Mutex::new(conn)),
141        })
142    }
143
144    fn migrate(conn: &Connection) -> Result<()> {
145        conn.execute_batch(MIGRATION_V1)
146            .map_err(|e| AonyxError::Memory(format!("migrate chunks schema: {e}")))?;
147        Ok(())
148    }
149}
150
/// Schema for the chunks store: a single FTS5 virtual table.
///
/// Every column except `content` is `UNINDEXED`, so only chunk text is tokenized
/// and BM25-ranked; the other columns are stored for filtering and for rebuilding
/// the [`Chunk`]. `remove_diacritics 2` folds accents so `cafe` matches `café`.
/// `IF NOT EXISTS` makes re-running the script on an existing database a no-op.
const MIGRATION_V1: &str = r#"
CREATE VIRTUAL TABLE IF NOT EXISTS chunks_fts USING fts5(
    uuid           UNINDEXED,
    project        UNINDEXED,
    source         UNINDEXED,
    ts             UNINDEXED,
    kind           UNINDEXED,
    metadata_json  UNINDEXED,
    content,
    tokenize = 'unicode61 remove_diacritics 2'
);
"#;
163
164#[async_trait]
165impl ChunksStore for SqliteChunksStore {
166    async fn append(&self, chunk: Chunk) -> Result<ChunkId> {
167        let conn = self.conn.clone();
168        let id = chunk.id;
169        tokio::task::spawn_blocking(move || -> Result<()> {
170            let lock = conn.lock().expect("chunks mutex poisoned");
171            lock.execute(
172                r#"
173                INSERT INTO chunks_fts (uuid, project, source, ts, kind, metadata_json, content)
174                VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
175                "#,
176                params![
177                    chunk.id.to_string(),
178                    chunk.project,
179                    chunk.source,
180                    chunk.ts.to_rfc3339(),
181                    chunk.kind,
182                    serde_json::to_string(&chunk.metadata).ok(),
183                    chunk.content,
184                ],
185            )
186            .map_err(|e| AonyxError::Memory(format!("chunks append: {e}")))?;
187            Ok(())
188        })
189        .await
190        .map_err(|e| AonyxError::Memory(format!("chunks append join: {e}")))??;
191        Ok(id)
192    }
193
194    async fn search_bm25(
195        &self,
196        project: Option<&str>,
197        query: &str,
198        k: usize,
199    ) -> Result<Vec<ScoredChunk>> {
200        let conn = self.conn.clone();
201        let query = query.to_string();
202        let project = project.map(str::to_string);
203        let limit = k as i64;
204        tokio::task::spawn_blocking(move || -> Result<Vec<ScoredChunk>> {
205            let lock = conn.lock().expect("chunks mutex poisoned");
206            let (sql, with_project) = if project.is_some() {
207                (
208                    "SELECT uuid, project, source, ts, kind, metadata_json, content, bm25(chunks_fts) AS score
209                     FROM chunks_fts
210                     WHERE chunks_fts MATCH ?1 AND project = ?2
211                     ORDER BY score ASC
212                     LIMIT ?3",
213                    true,
214                )
215            } else {
216                (
217                    "SELECT uuid, project, source, ts, kind, metadata_json, content, bm25(chunks_fts) AS score
218                     FROM chunks_fts
219                     WHERE chunks_fts MATCH ?1
220                     ORDER BY score ASC
221                     LIMIT ?2",
222                    false,
223                )
224            };
225            let mut stmt = lock
226                .prepare(sql)
227                .map_err(|e| AonyxError::Memory(format!("prepare search_bm25: {e}")))?;
228            let row_iter = if with_project {
229                stmt.query_map(
230                    params![query, project.as_ref().expect("project guarded above"), limit],
231                    decode_row,
232                )
233            } else {
234                stmt.query_map(params![query, limit], decode_row)
235            }
236            .map_err(|e| AonyxError::Memory(format!("query search_bm25: {e}")))?;
237
238            let mut out = Vec::new();
239            for r in row_iter {
240                out.push(r.map_err(|e| AonyxError::Memory(format!("row decode: {e}")))?);
241            }
242            Ok(out)
243        })
244        .await
245        .map_err(|e| AonyxError::Memory(format!("chunks search join: {e}")))?
246    }
247
248    async fn count(&self, project: Option<&str>) -> Result<usize> {
249        let conn = self.conn.clone();
250        let project = project.map(str::to_string);
251        tokio::task::spawn_blocking(move || -> Result<usize> {
252            let lock = conn.lock().expect("chunks mutex poisoned");
253            let n: i64 = match project {
254                Some(p) => lock
255                    .query_row(
256                        "SELECT COUNT(*) FROM chunks_fts WHERE project = ?1",
257                        params![p],
258                        |r| r.get(0),
259                    )
260                    .map_err(|e| AonyxError::Memory(format!("count: {e}")))?,
261                None => lock
262                    .query_row("SELECT COUNT(*) FROM chunks_fts", [], |r| r.get(0))
263                    .map_err(|e| AonyxError::Memory(format!("count: {e}")))?,
264            };
265            Ok(n.max(0) as usize)
266        })
267        .await
268        .map_err(|e| AonyxError::Memory(format!("chunks count join: {e}")))?
269    }
270}
271
272fn decode_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<ScoredChunk> {
273    let uuid_str: String = row.get(0)?;
274    let project: String = row.get(1)?;
275    let source: String = row.get(2)?;
276    let ts_raw: String = row.get(3)?;
277    let kind: Option<String> = row.get(4)?;
278    let metadata_raw: Option<String> = row.get(5)?;
279    let content: String = row.get(6)?;
280    let raw_score: f64 = row.get(7)?;
281
282    let id = Uuid::parse_str(&uuid_str).map_err(|e| {
283        rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
284    })?;
285    let ts = DateTime::parse_from_rfc3339(&ts_raw)
286        .map(|d| d.with_timezone(&Utc))
287        .unwrap_or_else(|_| Utc::now());
288    let metadata = metadata_raw
289        .and_then(|s| serde_json::from_str(&s).ok())
290        .unwrap_or(JsonValue::Null);
291
292    Ok(ScoredChunk {
293        chunk: Chunk {
294            id,
295            project,
296            source,
297            content,
298            ts,
299            kind,
300            metadata,
301        },
302        // SQLite's bm25() returns negative values; flip the sign so larger = better.
303        score: -(raw_score as f32),
304    })
305}
306
#[cfg(test)]
mod tests {
    use super::*;

    /// In-memory store holding two `demo` chunks and one `other` chunk.
    async fn seeded_store() -> SqliteChunksStore {
        let store = SqliteChunksStore::open_in_memory().unwrap();
        store
            .append(Chunk::new(
                "demo",
                "src/lib.rs",
                "the agent loops over tool calls",
            ))
            .await
            .unwrap();
        store
            .append(Chunk::new(
                "demo",
                "src/runner.rs",
                "compaction kicks in at fifty percent",
            ))
            .await
            .unwrap();
        store
            .append(Chunk::new("other", "README.md", "another project entirely"))
            .await
            .unwrap();
        store
    }

    #[tokio::test]
    async fn append_then_count() {
        let store = SqliteChunksStore::open_in_memory().unwrap();
        store
            .append(Chunk::new("demo", "a.txt", "hello aonyx"))
            .await
            .unwrap();
        assert_eq!(store.count(None).await.unwrap(), 1);
        assert_eq!(store.count(Some("demo")).await.unwrap(), 1);
        assert_eq!(store.count(Some("other")).await.unwrap(), 0);
    }

    #[tokio::test]
    async fn search_bm25_returns_relevant_chunks() {
        let store = seeded_store().await;
        let hits = store.search_bm25(None, "compaction", 10).await.unwrap();
        assert_eq!(hits.len(), 1);
        assert!(hits[0].chunk.content.contains("compaction"));
        assert!(hits[0].score > 0.0);
    }

    #[tokio::test]
    async fn search_bm25_can_scope_to_project() {
        let store = seeded_store().await;
        let in_demo = store
            .search_bm25(Some("demo"), "project OR agent", 10)
            .await
            .unwrap();
        let in_other = store
            .search_bm25(Some("other"), "project OR agent", 10)
            .await
            .unwrap();
        // Pin the counts: `all` alone passes vacuously on an empty result.
        assert_eq!(in_demo.len(), 1);
        assert_eq!(in_other.len(), 1);
        assert!(in_demo.iter().all(|h| h.chunk.project == "demo"));
        assert!(in_other.iter().all(|h| h.chunk.project == "other"));
    }

    #[tokio::test]
    async fn search_bm25_returns_empty_when_no_match() {
        let store = seeded_store().await;
        let hits = store
            .search_bm25(None, "nothing_should_match_this", 10)
            .await
            .unwrap();
        assert!(hits.is_empty());
    }

    #[tokio::test]
    async fn search_bm25_honours_limit() {
        let store = SqliteChunksStore::open_in_memory().unwrap();
        for i in 0..5 {
            store
                .append(Chunk::new("demo", "x", format!("repeat token {i}")))
                .await
                .unwrap();
        }
        let hits = store.search_bm25(None, "repeat", 2).await.unwrap();
        assert_eq!(hits.len(), 2);
    }

    #[tokio::test]
    async fn search_bm25_round_trips_every_field() {
        let store = SqliteChunksStore::open_in_memory().unwrap();
        let chunk = Chunk::new("demo", "src/parser.rs", "tokenizer handles unicode input")
            .with_kind("code")
            .with_metadata(serde_json::json!({ "symbol": "tokenize", "lines": [10, 42] }));
        let id = store.append(chunk.clone()).await.unwrap();
        assert_eq!(id, chunk.id);
        let hits = store
            .search_bm25(Some("demo"), "tokenizer", 1)
            .await
            .unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].chunk, chunk);
    }

    #[tokio::test]
    async fn search_bm25_folds_diacritics() {
        let store = SqliteChunksStore::open_in_memory().unwrap();
        store
            .append(Chunk::new("demo", "notes.md", "le café est prêt"))
            .await
            .unwrap();
        assert_eq!(store.search_bm25(None, "cafe", 10).await.unwrap().len(), 1);
        assert_eq!(store.search_bm25(None, "pret", 10).await.unwrap().len(), 1);
    }

    #[tokio::test]
    async fn search_bm25_orders_hits_by_descending_score() {
        let store = SqliteChunksStore::open_in_memory().unwrap();
        for content in [
            "rust is a language for systems programming",
            "rust rust rust",
            "python",
            "go",
            "zig",
        ] {
            store
                .append(Chunk::new("demo", "x", content))
                .await
                .unwrap();
        }
        let hits = store.search_bm25(None, "rust", 10).await.unwrap();
        assert_eq!(hits.len(), 2);
        assert_eq!(hits[0].chunk.content, "rust rust rust");
        assert!(hits[0].score > hits[1].score);
    }
}