1use super::*;
2
3impl Storage {
4 #[allow(clippy::too_many_arguments)]
5 pub fn insert_usage_trace(
6 &self,
7 trace_id: &str,
8 chunk_id: Option<&str>,
9 event: &str,
10 strength: f64,
11 similarity: Option<f64>,
12 refine_mode: Option<&str>,
13 tokens: Option<i64>,
14 rank: Option<i64>,
15 attribution: Option<&str>,
16 source: &str,
17 ts: &str,
18 ) -> Result<usize> {
19 let mut stmt = self.conn.prepare_cached(
20 "INSERT OR IGNORE INTO usage_trace
21 (trace_id, chunk_id, event, strength, similarity, refine_mode, tokens, rank, attribution, source, ts)
22 VALUES (?,?,?,?,?,?,?,?,?,?,?)",
23 )?;
24 Ok(stmt.execute(
25 params![trace_id, chunk_id, event, strength, similarity, refine_mode, tokens, rank, attribution, source, ts],
26 )?)
27 }
28
29 pub fn replace_used_trace(
30 &self,
31 trace_id: &str,
32 used_ids: &[String],
33 strength: f64,
34 attribution: &str,
35 source: &str,
36 ts: &str,
37 ) -> Result<()> {
38 self.conn.execute(
39 "DELETE FROM usage_trace WHERE trace_id=? AND event='used'",
40 [trace_id],
41 )?;
42 for chunk_id in used_ids {
43 self.insert_usage_trace(
44 trace_id,
45 Some(chunk_id),
46 "used",
47 strength,
48 None,
49 None,
50 None,
51 None,
52 Some(attribution),
53 source,
54 ts,
55 )?;
56 }
57 Ok(())
58 }
59
60 pub fn merge_used_trace(
61 &self,
62 trace_id: &str,
63 used_ids: &[String],
64 strength: f64,
65 attribution: &str,
66 source: &str,
67 ts: &str,
68 ) -> Result<()> {
69 if used_ids.is_empty() {
70 return Ok(());
71 }
72 let attribution_rank = |value: &str| match value {
73 "explicit" => 3,
74 "cited" => 2,
75 "inferred" => 1,
76 _ => 0,
77 };
78
79 let placeholders = used_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
82 let sql = format!(
83 "SELECT chunk_id, attribution FROM usage_trace
84 WHERE trace_id=? AND event='used' AND chunk_id IN ({placeholders})"
85 );
86 let mut qparams: Vec<&str> = Vec::with_capacity(used_ids.len() + 1);
87 qparams.push(trace_id);
88 qparams.extend(used_ids.iter().map(String::as_str));
89 let existing: HashMap<String, String> = {
90 let mut stmt = self.conn.prepare(&sql)?;
91 let rows = stmt.query_map(rusqlite::params_from_iter(qparams.iter()), |r| {
92 let id: String = r.get(0)?;
93 let attr: Option<String> = r.get(1)?;
94 Ok((id, attr.unwrap_or_else(|| "inferred".to_string())))
95 })?;
96 rows.filter_map(|r| r.ok()).collect()
97 };
98
99 for chunk_id in used_ids {
100 match existing.get(chunk_id) {
101 Some(existing_attribution) => {
102 if attribution_rank(attribution) > attribution_rank(existing_attribution) {
103 self.conn.execute(
104 "UPDATE usage_trace
105 SET strength=?, attribution=?, source=?, ts=?
106 WHERE trace_id=? AND chunk_id=? AND event='used'",
107 params![strength, attribution, source, ts, trace_id, chunk_id],
108 )?;
109 }
110 }
111 None => {
112 self.insert_usage_trace(
113 trace_id,
114 Some(chunk_id),
115 "used",
116 strength,
117 None,
118 None,
119 None,
120 None,
121 Some(attribution),
122 source,
123 ts,
124 )?;
125 }
126 }
127 }
128 Ok(())
129 }
130
131 pub fn refresh_chunk_last_used(&self, chunk_id: &str, now: &str) -> Result<()> {
132 self.conn.execute(
133 "UPDATE chunks
134 SET last_used_at=COALESCE(
135 (SELECT MAX(ts) FROM usage_trace
136 WHERE chunk_id=? AND event='used'
137 AND ts > COALESCE(chunks.evidence_cutoff_at, '')),
138 last_used_base
139 ),
140 updated_at=?
141 WHERE id=?",
142 params![chunk_id, now, chunk_id],
143 )?;
144 Ok(())
145 }
146
147 pub fn get_outcome_for_trace(&self, trace_id: &str) -> Result<Option<String>> {
148 let row = self.conn.query_row(
149 "SELECT event FROM usage_trace
150 WHERE trace_id=? AND event IN ('task_ok','task_fail') AND chunk_id IS NULL
151 LIMIT 1",
152 [trace_id],
153 |r| r.get::<_, String>(0),
154 );
155 match row {
156 Ok(v) => Ok(Some(v)),
157 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
158 Err(e) => Err(e.into()),
159 }
160 }
161
162 pub fn purge_usage_trace(&self, before_ts: &str) -> Result<usize> {
163 let n = self.conn.execute(
165 "DELETE FROM usage_trace
166 WHERE ts < ?
167 AND event IN ('retrieved','refined')
168 AND NOT (event = 'retrieved'
169 AND chunk_id IN (SELECT id FROM chunks WHERE origin='spark'))",
170 [before_ts],
171 )?;
172 Ok(n)
173 }
174
175 pub fn upsert_episodic_log(&self, log: &EpisodicLogRow) -> Result<()> {
180 self.conn.execute(
181 "INSERT OR REPLACE INTO episodic_log
182 (id, trace_id, lib_id, ts, query, recall_snapshot, output,
183 output_summary, outcome, event_source, task_state, completed_at,
184 usage_state, used_ids, used_attribution, used_complete, context_key, nomination, priority,
185 distill_state, distill_note, distill_attempts, distill_last_failed_at)
186 VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,?18,?19,?20,?21,0,NULL)",
187 params![
188 log.id,
189 log.trace_id,
190 log.lib_id,
191 log.ts,
192 log.query,
193 log.recall_snapshot,
194 log.output,
195 log.output_summary,
196 log.outcome,
197 log.event_source,
198 log.task_state,
199 log.completed_at,
200 log.usage_state,
201 log.used_ids,
202 log.used_attribution,
203 i64::from(log.used_complete),
204 log.context_key,
205 log.nomination,
206 log.priority,
207 log.distill_state,
208 log.distill_note
209 ],
210 )?;
211 Ok(())
212 }
213
214 pub fn get_episodic_log(&self, trace_id: &str) -> Result<Option<Value>> {
215 let row = self.conn.query_row(
216 "SELECT * FROM episodic_log WHERE trace_id=?",
217 [trace_id],
218 row_to_json,
219 );
220 match row {
221 Ok(v) => Ok(Some(v)),
222 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
223 Err(e) => Err(e.into()),
224 }
225 }
226
227 pub fn update_episodic_log_state(
228 &self,
229 trace_id: &str,
230 state: &str,
231 note: Option<&str>,
232 outcome: Option<&str>,
233 ) -> Result<()> {
234 self.conn.execute(
235 "UPDATE episodic_log
236 SET distill_state=?, distill_note=COALESCE(?,distill_note),
237 outcome=COALESCE(?,outcome),
238 distill_run_id=NULL, distill_locked_at=NULL
239 WHERE trace_id=?",
240 params![state, note, outcome, trace_id],
241 )?;
242 Ok(())
243 }
244
245 pub fn patch_episodic_log_content(
247 &self,
248 trace_id: &str,
249 query: Option<&str>,
250 output: Option<&str>,
251 output_summary: Option<&str>,
252 nomination: Option<&str>,
253 priority: i64,
254 ) -> Result<()> {
255 self.conn.execute(
256 "UPDATE episodic_log
257 SET output_summary = COALESCE(?, output_summary),
258 nomination = COALESCE(?, nomination),
259 output = COALESCE(?, output),
260 query = COALESCE(?, query),
261 priority = MAX(priority, ?)
262 WHERE trace_id = ?",
263 params![
264 output_summary,
265 nomination,
266 output,
267 query,
268 priority,
269 trace_id
270 ],
271 )?;
272 Ok(())
273 }
274
275 #[allow(clippy::too_many_arguments)]
276 pub fn update_trace_lifecycle(
277 &self,
278 trace_id: &str,
279 task_state: &str,
280 completed_at: Option<&str>,
281 usage_state: Option<&str>,
282 used_ids: Option<&str>,
283 used_attribution: Option<&str>,
284 used_complete: Option<bool>,
285 ) -> Result<()> {
286 self.conn.execute(
287 "UPDATE episodic_log
288 SET task_state=?,
289 completed_at=COALESCE(?, completed_at),
290 usage_state=COALESCE(?, usage_state),
291 used_ids=COALESCE(?, used_ids),
292 used_attribution=COALESCE(?, used_attribution),
293 used_complete=COALESCE(?, used_complete)
294 WHERE trace_id=?",
295 params![
296 task_state,
297 completed_at,
298 usage_state,
299 used_ids,
300 used_attribution,
301 used_complete.map(i64::from),
302 trace_id
303 ],
304 )?;
305 Ok(())
306 }
307
308 #[allow(clippy::too_many_arguments)]
309 pub fn upsert_confidence_evidence(
310 &self,
311 id: &str,
312 trace_id: Option<&str>,
313 chunk_id: &str,
314 kind: &str,
315 target: f64,
316 alpha: f64,
317 reason: &str,
318 context_key: Option<&str>,
319 ts: &str,
320 ) -> Result<()> {
321 self.conn.execute(
322 "INSERT INTO confidence_evidence
323 (id, trace_id, chunk_id, kind, target, alpha, reason, context_key, ts)
324 VALUES (?,?,?,?,?,?,?,?,?)
325 ON CONFLICT(trace_id, chunk_id, kind) WHERE trace_id IS NOT NULL
326 DO UPDATE SET target=excluded.target, alpha=excluded.alpha,
327 reason=excluded.reason, context_key=excluded.context_key",
328 params![
329 id,
330 trace_id,
331 chunk_id,
332 kind,
333 target,
334 alpha,
335 reason,
336 context_key,
337 ts
338 ],
339 )?;
340 Ok(())
341 }
342
343 pub fn delete_trace_confidence_evidence(&self, trace_id: &str, kinds: &[&str]) -> Result<()> {
344 if kinds.is_empty() {
345 return Ok(());
346 }
347 let placeholders = kinds.iter().map(|_| "?").collect::<Vec<_>>().join(",");
348 let sql = format!(
349 "DELETE FROM confidence_evidence WHERE trace_id=? AND kind IN ({placeholders})"
350 );
351 let mut params: Vec<&str> = Vec::with_capacity(kinds.len() + 1);
352 params.push(trace_id);
353 params.extend_from_slice(kinds);
354 self.conn
355 .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
356 Ok(())
357 }
358
359 pub fn delete_chunk_trace_confidence_evidence(
360 &self,
361 trace_id: &str,
362 chunk_id: &str,
363 kind: &str,
364 ) -> Result<()> {
365 self.conn.execute(
366 "DELETE FROM confidence_evidence
367 WHERE trace_id=? AND chunk_id=? AND kind=?",
368 params![trace_id, chunk_id, kind],
369 )?;
370 Ok(())
371 }
372
373 pub fn confidence_evidence_for_chunk(&self, chunk_id: &str) -> Result<Vec<Value>> {
374 self.query_json(
375 "SELECT target, alpha, reason, ts, id
376 FROM confidence_evidence WHERE chunk_id=?
377 ORDER BY ts ASC,
378 CASE kind
379 WHEN 'outcome_ok' THEN 1
380 WHEN 'outcome_fail' THEN 1
381 WHEN 'selected_unused' THEN 2
382 WHEN 'feedback_up' THEN 3
383 WHEN 'feedback_down' THEN 3
384 WHEN 'decay' THEN 4
385 ELSE 5
386 END ASC,
387 kind ASC, id ASC",
388 [chunk_id],
389 )
390 }
391
392 #[allow(clippy::too_many_arguments)]
393 pub fn insert_feedback_event(
394 &self,
395 id: &str,
396 trace_id: &str,
397 chunk_id: &str,
398 signal: &str,
399 strength: f64,
400 source: &str,
401 actor: Option<&str>,
402 reason: Option<&str>,
403 context_key: Option<&str>,
404 ts: &str,
405 ) -> Result<usize> {
406 Ok(self.conn.execute(
407 "INSERT OR IGNORE INTO feedback_events
408 (id, trace_id, chunk_id, signal, strength, source, actor, reason, context_key, ts)
409 VALUES (?,?,?,?,?,?,?,?,?,?)",
410 params![
411 id,
412 trace_id,
413 chunk_id,
414 signal,
415 strength,
416 source,
417 actor,
418 reason,
419 context_key,
420 ts
421 ],
422 )?)
423 }
424
425 pub fn delete_feedback_event(
426 &self,
427 trace_id: &str,
428 chunk_id: &str,
429 signal: &str,
430 ) -> Result<usize> {
431 Ok(self.conn.execute(
432 "DELETE FROM feedback_events
433 WHERE trace_id=? AND chunk_id=? AND signal=?",
434 params![trace_id, chunk_id, signal],
435 )?)
436 }
437
438 pub fn update_chunk_last_decayed_at(&self, id: &str, now: &str) -> Result<()> {
439 self.conn.execute(
440 "UPDATE chunks SET last_decayed_at=?, updated_at=? WHERE id=?",
441 params![now, now, id],
442 )?;
443 Ok(())
444 }
445
446 #[allow(clippy::too_many_arguments)]
447 pub fn update_context_stat(
448 &self,
449 chunk_id: &str,
450 context_key: &str,
451 success: i64,
452 failure: i64,
453 positive: i64,
454 negative: i64,
455 now: &str,
456 ) -> Result<()> {
457 self.conn.execute(
458 "INSERT INTO chunk_context_stats
459 (chunk_id, context_key, success_count, failure_count,
460 positive_feedback, negative_feedback, last_updated_at)
461 VALUES (?,?,?,?,?,?,?)
462 ON CONFLICT(chunk_id, context_key) DO UPDATE SET
463 success_count=success_count+excluded.success_count,
464 failure_count=failure_count+excluded.failure_count,
465 positive_feedback=positive_feedback+excluded.positive_feedback,
466 negative_feedback=negative_feedback+excluded.negative_feedback,
467 last_updated_at=excluded.last_updated_at",
468 params![
469 chunk_id,
470 context_key,
471 success,
472 failure,
473 positive,
474 negative,
475 now
476 ],
477 )?;
478 Ok(())
479 }
480
481 pub fn context_score(&self, chunk_id: &str, context_key: &str) -> Result<f64> {
482 let mut stmt = self.conn.prepare_cached(
483 "SELECT success_count, failure_count, positive_feedback, negative_feedback
484 FROM chunk_context_stats WHERE chunk_id=? AND context_key=?",
485 )?;
486 let row = stmt
487 .query_row(params![chunk_id, context_key], |row| {
488 Ok((
489 row.get::<_, i64>(0)?,
490 row.get::<_, i64>(1)?,
491 row.get::<_, i64>(2)?,
492 row.get::<_, i64>(3)?,
493 ))
494 })
495 .optional()?;
496 let Some((success, failure, positive, negative)) = row else {
497 return Ok(0.0);
498 };
499 Ok(context_score_from_counts(success, failure, positive, negative))
500 }
501
502 pub fn context_scores_batch(
505 &self,
506 chunk_ids: &[&str],
507 context_key: &str,
508 ) -> Result<HashMap<String, f64>> {
509 if chunk_ids.is_empty() {
510 return Ok(HashMap::new());
511 }
512 let placeholders = chunk_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
513 let sql = format!(
514 "SELECT chunk_id, success_count, failure_count, positive_feedback, negative_feedback
515 FROM chunk_context_stats
516 WHERE context_key=? AND chunk_id IN ({placeholders})"
517 );
518 let mut params: Vec<&str> = Vec::with_capacity(chunk_ids.len() + 1);
519 params.push(context_key);
520 params.extend_from_slice(chunk_ids);
521 let mut stmt = self.conn.prepare(&sql)?;
522 let rows = stmt.query_map(rusqlite::params_from_iter(params.iter()), |r| {
523 Ok((
524 r.get::<_, String>(0)?,
525 r.get::<_, i64>(1)?,
526 r.get::<_, i64>(2)?,
527 r.get::<_, i64>(3)?,
528 r.get::<_, i64>(4)?,
529 ))
530 })?;
531 let mut map = HashMap::new();
532 for (id, success, failure, positive, negative) in rows.filter_map(|r| r.ok()) {
533 map.insert(
534 id,
535 context_score_from_counts(success, failure, positive, negative),
536 );
537 }
538 Ok(map)
539 }
540}
541
/// Turns outcome and feedback counters into a signed score.
///
/// Feedback counts weigh twice as much as task outcomes. The favourable share
/// is smoothed as `(wins + 1) / (evidence + 2)`, recentred around zero, and
/// scaled by how much evidence there is (full weight from 5 evidence units).
/// With non-negative counts the result lies strictly between -1 and 1, and is
/// `0.0` when there is no evidence.
fn context_score_from_counts(success: i64, failure: i64, positive: i64, negative: i64) -> f64 {
    const FEEDBACK_WEIGHT: f64 = 2.0;
    const FULL_WEIGHT_EVIDENCE: f64 = 5.0;

    let favourable = success as f64 + positive as f64 * FEEDBACK_WEIGHT;
    let unfavourable = failure as f64 + negative as f64 * FEEDBACK_WEIGHT;
    let total = favourable + unfavourable;

    let smoothed_share = (favourable + 1.0) / (total + 2.0);
    let confidence = (total / FULL_WEIGHT_EVIDENCE).min(1.0);
    (smoothed_share - 0.5) * 2.0 * confidence
}