Skip to main content

innate_core/kb/record/
mod.rs

1use super::*;
2
3mod evidence;
4
/// Parameters for [`KnowledgeBase::record`].
///
/// Borrowed, `Default`-able: construct with `RecordParams { trace_id, source, ..Default::default() }`
/// and set only the fields you need. Behavioural defaults are applied inside `record`:
/// `used_attribution` empty → `"explicit"`, `feedback_kind` empty → `"user"`,
/// `used_complete` `None` → `true` (a complete snapshot), and a `priority` of `0` is raised
/// to `1` when a `nomination` is given. The chunk-id lists (`used`, `feedback_up`,
/// `feedback_down`) are de-duplicated, keeping the first occurrence of each id.
#[derive(Debug, Clone, Default)]
pub struct RecordParams<'a> {
    /// Identifier of the trace being recorded. An episodic log row is created for it if
    /// none exists yet.
    pub trace_id: &'a str,
    /// Query text. It is written to the episodic log. It is also used to derive the
    /// log's context key when the stored log has none.
    pub query: Option<&'a str>,
    /// Task output text. Its presence (on this call or already stored) counts as
    /// distillation material.
    pub output: Option<&'a str>,
    /// Short summary of the task output. It counts as distillation material, like `output`.
    pub output_summary: Option<&'a str>,
    /// Task outcome: `"ok"`, `"fail"` or `"unknown"`. Any other value is rejected.
    /// `"ok"` / `"fail"` mark the trace completed and record a `task_ok` / `task_fail`
    /// usage event.
    pub outcome: Option<&'a str>,
    /// Chunk ids the task used.
    pub used: Option<&'a [String]>,
    /// How `used` was attributed: `"explicit"`, `"cited"` or `"inferred"` (empty means
    /// `"explicit"`). This selects the strength of the recorded usage: 0.3, 0.25 or 0.15
    /// respectively.
    pub used_attribution: &'a str,
    /// Whether `used` is the complete set (`None` means `true`). A complete report
    /// replaces the trace's stored used ids. A partial report is merged into them.
    pub used_complete: Option<bool>,
    /// Chunk ids receiving positive feedback on this trace. This list must not overlap
    /// `feedback_down`. An up-vote replaces an earlier down-vote on the same chunk for
    /// this trace.
    pub feedback_up: Option<&'a [String]>,
    /// Chunk ids receiving negative feedback on this trace. This list must not overlap
    /// `feedback_up`. A down-vote replaces an earlier up-vote on the same chunk for
    /// this trace.
    pub feedback_down: Option<&'a [String]>,
    /// Feedback origin: `"user"` (strength 1.0) or `"judge"` (strength 0.6). Empty means
    /// `"user"`.
    pub feedback_kind: &'a str,
    /// Optional actor attached to each feedback event.
    pub feedback_actor: Option<&'a str>,
    /// Optional free-text reason attached to each feedback event.
    pub feedback_reason: Option<&'a str>,
    /// Optional nomination text. When set and `priority` is `0`, the effective priority
    /// becomes `1`.
    pub nomination: Option<&'a str>,
    /// Priority written to the episodic log (see `nomination`).
    pub priority: i64,
    /// Explicit lifecycle state: `"recalled"`, `"running"`, `"completed"`, `"abandoned"`
    /// or `"timed_out"`. It is overridden by `"completed"` once the trace has a known
    /// (non-`"unknown"`) outcome. `"abandoned"` / `"timed_out"` move a still-`open` log's
    /// distill state to `"discarded"`.
    pub task_state: Option<&'a str>,
    /// Event source label. It is checked by `validate_source` and recorded as the source
    /// of the log, usage and feedback events.
    pub source: &'a str,
    /// Scheme H — counterfactual censoring. If this trace comes from an appraise and the
    /// actor **avoided the action because of the warning**, the outcome cannot count as
    /// evidence that the intuition was right or wrong. "Avoided" means the step was not
    /// actually taken, so the outcome reflects the world after avoidance. Such a result is
    /// marked `counterfactual_censored` and is excluded from the calibration mapping / ECE.
    /// The default is `false`: the actor actually took the action and observed the result
    /// (`observed`), so it counts toward calibration.
    pub verdict_heeded: bool,
}
36
37impl KnowledgeBase {
38    pub fn record(&self, params: RecordParams<'_>) -> Result<()> {
39        let RecordParams {
40            trace_id,
41            query,
42            output,
43            output_summary,
44            outcome,
45            used,
46            used_attribution,
47            used_complete,
48            feedback_up,
49            feedback_down,
50            feedback_kind,
51            feedback_actor,
52            feedback_reason,
53            nomination,
54            priority,
55            task_state,
56            source,
57            verdict_heeded,
58        } = params;
59        let used_attribution = if used_attribution.is_empty() {
60            "explicit"
61        } else {
62            used_attribution
63        };
64        let feedback_kind = if feedback_kind.is_empty() {
65            "user"
66        } else {
67            feedback_kind
68        };
69        let used_complete = used_complete.unwrap_or(true);
70        let dedupe_ids = |ids: &[String]| {
71            let mut seen = HashSet::new();
72            ids.iter()
73                .filter(|id| seen.insert((*id).clone()))
74                .cloned()
75                .collect::<Vec<_>>()
76        };
77        let normalized_used = used.map(dedupe_ids);
78        let normalized_feedback_up = feedback_up.map(dedupe_ids);
79        let normalized_feedback_down = feedback_down.map(dedupe_ids);
80        let used = normalized_used.as_deref();
81        let feedback_up = normalized_feedback_up.as_deref();
82        let feedback_down = normalized_feedback_down.as_deref();
83
84        if let Some(o) = outcome {
85            if !matches!(o, "ok" | "fail" | "unknown") {
86                return Err(InnateError::InvalidState(format!("invalid outcome: {o}")));
87            }
88        }
89        if !matches!(used_attribution, "explicit" | "cited" | "inferred") {
90            return Err(InnateError::InvalidState(format!(
91                "invalid used attribution: {used_attribution}"
92            )));
93        }
94        if !matches!(feedback_kind, "user" | "judge") {
95            return Err(InnateError::InvalidState(format!(
96                "invalid feedback kind: {feedback_kind}"
97            )));
98        }
99        if let Some(state) = task_state {
100            if !matches!(
101                state,
102                "recalled" | "running" | "completed" | "abandoned" | "timed_out"
103            ) {
104                return Err(InnateError::InvalidState(format!(
105                    "invalid task state: {state}"
106                )));
107            }
108        }
109        validate_source(source)?;
110        if let (Some(ups), Some(downs)) = (feedback_up, feedback_down) {
111            let down_set: HashSet<&str> = downs.iter().map(String::as_str).collect();
112            if let Some(chunk_id) = ups.iter().find(|id| down_set.contains(id.as_str())) {
113                return Err(InnateError::InvalidState(format!(
114                    "conflicting feedback for chunk {chunk_id}"
115                )));
116            }
117        }
118        let effective_priority = if nomination.is_some() && priority == 0 {
119            1
120        } else {
121            priority
122        };
123        let now = utc_now_iso();
124        let lib_id = self.storage.lib_id()?;
125
126        self.storage.begin_immediate()?;
127        let result = (|| -> Result<()> {
128            let log = self.storage.get_episodic_log(trace_id)?;
129            let mut is_fresh_insert = false;
130            let log = match log {
131                Some(l) => l,
132                None => {
133                    let used_ids = used.map(serde_json::to_string).transpose()?;
134                    let row = EpisodicLogRow {
135                        id: gen_uuid(),
136                        trace_id: trace_id.to_string(),
137                        lib_id,
138                        ts: now.clone(),
139                        query: query.map(str::to_string).or_else(|| Some(String::new())),
140                        output: output.map(str::to_string),
141                        output_summary: output_summary.map(str::to_string),
142                        outcome: outcome.map(str::to_string),
143                        event_source: source.to_string(),
144                        agent: agent_source(),
145                        task_state: if matches!(outcome, Some("ok") | Some("fail")) {
146                            "completed".to_string()
147                        } else {
148                            task_state.unwrap_or("running").to_string()
149                        },
150                        completed_at: matches!(outcome, Some("ok") | Some("fail"))
151                            .then(|| now.clone()),
152                        usage_state: usage_state(used).to_string(),
153                        used_ids,
154                        used_attribution: used.map(|_| used_attribution.to_string()),
155                        used_complete,
156                        context_key: query.map(|q| content_hash(&normalize_query(q))),
157                        nomination: nomination.map(str::to_string),
158                        priority: effective_priority,
159                        distill_state: "open".to_string(),
160                        ..Default::default()
161                    };
162                    self.storage.upsert_episodic_log(&row)?;
163                    is_fresh_insert = true;
164                    self.storage.get_episodic_log(trace_id)?.unwrap()
165                }
166            };
167            self.validate_trace_attribution(trace_id, used, "used")?;
168            self.validate_trace_attribution(trace_id, feedback_up, "feedback_up")?;
169            self.validate_trace_attribution(trace_id, feedback_down, "feedback_down")?;
170
171            let existing_outcome = log
172                .get("outcome")
173                .and_then(Value::as_str)
174                .map(str::to_string);
175            // usage_trace: used
176            let effective_used_attribution = if used.is_some() {
177                used_attribution
178            } else {
179                log.get("used_attribution")
180                    .and_then(Value::as_str)
181                    .unwrap_or(used_attribution)
182            };
183            let used_strength = match effective_used_attribution {
184                "explicit" => 0.3,
185                "cited" => 0.25,
186                "inferred" => 0.15,
187                // The parameter is validated above; this arm is only reachable if the
188                // stored used_attribution was tampered with outside the API.
189                other => {
190                    return Err(InnateError::InvalidState(format!(
191                        "invalid stored used attribution: {other}"
192                    )))
193                }
194            };
195            let existing_used_ids: Vec<String> = log
196                .get("used_ids")
197                .and_then(Value::as_str)
198                .and_then(|raw| serde_json::from_str(raw).ok())
199                .unwrap_or_default();
200            let existing_used_complete = log
201                .get("used_complete")
202                .and_then(Value::as_i64)
203                .unwrap_or(0)
204                != 0;
205            let effective_used_complete = used_complete || existing_used_complete;
206            let effective_used_ids = used.map(|reported| {
207                if used_complete {
208                    reported.to_vec()
209                } else {
210                    let mut merged = existing_used_ids.clone();
211                    let mut seen: HashSet<String> = merged.iter().cloned().collect();
212                    merged.extend(
213                        reported
214                            .iter()
215                            .filter(|id| seen.insert((*id).clone()))
216                            .cloned(),
217                    );
218                    merged
219                }
220            });
221            if let Some(used_ids) = effective_used_ids.as_deref() {
222                let previously_used: HashSet<String> = existing_used_ids.iter().cloned().collect();
223                if used_complete {
224                    self.storage.replace_used_trace(
225                        trace_id,
226                        used_ids,
227                        used_strength,
228                        used_attribution,
229                        source,
230                        &now,
231                    )?;
232                } else if let Some(reported) = used {
233                    self.storage.merge_used_trace(
234                        trace_id,
235                        reported,
236                        used_strength,
237                        used_attribution,
238                        source,
239                        &now,
240                    )?;
241                }
242                let affected: HashSet<String> = previously_used
243                    .into_iter()
244                    .chain(used_ids.iter().cloned())
245                    .collect();
246                for cid in affected {
247                    self.storage.refresh_chunk_last_used(&cid, &now)?;
248                }
249            }
250
251            // usage_trace: task_ok / task_fail
252            if let Some(o) = outcome {
253                if matches!(o, "ok" | "fail") {
254                    let event = if o == "ok" { "task_ok" } else { "task_fail" };
255                    let strength = if event == "task_fail" { 0.15 } else { 1.0 };
256                    self.storage.conn_execute(
257                        "DELETE FROM usage_trace
258                         WHERE trace_id=? AND event IN ('task_ok','task_fail')
259                           AND chunk_id IS NULL",
260                        rusqlite::params![trace_id],
261                    )?;
262                    self.storage.insert_usage_trace(
263                        trace_id, None, event, strength, None, None, None, None, None, source, &now,
264                    )?;
265                    // 方案 B/H:回填 verdict_log 的实际结果(若该 trace 是一次 appraise)。
266                    // observed_outcome = +1 坏结果发生(fail) / -1 好结果(ok)。
267                    // provenance='observed':actor 实际采取动作并观测到结果,计入校准。
268                    // verdict_heeded=true:因警告回避了动作,outcome 是反事实,不计校准
269                    // (counterfactual_censored,见原则 3 / 方案 H)。
270                    let observed_outcome = if event == "task_fail" { 1.0 } else { -1.0 };
271                    let provenance = if verdict_heeded {
272                        "counterfactual_censored"
273                    } else {
274                        "observed"
275                    };
276                    self.storage.backfill_verdict_outcome(
277                        trace_id,
278                        observed_outcome,
279                        provenance,
280                        &now,
281                    )?;
282                }
283            }
284
285            // Rebuild trace-derived evidence whenever either side of the pair arrives.
286            // This makes `outcome → used` and `used → outcome` equivalent and lets a
287            // later complete usage declaration replace an earlier one safely.
288            let effective_outcome =
289                outcome
290                    .filter(|value| *value != "unknown")
291                    .or(existing_outcome
292                        .as_deref()
293                        .filter(|value| *value != "unknown"));
294            if let Some(o @ ("ok" | "fail")) = effective_outcome {
295                if used.is_some()
296                    || (outcome.is_some_and(|value| value != "unknown")
297                        && existing_outcome.as_deref() != outcome)
298                {
299                    let fallback_ids: Vec<String>;
300                    let effective_used: Option<&[String]> = if effective_used_ids.is_some() {
301                        effective_used_ids.as_deref()
302                    } else {
303                        fallback_ids = log
304                            .get("used_ids")
305                            .and_then(Value::as_str)
306                            .and_then(|s| serde_json::from_str(s).ok())
307                            .unwrap_or_default();
308                        if fallback_ids.is_empty() {
309                            None
310                        } else {
311                            Some(&fallback_ids)
312                        }
313                    };
314                    let effective_complete = if used.is_some() {
315                        effective_used_complete
316                    } else {
317                        log.get("usage_state").and_then(Value::as_str) != Some("unknown")
318                            && log
319                                .get("used_complete")
320                                .and_then(Value::as_i64)
321                                .unwrap_or(1)
322                                != 0
323                    };
324                    self.replace_outcome_evidence(
325                        trace_id,
326                        o,
327                        effective_used,
328                        effective_complete,
329                        &now,
330                    )?;
331                }
332            } else if used.is_some() && effective_used_complete {
333                self.replace_selected_unused_evidence(
334                    trace_id,
335                    effective_used_ids.as_deref().unwrap_or_default(),
336                    &now,
337                )?;
338            }
339
340            let context_key = log
341                .get("context_key")
342                .and_then(Value::as_str)
343                .map(str::to_string)
344                .or_else(|| query.map(|q| content_hash(&normalize_query(q))));
345            let feedback_strength = if feedback_kind == "judge" { 0.6 } else { 1.0 };
346
347            // Persist feedback facts before reducing them into confidence.
348            // INSERT OR IGNORE: skip all derived updates for duplicate (trace_id, chunk_id, signal).
349            // Track affected chunks so we only rebuild their context_stats (not the full table).
350            let mut context_affected: HashSet<String> = HashSet::new();
351            if let Some(used_ids) = effective_used_ids.as_deref() {
352                for cid in used_ids {
353                    context_affected.insert(cid.clone());
354                }
355            }
356            if let Some(ups) = feedback_up {
357                for cid in ups {
358                    let corrected = self.storage.delete_feedback_event(trace_id, cid, "down")?;
359                    self.storage.delete_chunk_trace_confidence_evidence(
360                        trace_id,
361                        cid,
362                        "feedback_down",
363                    )?;
364                    let inserted = self.storage.insert_feedback_event(
365                        &gen_uuid(),
366                        trace_id,
367                        cid,
368                        "up",
369                        feedback_strength,
370                        source,
371                        feedback_actor,
372                        feedback_reason,
373                        context_key.as_deref(),
374                        &now,
375                    )?;
376                    if inserted > 0 {
377                        self.upsert_trace_confidence_evidence(
378                            trace_id,
379                            cid,
380                            "feedback_up",
381                            1.0,
382                            feedback_strength,
383                            if feedback_kind == "judge" {
384                                "judge_up"
385                            } else {
386                                "user_up"
387                            },
388                            context_key.as_deref(),
389                            &now,
390                            true,
391                        )?;
392                        self.storage.update_chunk_last_used(cid, &now)?;
393                        self.refresh_governance_evidence(cid, &now)?;
394                        context_affected.insert(cid.clone());
395                    } else if corrected > 0 {
396                        self.recompute_chunk_confidence(cid, &now)?;
397                        self.refresh_governance_evidence(cid, &now)?;
398                        context_affected.insert(cid.clone());
399                    }
400                }
401            }
402            if let Some(downs) = feedback_down {
403                for cid in downs {
404                    let corrected = self.storage.delete_feedback_event(trace_id, cid, "up")?;
405                    self.storage.delete_chunk_trace_confidence_evidence(
406                        trace_id,
407                        cid,
408                        "feedback_up",
409                    )?;
410                    let inserted = self.storage.insert_feedback_event(
411                        &gen_uuid(),
412                        trace_id,
413                        cid,
414                        "down",
415                        feedback_strength,
416                        source,
417                        feedback_actor,
418                        feedback_reason,
419                        context_key.as_deref(),
420                        &now,
421                    )?;
422                    if inserted > 0 {
423                        self.upsert_trace_confidence_evidence(
424                            trace_id,
425                            cid,
426                            "feedback_down",
427                            0.0,
428                            feedback_strength,
429                            if feedback_kind == "judge" {
430                                "judge_down"
431                            } else {
432                                "user_down"
433                            },
434                            context_key.as_deref(),
435                            &now,
436                            true,
437                        )?;
438                        self.refresh_governance_evidence(cid, &now)?;
439                        context_affected.insert(cid.clone());
440                    } else if corrected > 0 {
441                        self.recompute_chunk_confidence(cid, &now)?;
442                        self.refresh_governance_evidence(cid, &now)?;
443                        context_affected.insert(cid.clone());
444                    }
445                }
446            }
447            // Targeted rebuild — only update context_stats for chunks touched in this call.
448            self.rebuild_context_stats_for(&context_affected, &now)?;
449
450            // Fill in content fields (補写: output_summary, nomination, output, query) on existing log.
451            if !is_fresh_insert {
452                self.storage.patch_episodic_log_content(
453                    trace_id,
454                    query,
455                    output,
456                    output_summary,
457                    nomination,
458                    effective_priority,
459                )?;
460            }
461
462            let lifecycle_state = if effective_outcome.is_some() {
463                "completed"
464            } else {
465                task_state.unwrap_or_else(|| {
466                    log.get("task_state")
467                        .and_then(Value::as_str)
468                        .unwrap_or("running")
469                })
470            };
471            let used_ids_json = effective_used_ids
472                .as_deref()
473                .map(serde_json::to_string)
474                .transpose()?;
475            self.storage.update_trace_lifecycle(
476                trace_id,
477                lifecycle_state,
478                (lifecycle_state == "completed").then_some(now.as_str()),
479                effective_used_ids
480                    .as_deref()
481                    .map(|ids| usage_state(Some(ids))),
482                used_ids_json.as_deref(),
483                used.map(|_| used_attribution),
484                used.map(|_| effective_used_complete),
485            )?;
486
487            // Update episodic log
488            let current_state = log
489                .get("distill_state")
490                .and_then(Value::as_str)
491                .unwrap_or("open");
492            let lifecycle_completed = lifecycle_state == "completed";
493            let has_material = output_summary.is_some()
494                || nomination.is_some()
495                || output.is_some()
496                || log.get("output_summary").and_then(Value::as_str).is_some()
497                || log.get("nomination").and_then(Value::as_str).is_some()
498                || log.get("output").and_then(Value::as_str).is_some();
499            let retryable_discard = current_state == "discarded"
500                && matches!(
501                    log.get("distill_note").and_then(Value::as_str),
502                    Some("insufficient_material" | "abandoned" | "timed_out")
503                );
504            let new_state = if current_state == "open"
505                && matches!(lifecycle_state, "abandoned" | "timed_out")
506            {
507                Some("discarded")
508            } else if lifecycle_completed && (current_state == "open" || retryable_discard) {
509                if has_material {
510                    Some("new")
511                } else {
512                    Some("discarded")
513                }
514            } else {
515                None
516            };
517            if let Some(state) = new_state {
518                let note = if state == "discarded" {
519                    Some(if matches!(lifecycle_state, "abandoned" | "timed_out") {
520                        lifecycle_state
521                    } else {
522                        "insufficient_material"
523                    })
524                } else {
525                    None
526                };
527                let outcome_str = outcome.map(str::to_string);
528                self.storage.update_episodic_log_state(
529                    trace_id,
530                    state,
531                    note,
532                    outcome_str.as_deref(),
533                )?;
534                if state == "new" && retryable_discard {
535                    self.storage.conn_execute(
536                        "UPDATE episodic_log SET distill_note=NULL WHERE trace_id=?",
537                        rusqlite::params![trace_id],
538                    )?;
539                }
540            } else if outcome.is_some() {
541                let outcome_str = outcome.map(str::to_string);
542                self.storage.update_episodic_log_state(
543                    trace_id,
544                    current_state,
545                    None,
546                    outcome_str.as_deref(),
547                )?;
548            }
549
550            self.storage.commit()
551        })();
552        if result.is_err() {
553            let _ = self.storage.rollback();
554        }
555        result?;
556        self.enqueue_evolve_if_needed(&now)?;
557        Ok(())
558    }
559}