1use super::*;
2
3mod evidence;
4
/// Arguments for [`KnowledgeBase::record`].
///
/// Intended to be built with struct-update syntax over
/// `RecordParams::default()`. The default (empty) `used_attribution` and
/// `feedback_kind` strings are mapped by `record` to `"explicit"` and
/// `"user"` respectively.
#[derive(Debug, Clone, Default)]
pub struct RecordParams<'a> {
    /// Trace whose episodic log entry is created (if missing) or updated.
    pub trace_id: &'a str,
    /// Query text; when the stored log has no context key, its normalized hash
    /// is used as one.
    pub query: Option<&'a str>,
    /// Task output; its presence counts as distillation material.
    pub output: Option<&'a str>,
    /// Summary of the output; its presence counts as distillation material.
    pub output_summary: Option<&'a str>,
    /// Task outcome: `"ok"`, `"fail"`, or `"unknown"`.
    pub outcome: Option<&'a str>,
    /// Ids of chunks the task used; `record` drops duplicates, keeping the
    /// first occurrence.
    pub used: Option<&'a [String]>,
    /// How `used` was determined: `"explicit"`, `"cited"`, or `"inferred"`
    /// (empty means `"explicit"`).
    pub used_attribution: &'a str,
    /// Whether `used` is the complete set (`None` is treated as `true`) or a
    /// partial report to be merged with earlier ones.
    pub used_complete: Option<bool>,
    /// Chunk ids receiving positive feedback; must not overlap `feedback_down`.
    pub feedback_up: Option<&'a [String]>,
    /// Chunk ids receiving negative feedback.
    pub feedback_down: Option<&'a [String]>,
    /// Feedback origin: `"user"` or `"judge"` (empty means `"user"`).
    pub feedback_kind: &'a str,
    /// Actor passed through to every inserted feedback event.
    pub feedback_actor: Option<&'a str>,
    /// Reason passed through to every inserted feedback event.
    pub feedback_reason: Option<&'a str>,
    /// Nomination text; when set and `priority` is 0, priority 1 is recorded.
    pub nomination: Option<&'a str>,
    /// Priority stored on the episodic log entry.
    pub priority: i64,
    /// Lifecycle state: `"recalled"`, `"running"`, `"completed"`,
    /// `"abandoned"`, or `"timed_out"`. A known ok/fail outcome forces
    /// `"completed"`.
    pub task_state: Option<&'a str>,
    /// Event source, checked by `validate_source` and stored with every
    /// trace/feedback row written.
    pub source: &'a str,
    /// For an ok/fail outcome, selects the verdict provenance that is
    /// backfilled: `"counterfactual_censored"` when true, `"observed"` when
    /// false.
    pub verdict_heeded: bool,
}
36
37impl KnowledgeBase {
38 pub fn record(&self, params: RecordParams<'_>) -> Result<()> {
39 let RecordParams {
40 trace_id,
41 query,
42 output,
43 output_summary,
44 outcome,
45 used,
46 used_attribution,
47 used_complete,
48 feedback_up,
49 feedback_down,
50 feedback_kind,
51 feedback_actor,
52 feedback_reason,
53 nomination,
54 priority,
55 task_state,
56 source,
57 verdict_heeded,
58 } = params;
59 let used_attribution = if used_attribution.is_empty() {
60 "explicit"
61 } else {
62 used_attribution
63 };
64 let feedback_kind = if feedback_kind.is_empty() {
65 "user"
66 } else {
67 feedback_kind
68 };
69 let used_complete = used_complete.unwrap_or(true);
70 let dedupe_ids = |ids: &[String]| {
71 let mut seen = HashSet::new();
72 ids.iter()
73 .filter(|id| seen.insert((*id).clone()))
74 .cloned()
75 .collect::<Vec<_>>()
76 };
77 let normalized_used = used.map(dedupe_ids);
78 let normalized_feedback_up = feedback_up.map(dedupe_ids);
79 let normalized_feedback_down = feedback_down.map(dedupe_ids);
80 let used = normalized_used.as_deref();
81 let feedback_up = normalized_feedback_up.as_deref();
82 let feedback_down = normalized_feedback_down.as_deref();
83
84 if let Some(o) = outcome {
85 if !matches!(o, "ok" | "fail" | "unknown") {
86 return Err(InnateError::InvalidState(format!("invalid outcome: {o}")));
87 }
88 }
89 if !matches!(used_attribution, "explicit" | "cited" | "inferred") {
90 return Err(InnateError::InvalidState(format!(
91 "invalid used attribution: {used_attribution}"
92 )));
93 }
94 if !matches!(feedback_kind, "user" | "judge") {
95 return Err(InnateError::InvalidState(format!(
96 "invalid feedback kind: {feedback_kind}"
97 )));
98 }
99 if let Some(state) = task_state {
100 if !matches!(
101 state,
102 "recalled" | "running" | "completed" | "abandoned" | "timed_out"
103 ) {
104 return Err(InnateError::InvalidState(format!(
105 "invalid task state: {state}"
106 )));
107 }
108 }
109 validate_source(source)?;
110 if let (Some(ups), Some(downs)) = (feedback_up, feedback_down) {
111 let down_set: HashSet<&str> = downs.iter().map(String::as_str).collect();
112 if let Some(chunk_id) = ups.iter().find(|id| down_set.contains(id.as_str())) {
113 return Err(InnateError::InvalidState(format!(
114 "conflicting feedback for chunk {chunk_id}"
115 )));
116 }
117 }
118 let effective_priority = if nomination.is_some() && priority == 0 {
119 1
120 } else {
121 priority
122 };
123 let now = utc_now_iso();
124 let lib_id = self.storage.lib_id()?;
125
126 self.storage.begin_immediate()?;
127 let result = (|| -> Result<()> {
128 let log = self.storage.get_episodic_log(trace_id)?;
129 let mut is_fresh_insert = false;
130 let log = match log {
131 Some(l) => l,
132 None => {
133 let used_ids = used.map(serde_json::to_string).transpose()?;
134 let row = EpisodicLogRow {
135 id: gen_uuid(),
136 trace_id: trace_id.to_string(),
137 lib_id,
138 ts: now.clone(),
139 query: query.map(str::to_string).or_else(|| Some(String::new())),
140 output: output.map(str::to_string),
141 output_summary: output_summary.map(str::to_string),
142 outcome: outcome.map(str::to_string),
143 event_source: source.to_string(),
144 agent: agent_source(),
145 task_state: if matches!(outcome, Some("ok") | Some("fail")) {
146 "completed".to_string()
147 } else {
148 task_state.unwrap_or("running").to_string()
149 },
150 completed_at: matches!(outcome, Some("ok") | Some("fail"))
151 .then(|| now.clone()),
152 usage_state: usage_state(used).to_string(),
153 used_ids,
154 used_attribution: used.map(|_| used_attribution.to_string()),
155 used_complete,
156 context_key: query.map(|q| content_hash(&normalize_query(q))),
157 nomination: nomination.map(str::to_string),
158 priority: effective_priority,
159 distill_state: "open".to_string(),
160 ..Default::default()
161 };
162 self.storage.upsert_episodic_log(&row)?;
163 is_fresh_insert = true;
164 self.storage.get_episodic_log(trace_id)?.unwrap()
165 }
166 };
167 self.validate_trace_attribution(trace_id, used, "used")?;
168 self.validate_trace_attribution(trace_id, feedback_up, "feedback_up")?;
169 self.validate_trace_attribution(trace_id, feedback_down, "feedback_down")?;
170
171 let existing_outcome = log
172 .get("outcome")
173 .and_then(Value::as_str)
174 .map(str::to_string);
175 let effective_used_attribution = if used.is_some() {
177 used_attribution
178 } else {
179 log.get("used_attribution")
180 .and_then(Value::as_str)
181 .unwrap_or(used_attribution)
182 };
183 let used_strength = match effective_used_attribution {
184 "explicit" => 0.3,
185 "cited" => 0.25,
186 "inferred" => 0.15,
187 other => {
190 return Err(InnateError::InvalidState(format!(
191 "invalid stored used attribution: {other}"
192 )))
193 }
194 };
195 let existing_used_ids: Vec<String> = log
196 .get("used_ids")
197 .and_then(Value::as_str)
198 .and_then(|raw| serde_json::from_str(raw).ok())
199 .unwrap_or_default();
200 let existing_used_complete = log
201 .get("used_complete")
202 .and_then(Value::as_i64)
203 .unwrap_or(0)
204 != 0;
205 let effective_used_complete = used_complete || existing_used_complete;
206 let effective_used_ids = used.map(|reported| {
207 if used_complete {
208 reported.to_vec()
209 } else {
210 let mut merged = existing_used_ids.clone();
211 let mut seen: HashSet<String> = merged.iter().cloned().collect();
212 merged.extend(
213 reported
214 .iter()
215 .filter(|id| seen.insert((*id).clone()))
216 .cloned(),
217 );
218 merged
219 }
220 });
221 if let Some(used_ids) = effective_used_ids.as_deref() {
222 let previously_used: HashSet<String> = existing_used_ids.iter().cloned().collect();
223 if used_complete {
224 self.storage.replace_used_trace(
225 trace_id,
226 used_ids,
227 used_strength,
228 used_attribution,
229 source,
230 &now,
231 )?;
232 } else if let Some(reported) = used {
233 self.storage.merge_used_trace(
234 trace_id,
235 reported,
236 used_strength,
237 used_attribution,
238 source,
239 &now,
240 )?;
241 }
242 let affected: HashSet<String> = previously_used
243 .into_iter()
244 .chain(used_ids.iter().cloned())
245 .collect();
246 for cid in affected {
247 self.storage.refresh_chunk_last_used(&cid, &now)?;
248 }
249 }
250
251 if let Some(o) = outcome {
253 if matches!(o, "ok" | "fail") {
254 let event = if o == "ok" { "task_ok" } else { "task_fail" };
255 let strength = if event == "task_fail" { 0.15 } else { 1.0 };
256 self.storage.conn_execute(
257 "DELETE FROM usage_trace
258 WHERE trace_id=? AND event IN ('task_ok','task_fail')
259 AND chunk_id IS NULL",
260 rusqlite::params![trace_id],
261 )?;
262 self.storage.insert_usage_trace(
263 trace_id, None, event, strength, None, None, None, None, None, source, &now,
264 )?;
265 let observed_outcome = if event == "task_fail" { 1.0 } else { -1.0 };
271 let provenance = if verdict_heeded {
272 "counterfactual_censored"
273 } else {
274 "observed"
275 };
276 self.storage.backfill_verdict_outcome(
277 trace_id,
278 observed_outcome,
279 provenance,
280 &now,
281 )?;
282 }
283 }
284
285 let effective_outcome =
289 outcome
290 .filter(|value| *value != "unknown")
291 .or(existing_outcome
292 .as_deref()
293 .filter(|value| *value != "unknown"));
294 if let Some(o @ ("ok" | "fail")) = effective_outcome {
295 if used.is_some()
296 || (outcome.is_some_and(|value| value != "unknown")
297 && existing_outcome.as_deref() != outcome)
298 {
299 let fallback_ids: Vec<String>;
300 let effective_used: Option<&[String]> = if effective_used_ids.is_some() {
301 effective_used_ids.as_deref()
302 } else {
303 fallback_ids = log
304 .get("used_ids")
305 .and_then(Value::as_str)
306 .and_then(|s| serde_json::from_str(s).ok())
307 .unwrap_or_default();
308 if fallback_ids.is_empty() {
309 None
310 } else {
311 Some(&fallback_ids)
312 }
313 };
314 let effective_complete = if used.is_some() {
315 effective_used_complete
316 } else {
317 log.get("usage_state").and_then(Value::as_str) != Some("unknown")
318 && log
319 .get("used_complete")
320 .and_then(Value::as_i64)
321 .unwrap_or(1)
322 != 0
323 };
324 self.replace_outcome_evidence(
325 trace_id,
326 o,
327 effective_used,
328 effective_complete,
329 &now,
330 )?;
331 }
332 } else if used.is_some() && effective_used_complete {
333 self.replace_selected_unused_evidence(
334 trace_id,
335 effective_used_ids.as_deref().unwrap_or_default(),
336 &now,
337 )?;
338 }
339
340 let context_key = log
341 .get("context_key")
342 .and_then(Value::as_str)
343 .map(str::to_string)
344 .or_else(|| query.map(|q| content_hash(&normalize_query(q))));
345 let feedback_strength = if feedback_kind == "judge" { 0.6 } else { 1.0 };
346
347 let mut context_affected: HashSet<String> = HashSet::new();
351 if let Some(used_ids) = effective_used_ids.as_deref() {
352 for cid in used_ids {
353 context_affected.insert(cid.clone());
354 }
355 }
356 if let Some(ups) = feedback_up {
357 for cid in ups {
358 let corrected = self.storage.delete_feedback_event(trace_id, cid, "down")?;
359 self.storage.delete_chunk_trace_confidence_evidence(
360 trace_id,
361 cid,
362 "feedback_down",
363 )?;
364 let inserted = self.storage.insert_feedback_event(
365 &gen_uuid(),
366 trace_id,
367 cid,
368 "up",
369 feedback_strength,
370 source,
371 feedback_actor,
372 feedback_reason,
373 context_key.as_deref(),
374 &now,
375 )?;
376 if inserted > 0 {
377 self.upsert_trace_confidence_evidence(
378 trace_id,
379 cid,
380 "feedback_up",
381 1.0,
382 feedback_strength,
383 if feedback_kind == "judge" {
384 "judge_up"
385 } else {
386 "user_up"
387 },
388 context_key.as_deref(),
389 &now,
390 true,
391 )?;
392 self.storage.update_chunk_last_used(cid, &now)?;
393 self.refresh_governance_evidence(cid, &now)?;
394 context_affected.insert(cid.clone());
395 } else if corrected > 0 {
396 self.recompute_chunk_confidence(cid, &now)?;
397 self.refresh_governance_evidence(cid, &now)?;
398 context_affected.insert(cid.clone());
399 }
400 }
401 }
402 if let Some(downs) = feedback_down {
403 for cid in downs {
404 let corrected = self.storage.delete_feedback_event(trace_id, cid, "up")?;
405 self.storage.delete_chunk_trace_confidence_evidence(
406 trace_id,
407 cid,
408 "feedback_up",
409 )?;
410 let inserted = self.storage.insert_feedback_event(
411 &gen_uuid(),
412 trace_id,
413 cid,
414 "down",
415 feedback_strength,
416 source,
417 feedback_actor,
418 feedback_reason,
419 context_key.as_deref(),
420 &now,
421 )?;
422 if inserted > 0 {
423 self.upsert_trace_confidence_evidence(
424 trace_id,
425 cid,
426 "feedback_down",
427 0.0,
428 feedback_strength,
429 if feedback_kind == "judge" {
430 "judge_down"
431 } else {
432 "user_down"
433 },
434 context_key.as_deref(),
435 &now,
436 true,
437 )?;
438 self.refresh_governance_evidence(cid, &now)?;
439 context_affected.insert(cid.clone());
440 } else if corrected > 0 {
441 self.recompute_chunk_confidence(cid, &now)?;
442 self.refresh_governance_evidence(cid, &now)?;
443 context_affected.insert(cid.clone());
444 }
445 }
446 }
447 self.rebuild_context_stats_for(&context_affected, &now)?;
449
450 if !is_fresh_insert {
452 self.storage.patch_episodic_log_content(
453 trace_id,
454 query,
455 output,
456 output_summary,
457 nomination,
458 effective_priority,
459 )?;
460 }
461
462 let lifecycle_state = if effective_outcome.is_some() {
463 "completed"
464 } else {
465 task_state.unwrap_or_else(|| {
466 log.get("task_state")
467 .and_then(Value::as_str)
468 .unwrap_or("running")
469 })
470 };
471 let used_ids_json = effective_used_ids
472 .as_deref()
473 .map(serde_json::to_string)
474 .transpose()?;
475 self.storage.update_trace_lifecycle(
476 trace_id,
477 lifecycle_state,
478 (lifecycle_state == "completed").then_some(now.as_str()),
479 effective_used_ids
480 .as_deref()
481 .map(|ids| usage_state(Some(ids))),
482 used_ids_json.as_deref(),
483 used.map(|_| used_attribution),
484 used.map(|_| effective_used_complete),
485 )?;
486
487 let current_state = log
489 .get("distill_state")
490 .and_then(Value::as_str)
491 .unwrap_or("open");
492 let lifecycle_completed = lifecycle_state == "completed";
493 let has_material = output_summary.is_some()
494 || nomination.is_some()
495 || output.is_some()
496 || log.get("output_summary").and_then(Value::as_str).is_some()
497 || log.get("nomination").and_then(Value::as_str).is_some()
498 || log.get("output").and_then(Value::as_str).is_some();
499 let retryable_discard = current_state == "discarded"
500 && matches!(
501 log.get("distill_note").and_then(Value::as_str),
502 Some("insufficient_material" | "abandoned" | "timed_out")
503 );
504 let new_state = if current_state == "open"
505 && matches!(lifecycle_state, "abandoned" | "timed_out")
506 {
507 Some("discarded")
508 } else if lifecycle_completed && (current_state == "open" || retryable_discard) {
509 if has_material {
510 Some("new")
511 } else {
512 Some("discarded")
513 }
514 } else {
515 None
516 };
517 if let Some(state) = new_state {
518 let note = if state == "discarded" {
519 Some(if matches!(lifecycle_state, "abandoned" | "timed_out") {
520 lifecycle_state
521 } else {
522 "insufficient_material"
523 })
524 } else {
525 None
526 };
527 let outcome_str = outcome.map(str::to_string);
528 self.storage.update_episodic_log_state(
529 trace_id,
530 state,
531 note,
532 outcome_str.as_deref(),
533 )?;
534 if state == "new" && retryable_discard {
535 self.storage.conn_execute(
536 "UPDATE episodic_log SET distill_note=NULL WHERE trace_id=?",
537 rusqlite::params![trace_id],
538 )?;
539 }
540 } else if outcome.is_some() {
541 let outcome_str = outcome.map(str::to_string);
542 self.storage.update_episodic_log_state(
543 trace_id,
544 current_state,
545 None,
546 outcome_str.as_deref(),
547 )?;
548 }
549
550 self.storage.commit()
551 })();
552 if result.is_err() {
553 let _ = self.storage.rollback();
554 }
555 result?;
556 self.enqueue_evolve_if_needed(&now)?;
557 Ok(())
558 }
559}