// innate_core/storage/chunks.rs

1use super::*;
2
3impl Storage {
4    pub fn insert_chunk(&self, c: &ChunkRow) -> Result<()> {
5        self.conn.execute(
6            "INSERT INTO chunks (
7                id, skill_name, seq, content, trigger_desc, anti_trigger_desc,
8                content_hash, token_count, origin, source, maturity, related_ids,
9                protected, state, state_reason, state_updated_at,
10                confidence, confidence_base, confidence_reason, version, distilled_from,
11                distill_provider, distill_model, distill_prompt_version, parent_id,
12                selected_count, used_count, used_success_count,
13                success_trace_ids_count, last_success_at, last_agg_ts,
14                embed_version, created_at, updated_at, last_used_at, agent
15            ) VALUES (
16                ?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,
17                ?13,?14,?15,?16,?17,?18,?19,?20,?21,?22,?23,?24,?25,
18                ?26,?27,?28,?29,?30,?31,?32,?33,?34,?35,?36
19            )",
20            params![
21                c.id,
22                c.skill_name,
23                c.seq,
24                c.content,
25                c.trigger_desc,
26                c.anti_trigger_desc,
27                c.content_hash,
28                c.token_count,
29                c.origin,
30                c.source,
31                c.maturity,
32                c.related_ids,
33                c.protected,
34                c.state,
35                c.state_reason,
36                c.state_updated_at,
37                c.confidence,
38                c.confidence,
39                c.confidence_reason,
40                c.version,
41                c.distilled_from,
42                c.distill_provider,
43                c.distill_model,
44                c.distill_prompt_version,
45                c.parent_id,
46                c.selected_count,
47                c.used_count,
48                c.used_success_count,
49                c.success_trace_ids_count,
50                c.last_success_at,
51                c.last_agg_ts,
52                c.embed_version,
53                c.created_at,
54                c.updated_at,
55                c.last_used_at,
56                c.agent
57            ],
58        )?;
59        Ok(())
60    }
61
62    pub fn insert_vec_content(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
63        self.conn.execute(
64            "INSERT OR REPLACE INTO vec_content(chunk_id, embedding) VALUES (?,?)",
65            params![chunk_id, emb],
66        )?;
67        self.note_vector_write(&self.vec_content_cache, chunk_id, emb)
68    }
69
70    pub fn insert_vec_trigger(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
71        self.conn.execute(
72            "INSERT OR REPLACE INTO vec_trigger(chunk_id, embedding) VALUES (?,?)",
73            params![chunk_id, emb],
74        )?;
75        self.note_vector_write(&self.vec_trigger_cache, chunk_id, emb)
76    }
77
78    /// Record a single vector write: bump the shared revision (so *other*
79    /// processes drop their caches), upsert the one entry into the warm cache
80    /// in place (so *this* long-lived process keeps its cache instead of
81    /// reloading the whole corpus), then re-sync the local revision tracker so
82    /// our own bump does not trip `refresh_vector_caches_if_changed`.
83    ///
84    /// A cold cache (None) is left cold — the next search loads everything,
85    /// including this row. This keeps bulk paths (e.g. full re-embed) O(N) by
86    /// invalidating once up front rather than upserting per write.
87    fn note_vector_write(&self, cache: &VectorCache, chunk_id: &str, emb: &[u8]) -> Result<()> {
88        self.bump_vector_revision()?;
89        if let Some(entries) = cache.borrow_mut().as_mut() {
90            let mut v = unpack_embedding(emb);
91            l2_normalize(&mut v);
92            match entries.iter_mut().find(|(id, _)| id == chunk_id) {
93                Some(slot) => slot.1 = v,
94                None => entries.push((chunk_id.to_string(), v)),
95            }
96        }
97        self.sync_vector_revision()
98    }
99
100    /// Monotonically advance `meta.vector_revision`. Any vector write must call
101    /// this so that other processes detect the change and drop their in-memory
102    /// caches on the next search (see `refresh_vector_caches_if_changed`).
103    fn bump_vector_revision(&self) -> Result<()> {
104        self.conn.execute(
105            "INSERT INTO meta(key, value) VALUES ('vector_revision', '1')
106             ON CONFLICT(key) DO UPDATE SET value=CAST(value AS INTEGER)+1",
107            [],
108        )?;
109        Ok(())
110    }
111
112    /// Align the local revision tracker with the persisted value so an in-place
113    /// cache update performed by this process is not discarded on the next search.
114    fn sync_vector_revision(&self) -> Result<()> {
115        let current = self
116            .get_meta("vector_revision")?
117            .and_then(|v| v.parse::<i64>().ok())
118            .unwrap_or(0);
119        self.vector_cache_revision.set(Some(current));
120        Ok(())
121    }
122
123    /// Drop both in-memory vector caches and reset the revision tracker. Used on
124    /// transaction rollback (in-place upserts may not have persisted) and before
125    /// bulk re-embed loops (to avoid O(N²) in-place upserts on a warm cache).
126    pub(crate) fn invalidate_vector_caches(&self) {
127        *self.vec_content_cache.borrow_mut() = None;
128        *self.vec_trigger_cache.borrow_mut() = None;
129        self.vector_cache_revision.set(None);
130    }
131
132    /// Paginated chunk listing for the web viewer. Filters by exact `state` and
133    /// `origin` when provided; returns a compact projection (content truncated to
134    /// a preview) ordered newest-first. Read-only — never mutates.
135    pub fn list_chunks(
136        &self,
137        state: Option<&str>,
138        origin: Option<&str>,
139        limit: usize,
140        offset: usize,
141    ) -> Result<Vec<Value>> {
142        let mut sql = String::from(
143            "SELECT id, skill_name, seq, origin, state, state_reason, maturity, \
144             confidence, token_count, protected, selected_count, used_count, \
145             used_success_count, substr(content, 1, 280) AS content_preview, \
146             created_at, updated_at, last_used_at \
147             FROM chunks",
148        );
149        let mut clauses: Vec<&str> = Vec::new();
150        if state.is_some() {
151            clauses.push("state = :state");
152        }
153        if origin.is_some() {
154            clauses.push("origin = :origin");
155        }
156        if !clauses.is_empty() {
157            sql.push_str(" WHERE ");
158            sql.push_str(&clauses.join(" AND "));
159        }
160        sql.push_str(" ORDER BY created_at DESC LIMIT :limit OFFSET :offset");
161
162        let mut stmt = self.conn.prepare(&sql)?;
163        let names: Vec<String> = stmt.column_names().into_iter().map(String::from).collect();
164        let mut params: Vec<(&str, &dyn rusqlite::ToSql)> = Vec::new();
165        if let Some(s) = state.as_ref() {
166            params.push((":state", s));
167        }
168        if let Some(o) = origin.as_ref() {
169            params.push((":origin", o));
170        }
171        let limit_i = limit as i64;
172        let offset_i = offset as i64;
173        params.push((":limit", &limit_i));
174        params.push((":offset", &offset_i));
175
176        let rows = stmt.query_map(params.as_slice(), |r| row_to_json_with_names(r, &names))?;
177        let mut out = Vec::new();
178        for row in rows {
179            out.push(row?);
180        }
181        Ok(out)
182    }
183
184    pub fn get_chunk(&self, id: &str) -> Result<Option<Value>> {
185        let mut stmt = self
186            .conn
187            .prepare_cached("SELECT * FROM chunks WHERE id=?")?;
188        let row = stmt.query_row([id], row_to_json);
189        match row {
190            Ok(v) => Ok(Some(v)),
191            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
192            Err(e) => Err(e.into()),
193        }
194    }
195
196    pub fn update_chunk_state(
197        &self,
198        id: &str,
199        state: &str,
200        reason: Option<&str>,
201        now: &str,
202    ) -> Result<()> {
203        self.conn.execute(
204            "UPDATE chunks SET state=?, state_reason=?, state_updated_at=?, updated_at=? WHERE id=?",
205            params![state, reason, now, now, id],
206        )?;
207        Ok(())
208    }
209
210    pub fn update_chunk_confidence(
211        &self,
212        id: &str,
213        conf: f64,
214        reason: Option<&str>,
215        now: &str,
216    ) -> Result<()> {
217        self.conn.execute(
218            "UPDATE chunks
219             SET confidence=?, confidence_base=?, confidence_reason=?, updated_at=?
220             WHERE id=?",
221            params![conf, conf, reason, now, id],
222        )?;
223        Ok(())
224    }
225
226    pub fn update_chunk_last_used(&self, id: &str, now: &str) -> Result<()> {
227        self.conn.execute(
228            "UPDATE chunks SET last_used_at=?, updated_at=? WHERE id=?",
229            params![now, now, id],
230        )?;
231        Ok(())
232    }
233
234    pub fn get_chunk_by_hash(&self, hash: &str) -> Result<Option<Value>> {
235        let row = self.conn.query_row(
236            "SELECT * FROM chunks WHERE content_hash=? LIMIT 1",
237            [hash],
238            row_to_json,
239        );
240        match row {
241            Ok(v) => Ok(Some(v)),
242            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
243            Err(e) => Err(e.into()),
244        }
245    }
246
247    // ------------------------------------------------------------------
248    // Vector search (pure-Rust cosine similarity, replaces sqlite-vec)
249    // ------------------------------------------------------------------
250
251    pub fn search_vec_content(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
252        self.search_vec(&self.vec_content_cache, "vec_content", query, limit)
253    }
254
255    pub fn search_vec_trigger(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
256        self.search_vec(&self.vec_trigger_cache, "vec_trigger", query, limit)
257    }
258
259    fn search_vec(
260        &self,
261        cache_cell: &VectorCache,
262        table: &str,
263        query: &[f32],
264        limit: usize,
265    ) -> Result<Vec<(String, f32)>> {
266        if limit == 0 {
267            return Ok(Vec::new());
268        }
269        self.refresh_vector_caches_if_changed()?;
270
271        // Populate cache on first access after open or invalidation.
272        // Stored vectors are L2-normalised here so the search inner loop can use
273        // a plain dot product instead of recomputing norms on every comparison.
274        if cache_cell.borrow().is_none() {
275            let sql = format!("SELECT chunk_id, embedding FROM {table}");
276            let mut stmt = self.conn.prepare(&sql)?;
277            let raw: Vec<(String, Vec<u8>)> = stmt
278                .query_map([], |r| {
279                    Ok((r.get::<_, String>(0)?, r.get::<_, Vec<u8>>(1)?))
280                })?
281                .collect::<rusqlite::Result<Vec<_>>>()?;
282            let mut entries: Vec<(String, Vec<f32>)> = Vec::with_capacity(raw.len());
283            for (id, blob) in raw {
284                // Fail-closed: a persisted embedding must be a whole number of
285                // f32 values (4 bytes each). A structurally corrupt blob aborts
286                // the load rather than silently yielding a truncated vector.
287                if blob.is_empty() || blob.len() % 4 != 0 {
288                    return Err(crate::errors::InnateError::Other(format!(
289                        "corrupt embedding for chunk {id} in {table}: {} bytes (not a non-zero multiple of 4)",
290                        blob.len()
291                    )));
292                }
293                let mut v = unpack_embedding(&blob);
294                l2_normalize(&mut v);
295                entries.push((id, v));
296            }
297            *cache_cell.borrow_mut() = Some(entries);
298        }
299
300        let cache = cache_cell.borrow();
301        let entries = cache.as_ref().unwrap();
302
303        // Normalise the query once; cached vectors are already unit-length, so
304        // cosine similarity reduces to a dot product over each entry.
305        let mut q = query.to_vec();
306        l2_normalize(&mut q);
307
308        // Score by (index, similarity) without cloning ids; partial-sort the top
309        // `limit` to the front (O(N) select), then clone ids for the winners only.
310        // Only score vectors whose dimension matches the query. A mismatch means
311        // a stale embed_version vector or a (4-byte-aligned) corruption — either
312        // way it must not contribute a truncated/garbage dot product.
313        let mut scored: Vec<(usize, f32)> = entries
314            .iter()
315            .enumerate()
316            .filter(|(_, (_, v))| v.len() == q.len())
317            .map(|(i, (_, v))| (i, dot_product(&q, v)))
318            .collect();
319        if scored.len() > limit {
320            scored.select_nth_unstable_by(limit - 1, |a, b| {
321                b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)
322            });
323            scored.truncate(limit);
324        }
325        scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
326        Ok(scored
327            .into_iter()
328            .map(|(i, sim)| (entries[i].0.clone(), sim))
329            .collect())
330    }
331
332    /// Lexical/BM25 retrieval channel (hybrid 检索的词法一路). Builds a safe FTS5
333    /// MATCH from the query's alphanumeric tokens and returns up to `limit`
334    /// non-archived, non-spark chunks ranked by BM25, with each score normalised
335    /// to `(0,1]` (best match → 1.0) so it can fuse alongside cosine sims.
336    /// Returns empty when the query has no usable tokens (degrades to vector-only).
337    pub fn search_lexical(&self, query: &str, limit: usize) -> Result<Vec<(String, f32)>> {
338        if limit == 0 {
339            return Ok(Vec::new());
340        }
341        let Some(match_expr) = fts5_match_query(query) else {
342            return Ok(Vec::new());
343        };
344        let mut stmt = self.conn.prepare_cached(
345            "SELECT chunks_fts.id, bm25(chunks_fts) AS score
346             FROM chunks_fts
347             JOIN chunks c ON c.id = chunks_fts.id
348             WHERE chunks_fts MATCH ?1
349               AND c.state != 'archived' AND c.origin != 'spark'
350             ORDER BY score ASC
351             LIMIT ?2",
352        )?;
353        let rows = stmt.query_map(params![match_expr, limit as i64], |r| {
354            Ok((r.get::<_, String>(0)?, r.get::<_, f64>(1)?))
355        })?;
356        // FTS5 bm25(): more negative = more relevant. Convert to positive
357        // relevance and normalise by the best in this result set (scale-stable
358        // across queries; the top lexical hit always maps to 1.0).
359        let raw: Vec<(String, f64)> = rows.collect::<rusqlite::Result<Vec<_>>>()?;
360        let best_rel = raw.iter().map(|(_, s)| -s).fold(f64::MIN, f64::max);
361        let out = raw
362            .into_iter()
363            .enumerate()
364            .map(|(i, (id, score))| {
365                let sim = if best_rel > 0.0 {
366                    (-score / best_rel).clamp(0.0, 1.0) as f32
367                } else {
368                    // Degenerate (non-positive relevances): fall back to rank decay.
369                    ((limit - i) as f32) / (limit as f32)
370                };
371                (id, sim)
372            })
373            .collect();
374        Ok(out)
375    }
376
377    fn refresh_vector_caches_if_changed(&self) -> Result<()> {
378        let current = self
379            .get_meta("vector_revision")?
380            .and_then(|value| value.parse::<i64>().ok())
381            .unwrap_or(0);
382        let previous = self.vector_cache_revision.replace(Some(current));
383        if previous.is_some_and(|revision| revision != current) {
384            *self.vec_content_cache.borrow_mut() = None;
385            *self.vec_trigger_cache.borrow_mut() = None;
386        }
387        Ok(())
388    }
389
390    /// Fetch multiple chunks by id in one query; returns a map of id → chunk JSON.
391    pub fn get_chunks_by_ids(&self, ids: &[&str]) -> Result<HashMap<String, Value>> {
392        if ids.is_empty() {
393            return Ok(HashMap::new());
394        }
395        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
396        let sql = format!("SELECT * FROM chunks WHERE id IN ({placeholders})");
397        let mut stmt = self.conn.prepare(&sql)?;
398        let names: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
399        let rows = stmt.query_map(rusqlite::params_from_iter(ids.iter()), |r| {
400            row_to_json_with_names(r, &names)
401        })?;
402        let mut map = HashMap::with_capacity(ids.len());
403        for row in rows {
404            let row = row?;
405            if let Some(id) = row.get("id").and_then(Value::as_str) {
406                map.insert(id.to_string(), row);
407            }
408        }
409        Ok(map)
410    }
411
412    // ------------------------------------------------------------------
413    // Invalidated hashes
414    // ------------------------------------------------------------------
415
416    pub fn is_hash_invalidated(&self, hash: &str) -> Result<bool> {
417        let count: i64 = self.conn.query_row(
418            "SELECT count(*) FROM invalidated_hashes WHERE content_hash=?",
419            [hash],
420            |r| r.get(0),
421        )?;
422        Ok(count > 0)
423    }
424
425    pub fn insert_invalidated_hash(
426        &self,
427        hash: &str,
428        reason: Option<&str>,
429        ts: &str,
430    ) -> Result<()> {
431        self.conn.execute(
432            "INSERT OR IGNORE INTO invalidated_hashes(content_hash, reason, ts) VALUES (?,?,?)",
433            params![hash, reason, ts],
434        )?;
435        Ok(())
436    }
437
438    // ------------------------------------------------------------------
439    // Usage trace
440    // ------------------------------------------------------------------
441
442    // Chunk queries (aggregate / curate helpers)
443    // ------------------------------------------------------------------
444
445    pub(crate) fn query_chunks(&self, sql: &str) -> Result<Vec<Value>> {
446        self.query_json(sql, params![])
447    }
448
449    pub(crate) fn query_chunks_params<P: rusqlite::Params>(
450        &self,
451        sql: &str,
452        p: P,
453    ) -> Result<Vec<Value>> {
454        self.query_json(sql, p)
455    }
456
457    // ------------------------------------------------------------------
458    // Deps
459    // ------------------------------------------------------------------
460
461    pub fn get_deps(&self, chunk_id: &str) -> Result<Vec<DepEdge>> {
462        let mut stmt = self
463            .conn
464            .prepare_cached("SELECT dst, kind, dst_lib FROM deps WHERE src=?")?;
465        let rows = stmt.query_map([chunk_id], |r| {
466            Ok((
467                r.get::<_, String>(0)?,
468                r.get::<_, String>(1)?,
469                r.get::<_, Option<String>>(2)?,
470            ))
471        })?;
472        Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
473    }
474
475    /// Batch variant of `get_deps`: fetch outgoing edges for many sources in one
476    /// query. Returns `src` → `[(dst, kind, dst_lib)]`. Sources with no edges are
477    /// simply absent from the map.
478    pub fn get_deps_batch(&self, srcs: &[&str]) -> Result<HashMap<String, Vec<DepEdge>>> {
479        if srcs.is_empty() {
480            return Ok(HashMap::new());
481        }
482        let placeholders = srcs.iter().map(|_| "?").collect::<Vec<_>>().join(",");
483        let sql = format!("SELECT src, dst, kind, dst_lib FROM deps WHERE src IN ({placeholders})");
484        let mut stmt = self.conn.prepare(&sql)?;
485        let rows = stmt.query_map(rusqlite::params_from_iter(srcs.iter()), |r| {
486            Ok((
487                r.get::<_, String>(0)?,
488                r.get::<_, String>(1)?,
489                r.get::<_, String>(2)?,
490                r.get::<_, Option<String>>(3)?,
491            ))
492        })?;
493        let mut map: HashMap<String, Vec<(String, String, Option<String>)>> = HashMap::new();
494        for row in rows {
495            let (src, dst, kind, lib) = row?;
496            map.entry(src).or_default().push((dst, kind, lib));
497        }
498        Ok(map)
499    }
500
501    pub fn get_reverse_deps(&self, chunk_id: &str) -> Result<Vec<String>> {
502        let mut stmt = self
503            .conn
504            .prepare_cached("SELECT src FROM deps WHERE dst=?")?;
505        let rows = stmt.query_map([chunk_id], |r| r.get::<_, String>(0))?;
506        Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
507    }
508
509    pub fn insert_dep(
510        &self,
511        src: &str,
512        dst: &str,
513        kind: &str,
514        dst_lib: Option<&str>,
515    ) -> Result<()> {
516        self.conn.execute(
517            "INSERT OR IGNORE INTO deps(src,dst,kind,dst_lib) VALUES (?,?,?,?)",
518            params![src, dst, kind, dst_lib],
519        )?;
520        Ok(())
521    }
522
523    // ------------------------------------------------------------------
524    // Chunk success traces (aggregate fact table)
525    // ------------------------------------------------------------------
526
527    pub fn upsert_chunk_success_trace(
528        &self,
529        chunk_id: &str,
530        trace_id: &str,
531        ts: &str,
532    ) -> Result<()> {
533        self.conn.execute(
534            "INSERT OR IGNORE INTO chunk_success_traces(chunk_id, trace_id, ts) VALUES (?,?,?)",
535            params![chunk_id, trace_id, ts],
536        )?;
537        Ok(())
538    }
539
540    // ------------------------------------------------------------------
541}
542
/// Turn free text into an injection-safe FTS5 `MATCH` expression.
///
/// The text is cut at every non-alphanumeric character. Pieces shorter than two
/// *bytes* are discarded, so a lone ASCII letter is dropped but a single
/// multi-byte character such as `é` is kept. Survivors are lowercased, a tiny
/// stop list is removed, and duplicates are skipped. At most the first 32 distinct
/// terms are kept.
///
/// Each term is wrapped in double quotes as an FTS5 phrase. Terms contain only
/// alphanumerics, so no FTS5 operator or stray quote can reach the query. The
/// phrases are joined with `OR`. BM25's IDF already downweights common words, so
/// aggressive stop-word pruning is unnecessary.
///
/// Returns `None` when no term survives.
pub(crate) fn fts5_match_query(query: &str) -> Option<String> {
    // Deliberately minimal: only the most frequent function words. BM25 idf
    // handles the rest, and over-pruning costs recall. ("a" and "i" are already
    // removed by the two-byte minimum.)
    const STOP: &[&str] = &[
        "the", "a", "an", "to", "of", "in", "on", "for", "and", "or", "is", "do", "i",
    ];
    const MAX_TERMS: usize = 32;

    let mut already: std::collections::HashSet<String> = std::collections::HashSet::new();
    let mut phrases: Vec<String> = Vec::new();
    for piece in query.split(|ch: char| !ch.is_alphanumeric()) {
        if phrases.len() == MAX_TERMS {
            break;
        }
        if piece.len() < 2 {
            continue;
        }
        let term = piece.to_lowercase();
        if STOP.contains(&term.as_str()) || already.contains(&term) {
            continue;
        }
        phrases.push(format!("\"{term}\""));
        already.insert(term);
    }

    (!phrases.is_empty()).then(|| phrases.join(" OR "))
}