Skip to main content

innate_core/kb/record/
mod.rs

1use super::*;
2
3mod evidence;
4
/// Parameters for [`KnowledgeBase::record`].
///
/// Borrowed, `Default`-able: construct with `RecordParams { trace_id, source, ..Default::default() }`
/// and set only the fields you need. Behavioural defaults are applied inside `record`:
/// `used_attribution` empty → `"explicit"`, `feedback_kind` empty → `"user"`,
/// `used_complete` `None` → `true` (a complete snapshot).
#[derive(Debug, Clone, Default)]
pub struct RecordParams<'a> {
    /// Trace the call refers to; `record` looks up the episodic log row by this id and
    /// inserts a new row when none exists.
    pub trace_id: &'a str,
    /// Query text. Stored on a freshly inserted row (as an empty string when `None`) and
    /// hashed, after normalization, into the row's context key.
    pub query: Option<&'a str>,
    /// Full output text; counts as distillation material when present.
    pub output: Option<&'a str>,
    /// Summary of the output; counts as distillation material when present.
    pub output_summary: Option<&'a str>,
    /// Task outcome. `record` accepts only `"ok"`, `"fail"` or `"unknown"`.
    pub outcome: Option<&'a str>,
    /// Chunk ids reported as used. Duplicates are removed (first occurrence wins).
    pub used: Option<&'a [String]>,
    /// How the usage was attributed: `"explicit"`, `"cited"` or `"inferred"`.
    /// Empty is treated as `"explicit"`.
    pub used_attribution: &'a str,
    /// Whether `used` is a complete snapshot (replaces the stored usage) or a partial
    /// report (merged into the stored usage). `None` is treated as `true`.
    pub used_complete: Option<bool>,
    /// Chunk ids receiving positive feedback. An id that also appears in
    /// `feedback_down` makes `record` fail.
    pub feedback_up: Option<&'a [String]>,
    /// Chunk ids receiving negative feedback. An id that also appears in
    /// `feedback_up` makes `record` fail.
    pub feedback_down: Option<&'a [String]>,
    /// Who produced the feedback: `"user"` or `"judge"`. Empty is treated as `"user"`.
    pub feedback_kind: &'a str,
    /// Optional actor passed through to the stored feedback event.
    pub feedback_actor: Option<&'a str>,
    /// Optional reason passed through to the stored feedback event.
    pub feedback_reason: Option<&'a str>,
    /// Optional nomination; counts as distillation material. When set while `priority`
    /// is `0`, `record` uses priority `1` instead.
    pub nomination: Option<&'a str>,
    /// Priority stored on the episodic log row (see `nomination` for the `0` case).
    pub priority: i64,
    /// Task lifecycle state: `"recalled"`, `"running"`, `"completed"`, `"abandoned"`
    /// or `"timed_out"`. An `"ok"`/`"fail"` outcome forces `"completed"`.
    pub task_state: Option<&'a str>,
    /// Event source; checked with `validate_source` and stored with the written events.
    pub source: &'a str,
    /// Scheme H — counterfactual censoring: if this trace came from an appraise and the
    /// actor **avoided the action because of the warning** (the step was never actually
    /// taken, so the outcome reflects the world after the avoidance), the result cannot
    /// serve as evidence of whether the intuition was right — it is marked
    /// `counterfactual_censored` and excluded from the calibration mapping / ECE.
    /// Defaults to `false` (the actor actually took the action and observed the
    /// result → `observed`, counted toward calibration).
    pub verdict_heeded: bool,
}
36
37impl KnowledgeBase {
38    pub fn record(&self, params: RecordParams<'_>) -> Result<()> {
39        let RecordParams {
40            trace_id,
41            query,
42            output,
43            output_summary,
44            outcome,
45            used,
46            used_attribution,
47            used_complete,
48            feedback_up,
49            feedback_down,
50            feedback_kind,
51            feedback_actor,
52            feedback_reason,
53            nomination,
54            priority,
55            task_state,
56            source,
57            verdict_heeded,
58        } = params;
59        let used_attribution = if used_attribution.is_empty() {
60            "explicit"
61        } else {
62            used_attribution
63        };
64        let feedback_kind = if feedback_kind.is_empty() {
65            "user"
66        } else {
67            feedback_kind
68        };
69        let used_complete = used_complete.unwrap_or(true);
70        let dedupe_ids = |ids: &[String]| {
71            let mut seen = HashSet::new();
72            ids.iter()
73                .filter(|id| seen.insert((*id).clone()))
74                .cloned()
75                .collect::<Vec<_>>()
76        };
77        let normalized_used = used.map(dedupe_ids);
78        let normalized_feedback_up = feedback_up.map(dedupe_ids);
79        let normalized_feedback_down = feedback_down.map(dedupe_ids);
80        let used = normalized_used.as_deref();
81        let feedback_up = normalized_feedback_up.as_deref();
82        let feedback_down = normalized_feedback_down.as_deref();
83
84        if let Some(o) = outcome {
85            if !matches!(o, "ok" | "fail" | "unknown") {
86                return Err(InnateError::InvalidState(format!("invalid outcome: {o}")));
87            }
88        }
89        if !matches!(used_attribution, "explicit" | "cited" | "inferred") {
90            return Err(InnateError::InvalidState(format!(
91                "invalid used attribution: {used_attribution}"
92            )));
93        }
94        if !matches!(feedback_kind, "user" | "judge") {
95            return Err(InnateError::InvalidState(format!(
96                "invalid feedback kind: {feedback_kind}"
97            )));
98        }
99        if let Some(state) = task_state {
100            if !matches!(
101                state,
102                "recalled" | "running" | "completed" | "abandoned" | "timed_out"
103            ) {
104                return Err(InnateError::InvalidState(format!(
105                    "invalid task state: {state}"
106                )));
107            }
108        }
109        validate_source(source)?;
110        if let (Some(ups), Some(downs)) = (feedback_up, feedback_down) {
111            let down_set: HashSet<&str> = downs.iter().map(String::as_str).collect();
112            if let Some(chunk_id) = ups.iter().find(|id| down_set.contains(id.as_str())) {
113                return Err(InnateError::InvalidState(format!(
114                    "conflicting feedback for chunk {chunk_id}"
115                )));
116            }
117        }
118        let effective_priority = if nomination.is_some() && priority == 0 {
119            1
120        } else {
121            priority
122        };
123        let now = utc_now_iso();
124        let lib_id = self.storage.lib_id()?;
125
126        self.storage.begin_immediate()?;
127        let result = (|| -> Result<()> {
128            let log = self.storage.get_episodic_log(trace_id)?;
129            let mut is_fresh_insert = false;
130            let log = match log {
131                Some(l) => l,
132                None => {
133                    let used_ids = used.map(serde_json::to_string).transpose()?;
134                    let row = EpisodicLogRow {
135                        id: gen_uuid(),
136                        trace_id: trace_id.to_string(),
137                        lib_id,
138                        ts: now.clone(),
139                        query: query.map(str::to_string).or_else(|| Some(String::new())),
140                        output: output.map(str::to_string),
141                        output_summary: output_summary.map(str::to_string),
142                        outcome: outcome.map(str::to_string),
143                        event_source: source.to_string(),
144                        task_state: if matches!(outcome, Some("ok") | Some("fail")) {
145                            "completed".to_string()
146                        } else {
147                            task_state.unwrap_or("running").to_string()
148                        },
149                        completed_at: matches!(outcome, Some("ok") | Some("fail"))
150                            .then(|| now.clone()),
151                        usage_state: usage_state(used).to_string(),
152                        used_ids,
153                        used_attribution: used.map(|_| used_attribution.to_string()),
154                        used_complete,
155                        context_key: query.map(|q| content_hash(&normalize_query(q))),
156                        nomination: nomination.map(str::to_string),
157                        priority: effective_priority,
158                        distill_state: "open".to_string(),
159                        ..Default::default()
160                    };
161                    self.storage.upsert_episodic_log(&row)?;
162                    is_fresh_insert = true;
163                    self.storage.get_episodic_log(trace_id)?.unwrap()
164                }
165            };
166            self.validate_trace_attribution(trace_id, used, "used")?;
167            self.validate_trace_attribution(trace_id, feedback_up, "feedback_up")?;
168            self.validate_trace_attribution(trace_id, feedback_down, "feedback_down")?;
169
170            let existing_outcome = log
171                .get("outcome")
172                .and_then(Value::as_str)
173                .map(str::to_string);
174            // usage_trace: used
175            let effective_used_attribution = if used.is_some() {
176                used_attribution
177            } else {
178                log.get("used_attribution")
179                    .and_then(Value::as_str)
180                    .unwrap_or(used_attribution)
181            };
182            let used_strength = match effective_used_attribution {
183                "explicit" => 0.3,
184                "cited" => 0.25,
185                "inferred" => 0.15,
186                // The parameter is validated above; this arm is only reachable if the
187                // stored used_attribution was tampered with outside the API.
188                other => {
189                    return Err(InnateError::InvalidState(format!(
190                        "invalid stored used attribution: {other}"
191                    )))
192                }
193            };
194            let existing_used_ids: Vec<String> = log
195                .get("used_ids")
196                .and_then(Value::as_str)
197                .and_then(|raw| serde_json::from_str(raw).ok())
198                .unwrap_or_default();
199            let existing_used_complete = log
200                .get("used_complete")
201                .and_then(Value::as_i64)
202                .unwrap_or(0)
203                != 0;
204            let effective_used_complete = used_complete || existing_used_complete;
205            let effective_used_ids = used.map(|reported| {
206                if used_complete {
207                    reported.to_vec()
208                } else {
209                    let mut merged = existing_used_ids.clone();
210                    let mut seen: HashSet<String> = merged.iter().cloned().collect();
211                    merged.extend(
212                        reported
213                            .iter()
214                            .filter(|id| seen.insert((*id).clone()))
215                            .cloned(),
216                    );
217                    merged
218                }
219            });
220            if let Some(used_ids) = effective_used_ids.as_deref() {
221                let previously_used: HashSet<String> = existing_used_ids.iter().cloned().collect();
222                if used_complete {
223                    self.storage.replace_used_trace(
224                        trace_id,
225                        used_ids,
226                        used_strength,
227                        used_attribution,
228                        source,
229                        &now,
230                    )?;
231                } else if let Some(reported) = used {
232                    self.storage.merge_used_trace(
233                        trace_id,
234                        reported,
235                        used_strength,
236                        used_attribution,
237                        source,
238                        &now,
239                    )?;
240                }
241                let affected: HashSet<String> = previously_used
242                    .into_iter()
243                    .chain(used_ids.iter().cloned())
244                    .collect();
245                for cid in affected {
246                    self.storage.refresh_chunk_last_used(&cid, &now)?;
247                }
248            }
249
250            // usage_trace: task_ok / task_fail
251            if let Some(o) = outcome {
252                if matches!(o, "ok" | "fail") {
253                    let event = if o == "ok" { "task_ok" } else { "task_fail" };
254                    let strength = if event == "task_fail" { 0.15 } else { 1.0 };
255                    self.storage.conn_execute(
256                        "DELETE FROM usage_trace
257                         WHERE trace_id=? AND event IN ('task_ok','task_fail')
258                           AND chunk_id IS NULL",
259                        rusqlite::params![trace_id],
260                    )?;
261                    self.storage.insert_usage_trace(
262                        trace_id, None, event, strength, None, None, None, None, None, source, &now,
263                    )?;
264                    // 方案 B/H:回填 verdict_log 的实际结果(若该 trace 是一次 appraise)。
265                    // observed_outcome = +1 坏结果发生(fail) / -1 好结果(ok)。
266                    // provenance='observed':actor 实际采取动作并观测到结果,计入校准。
267                    // verdict_heeded=true:因警告回避了动作,outcome 是反事实,不计校准
268                    // (counterfactual_censored,见原则 3 / 方案 H)。
269                    let observed_outcome = if event == "task_fail" { 1.0 } else { -1.0 };
270                    let provenance = if verdict_heeded {
271                        "counterfactual_censored"
272                    } else {
273                        "observed"
274                    };
275                    self.storage.backfill_verdict_outcome(
276                        trace_id,
277                        observed_outcome,
278                        provenance,
279                        &now,
280                    )?;
281                }
282            }
283
284            // Rebuild trace-derived evidence whenever either side of the pair arrives.
285            // This makes `outcome → used` and `used → outcome` equivalent and lets a
286            // later complete usage declaration replace an earlier one safely.
287            let effective_outcome =
288                outcome
289                    .filter(|value| *value != "unknown")
290                    .or(existing_outcome
291                        .as_deref()
292                        .filter(|value| *value != "unknown"));
293            if let Some(o @ ("ok" | "fail")) = effective_outcome {
294                if used.is_some()
295                    || (outcome.is_some_and(|value| value != "unknown")
296                        && existing_outcome.as_deref() != outcome)
297                {
298                    let fallback_ids: Vec<String>;
299                    let effective_used: Option<&[String]> = if effective_used_ids.is_some() {
300                        effective_used_ids.as_deref()
301                    } else {
302                        fallback_ids = log
303                            .get("used_ids")
304                            .and_then(Value::as_str)
305                            .and_then(|s| serde_json::from_str(s).ok())
306                            .unwrap_or_default();
307                        if fallback_ids.is_empty() {
308                            None
309                        } else {
310                            Some(&fallback_ids)
311                        }
312                    };
313                    let effective_complete = if used.is_some() {
314                        effective_used_complete
315                    } else {
316                        log.get("usage_state").and_then(Value::as_str) != Some("unknown")
317                            && log
318                                .get("used_complete")
319                                .and_then(Value::as_i64)
320                                .unwrap_or(1)
321                                != 0
322                    };
323                    self.replace_outcome_evidence(
324                        trace_id,
325                        o,
326                        effective_used,
327                        effective_complete,
328                        &now,
329                    )?;
330                }
331            } else if used.is_some() && effective_used_complete {
332                self.replace_selected_unused_evidence(
333                    trace_id,
334                    effective_used_ids.as_deref().unwrap_or_default(),
335                    &now,
336                )?;
337            }
338
339            let context_key = log
340                .get("context_key")
341                .and_then(Value::as_str)
342                .map(str::to_string)
343                .or_else(|| query.map(|q| content_hash(&normalize_query(q))));
344            let feedback_strength = if feedback_kind == "judge" { 0.6 } else { 1.0 };
345
346            // Persist feedback facts before reducing them into confidence.
347            // INSERT OR IGNORE: skip all derived updates for duplicate (trace_id, chunk_id, signal).
348            // Track affected chunks so we only rebuild their context_stats (not the full table).
349            let mut context_affected: HashSet<String> = HashSet::new();
350            if let Some(used_ids) = effective_used_ids.as_deref() {
351                for cid in used_ids {
352                    context_affected.insert(cid.clone());
353                }
354            }
355            if let Some(ups) = feedback_up {
356                for cid in ups {
357                    let corrected = self.storage.delete_feedback_event(trace_id, cid, "down")?;
358                    self.storage.delete_chunk_trace_confidence_evidence(
359                        trace_id,
360                        cid,
361                        "feedback_down",
362                    )?;
363                    let inserted = self.storage.insert_feedback_event(
364                        &gen_uuid(),
365                        trace_id,
366                        cid,
367                        "up",
368                        feedback_strength,
369                        source,
370                        feedback_actor,
371                        feedback_reason,
372                        context_key.as_deref(),
373                        &now,
374                    )?;
375                    if inserted > 0 {
376                        self.upsert_trace_confidence_evidence(
377                            trace_id,
378                            cid,
379                            "feedback_up",
380                            1.0,
381                            feedback_strength,
382                            if feedback_kind == "judge" {
383                                "judge_up"
384                            } else {
385                                "user_up"
386                            },
387                            context_key.as_deref(),
388                            &now,
389                            true,
390                        )?;
391                        self.storage.update_chunk_last_used(cid, &now)?;
392                        self.refresh_governance_evidence(cid, &now)?;
393                        context_affected.insert(cid.clone());
394                    } else if corrected > 0 {
395                        self.recompute_chunk_confidence(cid, &now)?;
396                        self.refresh_governance_evidence(cid, &now)?;
397                        context_affected.insert(cid.clone());
398                    }
399                }
400            }
401            if let Some(downs) = feedback_down {
402                for cid in downs {
403                    let corrected = self.storage.delete_feedback_event(trace_id, cid, "up")?;
404                    self.storage.delete_chunk_trace_confidence_evidence(
405                        trace_id,
406                        cid,
407                        "feedback_up",
408                    )?;
409                    let inserted = self.storage.insert_feedback_event(
410                        &gen_uuid(),
411                        trace_id,
412                        cid,
413                        "down",
414                        feedback_strength,
415                        source,
416                        feedback_actor,
417                        feedback_reason,
418                        context_key.as_deref(),
419                        &now,
420                    )?;
421                    if inserted > 0 {
422                        self.upsert_trace_confidence_evidence(
423                            trace_id,
424                            cid,
425                            "feedback_down",
426                            0.0,
427                            feedback_strength,
428                            if feedback_kind == "judge" {
429                                "judge_down"
430                            } else {
431                                "user_down"
432                            },
433                            context_key.as_deref(),
434                            &now,
435                            true,
436                        )?;
437                        self.refresh_governance_evidence(cid, &now)?;
438                        context_affected.insert(cid.clone());
439                    } else if corrected > 0 {
440                        self.recompute_chunk_confidence(cid, &now)?;
441                        self.refresh_governance_evidence(cid, &now)?;
442                        context_affected.insert(cid.clone());
443                    }
444                }
445            }
446            // Targeted rebuild — only update context_stats for chunks touched in this call.
447            self.rebuild_context_stats_for(&context_affected, &now)?;
448
449            // Fill in content fields (補写: output_summary, nomination, output, query) on existing log.
450            if !is_fresh_insert {
451                self.storage.patch_episodic_log_content(
452                    trace_id,
453                    query,
454                    output,
455                    output_summary,
456                    nomination,
457                    effective_priority,
458                )?;
459            }
460
461            let lifecycle_state = if effective_outcome.is_some() {
462                "completed"
463            } else {
464                task_state.unwrap_or_else(|| {
465                    log.get("task_state")
466                        .and_then(Value::as_str)
467                        .unwrap_or("running")
468                })
469            };
470            let used_ids_json = effective_used_ids
471                .as_deref()
472                .map(serde_json::to_string)
473                .transpose()?;
474            self.storage.update_trace_lifecycle(
475                trace_id,
476                lifecycle_state,
477                (lifecycle_state == "completed").then_some(now.as_str()),
478                effective_used_ids
479                    .as_deref()
480                    .map(|ids| usage_state(Some(ids))),
481                used_ids_json.as_deref(),
482                used.map(|_| used_attribution),
483                used.map(|_| effective_used_complete),
484            )?;
485
486            // Update episodic log
487            let current_state = log
488                .get("distill_state")
489                .and_then(Value::as_str)
490                .unwrap_or("open");
491            let lifecycle_completed = lifecycle_state == "completed";
492            let has_material = output_summary.is_some()
493                || nomination.is_some()
494                || output.is_some()
495                || log.get("output_summary").and_then(Value::as_str).is_some()
496                || log.get("nomination").and_then(Value::as_str).is_some()
497                || log.get("output").and_then(Value::as_str).is_some();
498            let retryable_discard = current_state == "discarded"
499                && matches!(
500                    log.get("distill_note").and_then(Value::as_str),
501                    Some("insufficient_material" | "abandoned" | "timed_out")
502                );
503            let new_state = if current_state == "open"
504                && matches!(lifecycle_state, "abandoned" | "timed_out")
505            {
506                Some("discarded")
507            } else if lifecycle_completed && (current_state == "open" || retryable_discard) {
508                if has_material {
509                    Some("new")
510                } else {
511                    Some("discarded")
512                }
513            } else {
514                None
515            };
516            if let Some(state) = new_state {
517                let note = if state == "discarded" {
518                    Some(if matches!(lifecycle_state, "abandoned" | "timed_out") {
519                        lifecycle_state
520                    } else {
521                        "insufficient_material"
522                    })
523                } else {
524                    None
525                };
526                let outcome_str = outcome.map(str::to_string);
527                self.storage.update_episodic_log_state(
528                    trace_id,
529                    state,
530                    note,
531                    outcome_str.as_deref(),
532                )?;
533                if state == "new" && retryable_discard {
534                    self.storage.conn_execute(
535                        "UPDATE episodic_log SET distill_note=NULL WHERE trace_id=?",
536                        rusqlite::params![trace_id],
537                    )?;
538                }
539            } else if outcome.is_some() {
540                let outcome_str = outcome.map(str::to_string);
541                self.storage.update_episodic_log_state(
542                    trace_id,
543                    current_state,
544                    None,
545                    outcome_str.as_deref(),
546                )?;
547            }
548
549            self.storage.commit()
550        })();
551        if result.is_err() {
552            let _ = self.storage.rollback();
553        }
554        result?;
555        self.enqueue_evolve_if_needed(&now)?;
556        Ok(())
557    }
558}