1use super::*;
2
3impl Storage {
4 pub fn insert_chunk(&self, c: &ChunkRow) -> Result<()> {
5 self.conn.execute(
6 "INSERT INTO chunks (
7 id, skill_name, seq, content, trigger_desc, anti_trigger_desc,
8 content_hash, token_count, origin, source, maturity, related_ids,
9 protected, state, state_reason, state_updated_at,
10 confidence, confidence_base, confidence_reason, version, distilled_from,
11 distill_provider, distill_model, distill_prompt_version, parent_id,
12 selected_count, used_count, used_success_count,
13 success_trace_ids_count, last_success_at, last_agg_ts,
14 embed_version, created_at, updated_at, last_used_at
15 ) VALUES (
16 ?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,
17 ?13,?14,?15,?16,?17,?18,?19,?20,?21,?22,?23,?24,?25,
18 ?26,?27,?28,?29,?30,?31,?32,?33,?34,?35
19 )",
20 params![
21 c.id,
22 c.skill_name,
23 c.seq,
24 c.content,
25 c.trigger_desc,
26 c.anti_trigger_desc,
27 c.content_hash,
28 c.token_count,
29 c.origin,
30 c.source,
31 c.maturity,
32 c.related_ids,
33 c.protected,
34 c.state,
35 c.state_reason,
36 c.state_updated_at,
37 c.confidence,
38 c.confidence,
39 c.confidence_reason,
40 c.version,
41 c.distilled_from,
42 c.distill_provider,
43 c.distill_model,
44 c.distill_prompt_version,
45 c.parent_id,
46 c.selected_count,
47 c.used_count,
48 c.used_success_count,
49 c.success_trace_ids_count,
50 c.last_success_at,
51 c.last_agg_ts,
52 c.embed_version,
53 c.created_at,
54 c.updated_at,
55 c.last_used_at
56 ],
57 )?;
58 Ok(())
59 }
60
61 pub fn insert_vec_content(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
62 self.conn.execute(
63 "INSERT OR REPLACE INTO vec_content(chunk_id, embedding) VALUES (?,?)",
64 params![chunk_id, emb],
65 )?;
66 self.note_vector_write(&self.vec_content_cache, chunk_id, emb)
67 }
68
69 pub fn insert_vec_trigger(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
70 self.conn.execute(
71 "INSERT OR REPLACE INTO vec_trigger(chunk_id, embedding) VALUES (?,?)",
72 params![chunk_id, emb],
73 )?;
74 self.note_vector_write(&self.vec_trigger_cache, chunk_id, emb)
75 }
76
77 fn note_vector_write(&self, cache: &VectorCache, chunk_id: &str, emb: &[u8]) -> Result<()> {
87 self.bump_vector_revision()?;
88 if let Some(entries) = cache.borrow_mut().as_mut() {
89 let mut v = unpack_embedding(emb);
90 l2_normalize(&mut v);
91 match entries.iter_mut().find(|(id, _)| id == chunk_id) {
92 Some(slot) => slot.1 = v,
93 None => entries.push((chunk_id.to_string(), v)),
94 }
95 }
96 self.sync_vector_revision()
97 }
98
99 fn bump_vector_revision(&self) -> Result<()> {
103 self.conn.execute(
104 "INSERT INTO meta(key, value) VALUES ('vector_revision', '1')
105 ON CONFLICT(key) DO UPDATE SET value=CAST(value AS INTEGER)+1",
106 [],
107 )?;
108 Ok(())
109 }
110
111 fn sync_vector_revision(&self) -> Result<()> {
114 let current = self
115 .get_meta("vector_revision")?
116 .and_then(|v| v.parse::<i64>().ok())
117 .unwrap_or(0);
118 self.vector_cache_revision.set(Some(current));
119 Ok(())
120 }
121
122 pub(crate) fn invalidate_vector_caches(&self) {
126 *self.vec_content_cache.borrow_mut() = None;
127 *self.vec_trigger_cache.borrow_mut() = None;
128 self.vector_cache_revision.set(None);
129 }
130
131 pub fn list_chunks(
135 &self,
136 state: Option<&str>,
137 origin: Option<&str>,
138 limit: usize,
139 offset: usize,
140 ) -> Result<Vec<Value>> {
141 let mut sql = String::from(
142 "SELECT id, skill_name, seq, origin, state, state_reason, maturity, \
143 confidence, token_count, protected, selected_count, used_count, \
144 used_success_count, substr(content, 1, 280) AS content_preview, \
145 created_at, updated_at, last_used_at \
146 FROM chunks",
147 );
148 let mut clauses: Vec<&str> = Vec::new();
149 if state.is_some() {
150 clauses.push("state = :state");
151 }
152 if origin.is_some() {
153 clauses.push("origin = :origin");
154 }
155 if !clauses.is_empty() {
156 sql.push_str(" WHERE ");
157 sql.push_str(&clauses.join(" AND "));
158 }
159 sql.push_str(" ORDER BY created_at DESC LIMIT :limit OFFSET :offset");
160
161 let mut stmt = self.conn.prepare(&sql)?;
162 let names: Vec<String> = stmt
163 .column_names()
164 .into_iter()
165 .map(String::from)
166 .collect();
167 let mut params: Vec<(&str, &dyn rusqlite::ToSql)> = Vec::new();
168 if let Some(s) = state.as_ref() {
169 params.push((":state", s));
170 }
171 if let Some(o) = origin.as_ref() {
172 params.push((":origin", o));
173 }
174 let limit_i = limit as i64;
175 let offset_i = offset as i64;
176 params.push((":limit", &limit_i));
177 params.push((":offset", &offset_i));
178
179 let rows = stmt.query_map(params.as_slice(), |r| row_to_json_with_names(r, &names))?;
180 let mut out = Vec::new();
181 for row in rows {
182 out.push(row?);
183 }
184 Ok(out)
185 }
186
187 pub fn get_chunk(&self, id: &str) -> Result<Option<Value>> {
188 let mut stmt = self.conn.prepare_cached("SELECT * FROM chunks WHERE id=?")?;
189 let row = stmt.query_row([id], row_to_json);
190 match row {
191 Ok(v) => Ok(Some(v)),
192 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
193 Err(e) => Err(e.into()),
194 }
195 }
196
197 pub fn update_chunk_state(
198 &self,
199 id: &str,
200 state: &str,
201 reason: Option<&str>,
202 now: &str,
203 ) -> Result<()> {
204 self.conn.execute(
205 "UPDATE chunks SET state=?, state_reason=?, state_updated_at=?, updated_at=? WHERE id=?",
206 params![state, reason, now, now, id],
207 )?;
208 Ok(())
209 }
210
211 pub fn update_chunk_confidence(
212 &self,
213 id: &str,
214 conf: f64,
215 reason: Option<&str>,
216 now: &str,
217 ) -> Result<()> {
218 self.conn.execute(
219 "UPDATE chunks
220 SET confidence=?, confidence_base=?, confidence_reason=?, updated_at=?
221 WHERE id=?",
222 params![conf, conf, reason, now, id],
223 )?;
224 Ok(())
225 }
226
227 pub fn update_chunk_last_used(&self, id: &str, now: &str) -> Result<()> {
228 self.conn.execute(
229 "UPDATE chunks SET last_used_at=?, updated_at=? WHERE id=?",
230 params![now, now, id],
231 )?;
232 Ok(())
233 }
234
235 pub fn get_chunk_by_hash(&self, hash: &str) -> Result<Option<Value>> {
236 let row = self.conn.query_row(
237 "SELECT * FROM chunks WHERE content_hash=? LIMIT 1",
238 [hash],
239 row_to_json,
240 );
241 match row {
242 Ok(v) => Ok(Some(v)),
243 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
244 Err(e) => Err(e.into()),
245 }
246 }
247
248 pub fn search_vec_content(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
253 self.search_vec(&self.vec_content_cache, "vec_content", query, limit)
254 }
255
256 pub fn search_vec_trigger(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
257 self.search_vec(&self.vec_trigger_cache, "vec_trigger", query, limit)
258 }
259
260 fn search_vec(
261 &self,
262 cache_cell: &VectorCache,
263 table: &str,
264 query: &[f32],
265 limit: usize,
266 ) -> Result<Vec<(String, f32)>> {
267 if limit == 0 {
268 return Ok(Vec::new());
269 }
270 self.refresh_vector_caches_if_changed()?;
271
272 if cache_cell.borrow().is_none() {
276 let sql = format!("SELECT chunk_id, embedding FROM {table}");
277 let mut stmt = self.conn.prepare(&sql)?;
278 let raw: Vec<(String, Vec<u8>)> = stmt
279 .query_map([], |r| {
280 Ok((r.get::<_, String>(0)?, r.get::<_, Vec<u8>>(1)?))
281 })?
282 .collect::<rusqlite::Result<Vec<_>>>()?;
283 let mut entries: Vec<(String, Vec<f32>)> = Vec::with_capacity(raw.len());
284 for (id, blob) in raw {
285 if blob.is_empty() || blob.len() % 4 != 0 {
289 return Err(crate::errors::InnateError::Other(format!(
290 "corrupt embedding for chunk {id} in {table}: {} bytes (not a non-zero multiple of 4)",
291 blob.len()
292 )));
293 }
294 let mut v = unpack_embedding(&blob);
295 l2_normalize(&mut v);
296 entries.push((id, v));
297 }
298 *cache_cell.borrow_mut() = Some(entries);
299 }
300
301 let cache = cache_cell.borrow();
302 let entries = cache.as_ref().unwrap();
303
304 let mut q = query.to_vec();
307 l2_normalize(&mut q);
308
309 let mut scored: Vec<(usize, f32)> = entries
315 .iter()
316 .enumerate()
317 .filter(|(_, (_, v))| v.len() == q.len())
318 .map(|(i, (_, v))| (i, dot_product(&q, v)))
319 .collect();
320 if scored.len() > limit {
321 scored.select_nth_unstable_by(limit - 1, |a, b| {
322 b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)
323 });
324 scored.truncate(limit);
325 }
326 scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
327 Ok(scored
328 .into_iter()
329 .map(|(i, sim)| (entries[i].0.clone(), sim))
330 .collect())
331 }
332
333 fn refresh_vector_caches_if_changed(&self) -> Result<()> {
334 let current = self
335 .get_meta("vector_revision")?
336 .and_then(|value| value.parse::<i64>().ok())
337 .unwrap_or(0);
338 let previous = self.vector_cache_revision.replace(Some(current));
339 if previous.is_some_and(|revision| revision != current) {
340 *self.vec_content_cache.borrow_mut() = None;
341 *self.vec_trigger_cache.borrow_mut() = None;
342 }
343 Ok(())
344 }
345
346 pub fn get_chunks_by_ids(&self, ids: &[&str]) -> Result<HashMap<String, Value>> {
348 if ids.is_empty() {
349 return Ok(HashMap::new());
350 }
351 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
352 let sql = format!("SELECT * FROM chunks WHERE id IN ({placeholders})");
353 let mut stmt = self.conn.prepare(&sql)?;
354 let names: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
355 let rows = stmt.query_map(rusqlite::params_from_iter(ids.iter()), |r| {
356 row_to_json_with_names(r, &names)
357 })?;
358 let mut map = HashMap::with_capacity(ids.len());
359 for row in rows {
360 let row = row?;
361 if let Some(id) = row.get("id").and_then(Value::as_str) {
362 map.insert(id.to_string(), row);
363 }
364 }
365 Ok(map)
366 }
367
368 pub fn is_hash_invalidated(&self, hash: &str) -> Result<bool> {
373 let count: i64 = self.conn.query_row(
374 "SELECT count(*) FROM invalidated_hashes WHERE content_hash=?",
375 [hash],
376 |r| r.get(0),
377 )?;
378 Ok(count > 0)
379 }
380
381 pub fn insert_invalidated_hash(
382 &self,
383 hash: &str,
384 reason: Option<&str>,
385 ts: &str,
386 ) -> Result<()> {
387 self.conn.execute(
388 "INSERT OR IGNORE INTO invalidated_hashes(content_hash, reason, ts) VALUES (?,?,?)",
389 params![hash, reason, ts],
390 )?;
391 Ok(())
392 }
393
394 pub(crate) fn query_chunks(&self, sql: &str) -> Result<Vec<Value>> {
402 self.query_json(sql, params![])
403 }
404
405 pub(crate) fn query_chunks_params<P: rusqlite::Params>(
406 &self,
407 sql: &str,
408 p: P,
409 ) -> Result<Vec<Value>> {
410 self.query_json(sql, p)
411 }
412
413 pub fn get_deps(&self, chunk_id: &str) -> Result<Vec<DepEdge>> {
418 let mut stmt = self
419 .conn
420 .prepare_cached("SELECT dst, kind, dst_lib FROM deps WHERE src=?")?;
421 let rows = stmt.query_map([chunk_id], |r| {
422 Ok((
423 r.get::<_, String>(0)?,
424 r.get::<_, String>(1)?,
425 r.get::<_, Option<String>>(2)?,
426 ))
427 })?;
428 Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
429 }
430
431 pub fn get_deps_batch(&self, srcs: &[&str]) -> Result<HashMap<String, Vec<DepEdge>>> {
435 if srcs.is_empty() {
436 return Ok(HashMap::new());
437 }
438 let placeholders = srcs.iter().map(|_| "?").collect::<Vec<_>>().join(",");
439 let sql = format!("SELECT src, dst, kind, dst_lib FROM deps WHERE src IN ({placeholders})");
440 let mut stmt = self.conn.prepare(&sql)?;
441 let rows = stmt.query_map(rusqlite::params_from_iter(srcs.iter()), |r| {
442 Ok((
443 r.get::<_, String>(0)?,
444 r.get::<_, String>(1)?,
445 r.get::<_, String>(2)?,
446 r.get::<_, Option<String>>(3)?,
447 ))
448 })?;
449 let mut map: HashMap<String, Vec<(String, String, Option<String>)>> = HashMap::new();
450 for row in rows {
451 let (src, dst, kind, lib) = row?;
452 map.entry(src).or_default().push((dst, kind, lib));
453 }
454 Ok(map)
455 }
456
457 pub fn get_reverse_deps(&self, chunk_id: &str) -> Result<Vec<String>> {
458 let mut stmt = self
459 .conn
460 .prepare_cached("SELECT src FROM deps WHERE dst=?")?;
461 let rows = stmt.query_map([chunk_id], |r| r.get::<_, String>(0))?;
462 Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
463 }
464
465 pub fn insert_dep(
466 &self,
467 src: &str,
468 dst: &str,
469 kind: &str,
470 dst_lib: Option<&str>,
471 ) -> Result<()> {
472 self.conn.execute(
473 "INSERT OR IGNORE INTO deps(src,dst,kind,dst_lib) VALUES (?,?,?,?)",
474 params![src, dst, kind, dst_lib],
475 )?;
476 Ok(())
477 }
478
479 pub fn upsert_chunk_success_trace(
484 &self,
485 chunk_id: &str,
486 trace_id: &str,
487 ts: &str,
488 ) -> Result<()> {
489 self.conn.execute(
490 "INSERT OR IGNORE INTO chunk_success_traces(chunk_id, trace_id, ts) VALUES (?,?,?)",
491 params![chunk_id, trace_id, ts],
492 )?;
493 Ok(())
494 }
495
496 }