1use super::*;
2
3mod evidence;
4
5impl KnowledgeBase {
6 #[allow(clippy::too_many_arguments)]
7 pub fn record(
8 &self,
9 trace_id: &str,
10 query: Option<&str>,
11 output: Option<&str>,
12 output_summary: Option<&str>,
13 outcome: Option<&str>,
14 used: Option<&[String]>,
15 feedback_up: Option<&[String]>,
16 feedback_down: Option<&[String]>,
17 nomination: Option<&str>,
18 priority: i64,
19 source: &str,
20 ) -> Result<()> {
21 self.record_detailed(
22 trace_id,
23 query,
24 output,
25 output_summary,
26 outcome,
27 used,
28 "explicit",
29 true,
30 feedback_up,
31 feedback_down,
32 "user",
33 None,
34 None,
35 nomination,
36 priority,
37 None,
38 source,
39 )
40 }
41
42 #[allow(clippy::too_many_arguments)]
43 pub fn record_detailed(
44 &self,
45 trace_id: &str,
46 query: Option<&str>,
47 output: Option<&str>,
48 output_summary: Option<&str>,
49 outcome: Option<&str>,
50 used: Option<&[String]>,
51 used_attribution: &str,
52 used_complete: bool,
53 feedback_up: Option<&[String]>,
54 feedback_down: Option<&[String]>,
55 feedback_kind: &str,
56 feedback_actor: Option<&str>,
57 feedback_reason: Option<&str>,
58 nomination: Option<&str>,
59 priority: i64,
60 task_state: Option<&str>,
61 source: &str,
62 ) -> Result<()> {
63 let dedupe_ids = |ids: &[String]| {
64 let mut seen = HashSet::new();
65 ids.iter()
66 .filter(|id| seen.insert((*id).clone()))
67 .cloned()
68 .collect::<Vec<_>>()
69 };
70 let normalized_used = used.map(dedupe_ids);
71 let normalized_feedback_up = feedback_up.map(dedupe_ids);
72 let normalized_feedback_down = feedback_down.map(dedupe_ids);
73 let used = normalized_used.as_deref();
74 let feedback_up = normalized_feedback_up.as_deref();
75 let feedback_down = normalized_feedback_down.as_deref();
76
77 if let Some(o) = outcome {
78 if !matches!(o, "ok" | "fail" | "unknown") {
79 return Err(InnateError::InvalidState(format!("invalid outcome: {o}")));
80 }
81 }
82 if !matches!(used_attribution, "explicit" | "cited" | "inferred") {
83 return Err(InnateError::InvalidState(format!(
84 "invalid used attribution: {used_attribution}"
85 )));
86 }
87 if !matches!(feedback_kind, "user" | "judge") {
88 return Err(InnateError::InvalidState(format!(
89 "invalid feedback kind: {feedback_kind}"
90 )));
91 }
92 if let Some(state) = task_state {
93 if !matches!(
94 state,
95 "recalled" | "running" | "completed" | "abandoned" | "timed_out"
96 ) {
97 return Err(InnateError::InvalidState(format!(
98 "invalid task state: {state}"
99 )));
100 }
101 }
102 validate_source(source)?;
103 if let (Some(ups), Some(downs)) = (feedback_up, feedback_down) {
104 let down_set: HashSet<&str> = downs.iter().map(String::as_str).collect();
105 if let Some(chunk_id) = ups.iter().find(|id| down_set.contains(id.as_str())) {
106 return Err(InnateError::InvalidState(format!(
107 "conflicting feedback for chunk {chunk_id}"
108 )));
109 }
110 }
111 let effective_priority = if nomination.is_some() && priority == 0 {
112 1
113 } else {
114 priority
115 };
116 let now = utc_now_iso();
117 let lib_id = self.storage.lib_id()?;
118
119 self.storage.begin_immediate()?;
120 let result = (|| -> Result<()> {
121 let log = self.storage.get_episodic_log(trace_id)?;
122 let mut is_fresh_insert = false;
123 let log = match log {
124 Some(l) => l,
125 None => {
126 let used_ids = used.map(serde_json::to_string).transpose()?;
127 let row = EpisodicLogRow {
128 id: gen_uuid(),
129 trace_id: trace_id.to_string(),
130 lib_id,
131 ts: now.clone(),
132 query: query.map(str::to_string).or_else(|| Some(String::new())),
133 output: output.map(str::to_string),
134 output_summary: output_summary.map(str::to_string),
135 outcome: outcome.map(str::to_string),
136 event_source: source.to_string(),
137 task_state: if matches!(outcome, Some("ok") | Some("fail")) {
138 "completed".to_string()
139 } else {
140 task_state.unwrap_or("running").to_string()
141 },
142 completed_at: matches!(outcome, Some("ok") | Some("fail"))
143 .then(|| now.clone()),
144 usage_state: usage_state(used).to_string(),
145 used_ids,
146 used_attribution: used.map(|_| used_attribution.to_string()),
147 used_complete,
148 context_key: query.map(|q| content_hash(&normalize_query(q))),
149 nomination: nomination.map(str::to_string),
150 priority: effective_priority,
151 distill_state: "open".to_string(),
152 ..Default::default()
153 };
154 self.storage.upsert_episodic_log(&row)?;
155 is_fresh_insert = true;
156 self.storage.get_episodic_log(trace_id)?.unwrap()
157 }
158 };
159 self.validate_trace_attribution(trace_id, used, "used")?;
160 self.validate_trace_attribution(trace_id, feedback_up, "feedback_up")?;
161 self.validate_trace_attribution(trace_id, feedback_down, "feedback_down")?;
162
163 let existing_outcome = log
164 .get("outcome")
165 .and_then(Value::as_str)
166 .map(str::to_string);
167 let effective_used_attribution = if used.is_some() {
169 used_attribution
170 } else {
171 log.get("used_attribution")
172 .and_then(Value::as_str)
173 .unwrap_or(used_attribution)
174 };
175 let used_strength = match effective_used_attribution {
176 "explicit" => 0.3,
177 "cited" => 0.25,
178 "inferred" => 0.15,
179 other => {
182 return Err(InnateError::InvalidState(format!(
183 "invalid stored used attribution: {other}"
184 )))
185 }
186 };
187 let existing_used_ids: Vec<String> = log
188 .get("used_ids")
189 .and_then(Value::as_str)
190 .and_then(|raw| serde_json::from_str(raw).ok())
191 .unwrap_or_default();
192 let existing_used_complete = log
193 .get("used_complete")
194 .and_then(Value::as_i64)
195 .unwrap_or(0)
196 != 0;
197 let effective_used_complete = used_complete || existing_used_complete;
198 let effective_used_ids = used.map(|reported| {
199 if used_complete {
200 reported.to_vec()
201 } else {
202 let mut merged = existing_used_ids.clone();
203 let mut seen: HashSet<String> = merged.iter().cloned().collect();
204 merged.extend(
205 reported
206 .iter()
207 .filter(|id| seen.insert((*id).clone()))
208 .cloned(),
209 );
210 merged
211 }
212 });
213 if let Some(used_ids) = effective_used_ids.as_deref() {
214 let previously_used: HashSet<String> = existing_used_ids.iter().cloned().collect();
215 if used_complete {
216 self.storage.replace_used_trace(
217 trace_id,
218 used_ids,
219 used_strength,
220 used_attribution,
221 source,
222 &now,
223 )?;
224 } else if let Some(reported) = used {
225 self.storage.merge_used_trace(
226 trace_id,
227 reported,
228 used_strength,
229 used_attribution,
230 source,
231 &now,
232 )?;
233 }
234 let affected: HashSet<String> = previously_used
235 .into_iter()
236 .chain(used_ids.iter().cloned())
237 .collect();
238 for cid in affected {
239 self.storage.refresh_chunk_last_used(&cid, &now)?;
240 }
241 }
242
243 if let Some(o) = outcome {
245 if matches!(o, "ok" | "fail") {
246 let event = if o == "ok" { "task_ok" } else { "task_fail" };
247 let strength = if event == "task_fail" { 0.15 } else { 1.0 };
248 self.storage.conn_execute(
249 "DELETE FROM usage_trace
250 WHERE trace_id=? AND event IN ('task_ok','task_fail')
251 AND chunk_id IS NULL",
252 rusqlite::params![trace_id],
253 )?;
254 self.storage.insert_usage_trace(
255 trace_id, None, event, strength, None, None, None, None, None, source, &now,
256 )?;
257 }
258 }
259
260 let effective_outcome =
264 outcome
265 .filter(|value| *value != "unknown")
266 .or(existing_outcome
267 .as_deref()
268 .filter(|value| *value != "unknown"));
269 if let Some(o @ ("ok" | "fail")) = effective_outcome {
270 if used.is_some()
271 || (outcome.is_some_and(|value| value != "unknown")
272 && existing_outcome.as_deref() != outcome)
273 {
274 let fallback_ids: Vec<String>;
275 let effective_used: Option<&[String]> = if effective_used_ids.is_some() {
276 effective_used_ids.as_deref()
277 } else {
278 fallback_ids = log
279 .get("used_ids")
280 .and_then(Value::as_str)
281 .and_then(|s| serde_json::from_str(s).ok())
282 .unwrap_or_default();
283 if fallback_ids.is_empty() {
284 None
285 } else {
286 Some(&fallback_ids)
287 }
288 };
289 let effective_complete = if used.is_some() {
290 effective_used_complete
291 } else {
292 log.get("usage_state").and_then(Value::as_str) != Some("unknown")
293 && log
294 .get("used_complete")
295 .and_then(Value::as_i64)
296 .unwrap_or(1)
297 != 0
298 };
299 self.replace_outcome_evidence(
300 trace_id,
301 o,
302 effective_used,
303 effective_complete,
304 &now,
305 )?;
306 }
307 } else if used.is_some() && effective_used_complete {
308 self.replace_selected_unused_evidence(
309 trace_id,
310 effective_used_ids.as_deref().unwrap_or_default(),
311 &now,
312 )?;
313 }
314
315 let context_key = log
316 .get("context_key")
317 .and_then(Value::as_str)
318 .map(str::to_string)
319 .or_else(|| query.map(|q| content_hash(&normalize_query(q))));
320 let feedback_strength = if feedback_kind == "judge" { 0.6 } else { 1.0 };
321
322 let mut context_affected: HashSet<String> = HashSet::new();
326 if let Some(used_ids) = effective_used_ids.as_deref() {
327 for cid in used_ids {
328 context_affected.insert(cid.clone());
329 }
330 }
331 if let Some(ups) = feedback_up {
332 for cid in ups {
333 let corrected = self.storage.delete_feedback_event(trace_id, cid, "down")?;
334 self.storage.delete_chunk_trace_confidence_evidence(
335 trace_id,
336 cid,
337 "feedback_down",
338 )?;
339 let inserted = self.storage.insert_feedback_event(
340 &gen_uuid(),
341 trace_id,
342 cid,
343 "up",
344 feedback_strength,
345 source,
346 feedback_actor,
347 feedback_reason,
348 context_key.as_deref(),
349 &now,
350 )?;
351 if inserted > 0 {
352 self.upsert_trace_confidence_evidence(
353 trace_id,
354 cid,
355 "feedback_up",
356 1.0,
357 feedback_strength,
358 if feedback_kind == "judge" {
359 "judge_up"
360 } else {
361 "user_up"
362 },
363 context_key.as_deref(),
364 &now,
365 true,
366 )?;
367 self.storage.update_chunk_last_used(cid, &now)?;
368 self.refresh_governance_evidence(cid, &now)?;
369 context_affected.insert(cid.clone());
370 } else if corrected > 0 {
371 self.recompute_chunk_confidence(cid, &now)?;
372 self.refresh_governance_evidence(cid, &now)?;
373 context_affected.insert(cid.clone());
374 }
375 }
376 }
377 if let Some(downs) = feedback_down {
378 for cid in downs {
379 let corrected = self.storage.delete_feedback_event(trace_id, cid, "up")?;
380 self.storage.delete_chunk_trace_confidence_evidence(
381 trace_id,
382 cid,
383 "feedback_up",
384 )?;
385 let inserted = self.storage.insert_feedback_event(
386 &gen_uuid(),
387 trace_id,
388 cid,
389 "down",
390 feedback_strength,
391 source,
392 feedback_actor,
393 feedback_reason,
394 context_key.as_deref(),
395 &now,
396 )?;
397 if inserted > 0 {
398 self.upsert_trace_confidence_evidence(
399 trace_id,
400 cid,
401 "feedback_down",
402 0.0,
403 feedback_strength,
404 if feedback_kind == "judge" {
405 "judge_down"
406 } else {
407 "user_down"
408 },
409 context_key.as_deref(),
410 &now,
411 true,
412 )?;
413 self.refresh_governance_evidence(cid, &now)?;
414 context_affected.insert(cid.clone());
415 } else if corrected > 0 {
416 self.recompute_chunk_confidence(cid, &now)?;
417 self.refresh_governance_evidence(cid, &now)?;
418 context_affected.insert(cid.clone());
419 }
420 }
421 }
422 self.rebuild_context_stats_for(&context_affected, &now)?;
424
425 if !is_fresh_insert {
427 self.storage.patch_episodic_log_content(
428 trace_id,
429 query,
430 output,
431 output_summary,
432 nomination,
433 effective_priority,
434 )?;
435 }
436
437 let lifecycle_state = if effective_outcome.is_some() {
438 "completed"
439 } else {
440 task_state.unwrap_or_else(|| {
441 log.get("task_state")
442 .and_then(Value::as_str)
443 .unwrap_or("running")
444 })
445 };
446 let used_ids_json = effective_used_ids
447 .as_deref()
448 .map(serde_json::to_string)
449 .transpose()?;
450 self.storage.update_trace_lifecycle(
451 trace_id,
452 lifecycle_state,
453 (lifecycle_state == "completed").then_some(now.as_str()),
454 effective_used_ids
455 .as_deref()
456 .map(|ids| usage_state(Some(ids))),
457 used_ids_json.as_deref(),
458 used.map(|_| used_attribution),
459 used.map(|_| effective_used_complete),
460 )?;
461
462 let current_state = log
464 .get("distill_state")
465 .and_then(Value::as_str)
466 .unwrap_or("open");
467 let lifecycle_completed = lifecycle_state == "completed";
468 let has_material = output_summary.is_some()
469 || nomination.is_some()
470 || output.is_some()
471 || log.get("output_summary").and_then(Value::as_str).is_some()
472 || log.get("nomination").and_then(Value::as_str).is_some()
473 || log.get("output").and_then(Value::as_str).is_some();
474 let retryable_discard = current_state == "discarded"
475 && matches!(
476 log.get("distill_note").and_then(Value::as_str),
477 Some("insufficient_material" | "abandoned" | "timed_out")
478 );
479 let new_state = if current_state == "open"
480 && matches!(lifecycle_state, "abandoned" | "timed_out")
481 {
482 Some("discarded")
483 } else if lifecycle_completed && (current_state == "open" || retryable_discard) {
484 if has_material {
485 Some("new")
486 } else {
487 Some("discarded")
488 }
489 } else {
490 None
491 };
492 if let Some(state) = new_state {
493 let note = if state == "discarded" {
494 Some(if matches!(lifecycle_state, "abandoned" | "timed_out") {
495 lifecycle_state
496 } else {
497 "insufficient_material"
498 })
499 } else {
500 None
501 };
502 let outcome_str = outcome.map(str::to_string);
503 self.storage.update_episodic_log_state(
504 trace_id,
505 state,
506 note,
507 outcome_str.as_deref(),
508 )?;
509 if state == "new" && retryable_discard {
510 self.storage.conn_execute(
511 "UPDATE episodic_log SET distill_note=NULL WHERE trace_id=?",
512 rusqlite::params![trace_id],
513 )?;
514 }
515 } else if outcome.is_some() {
516 let outcome_str = outcome.map(str::to_string);
517 self.storage.update_episodic_log_state(
518 trace_id,
519 current_state,
520 None,
521 outcome_str.as_deref(),
522 )?;
523 }
524
525 self.storage.commit()
526 })();
527 if result.is_err() {
528 let _ = self.storage.rollback();
529 }
530 result?;
531 self.enqueue_evolve_if_needed(&now)?;
532 Ok(())
533 }
534}