Skip to main content

innate_core/storage/
traces.rs

1use super::*;
2
3impl Storage {
4    #[allow(clippy::too_many_arguments)]
5    pub fn insert_usage_trace(
6        &self,
7        trace_id: &str,
8        chunk_id: Option<&str>,
9        event: &str,
10        strength: f64,
11        similarity: Option<f64>,
12        refine_mode: Option<&str>,
13        tokens: Option<i64>,
14        rank: Option<i64>,
15        attribution: Option<&str>,
16        source: &str,
17        ts: &str,
18    ) -> Result<usize> {
19        let mut stmt = self.conn.prepare_cached(
20            "INSERT OR IGNORE INTO usage_trace
21             (trace_id, chunk_id, event, strength, similarity, refine_mode, tokens, rank, attribution, source, ts)
22             VALUES (?,?,?,?,?,?,?,?,?,?,?)",
23        )?;
24        Ok(stmt.execute(
25            params![trace_id, chunk_id, event, strength, similarity, refine_mode, tokens, rank, attribution, source, ts],
26        )?)
27    }
28
29    pub fn replace_used_trace(
30        &self,
31        trace_id: &str,
32        used_ids: &[String],
33        strength: f64,
34        attribution: &str,
35        source: &str,
36        ts: &str,
37    ) -> Result<()> {
38        self.conn.execute(
39            "DELETE FROM usage_trace WHERE trace_id=? AND event='used'",
40            [trace_id],
41        )?;
42        for chunk_id in used_ids {
43            self.insert_usage_trace(
44                trace_id,
45                Some(chunk_id),
46                "used",
47                strength,
48                None,
49                None,
50                None,
51                None,
52                Some(attribution),
53                source,
54                ts,
55            )?;
56        }
57        Ok(())
58    }
59
60    pub fn merge_used_trace(
61        &self,
62        trace_id: &str,
63        used_ids: &[String],
64        strength: f64,
65        attribution: &str,
66        source: &str,
67        ts: &str,
68    ) -> Result<()> {
69        if used_ids.is_empty() {
70            return Ok(());
71        }
72        let attribution_rank = |value: &str| match value {
73            "explicit" => 3,
74            "cited" => 2,
75            "inferred" => 1,
76            _ => 0,
77        };
78
79        // Batch-fetch all existing 'used' rows for this trace in one query
80        // instead of one SELECT per chunk id.
81        let placeholders = used_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
82        let sql = format!(
83            "SELECT chunk_id, attribution FROM usage_trace
84             WHERE trace_id=? AND event='used' AND chunk_id IN ({placeholders})"
85        );
86        let mut qparams: Vec<&str> = Vec::with_capacity(used_ids.len() + 1);
87        qparams.push(trace_id);
88        qparams.extend(used_ids.iter().map(String::as_str));
89        let existing: HashMap<String, String> = {
90            let mut stmt = self.conn.prepare(&sql)?;
91            let rows = stmt.query_map(rusqlite::params_from_iter(qparams.iter()), |r| {
92                let id: String = r.get(0)?;
93                let attr: Option<String> = r.get(1)?;
94                Ok((id, attr.unwrap_or_else(|| "inferred".to_string())))
95            })?;
96            rows.collect::<rusqlite::Result<HashMap<_, _>>>()?
97        };
98
99        for chunk_id in used_ids {
100            match existing.get(chunk_id) {
101                Some(existing_attribution) => {
102                    if attribution_rank(attribution) > attribution_rank(existing_attribution) {
103                        self.conn.execute(
104                            "UPDATE usage_trace
105                             SET strength=?, attribution=?, source=?, ts=?
106                             WHERE trace_id=? AND chunk_id=? AND event='used'",
107                            params![strength, attribution, source, ts, trace_id, chunk_id],
108                        )?;
109                    }
110                }
111                None => {
112                    self.insert_usage_trace(
113                        trace_id,
114                        Some(chunk_id),
115                        "used",
116                        strength,
117                        None,
118                        None,
119                        None,
120                        None,
121                        Some(attribution),
122                        source,
123                        ts,
124                    )?;
125                }
126            }
127        }
128        Ok(())
129    }
130
131    pub fn refresh_chunk_last_used(&self, chunk_id: &str, now: &str) -> Result<()> {
132        self.conn.execute(
133            "UPDATE chunks
134             SET last_used_at=COALESCE(
135                   (SELECT MAX(ts) FROM usage_trace
136                    WHERE chunk_id=? AND event='used'
137                      AND ts > COALESCE(chunks.evidence_cutoff_at, '')),
138                   last_used_base
139                 ),
140                 updated_at=?
141             WHERE id=?",
142            params![chunk_id, now, chunk_id],
143        )?;
144        Ok(())
145    }
146
147    pub fn get_outcome_for_trace(&self, trace_id: &str) -> Result<Option<String>> {
148        let row = self.conn.query_row(
149            "SELECT event FROM usage_trace
150             WHERE trace_id=? AND event IN ('task_ok','task_fail') AND chunk_id IS NULL
151             LIMIT 1",
152            [trace_id],
153            |r| r.get::<_, String>(0),
154        );
155        match row {
156            Ok(v) => Ok(Some(v)),
157            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
158            Err(e) => Err(e.into()),
159        }
160    }
161
162    pub fn purge_usage_trace(&self, before_ts: &str) -> Result<usize> {
163        // Preserve compact attribution facts. They are required to replay corrections.
164        let n = self.conn.execute(
165            "DELETE FROM usage_trace
166             WHERE ts < ?
167             AND event IN ('retrieved','refined')
168             AND NOT (event = 'retrieved'
169                      AND chunk_id IN (SELECT id FROM chunks WHERE origin='spark'))",
170            [before_ts],
171        )?;
172        Ok(n)
173    }
174
175    // ------------------------------------------------------------------
176    // Episodic log
177    // ------------------------------------------------------------------
178
179    pub fn upsert_episodic_log(&self, log: &EpisodicLogRow) -> Result<()> {
180        self.conn.execute(
181            "INSERT OR REPLACE INTO episodic_log
182             (id, trace_id, lib_id, ts, query, recall_snapshot, output,
183              output_summary, outcome, event_source, task_state, completed_at,
184              usage_state, used_ids, used_attribution, used_complete, context_key, nomination, priority,
185              distill_state, distill_note, distill_attempts, distill_last_failed_at)
186             VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,?18,?19,?20,?21,0,NULL)",
187            params![
188                log.id,
189                log.trace_id,
190                log.lib_id,
191                log.ts,
192                log.query,
193                log.recall_snapshot,
194                log.output,
195                log.output_summary,
196                log.outcome,
197                log.event_source,
198                log.task_state,
199                log.completed_at,
200                log.usage_state,
201                log.used_ids,
202                log.used_attribution,
203                i64::from(log.used_complete),
204                log.context_key,
205                log.nomination,
206                log.priority,
207                log.distill_state,
208                log.distill_note
209            ],
210        )?;
211        Ok(())
212    }
213
214    pub fn get_episodic_log(&self, trace_id: &str) -> Result<Option<Value>> {
215        let row = self.conn.query_row(
216            "SELECT * FROM episodic_log WHERE trace_id=?",
217            [trace_id],
218            row_to_json,
219        );
220        match row {
221            Ok(v) => Ok(Some(v)),
222            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
223            Err(e) => Err(e.into()),
224        }
225    }
226
227    pub fn update_episodic_log_state(
228        &self,
229        trace_id: &str,
230        state: &str,
231        note: Option<&str>,
232        outcome: Option<&str>,
233    ) -> Result<()> {
234        self.conn.execute(
235            "UPDATE episodic_log
236             SET distill_state=?, distill_note=COALESCE(?,distill_note),
237                 outcome=COALESCE(?,outcome),
238                 distill_run_id=NULL, distill_locked_at=NULL
239             WHERE trace_id=?",
240            params![state, note, outcome, trace_id],
241        )?;
242        Ok(())
243    }
244
245    /// Patch content fields on an existing episodic_log row (補写: output_summary, nomination, etc.)
246    pub fn patch_episodic_log_content(
247        &self,
248        trace_id: &str,
249        query: Option<&str>,
250        output: Option<&str>,
251        output_summary: Option<&str>,
252        nomination: Option<&str>,
253        priority: i64,
254    ) -> Result<()> {
255        self.conn.execute(
256            "UPDATE episodic_log
257             SET output_summary = COALESCE(?, output_summary),
258                 nomination     = COALESCE(?, nomination),
259                 output         = COALESCE(?, output),
260                 query          = COALESCE(?, query),
261                 priority       = MAX(priority, ?)
262             WHERE trace_id = ?",
263            params![
264                output_summary,
265                nomination,
266                output,
267                query,
268                priority,
269                trace_id
270            ],
271        )?;
272        Ok(())
273    }
274
275    #[allow(clippy::too_many_arguments)]
276    pub fn update_trace_lifecycle(
277        &self,
278        trace_id: &str,
279        task_state: &str,
280        completed_at: Option<&str>,
281        usage_state: Option<&str>,
282        used_ids: Option<&str>,
283        used_attribution: Option<&str>,
284        used_complete: Option<bool>,
285    ) -> Result<()> {
286        self.conn.execute(
287            "UPDATE episodic_log
288             SET task_state=?,
289                 completed_at=COALESCE(?, completed_at),
290                 usage_state=COALESCE(?, usage_state),
291                 used_ids=COALESCE(?, used_ids),
292                 used_attribution=COALESCE(?, used_attribution),
293                 used_complete=COALESCE(?, used_complete)
294             WHERE trace_id=?",
295            params![
296                task_state,
297                completed_at,
298                usage_state,
299                used_ids,
300                used_attribution,
301                used_complete.map(i64::from),
302                trace_id
303            ],
304        )?;
305        Ok(())
306    }
307
308    #[allow(clippy::too_many_arguments)]
309    pub fn upsert_confidence_evidence(
310        &self,
311        id: &str,
312        trace_id: Option<&str>,
313        chunk_id: &str,
314        kind: &str,
315        target: f64,
316        alpha: f64,
317        reason: &str,
318        context_key: Option<&str>,
319        ts: &str,
320    ) -> Result<()> {
321        self.conn.execute(
322            "INSERT INTO confidence_evidence
323             (id, trace_id, chunk_id, kind, target, alpha, reason, context_key, ts)
324             VALUES (?,?,?,?,?,?,?,?,?)
325             ON CONFLICT(trace_id, chunk_id, kind) WHERE trace_id IS NOT NULL
326             DO UPDATE SET target=excluded.target, alpha=excluded.alpha,
327                           reason=excluded.reason, context_key=excluded.context_key",
328            params![
329                id,
330                trace_id,
331                chunk_id,
332                kind,
333                target,
334                alpha,
335                reason,
336                context_key,
337                ts
338            ],
339        )?;
340        Ok(())
341    }
342
343    pub fn delete_trace_confidence_evidence(&self, trace_id: &str, kinds: &[&str]) -> Result<()> {
344        if kinds.is_empty() {
345            return Ok(());
346        }
347        let placeholders = kinds.iter().map(|_| "?").collect::<Vec<_>>().join(",");
348        let sql = format!(
349            "DELETE FROM confidence_evidence WHERE trace_id=? AND kind IN ({placeholders})"
350        );
351        let mut params: Vec<&str> = Vec::with_capacity(kinds.len() + 1);
352        params.push(trace_id);
353        params.extend_from_slice(kinds);
354        self.conn
355            .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
356        Ok(())
357    }
358
359    pub fn delete_chunk_trace_confidence_evidence(
360        &self,
361        trace_id: &str,
362        chunk_id: &str,
363        kind: &str,
364    ) -> Result<()> {
365        self.conn.execute(
366            "DELETE FROM confidence_evidence
367             WHERE trace_id=? AND chunk_id=? AND kind=?",
368            params![trace_id, chunk_id, kind],
369        )?;
370        Ok(())
371    }
372
373    pub fn confidence_evidence_for_chunk(&self, chunk_id: &str) -> Result<Vec<Value>> {
374        self.query_json(
375            "SELECT target, alpha, reason, ts, id
376             FROM confidence_evidence WHERE chunk_id=?
377             ORDER BY ts ASC,
378                      CASE kind
379                        WHEN 'outcome_ok' THEN 1
380                        WHEN 'outcome_fail' THEN 1
381                        WHEN 'selected_unused' THEN 2
382                        WHEN 'feedback_up' THEN 3
383                        WHEN 'feedback_down' THEN 3
384                        WHEN 'decay' THEN 4
385                        ELSE 5
386                      END ASC,
387                      kind ASC, id ASC",
388            [chunk_id],
389        )
390    }
391
392    #[allow(clippy::too_many_arguments)]
393    pub fn insert_feedback_event(
394        &self,
395        id: &str,
396        trace_id: &str,
397        chunk_id: &str,
398        signal: &str,
399        strength: f64,
400        source: &str,
401        actor: Option<&str>,
402        reason: Option<&str>,
403        context_key: Option<&str>,
404        ts: &str,
405    ) -> Result<usize> {
406        Ok(self.conn.execute(
407            "INSERT OR IGNORE INTO feedback_events
408             (id, trace_id, chunk_id, signal, strength, source, actor, reason, context_key, ts)
409             VALUES (?,?,?,?,?,?,?,?,?,?)",
410            params![
411                id,
412                trace_id,
413                chunk_id,
414                signal,
415                strength,
416                source,
417                actor,
418                reason,
419                context_key,
420                ts
421            ],
422        )?)
423    }
424
425    pub fn delete_feedback_event(
426        &self,
427        trace_id: &str,
428        chunk_id: &str,
429        signal: &str,
430    ) -> Result<usize> {
431        Ok(self.conn.execute(
432            "DELETE FROM feedback_events
433             WHERE trace_id=? AND chunk_id=? AND signal=?",
434            params![trace_id, chunk_id, signal],
435        )?)
436    }
437
438    pub fn update_chunk_last_decayed_at(&self, id: &str, now: &str) -> Result<()> {
439        self.conn.execute(
440            "UPDATE chunks SET last_decayed_at=?, updated_at=? WHERE id=?",
441            params![now, now, id],
442        )?;
443        Ok(())
444    }
445
446    #[allow(clippy::too_many_arguments)]
447    pub fn update_context_stat(
448        &self,
449        chunk_id: &str,
450        context_key: &str,
451        success: i64,
452        failure: i64,
453        positive: i64,
454        negative: i64,
455        now: &str,
456    ) -> Result<()> {
457        self.conn.execute(
458            "INSERT INTO chunk_context_stats
459             (chunk_id, context_key, success_count, failure_count,
460              positive_feedback, negative_feedback, last_updated_at)
461             VALUES (?,?,?,?,?,?,?)
462             ON CONFLICT(chunk_id, context_key) DO UPDATE SET
463               success_count=success_count+excluded.success_count,
464               failure_count=failure_count+excluded.failure_count,
465               positive_feedback=positive_feedback+excluded.positive_feedback,
466               negative_feedback=negative_feedback+excluded.negative_feedback,
467               last_updated_at=excluded.last_updated_at",
468            params![
469                chunk_id,
470                context_key,
471                success,
472                failure,
473                positive,
474                negative,
475                now
476            ],
477        )?;
478        Ok(())
479    }
480
481    pub fn context_score(&self, chunk_id: &str, context_key: &str) -> Result<f64> {
482        let mut stmt = self.conn.prepare_cached(
483            "SELECT success_count, failure_count, positive_feedback, negative_feedback
484             FROM chunk_context_stats WHERE chunk_id=? AND context_key=?",
485        )?;
486        let row = stmt
487            .query_row(params![chunk_id, context_key], |row| {
488                Ok((
489                    row.get::<_, i64>(0)?,
490                    row.get::<_, i64>(1)?,
491                    row.get::<_, i64>(2)?,
492                    row.get::<_, i64>(3)?,
493                ))
494            })
495            .optional()?;
496        let Some((success, failure, positive, negative)) = row else {
497            return Ok(0.0);
498        };
499        Ok(context_score_from_counts(success, failure, positive, negative))
500    }
501
502    /// Batch variant of `context_score`: one query for many chunk ids under a
503    /// single context key. Chunks with no stats are absent from the map (score 0).
504    pub fn context_scores_batch(
505        &self,
506        chunk_ids: &[&str],
507        context_key: &str,
508    ) -> Result<HashMap<String, f64>> {
509        if chunk_ids.is_empty() {
510            return Ok(HashMap::new());
511        }
512        let placeholders = chunk_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
513        let sql = format!(
514            "SELECT chunk_id, success_count, failure_count, positive_feedback, negative_feedback
515             FROM chunk_context_stats
516             WHERE context_key=? AND chunk_id IN ({placeholders})"
517        );
518        let mut params: Vec<&str> = Vec::with_capacity(chunk_ids.len() + 1);
519        params.push(context_key);
520        params.extend_from_slice(chunk_ids);
521        let mut stmt = self.conn.prepare(&sql)?;
522        let rows = stmt.query_map(rusqlite::params_from_iter(params.iter()), |r| {
523            Ok((
524                r.get::<_, String>(0)?,
525                r.get::<_, i64>(1)?,
526                r.get::<_, i64>(2)?,
527                r.get::<_, i64>(3)?,
528                r.get::<_, i64>(4)?,
529            ))
530        })?;
531        let mut map = HashMap::new();
532        for row in rows {
533            let (id, success, failure, positive, negative) = row?;
534            map.insert(
535                id,
536                context_score_from_counts(success, failure, positive, negative),
537            );
538        }
539        Ok(map)
540    }
541}
542
/// Turns raw outcome/feedback counters into a signed context score.
///
/// Each feedback vote weighs twice as much as a task outcome. The win share is
/// smoothed with a +1/+2 (Laplace) prior and re-centred so that 0 means
/// neutral. The result is then scaled by a confidence factor that grows
/// linearly until five units of evidence are seen. Used by both
/// `context_score` and `context_scores_batch`.
fn context_score_from_counts(success: i64, failure: i64, positive: i64, negative: i64) -> f64 {
    // Multiplier applied to explicit feedback relative to task outcomes.
    const FEEDBACK_WEIGHT: f64 = 2.0;
    // Evidence mass at which the confidence factor saturates at 1.
    const SATURATION: f64 = 5.0;

    let (wins, losses) = (
        success as f64 + positive as f64 * FEEDBACK_WEIGHT,
        failure as f64 + negative as f64 * FEEDBACK_WEIGHT,
    );
    let total = wins + losses;
    let confidence = f64::min(total / SATURATION, 1.0);
    let centred = (wins + 1.0) / (total + 2.0) - 0.5;
    centred * 2.0 * confidence
}