1use super::*;
2
3impl Storage {
4 #[allow(clippy::too_many_arguments)]
5 pub fn insert_usage_trace(
6 &self,
7 trace_id: &str,
8 chunk_id: Option<&str>,
9 event: &str,
10 strength: f64,
11 similarity: Option<f64>,
12 refine_mode: Option<&str>,
13 tokens: Option<i64>,
14 rank: Option<i64>,
15 attribution: Option<&str>,
16 source: &str,
17 ts: &str,
18 ) -> Result<usize> {
19 let mut stmt = self.conn.prepare_cached(
20 "INSERT OR IGNORE INTO usage_trace
21 (trace_id, chunk_id, event, strength, similarity, refine_mode, tokens, rank, attribution, source, ts)
22 VALUES (?,?,?,?,?,?,?,?,?,?,?)",
23 )?;
24 Ok(stmt.execute(params![
25 trace_id,
26 chunk_id,
27 event,
28 strength,
29 similarity,
30 refine_mode,
31 tokens,
32 rank,
33 attribution,
34 source,
35 ts
36 ])?)
37 }
38
39 pub fn replace_used_trace(
40 &self,
41 trace_id: &str,
42 used_ids: &[String],
43 strength: f64,
44 attribution: &str,
45 source: &str,
46 ts: &str,
47 ) -> Result<()> {
48 self.conn.execute(
49 "DELETE FROM usage_trace WHERE trace_id=? AND event='used'",
50 [trace_id],
51 )?;
52 for chunk_id in used_ids {
53 self.insert_usage_trace(
54 trace_id,
55 Some(chunk_id),
56 "used",
57 strength,
58 None,
59 None,
60 None,
61 None,
62 Some(attribution),
63 source,
64 ts,
65 )?;
66 }
67 Ok(())
68 }
69
70 pub fn merge_used_trace(
71 &self,
72 trace_id: &str,
73 used_ids: &[String],
74 strength: f64,
75 attribution: &str,
76 source: &str,
77 ts: &str,
78 ) -> Result<()> {
79 if used_ids.is_empty() {
80 return Ok(());
81 }
82 let attribution_rank = |value: &str| match value {
83 "explicit" => 3,
84 "cited" => 2,
85 "inferred" => 1,
86 _ => 0,
87 };
88
89 let placeholders = used_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
92 let sql = format!(
93 "SELECT chunk_id, attribution FROM usage_trace
94 WHERE trace_id=? AND event='used' AND chunk_id IN ({placeholders})"
95 );
96 let mut qparams: Vec<&str> = Vec::with_capacity(used_ids.len() + 1);
97 qparams.push(trace_id);
98 qparams.extend(used_ids.iter().map(String::as_str));
99 let existing: HashMap<String, String> = {
100 let mut stmt = self.conn.prepare(&sql)?;
101 let rows = stmt.query_map(rusqlite::params_from_iter(qparams.iter()), |r| {
102 let id: String = r.get(0)?;
103 let attr: Option<String> = r.get(1)?;
104 Ok((id, attr.unwrap_or_else(|| "inferred".to_string())))
105 })?;
106 rows.collect::<rusqlite::Result<HashMap<_, _>>>()?
107 };
108
109 for chunk_id in used_ids {
110 match existing.get(chunk_id) {
111 Some(existing_attribution) => {
112 if attribution_rank(attribution) > attribution_rank(existing_attribution) {
113 self.conn.execute(
114 "UPDATE usage_trace
115 SET strength=?, attribution=?, source=?, ts=?
116 WHERE trace_id=? AND chunk_id=? AND event='used'",
117 params![strength, attribution, source, ts, trace_id, chunk_id],
118 )?;
119 }
120 }
121 None => {
122 self.insert_usage_trace(
123 trace_id,
124 Some(chunk_id),
125 "used",
126 strength,
127 None,
128 None,
129 None,
130 None,
131 Some(attribution),
132 source,
133 ts,
134 )?;
135 }
136 }
137 }
138 Ok(())
139 }
140
141 pub fn refresh_chunk_last_used(&self, chunk_id: &str, now: &str) -> Result<()> {
142 self.conn.execute(
143 "UPDATE chunks
144 SET last_used_at=COALESCE(
145 (SELECT MAX(ts) FROM usage_trace
146 WHERE chunk_id=? AND event='used'
147 AND ts > COALESCE(chunks.evidence_cutoff_at, '')),
148 last_used_base
149 ),
150 updated_at=?
151 WHERE id=?",
152 params![chunk_id, now, chunk_id],
153 )?;
154 Ok(())
155 }
156
157 pub fn get_outcome_for_trace(&self, trace_id: &str) -> Result<Option<String>> {
158 let row = self.conn.query_row(
159 "SELECT event FROM usage_trace
160 WHERE trace_id=? AND event IN ('task_ok','task_fail') AND chunk_id IS NULL
161 LIMIT 1",
162 [trace_id],
163 |r| r.get::<_, String>(0),
164 );
165 match row {
166 Ok(v) => Ok(Some(v)),
167 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
168 Err(e) => Err(e.into()),
169 }
170 }
171
172 pub fn purge_usage_trace(&self, before_ts: &str) -> Result<usize> {
173 let n = self.conn.execute(
175 "DELETE FROM usage_trace
176 WHERE ts < ?
177 AND event IN ('retrieved','refined')
178 AND NOT (event = 'retrieved'
179 AND chunk_id IN (SELECT id FROM chunks WHERE origin='spark'))",
180 [before_ts],
181 )?;
182 Ok(n)
183 }
184
185 pub fn upsert_episodic_log(&self, log: &EpisodicLogRow) -> Result<()> {
190 self.conn.execute(
191 "INSERT OR REPLACE INTO episodic_log
192 (id, trace_id, lib_id, ts, query, recall_snapshot, output,
193 output_summary, outcome, event_source, task_state, completed_at,
194 usage_state, used_ids, used_attribution, used_complete, context_key, nomination, priority,
195 distill_state, distill_note, distill_attempts, distill_last_failed_at)
196 VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,?18,?19,?20,?21,0,NULL)",
197 params![
198 log.id,
199 log.trace_id,
200 log.lib_id,
201 log.ts,
202 log.query,
203 log.recall_snapshot,
204 log.output,
205 log.output_summary,
206 log.outcome,
207 log.event_source,
208 log.task_state,
209 log.completed_at,
210 log.usage_state,
211 log.used_ids,
212 log.used_attribution,
213 i64::from(log.used_complete),
214 log.context_key,
215 log.nomination,
216 log.priority,
217 log.distill_state,
218 log.distill_note
219 ],
220 )?;
221 Ok(())
222 }
223
224 pub fn get_episodic_log(&self, trace_id: &str) -> Result<Option<Value>> {
225 let row = self.conn.query_row(
226 "SELECT * FROM episodic_log WHERE trace_id=?",
227 [trace_id],
228 row_to_json,
229 );
230 match row {
231 Ok(v) => Ok(Some(v)),
232 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
233 Err(e) => Err(e.into()),
234 }
235 }
236
237 pub fn update_episodic_log_state(
238 &self,
239 trace_id: &str,
240 state: &str,
241 note: Option<&str>,
242 outcome: Option<&str>,
243 ) -> Result<()> {
244 self.conn.execute(
245 "UPDATE episodic_log
246 SET distill_state=?, distill_note=COALESCE(?,distill_note),
247 outcome=COALESCE(?,outcome),
248 distill_run_id=NULL, distill_locked_at=NULL
249 WHERE trace_id=?",
250 params![state, note, outcome, trace_id],
251 )?;
252 Ok(())
253 }
254
255 pub fn patch_episodic_log_content(
257 &self,
258 trace_id: &str,
259 query: Option<&str>,
260 output: Option<&str>,
261 output_summary: Option<&str>,
262 nomination: Option<&str>,
263 priority: i64,
264 ) -> Result<()> {
265 self.conn.execute(
266 "UPDATE episodic_log
267 SET output_summary = COALESCE(?, output_summary),
268 nomination = COALESCE(?, nomination),
269 output = COALESCE(?, output),
270 query = COALESCE(?, query),
271 priority = MAX(priority, ?)
272 WHERE trace_id = ?",
273 params![
274 output_summary,
275 nomination,
276 output,
277 query,
278 priority,
279 trace_id
280 ],
281 )?;
282 Ok(())
283 }
284
285 #[allow(clippy::too_many_arguments)]
286 pub fn update_trace_lifecycle(
287 &self,
288 trace_id: &str,
289 task_state: &str,
290 completed_at: Option<&str>,
291 usage_state: Option<&str>,
292 used_ids: Option<&str>,
293 used_attribution: Option<&str>,
294 used_complete: Option<bool>,
295 ) -> Result<()> {
296 self.conn.execute(
297 "UPDATE episodic_log
298 SET task_state=?,
299 completed_at=COALESCE(?, completed_at),
300 usage_state=COALESCE(?, usage_state),
301 used_ids=COALESCE(?, used_ids),
302 used_attribution=COALESCE(?, used_attribution),
303 used_complete=COALESCE(?, used_complete)
304 WHERE trace_id=?",
305 params![
306 task_state,
307 completed_at,
308 usage_state,
309 used_ids,
310 used_attribution,
311 used_complete.map(i64::from),
312 trace_id
313 ],
314 )?;
315 Ok(())
316 }
317
318 #[allow(clippy::too_many_arguments)]
319 pub fn upsert_confidence_evidence(
320 &self,
321 id: &str,
322 trace_id: Option<&str>,
323 chunk_id: &str,
324 kind: &str,
325 target: f64,
326 alpha: f64,
327 reason: &str,
328 context_key: Option<&str>,
329 ts: &str,
330 provenance: &str,
331 ) -> Result<()> {
332 self.conn.execute(
333 "INSERT INTO confidence_evidence
334 (id, trace_id, chunk_id, kind, target, alpha, reason, context_key, ts, provenance)
335 VALUES (?,?,?,?,?,?,?,?,?,?)
336 ON CONFLICT(trace_id, chunk_id, kind) WHERE trace_id IS NOT NULL
337 DO UPDATE SET target=excluded.target, alpha=excluded.alpha,
338 reason=excluded.reason, context_key=excluded.context_key,
339 provenance=excluded.provenance",
340 params![
341 id,
342 trace_id,
343 chunk_id,
344 kind,
345 target,
346 alpha,
347 reason,
348 context_key,
349 ts,
350 provenance
351 ],
352 )?;
353 Ok(())
354 }
355
356 pub fn observed_outcome_count(&self, chunk_id: &str) -> Result<i64> {
359 let n = self.conn.query_row(
360 "SELECT COUNT(*) FROM confidence_evidence
361 WHERE chunk_id=? AND provenance='observed'
362 AND kind IN ('outcome_ok','outcome_fail')",
363 params![chunk_id],
364 |r| r.get::<_, i64>(0),
365 )?;
366 Ok(n)
367 }
368
369 pub fn context_stat_present_batch(
373 &self,
374 chunk_ids: &[&str],
375 context_key: &str,
376 ) -> Result<std::collections::HashSet<String>> {
377 let mut set = std::collections::HashSet::new();
378 if chunk_ids.is_empty() {
379 return Ok(set);
380 }
381 let placeholders = chunk_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
382 let sql = format!(
383 "SELECT chunk_id FROM chunk_context_stats
384 WHERE context_key=? AND chunk_id IN ({placeholders})"
385 );
386 let mut params: Vec<&str> = Vec::with_capacity(chunk_ids.len() + 1);
387 params.push(context_key);
388 params.extend_from_slice(chunk_ids);
389 let mut stmt = self.conn.prepare(&sql)?;
390 let rows = stmt.query_map(rusqlite::params_from_iter(params.iter()), |r| {
391 r.get::<_, String>(0)
392 })?;
393 for row in rows {
394 set.insert(row?);
395 }
396 Ok(set)
397 }
398
399 #[allow(clippy::too_many_arguments)]
402 pub fn insert_verdict_log(
403 &self,
404 verdict_id: &str,
405 trace_id: &str,
406 situation_sig: &str,
407 emitted_valence: Option<&str>,
408 emitted_conf: Option<f64>,
409 emitted_strength: f64,
410 emitted_tier: Option<&str>,
411 abstain_reason: Option<&str>,
412 emitted_at: &str,
413 ) -> Result<()> {
414 self.conn.execute(
417 "INSERT INTO verdict_log
418 (verdict_id, trace_id, situation_sig, emitted_valence, emitted_conf,
419 emitted_strength, emitted_tier, abstain_reason, emitted_at)
420 VALUES (?,?,?,?,?,?,?,?,?)",
421 params![
422 verdict_id,
423 trace_id,
424 situation_sig,
425 emitted_valence,
426 emitted_conf,
427 emitted_strength,
428 emitted_tier,
429 abstain_reason,
430 emitted_at
431 ],
432 )?;
433 Ok(())
434 }
435
436 pub fn backfill_verdict_outcome(
440 &self,
441 trace_id: &str,
442 observed_outcome: f64,
443 provenance: &str,
444 observed_at: &str,
445 ) -> Result<()> {
446 self.conn.execute(
447 "UPDATE verdict_log
448 SET observed_outcome=?, outcome_observed_at=?, outcome_provenance=?
449 WHERE trace_id=? AND outcome_observed_at IS NULL",
450 params![observed_outcome, observed_at, provenance, trace_id],
451 )?;
452 Ok(())
453 }
454
455 pub fn load_calibration_map(&self) -> Result<Vec<(f64, f64, f64)>> {
457 let mut stmt = self.conn.prepare(
458 "SELECT claimed_lo, claimed_hi, observed_rate FROM calibration_map ORDER BY bucket",
459 )?;
460 let rows = stmt.query_map([], |r| {
461 Ok((
462 r.get::<_, f64>(0)?,
463 r.get::<_, f64>(1)?,
464 r.get::<_, f64>(2)?,
465 ))
466 })?;
467 let mut out = Vec::new();
468 for row in rows {
469 out.push(row?);
470 }
471 Ok(out)
472 }
473
474 pub fn verdict_calibration_samples(&self) -> Result<Vec<(f64, f64, f64)>> {
484 let mut stmt = self.conn.prepare(
485 "SELECT emitted_strength, emitted_conf,
486 CASE WHEN emitted_valence='affirm'
487 THEN (CASE WHEN observed_outcome < 0 THEN 1.0 ELSE 0.0 END)
488 ELSE (CASE WHEN observed_outcome > 0 THEN 1.0 ELSE 0.0 END) END
489 FROM verdict_log
490 WHERE outcome_provenance='observed'
491 AND emitted_conf IS NOT NULL AND emitted_strength IS NOT NULL
492 AND observed_outcome IS NOT NULL
493 AND emitted_valence IN ('affirm','caution')",
494 )?;
495 let rows = stmt.query_map([], |r| {
496 Ok((
497 r.get::<_, f64>(0)?,
498 r.get::<_, f64>(1)?,
499 r.get::<_, f64>(2)?,
500 ))
501 })?;
502 let mut out = Vec::new();
503 for row in rows {
504 out.push(row?);
505 }
506 Ok(out)
507 }
508
509 pub fn verdict_log_overview(&self) -> Result<(i64, i64, i64)> {
511 let total: i64 = self
512 .conn
513 .query_row("SELECT COUNT(*) FROM verdict_log", [], |r| r.get(0))?;
514 let abstained: i64 = self.conn.query_row(
515 "SELECT COUNT(*) FROM verdict_log WHERE abstain_reason IS NOT NULL",
516 [],
517 |r| r.get(0),
518 )?;
519 let observed: i64 = self.conn.query_row(
520 "SELECT COUNT(*) FROM verdict_log WHERE outcome_provenance='observed'",
521 [],
522 |r| r.get(0),
523 )?;
524 Ok((total, abstained, observed))
525 }
526
527 pub fn replace_calibration_map(
529 &self,
530 buckets: &[(f64, f64, f64, i64)],
531 now: &str,
532 ) -> Result<()> {
533 self.conn.execute("DELETE FROM calibration_map", [])?;
534 for (i, (lo, hi, rate, n)) in buckets.iter().enumerate() {
535 self.conn.execute(
536 "INSERT INTO calibration_map
537 (bucket, claimed_lo, claimed_hi, observed_rate, n, updated_at)
538 VALUES (?,?,?,?,?,?)",
539 params![i as i64, lo, hi, rate, n, now],
540 )?;
541 }
542 Ok(())
543 }
544
545 pub fn delete_trace_confidence_evidence(&self, trace_id: &str, kinds: &[&str]) -> Result<()> {
546 if kinds.is_empty() {
547 return Ok(());
548 }
549 let placeholders = kinds.iter().map(|_| "?").collect::<Vec<_>>().join(",");
550 let sql = format!(
551 "DELETE FROM confidence_evidence WHERE trace_id=? AND kind IN ({placeholders})"
552 );
553 let mut params: Vec<&str> = Vec::with_capacity(kinds.len() + 1);
554 params.push(trace_id);
555 params.extend_from_slice(kinds);
556 self.conn
557 .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
558 Ok(())
559 }
560
561 pub fn delete_chunk_trace_confidence_evidence(
562 &self,
563 trace_id: &str,
564 chunk_id: &str,
565 kind: &str,
566 ) -> Result<()> {
567 self.conn.execute(
568 "DELETE FROM confidence_evidence
569 WHERE trace_id=? AND chunk_id=? AND kind=?",
570 params![trace_id, chunk_id, kind],
571 )?;
572 Ok(())
573 }
574
575 pub fn confidence_evidence_for_chunk(&self, chunk_id: &str) -> Result<Vec<Value>> {
576 self.query_json(
578 "SELECT target, alpha, reason, ts, id
579 FROM confidence_evidence WHERE chunk_id=? AND provenance='observed'
580 ORDER BY ts ASC,
581 CASE kind
582 WHEN 'outcome_ok' THEN 1
583 WHEN 'outcome_fail' THEN 1
584 WHEN 'selected_unused' THEN 2
585 WHEN 'feedback_up' THEN 3
586 WHEN 'feedback_down' THEN 3
587 WHEN 'decay' THEN 4
588 ELSE 5
589 END ASC,
590 kind ASC, id ASC",
591 [chunk_id],
592 )
593 }
594
595 #[allow(clippy::too_many_arguments)]
596 pub fn insert_feedback_event(
597 &self,
598 id: &str,
599 trace_id: &str,
600 chunk_id: &str,
601 signal: &str,
602 strength: f64,
603 source: &str,
604 actor: Option<&str>,
605 reason: Option<&str>,
606 context_key: Option<&str>,
607 ts: &str,
608 ) -> Result<usize> {
609 Ok(self.conn.execute(
610 "INSERT OR IGNORE INTO feedback_events
611 (id, trace_id, chunk_id, signal, strength, source, actor, reason, context_key, ts)
612 VALUES (?,?,?,?,?,?,?,?,?,?)",
613 params![
614 id,
615 trace_id,
616 chunk_id,
617 signal,
618 strength,
619 source,
620 actor,
621 reason,
622 context_key,
623 ts
624 ],
625 )?)
626 }
627
628 pub fn delete_feedback_event(
629 &self,
630 trace_id: &str,
631 chunk_id: &str,
632 signal: &str,
633 ) -> Result<usize> {
634 Ok(self.conn.execute(
635 "DELETE FROM feedback_events
636 WHERE trace_id=? AND chunk_id=? AND signal=?",
637 params![trace_id, chunk_id, signal],
638 )?)
639 }
640
641 pub fn update_chunk_last_decayed_at(&self, id: &str, now: &str) -> Result<()> {
642 self.conn.execute(
643 "UPDATE chunks SET last_decayed_at=?, updated_at=? WHERE id=?",
644 params![now, now, id],
645 )?;
646 Ok(())
647 }
648
649 #[allow(clippy::too_many_arguments)]
650 pub fn update_context_stat(
651 &self,
652 chunk_id: &str,
653 context_key: &str,
654 success: i64,
655 failure: i64,
656 positive: i64,
657 negative: i64,
658 now: &str,
659 ) -> Result<()> {
660 self.conn.execute(
661 "INSERT INTO chunk_context_stats
662 (chunk_id, context_key, success_count, failure_count,
663 positive_feedback, negative_feedback, last_updated_at)
664 VALUES (?,?,?,?,?,?,?)
665 ON CONFLICT(chunk_id, context_key) DO UPDATE SET
666 success_count=success_count+excluded.success_count,
667 failure_count=failure_count+excluded.failure_count,
668 positive_feedback=positive_feedback+excluded.positive_feedback,
669 negative_feedback=negative_feedback+excluded.negative_feedback,
670 last_updated_at=excluded.last_updated_at",
671 params![
672 chunk_id,
673 context_key,
674 success,
675 failure,
676 positive,
677 negative,
678 now
679 ],
680 )?;
681 Ok(())
682 }
683
684 pub fn context_score(
685 &self,
686 chunk_id: &str,
687 context_key: &str,
688 prior_m: f64,
689 base_rate: f64,
690 ) -> Result<f64> {
691 let mut stmt = self.conn.prepare_cached(
692 "SELECT success_count, failure_count, positive_feedback, negative_feedback
693 FROM chunk_context_stats WHERE chunk_id=? AND context_key=?",
694 )?;
695 let row = stmt
696 .query_row(params![chunk_id, context_key], |row| {
697 Ok((
698 row.get::<_, i64>(0)?,
699 row.get::<_, i64>(1)?,
700 row.get::<_, i64>(2)?,
701 row.get::<_, i64>(3)?,
702 ))
703 })
704 .optional()?;
705 let Some((success, failure, positive, negative)) = row else {
706 return Ok(0.0);
707 };
708 Ok(context_score_from_counts(
709 success, failure, positive, negative, prior_m, base_rate,
710 ))
711 }
712
713 pub fn context_scores_batch(
716 &self,
717 chunk_ids: &[&str],
718 context_key: &str,
719 prior_m: f64,
720 base_rate: f64,
721 ) -> Result<HashMap<String, f64>> {
722 if chunk_ids.is_empty() {
723 return Ok(HashMap::new());
724 }
725 let placeholders = chunk_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
726 let sql = format!(
727 "SELECT chunk_id, success_count, failure_count, positive_feedback, negative_feedback
728 FROM chunk_context_stats
729 WHERE context_key=? AND chunk_id IN ({placeholders})"
730 );
731 let mut params: Vec<&str> = Vec::with_capacity(chunk_ids.len() + 1);
732 params.push(context_key);
733 params.extend_from_slice(chunk_ids);
734 let mut stmt = self.conn.prepare(&sql)?;
735 let rows = stmt.query_map(rusqlite::params_from_iter(params.iter()), |r| {
736 Ok((
737 r.get::<_, String>(0)?,
738 r.get::<_, i64>(1)?,
739 r.get::<_, i64>(2)?,
740 r.get::<_, i64>(3)?,
741 r.get::<_, i64>(4)?,
742 ))
743 })?;
744 let mut map = HashMap::new();
745 for row in rows {
746 let (id, success, failure, positive, negative) = row?;
747 map.insert(
748 id,
749 context_score_from_counts(success, failure, positive, negative, prior_m, base_rate),
750 );
751 }
752 Ok(map)
753 }
754}
755
/// Turns per-context outcome and feedback counters into a score.
///
/// Computation:
/// - Outcomes count once; explicit feedback counts twice:
///   `wins = success + 2*positive`, `losses = failure + 2*negative`.
/// - A Beta prior with total weight `prior_m`, centred on `base_rate`, is
///   combined with the evidence to give the posterior win rate.
/// - The posterior is mapped to `[-1, 1]` around 0.5, then shrunk towards 0
///   by `min(evidence / 5, 1)`. Scores therefore reach full magnitude only
///   after five units of weighted evidence.
///
/// Returns `0.0` when the weighted evidence is not positive. In that case
/// there is nothing to score. This also avoids the `0/0 = NaN` the formula
/// would produce when `prior_m == 0`.
///
/// For non-negative counts, `prior_m >= 0` and `base_rate` in `[0, 1]`, the
/// result lies in `[-1, 1]`.
fn context_score_from_counts(
    success: i64,
    failure: i64,
    positive: i64,
    negative: i64,
    prior_m: f64,
    base_rate: f64,
) -> f64 {
    let wins = success as f64 + 2.0 * positive as f64;
    let losses = failure as f64 + 2.0 * negative as f64;
    let evidence = wins + losses;
    if evidence <= 0.0 {
        return 0.0;
    }
    let alpha0 = prior_m * base_rate;
    let beta0 = prior_m * (1.0 - base_rate);
    let posterior = (wins + alpha0) / (evidence + alpha0 + beta0);
    let evidence_weight = (evidence / 5.0).min(1.0);
    (posterior - 0.5) * 2.0 * evidence_weight
}
777}