1use super::*;
2
3impl Storage {
4 pub fn insert_chunk(&self, c: &ChunkRow) -> Result<()> {
5 self.conn.execute(
6 "INSERT INTO chunks (
7 id, skill_name, seq, content, trigger_desc, anti_trigger_desc,
8 content_hash, token_count, origin, source, maturity, related_ids,
9 protected, state, state_reason, state_updated_at,
10 confidence, confidence_base, confidence_reason, version, distilled_from,
11 distill_provider, distill_model, distill_prompt_version, parent_id,
12 selected_count, used_count, used_success_count,
13 success_trace_ids_count, last_success_at, last_agg_ts,
14 embed_version, created_at, updated_at, last_used_at
15 ) VALUES (
16 ?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,
17 ?13,?14,?15,?16,?17,?18,?19,?20,?21,?22,?23,?24,?25,
18 ?26,?27,?28,?29,?30,?31,?32,?33,?34,?35
19 )",
20 params![
21 c.id,
22 c.skill_name,
23 c.seq,
24 c.content,
25 c.trigger_desc,
26 c.anti_trigger_desc,
27 c.content_hash,
28 c.token_count,
29 c.origin,
30 c.source,
31 c.maturity,
32 c.related_ids,
33 c.protected,
34 c.state,
35 c.state_reason,
36 c.state_updated_at,
37 c.confidence,
38 c.confidence,
39 c.confidence_reason,
40 c.version,
41 c.distilled_from,
42 c.distill_provider,
43 c.distill_model,
44 c.distill_prompt_version,
45 c.parent_id,
46 c.selected_count,
47 c.used_count,
48 c.used_success_count,
49 c.success_trace_ids_count,
50 c.last_success_at,
51 c.last_agg_ts,
52 c.embed_version,
53 c.created_at,
54 c.updated_at,
55 c.last_used_at
56 ],
57 )?;
58 Ok(())
59 }
60
61 pub fn insert_vec_content(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
62 self.conn.execute(
63 "INSERT OR REPLACE INTO vec_content(chunk_id, embedding) VALUES (?,?)",
64 params![chunk_id, emb],
65 )?;
66 self.note_vector_write(&self.vec_content_cache, chunk_id, emb)
67 }
68
69 pub fn insert_vec_trigger(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
70 self.conn.execute(
71 "INSERT OR REPLACE INTO vec_trigger(chunk_id, embedding) VALUES (?,?)",
72 params![chunk_id, emb],
73 )?;
74 self.note_vector_write(&self.vec_trigger_cache, chunk_id, emb)
75 }
76
77 fn note_vector_write(&self, cache: &VectorCache, chunk_id: &str, emb: &[u8]) -> Result<()> {
87 self.bump_vector_revision()?;
88 if let Some(entries) = cache.borrow_mut().as_mut() {
89 let mut v = unpack_embedding(emb);
90 l2_normalize(&mut v);
91 match entries.iter_mut().find(|(id, _)| id == chunk_id) {
92 Some(slot) => slot.1 = v,
93 None => entries.push((chunk_id.to_string(), v)),
94 }
95 }
96 self.sync_vector_revision()
97 }
98
99 fn bump_vector_revision(&self) -> Result<()> {
103 self.conn.execute(
104 "INSERT INTO meta(key, value) VALUES ('vector_revision', '1')
105 ON CONFLICT(key) DO UPDATE SET value=CAST(value AS INTEGER)+1",
106 [],
107 )?;
108 Ok(())
109 }
110
111 fn sync_vector_revision(&self) -> Result<()> {
114 let current = self
115 .get_meta("vector_revision")?
116 .and_then(|v| v.parse::<i64>().ok())
117 .unwrap_or(0);
118 self.vector_cache_revision.set(Some(current));
119 Ok(())
120 }
121
122 pub(crate) fn invalidate_vector_caches(&self) {
126 *self.vec_content_cache.borrow_mut() = None;
127 *self.vec_trigger_cache.borrow_mut() = None;
128 self.vector_cache_revision.set(None);
129 }
130
131 pub fn list_chunks(
135 &self,
136 state: Option<&str>,
137 origin: Option<&str>,
138 limit: usize,
139 offset: usize,
140 ) -> Result<Vec<Value>> {
141 let mut sql = String::from(
142 "SELECT id, skill_name, seq, origin, state, state_reason, maturity, \
143 confidence, token_count, protected, selected_count, used_count, \
144 used_success_count, substr(content, 1, 280) AS content_preview, \
145 created_at, updated_at, last_used_at \
146 FROM chunks",
147 );
148 let mut clauses: Vec<&str> = Vec::new();
149 if state.is_some() {
150 clauses.push("state = :state");
151 }
152 if origin.is_some() {
153 clauses.push("origin = :origin");
154 }
155 if !clauses.is_empty() {
156 sql.push_str(" WHERE ");
157 sql.push_str(&clauses.join(" AND "));
158 }
159 sql.push_str(" ORDER BY created_at DESC LIMIT :limit OFFSET :offset");
160
161 let mut stmt = self.conn.prepare(&sql)?;
162 let names: Vec<String> = stmt.column_names().into_iter().map(String::from).collect();
163 let mut params: Vec<(&str, &dyn rusqlite::ToSql)> = Vec::new();
164 if let Some(s) = state.as_ref() {
165 params.push((":state", s));
166 }
167 if let Some(o) = origin.as_ref() {
168 params.push((":origin", o));
169 }
170 let limit_i = limit as i64;
171 let offset_i = offset as i64;
172 params.push((":limit", &limit_i));
173 params.push((":offset", &offset_i));
174
175 let rows = stmt.query_map(params.as_slice(), |r| row_to_json_with_names(r, &names))?;
176 let mut out = Vec::new();
177 for row in rows {
178 out.push(row?);
179 }
180 Ok(out)
181 }
182
183 pub fn get_chunk(&self, id: &str) -> Result<Option<Value>> {
184 let mut stmt = self
185 .conn
186 .prepare_cached("SELECT * FROM chunks WHERE id=?")?;
187 let row = stmt.query_row([id], row_to_json);
188 match row {
189 Ok(v) => Ok(Some(v)),
190 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
191 Err(e) => Err(e.into()),
192 }
193 }
194
195 pub fn update_chunk_state(
196 &self,
197 id: &str,
198 state: &str,
199 reason: Option<&str>,
200 now: &str,
201 ) -> Result<()> {
202 self.conn.execute(
203 "UPDATE chunks SET state=?, state_reason=?, state_updated_at=?, updated_at=? WHERE id=?",
204 params![state, reason, now, now, id],
205 )?;
206 Ok(())
207 }
208
209 pub fn update_chunk_confidence(
210 &self,
211 id: &str,
212 conf: f64,
213 reason: Option<&str>,
214 now: &str,
215 ) -> Result<()> {
216 self.conn.execute(
217 "UPDATE chunks
218 SET confidence=?, confidence_base=?, confidence_reason=?, updated_at=?
219 WHERE id=?",
220 params![conf, conf, reason, now, id],
221 )?;
222 Ok(())
223 }
224
225 pub fn update_chunk_last_used(&self, id: &str, now: &str) -> Result<()> {
226 self.conn.execute(
227 "UPDATE chunks SET last_used_at=?, updated_at=? WHERE id=?",
228 params![now, now, id],
229 )?;
230 Ok(())
231 }
232
233 pub fn get_chunk_by_hash(&self, hash: &str) -> Result<Option<Value>> {
234 let row = self.conn.query_row(
235 "SELECT * FROM chunks WHERE content_hash=? LIMIT 1",
236 [hash],
237 row_to_json,
238 );
239 match row {
240 Ok(v) => Ok(Some(v)),
241 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
242 Err(e) => Err(e.into()),
243 }
244 }
245
246 pub fn search_vec_content(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
251 self.search_vec(&self.vec_content_cache, "vec_content", query, limit)
252 }
253
254 pub fn search_vec_trigger(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
255 self.search_vec(&self.vec_trigger_cache, "vec_trigger", query, limit)
256 }
257
258 fn search_vec(
259 &self,
260 cache_cell: &VectorCache,
261 table: &str,
262 query: &[f32],
263 limit: usize,
264 ) -> Result<Vec<(String, f32)>> {
265 if limit == 0 {
266 return Ok(Vec::new());
267 }
268 self.refresh_vector_caches_if_changed()?;
269
270 if cache_cell.borrow().is_none() {
274 let sql = format!("SELECT chunk_id, embedding FROM {table}");
275 let mut stmt = self.conn.prepare(&sql)?;
276 let raw: Vec<(String, Vec<u8>)> = stmt
277 .query_map([], |r| {
278 Ok((r.get::<_, String>(0)?, r.get::<_, Vec<u8>>(1)?))
279 })?
280 .collect::<rusqlite::Result<Vec<_>>>()?;
281 let mut entries: Vec<(String, Vec<f32>)> = Vec::with_capacity(raw.len());
282 for (id, blob) in raw {
283 if blob.is_empty() || blob.len() % 4 != 0 {
287 return Err(crate::errors::InnateError::Other(format!(
288 "corrupt embedding for chunk {id} in {table}: {} bytes (not a non-zero multiple of 4)",
289 blob.len()
290 )));
291 }
292 let mut v = unpack_embedding(&blob);
293 l2_normalize(&mut v);
294 entries.push((id, v));
295 }
296 *cache_cell.borrow_mut() = Some(entries);
297 }
298
299 let cache = cache_cell.borrow();
300 let entries = cache.as_ref().unwrap();
301
302 let mut q = query.to_vec();
305 l2_normalize(&mut q);
306
307 let mut scored: Vec<(usize, f32)> = entries
313 .iter()
314 .enumerate()
315 .filter(|(_, (_, v))| v.len() == q.len())
316 .map(|(i, (_, v))| (i, dot_product(&q, v)))
317 .collect();
318 if scored.len() > limit {
319 scored.select_nth_unstable_by(limit - 1, |a, b| {
320 b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)
321 });
322 scored.truncate(limit);
323 }
324 scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
325 Ok(scored
326 .into_iter()
327 .map(|(i, sim)| (entries[i].0.clone(), sim))
328 .collect())
329 }
330
331 fn refresh_vector_caches_if_changed(&self) -> Result<()> {
332 let current = self
333 .get_meta("vector_revision")?
334 .and_then(|value| value.parse::<i64>().ok())
335 .unwrap_or(0);
336 let previous = self.vector_cache_revision.replace(Some(current));
337 if previous.is_some_and(|revision| revision != current) {
338 *self.vec_content_cache.borrow_mut() = None;
339 *self.vec_trigger_cache.borrow_mut() = None;
340 }
341 Ok(())
342 }
343
344 pub fn get_chunks_by_ids(&self, ids: &[&str]) -> Result<HashMap<String, Value>> {
346 if ids.is_empty() {
347 return Ok(HashMap::new());
348 }
349 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
350 let sql = format!("SELECT * FROM chunks WHERE id IN ({placeholders})");
351 let mut stmt = self.conn.prepare(&sql)?;
352 let names: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
353 let rows = stmt.query_map(rusqlite::params_from_iter(ids.iter()), |r| {
354 row_to_json_with_names(r, &names)
355 })?;
356 let mut map = HashMap::with_capacity(ids.len());
357 for row in rows {
358 let row = row?;
359 if let Some(id) = row.get("id").and_then(Value::as_str) {
360 map.insert(id.to_string(), row);
361 }
362 }
363 Ok(map)
364 }
365
366 pub fn is_hash_invalidated(&self, hash: &str) -> Result<bool> {
371 let count: i64 = self.conn.query_row(
372 "SELECT count(*) FROM invalidated_hashes WHERE content_hash=?",
373 [hash],
374 |r| r.get(0),
375 )?;
376 Ok(count > 0)
377 }
378
379 pub fn insert_invalidated_hash(
380 &self,
381 hash: &str,
382 reason: Option<&str>,
383 ts: &str,
384 ) -> Result<()> {
385 self.conn.execute(
386 "INSERT OR IGNORE INTO invalidated_hashes(content_hash, reason, ts) VALUES (?,?,?)",
387 params![hash, reason, ts],
388 )?;
389 Ok(())
390 }
391
392 pub(crate) fn query_chunks(&self, sql: &str) -> Result<Vec<Value>> {
400 self.query_json(sql, params![])
401 }
402
403 pub(crate) fn query_chunks_params<P: rusqlite::Params>(
404 &self,
405 sql: &str,
406 p: P,
407 ) -> Result<Vec<Value>> {
408 self.query_json(sql, p)
409 }
410
411 pub fn get_deps(&self, chunk_id: &str) -> Result<Vec<DepEdge>> {
416 let mut stmt = self
417 .conn
418 .prepare_cached("SELECT dst, kind, dst_lib FROM deps WHERE src=?")?;
419 let rows = stmt.query_map([chunk_id], |r| {
420 Ok((
421 r.get::<_, String>(0)?,
422 r.get::<_, String>(1)?,
423 r.get::<_, Option<String>>(2)?,
424 ))
425 })?;
426 Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
427 }
428
429 pub fn get_deps_batch(&self, srcs: &[&str]) -> Result<HashMap<String, Vec<DepEdge>>> {
433 if srcs.is_empty() {
434 return Ok(HashMap::new());
435 }
436 let placeholders = srcs.iter().map(|_| "?").collect::<Vec<_>>().join(",");
437 let sql = format!("SELECT src, dst, kind, dst_lib FROM deps WHERE src IN ({placeholders})");
438 let mut stmt = self.conn.prepare(&sql)?;
439 let rows = stmt.query_map(rusqlite::params_from_iter(srcs.iter()), |r| {
440 Ok((
441 r.get::<_, String>(0)?,
442 r.get::<_, String>(1)?,
443 r.get::<_, String>(2)?,
444 r.get::<_, Option<String>>(3)?,
445 ))
446 })?;
447 let mut map: HashMap<String, Vec<(String, String, Option<String>)>> = HashMap::new();
448 for row in rows {
449 let (src, dst, kind, lib) = row?;
450 map.entry(src).or_default().push((dst, kind, lib));
451 }
452 Ok(map)
453 }
454
455 pub fn get_reverse_deps(&self, chunk_id: &str) -> Result<Vec<String>> {
456 let mut stmt = self
457 .conn
458 .prepare_cached("SELECT src FROM deps WHERE dst=?")?;
459 let rows = stmt.query_map([chunk_id], |r| r.get::<_, String>(0))?;
460 Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
461 }
462
463 pub fn insert_dep(
464 &self,
465 src: &str,
466 dst: &str,
467 kind: &str,
468 dst_lib: Option<&str>,
469 ) -> Result<()> {
470 self.conn.execute(
471 "INSERT OR IGNORE INTO deps(src,dst,kind,dst_lib) VALUES (?,?,?,?)",
472 params![src, dst, kind, dst_lib],
473 )?;
474 Ok(())
475 }
476
477 pub fn upsert_chunk_success_trace(
482 &self,
483 chunk_id: &str,
484 trace_id: &str,
485 ts: &str,
486 ) -> Result<()> {
487 self.conn.execute(
488 "INSERT OR IGNORE INTO chunk_success_traces(chunk_id, trace_id, ts) VALUES (?,?,?)",
489 params![chunk_id, trace_id, ts],
490 )?;
491 Ok(())
492 }
493
494 }