Skip to main content

innate_core/storage/
chunks.rs

1use super::*;
2
3impl Storage {
4    pub fn insert_chunk(&self, c: &ChunkRow) -> Result<()> {
5        self.conn.execute(
6            "INSERT INTO chunks (
7                id, skill_name, seq, content, trigger_desc, anti_trigger_desc,
8                content_hash, token_count, origin, source, maturity, related_ids,
9                protected, state, state_reason, state_updated_at,
10                confidence, confidence_base, confidence_reason, version, distilled_from,
11                distill_provider, distill_model, distill_prompt_version, parent_id,
12                selected_count, used_count, used_success_count,
13                success_trace_ids_count, last_success_at, last_agg_ts,
14                embed_version, created_at, updated_at, last_used_at
15            ) VALUES (
16                ?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,
17                ?13,?14,?15,?16,?17,?18,?19,?20,?21,?22,?23,?24,?25,
18                ?26,?27,?28,?29,?30,?31,?32,?33,?34,?35
19            )",
20            params![
21                c.id,
22                c.skill_name,
23                c.seq,
24                c.content,
25                c.trigger_desc,
26                c.anti_trigger_desc,
27                c.content_hash,
28                c.token_count,
29                c.origin,
30                c.source,
31                c.maturity,
32                c.related_ids,
33                c.protected,
34                c.state,
35                c.state_reason,
36                c.state_updated_at,
37                c.confidence,
38                c.confidence,
39                c.confidence_reason,
40                c.version,
41                c.distilled_from,
42                c.distill_provider,
43                c.distill_model,
44                c.distill_prompt_version,
45                c.parent_id,
46                c.selected_count,
47                c.used_count,
48                c.used_success_count,
49                c.success_trace_ids_count,
50                c.last_success_at,
51                c.last_agg_ts,
52                c.embed_version,
53                c.created_at,
54                c.updated_at,
55                c.last_used_at
56            ],
57        )?;
58        Ok(())
59    }
60
61    pub fn insert_vec_content(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
62        self.conn.execute(
63            "INSERT OR REPLACE INTO vec_content(chunk_id, embedding) VALUES (?,?)",
64            params![chunk_id, emb],
65        )?;
66        self.note_vector_write(&self.vec_content_cache, chunk_id, emb)
67    }
68
69    pub fn insert_vec_trigger(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
70        self.conn.execute(
71            "INSERT OR REPLACE INTO vec_trigger(chunk_id, embedding) VALUES (?,?)",
72            params![chunk_id, emb],
73        )?;
74        self.note_vector_write(&self.vec_trigger_cache, chunk_id, emb)
75    }
76
77    /// Record a single vector write: bump the shared revision (so *other*
78    /// processes drop their caches), upsert the one entry into the warm cache
79    /// in place (so *this* long-lived process keeps its cache instead of
80    /// reloading the whole corpus), then re-sync the local revision tracker so
81    /// our own bump does not trip `refresh_vector_caches_if_changed`.
82    ///
83    /// A cold cache (None) is left cold — the next search loads everything,
84    /// including this row. This keeps bulk paths (e.g. full re-embed) O(N) by
85    /// invalidating once up front rather than upserting per write.
86    fn note_vector_write(&self, cache: &VectorCache, chunk_id: &str, emb: &[u8]) -> Result<()> {
87        self.bump_vector_revision()?;
88        if let Some(entries) = cache.borrow_mut().as_mut() {
89            let mut v = unpack_embedding(emb);
90            l2_normalize(&mut v);
91            match entries.iter_mut().find(|(id, _)| id == chunk_id) {
92                Some(slot) => slot.1 = v,
93                None => entries.push((chunk_id.to_string(), v)),
94            }
95        }
96        self.sync_vector_revision()
97    }
98
99    /// Monotonically advance `meta.vector_revision`. Any vector write must call
100    /// this so that other processes detect the change and drop their in-memory
101    /// caches on the next search (see `refresh_vector_caches_if_changed`).
102    fn bump_vector_revision(&self) -> Result<()> {
103        self.conn.execute(
104            "INSERT INTO meta(key, value) VALUES ('vector_revision', '1')
105             ON CONFLICT(key) DO UPDATE SET value=CAST(value AS INTEGER)+1",
106            [],
107        )?;
108        Ok(())
109    }
110
111    /// Align the local revision tracker with the persisted value so an in-place
112    /// cache update performed by this process is not discarded on the next search.
113    fn sync_vector_revision(&self) -> Result<()> {
114        let current = self
115            .get_meta("vector_revision")?
116            .and_then(|v| v.parse::<i64>().ok())
117            .unwrap_or(0);
118        self.vector_cache_revision.set(Some(current));
119        Ok(())
120    }
121
122    /// Drop both in-memory vector caches and reset the revision tracker. Used on
123    /// transaction rollback (in-place upserts may not have persisted) and before
124    /// bulk re-embed loops (to avoid O(N²) in-place upserts on a warm cache).
125    pub(crate) fn invalidate_vector_caches(&self) {
126        *self.vec_content_cache.borrow_mut() = None;
127        *self.vec_trigger_cache.borrow_mut() = None;
128        self.vector_cache_revision.set(None);
129    }
130
131    /// Paginated chunk listing for the web viewer. Filters by exact `state` and
132    /// `origin` when provided; returns a compact projection (content truncated to
133    /// a preview) ordered newest-first. Read-only — never mutates.
134    pub fn list_chunks(
135        &self,
136        state: Option<&str>,
137        origin: Option<&str>,
138        limit: usize,
139        offset: usize,
140    ) -> Result<Vec<Value>> {
141        let mut sql = String::from(
142            "SELECT id, skill_name, seq, origin, state, state_reason, maturity, \
143             confidence, token_count, protected, selected_count, used_count, \
144             used_success_count, substr(content, 1, 280) AS content_preview, \
145             created_at, updated_at, last_used_at \
146             FROM chunks",
147        );
148        let mut clauses: Vec<&str> = Vec::new();
149        if state.is_some() {
150            clauses.push("state = :state");
151        }
152        if origin.is_some() {
153            clauses.push("origin = :origin");
154        }
155        if !clauses.is_empty() {
156            sql.push_str(" WHERE ");
157            sql.push_str(&clauses.join(" AND "));
158        }
159        sql.push_str(" ORDER BY created_at DESC LIMIT :limit OFFSET :offset");
160
161        let mut stmt = self.conn.prepare(&sql)?;
162        let names: Vec<String> = stmt
163            .column_names()
164            .into_iter()
165            .map(String::from)
166            .collect();
167        let mut params: Vec<(&str, &dyn rusqlite::ToSql)> = Vec::new();
168        if let Some(s) = state.as_ref() {
169            params.push((":state", s));
170        }
171        if let Some(o) = origin.as_ref() {
172            params.push((":origin", o));
173        }
174        let limit_i = limit as i64;
175        let offset_i = offset as i64;
176        params.push((":limit", &limit_i));
177        params.push((":offset", &offset_i));
178
179        let rows = stmt.query_map(params.as_slice(), |r| row_to_json_with_names(r, &names))?;
180        let mut out = Vec::new();
181        for row in rows {
182            out.push(row?);
183        }
184        Ok(out)
185    }
186
187    pub fn get_chunk(&self, id: &str) -> Result<Option<Value>> {
188        let mut stmt = self.conn.prepare_cached("SELECT * FROM chunks WHERE id=?")?;
189        let row = stmt.query_row([id], row_to_json);
190        match row {
191            Ok(v) => Ok(Some(v)),
192            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
193            Err(e) => Err(e.into()),
194        }
195    }
196
197    pub fn update_chunk_state(
198        &self,
199        id: &str,
200        state: &str,
201        reason: Option<&str>,
202        now: &str,
203    ) -> Result<()> {
204        self.conn.execute(
205            "UPDATE chunks SET state=?, state_reason=?, state_updated_at=?, updated_at=? WHERE id=?",
206            params![state, reason, now, now, id],
207        )?;
208        Ok(())
209    }
210
211    pub fn update_chunk_confidence(
212        &self,
213        id: &str,
214        conf: f64,
215        reason: Option<&str>,
216        now: &str,
217    ) -> Result<()> {
218        self.conn.execute(
219            "UPDATE chunks
220             SET confidence=?, confidence_base=?, confidence_reason=?, updated_at=?
221             WHERE id=?",
222            params![conf, conf, reason, now, id],
223        )?;
224        Ok(())
225    }
226
227    pub fn update_chunk_last_used(&self, id: &str, now: &str) -> Result<()> {
228        self.conn.execute(
229            "UPDATE chunks SET last_used_at=?, updated_at=? WHERE id=?",
230            params![now, now, id],
231        )?;
232        Ok(())
233    }
234
235    pub fn get_chunk_by_hash(&self, hash: &str) -> Result<Option<Value>> {
236        let row = self.conn.query_row(
237            "SELECT * FROM chunks WHERE content_hash=? LIMIT 1",
238            [hash],
239            row_to_json,
240        );
241        match row {
242            Ok(v) => Ok(Some(v)),
243            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
244            Err(e) => Err(e.into()),
245        }
246    }
247
248    // ------------------------------------------------------------------
249    // Vector search (pure-Rust cosine similarity, replaces sqlite-vec)
250    // ------------------------------------------------------------------
251
252    pub fn search_vec_content(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
253        self.search_vec(&self.vec_content_cache, "vec_content", query, limit)
254    }
255
256    pub fn search_vec_trigger(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
257        self.search_vec(&self.vec_trigger_cache, "vec_trigger", query, limit)
258    }
259
260    fn search_vec(
261        &self,
262        cache_cell: &VectorCache,
263        table: &str,
264        query: &[f32],
265        limit: usize,
266    ) -> Result<Vec<(String, f32)>> {
267        if limit == 0 {
268            return Ok(Vec::new());
269        }
270        self.refresh_vector_caches_if_changed()?;
271
272        // Populate cache on first access after open or invalidation.
273        // Stored vectors are L2-normalised here so the search inner loop can use
274        // a plain dot product instead of recomputing norms on every comparison.
275        if cache_cell.borrow().is_none() {
276            let sql = format!("SELECT chunk_id, embedding FROM {table}");
277            let mut stmt = self.conn.prepare(&sql)?;
278            let raw: Vec<(String, Vec<u8>)> = stmt
279                .query_map([], |r| {
280                    Ok((r.get::<_, String>(0)?, r.get::<_, Vec<u8>>(1)?))
281                })?
282                .collect::<rusqlite::Result<Vec<_>>>()?;
283            let mut entries: Vec<(String, Vec<f32>)> = Vec::with_capacity(raw.len());
284            for (id, blob) in raw {
285                // Fail-closed: a persisted embedding must be a whole number of
286                // f32 values (4 bytes each). A structurally corrupt blob aborts
287                // the load rather than silently yielding a truncated vector.
288                if blob.is_empty() || blob.len() % 4 != 0 {
289                    return Err(crate::errors::InnateError::Other(format!(
290                        "corrupt embedding for chunk {id} in {table}: {} bytes (not a non-zero multiple of 4)",
291                        blob.len()
292                    )));
293                }
294                let mut v = unpack_embedding(&blob);
295                l2_normalize(&mut v);
296                entries.push((id, v));
297            }
298            *cache_cell.borrow_mut() = Some(entries);
299        }
300
301        let cache = cache_cell.borrow();
302        let entries = cache.as_ref().unwrap();
303
304        // Normalise the query once; cached vectors are already unit-length, so
305        // cosine similarity reduces to a dot product over each entry.
306        let mut q = query.to_vec();
307        l2_normalize(&mut q);
308
309        // Score by (index, similarity) without cloning ids; partial-sort the top
310        // `limit` to the front (O(N) select), then clone ids for the winners only.
311        // Only score vectors whose dimension matches the query. A mismatch means
312        // a stale embed_version vector or a (4-byte-aligned) corruption — either
313        // way it must not contribute a truncated/garbage dot product.
314        let mut scored: Vec<(usize, f32)> = entries
315            .iter()
316            .enumerate()
317            .filter(|(_, (_, v))| v.len() == q.len())
318            .map(|(i, (_, v))| (i, dot_product(&q, v)))
319            .collect();
320        if scored.len() > limit {
321            scored.select_nth_unstable_by(limit - 1, |a, b| {
322                b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)
323            });
324            scored.truncate(limit);
325        }
326        scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
327        Ok(scored
328            .into_iter()
329            .map(|(i, sim)| (entries[i].0.clone(), sim))
330            .collect())
331    }
332
333    fn refresh_vector_caches_if_changed(&self) -> Result<()> {
334        let current = self
335            .get_meta("vector_revision")?
336            .and_then(|value| value.parse::<i64>().ok())
337            .unwrap_or(0);
338        let previous = self.vector_cache_revision.replace(Some(current));
339        if previous.is_some_and(|revision| revision != current) {
340            *self.vec_content_cache.borrow_mut() = None;
341            *self.vec_trigger_cache.borrow_mut() = None;
342        }
343        Ok(())
344    }
345
346    /// Fetch multiple chunks by id in one query; returns a map of id → chunk JSON.
347    pub fn get_chunks_by_ids(&self, ids: &[&str]) -> Result<HashMap<String, Value>> {
348        if ids.is_empty() {
349            return Ok(HashMap::new());
350        }
351        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
352        let sql = format!("SELECT * FROM chunks WHERE id IN ({placeholders})");
353        let mut stmt = self.conn.prepare(&sql)?;
354        let names: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
355        let rows = stmt.query_map(rusqlite::params_from_iter(ids.iter()), |r| {
356            row_to_json_with_names(r, &names)
357        })?;
358        let mut map = HashMap::with_capacity(ids.len());
359        for row in rows {
360            let row = row?;
361            if let Some(id) = row.get("id").and_then(Value::as_str) {
362                map.insert(id.to_string(), row);
363            }
364        }
365        Ok(map)
366    }
367
368    // ------------------------------------------------------------------
369    // Invalidated hashes
370    // ------------------------------------------------------------------
371
372    pub fn is_hash_invalidated(&self, hash: &str) -> Result<bool> {
373        let count: i64 = self.conn.query_row(
374            "SELECT count(*) FROM invalidated_hashes WHERE content_hash=?",
375            [hash],
376            |r| r.get(0),
377        )?;
378        Ok(count > 0)
379    }
380
381    pub fn insert_invalidated_hash(
382        &self,
383        hash: &str,
384        reason: Option<&str>,
385        ts: &str,
386    ) -> Result<()> {
387        self.conn.execute(
388            "INSERT OR IGNORE INTO invalidated_hashes(content_hash, reason, ts) VALUES (?,?,?)",
389            params![hash, reason, ts],
390        )?;
391        Ok(())
392    }
393
394    // ------------------------------------------------------------------
395    // Usage trace
396    // ------------------------------------------------------------------
397
    // ------------------------------------------------------------------
    // Chunk queries (aggregate / curate helpers)
399    // ------------------------------------------------------------------
400
401    pub(crate) fn query_chunks(&self, sql: &str) -> Result<Vec<Value>> {
402        self.query_json(sql, params![])
403    }
404
405    pub(crate) fn query_chunks_params<P: rusqlite::Params>(
406        &self,
407        sql: &str,
408        p: P,
409    ) -> Result<Vec<Value>> {
410        self.query_json(sql, p)
411    }
412
413    // ------------------------------------------------------------------
414    // Deps
415    // ------------------------------------------------------------------
416
417    pub fn get_deps(&self, chunk_id: &str) -> Result<Vec<DepEdge>> {
418        let mut stmt = self
419            .conn
420            .prepare_cached("SELECT dst, kind, dst_lib FROM deps WHERE src=?")?;
421        let rows = stmt.query_map([chunk_id], |r| {
422            Ok((
423                r.get::<_, String>(0)?,
424                r.get::<_, String>(1)?,
425                r.get::<_, Option<String>>(2)?,
426            ))
427        })?;
428        Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
429    }
430
431    /// Batch variant of `get_deps`: fetch outgoing edges for many sources in one
432    /// query. Returns `src` → `[(dst, kind, dst_lib)]`. Sources with no edges are
433    /// simply absent from the map.
434    pub fn get_deps_batch(&self, srcs: &[&str]) -> Result<HashMap<String, Vec<DepEdge>>> {
435        if srcs.is_empty() {
436            return Ok(HashMap::new());
437        }
438        let placeholders = srcs.iter().map(|_| "?").collect::<Vec<_>>().join(",");
439        let sql = format!("SELECT src, dst, kind, dst_lib FROM deps WHERE src IN ({placeholders})");
440        let mut stmt = self.conn.prepare(&sql)?;
441        let rows = stmt.query_map(rusqlite::params_from_iter(srcs.iter()), |r| {
442            Ok((
443                r.get::<_, String>(0)?,
444                r.get::<_, String>(1)?,
445                r.get::<_, String>(2)?,
446                r.get::<_, Option<String>>(3)?,
447            ))
448        })?;
449        let mut map: HashMap<String, Vec<(String, String, Option<String>)>> = HashMap::new();
450        for row in rows {
451            let (src, dst, kind, lib) = row?;
452            map.entry(src).or_default().push((dst, kind, lib));
453        }
454        Ok(map)
455    }
456
457    pub fn get_reverse_deps(&self, chunk_id: &str) -> Result<Vec<String>> {
458        let mut stmt = self
459            .conn
460            .prepare_cached("SELECT src FROM deps WHERE dst=?")?;
461        let rows = stmt.query_map([chunk_id], |r| r.get::<_, String>(0))?;
462        Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
463    }
464
465    pub fn insert_dep(
466        &self,
467        src: &str,
468        dst: &str,
469        kind: &str,
470        dst_lib: Option<&str>,
471    ) -> Result<()> {
472        self.conn.execute(
473            "INSERT OR IGNORE INTO deps(src,dst,kind,dst_lib) VALUES (?,?,?,?)",
474            params![src, dst, kind, dst_lib],
475        )?;
476        Ok(())
477    }
478
479    // ------------------------------------------------------------------
480    // Chunk success traces (aggregate fact table)
481    // ------------------------------------------------------------------
482
483    pub fn upsert_chunk_success_trace(
484        &self,
485        chunk_id: &str,
486        trace_id: &str,
487        ts: &str,
488    ) -> Result<()> {
489        self.conn.execute(
490            "INSERT OR IGNORE INTO chunk_success_traces(chunk_id, trace_id, ts) VALUES (?,?,?)",
491            params![chunk_id, trace_id, ts],
492        )?;
493        Ok(())
494    }
495
496    // ------------------------------------------------------------------
497}