1use super::*;
2
3impl Storage {
4 pub fn insert_chunk(&self, c: &ChunkRow) -> Result<()> {
5 self.conn.execute(
6 "INSERT INTO chunks (
7 id, skill_name, seq, content, trigger_desc, anti_trigger_desc,
8 content_hash, token_count, origin, source, maturity, related_ids,
9 protected, state, state_reason, state_updated_at,
10 confidence, confidence_base, confidence_reason, version, distilled_from,
11 distill_provider, distill_model, distill_prompt_version, parent_id,
12 selected_count, used_count, used_success_count,
13 success_trace_ids_count, last_success_at, last_agg_ts,
14 embed_version, created_at, updated_at, last_used_at, agent
15 ) VALUES (
16 ?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,
17 ?13,?14,?15,?16,?17,?18,?19,?20,?21,?22,?23,?24,?25,
18 ?26,?27,?28,?29,?30,?31,?32,?33,?34,?35,?36
19 )",
20 params![
21 c.id,
22 c.skill_name,
23 c.seq,
24 c.content,
25 c.trigger_desc,
26 c.anti_trigger_desc,
27 c.content_hash,
28 c.token_count,
29 c.origin,
30 c.source,
31 c.maturity,
32 c.related_ids,
33 c.protected,
34 c.state,
35 c.state_reason,
36 c.state_updated_at,
37 c.confidence,
38 c.confidence,
39 c.confidence_reason,
40 c.version,
41 c.distilled_from,
42 c.distill_provider,
43 c.distill_model,
44 c.distill_prompt_version,
45 c.parent_id,
46 c.selected_count,
47 c.used_count,
48 c.used_success_count,
49 c.success_trace_ids_count,
50 c.last_success_at,
51 c.last_agg_ts,
52 c.embed_version,
53 c.created_at,
54 c.updated_at,
55 c.last_used_at,
56 c.agent
57 ],
58 )?;
59 Ok(())
60 }
61
62 pub fn insert_vec_content(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
63 self.conn.execute(
64 "INSERT OR REPLACE INTO vec_content(chunk_id, embedding) VALUES (?,?)",
65 params![chunk_id, emb],
66 )?;
67 self.note_vector_write(&self.vec_content_cache, chunk_id, emb)
68 }
69
70 pub fn insert_vec_trigger(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
71 self.conn.execute(
72 "INSERT OR REPLACE INTO vec_trigger(chunk_id, embedding) VALUES (?,?)",
73 params![chunk_id, emb],
74 )?;
75 self.note_vector_write(&self.vec_trigger_cache, chunk_id, emb)
76 }
77
78 fn note_vector_write(&self, cache: &VectorCache, chunk_id: &str, emb: &[u8]) -> Result<()> {
88 self.bump_vector_revision()?;
89 if let Some(entries) = cache.borrow_mut().as_mut() {
90 let mut v = unpack_embedding(emb);
91 l2_normalize(&mut v);
92 match entries.iter_mut().find(|(id, _)| id == chunk_id) {
93 Some(slot) => slot.1 = v,
94 None => entries.push((chunk_id.to_string(), v)),
95 }
96 }
97 self.sync_vector_revision()
98 }
99
100 fn bump_vector_revision(&self) -> Result<()> {
104 self.conn.execute(
105 "INSERT INTO meta(key, value) VALUES ('vector_revision', '1')
106 ON CONFLICT(key) DO UPDATE SET value=CAST(value AS INTEGER)+1",
107 [],
108 )?;
109 Ok(())
110 }
111
112 fn sync_vector_revision(&self) -> Result<()> {
115 let current = self
116 .get_meta("vector_revision")?
117 .and_then(|v| v.parse::<i64>().ok())
118 .unwrap_or(0);
119 self.vector_cache_revision.set(Some(current));
120 Ok(())
121 }
122
123 pub(crate) fn invalidate_vector_caches(&self) {
127 *self.vec_content_cache.borrow_mut() = None;
128 *self.vec_trigger_cache.borrow_mut() = None;
129 self.vector_cache_revision.set(None);
130 }
131
132 pub fn list_chunks(
136 &self,
137 state: Option<&str>,
138 origin: Option<&str>,
139 limit: usize,
140 offset: usize,
141 ) -> Result<Vec<Value>> {
142 let mut sql = String::from(
143 "SELECT id, skill_name, seq, origin, state, state_reason, maturity, \
144 confidence, token_count, protected, selected_count, used_count, \
145 used_success_count, substr(content, 1, 280) AS content_preview, \
146 created_at, updated_at, last_used_at \
147 FROM chunks",
148 );
149 let mut clauses: Vec<&str> = Vec::new();
150 if state.is_some() {
151 clauses.push("state = :state");
152 }
153 if origin.is_some() {
154 clauses.push("origin = :origin");
155 }
156 if !clauses.is_empty() {
157 sql.push_str(" WHERE ");
158 sql.push_str(&clauses.join(" AND "));
159 }
160 sql.push_str(" ORDER BY created_at DESC LIMIT :limit OFFSET :offset");
161
162 let mut stmt = self.conn.prepare(&sql)?;
163 let names: Vec<String> = stmt.column_names().into_iter().map(String::from).collect();
164 let mut params: Vec<(&str, &dyn rusqlite::ToSql)> = Vec::new();
165 if let Some(s) = state.as_ref() {
166 params.push((":state", s));
167 }
168 if let Some(o) = origin.as_ref() {
169 params.push((":origin", o));
170 }
171 let limit_i = limit as i64;
172 let offset_i = offset as i64;
173 params.push((":limit", &limit_i));
174 params.push((":offset", &offset_i));
175
176 let rows = stmt.query_map(params.as_slice(), |r| row_to_json_with_names(r, &names))?;
177 let mut out = Vec::new();
178 for row in rows {
179 out.push(row?);
180 }
181 Ok(out)
182 }
183
184 pub fn get_chunk(&self, id: &str) -> Result<Option<Value>> {
185 let mut stmt = self
186 .conn
187 .prepare_cached("SELECT * FROM chunks WHERE id=?")?;
188 let row = stmt.query_row([id], row_to_json);
189 match row {
190 Ok(v) => Ok(Some(v)),
191 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
192 Err(e) => Err(e.into()),
193 }
194 }
195
196 pub fn update_chunk_state(
197 &self,
198 id: &str,
199 state: &str,
200 reason: Option<&str>,
201 now: &str,
202 ) -> Result<()> {
203 self.conn.execute(
204 "UPDATE chunks SET state=?, state_reason=?, state_updated_at=?, updated_at=? WHERE id=?",
205 params![state, reason, now, now, id],
206 )?;
207 Ok(())
208 }
209
210 pub fn update_chunk_confidence(
211 &self,
212 id: &str,
213 conf: f64,
214 reason: Option<&str>,
215 now: &str,
216 ) -> Result<()> {
217 self.conn.execute(
218 "UPDATE chunks
219 SET confidence=?, confidence_base=?, confidence_reason=?, updated_at=?
220 WHERE id=?",
221 params![conf, conf, reason, now, id],
222 )?;
223 Ok(())
224 }
225
226 pub fn update_chunk_last_used(&self, id: &str, now: &str) -> Result<()> {
227 self.conn.execute(
228 "UPDATE chunks SET last_used_at=?, updated_at=? WHERE id=?",
229 params![now, now, id],
230 )?;
231 Ok(())
232 }
233
234 pub fn get_chunk_by_hash(&self, hash: &str) -> Result<Option<Value>> {
235 let row = self.conn.query_row(
236 "SELECT * FROM chunks WHERE content_hash=? LIMIT 1",
237 [hash],
238 row_to_json,
239 );
240 match row {
241 Ok(v) => Ok(Some(v)),
242 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
243 Err(e) => Err(e.into()),
244 }
245 }
246
247 pub fn search_vec_content(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
252 self.search_vec(&self.vec_content_cache, "vec_content", query, limit)
253 }
254
255 pub fn search_vec_trigger(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
256 self.search_vec(&self.vec_trigger_cache, "vec_trigger", query, limit)
257 }
258
259 fn search_vec(
260 &self,
261 cache_cell: &VectorCache,
262 table: &str,
263 query: &[f32],
264 limit: usize,
265 ) -> Result<Vec<(String, f32)>> {
266 if limit == 0 {
267 return Ok(Vec::new());
268 }
269 self.refresh_vector_caches_if_changed()?;
270
271 if cache_cell.borrow().is_none() {
275 let sql = format!("SELECT chunk_id, embedding FROM {table}");
276 let mut stmt = self.conn.prepare(&sql)?;
277 let raw: Vec<(String, Vec<u8>)> = stmt
278 .query_map([], |r| {
279 Ok((r.get::<_, String>(0)?, r.get::<_, Vec<u8>>(1)?))
280 })?
281 .collect::<rusqlite::Result<Vec<_>>>()?;
282 let mut entries: Vec<(String, Vec<f32>)> = Vec::with_capacity(raw.len());
283 for (id, blob) in raw {
284 if blob.is_empty() || blob.len() % 4 != 0 {
288 return Err(crate::errors::InnateError::Other(format!(
289 "corrupt embedding for chunk {id} in {table}: {} bytes (not a non-zero multiple of 4)",
290 blob.len()
291 )));
292 }
293 let mut v = unpack_embedding(&blob);
294 l2_normalize(&mut v);
295 entries.push((id, v));
296 }
297 *cache_cell.borrow_mut() = Some(entries);
298 }
299
300 let cache = cache_cell.borrow();
301 let entries = cache.as_ref().unwrap();
302
303 let mut q = query.to_vec();
306 l2_normalize(&mut q);
307
308 let mut scored: Vec<(usize, f32)> = entries
314 .iter()
315 .enumerate()
316 .filter(|(_, (_, v))| v.len() == q.len())
317 .map(|(i, (_, v))| (i, dot_product(&q, v)))
318 .collect();
319 if scored.len() > limit {
320 scored.select_nth_unstable_by(limit - 1, |a, b| {
321 b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)
322 });
323 scored.truncate(limit);
324 }
325 scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
326 Ok(scored
327 .into_iter()
328 .map(|(i, sim)| (entries[i].0.clone(), sim))
329 .collect())
330 }
331
332 pub fn search_lexical(&self, query: &str, limit: usize) -> Result<Vec<(String, f32)>> {
338 if limit == 0 {
339 return Ok(Vec::new());
340 }
341 let Some(match_expr) = fts5_match_query(query) else {
342 return Ok(Vec::new());
343 };
344 let mut stmt = self.conn.prepare_cached(
345 "SELECT chunks_fts.id, bm25(chunks_fts) AS score
346 FROM chunks_fts
347 JOIN chunks c ON c.id = chunks_fts.id
348 WHERE chunks_fts MATCH ?1
349 AND c.state != 'archived' AND c.origin != 'spark'
350 ORDER BY score ASC
351 LIMIT ?2",
352 )?;
353 let rows = stmt.query_map(params![match_expr, limit as i64], |r| {
354 Ok((r.get::<_, String>(0)?, r.get::<_, f64>(1)?))
355 })?;
356 let raw: Vec<(String, f64)> = rows.collect::<rusqlite::Result<Vec<_>>>()?;
360 let best_rel = raw.iter().map(|(_, s)| -s).fold(f64::MIN, f64::max);
361 let out = raw
362 .into_iter()
363 .enumerate()
364 .map(|(i, (id, score))| {
365 let sim = if best_rel > 0.0 {
366 (-score / best_rel).clamp(0.0, 1.0) as f32
367 } else {
368 ((limit - i) as f32) / (limit as f32)
370 };
371 (id, sim)
372 })
373 .collect();
374 Ok(out)
375 }
376
377 fn refresh_vector_caches_if_changed(&self) -> Result<()> {
378 let current = self
379 .get_meta("vector_revision")?
380 .and_then(|value| value.parse::<i64>().ok())
381 .unwrap_or(0);
382 let previous = self.vector_cache_revision.replace(Some(current));
383 if previous.is_some_and(|revision| revision != current) {
384 *self.vec_content_cache.borrow_mut() = None;
385 *self.vec_trigger_cache.borrow_mut() = None;
386 }
387 Ok(())
388 }
389
390 pub fn get_chunks_by_ids(&self, ids: &[&str]) -> Result<HashMap<String, Value>> {
392 if ids.is_empty() {
393 return Ok(HashMap::new());
394 }
395 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
396 let sql = format!("SELECT * FROM chunks WHERE id IN ({placeholders})");
397 let mut stmt = self.conn.prepare(&sql)?;
398 let names: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
399 let rows = stmt.query_map(rusqlite::params_from_iter(ids.iter()), |r| {
400 row_to_json_with_names(r, &names)
401 })?;
402 let mut map = HashMap::with_capacity(ids.len());
403 for row in rows {
404 let row = row?;
405 if let Some(id) = row.get("id").and_then(Value::as_str) {
406 map.insert(id.to_string(), row);
407 }
408 }
409 Ok(map)
410 }
411
412 pub fn is_hash_invalidated(&self, hash: &str) -> Result<bool> {
417 let count: i64 = self.conn.query_row(
418 "SELECT count(*) FROM invalidated_hashes WHERE content_hash=?",
419 [hash],
420 |r| r.get(0),
421 )?;
422 Ok(count > 0)
423 }
424
425 pub fn insert_invalidated_hash(
426 &self,
427 hash: &str,
428 reason: Option<&str>,
429 ts: &str,
430 ) -> Result<()> {
431 self.conn.execute(
432 "INSERT OR IGNORE INTO invalidated_hashes(content_hash, reason, ts) VALUES (?,?,?)",
433 params![hash, reason, ts],
434 )?;
435 Ok(())
436 }
437
438 pub(crate) fn query_chunks(&self, sql: &str) -> Result<Vec<Value>> {
446 self.query_json(sql, params![])
447 }
448
449 pub(crate) fn query_chunks_params<P: rusqlite::Params>(
450 &self,
451 sql: &str,
452 p: P,
453 ) -> Result<Vec<Value>> {
454 self.query_json(sql, p)
455 }
456
457 pub fn get_deps(&self, chunk_id: &str) -> Result<Vec<DepEdge>> {
462 let mut stmt = self
463 .conn
464 .prepare_cached("SELECT dst, kind, dst_lib FROM deps WHERE src=?")?;
465 let rows = stmt.query_map([chunk_id], |r| {
466 Ok((
467 r.get::<_, String>(0)?,
468 r.get::<_, String>(1)?,
469 r.get::<_, Option<String>>(2)?,
470 ))
471 })?;
472 Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
473 }
474
475 pub fn get_deps_batch(&self, srcs: &[&str]) -> Result<HashMap<String, Vec<DepEdge>>> {
479 if srcs.is_empty() {
480 return Ok(HashMap::new());
481 }
482 let placeholders = srcs.iter().map(|_| "?").collect::<Vec<_>>().join(",");
483 let sql = format!("SELECT src, dst, kind, dst_lib FROM deps WHERE src IN ({placeholders})");
484 let mut stmt = self.conn.prepare(&sql)?;
485 let rows = stmt.query_map(rusqlite::params_from_iter(srcs.iter()), |r| {
486 Ok((
487 r.get::<_, String>(0)?,
488 r.get::<_, String>(1)?,
489 r.get::<_, String>(2)?,
490 r.get::<_, Option<String>>(3)?,
491 ))
492 })?;
493 let mut map: HashMap<String, Vec<(String, String, Option<String>)>> = HashMap::new();
494 for row in rows {
495 let (src, dst, kind, lib) = row?;
496 map.entry(src).or_default().push((dst, kind, lib));
497 }
498 Ok(map)
499 }
500
501 pub fn get_reverse_deps(&self, chunk_id: &str) -> Result<Vec<String>> {
502 let mut stmt = self
503 .conn
504 .prepare_cached("SELECT src FROM deps WHERE dst=?")?;
505 let rows = stmt.query_map([chunk_id], |r| r.get::<_, String>(0))?;
506 Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
507 }
508
509 pub fn insert_dep(
510 &self,
511 src: &str,
512 dst: &str,
513 kind: &str,
514 dst_lib: Option<&str>,
515 ) -> Result<()> {
516 self.conn.execute(
517 "INSERT OR IGNORE INTO deps(src,dst,kind,dst_lib) VALUES (?,?,?,?)",
518 params![src, dst, kind, dst_lib],
519 )?;
520 Ok(())
521 }
522
523 pub fn upsert_chunk_success_trace(
528 &self,
529 chunk_id: &str,
530 trace_id: &str,
531 ts: &str,
532 ) -> Result<()> {
533 self.conn.execute(
534 "INSERT OR IGNORE INTO chunk_success_traces(chunk_id, trace_id, ts) VALUES (?,?,?)",
535 params![chunk_id, trace_id, ts],
536 )?;
537 Ok(())
538 }
539
540 }
542
/// Builds an FTS5 `MATCH` expression from free-form text.
///
/// The input is split on every non-alphanumeric character. Each piece is kept
/// only if it is at least 2 bytes long, is not in the stop-word list after
/// lowercasing, and has not been seen before. At most 32 terms are kept, each
/// wrapped in double quotes and joined with ` OR `.
///
/// Returns `None` when no term survives the filtering.
pub(crate) fn fts5_match_query(query: &str) -> Option<String> {
    const STOP: &[&str] = &[
        "the", "a", "an", "to", "of", "in", "on", "for", "and", "or", "is", "do", "i",
    ];
    const MAX_TERMS: usize = 32;

    let mut seen = std::collections::HashSet::new();
    let mut terms: Vec<String> = Vec::new();
    for piece in query.split(|c: char| !c.is_alphanumeric()) {
        if terms.len() == MAX_TERMS {
            break;
        }
        // Length is measured in bytes on the original (pre-lowercase) piece.
        if piece.len() < 2 {
            continue;
        }
        let lowered = piece.to_lowercase();
        if STOP.contains(&lowered.as_str()) {
            continue;
        }
        // `insert` returns false for a term that was already emitted.
        if !seen.insert(lowered.clone()) {
            continue;
        }
        // Quoting makes each term a literal string, so words like "not" or
        // "near" are never parsed as FTS5 operators.
        terms.push(format!("\"{lowered}\""));
    }

    if terms.is_empty() {
        None
    } else {
        Some(terms.join(" OR "))
    }
}