1use super::*;
2
3impl Storage {
4 pub fn insert_chunk(&self, c: &ChunkRow) -> Result<()> {
5 self.conn.execute(
6 "INSERT INTO chunks (
7 id, skill_name, seq, content, trigger_desc, anti_trigger_desc,
8 content_hash, token_count, origin, source, maturity, related_ids,
9 protected, state, state_reason, state_updated_at,
10 confidence, confidence_base, confidence_reason, version, distilled_from,
11 distill_provider, distill_model, distill_prompt_version, parent_id,
12 selected_count, used_count, used_success_count,
13 success_trace_ids_count, last_success_at, last_agg_ts,
14 embed_version, created_at, updated_at, last_used_at
15 ) VALUES (
16 ?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,
17 ?13,?14,?15,?16,?17,?18,?19,?20,?21,?22,?23,?24,?25,
18 ?26,?27,?28,?29,?30,?31,?32,?33,?34,?35
19 )",
20 params![
21 c.id,
22 c.skill_name,
23 c.seq,
24 c.content,
25 c.trigger_desc,
26 c.anti_trigger_desc,
27 c.content_hash,
28 c.token_count,
29 c.origin,
30 c.source,
31 c.maturity,
32 c.related_ids,
33 c.protected,
34 c.state,
35 c.state_reason,
36 c.state_updated_at,
37 c.confidence,
38 c.confidence,
39 c.confidence_reason,
40 c.version,
41 c.distilled_from,
42 c.distill_provider,
43 c.distill_model,
44 c.distill_prompt_version,
45 c.parent_id,
46 c.selected_count,
47 c.used_count,
48 c.used_success_count,
49 c.success_trace_ids_count,
50 c.last_success_at,
51 c.last_agg_ts,
52 c.embed_version,
53 c.created_at,
54 c.updated_at,
55 c.last_used_at
56 ],
57 )?;
58 Ok(())
59 }
60
61 pub fn insert_vec_content(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
62 self.conn.execute(
63 "INSERT OR REPLACE INTO vec_content(chunk_id, embedding) VALUES (?,?)",
64 params![chunk_id, emb],
65 )?;
66 self.note_vector_write(&self.vec_content_cache, chunk_id, emb)
67 }
68
69 pub fn insert_vec_trigger(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
70 self.conn.execute(
71 "INSERT OR REPLACE INTO vec_trigger(chunk_id, embedding) VALUES (?,?)",
72 params![chunk_id, emb],
73 )?;
74 self.note_vector_write(&self.vec_trigger_cache, chunk_id, emb)
75 }
76
77 fn note_vector_write(&self, cache: &VectorCache, chunk_id: &str, emb: &[u8]) -> Result<()> {
87 self.bump_vector_revision()?;
88 if let Some(entries) = cache.borrow_mut().as_mut() {
89 let mut v = unpack_embedding(emb);
90 l2_normalize(&mut v);
91 match entries.iter_mut().find(|(id, _)| id == chunk_id) {
92 Some(slot) => slot.1 = v,
93 None => entries.push((chunk_id.to_string(), v)),
94 }
95 }
96 self.sync_vector_revision()
97 }
98
99 fn bump_vector_revision(&self) -> Result<()> {
103 self.conn.execute(
104 "INSERT INTO meta(key, value) VALUES ('vector_revision', '1')
105 ON CONFLICT(key) DO UPDATE SET value=CAST(value AS INTEGER)+1",
106 [],
107 )?;
108 Ok(())
109 }
110
111 fn sync_vector_revision(&self) -> Result<()> {
114 let current = self
115 .get_meta("vector_revision")?
116 .and_then(|v| v.parse::<i64>().ok())
117 .unwrap_or(0);
118 self.vector_cache_revision.set(Some(current));
119 Ok(())
120 }
121
122 pub(crate) fn invalidate_vector_caches(&self) {
126 *self.vec_content_cache.borrow_mut() = None;
127 *self.vec_trigger_cache.borrow_mut() = None;
128 self.vector_cache_revision.set(None);
129 }
130
131 pub fn list_chunks(
135 &self,
136 state: Option<&str>,
137 origin: Option<&str>,
138 limit: usize,
139 offset: usize,
140 ) -> Result<Vec<Value>> {
141 let mut sql = String::from(
142 "SELECT id, skill_name, seq, origin, state, state_reason, maturity, \
143 confidence, token_count, protected, selected_count, used_count, \
144 used_success_count, substr(content, 1, 280) AS content_preview, \
145 created_at, updated_at, last_used_at \
146 FROM chunks",
147 );
148 let mut clauses: Vec<&str> = Vec::new();
149 if state.is_some() {
150 clauses.push("state = :state");
151 }
152 if origin.is_some() {
153 clauses.push("origin = :origin");
154 }
155 if !clauses.is_empty() {
156 sql.push_str(" WHERE ");
157 sql.push_str(&clauses.join(" AND "));
158 }
159 sql.push_str(" ORDER BY created_at DESC LIMIT :limit OFFSET :offset");
160
161 let mut stmt = self.conn.prepare(&sql)?;
162 let names: Vec<String> = stmt.column_names().into_iter().map(String::from).collect();
163 let mut params: Vec<(&str, &dyn rusqlite::ToSql)> = Vec::new();
164 if let Some(s) = state.as_ref() {
165 params.push((":state", s));
166 }
167 if let Some(o) = origin.as_ref() {
168 params.push((":origin", o));
169 }
170 let limit_i = limit as i64;
171 let offset_i = offset as i64;
172 params.push((":limit", &limit_i));
173 params.push((":offset", &offset_i));
174
175 let rows = stmt.query_map(params.as_slice(), |r| row_to_json_with_names(r, &names))?;
176 let mut out = Vec::new();
177 for row in rows {
178 out.push(row?);
179 }
180 Ok(out)
181 }
182
183 pub fn get_chunk(&self, id: &str) -> Result<Option<Value>> {
184 let mut stmt = self
185 .conn
186 .prepare_cached("SELECT * FROM chunks WHERE id=?")?;
187 let row = stmt.query_row([id], row_to_json);
188 match row {
189 Ok(v) => Ok(Some(v)),
190 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
191 Err(e) => Err(e.into()),
192 }
193 }
194
195 pub fn update_chunk_state(
196 &self,
197 id: &str,
198 state: &str,
199 reason: Option<&str>,
200 now: &str,
201 ) -> Result<()> {
202 self.conn.execute(
203 "UPDATE chunks SET state=?, state_reason=?, state_updated_at=?, updated_at=? WHERE id=?",
204 params![state, reason, now, now, id],
205 )?;
206 Ok(())
207 }
208
209 pub fn update_chunk_confidence(
210 &self,
211 id: &str,
212 conf: f64,
213 reason: Option<&str>,
214 now: &str,
215 ) -> Result<()> {
216 self.conn.execute(
217 "UPDATE chunks
218 SET confidence=?, confidence_base=?, confidence_reason=?, updated_at=?
219 WHERE id=?",
220 params![conf, conf, reason, now, id],
221 )?;
222 Ok(())
223 }
224
225 pub fn update_chunk_last_used(&self, id: &str, now: &str) -> Result<()> {
226 self.conn.execute(
227 "UPDATE chunks SET last_used_at=?, updated_at=? WHERE id=?",
228 params![now, now, id],
229 )?;
230 Ok(())
231 }
232
233 pub fn get_chunk_by_hash(&self, hash: &str) -> Result<Option<Value>> {
234 let row = self.conn.query_row(
235 "SELECT * FROM chunks WHERE content_hash=? LIMIT 1",
236 [hash],
237 row_to_json,
238 );
239 match row {
240 Ok(v) => Ok(Some(v)),
241 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
242 Err(e) => Err(e.into()),
243 }
244 }
245
246 pub fn search_vec_content(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
251 self.search_vec(&self.vec_content_cache, "vec_content", query, limit)
252 }
253
254 pub fn search_vec_trigger(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
255 self.search_vec(&self.vec_trigger_cache, "vec_trigger", query, limit)
256 }
257
258 fn search_vec(
259 &self,
260 cache_cell: &VectorCache,
261 table: &str,
262 query: &[f32],
263 limit: usize,
264 ) -> Result<Vec<(String, f32)>> {
265 if limit == 0 {
266 return Ok(Vec::new());
267 }
268 self.refresh_vector_caches_if_changed()?;
269
270 if cache_cell.borrow().is_none() {
274 let sql = format!("SELECT chunk_id, embedding FROM {table}");
275 let mut stmt = self.conn.prepare(&sql)?;
276 let raw: Vec<(String, Vec<u8>)> = stmt
277 .query_map([], |r| {
278 Ok((r.get::<_, String>(0)?, r.get::<_, Vec<u8>>(1)?))
279 })?
280 .collect::<rusqlite::Result<Vec<_>>>()?;
281 let mut entries: Vec<(String, Vec<f32>)> = Vec::with_capacity(raw.len());
282 for (id, blob) in raw {
283 if blob.is_empty() || blob.len() % 4 != 0 {
287 return Err(crate::errors::InnateError::Other(format!(
288 "corrupt embedding for chunk {id} in {table}: {} bytes (not a non-zero multiple of 4)",
289 blob.len()
290 )));
291 }
292 let mut v = unpack_embedding(&blob);
293 l2_normalize(&mut v);
294 entries.push((id, v));
295 }
296 *cache_cell.borrow_mut() = Some(entries);
297 }
298
299 let cache = cache_cell.borrow();
300 let entries = cache.as_ref().unwrap();
301
302 let mut q = query.to_vec();
305 l2_normalize(&mut q);
306
307 let mut scored: Vec<(usize, f32)> = entries
313 .iter()
314 .enumerate()
315 .filter(|(_, (_, v))| v.len() == q.len())
316 .map(|(i, (_, v))| (i, dot_product(&q, v)))
317 .collect();
318 if scored.len() > limit {
319 scored.select_nth_unstable_by(limit - 1, |a, b| {
320 b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)
321 });
322 scored.truncate(limit);
323 }
324 scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
325 Ok(scored
326 .into_iter()
327 .map(|(i, sim)| (entries[i].0.clone(), sim))
328 .collect())
329 }
330
331 pub fn search_lexical(&self, query: &str, limit: usize) -> Result<Vec<(String, f32)>> {
337 if limit == 0 {
338 return Ok(Vec::new());
339 }
340 let Some(match_expr) = fts5_match_query(query) else {
341 return Ok(Vec::new());
342 };
343 let mut stmt = self.conn.prepare_cached(
344 "SELECT chunks_fts.id, bm25(chunks_fts) AS score
345 FROM chunks_fts
346 JOIN chunks c ON c.id = chunks_fts.id
347 WHERE chunks_fts MATCH ?1
348 AND c.state != 'archived' AND c.origin != 'spark'
349 ORDER BY score ASC
350 LIMIT ?2",
351 )?;
352 let rows = stmt.query_map(params![match_expr, limit as i64], |r| {
353 Ok((r.get::<_, String>(0)?, r.get::<_, f64>(1)?))
354 })?;
355 let raw: Vec<(String, f64)> = rows.collect::<rusqlite::Result<Vec<_>>>()?;
359 let best_rel = raw.iter().map(|(_, s)| -s).fold(f64::MIN, f64::max);
360 let out = raw
361 .into_iter()
362 .enumerate()
363 .map(|(i, (id, score))| {
364 let sim = if best_rel > 0.0 {
365 (-score / best_rel).clamp(0.0, 1.0) as f32
366 } else {
367 ((limit - i) as f32) / (limit as f32)
369 };
370 (id, sim)
371 })
372 .collect();
373 Ok(out)
374 }
375
376 fn refresh_vector_caches_if_changed(&self) -> Result<()> {
377 let current = self
378 .get_meta("vector_revision")?
379 .and_then(|value| value.parse::<i64>().ok())
380 .unwrap_or(0);
381 let previous = self.vector_cache_revision.replace(Some(current));
382 if previous.is_some_and(|revision| revision != current) {
383 *self.vec_content_cache.borrow_mut() = None;
384 *self.vec_trigger_cache.borrow_mut() = None;
385 }
386 Ok(())
387 }
388
389 pub fn get_chunks_by_ids(&self, ids: &[&str]) -> Result<HashMap<String, Value>> {
391 if ids.is_empty() {
392 return Ok(HashMap::new());
393 }
394 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
395 let sql = format!("SELECT * FROM chunks WHERE id IN ({placeholders})");
396 let mut stmt = self.conn.prepare(&sql)?;
397 let names: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
398 let rows = stmt.query_map(rusqlite::params_from_iter(ids.iter()), |r| {
399 row_to_json_with_names(r, &names)
400 })?;
401 let mut map = HashMap::with_capacity(ids.len());
402 for row in rows {
403 let row = row?;
404 if let Some(id) = row.get("id").and_then(Value::as_str) {
405 map.insert(id.to_string(), row);
406 }
407 }
408 Ok(map)
409 }
410
411 pub fn is_hash_invalidated(&self, hash: &str) -> Result<bool> {
416 let count: i64 = self.conn.query_row(
417 "SELECT count(*) FROM invalidated_hashes WHERE content_hash=?",
418 [hash],
419 |r| r.get(0),
420 )?;
421 Ok(count > 0)
422 }
423
424 pub fn insert_invalidated_hash(
425 &self,
426 hash: &str,
427 reason: Option<&str>,
428 ts: &str,
429 ) -> Result<()> {
430 self.conn.execute(
431 "INSERT OR IGNORE INTO invalidated_hashes(content_hash, reason, ts) VALUES (?,?,?)",
432 params![hash, reason, ts],
433 )?;
434 Ok(())
435 }
436
437 pub(crate) fn query_chunks(&self, sql: &str) -> Result<Vec<Value>> {
445 self.query_json(sql, params![])
446 }
447
448 pub(crate) fn query_chunks_params<P: rusqlite::Params>(
449 &self,
450 sql: &str,
451 p: P,
452 ) -> Result<Vec<Value>> {
453 self.query_json(sql, p)
454 }
455
456 pub fn get_deps(&self, chunk_id: &str) -> Result<Vec<DepEdge>> {
461 let mut stmt = self
462 .conn
463 .prepare_cached("SELECT dst, kind, dst_lib FROM deps WHERE src=?")?;
464 let rows = stmt.query_map([chunk_id], |r| {
465 Ok((
466 r.get::<_, String>(0)?,
467 r.get::<_, String>(1)?,
468 r.get::<_, Option<String>>(2)?,
469 ))
470 })?;
471 Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
472 }
473
474 pub fn get_deps_batch(&self, srcs: &[&str]) -> Result<HashMap<String, Vec<DepEdge>>> {
478 if srcs.is_empty() {
479 return Ok(HashMap::new());
480 }
481 let placeholders = srcs.iter().map(|_| "?").collect::<Vec<_>>().join(",");
482 let sql = format!("SELECT src, dst, kind, dst_lib FROM deps WHERE src IN ({placeholders})");
483 let mut stmt = self.conn.prepare(&sql)?;
484 let rows = stmt.query_map(rusqlite::params_from_iter(srcs.iter()), |r| {
485 Ok((
486 r.get::<_, String>(0)?,
487 r.get::<_, String>(1)?,
488 r.get::<_, String>(2)?,
489 r.get::<_, Option<String>>(3)?,
490 ))
491 })?;
492 let mut map: HashMap<String, Vec<(String, String, Option<String>)>> = HashMap::new();
493 for row in rows {
494 let (src, dst, kind, lib) = row?;
495 map.entry(src).or_default().push((dst, kind, lib));
496 }
497 Ok(map)
498 }
499
500 pub fn get_reverse_deps(&self, chunk_id: &str) -> Result<Vec<String>> {
501 let mut stmt = self
502 .conn
503 .prepare_cached("SELECT src FROM deps WHERE dst=?")?;
504 let rows = stmt.query_map([chunk_id], |r| r.get::<_, String>(0))?;
505 Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
506 }
507
508 pub fn insert_dep(
509 &self,
510 src: &str,
511 dst: &str,
512 kind: &str,
513 dst_lib: Option<&str>,
514 ) -> Result<()> {
515 self.conn.execute(
516 "INSERT OR IGNORE INTO deps(src,dst,kind,dst_lib) VALUES (?,?,?,?)",
517 params![src, dst, kind, dst_lib],
518 )?;
519 Ok(())
520 }
521
522 pub fn upsert_chunk_success_trace(
527 &self,
528 chunk_id: &str,
529 trace_id: &str,
530 ts: &str,
531 ) -> Result<()> {
532 self.conn.execute(
533 "INSERT OR IGNORE INTO chunk_success_traces(chunk_id, trace_id, ts) VALUES (?,?,?)",
534 params![chunk_id, trace_id, ts],
535 )?;
536 Ok(())
537 }
538
539 }
541
/// Builds an FTS5 MATCH expression from free text.
///
/// The text is split on every non-alphanumeric character. Pieces shorter than
/// two bytes are dropped, the rest are lowercased, stop words and repeats are
/// removed, and at most 32 terms are kept in order of first appearance. Each
/// term is wrapped in double quotes and the terms are joined with ` OR `.
///
/// Returns `None` when no term survives the filtering.
pub(crate) fn fts5_match_query(query: &str) -> Option<String> {
    const STOP: &[&str] = &[
        "the", "a", "an", "to", "of", "in", "on", "for", "and", "or", "is", "do", "i",
    ];
    const MAX_TERMS: usize = 32;

    let mut seen = std::collections::HashSet::new();
    let mut terms: Vec<String> = Vec::new();
    for piece in query.split(|c: char| !c.is_alphanumeric()) {
        if terms.len() == MAX_TERMS {
            break;
        }
        // Length is measured in bytes, before lowercasing.
        if piece.len() < 2 {
            continue;
        }
        let word = piece.to_lowercase();
        if STOP.contains(&word.as_str()) {
            continue;
        }
        if !seen.insert(word.clone()) {
            continue;
        }
        terms.push(format!("\"{word}\""));
    }

    if terms.is_empty() {
        None
    } else {
        Some(terms.join(" OR "))
    }
}