// innate_core/kb/record/mod.rs

1use super::*;
2
3mod evidence;
4
5impl KnowledgeBase {
6    #[allow(clippy::too_many_arguments)]
7    pub fn record(
8        &self,
9        trace_id: &str,
10        query: Option<&str>,
11        output: Option<&str>,
12        output_summary: Option<&str>,
13        outcome: Option<&str>,
14        used: Option<&[String]>,
15        feedback_up: Option<&[String]>,
16        feedback_down: Option<&[String]>,
17        nomination: Option<&str>,
18        priority: i64,
19        source: &str,
20    ) -> Result<()> {
21        self.record_detailed(
22            trace_id,
23            query,
24            output,
25            output_summary,
26            outcome,
27            used,
28            "explicit",
29            true,
30            feedback_up,
31            feedback_down,
32            "user",
33            None,
34            None,
35            nomination,
36            priority,
37            None,
38            source,
39        )
40    }
41
42    #[allow(clippy::too_many_arguments)]
43    pub fn record_detailed(
44        &self,
45        trace_id: &str,
46        query: Option<&str>,
47        output: Option<&str>,
48        output_summary: Option<&str>,
49        outcome: Option<&str>,
50        used: Option<&[String]>,
51        used_attribution: &str,
52        used_complete: bool,
53        feedback_up: Option<&[String]>,
54        feedback_down: Option<&[String]>,
55        feedback_kind: &str,
56        feedback_actor: Option<&str>,
57        feedback_reason: Option<&str>,
58        nomination: Option<&str>,
59        priority: i64,
60        task_state: Option<&str>,
61        source: &str,
62    ) -> Result<()> {
63        let dedupe_ids = |ids: &[String]| {
64            let mut seen = HashSet::new();
65            ids.iter()
66                .filter(|id| seen.insert((*id).clone()))
67                .cloned()
68                .collect::<Vec<_>>()
69        };
70        let normalized_used = used.map(dedupe_ids);
71        let normalized_feedback_up = feedback_up.map(dedupe_ids);
72        let normalized_feedback_down = feedback_down.map(dedupe_ids);
73        let used = normalized_used.as_deref();
74        let feedback_up = normalized_feedback_up.as_deref();
75        let feedback_down = normalized_feedback_down.as_deref();
76
77        if let Some(o) = outcome {
78            if !matches!(o, "ok" | "fail" | "unknown") {
79                return Err(InnateError::InvalidState(format!("invalid outcome: {o}")));
80            }
81        }
82        if !matches!(used_attribution, "explicit" | "cited" | "inferred") {
83            return Err(InnateError::InvalidState(format!(
84                "invalid used attribution: {used_attribution}"
85            )));
86        }
87        if !matches!(feedback_kind, "user" | "judge") {
88            return Err(InnateError::InvalidState(format!(
89                "invalid feedback kind: {feedback_kind}"
90            )));
91        }
92        if let Some(state) = task_state {
93            if !matches!(
94                state,
95                "recalled" | "running" | "completed" | "abandoned" | "timed_out"
96            ) {
97                return Err(InnateError::InvalidState(format!(
98                    "invalid task state: {state}"
99                )));
100            }
101        }
102        validate_source(source)?;
103        if let (Some(ups), Some(downs)) = (feedback_up, feedback_down) {
104            let down_set: HashSet<&str> = downs.iter().map(String::as_str).collect();
105            if let Some(chunk_id) = ups.iter().find(|id| down_set.contains(id.as_str())) {
106                return Err(InnateError::InvalidState(format!(
107                    "conflicting feedback for chunk {chunk_id}"
108                )));
109            }
110        }
111        let effective_priority = if nomination.is_some() && priority == 0 {
112            1
113        } else {
114            priority
115        };
116        let now = utc_now_iso();
117        let lib_id = self.storage.lib_id()?;
118
119        self.storage.begin_immediate()?;
120        let result = (|| -> Result<()> {
121            let log = self.storage.get_episodic_log(trace_id)?;
122            let mut is_fresh_insert = false;
123            let log = match log {
124                Some(l) => l,
125                None => {
126                    let used_ids = used.map(serde_json::to_string).transpose()?;
127                    let row = EpisodicLogRow {
128                        id: gen_uuid(),
129                        trace_id: trace_id.to_string(),
130                        lib_id,
131                        ts: now.clone(),
132                        query: query.map(str::to_string).or_else(|| Some(String::new())),
133                        output: output.map(str::to_string),
134                        output_summary: output_summary.map(str::to_string),
135                        outcome: outcome.map(str::to_string),
136                        event_source: source.to_string(),
137                        task_state: if matches!(outcome, Some("ok") | Some("fail")) {
138                            "completed".to_string()
139                        } else {
140                            task_state.unwrap_or("running").to_string()
141                        },
142                        completed_at: matches!(outcome, Some("ok") | Some("fail"))
143                            .then(|| now.clone()),
144                        usage_state: usage_state(used).to_string(),
145                        used_ids,
146                        used_attribution: used.map(|_| used_attribution.to_string()),
147                        used_complete,
148                        context_key: query.map(|q| content_hash(&normalize_query(q))),
149                        nomination: nomination.map(str::to_string),
150                        priority: effective_priority,
151                        distill_state: "open".to_string(),
152                        ..Default::default()
153                    };
154                    self.storage.upsert_episodic_log(&row)?;
155                    is_fresh_insert = true;
156                    self.storage.get_episodic_log(trace_id)?.unwrap()
157                }
158            };
159            self.validate_trace_attribution(trace_id, used, "used")?;
160            self.validate_trace_attribution(trace_id, feedback_up, "feedback_up")?;
161            self.validate_trace_attribution(trace_id, feedback_down, "feedback_down")?;
162
163            let existing_outcome = log
164                .get("outcome")
165                .and_then(Value::as_str)
166                .map(str::to_string);
167            // usage_trace: used
168            let effective_used_attribution = if used.is_some() {
169                used_attribution
170            } else {
171                log.get("used_attribution")
172                    .and_then(Value::as_str)
173                    .unwrap_or(used_attribution)
174            };
175            let used_strength = match effective_used_attribution {
176                "explicit" => 0.3,
177                "cited" => 0.25,
178                "inferred" => 0.15,
179                // The parameter is validated above; this arm is only reachable if the
180                // stored used_attribution was tampered with outside the API.
181                other => {
182                    return Err(InnateError::InvalidState(format!(
183                        "invalid stored used attribution: {other}"
184                    )))
185                }
186            };
187            let existing_used_ids: Vec<String> = log
188                .get("used_ids")
189                .and_then(Value::as_str)
190                .and_then(|raw| serde_json::from_str(raw).ok())
191                .unwrap_or_default();
192            let existing_used_complete = log
193                .get("used_complete")
194                .and_then(Value::as_i64)
195                .unwrap_or(0)
196                != 0;
197            let effective_used_complete = used_complete || existing_used_complete;
198            let effective_used_ids = used.map(|reported| {
199                if used_complete {
200                    reported.to_vec()
201                } else {
202                    let mut merged = existing_used_ids.clone();
203                    let mut seen: HashSet<String> = merged.iter().cloned().collect();
204                    merged.extend(
205                        reported
206                            .iter()
207                            .filter(|id| seen.insert((*id).clone()))
208                            .cloned(),
209                    );
210                    merged
211                }
212            });
213            if let Some(used_ids) = effective_used_ids.as_deref() {
214                let previously_used: HashSet<String> = existing_used_ids.iter().cloned().collect();
215                if used_complete {
216                    self.storage.replace_used_trace(
217                        trace_id,
218                        used_ids,
219                        used_strength,
220                        used_attribution,
221                        source,
222                        &now,
223                    )?;
224                } else if let Some(reported) = used {
225                    self.storage.merge_used_trace(
226                        trace_id,
227                        reported,
228                        used_strength,
229                        used_attribution,
230                        source,
231                        &now,
232                    )?;
233                }
234                let affected: HashSet<String> = previously_used
235                    .into_iter()
236                    .chain(used_ids.iter().cloned())
237                    .collect();
238                for cid in affected {
239                    self.storage.refresh_chunk_last_used(&cid, &now)?;
240                }
241            }
242
243            // usage_trace: task_ok / task_fail
244            if let Some(o) = outcome {
245                if matches!(o, "ok" | "fail") {
246                    let event = if o == "ok" { "task_ok" } else { "task_fail" };
247                    let strength = if event == "task_fail" { 0.15 } else { 1.0 };
248                    self.storage.conn_execute(
249                        "DELETE FROM usage_trace
250                         WHERE trace_id=? AND event IN ('task_ok','task_fail')
251                           AND chunk_id IS NULL",
252                        rusqlite::params![trace_id],
253                    )?;
254                    self.storage.insert_usage_trace(
255                        trace_id, None, event, strength, None, None, None, None, None, source, &now,
256                    )?;
257                }
258            }
259
260            // Rebuild trace-derived evidence whenever either side of the pair arrives.
261            // This makes `outcome → used` and `used → outcome` equivalent and lets a
262            // later complete usage declaration replace an earlier one safely.
263            let effective_outcome =
264                outcome
265                    .filter(|value| *value != "unknown")
266                    .or(existing_outcome
267                        .as_deref()
268                        .filter(|value| *value != "unknown"));
269            if let Some(o @ ("ok" | "fail")) = effective_outcome {
270                if used.is_some()
271                    || (outcome.is_some_and(|value| value != "unknown")
272                        && existing_outcome.as_deref() != outcome)
273                {
274                    let fallback_ids: Vec<String>;
275                    let effective_used: Option<&[String]> = if effective_used_ids.is_some() {
276                        effective_used_ids.as_deref()
277                    } else {
278                        fallback_ids = log
279                            .get("used_ids")
280                            .and_then(Value::as_str)
281                            .and_then(|s| serde_json::from_str(s).ok())
282                            .unwrap_or_default();
283                        if fallback_ids.is_empty() {
284                            None
285                        } else {
286                            Some(&fallback_ids)
287                        }
288                    };
289                    let effective_complete = if used.is_some() {
290                        effective_used_complete
291                    } else {
292                        log.get("usage_state").and_then(Value::as_str) != Some("unknown")
293                            && log
294                                .get("used_complete")
295                                .and_then(Value::as_i64)
296                                .unwrap_or(1)
297                                != 0
298                    };
299                    self.replace_outcome_evidence(
300                        trace_id,
301                        o,
302                        effective_used,
303                        effective_complete,
304                        &now,
305                    )?;
306                }
307            } else if used.is_some() && effective_used_complete {
308                self.replace_selected_unused_evidence(
309                    trace_id,
310                    effective_used_ids.as_deref().unwrap_or_default(),
311                    &now,
312                )?;
313            }
314
315            let context_key = log
316                .get("context_key")
317                .and_then(Value::as_str)
318                .map(str::to_string)
319                .or_else(|| query.map(|q| content_hash(&normalize_query(q))));
320            let feedback_strength = if feedback_kind == "judge" { 0.6 } else { 1.0 };
321
322            // Persist feedback facts before reducing them into confidence.
323            // INSERT OR IGNORE: skip all derived updates for duplicate (trace_id, chunk_id, signal).
324            // Track affected chunks so we only rebuild their context_stats (not the full table).
325            let mut context_affected: HashSet<String> = HashSet::new();
326            if let Some(used_ids) = effective_used_ids.as_deref() {
327                for cid in used_ids {
328                    context_affected.insert(cid.clone());
329                }
330            }
331            if let Some(ups) = feedback_up {
332                for cid in ups {
333                    let corrected = self.storage.delete_feedback_event(trace_id, cid, "down")?;
334                    self.storage.delete_chunk_trace_confidence_evidence(
335                        trace_id,
336                        cid,
337                        "feedback_down",
338                    )?;
339                    let inserted = self.storage.insert_feedback_event(
340                        &gen_uuid(),
341                        trace_id,
342                        cid,
343                        "up",
344                        feedback_strength,
345                        source,
346                        feedback_actor,
347                        feedback_reason,
348                        context_key.as_deref(),
349                        &now,
350                    )?;
351                    if inserted > 0 {
352                        self.upsert_trace_confidence_evidence(
353                            trace_id,
354                            cid,
355                            "feedback_up",
356                            1.0,
357                            feedback_strength,
358                            if feedback_kind == "judge" {
359                                "judge_up"
360                            } else {
361                                "user_up"
362                            },
363                            context_key.as_deref(),
364                            &now,
365                            true,
366                        )?;
367                        self.storage.update_chunk_last_used(cid, &now)?;
368                        self.refresh_governance_evidence(cid, &now)?;
369                        context_affected.insert(cid.clone());
370                    } else if corrected > 0 {
371                        self.recompute_chunk_confidence(cid, &now)?;
372                        self.refresh_governance_evidence(cid, &now)?;
373                        context_affected.insert(cid.clone());
374                    }
375                }
376            }
377            if let Some(downs) = feedback_down {
378                for cid in downs {
379                    let corrected = self.storage.delete_feedback_event(trace_id, cid, "up")?;
380                    self.storage.delete_chunk_trace_confidence_evidence(
381                        trace_id,
382                        cid,
383                        "feedback_up",
384                    )?;
385                    let inserted = self.storage.insert_feedback_event(
386                        &gen_uuid(),
387                        trace_id,
388                        cid,
389                        "down",
390                        feedback_strength,
391                        source,
392                        feedback_actor,
393                        feedback_reason,
394                        context_key.as_deref(),
395                        &now,
396                    )?;
397                    if inserted > 0 {
398                        self.upsert_trace_confidence_evidence(
399                            trace_id,
400                            cid,
401                            "feedback_down",
402                            0.0,
403                            feedback_strength,
404                            if feedback_kind == "judge" {
405                                "judge_down"
406                            } else {
407                                "user_down"
408                            },
409                            context_key.as_deref(),
410                            &now,
411                            true,
412                        )?;
413                        self.refresh_governance_evidence(cid, &now)?;
414                        context_affected.insert(cid.clone());
415                    } else if corrected > 0 {
416                        self.recompute_chunk_confidence(cid, &now)?;
417                        self.refresh_governance_evidence(cid, &now)?;
418                        context_affected.insert(cid.clone());
419                    }
420                }
421            }
422            // Targeted rebuild — only update context_stats for chunks touched in this call.
423            self.rebuild_context_stats_for(&context_affected, &now)?;
424
425            // Fill in content fields (補写: output_summary, nomination, output, query) on existing log.
426            if !is_fresh_insert {
427                self.storage.patch_episodic_log_content(
428                    trace_id,
429                    query,
430                    output,
431                    output_summary,
432                    nomination,
433                    effective_priority,
434                )?;
435            }
436
437            let lifecycle_state = if effective_outcome.is_some() {
438                "completed"
439            } else {
440                task_state.unwrap_or_else(|| {
441                    log.get("task_state")
442                        .and_then(Value::as_str)
443                        .unwrap_or("running")
444                })
445            };
446            let used_ids_json = effective_used_ids
447                .as_deref()
448                .map(serde_json::to_string)
449                .transpose()?;
450            self.storage.update_trace_lifecycle(
451                trace_id,
452                lifecycle_state,
453                (lifecycle_state == "completed").then_some(now.as_str()),
454                effective_used_ids
455                    .as_deref()
456                    .map(|ids| usage_state(Some(ids))),
457                used_ids_json.as_deref(),
458                used.map(|_| used_attribution),
459                used.map(|_| effective_used_complete),
460            )?;
461
462            // Update episodic log
463            let current_state = log
464                .get("distill_state")
465                .and_then(Value::as_str)
466                .unwrap_or("open");
467            let lifecycle_completed = lifecycle_state == "completed";
468            let has_material = output_summary.is_some()
469                || nomination.is_some()
470                || output.is_some()
471                || log.get("output_summary").and_then(Value::as_str).is_some()
472                || log.get("nomination").and_then(Value::as_str).is_some()
473                || log.get("output").and_then(Value::as_str).is_some();
474            let retryable_discard = current_state == "discarded"
475                && matches!(
476                    log.get("distill_note").and_then(Value::as_str),
477                    Some("insufficient_material" | "abandoned" | "timed_out")
478                );
479            let new_state = if current_state == "open"
480                && matches!(lifecycle_state, "abandoned" | "timed_out")
481            {
482                Some("discarded")
483            } else if lifecycle_completed && (current_state == "open" || retryable_discard) {
484                if has_material {
485                    Some("new")
486                } else {
487                    Some("discarded")
488                }
489            } else {
490                None
491            };
492            if let Some(state) = new_state {
493                let note = if state == "discarded" {
494                    Some(if matches!(lifecycle_state, "abandoned" | "timed_out") {
495                        lifecycle_state
496                    } else {
497                        "insufficient_material"
498                    })
499                } else {
500                    None
501                };
502                let outcome_str = outcome.map(str::to_string);
503                self.storage.update_episodic_log_state(
504                    trace_id,
505                    state,
506                    note,
507                    outcome_str.as_deref(),
508                )?;
509                if state == "new" && retryable_discard {
510                    self.storage.conn_execute(
511                        "UPDATE episodic_log SET distill_note=NULL WHERE trace_id=?",
512                        rusqlite::params![trace_id],
513                    )?;
514                }
515            } else if outcome.is_some() {
516                let outcome_str = outcome.map(str::to_string);
517                self.storage.update_episodic_log_state(
518                    trace_id,
519                    current_state,
520                    None,
521                    outcome_str.as_deref(),
522                )?;
523            }
524
525            self.storage.commit()
526        })();
527        if result.is_err() {
528            let _ = self.storage.rollback();
529        }
530        result?;
531        self.enqueue_evolve_if_needed(&now)?;
532        Ok(())
533    }
534}