Skip to main content

innate_core/kb/record/
mod.rs

1use super::*;
2
3mod evidence;
4
/// Parameters for [`KnowledgeBase::record`].
///
/// Every field is borrowed and the struct is `Default`-able, so callers only spell out what
/// they need: `RecordParams { trace_id, source, ..Default::default() }`.
///
/// `record` turns the "empty" values into behavioural defaults: an empty `used_attribution`
/// means `"explicit"`, an empty `feedback_kind` means `"user"`, and `used_complete: None`
/// means `true` (the reported `used` list is a complete snapshot).
#[derive(Debug, Clone, Default)]
pub struct RecordParams<'a> {
    /// Trace whose episodic log is created (on first sight) or updated.
    pub trace_id: &'a str,
    /// Query text; when the log has no context key yet, its normalized hash is used.
    pub query: Option<&'a str>,
    /// Task output text.
    pub output: Option<&'a str>,
    /// Summary of the task output.
    pub output_summary: Option<&'a str>,
    /// One of `"ok"`, `"fail"` or `"unknown"`; any other value is rejected.
    pub outcome: Option<&'a str>,
    /// Chunk ids used by the task. Repeated ids are dropped (first occurrence wins).
    pub used: Option<&'a [String]>,
    /// How `used` was determined: `"explicit"`, `"cited"` or `"inferred"`.
    pub used_attribution: &'a str,
    /// `true`: `used` replaces earlier reports; `false`: it is merged into them.
    pub used_complete: Option<bool>,
    /// Chunk ids with positive feedback; must not share an id with `feedback_down`.
    pub feedback_up: Option<&'a [String]>,
    /// Chunk ids with negative feedback; must not share an id with `feedback_up`.
    pub feedback_down: Option<&'a [String]>,
    /// Origin of the feedback: `"user"` or `"judge"`.
    pub feedback_kind: &'a str,
    /// Actor passed along with each feedback event.
    pub feedback_actor: Option<&'a str>,
    /// Reason passed along with each feedback event.
    pub feedback_reason: Option<&'a str>,
    /// Nomination text. A nomination given with `priority == 0` is recorded at priority 1.
    pub nomination: Option<&'a str>,
    /// Priority stored on the episodic log.
    pub priority: i64,
    /// One of `"recalled"`, `"running"`, `"completed"`, `"abandoned"` or `"timed_out"`.
    /// Overridden by `"completed"` once a non-`unknown` outcome is known.
    pub task_state: Option<&'a str>,
    /// Event source; checked by `validate_source` and passed through to the rows written.
    pub source: &'a str,
}
31
32impl KnowledgeBase {
33    pub fn record(&self, params: RecordParams<'_>) -> Result<()> {
34        let RecordParams {
35            trace_id,
36            query,
37            output,
38            output_summary,
39            outcome,
40            used,
41            used_attribution,
42            used_complete,
43            feedback_up,
44            feedback_down,
45            feedback_kind,
46            feedback_actor,
47            feedback_reason,
48            nomination,
49            priority,
50            task_state,
51            source,
52        } = params;
53        let used_attribution = if used_attribution.is_empty() {
54            "explicit"
55        } else {
56            used_attribution
57        };
58        let feedback_kind = if feedback_kind.is_empty() {
59            "user"
60        } else {
61            feedback_kind
62        };
63        let used_complete = used_complete.unwrap_or(true);
64        let dedupe_ids = |ids: &[String]| {
65            let mut seen = HashSet::new();
66            ids.iter()
67                .filter(|id| seen.insert((*id).clone()))
68                .cloned()
69                .collect::<Vec<_>>()
70        };
71        let normalized_used = used.map(dedupe_ids);
72        let normalized_feedback_up = feedback_up.map(dedupe_ids);
73        let normalized_feedback_down = feedback_down.map(dedupe_ids);
74        let used = normalized_used.as_deref();
75        let feedback_up = normalized_feedback_up.as_deref();
76        let feedback_down = normalized_feedback_down.as_deref();
77
78        if let Some(o) = outcome {
79            if !matches!(o, "ok" | "fail" | "unknown") {
80                return Err(InnateError::InvalidState(format!("invalid outcome: {o}")));
81            }
82        }
83        if !matches!(used_attribution, "explicit" | "cited" | "inferred") {
84            return Err(InnateError::InvalidState(format!(
85                "invalid used attribution: {used_attribution}"
86            )));
87        }
88        if !matches!(feedback_kind, "user" | "judge") {
89            return Err(InnateError::InvalidState(format!(
90                "invalid feedback kind: {feedback_kind}"
91            )));
92        }
93        if let Some(state) = task_state {
94            if !matches!(
95                state,
96                "recalled" | "running" | "completed" | "abandoned" | "timed_out"
97            ) {
98                return Err(InnateError::InvalidState(format!(
99                    "invalid task state: {state}"
100                )));
101            }
102        }
103        validate_source(source)?;
104        if let (Some(ups), Some(downs)) = (feedback_up, feedback_down) {
105            let down_set: HashSet<&str> = downs.iter().map(String::as_str).collect();
106            if let Some(chunk_id) = ups.iter().find(|id| down_set.contains(id.as_str())) {
107                return Err(InnateError::InvalidState(format!(
108                    "conflicting feedback for chunk {chunk_id}"
109                )));
110            }
111        }
112        let effective_priority = if nomination.is_some() && priority == 0 {
113            1
114        } else {
115            priority
116        };
117        let now = utc_now_iso();
118        let lib_id = self.storage.lib_id()?;
119
120        self.storage.begin_immediate()?;
121        let result = (|| -> Result<()> {
122            let log = self.storage.get_episodic_log(trace_id)?;
123            let mut is_fresh_insert = false;
124            let log = match log {
125                Some(l) => l,
126                None => {
127                    let used_ids = used.map(serde_json::to_string).transpose()?;
128                    let row = EpisodicLogRow {
129                        id: gen_uuid(),
130                        trace_id: trace_id.to_string(),
131                        lib_id,
132                        ts: now.clone(),
133                        query: query.map(str::to_string).or_else(|| Some(String::new())),
134                        output: output.map(str::to_string),
135                        output_summary: output_summary.map(str::to_string),
136                        outcome: outcome.map(str::to_string),
137                        event_source: source.to_string(),
138                        task_state: if matches!(outcome, Some("ok") | Some("fail")) {
139                            "completed".to_string()
140                        } else {
141                            task_state.unwrap_or("running").to_string()
142                        },
143                        completed_at: matches!(outcome, Some("ok") | Some("fail"))
144                            .then(|| now.clone()),
145                        usage_state: usage_state(used).to_string(),
146                        used_ids,
147                        used_attribution: used.map(|_| used_attribution.to_string()),
148                        used_complete,
149                        context_key: query.map(|q| content_hash(&normalize_query(q))),
150                        nomination: nomination.map(str::to_string),
151                        priority: effective_priority,
152                        distill_state: "open".to_string(),
153                        ..Default::default()
154                    };
155                    self.storage.upsert_episodic_log(&row)?;
156                    is_fresh_insert = true;
157                    self.storage.get_episodic_log(trace_id)?.unwrap()
158                }
159            };
160            self.validate_trace_attribution(trace_id, used, "used")?;
161            self.validate_trace_attribution(trace_id, feedback_up, "feedback_up")?;
162            self.validate_trace_attribution(trace_id, feedback_down, "feedback_down")?;
163
164            let existing_outcome = log
165                .get("outcome")
166                .and_then(Value::as_str)
167                .map(str::to_string);
168            // usage_trace: used
169            let effective_used_attribution = if used.is_some() {
170                used_attribution
171            } else {
172                log.get("used_attribution")
173                    .and_then(Value::as_str)
174                    .unwrap_or(used_attribution)
175            };
176            let used_strength = match effective_used_attribution {
177                "explicit" => 0.3,
178                "cited" => 0.25,
179                "inferred" => 0.15,
180                // The parameter is validated above; this arm is only reachable if the
181                // stored used_attribution was tampered with outside the API.
182                other => {
183                    return Err(InnateError::InvalidState(format!(
184                        "invalid stored used attribution: {other}"
185                    )))
186                }
187            };
188            let existing_used_ids: Vec<String> = log
189                .get("used_ids")
190                .and_then(Value::as_str)
191                .and_then(|raw| serde_json::from_str(raw).ok())
192                .unwrap_or_default();
193            let existing_used_complete = log
194                .get("used_complete")
195                .and_then(Value::as_i64)
196                .unwrap_or(0)
197                != 0;
198            let effective_used_complete = used_complete || existing_used_complete;
199            let effective_used_ids = used.map(|reported| {
200                if used_complete {
201                    reported.to_vec()
202                } else {
203                    let mut merged = existing_used_ids.clone();
204                    let mut seen: HashSet<String> = merged.iter().cloned().collect();
205                    merged.extend(
206                        reported
207                            .iter()
208                            .filter(|id| seen.insert((*id).clone()))
209                            .cloned(),
210                    );
211                    merged
212                }
213            });
214            if let Some(used_ids) = effective_used_ids.as_deref() {
215                let previously_used: HashSet<String> = existing_used_ids.iter().cloned().collect();
216                if used_complete {
217                    self.storage.replace_used_trace(
218                        trace_id,
219                        used_ids,
220                        used_strength,
221                        used_attribution,
222                        source,
223                        &now,
224                    )?;
225                } else if let Some(reported) = used {
226                    self.storage.merge_used_trace(
227                        trace_id,
228                        reported,
229                        used_strength,
230                        used_attribution,
231                        source,
232                        &now,
233                    )?;
234                }
235                let affected: HashSet<String> = previously_used
236                    .into_iter()
237                    .chain(used_ids.iter().cloned())
238                    .collect();
239                for cid in affected {
240                    self.storage.refresh_chunk_last_used(&cid, &now)?;
241                }
242            }
243
244            // usage_trace: task_ok / task_fail
245            if let Some(o) = outcome {
246                if matches!(o, "ok" | "fail") {
247                    let event = if o == "ok" { "task_ok" } else { "task_fail" };
248                    let strength = if event == "task_fail" { 0.15 } else { 1.0 };
249                    self.storage.conn_execute(
250                        "DELETE FROM usage_trace
251                         WHERE trace_id=? AND event IN ('task_ok','task_fail')
252                           AND chunk_id IS NULL",
253                        rusqlite::params![trace_id],
254                    )?;
255                    self.storage.insert_usage_trace(
256                        trace_id, None, event, strength, None, None, None, None, None, source, &now,
257                    )?;
258                }
259            }
260
261            // Rebuild trace-derived evidence whenever either side of the pair arrives.
262            // This makes `outcome → used` and `used → outcome` equivalent and lets a
263            // later complete usage declaration replace an earlier one safely.
264            let effective_outcome =
265                outcome
266                    .filter(|value| *value != "unknown")
267                    .or(existing_outcome
268                        .as_deref()
269                        .filter(|value| *value != "unknown"));
270            if let Some(o @ ("ok" | "fail")) = effective_outcome {
271                if used.is_some()
272                    || (outcome.is_some_and(|value| value != "unknown")
273                        && existing_outcome.as_deref() != outcome)
274                {
275                    let fallback_ids: Vec<String>;
276                    let effective_used: Option<&[String]> = if effective_used_ids.is_some() {
277                        effective_used_ids.as_deref()
278                    } else {
279                        fallback_ids = log
280                            .get("used_ids")
281                            .and_then(Value::as_str)
282                            .and_then(|s| serde_json::from_str(s).ok())
283                            .unwrap_or_default();
284                        if fallback_ids.is_empty() {
285                            None
286                        } else {
287                            Some(&fallback_ids)
288                        }
289                    };
290                    let effective_complete = if used.is_some() {
291                        effective_used_complete
292                    } else {
293                        log.get("usage_state").and_then(Value::as_str) != Some("unknown")
294                            && log
295                                .get("used_complete")
296                                .and_then(Value::as_i64)
297                                .unwrap_or(1)
298                                != 0
299                    };
300                    self.replace_outcome_evidence(
301                        trace_id,
302                        o,
303                        effective_used,
304                        effective_complete,
305                        &now,
306                    )?;
307                }
308            } else if used.is_some() && effective_used_complete {
309                self.replace_selected_unused_evidence(
310                    trace_id,
311                    effective_used_ids.as_deref().unwrap_or_default(),
312                    &now,
313                )?;
314            }
315
316            let context_key = log
317                .get("context_key")
318                .and_then(Value::as_str)
319                .map(str::to_string)
320                .or_else(|| query.map(|q| content_hash(&normalize_query(q))));
321            let feedback_strength = if feedback_kind == "judge" { 0.6 } else { 1.0 };
322
323            // Persist feedback facts before reducing them into confidence.
324            // INSERT OR IGNORE: skip all derived updates for duplicate (trace_id, chunk_id, signal).
325            // Track affected chunks so we only rebuild their context_stats (not the full table).
326            let mut context_affected: HashSet<String> = HashSet::new();
327            if let Some(used_ids) = effective_used_ids.as_deref() {
328                for cid in used_ids {
329                    context_affected.insert(cid.clone());
330                }
331            }
332            if let Some(ups) = feedback_up {
333                for cid in ups {
334                    let corrected = self.storage.delete_feedback_event(trace_id, cid, "down")?;
335                    self.storage.delete_chunk_trace_confidence_evidence(
336                        trace_id,
337                        cid,
338                        "feedback_down",
339                    )?;
340                    let inserted = self.storage.insert_feedback_event(
341                        &gen_uuid(),
342                        trace_id,
343                        cid,
344                        "up",
345                        feedback_strength,
346                        source,
347                        feedback_actor,
348                        feedback_reason,
349                        context_key.as_deref(),
350                        &now,
351                    )?;
352                    if inserted > 0 {
353                        self.upsert_trace_confidence_evidence(
354                            trace_id,
355                            cid,
356                            "feedback_up",
357                            1.0,
358                            feedback_strength,
359                            if feedback_kind == "judge" {
360                                "judge_up"
361                            } else {
362                                "user_up"
363                            },
364                            context_key.as_deref(),
365                            &now,
366                            true,
367                        )?;
368                        self.storage.update_chunk_last_used(cid, &now)?;
369                        self.refresh_governance_evidence(cid, &now)?;
370                        context_affected.insert(cid.clone());
371                    } else if corrected > 0 {
372                        self.recompute_chunk_confidence(cid, &now)?;
373                        self.refresh_governance_evidence(cid, &now)?;
374                        context_affected.insert(cid.clone());
375                    }
376                }
377            }
378            if let Some(downs) = feedback_down {
379                for cid in downs {
380                    let corrected = self.storage.delete_feedback_event(trace_id, cid, "up")?;
381                    self.storage.delete_chunk_trace_confidence_evidence(
382                        trace_id,
383                        cid,
384                        "feedback_up",
385                    )?;
386                    let inserted = self.storage.insert_feedback_event(
387                        &gen_uuid(),
388                        trace_id,
389                        cid,
390                        "down",
391                        feedback_strength,
392                        source,
393                        feedback_actor,
394                        feedback_reason,
395                        context_key.as_deref(),
396                        &now,
397                    )?;
398                    if inserted > 0 {
399                        self.upsert_trace_confidence_evidence(
400                            trace_id,
401                            cid,
402                            "feedback_down",
403                            0.0,
404                            feedback_strength,
405                            if feedback_kind == "judge" {
406                                "judge_down"
407                            } else {
408                                "user_down"
409                            },
410                            context_key.as_deref(),
411                            &now,
412                            true,
413                        )?;
414                        self.refresh_governance_evidence(cid, &now)?;
415                        context_affected.insert(cid.clone());
416                    } else if corrected > 0 {
417                        self.recompute_chunk_confidence(cid, &now)?;
418                        self.refresh_governance_evidence(cid, &now)?;
419                        context_affected.insert(cid.clone());
420                    }
421                }
422            }
423            // Targeted rebuild — only update context_stats for chunks touched in this call.
424            self.rebuild_context_stats_for(&context_affected, &now)?;
425
426            // Fill in content fields (補写: output_summary, nomination, output, query) on existing log.
427            if !is_fresh_insert {
428                self.storage.patch_episodic_log_content(
429                    trace_id,
430                    query,
431                    output,
432                    output_summary,
433                    nomination,
434                    effective_priority,
435                )?;
436            }
437
438            let lifecycle_state = if effective_outcome.is_some() {
439                "completed"
440            } else {
441                task_state.unwrap_or_else(|| {
442                    log.get("task_state")
443                        .and_then(Value::as_str)
444                        .unwrap_or("running")
445                })
446            };
447            let used_ids_json = effective_used_ids
448                .as_deref()
449                .map(serde_json::to_string)
450                .transpose()?;
451            self.storage.update_trace_lifecycle(
452                trace_id,
453                lifecycle_state,
454                (lifecycle_state == "completed").then_some(now.as_str()),
455                effective_used_ids
456                    .as_deref()
457                    .map(|ids| usage_state(Some(ids))),
458                used_ids_json.as_deref(),
459                used.map(|_| used_attribution),
460                used.map(|_| effective_used_complete),
461            )?;
462
463            // Update episodic log
464            let current_state = log
465                .get("distill_state")
466                .and_then(Value::as_str)
467                .unwrap_or("open");
468            let lifecycle_completed = lifecycle_state == "completed";
469            let has_material = output_summary.is_some()
470                || nomination.is_some()
471                || output.is_some()
472                || log.get("output_summary").and_then(Value::as_str).is_some()
473                || log.get("nomination").and_then(Value::as_str).is_some()
474                || log.get("output").and_then(Value::as_str).is_some();
475            let retryable_discard = current_state == "discarded"
476                && matches!(
477                    log.get("distill_note").and_then(Value::as_str),
478                    Some("insufficient_material" | "abandoned" | "timed_out")
479                );
480            let new_state = if current_state == "open"
481                && matches!(lifecycle_state, "abandoned" | "timed_out")
482            {
483                Some("discarded")
484            } else if lifecycle_completed && (current_state == "open" || retryable_discard) {
485                if has_material {
486                    Some("new")
487                } else {
488                    Some("discarded")
489                }
490            } else {
491                None
492            };
493            if let Some(state) = new_state {
494                let note = if state == "discarded" {
495                    Some(if matches!(lifecycle_state, "abandoned" | "timed_out") {
496                        lifecycle_state
497                    } else {
498                        "insufficient_material"
499                    })
500                } else {
501                    None
502                };
503                let outcome_str = outcome.map(str::to_string);
504                self.storage.update_episodic_log_state(
505                    trace_id,
506                    state,
507                    note,
508                    outcome_str.as_deref(),
509                )?;
510                if state == "new" && retryable_discard {
511                    self.storage.conn_execute(
512                        "UPDATE episodic_log SET distill_note=NULL WHERE trace_id=?",
513                        rusqlite::params![trace_id],
514                    )?;
515                }
516            } else if outcome.is_some() {
517                let outcome_str = outcome.map(str::to_string);
518                self.storage.update_episodic_log_state(
519                    trace_id,
520                    current_state,
521                    None,
522                    outcome_str.as_deref(),
523                )?;
524            }
525
526            self.storage.commit()
527        })();
528        if result.is_err() {
529            let _ = self.storage.rollback();
530        }
531        result?;
532        self.enqueue_evolve_if_needed(&now)?;
533        Ok(())
534    }
535}