Skip to main content

innate_core/storage/
chunks.rs

1use super::*;
2
3impl Storage {
4    pub fn insert_chunk(&self, c: &ChunkRow) -> Result<()> {
5        self.conn.execute(
6            "INSERT INTO chunks (
7                id, skill_name, seq, content, trigger_desc, anti_trigger_desc,
8                content_hash, token_count, origin, source, maturity, related_ids,
9                protected, state, state_reason, state_updated_at,
10                confidence, confidence_base, confidence_reason, version, distilled_from,
11                distill_provider, distill_model, distill_prompt_version, parent_id,
12                selected_count, used_count, used_success_count,
13                success_trace_ids_count, last_success_at, last_agg_ts,
14                embed_version, created_at, updated_at, last_used_at
15            ) VALUES (
16                ?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,
17                ?13,?14,?15,?16,?17,?18,?19,?20,?21,?22,?23,?24,?25,
18                ?26,?27,?28,?29,?30,?31,?32,?33,?34,?35
19            )",
20            params![
21                c.id,
22                c.skill_name,
23                c.seq,
24                c.content,
25                c.trigger_desc,
26                c.anti_trigger_desc,
27                c.content_hash,
28                c.token_count,
29                c.origin,
30                c.source,
31                c.maturity,
32                c.related_ids,
33                c.protected,
34                c.state,
35                c.state_reason,
36                c.state_updated_at,
37                c.confidence,
38                c.confidence,
39                c.confidence_reason,
40                c.version,
41                c.distilled_from,
42                c.distill_provider,
43                c.distill_model,
44                c.distill_prompt_version,
45                c.parent_id,
46                c.selected_count,
47                c.used_count,
48                c.used_success_count,
49                c.success_trace_ids_count,
50                c.last_success_at,
51                c.last_agg_ts,
52                c.embed_version,
53                c.created_at,
54                c.updated_at,
55                c.last_used_at
56            ],
57        )?;
58        Ok(())
59    }
60
61    pub fn insert_vec_content(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
62        self.conn.execute(
63            "INSERT OR REPLACE INTO vec_content(chunk_id, embedding) VALUES (?,?)",
64            params![chunk_id, emb],
65        )?;
66        self.note_vector_write(&self.vec_content_cache, chunk_id, emb)
67    }
68
69    pub fn insert_vec_trigger(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
70        self.conn.execute(
71            "INSERT OR REPLACE INTO vec_trigger(chunk_id, embedding) VALUES (?,?)",
72            params![chunk_id, emb],
73        )?;
74        self.note_vector_write(&self.vec_trigger_cache, chunk_id, emb)
75    }
76
77    /// Record a single vector write: bump the shared revision (so *other*
78    /// processes drop their caches), upsert the one entry into the warm cache
79    /// in place (so *this* long-lived process keeps its cache instead of
80    /// reloading the whole corpus), then re-sync the local revision tracker so
81    /// our own bump does not trip `refresh_vector_caches_if_changed`.
82    ///
83    /// A cold cache (None) is left cold — the next search loads everything,
84    /// including this row. This keeps bulk paths (e.g. full re-embed) O(N) by
85    /// invalidating once up front rather than upserting per write.
86    fn note_vector_write(&self, cache: &VectorCache, chunk_id: &str, emb: &[u8]) -> Result<()> {
87        self.bump_vector_revision()?;
88        if let Some(entries) = cache.borrow_mut().as_mut() {
89            let mut v = unpack_embedding(emb);
90            l2_normalize(&mut v);
91            match entries.iter_mut().find(|(id, _)| id == chunk_id) {
92                Some(slot) => slot.1 = v,
93                None => entries.push((chunk_id.to_string(), v)),
94            }
95        }
96        self.sync_vector_revision()
97    }
98
99    /// Monotonically advance `meta.vector_revision`. Any vector write must call
100    /// this so that other processes detect the change and drop their in-memory
101    /// caches on the next search (see `refresh_vector_caches_if_changed`).
102    fn bump_vector_revision(&self) -> Result<()> {
103        self.conn.execute(
104            "INSERT INTO meta(key, value) VALUES ('vector_revision', '1')
105             ON CONFLICT(key) DO UPDATE SET value=CAST(value AS INTEGER)+1",
106            [],
107        )?;
108        Ok(())
109    }
110
111    /// Align the local revision tracker with the persisted value so an in-place
112    /// cache update performed by this process is not discarded on the next search.
113    fn sync_vector_revision(&self) -> Result<()> {
114        let current = self
115            .get_meta("vector_revision")?
116            .and_then(|v| v.parse::<i64>().ok())
117            .unwrap_or(0);
118        self.vector_cache_revision.set(Some(current));
119        Ok(())
120    }
121
122    /// Drop both in-memory vector caches and reset the revision tracker. Used on
123    /// transaction rollback (in-place upserts may not have persisted) and before
124    /// bulk re-embed loops (to avoid O(N²) in-place upserts on a warm cache).
125    pub(crate) fn invalidate_vector_caches(&self) {
126        *self.vec_content_cache.borrow_mut() = None;
127        *self.vec_trigger_cache.borrow_mut() = None;
128        self.vector_cache_revision.set(None);
129    }
130
131    /// Paginated chunk listing for the web viewer. Filters by exact `state` and
132    /// `origin` when provided; returns a compact projection (content truncated to
133    /// a preview) ordered newest-first. Read-only — never mutates.
134    pub fn list_chunks(
135        &self,
136        state: Option<&str>,
137        origin: Option<&str>,
138        limit: usize,
139        offset: usize,
140    ) -> Result<Vec<Value>> {
141        let mut sql = String::from(
142            "SELECT id, skill_name, seq, origin, state, state_reason, maturity, \
143             confidence, token_count, protected, selected_count, used_count, \
144             used_success_count, substr(content, 1, 280) AS content_preview, \
145             created_at, updated_at, last_used_at \
146             FROM chunks",
147        );
148        let mut clauses: Vec<&str> = Vec::new();
149        if state.is_some() {
150            clauses.push("state = :state");
151        }
152        if origin.is_some() {
153            clauses.push("origin = :origin");
154        }
155        if !clauses.is_empty() {
156            sql.push_str(" WHERE ");
157            sql.push_str(&clauses.join(" AND "));
158        }
159        sql.push_str(" ORDER BY created_at DESC LIMIT :limit OFFSET :offset");
160
161        let mut stmt = self.conn.prepare(&sql)?;
162        let names: Vec<String> = stmt.column_names().into_iter().map(String::from).collect();
163        let mut params: Vec<(&str, &dyn rusqlite::ToSql)> = Vec::new();
164        if let Some(s) = state.as_ref() {
165            params.push((":state", s));
166        }
167        if let Some(o) = origin.as_ref() {
168            params.push((":origin", o));
169        }
170        let limit_i = limit as i64;
171        let offset_i = offset as i64;
172        params.push((":limit", &limit_i));
173        params.push((":offset", &offset_i));
174
175        let rows = stmt.query_map(params.as_slice(), |r| row_to_json_with_names(r, &names))?;
176        let mut out = Vec::new();
177        for row in rows {
178            out.push(row?);
179        }
180        Ok(out)
181    }
182
183    pub fn get_chunk(&self, id: &str) -> Result<Option<Value>> {
184        let mut stmt = self
185            .conn
186            .prepare_cached("SELECT * FROM chunks WHERE id=?")?;
187        let row = stmt.query_row([id], row_to_json);
188        match row {
189            Ok(v) => Ok(Some(v)),
190            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
191            Err(e) => Err(e.into()),
192        }
193    }
194
195    pub fn update_chunk_state(
196        &self,
197        id: &str,
198        state: &str,
199        reason: Option<&str>,
200        now: &str,
201    ) -> Result<()> {
202        self.conn.execute(
203            "UPDATE chunks SET state=?, state_reason=?, state_updated_at=?, updated_at=? WHERE id=?",
204            params![state, reason, now, now, id],
205        )?;
206        Ok(())
207    }
208
209    pub fn update_chunk_confidence(
210        &self,
211        id: &str,
212        conf: f64,
213        reason: Option<&str>,
214        now: &str,
215    ) -> Result<()> {
216        self.conn.execute(
217            "UPDATE chunks
218             SET confidence=?, confidence_base=?, confidence_reason=?, updated_at=?
219             WHERE id=?",
220            params![conf, conf, reason, now, id],
221        )?;
222        Ok(())
223    }
224
225    pub fn update_chunk_last_used(&self, id: &str, now: &str) -> Result<()> {
226        self.conn.execute(
227            "UPDATE chunks SET last_used_at=?, updated_at=? WHERE id=?",
228            params![now, now, id],
229        )?;
230        Ok(())
231    }
232
233    pub fn get_chunk_by_hash(&self, hash: &str) -> Result<Option<Value>> {
234        let row = self.conn.query_row(
235            "SELECT * FROM chunks WHERE content_hash=? LIMIT 1",
236            [hash],
237            row_to_json,
238        );
239        match row {
240            Ok(v) => Ok(Some(v)),
241            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
242            Err(e) => Err(e.into()),
243        }
244    }
245
246    // ------------------------------------------------------------------
247    // Vector search (pure-Rust cosine similarity, replaces sqlite-vec)
248    // ------------------------------------------------------------------
249
250    pub fn search_vec_content(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
251        self.search_vec(&self.vec_content_cache, "vec_content", query, limit)
252    }
253
254    pub fn search_vec_trigger(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
255        self.search_vec(&self.vec_trigger_cache, "vec_trigger", query, limit)
256    }
257
258    fn search_vec(
259        &self,
260        cache_cell: &VectorCache,
261        table: &str,
262        query: &[f32],
263        limit: usize,
264    ) -> Result<Vec<(String, f32)>> {
265        if limit == 0 {
266            return Ok(Vec::new());
267        }
268        self.refresh_vector_caches_if_changed()?;
269
270        // Populate cache on first access after open or invalidation.
271        // Stored vectors are L2-normalised here so the search inner loop can use
272        // a plain dot product instead of recomputing norms on every comparison.
273        if cache_cell.borrow().is_none() {
274            let sql = format!("SELECT chunk_id, embedding FROM {table}");
275            let mut stmt = self.conn.prepare(&sql)?;
276            let raw: Vec<(String, Vec<u8>)> = stmt
277                .query_map([], |r| {
278                    Ok((r.get::<_, String>(0)?, r.get::<_, Vec<u8>>(1)?))
279                })?
280                .collect::<rusqlite::Result<Vec<_>>>()?;
281            let mut entries: Vec<(String, Vec<f32>)> = Vec::with_capacity(raw.len());
282            for (id, blob) in raw {
283                // Fail-closed: a persisted embedding must be a whole number of
284                // f32 values (4 bytes each). A structurally corrupt blob aborts
285                // the load rather than silently yielding a truncated vector.
286                if blob.is_empty() || blob.len() % 4 != 0 {
287                    return Err(crate::errors::InnateError::Other(format!(
288                        "corrupt embedding for chunk {id} in {table}: {} bytes (not a non-zero multiple of 4)",
289                        blob.len()
290                    )));
291                }
292                let mut v = unpack_embedding(&blob);
293                l2_normalize(&mut v);
294                entries.push((id, v));
295            }
296            *cache_cell.borrow_mut() = Some(entries);
297        }
298
299        let cache = cache_cell.borrow();
300        let entries = cache.as_ref().unwrap();
301
302        // Normalise the query once; cached vectors are already unit-length, so
303        // cosine similarity reduces to a dot product over each entry.
304        let mut q = query.to_vec();
305        l2_normalize(&mut q);
306
307        // Score by (index, similarity) without cloning ids; partial-sort the top
308        // `limit` to the front (O(N) select), then clone ids for the winners only.
309        // Only score vectors whose dimension matches the query. A mismatch means
310        // a stale embed_version vector or a (4-byte-aligned) corruption — either
311        // way it must not contribute a truncated/garbage dot product.
312        let mut scored: Vec<(usize, f32)> = entries
313            .iter()
314            .enumerate()
315            .filter(|(_, (_, v))| v.len() == q.len())
316            .map(|(i, (_, v))| (i, dot_product(&q, v)))
317            .collect();
318        if scored.len() > limit {
319            scored.select_nth_unstable_by(limit - 1, |a, b| {
320                b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)
321            });
322            scored.truncate(limit);
323        }
324        scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
325        Ok(scored
326            .into_iter()
327            .map(|(i, sim)| (entries[i].0.clone(), sim))
328            .collect())
329    }
330
331    /// Lexical/BM25 retrieval channel (hybrid 检索的词法一路). Builds a safe FTS5
332    /// MATCH from the query's alphanumeric tokens and returns up to `limit`
333    /// non-archived, non-spark chunks ranked by BM25, with each score normalised
334    /// to `(0,1]` (best match → 1.0) so it can fuse alongside cosine sims.
335    /// Returns empty when the query has no usable tokens (degrades to vector-only).
336    pub fn search_lexical(&self, query: &str, limit: usize) -> Result<Vec<(String, f32)>> {
337        if limit == 0 {
338            return Ok(Vec::new());
339        }
340        let Some(match_expr) = fts5_match_query(query) else {
341            return Ok(Vec::new());
342        };
343        let mut stmt = self.conn.prepare_cached(
344            "SELECT chunks_fts.id, bm25(chunks_fts) AS score
345             FROM chunks_fts
346             JOIN chunks c ON c.id = chunks_fts.id
347             WHERE chunks_fts MATCH ?1
348               AND c.state != 'archived' AND c.origin != 'spark'
349             ORDER BY score ASC
350             LIMIT ?2",
351        )?;
352        let rows = stmt.query_map(params![match_expr, limit as i64], |r| {
353            Ok((r.get::<_, String>(0)?, r.get::<_, f64>(1)?))
354        })?;
355        // FTS5 bm25(): more negative = more relevant. Convert to positive
356        // relevance and normalise by the best in this result set (scale-stable
357        // across queries; the top lexical hit always maps to 1.0).
358        let raw: Vec<(String, f64)> = rows.collect::<rusqlite::Result<Vec<_>>>()?;
359        let best_rel = raw.iter().map(|(_, s)| -s).fold(f64::MIN, f64::max);
360        let out = raw
361            .into_iter()
362            .enumerate()
363            .map(|(i, (id, score))| {
364                let sim = if best_rel > 0.0 {
365                    (-score / best_rel).clamp(0.0, 1.0) as f32
366                } else {
367                    // Degenerate (non-positive relevances): fall back to rank decay.
368                    ((limit - i) as f32) / (limit as f32)
369                };
370                (id, sim)
371            })
372            .collect();
373        Ok(out)
374    }
375
376    fn refresh_vector_caches_if_changed(&self) -> Result<()> {
377        let current = self
378            .get_meta("vector_revision")?
379            .and_then(|value| value.parse::<i64>().ok())
380            .unwrap_or(0);
381        let previous = self.vector_cache_revision.replace(Some(current));
382        if previous.is_some_and(|revision| revision != current) {
383            *self.vec_content_cache.borrow_mut() = None;
384            *self.vec_trigger_cache.borrow_mut() = None;
385        }
386        Ok(())
387    }
388
389    /// Fetch multiple chunks by id in one query; returns a map of id → chunk JSON.
390    pub fn get_chunks_by_ids(&self, ids: &[&str]) -> Result<HashMap<String, Value>> {
391        if ids.is_empty() {
392            return Ok(HashMap::new());
393        }
394        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
395        let sql = format!("SELECT * FROM chunks WHERE id IN ({placeholders})");
396        let mut stmt = self.conn.prepare(&sql)?;
397        let names: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
398        let rows = stmt.query_map(rusqlite::params_from_iter(ids.iter()), |r| {
399            row_to_json_with_names(r, &names)
400        })?;
401        let mut map = HashMap::with_capacity(ids.len());
402        for row in rows {
403            let row = row?;
404            if let Some(id) = row.get("id").and_then(Value::as_str) {
405                map.insert(id.to_string(), row);
406            }
407        }
408        Ok(map)
409    }
410
411    // ------------------------------------------------------------------
412    // Invalidated hashes
413    // ------------------------------------------------------------------
414
415    pub fn is_hash_invalidated(&self, hash: &str) -> Result<bool> {
416        let count: i64 = self.conn.query_row(
417            "SELECT count(*) FROM invalidated_hashes WHERE content_hash=?",
418            [hash],
419            |r| r.get(0),
420        )?;
421        Ok(count > 0)
422    }
423
424    pub fn insert_invalidated_hash(
425        &self,
426        hash: &str,
427        reason: Option<&str>,
428        ts: &str,
429    ) -> Result<()> {
430        self.conn.execute(
431            "INSERT OR IGNORE INTO invalidated_hashes(content_hash, reason, ts) VALUES (?,?,?)",
432            params![hash, reason, ts],
433        )?;
434        Ok(())
435    }
436
437    // ------------------------------------------------------------------
438    // Usage trace
439    // ------------------------------------------------------------------
440
441    // Chunk queries (aggregate / curate helpers)
442    // ------------------------------------------------------------------
443
444    pub(crate) fn query_chunks(&self, sql: &str) -> Result<Vec<Value>> {
445        self.query_json(sql, params![])
446    }
447
448    pub(crate) fn query_chunks_params<P: rusqlite::Params>(
449        &self,
450        sql: &str,
451        p: P,
452    ) -> Result<Vec<Value>> {
453        self.query_json(sql, p)
454    }
455
456    // ------------------------------------------------------------------
457    // Deps
458    // ------------------------------------------------------------------
459
460    pub fn get_deps(&self, chunk_id: &str) -> Result<Vec<DepEdge>> {
461        let mut stmt = self
462            .conn
463            .prepare_cached("SELECT dst, kind, dst_lib FROM deps WHERE src=?")?;
464        let rows = stmt.query_map([chunk_id], |r| {
465            Ok((
466                r.get::<_, String>(0)?,
467                r.get::<_, String>(1)?,
468                r.get::<_, Option<String>>(2)?,
469            ))
470        })?;
471        Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
472    }
473
474    /// Batch variant of `get_deps`: fetch outgoing edges for many sources in one
475    /// query. Returns `src` → `[(dst, kind, dst_lib)]`. Sources with no edges are
476    /// simply absent from the map.
477    pub fn get_deps_batch(&self, srcs: &[&str]) -> Result<HashMap<String, Vec<DepEdge>>> {
478        if srcs.is_empty() {
479            return Ok(HashMap::new());
480        }
481        let placeholders = srcs.iter().map(|_| "?").collect::<Vec<_>>().join(",");
482        let sql = format!("SELECT src, dst, kind, dst_lib FROM deps WHERE src IN ({placeholders})");
483        let mut stmt = self.conn.prepare(&sql)?;
484        let rows = stmt.query_map(rusqlite::params_from_iter(srcs.iter()), |r| {
485            Ok((
486                r.get::<_, String>(0)?,
487                r.get::<_, String>(1)?,
488                r.get::<_, String>(2)?,
489                r.get::<_, Option<String>>(3)?,
490            ))
491        })?;
492        let mut map: HashMap<String, Vec<(String, String, Option<String>)>> = HashMap::new();
493        for row in rows {
494            let (src, dst, kind, lib) = row?;
495            map.entry(src).or_default().push((dst, kind, lib));
496        }
497        Ok(map)
498    }
499
500    pub fn get_reverse_deps(&self, chunk_id: &str) -> Result<Vec<String>> {
501        let mut stmt = self
502            .conn
503            .prepare_cached("SELECT src FROM deps WHERE dst=?")?;
504        let rows = stmt.query_map([chunk_id], |r| r.get::<_, String>(0))?;
505        Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
506    }
507
508    pub fn insert_dep(
509        &self,
510        src: &str,
511        dst: &str,
512        kind: &str,
513        dst_lib: Option<&str>,
514    ) -> Result<()> {
515        self.conn.execute(
516            "INSERT OR IGNORE INTO deps(src,dst,kind,dst_lib) VALUES (?,?,?,?)",
517            params![src, dst, kind, dst_lib],
518        )?;
519        Ok(())
520    }
521
522    // ------------------------------------------------------------------
523    // Chunk success traces (aggregate fact table)
524    // ------------------------------------------------------------------
525
526    pub fn upsert_chunk_success_trace(
527        &self,
528        chunk_id: &str,
529        trace_id: &str,
530        ts: &str,
531    ) -> Result<()> {
532        self.conn.execute(
533            "INSERT OR IGNORE INTO chunk_success_traces(chunk_id, trace_id, ts) VALUES (?,?,?)",
534            params![chunk_id, trace_id, ts],
535        )?;
536        Ok(())
537    }
538
539    // ------------------------------------------------------------------
540}
541
/// Turn free text into an injection-safe FTS5 `MATCH` expression.
///
/// Processing steps:
/// 1. Split the input on every non-alphanumeric character.
/// 2. Drop pieces shorter than two *bytes*. This removes single ASCII
///    characters, but a single multi-byte character such as `é` is kept.
/// 3. Lowercase each piece.
/// 4. Drop a tiny set of very common function words.
/// 5. De-duplicate, keeping the first 32 distinct terms.
///
/// Each term is wrapped in double quotes as an FTS5 phrase. Terms contain only
/// alphanumerics, so no quote or operator can leak into the expression. The
/// phrases are joined with `OR`. Stop-word pruning is kept deliberately light
/// because BM25's IDF already downweights common terms.
///
/// Returns `None` when no term survives.
pub(crate) fn fts5_match_query(query: &str) -> Option<String> {
    // Only the highest-frequency function words; over-pruning would cost recall.
    const STOP: &[&str] = &[
        "the", "a", "an", "to", "of", "in", "on", "for", "and", "or", "is", "do", "i",
    ];
    const MAX_TERMS: usize = 32;

    let mut already: std::collections::HashSet<String> = std::collections::HashSet::new();
    let mut phrases: Vec<String> = Vec::new();
    for piece in query.split(|ch: char| !ch.is_alphanumeric()) {
        if phrases.len() == MAX_TERMS {
            break;
        }
        // Length is checked on the original piece, before lowercasing.
        if piece.len() < 2 {
            continue;
        }
        let term = piece.to_lowercase();
        if STOP.contains(&term.as_str()) || already.contains(&term) {
            continue;
        }
        phrases.push(format!("\"{term}\""));
        already.insert(term);
    }

    (!phrases.is_empty()).then(|| phrases.join(" OR "))
}