1use super::*;
2
3impl KnowledgeBase {
4 pub fn evolve(&self, trigger: &str) -> Result<Value> {
5 if !matches!(trigger, "manual" | "scheduled" | "threshold") {
6 return Err(InnateError::InvalidState(format!(
7 "invalid evolve trigger: {trigger}"
8 )));
9 }
10 let evolve_started_at = utc_now_iso();
11 let retry_cutoff = minutes_ago(&evolve_started_at, 5);
12 let recovered_failed = self.storage.conn_execute_count(
13 "UPDATE episodic_log
14 SET distill_state='new', distill_note='retry_failed',
15 distill_locked_at=NULL, distill_run_id=NULL
16 WHERE distill_state='failed'
17 AND distill_attempts < 3
18 AND COALESCE(distill_last_failed_at, distill_accounted_at, ts) < ?",
19 rusqlite::params![retry_cutoff],
20 )?;
21 if recovered_failed > 0 {
22 self.storage
23 .request_evolve(&gen_uuid(), "distill_retry", &evolve_started_at)?;
24 }
25 if trigger == "scheduled" {
26 let age_cutoff = hours_ago(&evolve_started_at, self.evolve_schedule_interval_hours);
27 let aged_new = count_query_params(
28 &self.storage,
29 "SELECT COUNT(*) FROM episodic_log
30 WHERE distill_state='new' AND ts <= ?",
31 rusqlite::params![age_cutoff],
32 )?;
33 if aged_new > 0 {
34 self.storage
35 .request_evolve(&gen_uuid(), "scheduled", &evolve_started_at)?;
36 }
37 }
38 let request = self.storage.claim_evolve_request_with_reason(
39 &evolve_started_at,
40 &minutes_ago(&evolve_started_at, self.screening_timeout_minutes),
41 )?;
42 let request_id = request.as_ref().map(|claim| claim.id.as_str());
43 let request_reason = request.as_ref().map(|claim| claim.reason.as_str());
44 if trigger == "scheduled" && request_id.is_none() {
46 let curator = Arc::clone(&self.curator);
47 let curate = curator.run(self, &CurateScope::default())?;
48 return Ok(json!({
49 "distilled": 0,
50 "curate": self.format_curate_report(&curate),
51 "skipped": "no_evolve_request"
52 }));
53 }
54
55 if trigger == "threshold" {
57 let rows = self.storage.query_chunks(
58 "SELECT COUNT(*) AS cnt FROM episodic_log WHERE distill_state='new'",
59 )?;
60 let cnt = rows
61 .first()
62 .and_then(|r| r.get("cnt"))
63 .and_then(Value::as_i64)
64 .unwrap_or(0);
65 if cnt < self.evolve_threshold {
66 let curator = Arc::clone(&self.curator);
67 let curate = curator.run(self, &CurateScope::default())?;
68 if let Some(id) = request_id {
69 if matches!(request_reason, Some("governance" | "governance_ready")) {
70 self.storage.finish_evolve_request(
71 id,
72 "completed",
73 Some("curate_only"),
74 &utc_now_iso(),
75 )?;
76 } else {
77 self.storage.defer_evolve_request(
78 id,
79 "below_threshold",
80 &hours_after(
81 &evolve_started_at,
82 self.evolve_schedule_interval_hours.max(1),
83 ),
84 )?;
85 }
86 }
87 return Ok(json!({
88 "distilled": 0,
89 "curate": self.format_curate_report(&curate),
90 "skipped": "below_threshold"
91 }));
92 }
93 }
94
95 if trigger != "manual" {
96 if let Some(limit) = self
97 .storage
98 .get_meta("max_distill_tokens_per_period")?
99 .and_then(|value| value.parse::<i64>().ok())
100 .filter(|value| *value > 0)
101 {
102 let period_start = self.distill_token_period_start(&evolve_started_at)?;
103 let rows = self.storage.query_chunks_params(
104 "SELECT COALESCE(SUM(prompt_tokens + completion_tokens),0) AS used
105 FROM distill_token_usage
106 WHERE accounted_at >= ?",
107 rusqlite::params![period_start],
108 )?;
109 let used_tokens = rows
110 .first()
111 .and_then(|row| row.get("used"))
112 .and_then(Value::as_i64)
113 .unwrap_or(0);
114 if used_tokens >= limit {
115 let curator = Arc::clone(&self.curator);
116 let curate = curator.run(self, &CurateScope::default())?;
117 if let Some(id) = request_id {
118 if matches!(request_reason, Some("governance" | "governance_ready")) {
119 self.storage.finish_evolve_request(
120 id,
121 "completed",
122 Some("curate_only"),
123 &utc_now_iso(),
124 )?;
125 } else {
126 self.storage.defer_evolve_request(
127 id,
128 "distill_token_limit",
129 &hours_after(&evolve_started_at, 1),
130 )?;
131 }
132 }
133 return Ok(json!({
134 "distilled": 0,
135 "curate": self.format_curate_report(&curate),
136 "skipped": "distill_token_limit",
137 "distill_tokens_used": used_tokens,
138 "distill_token_limit": limit,
139 "period_start": period_start,
140 }));
141 }
142 }
143 }
144
145 let result = (|| -> Result<Value> {
146 let distill = self.distill_batch()?;
147 let curator = Arc::clone(&self.curator);
148 let curate = curator.run(self, &CurateScope::default())?;
149 Ok(json!({
150 "distilled": distill.distilled,
151 "distill_failed": distill.failed,
152 "curate": self.format_curate_report(&curate),
153 }))
154 })();
155 if let Some(id) = request_id {
156 let (state, note) = match &result {
157 Ok(_) => ("completed", None),
158 Err(error) => ("failed", Some(error.to_string())),
159 };
160 self.storage
161 .finish_evolve_request(id, state, note.as_deref(), &utc_now_iso())?;
162 }
163 if result.is_ok() {
164 self.storage
165 .finish_covered_evolve_requests(&evolve_started_at, &utc_now_iso())?;
166 }
167
168 let failed_remaining = count_query(
169 &self.storage,
170 "SELECT COUNT(*) FROM episodic_log
171 WHERE distill_state='failed' AND distill_attempts < 3",
172 )?;
173 if failed_remaining > 0 {
174 self.storage.request_evolve_at(
175 &gen_uuid(),
176 "distill_retry",
177 &utc_now_iso(),
178 Some(&minutes_after(&utc_now_iso(), 5)),
179 )?;
180 }
181
182 if result.is_ok() {
184 let remaining = count_query(
185 &self.storage,
186 "SELECT COUNT(*) FROM episodic_log WHERE distill_state='new'",
187 )?;
188 if remaining > 0 {
189 let _ = self
190 .storage
191 .request_evolve(&gen_uuid(), "batch_continue", &utc_now_iso());
192 }
193 }
194 result
195 }
196
197 fn format_curate_report(&self, curate: &CurateReport) -> Value {
198 json!({
199 "archived": curate.archived.len(),
200 "deduped": curate.deduped.len(),
201 "decayed": curate.decayed.len(),
202 "recovered": curate.recovered.len(),
203 "orphans": curate.orphans.len(),
204 "warnings": curate.warnings,
205 })
206 }
207
208 fn distill_batch(&self) -> Result<DistillBatchReport> {
209 let run_id = gen_uuid();
210 let now = utc_now_iso();
211
212 self.storage.begin_immediate()?;
214 let logs = match self
215 .storage
216 .claim_distill_batch(&run_id, self.distill_batch_size, &now)
217 {
218 Ok(l) => {
219 self.storage.commit()?;
220 l
221 }
222 Err(e) => {
223 let _ = self.storage.rollback();
224 return Err(e);
225 }
226 };
227
228 let mut chunks_by_log: HashMap<String, Vec<DistilledChunk>> = HashMap::new();
229 let mut failed_logs = HashSet::new();
230 let mut distill_errors = Vec::new();
231 for log in &logs {
232 let log_id = log.get("id").and_then(Value::as_str).unwrap_or("");
233 let context_key = log.get("context_key").and_then(Value::as_str);
234 let related_logs: Vec<Value> = logs
235 .iter()
236 .filter(|other| {
237 other.get("id").and_then(Value::as_str) == Some(log_id)
238 || (context_key.is_some()
239 && other.get("context_key").and_then(Value::as_str) == context_key)
240 })
241 .cloned()
242 .collect();
243 match self.distiller.distill_with_context(log, &related_logs) {
244 Ok(chunks) => {
245 if chunks.iter().any(|chunk| chunk.source_log_id != log_id) {
246 let error = "distiller returned a chunk for an unknown source log";
247 failed_logs.insert(log_id.to_string());
248 distill_errors.push(format!("{log_id}: {error}"));
249 self.finish_distill_log(
250 log_id,
251 "failed",
252 Some(&format!("distill_failed:{error}")),
253 estimate_distill_prompt_tokens(log, &related_logs),
254 0,
255 )?;
256 continue;
257 }
258 chunks_by_log.insert(log_id.to_string(), chunks);
259 }
260 Err(error) => {
261 let note = format!("distill_failed:{error}");
262 failed_logs.insert(log_id.to_string());
263 distill_errors.push(format!("{log_id}: {error}"));
264 self.finish_distill_log(
265 log_id,
266 "failed",
267 Some(¬e),
268 estimate_distill_prompt_tokens(log, &related_logs),
269 0,
270 )?;
271 }
272 }
273 }
274
275 let mut count = 0;
276 let provenance = self.distiller.provenance();
277 for log in &logs {
278 let log_id = log.get("id").and_then(Value::as_str).unwrap_or("");
279 if failed_logs.contains(log_id) {
280 continue;
281 }
282 let context_key = log.get("context_key").and_then(Value::as_str);
283 let related_logs: Vec<Value> = logs
284 .iter()
285 .filter(|other| {
286 other.get("id").and_then(Value::as_str) == Some(log_id)
287 || (context_key.is_some()
288 && other.get("context_key").and_then(Value::as_str) == context_key)
289 })
290 .cloned()
291 .collect();
292 let prompt_tokens = estimate_distill_prompt_tokens(log, &related_logs);
293 let chunks = chunks_by_log.remove(log_id).unwrap_or_default();
294 let completion_tokens = chunks
295 .iter()
296 .map(estimate_distilled_chunk_tokens)
297 .sum::<i64>();
298 if chunks.is_empty() {
299 self.finish_distill_log(
300 log_id,
301 "discarded",
302 Some("insufficient_material"),
303 prompt_tokens,
304 completion_tokens,
305 )?;
306 continue;
307 }
308
309 struct PreparedChunk {
314 row: ChunkRow,
315 cvec_bytes: Vec<u8>,
316 tvec_bytes: Vec<u8>,
317 }
318 let mut prepared: Vec<PreparedChunk> = Vec::with_capacity(chunks.len());
319 let mut embedding_failures = 0_usize;
320 for dc in chunks {
321 let (content, action) = self.sanitize_content(&dc.content);
322 if action == SanitizeAction::Discard {
323 continue; }
325 let h = content_hash(&content);
326 if self.storage.is_hash_invalidated(&h)? {
327 continue; }
329 let redacted = action == SanitizeAction::Redact;
330 let conf = if redacted { 0.4 } else { 0.55 };
331 let now2 = utc_now_iso();
332 let chunk_id = gen_uuid();
333 let tokens = estimate_tokens(&content) as i64;
334 let skill_name = dc
337 .skill_name
338 .clone()
339 .or_else(|| dc.trigger_desc.clone())
340 .filter(|s| !s.trim().is_empty());
341 let row = ChunkRow {
342 id: chunk_id,
343 skill_name,
344 content: content.clone(),
345 trigger_desc: dc.trigger_desc.clone(),
346 anti_trigger_desc: dc.anti_trigger_desc,
347 content_hash: h,
348 token_count: Some(tokens),
349 origin: "distilled".to_string(),
350 distilled_from: Some(dc.source_log_id),
351 distill_provider: provenance.provider.clone(),
352 distill_model: provenance.model.clone(),
353 distill_prompt_version: provenance.prompt_version.clone(),
354 state: "pending".to_string(),
355 state_reason: Some("init:distilled".to_string()),
356 confidence: conf,
357 confidence_reason: Some("init:distilled".to_string()),
358 version: 1,
359 embed_version: 1,
360 created_at: now2.clone(),
361 updated_at: now2,
362 ..Default::default()
363 };
364 let cvec = match self.embedding.embed_content(&content) {
365 Ok(v) if v.len() == self.embedding.content_dim() => v,
366 _ => {
370 embedding_failures += 1;
371 continue;
372 }
373 };
374 let tvec = match self
375 .embedding
376 .embed_trigger(row.trigger_desc.as_deref().unwrap_or(&content))
377 {
378 Ok(v) if v.len() == self.embedding.trigger_dim() => v,
379 _ => {
380 embedding_failures += 1;
381 continue;
382 }
383 };
384 prepared.push(PreparedChunk {
385 row,
386 cvec_bytes: pack_embedding(&cvec),
387 tvec_bytes: pack_embedding(&tvec),
388 });
389 }
390
391 if prepared.is_empty() {
392 let note = if embedding_failures > 0 {
393 "embedding_failed"
394 } else {
395 "all_chunks_filtered"
396 };
397 self.finish_distill_log(
398 log_id,
399 if embedding_failures > 0 {
400 "failed"
401 } else {
402 "discarded"
403 },
404 Some(note),
405 prompt_tokens,
406 completion_tokens,
407 )?;
408 if embedding_failures > 0 {
409 failed_logs.insert(log_id.to_string());
410 }
411 continue;
412 }
413
414 let accounted_at = utc_now_iso();
416 self.storage.begin_immediate()?;
417 let write_result = (|| -> Result<()> {
418 for pc in &prepared {
419 self.storage.insert_chunk(&pc.row)?;
420 self.storage
421 .insert_vec_content(&pc.row.id, &pc.cvec_bytes)?;
422 self.storage
423 .insert_vec_trigger(&pc.row.id, &pc.tvec_bytes)?;
424 }
425 let note = (embedding_failures > 0)
426 .then(|| format!("partial_embedding_failures:{embedding_failures}"));
427 self.storage.finish_distill_log(
428 log_id,
429 "distilled",
430 note.as_deref(),
431 prompt_tokens,
432 completion_tokens,
433 &accounted_at,
434 )?;
435 self.storage.commit()
436 })();
437 if let Err(error) = write_result {
438 let _ = self.storage.rollback();
439 let note = format!("distill_write_failed:{error}");
440 self.finish_distill_log(
441 log_id,
442 "failed",
443 Some(¬e),
444 prompt_tokens,
445 completion_tokens,
446 )?;
447 failed_logs.insert(log_id.to_string());
448 continue;
449 }
450 count += 1;
451 }
452 if !distill_errors.is_empty() {
453 eprintln!(
457 "[innate] distillation partial failure ({} log(s)): {}",
458 distill_errors.len(),
459 distill_errors.join("; ")
460 );
461 }
462 Ok(DistillBatchReport {
463 distilled: count,
464 failed: failed_logs.len(),
465 })
466 }
467
468 fn finish_distill_log(
469 &self,
470 log_id: &str,
471 state: &str,
472 note: Option<&str>,
473 prompt_tokens: i64,
474 completion_tokens: i64,
475 ) -> Result<()> {
476 let accounted_at = utc_now_iso();
477 self.storage.begin_immediate()?;
478 let result = (|| -> Result<()> {
479 self.storage.finish_distill_log(
480 log_id,
481 state,
482 note,
483 prompt_tokens,
484 completion_tokens,
485 &accounted_at,
486 )?;
487 self.storage.commit()
488 })();
489 if result.is_err() {
490 let _ = self.storage.rollback();
491 }
492 result
493 }
494
495 pub(super) fn distill_token_period_start(&self, now: &str) -> Result<String> {
496 let window_hours = self
497 .storage
498 .get_meta("evolve.distill_token_window_hours")?
499 .and_then(|value| value.parse::<i64>().ok())
500 .unwrap_or(24)
501 .max(1);
502 Ok(hours_ago(now, window_hours))
503 }
504}