Skip to main content

innate_core/storage/
chunks.rs

1use super::*;
2
3impl Storage {
4    pub fn insert_chunk(&self, c: &ChunkRow) -> Result<()> {
5        self.conn.execute(
6            "INSERT INTO chunks (
7                id, skill_name, seq, content, trigger_desc, anti_trigger_desc,
8                content_hash, token_count, origin, source, maturity, related_ids,
9                protected, state, state_reason, state_updated_at,
10                confidence, confidence_base, confidence_reason, version, distilled_from,
11                distill_provider, distill_model, distill_prompt_version, parent_id,
12                selected_count, used_count, used_success_count,
13                success_trace_ids_count, last_success_at, last_agg_ts,
14                embed_version, created_at, updated_at, last_used_at
15            ) VALUES (
16                ?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,
17                ?13,?14,?15,?16,?17,?18,?19,?20,?21,?22,?23,?24,?25,
18                ?26,?27,?28,?29,?30,?31,?32,?33,?34,?35
19            )",
20            params![
21                c.id,
22                c.skill_name,
23                c.seq,
24                c.content,
25                c.trigger_desc,
26                c.anti_trigger_desc,
27                c.content_hash,
28                c.token_count,
29                c.origin,
30                c.source,
31                c.maturity,
32                c.related_ids,
33                c.protected,
34                c.state,
35                c.state_reason,
36                c.state_updated_at,
37                c.confidence,
38                c.confidence,
39                c.confidence_reason,
40                c.version,
41                c.distilled_from,
42                c.distill_provider,
43                c.distill_model,
44                c.distill_prompt_version,
45                c.parent_id,
46                c.selected_count,
47                c.used_count,
48                c.used_success_count,
49                c.success_trace_ids_count,
50                c.last_success_at,
51                c.last_agg_ts,
52                c.embed_version,
53                c.created_at,
54                c.updated_at,
55                c.last_used_at
56            ],
57        )?;
58        Ok(())
59    }
60
61    pub fn insert_vec_content(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
62        self.conn.execute(
63            "INSERT OR REPLACE INTO vec_content(chunk_id, embedding) VALUES (?,?)",
64            params![chunk_id, emb],
65        )?;
66        self.note_vector_write(&self.vec_content_cache, chunk_id, emb)
67    }
68
69    pub fn insert_vec_trigger(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
70        self.conn.execute(
71            "INSERT OR REPLACE INTO vec_trigger(chunk_id, embedding) VALUES (?,?)",
72            params![chunk_id, emb],
73        )?;
74        self.note_vector_write(&self.vec_trigger_cache, chunk_id, emb)
75    }
76
77    /// Record a single vector write: bump the shared revision (so *other*
78    /// processes drop their caches), upsert the one entry into the warm cache
79    /// in place (so *this* long-lived process keeps its cache instead of
80    /// reloading the whole corpus), then re-sync the local revision tracker so
81    /// our own bump does not trip `refresh_vector_caches_if_changed`.
82    ///
83    /// A cold cache (None) is left cold — the next search loads everything,
84    /// including this row. This keeps bulk paths (e.g. full re-embed) O(N) by
85    /// invalidating once up front rather than upserting per write.
86    fn note_vector_write(&self, cache: &VectorCache, chunk_id: &str, emb: &[u8]) -> Result<()> {
87        self.bump_vector_revision()?;
88        if let Some(entries) = cache.borrow_mut().as_mut() {
89            let mut v = unpack_embedding(emb);
90            l2_normalize(&mut v);
91            match entries.iter_mut().find(|(id, _)| id == chunk_id) {
92                Some(slot) => slot.1 = v,
93                None => entries.push((chunk_id.to_string(), v)),
94            }
95        }
96        self.sync_vector_revision()
97    }
98
99    /// Monotonically advance `meta.vector_revision`. Any vector write must call
100    /// this so that other processes detect the change and drop their in-memory
101    /// caches on the next search (see `refresh_vector_caches_if_changed`).
102    fn bump_vector_revision(&self) -> Result<()> {
103        self.conn.execute(
104            "INSERT INTO meta(key, value) VALUES ('vector_revision', '1')
105             ON CONFLICT(key) DO UPDATE SET value=CAST(value AS INTEGER)+1",
106            [],
107        )?;
108        Ok(())
109    }
110
111    /// Align the local revision tracker with the persisted value so an in-place
112    /// cache update performed by this process is not discarded on the next search.
113    fn sync_vector_revision(&self) -> Result<()> {
114        let current = self
115            .get_meta("vector_revision")?
116            .and_then(|v| v.parse::<i64>().ok())
117            .unwrap_or(0);
118        self.vector_cache_revision.set(Some(current));
119        Ok(())
120    }
121
122    /// Drop both in-memory vector caches and reset the revision tracker. Used on
123    /// transaction rollback (in-place upserts may not have persisted) and before
124    /// bulk re-embed loops (to avoid O(N²) in-place upserts on a warm cache).
125    pub(crate) fn invalidate_vector_caches(&self) {
126        *self.vec_content_cache.borrow_mut() = None;
127        *self.vec_trigger_cache.borrow_mut() = None;
128        self.vector_cache_revision.set(None);
129    }
130
131    /// Paginated chunk listing for the web viewer. Filters by exact `state` and
132    /// `origin` when provided; returns a compact projection (content truncated to
133    /// a preview) ordered newest-first. Read-only — never mutates.
134    pub fn list_chunks(
135        &self,
136        state: Option<&str>,
137        origin: Option<&str>,
138        limit: usize,
139        offset: usize,
140    ) -> Result<Vec<Value>> {
141        let mut sql = String::from(
142            "SELECT id, skill_name, seq, origin, state, state_reason, maturity, \
143             confidence, token_count, protected, selected_count, used_count, \
144             used_success_count, substr(content, 1, 280) AS content_preview, \
145             created_at, updated_at, last_used_at \
146             FROM chunks",
147        );
148        let mut clauses: Vec<&str> = Vec::new();
149        if state.is_some() {
150            clauses.push("state = :state");
151        }
152        if origin.is_some() {
153            clauses.push("origin = :origin");
154        }
155        if !clauses.is_empty() {
156            sql.push_str(" WHERE ");
157            sql.push_str(&clauses.join(" AND "));
158        }
159        sql.push_str(" ORDER BY created_at DESC LIMIT :limit OFFSET :offset");
160
161        let mut stmt = self.conn.prepare(&sql)?;
162        let names: Vec<String> = stmt.column_names().into_iter().map(String::from).collect();
163        let mut params: Vec<(&str, &dyn rusqlite::ToSql)> = Vec::new();
164        if let Some(s) = state.as_ref() {
165            params.push((":state", s));
166        }
167        if let Some(o) = origin.as_ref() {
168            params.push((":origin", o));
169        }
170        let limit_i = limit as i64;
171        let offset_i = offset as i64;
172        params.push((":limit", &limit_i));
173        params.push((":offset", &offset_i));
174
175        let rows = stmt.query_map(params.as_slice(), |r| row_to_json_with_names(r, &names))?;
176        let mut out = Vec::new();
177        for row in rows {
178            out.push(row?);
179        }
180        Ok(out)
181    }
182
183    pub fn get_chunk(&self, id: &str) -> Result<Option<Value>> {
184        let mut stmt = self
185            .conn
186            .prepare_cached("SELECT * FROM chunks WHERE id=?")?;
187        let row = stmt.query_row([id], row_to_json);
188        match row {
189            Ok(v) => Ok(Some(v)),
190            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
191            Err(e) => Err(e.into()),
192        }
193    }
194
195    pub fn update_chunk_state(
196        &self,
197        id: &str,
198        state: &str,
199        reason: Option<&str>,
200        now: &str,
201    ) -> Result<()> {
202        self.conn.execute(
203            "UPDATE chunks SET state=?, state_reason=?, state_updated_at=?, updated_at=? WHERE id=?",
204            params![state, reason, now, now, id],
205        )?;
206        Ok(())
207    }
208
209    pub fn update_chunk_confidence(
210        &self,
211        id: &str,
212        conf: f64,
213        reason: Option<&str>,
214        now: &str,
215    ) -> Result<()> {
216        self.conn.execute(
217            "UPDATE chunks
218             SET confidence=?, confidence_base=?, confidence_reason=?, updated_at=?
219             WHERE id=?",
220            params![conf, conf, reason, now, id],
221        )?;
222        Ok(())
223    }
224
225    pub fn update_chunk_last_used(&self, id: &str, now: &str) -> Result<()> {
226        self.conn.execute(
227            "UPDATE chunks SET last_used_at=?, updated_at=? WHERE id=?",
228            params![now, now, id],
229        )?;
230        Ok(())
231    }
232
233    pub fn get_chunk_by_hash(&self, hash: &str) -> Result<Option<Value>> {
234        let row = self.conn.query_row(
235            "SELECT * FROM chunks WHERE content_hash=? LIMIT 1",
236            [hash],
237            row_to_json,
238        );
239        match row {
240            Ok(v) => Ok(Some(v)),
241            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
242            Err(e) => Err(e.into()),
243        }
244    }
245
246    // ------------------------------------------------------------------
247    // Vector search (pure-Rust cosine similarity, replaces sqlite-vec)
248    // ------------------------------------------------------------------
249
250    pub fn search_vec_content(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
251        self.search_vec(&self.vec_content_cache, "vec_content", query, limit)
252    }
253
254    pub fn search_vec_trigger(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
255        self.search_vec(&self.vec_trigger_cache, "vec_trigger", query, limit)
256    }
257
258    fn search_vec(
259        &self,
260        cache_cell: &VectorCache,
261        table: &str,
262        query: &[f32],
263        limit: usize,
264    ) -> Result<Vec<(String, f32)>> {
265        if limit == 0 {
266            return Ok(Vec::new());
267        }
268        self.refresh_vector_caches_if_changed()?;
269
270        // Populate cache on first access after open or invalidation.
271        // Stored vectors are L2-normalised here so the search inner loop can use
272        // a plain dot product instead of recomputing norms on every comparison.
273        if cache_cell.borrow().is_none() {
274            let sql = format!("SELECT chunk_id, embedding FROM {table}");
275            let mut stmt = self.conn.prepare(&sql)?;
276            let raw: Vec<(String, Vec<u8>)> = stmt
277                .query_map([], |r| {
278                    Ok((r.get::<_, String>(0)?, r.get::<_, Vec<u8>>(1)?))
279                })?
280                .collect::<rusqlite::Result<Vec<_>>>()?;
281            let mut entries: Vec<(String, Vec<f32>)> = Vec::with_capacity(raw.len());
282            for (id, blob) in raw {
283                // Fail-closed: a persisted embedding must be a whole number of
284                // f32 values (4 bytes each). A structurally corrupt blob aborts
285                // the load rather than silently yielding a truncated vector.
286                if blob.is_empty() || blob.len() % 4 != 0 {
287                    return Err(crate::errors::InnateError::Other(format!(
288                        "corrupt embedding for chunk {id} in {table}: {} bytes (not a non-zero multiple of 4)",
289                        blob.len()
290                    )));
291                }
292                let mut v = unpack_embedding(&blob);
293                l2_normalize(&mut v);
294                entries.push((id, v));
295            }
296            *cache_cell.borrow_mut() = Some(entries);
297        }
298
299        let cache = cache_cell.borrow();
300        let entries = cache.as_ref().unwrap();
301
302        // Normalise the query once; cached vectors are already unit-length, so
303        // cosine similarity reduces to a dot product over each entry.
304        let mut q = query.to_vec();
305        l2_normalize(&mut q);
306
307        // Score by (index, similarity) without cloning ids; partial-sort the top
308        // `limit` to the front (O(N) select), then clone ids for the winners only.
309        // Only score vectors whose dimension matches the query. A mismatch means
310        // a stale embed_version vector or a (4-byte-aligned) corruption — either
311        // way it must not contribute a truncated/garbage dot product.
312        let mut scored: Vec<(usize, f32)> = entries
313            .iter()
314            .enumerate()
315            .filter(|(_, (_, v))| v.len() == q.len())
316            .map(|(i, (_, v))| (i, dot_product(&q, v)))
317            .collect();
318        if scored.len() > limit {
319            scored.select_nth_unstable_by(limit - 1, |a, b| {
320                b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)
321            });
322            scored.truncate(limit);
323        }
324        scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
325        Ok(scored
326            .into_iter()
327            .map(|(i, sim)| (entries[i].0.clone(), sim))
328            .collect())
329    }
330
331    fn refresh_vector_caches_if_changed(&self) -> Result<()> {
332        let current = self
333            .get_meta("vector_revision")?
334            .and_then(|value| value.parse::<i64>().ok())
335            .unwrap_or(0);
336        let previous = self.vector_cache_revision.replace(Some(current));
337        if previous.is_some_and(|revision| revision != current) {
338            *self.vec_content_cache.borrow_mut() = None;
339            *self.vec_trigger_cache.borrow_mut() = None;
340        }
341        Ok(())
342    }
343
344    /// Fetch multiple chunks by id in one query; returns a map of id → chunk JSON.
345    pub fn get_chunks_by_ids(&self, ids: &[&str]) -> Result<HashMap<String, Value>> {
346        if ids.is_empty() {
347            return Ok(HashMap::new());
348        }
349        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
350        let sql = format!("SELECT * FROM chunks WHERE id IN ({placeholders})");
351        let mut stmt = self.conn.prepare(&sql)?;
352        let names: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
353        let rows = stmt.query_map(rusqlite::params_from_iter(ids.iter()), |r| {
354            row_to_json_with_names(r, &names)
355        })?;
356        let mut map = HashMap::with_capacity(ids.len());
357        for row in rows {
358            let row = row?;
359            if let Some(id) = row.get("id").and_then(Value::as_str) {
360                map.insert(id.to_string(), row);
361            }
362        }
363        Ok(map)
364    }
365
366    // ------------------------------------------------------------------
367    // Invalidated hashes
368    // ------------------------------------------------------------------
369
370    pub fn is_hash_invalidated(&self, hash: &str) -> Result<bool> {
371        let count: i64 = self.conn.query_row(
372            "SELECT count(*) FROM invalidated_hashes WHERE content_hash=?",
373            [hash],
374            |r| r.get(0),
375        )?;
376        Ok(count > 0)
377    }
378
379    pub fn insert_invalidated_hash(
380        &self,
381        hash: &str,
382        reason: Option<&str>,
383        ts: &str,
384    ) -> Result<()> {
385        self.conn.execute(
386            "INSERT OR IGNORE INTO invalidated_hashes(content_hash, reason, ts) VALUES (?,?,?)",
387            params![hash, reason, ts],
388        )?;
389        Ok(())
390    }
391
    // ------------------------------------------------------------------
    // Usage trace
    // ------------------------------------------------------------------

    // ------------------------------------------------------------------
    // Chunk queries (aggregate / curate helpers)
    // ------------------------------------------------------------------
398
399    pub(crate) fn query_chunks(&self, sql: &str) -> Result<Vec<Value>> {
400        self.query_json(sql, params![])
401    }
402
403    pub(crate) fn query_chunks_params<P: rusqlite::Params>(
404        &self,
405        sql: &str,
406        p: P,
407    ) -> Result<Vec<Value>> {
408        self.query_json(sql, p)
409    }
410
411    // ------------------------------------------------------------------
412    // Deps
413    // ------------------------------------------------------------------
414
415    pub fn get_deps(&self, chunk_id: &str) -> Result<Vec<DepEdge>> {
416        let mut stmt = self
417            .conn
418            .prepare_cached("SELECT dst, kind, dst_lib FROM deps WHERE src=?")?;
419        let rows = stmt.query_map([chunk_id], |r| {
420            Ok((
421                r.get::<_, String>(0)?,
422                r.get::<_, String>(1)?,
423                r.get::<_, Option<String>>(2)?,
424            ))
425        })?;
426        Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
427    }
428
429    /// Batch variant of `get_deps`: fetch outgoing edges for many sources in one
430    /// query. Returns `src` → `[(dst, kind, dst_lib)]`. Sources with no edges are
431    /// simply absent from the map.
432    pub fn get_deps_batch(&self, srcs: &[&str]) -> Result<HashMap<String, Vec<DepEdge>>> {
433        if srcs.is_empty() {
434            return Ok(HashMap::new());
435        }
436        let placeholders = srcs.iter().map(|_| "?").collect::<Vec<_>>().join(",");
437        let sql = format!("SELECT src, dst, kind, dst_lib FROM deps WHERE src IN ({placeholders})");
438        let mut stmt = self.conn.prepare(&sql)?;
439        let rows = stmt.query_map(rusqlite::params_from_iter(srcs.iter()), |r| {
440            Ok((
441                r.get::<_, String>(0)?,
442                r.get::<_, String>(1)?,
443                r.get::<_, String>(2)?,
444                r.get::<_, Option<String>>(3)?,
445            ))
446        })?;
447        let mut map: HashMap<String, Vec<(String, String, Option<String>)>> = HashMap::new();
448        for row in rows {
449            let (src, dst, kind, lib) = row?;
450            map.entry(src).or_default().push((dst, kind, lib));
451        }
452        Ok(map)
453    }
454
455    pub fn get_reverse_deps(&self, chunk_id: &str) -> Result<Vec<String>> {
456        let mut stmt = self
457            .conn
458            .prepare_cached("SELECT src FROM deps WHERE dst=?")?;
459        let rows = stmt.query_map([chunk_id], |r| r.get::<_, String>(0))?;
460        Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
461    }
462
463    pub fn insert_dep(
464        &self,
465        src: &str,
466        dst: &str,
467        kind: &str,
468        dst_lib: Option<&str>,
469    ) -> Result<()> {
470        self.conn.execute(
471            "INSERT OR IGNORE INTO deps(src,dst,kind,dst_lib) VALUES (?,?,?,?)",
472            params![src, dst, kind, dst_lib],
473        )?;
474        Ok(())
475    }
476
477    // ------------------------------------------------------------------
478    // Chunk success traces (aggregate fact table)
479    // ------------------------------------------------------------------
480
481    pub fn upsert_chunk_success_trace(
482        &self,
483        chunk_id: &str,
484        trace_id: &str,
485        ts: &str,
486    ) -> Result<()> {
487        self.conn.execute(
488            "INSERT OR IGNORE INTO chunk_success_traces(chunk_id, trace_id, ts) VALUES (?,?,?)",
489            params![chunk_id, trace_id, ts],
490        )?;
491        Ok(())
492    }
493
494    // ------------------------------------------------------------------
495}