Skip to main content

innate_core/storage/
chunks.rs

1use super::*;
2
3impl Storage {
4    pub fn insert_chunk(&self, c: &ChunkRow) -> Result<()> {
5        self.conn.execute(
6            "INSERT INTO chunks (
7                id, skill_name, seq, content, trigger_desc, anti_trigger_desc,
8                content_hash, token_count, origin, source, maturity, related_ids,
9                protected, state, state_reason, state_updated_at,
10                confidence, confidence_base, confidence_reason, version, distilled_from,
11                distill_provider, distill_model, distill_prompt_version, parent_id,
12                selected_count, used_count, used_success_count,
13                success_trace_ids_count, last_success_at, last_agg_ts,
14                embed_version, created_at, updated_at, last_used_at
15            ) VALUES (
16                ?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,
17                ?13,?14,?15,?16,?17,?18,?19,?20,?21,?22,?23,?24,?25,
18                ?26,?27,?28,?29,?30,?31,?32,?33,?34,?35
19            )",
20            params![
21                c.id,
22                c.skill_name,
23                c.seq,
24                c.content,
25                c.trigger_desc,
26                c.anti_trigger_desc,
27                c.content_hash,
28                c.token_count,
29                c.origin,
30                c.source,
31                c.maturity,
32                c.related_ids,
33                c.protected,
34                c.state,
35                c.state_reason,
36                c.state_updated_at,
37                c.confidence,
38                c.confidence,
39                c.confidence_reason,
40                c.version,
41                c.distilled_from,
42                c.distill_provider,
43                c.distill_model,
44                c.distill_prompt_version,
45                c.parent_id,
46                c.selected_count,
47                c.used_count,
48                c.used_success_count,
49                c.success_trace_ids_count,
50                c.last_success_at,
51                c.last_agg_ts,
52                c.embed_version,
53                c.created_at,
54                c.updated_at,
55                c.last_used_at
56            ],
57        )?;
58        Ok(())
59    }
60
61    pub fn insert_vec_content(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
62        self.conn.execute(
63            "INSERT OR REPLACE INTO vec_content(chunk_id, embedding) VALUES (?,?)",
64            params![chunk_id, emb],
65        )?;
66        self.note_vector_write(&self.vec_content_cache, chunk_id, emb)
67    }
68
69    pub fn insert_vec_trigger(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
70        self.conn.execute(
71            "INSERT OR REPLACE INTO vec_trigger(chunk_id, embedding) VALUES (?,?)",
72            params![chunk_id, emb],
73        )?;
74        self.note_vector_write(&self.vec_trigger_cache, chunk_id, emb)
75    }
76
77    /// Record a single vector write: bump the shared revision (so *other*
78    /// processes drop their caches), upsert the one entry into the warm cache
79    /// in place (so *this* long-lived process keeps its cache instead of
80    /// reloading the whole corpus), then re-sync the local revision tracker so
81    /// our own bump does not trip `refresh_vector_caches_if_changed`.
82    ///
83    /// A cold cache (None) is left cold — the next search loads everything,
84    /// including this row. This keeps bulk paths (e.g. full re-embed) O(N) by
85    /// invalidating once up front rather than upserting per write.
86    fn note_vector_write(&self, cache: &VectorCache, chunk_id: &str, emb: &[u8]) -> Result<()> {
87        self.bump_vector_revision()?;
88        if let Some(entries) = cache.borrow_mut().as_mut() {
89            let mut v = unpack_embedding(emb);
90            l2_normalize(&mut v);
91            match entries.iter_mut().find(|(id, _)| id == chunk_id) {
92                Some(slot) => slot.1 = v,
93                None => entries.push((chunk_id.to_string(), v)),
94            }
95        }
96        self.sync_vector_revision()
97    }
98
99    /// Monotonically advance `meta.vector_revision`. Any vector write must call
100    /// this so that other processes detect the change and drop their in-memory
101    /// caches on the next search (see `refresh_vector_caches_if_changed`).
102    fn bump_vector_revision(&self) -> Result<()> {
103        self.conn.execute(
104            "INSERT INTO meta(key, value) VALUES ('vector_revision', '1')
105             ON CONFLICT(key) DO UPDATE SET value=CAST(value AS INTEGER)+1",
106            [],
107        )?;
108        Ok(())
109    }
110
111    /// Align the local revision tracker with the persisted value so an in-place
112    /// cache update performed by this process is not discarded on the next search.
113    fn sync_vector_revision(&self) -> Result<()> {
114        let current = self
115            .get_meta("vector_revision")?
116            .and_then(|v| v.parse::<i64>().ok())
117            .unwrap_or(0);
118        self.vector_cache_revision.set(Some(current));
119        Ok(())
120    }
121
122    /// Drop both in-memory vector caches and reset the revision tracker. Used on
123    /// transaction rollback (in-place upserts may not have persisted) and before
124    /// bulk re-embed loops (to avoid O(N²) in-place upserts on a warm cache).
125    pub(crate) fn invalidate_vector_caches(&self) {
126        *self.vec_content_cache.borrow_mut() = None;
127        *self.vec_trigger_cache.borrow_mut() = None;
128        self.vector_cache_revision.set(None);
129    }
130
131    pub fn get_chunk(&self, id: &str) -> Result<Option<Value>> {
132        let mut stmt = self.conn.prepare_cached("SELECT * FROM chunks WHERE id=?")?;
133        let row = stmt.query_row([id], row_to_json);
134        match row {
135            Ok(v) => Ok(Some(v)),
136            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
137            Err(e) => Err(e.into()),
138        }
139    }
140
141    pub fn update_chunk_state(
142        &self,
143        id: &str,
144        state: &str,
145        reason: Option<&str>,
146        now: &str,
147    ) -> Result<()> {
148        self.conn.execute(
149            "UPDATE chunks SET state=?, state_reason=?, state_updated_at=?, updated_at=? WHERE id=?",
150            params![state, reason, now, now, id],
151        )?;
152        Ok(())
153    }
154
155    pub fn update_chunk_confidence(
156        &self,
157        id: &str,
158        conf: f64,
159        reason: Option<&str>,
160        now: &str,
161    ) -> Result<()> {
162        self.conn.execute(
163            "UPDATE chunks
164             SET confidence=?, confidence_base=?, confidence_reason=?, updated_at=?
165             WHERE id=?",
166            params![conf, conf, reason, now, id],
167        )?;
168        Ok(())
169    }
170
171    pub fn update_chunk_last_used(&self, id: &str, now: &str) -> Result<()> {
172        self.conn.execute(
173            "UPDATE chunks SET last_used_at=?, updated_at=? WHERE id=?",
174            params![now, now, id],
175        )?;
176        Ok(())
177    }
178
179    pub fn get_chunk_by_hash(&self, hash: &str) -> Result<Option<Value>> {
180        let row = self.conn.query_row(
181            "SELECT * FROM chunks WHERE content_hash=? LIMIT 1",
182            [hash],
183            row_to_json,
184        );
185        match row {
186            Ok(v) => Ok(Some(v)),
187            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
188            Err(e) => Err(e.into()),
189        }
190    }
191
192    // ------------------------------------------------------------------
193    // Vector search (pure-Rust cosine similarity, replaces sqlite-vec)
194    // ------------------------------------------------------------------
195
196    pub fn search_vec_content(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
197        self.search_vec(&self.vec_content_cache, "vec_content", query, limit)
198    }
199
200    pub fn search_vec_trigger(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
201        self.search_vec(&self.vec_trigger_cache, "vec_trigger", query, limit)
202    }
203
204    fn search_vec(
205        &self,
206        cache_cell: &VectorCache,
207        table: &str,
208        query: &[f32],
209        limit: usize,
210    ) -> Result<Vec<(String, f32)>> {
211        if limit == 0 {
212            return Ok(Vec::new());
213        }
214        self.refresh_vector_caches_if_changed()?;
215
216        // Populate cache on first access after open or invalidation.
217        // Stored vectors are L2-normalised here so the search inner loop can use
218        // a plain dot product instead of recomputing norms on every comparison.
219        if cache_cell.borrow().is_none() {
220            let sql = format!("SELECT chunk_id, embedding FROM {table}");
221            let mut stmt = self.conn.prepare(&sql)?;
222            let entries: Vec<(String, Vec<f32>)> = stmt
223                .query_map([], |r| {
224                    let id: String = r.get(0)?;
225                    let blob: Vec<u8> = r.get(1)?;
226                    Ok((id, blob))
227                })?
228                .filter_map(|r| r.ok())
229                .map(|(id, blob)| {
230                    let mut v = unpack_embedding(&blob);
231                    l2_normalize(&mut v);
232                    (id, v)
233                })
234                .collect();
235            *cache_cell.borrow_mut() = Some(entries);
236        }
237
238        let cache = cache_cell.borrow();
239        let entries = cache.as_ref().unwrap();
240
241        // Normalise the query once; cached vectors are already unit-length, so
242        // cosine similarity reduces to a dot product over each entry.
243        let mut q = query.to_vec();
244        l2_normalize(&mut q);
245
246        // Score by (index, similarity) without cloning ids; partial-sort the top
247        // `limit` to the front (O(N) select), then clone ids for the winners only.
248        let mut scored: Vec<(usize, f32)> = entries
249            .iter()
250            .enumerate()
251            .map(|(i, (_, v))| (i, dot_product(&q, v)))
252            .collect();
253        if scored.len() > limit {
254            scored.select_nth_unstable_by(limit - 1, |a, b| {
255                b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)
256            });
257            scored.truncate(limit);
258        }
259        scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
260        Ok(scored
261            .into_iter()
262            .map(|(i, sim)| (entries[i].0.clone(), sim))
263            .collect())
264    }
265
266    fn refresh_vector_caches_if_changed(&self) -> Result<()> {
267        let current = self
268            .get_meta("vector_revision")?
269            .and_then(|value| value.parse::<i64>().ok())
270            .unwrap_or(0);
271        let previous = self.vector_cache_revision.replace(Some(current));
272        if previous.is_some_and(|revision| revision != current) {
273            *self.vec_content_cache.borrow_mut() = None;
274            *self.vec_trigger_cache.borrow_mut() = None;
275        }
276        Ok(())
277    }
278
279    /// Fetch multiple chunks by id in one query; returns a map of id → chunk JSON.
280    pub fn get_chunks_by_ids(&self, ids: &[&str]) -> Result<HashMap<String, Value>> {
281        if ids.is_empty() {
282            return Ok(HashMap::new());
283        }
284        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
285        let sql = format!("SELECT * FROM chunks WHERE id IN ({placeholders})");
286        let mut stmt = self.conn.prepare(&sql)?;
287        let names: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
288        let rows = stmt.query_map(rusqlite::params_from_iter(ids.iter()), |r| {
289            row_to_json_with_names(r, &names)
290        })?;
291        let mut map = HashMap::with_capacity(ids.len());
292        for row in rows.filter_map(|r| r.ok()) {
293            if let Some(id) = row.get("id").and_then(Value::as_str) {
294                map.insert(id.to_string(), row);
295            }
296        }
297        Ok(map)
298    }
299
300    // ------------------------------------------------------------------
301    // Invalidated hashes
302    // ------------------------------------------------------------------
303
304    pub fn is_hash_invalidated(&self, hash: &str) -> Result<bool> {
305        let count: i64 = self.conn.query_row(
306            "SELECT count(*) FROM invalidated_hashes WHERE content_hash=?",
307            [hash],
308            |r| r.get(0),
309        )?;
310        Ok(count > 0)
311    }
312
313    pub fn insert_invalidated_hash(
314        &self,
315        hash: &str,
316        reason: Option<&str>,
317        ts: &str,
318    ) -> Result<()> {
319        self.conn.execute(
320            "INSERT OR IGNORE INTO invalidated_hashes(content_hash, reason, ts) VALUES (?,?,?)",
321            params![hash, reason, ts],
322        )?;
323        Ok(())
324    }
325
326    // ------------------------------------------------------------------
327    // Usage trace
328    // ------------------------------------------------------------------
329
330    // Chunk queries (aggregate / curate helpers)
331    // ------------------------------------------------------------------
332
333    pub(crate) fn query_chunks(&self, sql: &str) -> Result<Vec<Value>> {
334        self.query_json(sql, params![])
335    }
336
337    pub(crate) fn query_chunks_params<P: rusqlite::Params>(
338        &self,
339        sql: &str,
340        p: P,
341    ) -> Result<Vec<Value>> {
342        self.query_json(sql, p)
343    }
344
345    // ------------------------------------------------------------------
346    // Deps
347    // ------------------------------------------------------------------
348
349    pub fn get_deps(&self, chunk_id: &str) -> Result<Vec<DepEdge>> {
350        let mut stmt = self
351            .conn
352            .prepare_cached("SELECT dst, kind, dst_lib FROM deps WHERE src=?")?;
353        let rows = stmt.query_map([chunk_id], |r| {
354            Ok((
355                r.get::<_, String>(0)?,
356                r.get::<_, String>(1)?,
357                r.get::<_, Option<String>>(2)?,
358            ))
359        })?;
360        Ok(rows.filter_map(|r| r.ok()).collect())
361    }
362
363    /// Batch variant of `get_deps`: fetch outgoing edges for many sources in one
364    /// query. Returns `src` → `[(dst, kind, dst_lib)]`. Sources with no edges are
365    /// simply absent from the map.
366    pub fn get_deps_batch(&self, srcs: &[&str]) -> Result<HashMap<String, Vec<DepEdge>>> {
367        if srcs.is_empty() {
368            return Ok(HashMap::new());
369        }
370        let placeholders = srcs.iter().map(|_| "?").collect::<Vec<_>>().join(",");
371        let sql = format!("SELECT src, dst, kind, dst_lib FROM deps WHERE src IN ({placeholders})");
372        let mut stmt = self.conn.prepare(&sql)?;
373        let rows = stmt.query_map(rusqlite::params_from_iter(srcs.iter()), |r| {
374            Ok((
375                r.get::<_, String>(0)?,
376                r.get::<_, String>(1)?,
377                r.get::<_, String>(2)?,
378                r.get::<_, Option<String>>(3)?,
379            ))
380        })?;
381        let mut map: HashMap<String, Vec<(String, String, Option<String>)>> = HashMap::new();
382        for (src, dst, kind, lib) in rows.filter_map(|r| r.ok()) {
383            map.entry(src).or_default().push((dst, kind, lib));
384        }
385        Ok(map)
386    }
387
388    pub fn get_reverse_deps(&self, chunk_id: &str) -> Result<Vec<String>> {
389        let mut stmt = self
390            .conn
391            .prepare_cached("SELECT src FROM deps WHERE dst=?")?;
392        let rows = stmt.query_map([chunk_id], |r| r.get::<_, String>(0))?;
393        Ok(rows.filter_map(|r| r.ok()).collect())
394    }
395
396    pub fn insert_dep(
397        &self,
398        src: &str,
399        dst: &str,
400        kind: &str,
401        dst_lib: Option<&str>,
402    ) -> Result<()> {
403        self.conn.execute(
404            "INSERT OR IGNORE INTO deps(src,dst,kind,dst_lib) VALUES (?,?,?,?)",
405            params![src, dst, kind, dst_lib],
406        )?;
407        Ok(())
408    }
409
410    // ------------------------------------------------------------------
411    // Chunk success traces (aggregate fact table)
412    // ------------------------------------------------------------------
413
414    pub fn upsert_chunk_success_trace(
415        &self,
416        chunk_id: &str,
417        trace_id: &str,
418        ts: &str,
419    ) -> Result<()> {
420        self.conn.execute(
421            "INSERT OR IGNORE INTO chunk_success_traces(chunk_id, trace_id, ts) VALUES (?,?,?)",
422            params![chunk_id, trace_id, ts],
423        )?;
424        Ok(())
425    }
426
427    // ------------------------------------------------------------------
428}