1use super::*;
2
3mod evidence;
4
/// Borrowed argument bundle for [`KnowledgeBase::record`].
///
/// Every field is a cheap borrow or scalar, so the struct is `Copy`. The
/// `Default` value leaves all optional fields unset and all string fields
/// empty; `record` substitutes its own defaults for the empty
/// `used_attribution` and `feedback_kind` strings.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct RecordParams<'a> {
    /// Trace whose episodic log row is looked up, or created when missing.
    pub trace_id: &'a str,
    /// Query text. Stored on a newly created row (as an empty string when
    /// absent) and hashed, after normalisation, into the row's context key.
    pub query: Option<&'a str>,
    /// Full task output to store on the row.
    pub output: Option<&'a str>,
    /// Summary of the task output to store on the row.
    pub output_summary: Option<&'a str>,
    /// Task outcome: `"ok"`, `"fail"` or `"unknown"`. Any other value is rejected.
    pub outcome: Option<&'a str>,
    /// Chunk ids reported as used. Duplicates are dropped, first occurrence wins.
    pub used: Option<&'a [String]>,
    /// How `used` was attributed: `"explicit"`, `"cited"` or `"inferred"`.
    /// An empty string is treated as `"explicit"`.
    pub used_attribution: &'a str,
    /// Whether `used` is the complete list (replaces the stored list) or a
    /// partial report (merged into the stored list). `None` means `true`.
    pub used_complete: Option<bool>,
    /// Chunk ids receiving positive feedback. Must not overlap `feedback_down`.
    pub feedback_up: Option<&'a [String]>,
    /// Chunk ids receiving negative feedback. Must not overlap `feedback_up`.
    pub feedback_down: Option<&'a [String]>,
    /// Origin of the feedback: `"user"` or `"judge"`. An empty string is
    /// treated as `"user"`.
    pub feedback_kind: &'a str,
    /// Optional actor recorded alongside each feedback event.
    pub feedback_actor: Option<&'a str>,
    /// Optional free-text reason recorded alongside each feedback event.
    pub feedback_reason: Option<&'a str>,
    /// Nomination text stored on the row. When set together with a `priority`
    /// of `0`, `record` stores a priority of `1` instead.
    pub nomination: Option<&'a str>,
    /// Priority stored on the row (see `nomination` for the one adjustment).
    pub priority: i64,
    /// Explicit task state: `"recalled"`, `"running"`, `"completed"`,
    /// `"abandoned"` or `"timed_out"`. Any other value is rejected.
    pub task_state: Option<&'a str>,
    /// Event source label; checked by `validate_source` before anything is written.
    pub source: &'a str,
}
31
32impl KnowledgeBase {
33 pub fn record(&self, params: RecordParams<'_>) -> Result<()> {
34 let RecordParams {
35 trace_id,
36 query,
37 output,
38 output_summary,
39 outcome,
40 used,
41 used_attribution,
42 used_complete,
43 feedback_up,
44 feedback_down,
45 feedback_kind,
46 feedback_actor,
47 feedback_reason,
48 nomination,
49 priority,
50 task_state,
51 source,
52 } = params;
53 let used_attribution = if used_attribution.is_empty() {
54 "explicit"
55 } else {
56 used_attribution
57 };
58 let feedback_kind = if feedback_kind.is_empty() {
59 "user"
60 } else {
61 feedback_kind
62 };
63 let used_complete = used_complete.unwrap_or(true);
64 let dedupe_ids = |ids: &[String]| {
65 let mut seen = HashSet::new();
66 ids.iter()
67 .filter(|id| seen.insert((*id).clone()))
68 .cloned()
69 .collect::<Vec<_>>()
70 };
71 let normalized_used = used.map(dedupe_ids);
72 let normalized_feedback_up = feedback_up.map(dedupe_ids);
73 let normalized_feedback_down = feedback_down.map(dedupe_ids);
74 let used = normalized_used.as_deref();
75 let feedback_up = normalized_feedback_up.as_deref();
76 let feedback_down = normalized_feedback_down.as_deref();
77
78 if let Some(o) = outcome {
79 if !matches!(o, "ok" | "fail" | "unknown") {
80 return Err(InnateError::InvalidState(format!("invalid outcome: {o}")));
81 }
82 }
83 if !matches!(used_attribution, "explicit" | "cited" | "inferred") {
84 return Err(InnateError::InvalidState(format!(
85 "invalid used attribution: {used_attribution}"
86 )));
87 }
88 if !matches!(feedback_kind, "user" | "judge") {
89 return Err(InnateError::InvalidState(format!(
90 "invalid feedback kind: {feedback_kind}"
91 )));
92 }
93 if let Some(state) = task_state {
94 if !matches!(
95 state,
96 "recalled" | "running" | "completed" | "abandoned" | "timed_out"
97 ) {
98 return Err(InnateError::InvalidState(format!(
99 "invalid task state: {state}"
100 )));
101 }
102 }
103 validate_source(source)?;
104 if let (Some(ups), Some(downs)) = (feedback_up, feedback_down) {
105 let down_set: HashSet<&str> = downs.iter().map(String::as_str).collect();
106 if let Some(chunk_id) = ups.iter().find(|id| down_set.contains(id.as_str())) {
107 return Err(InnateError::InvalidState(format!(
108 "conflicting feedback for chunk {chunk_id}"
109 )));
110 }
111 }
112 let effective_priority = if nomination.is_some() && priority == 0 {
113 1
114 } else {
115 priority
116 };
117 let now = utc_now_iso();
118 let lib_id = self.storage.lib_id()?;
119
120 self.storage.begin_immediate()?;
121 let result = (|| -> Result<()> {
122 let log = self.storage.get_episodic_log(trace_id)?;
123 let mut is_fresh_insert = false;
124 let log = match log {
125 Some(l) => l,
126 None => {
127 let used_ids = used.map(serde_json::to_string).transpose()?;
128 let row = EpisodicLogRow {
129 id: gen_uuid(),
130 trace_id: trace_id.to_string(),
131 lib_id,
132 ts: now.clone(),
133 query: query.map(str::to_string).or_else(|| Some(String::new())),
134 output: output.map(str::to_string),
135 output_summary: output_summary.map(str::to_string),
136 outcome: outcome.map(str::to_string),
137 event_source: source.to_string(),
138 task_state: if matches!(outcome, Some("ok") | Some("fail")) {
139 "completed".to_string()
140 } else {
141 task_state.unwrap_or("running").to_string()
142 },
143 completed_at: matches!(outcome, Some("ok") | Some("fail"))
144 .then(|| now.clone()),
145 usage_state: usage_state(used).to_string(),
146 used_ids,
147 used_attribution: used.map(|_| used_attribution.to_string()),
148 used_complete,
149 context_key: query.map(|q| content_hash(&normalize_query(q))),
150 nomination: nomination.map(str::to_string),
151 priority: effective_priority,
152 distill_state: "open".to_string(),
153 ..Default::default()
154 };
155 self.storage.upsert_episodic_log(&row)?;
156 is_fresh_insert = true;
157 self.storage.get_episodic_log(trace_id)?.unwrap()
158 }
159 };
160 self.validate_trace_attribution(trace_id, used, "used")?;
161 self.validate_trace_attribution(trace_id, feedback_up, "feedback_up")?;
162 self.validate_trace_attribution(trace_id, feedback_down, "feedback_down")?;
163
164 let existing_outcome = log
165 .get("outcome")
166 .and_then(Value::as_str)
167 .map(str::to_string);
168 let effective_used_attribution = if used.is_some() {
170 used_attribution
171 } else {
172 log.get("used_attribution")
173 .and_then(Value::as_str)
174 .unwrap_or(used_attribution)
175 };
176 let used_strength = match effective_used_attribution {
177 "explicit" => 0.3,
178 "cited" => 0.25,
179 "inferred" => 0.15,
180 other => {
183 return Err(InnateError::InvalidState(format!(
184 "invalid stored used attribution: {other}"
185 )))
186 }
187 };
188 let existing_used_ids: Vec<String> = log
189 .get("used_ids")
190 .and_then(Value::as_str)
191 .and_then(|raw| serde_json::from_str(raw).ok())
192 .unwrap_or_default();
193 let existing_used_complete = log
194 .get("used_complete")
195 .and_then(Value::as_i64)
196 .unwrap_or(0)
197 != 0;
198 let effective_used_complete = used_complete || existing_used_complete;
199 let effective_used_ids = used.map(|reported| {
200 if used_complete {
201 reported.to_vec()
202 } else {
203 let mut merged = existing_used_ids.clone();
204 let mut seen: HashSet<String> = merged.iter().cloned().collect();
205 merged.extend(
206 reported
207 .iter()
208 .filter(|id| seen.insert((*id).clone()))
209 .cloned(),
210 );
211 merged
212 }
213 });
214 if let Some(used_ids) = effective_used_ids.as_deref() {
215 let previously_used: HashSet<String> = existing_used_ids.iter().cloned().collect();
216 if used_complete {
217 self.storage.replace_used_trace(
218 trace_id,
219 used_ids,
220 used_strength,
221 used_attribution,
222 source,
223 &now,
224 )?;
225 } else if let Some(reported) = used {
226 self.storage.merge_used_trace(
227 trace_id,
228 reported,
229 used_strength,
230 used_attribution,
231 source,
232 &now,
233 )?;
234 }
235 let affected: HashSet<String> = previously_used
236 .into_iter()
237 .chain(used_ids.iter().cloned())
238 .collect();
239 for cid in affected {
240 self.storage.refresh_chunk_last_used(&cid, &now)?;
241 }
242 }
243
244 if let Some(o) = outcome {
246 if matches!(o, "ok" | "fail") {
247 let event = if o == "ok" { "task_ok" } else { "task_fail" };
248 let strength = if event == "task_fail" { 0.15 } else { 1.0 };
249 self.storage.conn_execute(
250 "DELETE FROM usage_trace
251 WHERE trace_id=? AND event IN ('task_ok','task_fail')
252 AND chunk_id IS NULL",
253 rusqlite::params![trace_id],
254 )?;
255 self.storage.insert_usage_trace(
256 trace_id, None, event, strength, None, None, None, None, None, source, &now,
257 )?;
258 }
259 }
260
261 let effective_outcome =
265 outcome
266 .filter(|value| *value != "unknown")
267 .or(existing_outcome
268 .as_deref()
269 .filter(|value| *value != "unknown"));
270 if let Some(o @ ("ok" | "fail")) = effective_outcome {
271 if used.is_some()
272 || (outcome.is_some_and(|value| value != "unknown")
273 && existing_outcome.as_deref() != outcome)
274 {
275 let fallback_ids: Vec<String>;
276 let effective_used: Option<&[String]> = if effective_used_ids.is_some() {
277 effective_used_ids.as_deref()
278 } else {
279 fallback_ids = log
280 .get("used_ids")
281 .and_then(Value::as_str)
282 .and_then(|s| serde_json::from_str(s).ok())
283 .unwrap_or_default();
284 if fallback_ids.is_empty() {
285 None
286 } else {
287 Some(&fallback_ids)
288 }
289 };
290 let effective_complete = if used.is_some() {
291 effective_used_complete
292 } else {
293 log.get("usage_state").and_then(Value::as_str) != Some("unknown")
294 && log
295 .get("used_complete")
296 .and_then(Value::as_i64)
297 .unwrap_or(1)
298 != 0
299 };
300 self.replace_outcome_evidence(
301 trace_id,
302 o,
303 effective_used,
304 effective_complete,
305 &now,
306 )?;
307 }
308 } else if used.is_some() && effective_used_complete {
309 self.replace_selected_unused_evidence(
310 trace_id,
311 effective_used_ids.as_deref().unwrap_or_default(),
312 &now,
313 )?;
314 }
315
316 let context_key = log
317 .get("context_key")
318 .and_then(Value::as_str)
319 .map(str::to_string)
320 .or_else(|| query.map(|q| content_hash(&normalize_query(q))));
321 let feedback_strength = if feedback_kind == "judge" { 0.6 } else { 1.0 };
322
323 let mut context_affected: HashSet<String> = HashSet::new();
327 if let Some(used_ids) = effective_used_ids.as_deref() {
328 for cid in used_ids {
329 context_affected.insert(cid.clone());
330 }
331 }
332 if let Some(ups) = feedback_up {
333 for cid in ups {
334 let corrected = self.storage.delete_feedback_event(trace_id, cid, "down")?;
335 self.storage.delete_chunk_trace_confidence_evidence(
336 trace_id,
337 cid,
338 "feedback_down",
339 )?;
340 let inserted = self.storage.insert_feedback_event(
341 &gen_uuid(),
342 trace_id,
343 cid,
344 "up",
345 feedback_strength,
346 source,
347 feedback_actor,
348 feedback_reason,
349 context_key.as_deref(),
350 &now,
351 )?;
352 if inserted > 0 {
353 self.upsert_trace_confidence_evidence(
354 trace_id,
355 cid,
356 "feedback_up",
357 1.0,
358 feedback_strength,
359 if feedback_kind == "judge" {
360 "judge_up"
361 } else {
362 "user_up"
363 },
364 context_key.as_deref(),
365 &now,
366 true,
367 )?;
368 self.storage.update_chunk_last_used(cid, &now)?;
369 self.refresh_governance_evidence(cid, &now)?;
370 context_affected.insert(cid.clone());
371 } else if corrected > 0 {
372 self.recompute_chunk_confidence(cid, &now)?;
373 self.refresh_governance_evidence(cid, &now)?;
374 context_affected.insert(cid.clone());
375 }
376 }
377 }
378 if let Some(downs) = feedback_down {
379 for cid in downs {
380 let corrected = self.storage.delete_feedback_event(trace_id, cid, "up")?;
381 self.storage.delete_chunk_trace_confidence_evidence(
382 trace_id,
383 cid,
384 "feedback_up",
385 )?;
386 let inserted = self.storage.insert_feedback_event(
387 &gen_uuid(),
388 trace_id,
389 cid,
390 "down",
391 feedback_strength,
392 source,
393 feedback_actor,
394 feedback_reason,
395 context_key.as_deref(),
396 &now,
397 )?;
398 if inserted > 0 {
399 self.upsert_trace_confidence_evidence(
400 trace_id,
401 cid,
402 "feedback_down",
403 0.0,
404 feedback_strength,
405 if feedback_kind == "judge" {
406 "judge_down"
407 } else {
408 "user_down"
409 },
410 context_key.as_deref(),
411 &now,
412 true,
413 )?;
414 self.refresh_governance_evidence(cid, &now)?;
415 context_affected.insert(cid.clone());
416 } else if corrected > 0 {
417 self.recompute_chunk_confidence(cid, &now)?;
418 self.refresh_governance_evidence(cid, &now)?;
419 context_affected.insert(cid.clone());
420 }
421 }
422 }
423 self.rebuild_context_stats_for(&context_affected, &now)?;
425
426 if !is_fresh_insert {
428 self.storage.patch_episodic_log_content(
429 trace_id,
430 query,
431 output,
432 output_summary,
433 nomination,
434 effective_priority,
435 )?;
436 }
437
438 let lifecycle_state = if effective_outcome.is_some() {
439 "completed"
440 } else {
441 task_state.unwrap_or_else(|| {
442 log.get("task_state")
443 .and_then(Value::as_str)
444 .unwrap_or("running")
445 })
446 };
447 let used_ids_json = effective_used_ids
448 .as_deref()
449 .map(serde_json::to_string)
450 .transpose()?;
451 self.storage.update_trace_lifecycle(
452 trace_id,
453 lifecycle_state,
454 (lifecycle_state == "completed").then_some(now.as_str()),
455 effective_used_ids
456 .as_deref()
457 .map(|ids| usage_state(Some(ids))),
458 used_ids_json.as_deref(),
459 used.map(|_| used_attribution),
460 used.map(|_| effective_used_complete),
461 )?;
462
463 let current_state = log
465 .get("distill_state")
466 .and_then(Value::as_str)
467 .unwrap_or("open");
468 let lifecycle_completed = lifecycle_state == "completed";
469 let has_material = output_summary.is_some()
470 || nomination.is_some()
471 || output.is_some()
472 || log.get("output_summary").and_then(Value::as_str).is_some()
473 || log.get("nomination").and_then(Value::as_str).is_some()
474 || log.get("output").and_then(Value::as_str).is_some();
475 let retryable_discard = current_state == "discarded"
476 && matches!(
477 log.get("distill_note").and_then(Value::as_str),
478 Some("insufficient_material" | "abandoned" | "timed_out")
479 );
480 let new_state = if current_state == "open"
481 && matches!(lifecycle_state, "abandoned" | "timed_out")
482 {
483 Some("discarded")
484 } else if lifecycle_completed && (current_state == "open" || retryable_discard) {
485 if has_material {
486 Some("new")
487 } else {
488 Some("discarded")
489 }
490 } else {
491 None
492 };
493 if let Some(state) = new_state {
494 let note = if state == "discarded" {
495 Some(if matches!(lifecycle_state, "abandoned" | "timed_out") {
496 lifecycle_state
497 } else {
498 "insufficient_material"
499 })
500 } else {
501 None
502 };
503 let outcome_str = outcome.map(str::to_string);
504 self.storage.update_episodic_log_state(
505 trace_id,
506 state,
507 note,
508 outcome_str.as_deref(),
509 )?;
510 if state == "new" && retryable_discard {
511 self.storage.conn_execute(
512 "UPDATE episodic_log SET distill_note=NULL WHERE trace_id=?",
513 rusqlite::params![trace_id],
514 )?;
515 }
516 } else if outcome.is_some() {
517 let outcome_str = outcome.map(str::to_string);
518 self.storage.update_episodic_log_state(
519 trace_id,
520 current_state,
521 None,
522 outcome_str.as_deref(),
523 )?;
524 }
525
526 self.storage.commit()
527 })();
528 if result.is_err() {
529 let _ = self.storage.rollback();
530 }
531 result?;
532 self.enqueue_evolve_if_needed(&now)?;
533 Ok(())
534 }
535}