1use super::*;
2
3impl KnowledgeBase {
4 pub fn evolve(&self, trigger: &str) -> Result<Value> {
5 if !matches!(trigger, "manual" | "scheduled" | "threshold") {
6 return Err(InnateError::InvalidState(format!(
7 "invalid evolve trigger: {trigger}"
8 )));
9 }
10 let evolve_started_at = utc_now_iso();
11 let retry_cutoff = minutes_ago(&evolve_started_at, 5);
12 let recovered_failed = self.storage.conn_execute_count(
13 "UPDATE episodic_log
14 SET distill_state='new', distill_note='retry_failed',
15 distill_locked_at=NULL, distill_run_id=NULL
16 WHERE distill_state='failed'
17 AND distill_attempts < 3
18 AND COALESCE(distill_last_failed_at, distill_accounted_at, ts) < ?",
19 rusqlite::params![retry_cutoff],
20 )?;
21 if recovered_failed > 0 {
22 self.storage
23 .request_evolve(&gen_uuid(), "distill_retry", &evolve_started_at)?;
24 }
25 if trigger == "scheduled" {
26 let age_cutoff = hours_ago(&evolve_started_at, self.evolve_schedule_interval_hours);
27 let aged_new = count_query_params(
28 &self.storage,
29 "SELECT COUNT(*) FROM episodic_log
30 WHERE distill_state='new' AND ts <= ?",
31 rusqlite::params![age_cutoff],
32 )?;
33 if aged_new > 0 {
34 self.storage
35 .request_evolve(&gen_uuid(), "scheduled", &evolve_started_at)?;
36 }
37 }
38 let request = self.storage.claim_evolve_request_with_reason(
39 &evolve_started_at,
40 &minutes_ago(&evolve_started_at, self.screening_timeout_minutes),
41 )?;
42 let request_id = request.as_ref().map(|claim| claim.id.as_str());
43 let request_reason = request.as_ref().map(|claim| claim.reason.as_str());
44 if trigger == "scheduled" && request_id.is_none() {
46 let curator = Arc::clone(&self.curator);
47 let curate = curator.run(self, &CurateScope::default())?;
48 return Ok(json!({
49 "distilled": 0,
50 "curate": self.format_curate_report(&curate),
51 "skipped": "no_evolve_request"
52 }));
53 }
54
55 if trigger == "threshold" {
57 let rows = self.storage.query_chunks(
58 "SELECT COUNT(*) AS cnt FROM episodic_log WHERE distill_state='new'",
59 )?;
60 let cnt = rows
61 .first()
62 .and_then(|r| r.get("cnt"))
63 .and_then(Value::as_i64)
64 .unwrap_or(0);
65 if cnt < self.evolve_threshold {
66 let curator = Arc::clone(&self.curator);
67 let curate = curator.run(self, &CurateScope::default())?;
68 if let Some(id) = request_id {
69 if matches!(request_reason, Some("governance" | "governance_ready")) {
70 self.storage.finish_evolve_request(
71 id,
72 "completed",
73 Some("curate_only"),
74 &utc_now_iso(),
75 )?;
76 } else {
77 self.storage.defer_evolve_request(
78 id,
79 "below_threshold",
80 &hours_after(
81 &evolve_started_at,
82 self.evolve_schedule_interval_hours.max(1),
83 ),
84 )?;
85 }
86 }
87 return Ok(json!({
88 "distilled": 0,
89 "curate": self.format_curate_report(&curate),
90 "skipped": "below_threshold"
91 }));
92 }
93 }
94
95 if trigger != "manual" {
96 if let Some(limit) = self
97 .storage
98 .get_meta("max_distill_tokens_per_period")?
99 .and_then(|value| value.parse::<i64>().ok())
100 .filter(|value| *value > 0)
101 {
102 let period_start = self.distill_token_period_start(&evolve_started_at)?;
103 let rows = self.storage.query_chunks_params(
104 "SELECT COALESCE(SUM(prompt_tokens + completion_tokens),0) AS used
105 FROM distill_token_usage
106 WHERE accounted_at >= ?",
107 rusqlite::params![period_start],
108 )?;
109 let used_tokens = rows
110 .first()
111 .and_then(|row| row.get("used"))
112 .and_then(Value::as_i64)
113 .unwrap_or(0);
114 if used_tokens >= limit {
115 let curator = Arc::clone(&self.curator);
116 let curate = curator.run(self, &CurateScope::default())?;
117 if let Some(id) = request_id {
118 if matches!(request_reason, Some("governance" | "governance_ready")) {
119 self.storage.finish_evolve_request(
120 id,
121 "completed",
122 Some("curate_only"),
123 &utc_now_iso(),
124 )?;
125 } else {
126 self.storage.defer_evolve_request(
127 id,
128 "distill_token_limit",
129 &hours_after(&evolve_started_at, 1),
130 )?;
131 }
132 }
133 return Ok(json!({
134 "distilled": 0,
135 "curate": self.format_curate_report(&curate),
136 "skipped": "distill_token_limit",
137 "distill_tokens_used": used_tokens,
138 "distill_token_limit": limit,
139 "period_start": period_start,
140 }));
141 }
142 }
143 }
144
145 let result = (|| -> Result<Value> {
146 let distill = self.distill_batch()?;
147 let curator = Arc::clone(&self.curator);
148 let curate = curator.run(self, &CurateScope::default())?;
149 Ok(json!({
150 "distilled": distill.distilled,
151 "distill_failed": distill.failed,
152 "curate": self.format_curate_report(&curate),
153 }))
154 })();
155 if let Some(id) = request_id {
156 let (state, note) = match &result {
157 Ok(_) => ("completed", None),
158 Err(error) => ("failed", Some(error.to_string())),
159 };
160 self.storage
161 .finish_evolve_request(id, state, note.as_deref(), &utc_now_iso())?;
162 }
163 if result.is_ok() {
164 self.storage
165 .finish_covered_evolve_requests(&evolve_started_at, &utc_now_iso())?;
166 }
167
168 let failed_remaining = count_query(
169 &self.storage,
170 "SELECT COUNT(*) FROM episodic_log
171 WHERE distill_state='failed' AND distill_attempts < 3",
172 )?;
173 if failed_remaining > 0 {
174 self.storage.request_evolve_at(
175 &gen_uuid(),
176 "distill_retry",
177 &utc_now_iso(),
178 Some(&minutes_after(&utc_now_iso(), 5)),
179 )?;
180 }
181
182 if result.is_ok() {
184 let remaining = count_query(
185 &self.storage,
186 "SELECT COUNT(*) FROM episodic_log WHERE distill_state='new'",
187 )?;
188 if remaining > 0 {
189 let _ = self
190 .storage
191 .request_evolve(&gen_uuid(), "batch_continue", &utc_now_iso());
192 }
193 }
194 result
195 }
196
197 fn format_curate_report(&self, curate: &CurateReport) -> Value {
198 json!({
199 "archived": curate.archived.len(),
200 "deduped": curate.deduped.len(),
201 "decayed": curate.decayed.len(),
202 "recovered": curate.recovered.len(),
203 "orphans": curate.orphans.len(),
204 "warnings": curate.warnings,
205 })
206 }
207
208 fn distill_batch(&self) -> Result<DistillBatchReport> {
209 let run_id = gen_uuid();
210 let now = utc_now_iso();
211
212 self.storage.begin_immediate()?;
214 let logs = match self
215 .storage
216 .claim_distill_batch(&run_id, self.distill_batch_size, &now)
217 {
218 Ok(l) => {
219 self.storage.commit()?;
220 l
221 }
222 Err(e) => {
223 let _ = self.storage.rollback();
224 return Err(e);
225 }
226 };
227
228 let mut chunks_by_log: HashMap<String, Vec<DistilledChunk>> = HashMap::new();
229 let mut failed_logs = HashSet::new();
230 let mut distill_errors = Vec::new();
231 for log in &logs {
232 let log_id = log.get("id").and_then(Value::as_str).unwrap_or("");
233 let context_key = log.get("context_key").and_then(Value::as_str);
234 let related_logs: Vec<Value> = logs
235 .iter()
236 .filter(|other| {
237 other.get("id").and_then(Value::as_str) == Some(log_id)
238 || (context_key.is_some()
239 && other.get("context_key").and_then(Value::as_str) == context_key)
240 })
241 .cloned()
242 .collect();
243 match self.distiller.distill_with_context(log, &related_logs) {
244 Ok(chunks) => {
245 if chunks.iter().any(|chunk| chunk.source_log_id != log_id) {
246 let error = "distiller returned a chunk for an unknown source log";
247 failed_logs.insert(log_id.to_string());
248 distill_errors.push(format!("{log_id}: {error}"));
249 self.finish_distill_log(
250 log_id,
251 "failed",
252 Some(&format!("distill_failed:{error}")),
253 estimate_distill_prompt_tokens(log, &related_logs),
254 0,
255 )?;
256 continue;
257 }
258 chunks_by_log.insert(log_id.to_string(), chunks);
259 }
260 Err(error) => {
261 let note = format!("distill_failed:{error}");
262 failed_logs.insert(log_id.to_string());
263 distill_errors.push(format!("{log_id}: {error}"));
264 self.finish_distill_log(
265 log_id,
266 "failed",
267 Some(¬e),
268 estimate_distill_prompt_tokens(log, &related_logs),
269 0,
270 )?;
271 }
272 }
273 }
274
275 let mut count = 0;
276 let provenance = self.distiller.provenance();
277 for log in &logs {
278 let log_id = log.get("id").and_then(Value::as_str).unwrap_or("");
279 if failed_logs.contains(log_id) {
280 continue;
281 }
282 let context_key = log.get("context_key").and_then(Value::as_str);
283 let related_logs: Vec<Value> = logs
284 .iter()
285 .filter(|other| {
286 other.get("id").and_then(Value::as_str) == Some(log_id)
287 || (context_key.is_some()
288 && other.get("context_key").and_then(Value::as_str) == context_key)
289 })
290 .cloned()
291 .collect();
292 let prompt_tokens = estimate_distill_prompt_tokens(log, &related_logs);
293 let chunks = chunks_by_log.remove(log_id).unwrap_or_default();
294 let completion_tokens = chunks
295 .iter()
296 .map(estimate_distilled_chunk_tokens)
297 .sum::<i64>();
298 if chunks.is_empty() {
299 self.finish_distill_log(
300 log_id,
301 "discarded",
302 Some("insufficient_material"),
303 prompt_tokens,
304 completion_tokens,
305 )?;
306 continue;
307 }
308
309 struct PreparedChunk {
314 row: ChunkRow,
315 cvec_bytes: Vec<u8>,
316 tvec_bytes: Vec<u8>,
317 }
318 let mut prepared: Vec<PreparedChunk> = Vec::with_capacity(chunks.len());
319 let mut embedding_failures = 0_usize;
320 for dc in chunks {
321 let (content, action) = self.sanitize_content(&dc.content);
322 if action == SanitizeAction::Discard {
323 continue; }
325 let h = content_hash(&content);
326 if self.storage.is_hash_invalidated(&h)? {
327 continue; }
329 let redacted = action == SanitizeAction::Redact;
330 let conf = if redacted { 0.4 } else { 0.55 };
331 let now2 = utc_now_iso();
332 let chunk_id = gen_uuid();
333 let tokens = estimate_tokens(&content) as i64;
334 let row = ChunkRow {
335 id: chunk_id,
336 content: content.clone(),
337 trigger_desc: dc.trigger_desc.clone(),
338 anti_trigger_desc: dc.anti_trigger_desc,
339 content_hash: h,
340 token_count: Some(tokens),
341 origin: "distilled".to_string(),
342 distilled_from: Some(dc.source_log_id),
343 distill_provider: provenance.provider.clone(),
344 distill_model: provenance.model.clone(),
345 distill_prompt_version: provenance.prompt_version.clone(),
346 state: "pending".to_string(),
347 state_reason: Some("init:distilled".to_string()),
348 confidence: conf,
349 confidence_reason: Some("init:distilled".to_string()),
350 version: 1,
351 embed_version: 1,
352 created_at: now2.clone(),
353 updated_at: now2,
354 ..Default::default()
355 };
356 let cvec = match self.embedding.embed_content(&content) {
357 Ok(v) => v,
358 Err(_) => {
359 embedding_failures += 1;
360 continue;
361 }
362 };
363 let tvec = match self
364 .embedding
365 .embed_trigger(row.trigger_desc.as_deref().unwrap_or(&content))
366 {
367 Ok(v) => v,
368 Err(_) => {
369 embedding_failures += 1;
370 continue;
371 }
372 };
373 prepared.push(PreparedChunk {
374 row,
375 cvec_bytes: pack_embedding(&cvec),
376 tvec_bytes: pack_embedding(&tvec),
377 });
378 }
379
380 if prepared.is_empty() {
381 let note = if embedding_failures > 0 {
382 "embedding_failed"
383 } else {
384 "all_chunks_filtered"
385 };
386 self.finish_distill_log(
387 log_id,
388 if embedding_failures > 0 {
389 "failed"
390 } else {
391 "discarded"
392 },
393 Some(note),
394 prompt_tokens,
395 completion_tokens,
396 )?;
397 if embedding_failures > 0 {
398 failed_logs.insert(log_id.to_string());
399 }
400 continue;
401 }
402
403 let accounted_at = utc_now_iso();
405 self.storage.begin_immediate()?;
406 let write_result = (|| -> Result<()> {
407 for pc in &prepared {
408 self.storage.insert_chunk(&pc.row)?;
409 self.storage
410 .insert_vec_content(&pc.row.id, &pc.cvec_bytes)?;
411 self.storage
412 .insert_vec_trigger(&pc.row.id, &pc.tvec_bytes)?;
413 }
414 let note = (embedding_failures > 0)
415 .then(|| format!("partial_embedding_failures:{embedding_failures}"));
416 self.storage.finish_distill_log(
417 log_id,
418 "distilled",
419 note.as_deref(),
420 prompt_tokens,
421 completion_tokens,
422 &accounted_at,
423 )?;
424 self.storage.commit()
425 })();
426 if let Err(error) = write_result {
427 let _ = self.storage.rollback();
428 let note = format!("distill_write_failed:{error}");
429 self.finish_distill_log(
430 log_id,
431 "failed",
432 Some(¬e),
433 prompt_tokens,
434 completion_tokens,
435 )?;
436 failed_logs.insert(log_id.to_string());
437 continue;
438 }
439 count += 1;
440 }
441 if !distill_errors.is_empty() {
442 eprintln!(
446 "[innate] distillation partial failure ({} log(s)): {}",
447 distill_errors.len(),
448 distill_errors.join("; ")
449 );
450 }
451 Ok(DistillBatchReport {
452 distilled: count,
453 failed: failed_logs.len(),
454 })
455 }
456
457 fn finish_distill_log(
458 &self,
459 log_id: &str,
460 state: &str,
461 note: Option<&str>,
462 prompt_tokens: i64,
463 completion_tokens: i64,
464 ) -> Result<()> {
465 let accounted_at = utc_now_iso();
466 self.storage.begin_immediate()?;
467 let result = (|| -> Result<()> {
468 self.storage.finish_distill_log(
469 log_id,
470 state,
471 note,
472 prompt_tokens,
473 completion_tokens,
474 &accounted_at,
475 )?;
476 self.storage.commit()
477 })();
478 if result.is_err() {
479 let _ = self.storage.rollback();
480 }
481 result
482 }
483
484 pub(super) fn distill_token_period_start(&self, now: &str) -> Result<String> {
485 let window_hours = self
486 .storage
487 .get_meta("evolve.distill_token_window_hours")?
488 .and_then(|value| value.parse::<i64>().ok())
489 .unwrap_or(24)
490 .max(1);
491 Ok(hours_ago(now, window_hours))
492 }
493}