1use super::*;
2
3impl Storage {
4 #[allow(clippy::too_many_arguments)]
5 pub fn insert_usage_trace(
6 &self,
7 trace_id: &str,
8 chunk_id: Option<&str>,
9 event: &str,
10 strength: f64,
11 similarity: Option<f64>,
12 refine_mode: Option<&str>,
13 tokens: Option<i64>,
14 rank: Option<i64>,
15 attribution: Option<&str>,
16 source: &str,
17 ts: &str,
18 ) -> Result<usize> {
19 let mut stmt = self.conn.prepare_cached(
20 "INSERT OR IGNORE INTO usage_trace
21 (trace_id, chunk_id, event, strength, similarity, refine_mode, tokens, rank, attribution, source, ts)
22 VALUES (?,?,?,?,?,?,?,?,?,?,?)",
23 )?;
24 Ok(stmt.execute(params![
25 trace_id,
26 chunk_id,
27 event,
28 strength,
29 similarity,
30 refine_mode,
31 tokens,
32 rank,
33 attribution,
34 source,
35 ts
36 ])?)
37 }
38
39 pub fn replace_used_trace(
40 &self,
41 trace_id: &str,
42 used_ids: &[String],
43 strength: f64,
44 attribution: &str,
45 source: &str,
46 ts: &str,
47 ) -> Result<()> {
48 self.conn.execute(
49 "DELETE FROM usage_trace WHERE trace_id=? AND event='used'",
50 [trace_id],
51 )?;
52 for chunk_id in used_ids {
53 self.insert_usage_trace(
54 trace_id,
55 Some(chunk_id),
56 "used",
57 strength,
58 None,
59 None,
60 None,
61 None,
62 Some(attribution),
63 source,
64 ts,
65 )?;
66 }
67 Ok(())
68 }
69
70 pub fn merge_used_trace(
71 &self,
72 trace_id: &str,
73 used_ids: &[String],
74 strength: f64,
75 attribution: &str,
76 source: &str,
77 ts: &str,
78 ) -> Result<()> {
79 if used_ids.is_empty() {
80 return Ok(());
81 }
82 let attribution_rank = |value: &str| match value {
83 "explicit" => 3,
84 "cited" => 2,
85 "inferred" => 1,
86 _ => 0,
87 };
88
89 let placeholders = used_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
92 let sql = format!(
93 "SELECT chunk_id, attribution FROM usage_trace
94 WHERE trace_id=? AND event='used' AND chunk_id IN ({placeholders})"
95 );
96 let mut qparams: Vec<&str> = Vec::with_capacity(used_ids.len() + 1);
97 qparams.push(trace_id);
98 qparams.extend(used_ids.iter().map(String::as_str));
99 let existing: HashMap<String, String> = {
100 let mut stmt = self.conn.prepare(&sql)?;
101 let rows = stmt.query_map(rusqlite::params_from_iter(qparams.iter()), |r| {
102 let id: String = r.get(0)?;
103 let attr: Option<String> = r.get(1)?;
104 Ok((id, attr.unwrap_or_else(|| "inferred".to_string())))
105 })?;
106 rows.collect::<rusqlite::Result<HashMap<_, _>>>()?
107 };
108
109 for chunk_id in used_ids {
110 match existing.get(chunk_id) {
111 Some(existing_attribution) => {
112 if attribution_rank(attribution) > attribution_rank(existing_attribution) {
113 self.conn.execute(
114 "UPDATE usage_trace
115 SET strength=?, attribution=?, source=?, ts=?
116 WHERE trace_id=? AND chunk_id=? AND event='used'",
117 params![strength, attribution, source, ts, trace_id, chunk_id],
118 )?;
119 }
120 }
121 None => {
122 self.insert_usage_trace(
123 trace_id,
124 Some(chunk_id),
125 "used",
126 strength,
127 None,
128 None,
129 None,
130 None,
131 Some(attribution),
132 source,
133 ts,
134 )?;
135 }
136 }
137 }
138 Ok(())
139 }
140
141 pub fn refresh_chunk_last_used(&self, chunk_id: &str, now: &str) -> Result<()> {
142 self.conn.execute(
143 "UPDATE chunks
144 SET last_used_at=COALESCE(
145 (SELECT MAX(ts) FROM usage_trace
146 WHERE chunk_id=? AND event='used'
147 AND ts > COALESCE(chunks.evidence_cutoff_at, '')),
148 last_used_base
149 ),
150 updated_at=?
151 WHERE id=?",
152 params![chunk_id, now, chunk_id],
153 )?;
154 Ok(())
155 }
156
157 pub fn get_outcome_for_trace(&self, trace_id: &str) -> Result<Option<String>> {
158 let row = self.conn.query_row(
159 "SELECT event FROM usage_trace
160 WHERE trace_id=? AND event IN ('task_ok','task_fail') AND chunk_id IS NULL
161 LIMIT 1",
162 [trace_id],
163 |r| r.get::<_, String>(0),
164 );
165 match row {
166 Ok(v) => Ok(Some(v)),
167 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
168 Err(e) => Err(e.into()),
169 }
170 }
171
172 pub fn purge_usage_trace(&self, before_ts: &str) -> Result<usize> {
173 let n = self.conn.execute(
175 "DELETE FROM usage_trace
176 WHERE ts < ?
177 AND event IN ('retrieved','refined')
178 AND NOT (event = 'retrieved'
179 AND chunk_id IN (SELECT id FROM chunks WHERE origin='spark'))",
180 [before_ts],
181 )?;
182 Ok(n)
183 }
184
185 pub fn upsert_episodic_log(&self, log: &EpisodicLogRow) -> Result<()> {
190 self.conn.execute(
191 "INSERT OR REPLACE INTO episodic_log
192 (id, trace_id, lib_id, ts, query, recall_snapshot, output,
193 output_summary, outcome, event_source, task_state, completed_at,
194 usage_state, used_ids, used_attribution, used_complete, context_key, nomination, priority,
195 distill_state, distill_note, distill_attempts, distill_last_failed_at, agent)
196 VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,?18,?19,?20,?21,0,NULL,?22)",
197 params![
198 log.id,
199 log.trace_id,
200 log.lib_id,
201 log.ts,
202 log.query,
203 log.recall_snapshot,
204 log.output,
205 log.output_summary,
206 log.outcome,
207 log.event_source,
208 log.task_state,
209 log.completed_at,
210 log.usage_state,
211 log.used_ids,
212 log.used_attribution,
213 i64::from(log.used_complete),
214 log.context_key,
215 log.nomination,
216 log.priority,
217 log.distill_state,
218 log.distill_note,
219 log.agent
220 ],
221 )?;
222 Ok(())
223 }
224
225 pub fn get_episodic_log(&self, trace_id: &str) -> Result<Option<Value>> {
226 let row = self.conn.query_row(
227 "SELECT * FROM episodic_log WHERE trace_id=?",
228 [trace_id],
229 row_to_json,
230 );
231 match row {
232 Ok(v) => Ok(Some(v)),
233 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
234 Err(e) => Err(e.into()),
235 }
236 }
237
238 pub fn update_episodic_log_state(
239 &self,
240 trace_id: &str,
241 state: &str,
242 note: Option<&str>,
243 outcome: Option<&str>,
244 ) -> Result<()> {
245 self.conn.execute(
246 "UPDATE episodic_log
247 SET distill_state=?, distill_note=COALESCE(?,distill_note),
248 outcome=COALESCE(?,outcome),
249 distill_run_id=NULL, distill_locked_at=NULL
250 WHERE trace_id=?",
251 params![state, note, outcome, trace_id],
252 )?;
253 Ok(())
254 }
255
256 pub fn patch_episodic_log_content(
258 &self,
259 trace_id: &str,
260 query: Option<&str>,
261 output: Option<&str>,
262 output_summary: Option<&str>,
263 nomination: Option<&str>,
264 priority: i64,
265 ) -> Result<()> {
266 self.conn.execute(
267 "UPDATE episodic_log
268 SET output_summary = COALESCE(?, output_summary),
269 nomination = COALESCE(?, nomination),
270 output = COALESCE(?, output),
271 query = COALESCE(?, query),
272 priority = MAX(priority, ?)
273 WHERE trace_id = ?",
274 params![
275 output_summary,
276 nomination,
277 output,
278 query,
279 priority,
280 trace_id
281 ],
282 )?;
283 Ok(())
284 }
285
286 #[allow(clippy::too_many_arguments)]
287 pub fn update_trace_lifecycle(
288 &self,
289 trace_id: &str,
290 task_state: &str,
291 completed_at: Option<&str>,
292 usage_state: Option<&str>,
293 used_ids: Option<&str>,
294 used_attribution: Option<&str>,
295 used_complete: Option<bool>,
296 ) -> Result<()> {
297 self.conn.execute(
298 "UPDATE episodic_log
299 SET task_state=?,
300 completed_at=COALESCE(?, completed_at),
301 usage_state=COALESCE(?, usage_state),
302 used_ids=COALESCE(?, used_ids),
303 used_attribution=COALESCE(?, used_attribution),
304 used_complete=COALESCE(?, used_complete)
305 WHERE trace_id=?",
306 params![
307 task_state,
308 completed_at,
309 usage_state,
310 used_ids,
311 used_attribution,
312 used_complete.map(i64::from),
313 trace_id
314 ],
315 )?;
316 Ok(())
317 }
318
319 #[allow(clippy::too_many_arguments)]
320 pub fn upsert_confidence_evidence(
321 &self,
322 id: &str,
323 trace_id: Option<&str>,
324 chunk_id: &str,
325 kind: &str,
326 target: f64,
327 alpha: f64,
328 reason: &str,
329 context_key: Option<&str>,
330 ts: &str,
331 provenance: &str,
332 ) -> Result<()> {
333 self.conn.execute(
334 "INSERT INTO confidence_evidence
335 (id, trace_id, chunk_id, kind, target, alpha, reason, context_key, ts, provenance)
336 VALUES (?,?,?,?,?,?,?,?,?,?)
337 ON CONFLICT(trace_id, chunk_id, kind) WHERE trace_id IS NOT NULL
338 DO UPDATE SET target=excluded.target, alpha=excluded.alpha,
339 reason=excluded.reason, context_key=excluded.context_key,
340 provenance=excluded.provenance",
341 params![
342 id,
343 trace_id,
344 chunk_id,
345 kind,
346 target,
347 alpha,
348 reason,
349 context_key,
350 ts,
351 provenance
352 ],
353 )?;
354 Ok(())
355 }
356
357 pub fn observed_outcome_count(&self, chunk_id: &str) -> Result<i64> {
360 let n = self.conn.query_row(
361 "SELECT COUNT(*) FROM confidence_evidence
362 WHERE chunk_id=? AND provenance='observed'
363 AND kind IN ('outcome_ok','outcome_fail')",
364 params![chunk_id],
365 |r| r.get::<_, i64>(0),
366 )?;
367 Ok(n)
368 }
369
370 pub fn context_stat_present_batch(
374 &self,
375 chunk_ids: &[&str],
376 context_key: &str,
377 ) -> Result<std::collections::HashSet<String>> {
378 let mut set = std::collections::HashSet::new();
379 if chunk_ids.is_empty() {
380 return Ok(set);
381 }
382 let placeholders = chunk_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
383 let sql = format!(
384 "SELECT chunk_id FROM chunk_context_stats
385 WHERE context_key=? AND chunk_id IN ({placeholders})"
386 );
387 let mut params: Vec<&str> = Vec::with_capacity(chunk_ids.len() + 1);
388 params.push(context_key);
389 params.extend_from_slice(chunk_ids);
390 let mut stmt = self.conn.prepare(&sql)?;
391 let rows = stmt.query_map(rusqlite::params_from_iter(params.iter()), |r| {
392 r.get::<_, String>(0)
393 })?;
394 for row in rows {
395 set.insert(row?);
396 }
397 Ok(set)
398 }
399
400 #[allow(clippy::too_many_arguments)]
403 pub fn insert_verdict_log(
404 &self,
405 verdict_id: &str,
406 trace_id: &str,
407 situation_sig: &str,
408 emitted_valence: Option<&str>,
409 emitted_conf: Option<f64>,
410 emitted_strength: f64,
411 emitted_tier: Option<&str>,
412 abstain_reason: Option<&str>,
413 emitted_at: &str,
414 ) -> Result<()> {
415 self.conn.execute(
418 "INSERT INTO verdict_log
419 (verdict_id, trace_id, situation_sig, emitted_valence, emitted_conf,
420 emitted_strength, emitted_tier, abstain_reason, emitted_at)
421 VALUES (?,?,?,?,?,?,?,?,?)",
422 params![
423 verdict_id,
424 trace_id,
425 situation_sig,
426 emitted_valence,
427 emitted_conf,
428 emitted_strength,
429 emitted_tier,
430 abstain_reason,
431 emitted_at
432 ],
433 )?;
434 Ok(())
435 }
436
437 pub fn backfill_verdict_outcome(
441 &self,
442 trace_id: &str,
443 observed_outcome: f64,
444 provenance: &str,
445 observed_at: &str,
446 ) -> Result<()> {
447 self.conn.execute(
448 "UPDATE verdict_log
449 SET observed_outcome=?, outcome_observed_at=?, outcome_provenance=?
450 WHERE trace_id=? AND outcome_observed_at IS NULL",
451 params![observed_outcome, observed_at, provenance, trace_id],
452 )?;
453 Ok(())
454 }
455
456 pub fn load_calibration_map(&self) -> Result<Vec<(f64, f64, f64)>> {
458 let mut stmt = self.conn.prepare(
459 "SELECT claimed_lo, claimed_hi, observed_rate FROM calibration_map ORDER BY bucket",
460 )?;
461 let rows = stmt.query_map([], |r| {
462 Ok((
463 r.get::<_, f64>(0)?,
464 r.get::<_, f64>(1)?,
465 r.get::<_, f64>(2)?,
466 ))
467 })?;
468 let mut out = Vec::new();
469 for row in rows {
470 out.push(row?);
471 }
472 Ok(out)
473 }
474
475 pub fn verdict_calibration_samples(&self) -> Result<Vec<(f64, f64, f64)>> {
485 let mut stmt = self.conn.prepare(
486 "SELECT emitted_strength, emitted_conf,
487 CASE WHEN emitted_valence='affirm'
488 THEN (CASE WHEN observed_outcome < 0 THEN 1.0 ELSE 0.0 END)
489 ELSE (CASE WHEN observed_outcome > 0 THEN 1.0 ELSE 0.0 END) END
490 FROM verdict_log
491 WHERE outcome_provenance='observed'
492 AND emitted_conf IS NOT NULL AND emitted_strength IS NOT NULL
493 AND observed_outcome IS NOT NULL
494 AND emitted_valence IN ('affirm','caution')",
495 )?;
496 let rows = stmt.query_map([], |r| {
497 Ok((
498 r.get::<_, f64>(0)?,
499 r.get::<_, f64>(1)?,
500 r.get::<_, f64>(2)?,
501 ))
502 })?;
503 let mut out = Vec::new();
504 for row in rows {
505 out.push(row?);
506 }
507 Ok(out)
508 }
509
510 pub fn verdict_log_overview(&self) -> Result<(i64, i64, i64)> {
512 let total: i64 = self
513 .conn
514 .query_row("SELECT COUNT(*) FROM verdict_log", [], |r| r.get(0))?;
515 let abstained: i64 = self.conn.query_row(
516 "SELECT COUNT(*) FROM verdict_log WHERE abstain_reason IS NOT NULL",
517 [],
518 |r| r.get(0),
519 )?;
520 let observed: i64 = self.conn.query_row(
521 "SELECT COUNT(*) FROM verdict_log WHERE outcome_provenance='observed'",
522 [],
523 |r| r.get(0),
524 )?;
525 Ok((total, abstained, observed))
526 }
527
528 pub fn replace_calibration_map(
530 &self,
531 buckets: &[(f64, f64, f64, i64)],
532 now: &str,
533 ) -> Result<()> {
534 self.conn.execute("DELETE FROM calibration_map", [])?;
535 for (i, (lo, hi, rate, n)) in buckets.iter().enumerate() {
536 self.conn.execute(
537 "INSERT INTO calibration_map
538 (bucket, claimed_lo, claimed_hi, observed_rate, n, updated_at)
539 VALUES (?,?,?,?,?,?)",
540 params![i as i64, lo, hi, rate, n, now],
541 )?;
542 }
543 Ok(())
544 }
545
546 pub fn delete_trace_confidence_evidence(&self, trace_id: &str, kinds: &[&str]) -> Result<()> {
547 if kinds.is_empty() {
548 return Ok(());
549 }
550 let placeholders = kinds.iter().map(|_| "?").collect::<Vec<_>>().join(",");
551 let sql = format!(
552 "DELETE FROM confidence_evidence WHERE trace_id=? AND kind IN ({placeholders})"
553 );
554 let mut params: Vec<&str> = Vec::with_capacity(kinds.len() + 1);
555 params.push(trace_id);
556 params.extend_from_slice(kinds);
557 self.conn
558 .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
559 Ok(())
560 }
561
562 pub fn delete_chunk_trace_confidence_evidence(
563 &self,
564 trace_id: &str,
565 chunk_id: &str,
566 kind: &str,
567 ) -> Result<()> {
568 self.conn.execute(
569 "DELETE FROM confidence_evidence
570 WHERE trace_id=? AND chunk_id=? AND kind=?",
571 params![trace_id, chunk_id, kind],
572 )?;
573 Ok(())
574 }
575
576 pub fn confidence_evidence_for_chunk(&self, chunk_id: &str) -> Result<Vec<Value>> {
577 self.query_json(
579 "SELECT target, alpha, reason, ts, id
580 FROM confidence_evidence WHERE chunk_id=? AND provenance='observed'
581 ORDER BY ts ASC,
582 CASE kind
583 WHEN 'outcome_ok' THEN 1
584 WHEN 'outcome_fail' THEN 1
585 WHEN 'selected_unused' THEN 2
586 WHEN 'feedback_up' THEN 3
587 WHEN 'feedback_down' THEN 3
588 WHEN 'decay' THEN 4
589 ELSE 5
590 END ASC,
591 kind ASC, id ASC",
592 [chunk_id],
593 )
594 }
595
596 #[allow(clippy::too_many_arguments)]
597 pub fn insert_feedback_event(
598 &self,
599 id: &str,
600 trace_id: &str,
601 chunk_id: &str,
602 signal: &str,
603 strength: f64,
604 source: &str,
605 actor: Option<&str>,
606 reason: Option<&str>,
607 context_key: Option<&str>,
608 ts: &str,
609 ) -> Result<usize> {
610 Ok(self.conn.execute(
611 "INSERT OR IGNORE INTO feedback_events
612 (id, trace_id, chunk_id, signal, strength, source, actor, reason, context_key, ts)
613 VALUES (?,?,?,?,?,?,?,?,?,?)",
614 params![
615 id,
616 trace_id,
617 chunk_id,
618 signal,
619 strength,
620 source,
621 actor,
622 reason,
623 context_key,
624 ts
625 ],
626 )?)
627 }
628
629 pub fn delete_feedback_event(
630 &self,
631 trace_id: &str,
632 chunk_id: &str,
633 signal: &str,
634 ) -> Result<usize> {
635 Ok(self.conn.execute(
636 "DELETE FROM feedback_events
637 WHERE trace_id=? AND chunk_id=? AND signal=?",
638 params![trace_id, chunk_id, signal],
639 )?)
640 }
641
642 pub fn update_chunk_last_decayed_at(&self, id: &str, now: &str) -> Result<()> {
643 self.conn.execute(
644 "UPDATE chunks SET last_decayed_at=?, updated_at=? WHERE id=?",
645 params![now, now, id],
646 )?;
647 Ok(())
648 }
649
650 #[allow(clippy::too_many_arguments)]
651 pub fn update_context_stat(
652 &self,
653 chunk_id: &str,
654 context_key: &str,
655 success: i64,
656 failure: i64,
657 positive: i64,
658 negative: i64,
659 now: &str,
660 ) -> Result<()> {
661 self.conn.execute(
662 "INSERT INTO chunk_context_stats
663 (chunk_id, context_key, success_count, failure_count,
664 positive_feedback, negative_feedback, last_updated_at)
665 VALUES (?,?,?,?,?,?,?)
666 ON CONFLICT(chunk_id, context_key) DO UPDATE SET
667 success_count=success_count+excluded.success_count,
668 failure_count=failure_count+excluded.failure_count,
669 positive_feedback=positive_feedback+excluded.positive_feedback,
670 negative_feedback=negative_feedback+excluded.negative_feedback,
671 last_updated_at=excluded.last_updated_at",
672 params![
673 chunk_id,
674 context_key,
675 success,
676 failure,
677 positive,
678 negative,
679 now
680 ],
681 )?;
682 Ok(())
683 }
684
685 pub fn context_score(
686 &self,
687 chunk_id: &str,
688 context_key: &str,
689 prior_m: f64,
690 base_rate: f64,
691 ) -> Result<f64> {
692 let mut stmt = self.conn.prepare_cached(
693 "SELECT success_count, failure_count, positive_feedback, negative_feedback
694 FROM chunk_context_stats WHERE chunk_id=? AND context_key=?",
695 )?;
696 let row = stmt
697 .query_row(params![chunk_id, context_key], |row| {
698 Ok((
699 row.get::<_, i64>(0)?,
700 row.get::<_, i64>(1)?,
701 row.get::<_, i64>(2)?,
702 row.get::<_, i64>(3)?,
703 ))
704 })
705 .optional()?;
706 let Some((success, failure, positive, negative)) = row else {
707 return Ok(0.0);
708 };
709 Ok(context_score_from_counts(
710 success, failure, positive, negative, prior_m, base_rate,
711 ))
712 }
713
714 pub fn context_scores_batch(
717 &self,
718 chunk_ids: &[&str],
719 context_key: &str,
720 prior_m: f64,
721 base_rate: f64,
722 ) -> Result<HashMap<String, f64>> {
723 if chunk_ids.is_empty() {
724 return Ok(HashMap::new());
725 }
726 let placeholders = chunk_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
727 let sql = format!(
728 "SELECT chunk_id, success_count, failure_count, positive_feedback, negative_feedback
729 FROM chunk_context_stats
730 WHERE context_key=? AND chunk_id IN ({placeholders})"
731 );
732 let mut params: Vec<&str> = Vec::with_capacity(chunk_ids.len() + 1);
733 params.push(context_key);
734 params.extend_from_slice(chunk_ids);
735 let mut stmt = self.conn.prepare(&sql)?;
736 let rows = stmt.query_map(rusqlite::params_from_iter(params.iter()), |r| {
737 Ok((
738 r.get::<_, String>(0)?,
739 r.get::<_, i64>(1)?,
740 r.get::<_, i64>(2)?,
741 r.get::<_, i64>(3)?,
742 r.get::<_, i64>(4)?,
743 ))
744 })?;
745 let mut map = HashMap::new();
746 for row in rows {
747 let (id, success, failure, positive, negative) = row?;
748 map.insert(
749 id,
750 context_score_from_counts(success, failure, positive, negative, prior_m, base_rate),
751 );
752 }
753 Ok(map)
754 }
755}
756
/// Turns raw context counters into a signed score.
///
/// Feedback counts weigh twice as much as outcome counts:
/// `wins = success + 2 * positive`, `losses = failure + 2 * negative`.
/// The posterior success rate is smoothed with a prior of total weight `prior_m` centred
/// on `base_rate`, then mapped from `[0, 1]` to `[-1, 1]` via `(posterior - 0.5) * 2` and
/// scaled by an evidence weight that grows linearly with `wins + losses` and saturates at
/// `1.0` once that sum reaches 5.
///
/// Returns `0.0` when there is no evidence at all. The evidence weight is zero in that
/// case, and returning early avoids a `0.0 / 0.0` (NaN) posterior when `prior_m` is
/// also zero.
fn context_score_from_counts(
    success: i64,
    failure: i64,
    positive: i64,
    negative: i64,
    prior_m: f64,
    base_rate: f64,
) -> f64 {
    let wins = success as f64 + positive as f64 * 2.0;
    let losses = failure as f64 + negative as f64 * 2.0;
    let evidence = wins + losses;
    if evidence == 0.0 {
        // Zero evidence means zero weight; never let a degenerate prior leak NaN out.
        return 0.0;
    }
    let alpha0 = prior_m * base_rate;
    let beta0 = prior_m * (1.0 - base_rate);
    let posterior = (wins + alpha0) / (evidence + alpha0 + beta0);
    let evidence_weight = (evidence / 5.0).min(1.0);
    (posterior - 0.5) * 2.0 * evidence_weight
}