1use super::*;
2
3mod evidence;
4
5#[derive(Debug, Clone, Default)]
12pub struct RecordParams<'a> {
13 pub trace_id: &'a str,
14 pub query: Option<&'a str>,
15 pub output: Option<&'a str>,
16 pub output_summary: Option<&'a str>,
17 pub outcome: Option<&'a str>,
18 pub used: Option<&'a [String]>,
19 pub used_attribution: &'a str,
20 pub used_complete: Option<bool>,
21 pub feedback_up: Option<&'a [String]>,
22 pub feedback_down: Option<&'a [String]>,
23 pub feedback_kind: &'a str,
24 pub feedback_actor: Option<&'a str>,
25 pub feedback_reason: Option<&'a str>,
26 pub nomination: Option<&'a str>,
27 pub priority: i64,
28 pub task_state: Option<&'a str>,
29 pub source: &'a str,
30 pub verdict_heeded: bool,
35}
36
37impl KnowledgeBase {
38 pub fn record(&self, params: RecordParams<'_>) -> Result<()> {
39 let RecordParams {
40 trace_id,
41 query,
42 output,
43 output_summary,
44 outcome,
45 used,
46 used_attribution,
47 used_complete,
48 feedback_up,
49 feedback_down,
50 feedback_kind,
51 feedback_actor,
52 feedback_reason,
53 nomination,
54 priority,
55 task_state,
56 source,
57 verdict_heeded,
58 } = params;
59 let used_attribution = if used_attribution.is_empty() {
60 "explicit"
61 } else {
62 used_attribution
63 };
64 let feedback_kind = if feedback_kind.is_empty() {
65 "user"
66 } else {
67 feedback_kind
68 };
69 let used_complete = used_complete.unwrap_or(true);
70 let dedupe_ids = |ids: &[String]| {
71 let mut seen = HashSet::new();
72 ids.iter()
73 .filter(|id| seen.insert((*id).clone()))
74 .cloned()
75 .collect::<Vec<_>>()
76 };
77 let normalized_used = used.map(dedupe_ids);
78 let normalized_feedback_up = feedback_up.map(dedupe_ids);
79 let normalized_feedback_down = feedback_down.map(dedupe_ids);
80 let used = normalized_used.as_deref();
81 let feedback_up = normalized_feedback_up.as_deref();
82 let feedback_down = normalized_feedback_down.as_deref();
83
84 if let Some(o) = outcome {
85 if !matches!(o, "ok" | "fail" | "unknown") {
86 return Err(InnateError::InvalidState(format!("invalid outcome: {o}")));
87 }
88 }
89 if !matches!(used_attribution, "explicit" | "cited" | "inferred") {
90 return Err(InnateError::InvalidState(format!(
91 "invalid used attribution: {used_attribution}"
92 )));
93 }
94 if !matches!(feedback_kind, "user" | "judge") {
95 return Err(InnateError::InvalidState(format!(
96 "invalid feedback kind: {feedback_kind}"
97 )));
98 }
99 if let Some(state) = task_state {
100 if !matches!(
101 state,
102 "recalled" | "running" | "completed" | "abandoned" | "timed_out"
103 ) {
104 return Err(InnateError::InvalidState(format!(
105 "invalid task state: {state}"
106 )));
107 }
108 }
109 validate_source(source)?;
110 if let (Some(ups), Some(downs)) = (feedback_up, feedback_down) {
111 let down_set: HashSet<&str> = downs.iter().map(String::as_str).collect();
112 if let Some(chunk_id) = ups.iter().find(|id| down_set.contains(id.as_str())) {
113 return Err(InnateError::InvalidState(format!(
114 "conflicting feedback for chunk {chunk_id}"
115 )));
116 }
117 }
118 let effective_priority = if nomination.is_some() && priority == 0 {
119 1
120 } else {
121 priority
122 };
123 let now = utc_now_iso();
124 let lib_id = self.storage.lib_id()?;
125
126 self.storage.begin_immediate()?;
127 let result = (|| -> Result<()> {
128 let log = self.storage.get_episodic_log(trace_id)?;
129 let mut is_fresh_insert = false;
130 let log = match log {
131 Some(l) => l,
132 None => {
133 let used_ids = used.map(serde_json::to_string).transpose()?;
134 let row = EpisodicLogRow {
135 id: gen_uuid(),
136 trace_id: trace_id.to_string(),
137 lib_id,
138 ts: now.clone(),
139 query: query.map(str::to_string).or_else(|| Some(String::new())),
140 output: output.map(str::to_string),
141 output_summary: output_summary.map(str::to_string),
142 outcome: outcome.map(str::to_string),
143 event_source: source.to_string(),
144 task_state: if matches!(outcome, Some("ok") | Some("fail")) {
145 "completed".to_string()
146 } else {
147 task_state.unwrap_or("running").to_string()
148 },
149 completed_at: matches!(outcome, Some("ok") | Some("fail"))
150 .then(|| now.clone()),
151 usage_state: usage_state(used).to_string(),
152 used_ids,
153 used_attribution: used.map(|_| used_attribution.to_string()),
154 used_complete,
155 context_key: query.map(|q| content_hash(&normalize_query(q))),
156 nomination: nomination.map(str::to_string),
157 priority: effective_priority,
158 distill_state: "open".to_string(),
159 ..Default::default()
160 };
161 self.storage.upsert_episodic_log(&row)?;
162 is_fresh_insert = true;
163 self.storage.get_episodic_log(trace_id)?.unwrap()
164 }
165 };
166 self.validate_trace_attribution(trace_id, used, "used")?;
167 self.validate_trace_attribution(trace_id, feedback_up, "feedback_up")?;
168 self.validate_trace_attribution(trace_id, feedback_down, "feedback_down")?;
169
170 let existing_outcome = log
171 .get("outcome")
172 .and_then(Value::as_str)
173 .map(str::to_string);
174 let effective_used_attribution = if used.is_some() {
176 used_attribution
177 } else {
178 log.get("used_attribution")
179 .and_then(Value::as_str)
180 .unwrap_or(used_attribution)
181 };
182 let used_strength = match effective_used_attribution {
183 "explicit" => 0.3,
184 "cited" => 0.25,
185 "inferred" => 0.15,
186 other => {
189 return Err(InnateError::InvalidState(format!(
190 "invalid stored used attribution: {other}"
191 )))
192 }
193 };
194 let existing_used_ids: Vec<String> = log
195 .get("used_ids")
196 .and_then(Value::as_str)
197 .and_then(|raw| serde_json::from_str(raw).ok())
198 .unwrap_or_default();
199 let existing_used_complete = log
200 .get("used_complete")
201 .and_then(Value::as_i64)
202 .unwrap_or(0)
203 != 0;
204 let effective_used_complete = used_complete || existing_used_complete;
205 let effective_used_ids = used.map(|reported| {
206 if used_complete {
207 reported.to_vec()
208 } else {
209 let mut merged = existing_used_ids.clone();
210 let mut seen: HashSet<String> = merged.iter().cloned().collect();
211 merged.extend(
212 reported
213 .iter()
214 .filter(|id| seen.insert((*id).clone()))
215 .cloned(),
216 );
217 merged
218 }
219 });
220 if let Some(used_ids) = effective_used_ids.as_deref() {
221 let previously_used: HashSet<String> = existing_used_ids.iter().cloned().collect();
222 if used_complete {
223 self.storage.replace_used_trace(
224 trace_id,
225 used_ids,
226 used_strength,
227 used_attribution,
228 source,
229 &now,
230 )?;
231 } else if let Some(reported) = used {
232 self.storage.merge_used_trace(
233 trace_id,
234 reported,
235 used_strength,
236 used_attribution,
237 source,
238 &now,
239 )?;
240 }
241 let affected: HashSet<String> = previously_used
242 .into_iter()
243 .chain(used_ids.iter().cloned())
244 .collect();
245 for cid in affected {
246 self.storage.refresh_chunk_last_used(&cid, &now)?;
247 }
248 }
249
250 if let Some(o) = outcome {
252 if matches!(o, "ok" | "fail") {
253 let event = if o == "ok" { "task_ok" } else { "task_fail" };
254 let strength = if event == "task_fail" { 0.15 } else { 1.0 };
255 self.storage.conn_execute(
256 "DELETE FROM usage_trace
257 WHERE trace_id=? AND event IN ('task_ok','task_fail')
258 AND chunk_id IS NULL",
259 rusqlite::params![trace_id],
260 )?;
261 self.storage.insert_usage_trace(
262 trace_id, None, event, strength, None, None, None, None, None, source, &now,
263 )?;
264 let observed_outcome = if event == "task_fail" { 1.0 } else { -1.0 };
270 let provenance = if verdict_heeded {
271 "counterfactual_censored"
272 } else {
273 "observed"
274 };
275 self.storage.backfill_verdict_outcome(
276 trace_id,
277 observed_outcome,
278 provenance,
279 &now,
280 )?;
281 }
282 }
283
284 let effective_outcome =
288 outcome
289 .filter(|value| *value != "unknown")
290 .or(existing_outcome
291 .as_deref()
292 .filter(|value| *value != "unknown"));
293 if let Some(o @ ("ok" | "fail")) = effective_outcome {
294 if used.is_some()
295 || (outcome.is_some_and(|value| value != "unknown")
296 && existing_outcome.as_deref() != outcome)
297 {
298 let fallback_ids: Vec<String>;
299 let effective_used: Option<&[String]> = if effective_used_ids.is_some() {
300 effective_used_ids.as_deref()
301 } else {
302 fallback_ids = log
303 .get("used_ids")
304 .and_then(Value::as_str)
305 .and_then(|s| serde_json::from_str(s).ok())
306 .unwrap_or_default();
307 if fallback_ids.is_empty() {
308 None
309 } else {
310 Some(&fallback_ids)
311 }
312 };
313 let effective_complete = if used.is_some() {
314 effective_used_complete
315 } else {
316 log.get("usage_state").and_then(Value::as_str) != Some("unknown")
317 && log
318 .get("used_complete")
319 .and_then(Value::as_i64)
320 .unwrap_or(1)
321 != 0
322 };
323 self.replace_outcome_evidence(
324 trace_id,
325 o,
326 effective_used,
327 effective_complete,
328 &now,
329 )?;
330 }
331 } else if used.is_some() && effective_used_complete {
332 self.replace_selected_unused_evidence(
333 trace_id,
334 effective_used_ids.as_deref().unwrap_or_default(),
335 &now,
336 )?;
337 }
338
339 let context_key = log
340 .get("context_key")
341 .and_then(Value::as_str)
342 .map(str::to_string)
343 .or_else(|| query.map(|q| content_hash(&normalize_query(q))));
344 let feedback_strength = if feedback_kind == "judge" { 0.6 } else { 1.0 };
345
346 let mut context_affected: HashSet<String> = HashSet::new();
350 if let Some(used_ids) = effective_used_ids.as_deref() {
351 for cid in used_ids {
352 context_affected.insert(cid.clone());
353 }
354 }
355 if let Some(ups) = feedback_up {
356 for cid in ups {
357 let corrected = self.storage.delete_feedback_event(trace_id, cid, "down")?;
358 self.storage.delete_chunk_trace_confidence_evidence(
359 trace_id,
360 cid,
361 "feedback_down",
362 )?;
363 let inserted = self.storage.insert_feedback_event(
364 &gen_uuid(),
365 trace_id,
366 cid,
367 "up",
368 feedback_strength,
369 source,
370 feedback_actor,
371 feedback_reason,
372 context_key.as_deref(),
373 &now,
374 )?;
375 if inserted > 0 {
376 self.upsert_trace_confidence_evidence(
377 trace_id,
378 cid,
379 "feedback_up",
380 1.0,
381 feedback_strength,
382 if feedback_kind == "judge" {
383 "judge_up"
384 } else {
385 "user_up"
386 },
387 context_key.as_deref(),
388 &now,
389 true,
390 )?;
391 self.storage.update_chunk_last_used(cid, &now)?;
392 self.refresh_governance_evidence(cid, &now)?;
393 context_affected.insert(cid.clone());
394 } else if corrected > 0 {
395 self.recompute_chunk_confidence(cid, &now)?;
396 self.refresh_governance_evidence(cid, &now)?;
397 context_affected.insert(cid.clone());
398 }
399 }
400 }
401 if let Some(downs) = feedback_down {
402 for cid in downs {
403 let corrected = self.storage.delete_feedback_event(trace_id, cid, "up")?;
404 self.storage.delete_chunk_trace_confidence_evidence(
405 trace_id,
406 cid,
407 "feedback_up",
408 )?;
409 let inserted = self.storage.insert_feedback_event(
410 &gen_uuid(),
411 trace_id,
412 cid,
413 "down",
414 feedback_strength,
415 source,
416 feedback_actor,
417 feedback_reason,
418 context_key.as_deref(),
419 &now,
420 )?;
421 if inserted > 0 {
422 self.upsert_trace_confidence_evidence(
423 trace_id,
424 cid,
425 "feedback_down",
426 0.0,
427 feedback_strength,
428 if feedback_kind == "judge" {
429 "judge_down"
430 } else {
431 "user_down"
432 },
433 context_key.as_deref(),
434 &now,
435 true,
436 )?;
437 self.refresh_governance_evidence(cid, &now)?;
438 context_affected.insert(cid.clone());
439 } else if corrected > 0 {
440 self.recompute_chunk_confidence(cid, &now)?;
441 self.refresh_governance_evidence(cid, &now)?;
442 context_affected.insert(cid.clone());
443 }
444 }
445 }
446 self.rebuild_context_stats_for(&context_affected, &now)?;
448
449 if !is_fresh_insert {
451 self.storage.patch_episodic_log_content(
452 trace_id,
453 query,
454 output,
455 output_summary,
456 nomination,
457 effective_priority,
458 )?;
459 }
460
461 let lifecycle_state = if effective_outcome.is_some() {
462 "completed"
463 } else {
464 task_state.unwrap_or_else(|| {
465 log.get("task_state")
466 .and_then(Value::as_str)
467 .unwrap_or("running")
468 })
469 };
470 let used_ids_json = effective_used_ids
471 .as_deref()
472 .map(serde_json::to_string)
473 .transpose()?;
474 self.storage.update_trace_lifecycle(
475 trace_id,
476 lifecycle_state,
477 (lifecycle_state == "completed").then_some(now.as_str()),
478 effective_used_ids
479 .as_deref()
480 .map(|ids| usage_state(Some(ids))),
481 used_ids_json.as_deref(),
482 used.map(|_| used_attribution),
483 used.map(|_| effective_used_complete),
484 )?;
485
486 let current_state = log
488 .get("distill_state")
489 .and_then(Value::as_str)
490 .unwrap_or("open");
491 let lifecycle_completed = lifecycle_state == "completed";
492 let has_material = output_summary.is_some()
493 || nomination.is_some()
494 || output.is_some()
495 || log.get("output_summary").and_then(Value::as_str).is_some()
496 || log.get("nomination").and_then(Value::as_str).is_some()
497 || log.get("output").and_then(Value::as_str).is_some();
498 let retryable_discard = current_state == "discarded"
499 && matches!(
500 log.get("distill_note").and_then(Value::as_str),
501 Some("insufficient_material" | "abandoned" | "timed_out")
502 );
503 let new_state = if current_state == "open"
504 && matches!(lifecycle_state, "abandoned" | "timed_out")
505 {
506 Some("discarded")
507 } else if lifecycle_completed && (current_state == "open" || retryable_discard) {
508 if has_material {
509 Some("new")
510 } else {
511 Some("discarded")
512 }
513 } else {
514 None
515 };
516 if let Some(state) = new_state {
517 let note = if state == "discarded" {
518 Some(if matches!(lifecycle_state, "abandoned" | "timed_out") {
519 lifecycle_state
520 } else {
521 "insufficient_material"
522 })
523 } else {
524 None
525 };
526 let outcome_str = outcome.map(str::to_string);
527 self.storage.update_episodic_log_state(
528 trace_id,
529 state,
530 note,
531 outcome_str.as_deref(),
532 )?;
533 if state == "new" && retryable_discard {
534 self.storage.conn_execute(
535 "UPDATE episodic_log SET distill_note=NULL WHERE trace_id=?",
536 rusqlite::params![trace_id],
537 )?;
538 }
539 } else if outcome.is_some() {
540 let outcome_str = outcome.map(str::to_string);
541 self.storage.update_episodic_log_state(
542 trace_id,
543 current_state,
544 None,
545 outcome_str.as_deref(),
546 )?;
547 }
548
549 self.storage.commit()
550 })();
551 if result.is_err() {
552 let _ = self.storage.rollback();
553 }
554 result?;
555 self.enqueue_evolve_if_needed(&now)?;
556 Ok(())
557 }
558}