1use super::*;
2
3impl Storage {
4 pub fn insert_chunk(&self, c: &ChunkRow) -> Result<()> {
5 self.conn.execute(
6 "INSERT INTO chunks (
7 id, skill_name, seq, content, trigger_desc, anti_trigger_desc,
8 content_hash, token_count, origin, source, maturity, related_ids,
9 protected, state, state_reason, state_updated_at,
10 confidence, confidence_base, confidence_reason, version, distilled_from,
11 distill_provider, distill_model, distill_prompt_version, parent_id,
12 selected_count, used_count, used_success_count,
13 success_trace_ids_count, last_success_at, last_agg_ts,
14 embed_version, created_at, updated_at, last_used_at
15 ) VALUES (
16 ?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,
17 ?13,?14,?15,?16,?17,?18,?19,?20,?21,?22,?23,?24,?25,
18 ?26,?27,?28,?29,?30,?31,?32,?33,?34,?35
19 )",
20 params![
21 c.id,
22 c.skill_name,
23 c.seq,
24 c.content,
25 c.trigger_desc,
26 c.anti_trigger_desc,
27 c.content_hash,
28 c.token_count,
29 c.origin,
30 c.source,
31 c.maturity,
32 c.related_ids,
33 c.protected,
34 c.state,
35 c.state_reason,
36 c.state_updated_at,
37 c.confidence,
38 c.confidence,
39 c.confidence_reason,
40 c.version,
41 c.distilled_from,
42 c.distill_provider,
43 c.distill_model,
44 c.distill_prompt_version,
45 c.parent_id,
46 c.selected_count,
47 c.used_count,
48 c.used_success_count,
49 c.success_trace_ids_count,
50 c.last_success_at,
51 c.last_agg_ts,
52 c.embed_version,
53 c.created_at,
54 c.updated_at,
55 c.last_used_at
56 ],
57 )?;
58 Ok(())
59 }
60
61 pub fn insert_vec_content(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
62 self.conn.execute(
63 "INSERT OR REPLACE INTO vec_content(chunk_id, embedding) VALUES (?,?)",
64 params![chunk_id, emb],
65 )?;
66 self.note_vector_write(&self.vec_content_cache, chunk_id, emb)
67 }
68
69 pub fn insert_vec_trigger(&self, chunk_id: &str, emb: &[u8]) -> Result<()> {
70 self.conn.execute(
71 "INSERT OR REPLACE INTO vec_trigger(chunk_id, embedding) VALUES (?,?)",
72 params![chunk_id, emb],
73 )?;
74 self.note_vector_write(&self.vec_trigger_cache, chunk_id, emb)
75 }
76
77 fn note_vector_write(&self, cache: &VectorCache, chunk_id: &str, emb: &[u8]) -> Result<()> {
87 self.bump_vector_revision()?;
88 if let Some(entries) = cache.borrow_mut().as_mut() {
89 let mut v = unpack_embedding(emb);
90 l2_normalize(&mut v);
91 match entries.iter_mut().find(|(id, _)| id == chunk_id) {
92 Some(slot) => slot.1 = v,
93 None => entries.push((chunk_id.to_string(), v)),
94 }
95 }
96 self.sync_vector_revision()
97 }
98
99 fn bump_vector_revision(&self) -> Result<()> {
103 self.conn.execute(
104 "INSERT INTO meta(key, value) VALUES ('vector_revision', '1')
105 ON CONFLICT(key) DO UPDATE SET value=CAST(value AS INTEGER)+1",
106 [],
107 )?;
108 Ok(())
109 }
110
111 fn sync_vector_revision(&self) -> Result<()> {
114 let current = self
115 .get_meta("vector_revision")?
116 .and_then(|v| v.parse::<i64>().ok())
117 .unwrap_or(0);
118 self.vector_cache_revision.set(Some(current));
119 Ok(())
120 }
121
122 pub(crate) fn invalidate_vector_caches(&self) {
126 *self.vec_content_cache.borrow_mut() = None;
127 *self.vec_trigger_cache.borrow_mut() = None;
128 self.vector_cache_revision.set(None);
129 }
130
131 pub fn get_chunk(&self, id: &str) -> Result<Option<Value>> {
132 let mut stmt = self.conn.prepare_cached("SELECT * FROM chunks WHERE id=?")?;
133 let row = stmt.query_row([id], row_to_json);
134 match row {
135 Ok(v) => Ok(Some(v)),
136 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
137 Err(e) => Err(e.into()),
138 }
139 }
140
141 pub fn update_chunk_state(
142 &self,
143 id: &str,
144 state: &str,
145 reason: Option<&str>,
146 now: &str,
147 ) -> Result<()> {
148 self.conn.execute(
149 "UPDATE chunks SET state=?, state_reason=?, state_updated_at=?, updated_at=? WHERE id=?",
150 params![state, reason, now, now, id],
151 )?;
152 Ok(())
153 }
154
155 pub fn update_chunk_confidence(
156 &self,
157 id: &str,
158 conf: f64,
159 reason: Option<&str>,
160 now: &str,
161 ) -> Result<()> {
162 self.conn.execute(
163 "UPDATE chunks
164 SET confidence=?, confidence_base=?, confidence_reason=?, updated_at=?
165 WHERE id=?",
166 params![conf, conf, reason, now, id],
167 )?;
168 Ok(())
169 }
170
171 pub fn update_chunk_last_used(&self, id: &str, now: &str) -> Result<()> {
172 self.conn.execute(
173 "UPDATE chunks SET last_used_at=?, updated_at=? WHERE id=?",
174 params![now, now, id],
175 )?;
176 Ok(())
177 }
178
179 pub fn get_chunk_by_hash(&self, hash: &str) -> Result<Option<Value>> {
180 let row = self.conn.query_row(
181 "SELECT * FROM chunks WHERE content_hash=? LIMIT 1",
182 [hash],
183 row_to_json,
184 );
185 match row {
186 Ok(v) => Ok(Some(v)),
187 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
188 Err(e) => Err(e.into()),
189 }
190 }
191
192 pub fn search_vec_content(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
197 self.search_vec(&self.vec_content_cache, "vec_content", query, limit)
198 }
199
200 pub fn search_vec_trigger(&self, query: &[f32], limit: usize) -> Result<Vec<(String, f32)>> {
201 self.search_vec(&self.vec_trigger_cache, "vec_trigger", query, limit)
202 }
203
204 fn search_vec(
205 &self,
206 cache_cell: &VectorCache,
207 table: &str,
208 query: &[f32],
209 limit: usize,
210 ) -> Result<Vec<(String, f32)>> {
211 if limit == 0 {
212 return Ok(Vec::new());
213 }
214 self.refresh_vector_caches_if_changed()?;
215
216 if cache_cell.borrow().is_none() {
220 let sql = format!("SELECT chunk_id, embedding FROM {table}");
221 let mut stmt = self.conn.prepare(&sql)?;
222 let entries: Vec<(String, Vec<f32>)> = stmt
223 .query_map([], |r| {
224 let id: String = r.get(0)?;
225 let blob: Vec<u8> = r.get(1)?;
226 Ok((id, blob))
227 })?
228 .filter_map(|r| r.ok())
229 .map(|(id, blob)| {
230 let mut v = unpack_embedding(&blob);
231 l2_normalize(&mut v);
232 (id, v)
233 })
234 .collect();
235 *cache_cell.borrow_mut() = Some(entries);
236 }
237
238 let cache = cache_cell.borrow();
239 let entries = cache.as_ref().unwrap();
240
241 let mut q = query.to_vec();
244 l2_normalize(&mut q);
245
246 let mut scored: Vec<(usize, f32)> = entries
249 .iter()
250 .enumerate()
251 .map(|(i, (_, v))| (i, dot_product(&q, v)))
252 .collect();
253 if scored.len() > limit {
254 scored.select_nth_unstable_by(limit - 1, |a, b| {
255 b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)
256 });
257 scored.truncate(limit);
258 }
259 scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
260 Ok(scored
261 .into_iter()
262 .map(|(i, sim)| (entries[i].0.clone(), sim))
263 .collect())
264 }
265
266 fn refresh_vector_caches_if_changed(&self) -> Result<()> {
267 let current = self
268 .get_meta("vector_revision")?
269 .and_then(|value| value.parse::<i64>().ok())
270 .unwrap_or(0);
271 let previous = self.vector_cache_revision.replace(Some(current));
272 if previous.is_some_and(|revision| revision != current) {
273 *self.vec_content_cache.borrow_mut() = None;
274 *self.vec_trigger_cache.borrow_mut() = None;
275 }
276 Ok(())
277 }
278
279 pub fn get_chunks_by_ids(&self, ids: &[&str]) -> Result<HashMap<String, Value>> {
281 if ids.is_empty() {
282 return Ok(HashMap::new());
283 }
284 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
285 let sql = format!("SELECT * FROM chunks WHERE id IN ({placeholders})");
286 let mut stmt = self.conn.prepare(&sql)?;
287 let names: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
288 let rows = stmt.query_map(rusqlite::params_from_iter(ids.iter()), |r| {
289 row_to_json_with_names(r, &names)
290 })?;
291 let mut map = HashMap::with_capacity(ids.len());
292 for row in rows.filter_map(|r| r.ok()) {
293 if let Some(id) = row.get("id").and_then(Value::as_str) {
294 map.insert(id.to_string(), row);
295 }
296 }
297 Ok(map)
298 }
299
300 pub fn is_hash_invalidated(&self, hash: &str) -> Result<bool> {
305 let count: i64 = self.conn.query_row(
306 "SELECT count(*) FROM invalidated_hashes WHERE content_hash=?",
307 [hash],
308 |r| r.get(0),
309 )?;
310 Ok(count > 0)
311 }
312
313 pub fn insert_invalidated_hash(
314 &self,
315 hash: &str,
316 reason: Option<&str>,
317 ts: &str,
318 ) -> Result<()> {
319 self.conn.execute(
320 "INSERT OR IGNORE INTO invalidated_hashes(content_hash, reason, ts) VALUES (?,?,?)",
321 params![hash, reason, ts],
322 )?;
323 Ok(())
324 }
325
326 pub(crate) fn query_chunks(&self, sql: &str) -> Result<Vec<Value>> {
334 self.query_json(sql, params![])
335 }
336
337 pub(crate) fn query_chunks_params<P: rusqlite::Params>(
338 &self,
339 sql: &str,
340 p: P,
341 ) -> Result<Vec<Value>> {
342 self.query_json(sql, p)
343 }
344
345 pub fn get_deps(&self, chunk_id: &str) -> Result<Vec<DepEdge>> {
350 let mut stmt = self
351 .conn
352 .prepare_cached("SELECT dst, kind, dst_lib FROM deps WHERE src=?")?;
353 let rows = stmt.query_map([chunk_id], |r| {
354 Ok((
355 r.get::<_, String>(0)?,
356 r.get::<_, String>(1)?,
357 r.get::<_, Option<String>>(2)?,
358 ))
359 })?;
360 Ok(rows.filter_map(|r| r.ok()).collect())
361 }
362
363 pub fn get_deps_batch(&self, srcs: &[&str]) -> Result<HashMap<String, Vec<DepEdge>>> {
367 if srcs.is_empty() {
368 return Ok(HashMap::new());
369 }
370 let placeholders = srcs.iter().map(|_| "?").collect::<Vec<_>>().join(",");
371 let sql = format!("SELECT src, dst, kind, dst_lib FROM deps WHERE src IN ({placeholders})");
372 let mut stmt = self.conn.prepare(&sql)?;
373 let rows = stmt.query_map(rusqlite::params_from_iter(srcs.iter()), |r| {
374 Ok((
375 r.get::<_, String>(0)?,
376 r.get::<_, String>(1)?,
377 r.get::<_, String>(2)?,
378 r.get::<_, Option<String>>(3)?,
379 ))
380 })?;
381 let mut map: HashMap<String, Vec<(String, String, Option<String>)>> = HashMap::new();
382 for (src, dst, kind, lib) in rows.filter_map(|r| r.ok()) {
383 map.entry(src).or_default().push((dst, kind, lib));
384 }
385 Ok(map)
386 }
387
388 pub fn get_reverse_deps(&self, chunk_id: &str) -> Result<Vec<String>> {
389 let mut stmt = self
390 .conn
391 .prepare_cached("SELECT src FROM deps WHERE dst=?")?;
392 let rows = stmt.query_map([chunk_id], |r| r.get::<_, String>(0))?;
393 Ok(rows.filter_map(|r| r.ok()).collect())
394 }
395
396 pub fn insert_dep(
397 &self,
398 src: &str,
399 dst: &str,
400 kind: &str,
401 dst_lib: Option<&str>,
402 ) -> Result<()> {
403 self.conn.execute(
404 "INSERT OR IGNORE INTO deps(src,dst,kind,dst_lib) VALUES (?,?,?,?)",
405 params![src, dst, kind, dst_lib],
406 )?;
407 Ok(())
408 }
409
410 pub fn upsert_chunk_success_trace(
415 &self,
416 chunk_id: &str,
417 trace_id: &str,
418 ts: &str,
419 ) -> Result<()> {
420 self.conn.execute(
421 "INSERT OR IGNORE INTO chunk_success_traces(chunk_id, trace_id, ts) VALUES (?,?,?)",
422 params![chunk_id, trace_id, ts],
423 )?;
424 Ok(())
425 }
426
427 }