1use super::*;
2
3impl KnowledgeBase {
4 pub fn evolve(&self, trigger: &str) -> Result<Value> {
5 if !matches!(trigger, "manual" | "scheduled" | "threshold") {
6 return Err(InnateError::InvalidState(format!(
7 "invalid evolve trigger: {trigger}"
8 )));
9 }
10 let evolve_started_at = utc_now_iso();
11 let retry_cutoff = minutes_ago(&evolve_started_at, 5);
12 let recovered_failed = self.storage.conn_execute_count(
13 "UPDATE episodic_log
14 SET distill_state='new', distill_note='retry_failed',
15 distill_locked_at=NULL, distill_run_id=NULL
16 WHERE distill_state='failed'
17 AND distill_attempts < 3
18 AND COALESCE(distill_last_failed_at, distill_accounted_at, ts) < ?",
19 rusqlite::params![retry_cutoff],
20 )?;
21 if recovered_failed > 0 {
22 self.storage
23 .request_evolve(&gen_uuid(), "distill_retry", &evolve_started_at)?;
24 }
25 if trigger == "scheduled" {
26 let age_cutoff = hours_ago(&evolve_started_at, self.evolve_schedule_interval_hours);
27 let aged_new = count_query_params(
28 &self.storage,
29 "SELECT COUNT(*) FROM episodic_log
30 WHERE distill_state='new' AND ts <= ?",
31 rusqlite::params![age_cutoff],
32 )?;
33 if aged_new > 0 {
34 self.storage
35 .request_evolve(&gen_uuid(), "scheduled", &evolve_started_at)?;
36 }
37 }
38 let request = self.storage.claim_evolve_request_with_reason(
39 &evolve_started_at,
40 &minutes_ago(&evolve_started_at, self.screening_timeout_minutes),
41 )?;
42 let request_id = request.as_ref().map(|claim| claim.id.as_str());
43 let request_reason = request.as_ref().map(|claim| claim.reason.as_str());
44 if trigger == "scheduled" && request_id.is_none() {
46 let curator = Arc::clone(&self.curator);
47 let curate = curator.run(self, &CurateScope::default())?;
48 return Ok(json!({
49 "distilled": 0,
50 "curate": self.format_curate_report(&curate),
51 "skipped": "no_evolve_request"
52 }));
53 }
54
55 if trigger == "threshold" {
57 let rows = self.storage.query_chunks(
58 "SELECT COUNT(*) AS cnt FROM episodic_log WHERE distill_state='new'",
59 )?;
60 let cnt = rows
61 .first()
62 .and_then(|r| r.get("cnt"))
63 .and_then(Value::as_i64)
64 .unwrap_or(0);
65 if cnt < self.evolve_threshold {
66 let curator = Arc::clone(&self.curator);
67 let curate = curator.run(self, &CurateScope::default())?;
68 if let Some(id) = request_id {
69 if matches!(request_reason, Some("governance" | "governance_ready")) {
70 self.storage.finish_evolve_request(
71 id,
72 "completed",
73 Some("curate_only"),
74 &utc_now_iso(),
75 )?;
76 } else {
77 self.storage.defer_evolve_request(
78 id,
79 "below_threshold",
80 &hours_after(
81 &evolve_started_at,
82 self.evolve_schedule_interval_hours.max(1),
83 ),
84 )?;
85 }
86 }
87 return Ok(json!({
88 "distilled": 0,
89 "curate": self.format_curate_report(&curate),
90 "skipped": "below_threshold"
91 }));
92 }
93 }
94
95 if trigger != "manual" {
96 if let Some(limit) = self
97 .storage
98 .get_meta("max_distill_tokens_per_period")?
99 .and_then(|value| value.parse::<i64>().ok())
100 .filter(|value| *value > 0)
101 {
102 let period_start = self.distill_token_period_start(&evolve_started_at)?;
103 let rows = self.storage.query_chunks_params(
104 "SELECT COALESCE(SUM(prompt_tokens + completion_tokens),0) AS used
105 FROM distill_token_usage
106 WHERE accounted_at >= ?",
107 rusqlite::params![period_start],
108 )?;
109 let used_tokens = rows
110 .first()
111 .and_then(|row| row.get("used"))
112 .and_then(Value::as_i64)
113 .unwrap_or(0);
114 if used_tokens >= limit {
115 let curator = Arc::clone(&self.curator);
116 let curate = curator.run(self, &CurateScope::default())?;
117 if let Some(id) = request_id {
118 if matches!(request_reason, Some("governance" | "governance_ready")) {
119 self.storage.finish_evolve_request(
120 id,
121 "completed",
122 Some("curate_only"),
123 &utc_now_iso(),
124 )?;
125 } else {
126 self.storage.defer_evolve_request(
127 id,
128 "distill_token_limit",
129 &hours_after(&evolve_started_at, 1),
130 )?;
131 }
132 }
133 return Ok(json!({
134 "distilled": 0,
135 "curate": self.format_curate_report(&curate),
136 "skipped": "distill_token_limit",
137 "distill_tokens_used": used_tokens,
138 "distill_token_limit": limit,
139 "period_start": period_start,
140 }));
141 }
142 }
143 }
144
145 let result = (|| -> Result<Value> {
146 let distill = self.distill_batch()?;
147 let curator = Arc::clone(&self.curator);
148 let curate = curator.run(self, &CurateScope::default())?;
149 Ok(json!({
150 "distilled": distill.distilled,
151 "distill_failed": distill.failed,
152 "curate": self.format_curate_report(&curate),
153 }))
154 })();
155 if let Some(id) = request_id {
156 let (state, note) = match &result {
157 Ok(_) => ("completed", None),
158 Err(error) => ("failed", Some(error.to_string())),
159 };
160 self.storage
161 .finish_evolve_request(id, state, note.as_deref(), &utc_now_iso())?;
162 }
163 if result.is_ok() {
164 self.storage
165 .finish_covered_evolve_requests(&evolve_started_at, &utc_now_iso())?;
166 }
167
168 let failed_remaining = count_query(
169 &self.storage,
170 "SELECT COUNT(*) FROM episodic_log
171 WHERE distill_state='failed' AND distill_attempts < 3",
172 )?;
173 if failed_remaining > 0 {
174 self.storage.request_evolve_at(
175 &gen_uuid(),
176 "distill_retry",
177 &utc_now_iso(),
178 Some(&minutes_after(&utc_now_iso(), 5)),
179 )?;
180 }
181
182 if result.is_ok() {
184 let remaining = count_query(
185 &self.storage,
186 "SELECT COUNT(*) FROM episodic_log WHERE distill_state='new'",
187 )?;
188 if remaining > 0 {
189 let _ = self
190 .storage
191 .request_evolve(&gen_uuid(), "batch_continue", &utc_now_iso());
192 }
193 }
194 result
195 }
196
197 fn format_curate_report(&self, curate: &CurateReport) -> Value {
198 json!({
199 "archived": curate.archived.len(),
200 "deduped": curate.deduped.len(),
201 "decayed": curate.decayed.len(),
202 "recovered": curate.recovered.len(),
203 "orphans": curate.orphans.len(),
204 "warnings": curate.warnings,
205 })
206 }
207
208 fn distill_batch(&self) -> Result<DistillBatchReport> {
209 let run_id = gen_uuid();
210 let now = utc_now_iso();
211
212 self.storage.begin_immediate()?;
214 let logs = match self
215 .storage
216 .claim_distill_batch(&run_id, self.distill_batch_size, &now)
217 {
218 Ok(l) => {
219 self.storage.commit()?;
220 l
221 }
222 Err(e) => {
223 let _ = self.storage.rollback();
224 return Err(e);
225 }
226 };
227
228 let mut chunks_by_log: HashMap<String, Vec<DistilledChunk>> = HashMap::new();
229 let mut failed_logs = HashSet::new();
230 let mut distill_errors = Vec::new();
231 for log in &logs {
232 let log_id = log.get("id").and_then(Value::as_str).unwrap_or("");
233 let context_key = log.get("context_key").and_then(Value::as_str);
234 let related_logs: Vec<Value> = logs
235 .iter()
236 .filter(|other| {
237 other.get("id").and_then(Value::as_str) == Some(log_id)
238 || (context_key.is_some()
239 && other.get("context_key").and_then(Value::as_str) == context_key)
240 })
241 .cloned()
242 .collect();
243 match self.distiller.distill_with_context(log, &related_logs) {
244 Ok(chunks) => {
245 if chunks.iter().any(|chunk| chunk.source_log_id != log_id) {
246 let error = "distiller returned a chunk for an unknown source log";
247 failed_logs.insert(log_id.to_string());
248 distill_errors.push(format!("{log_id}: {error}"));
249 self.finish_distill_log(
250 log_id,
251 "failed",
252 Some(&format!("distill_failed:{error}")),
253 estimate_distill_prompt_tokens(log, &related_logs),
254 0,
255 )?;
256 continue;
257 }
258 chunks_by_log.insert(log_id.to_string(), chunks);
259 }
260 Err(error) => {
261 let note = format!("distill_failed:{error}");
262 failed_logs.insert(log_id.to_string());
263 distill_errors.push(format!("{log_id}: {error}"));
264 self.finish_distill_log(
265 log_id,
266 "failed",
267 Some(¬e),
268 estimate_distill_prompt_tokens(log, &related_logs),
269 0,
270 )?;
271 }
272 }
273 }
274
275 let mut count = 0;
276 let provenance = self.distiller.provenance();
277 for log in &logs {
278 let log_id = log.get("id").and_then(Value::as_str).unwrap_or("");
279 if failed_logs.contains(log_id) {
280 continue;
281 }
282 let context_key = log.get("context_key").and_then(Value::as_str);
283 let related_logs: Vec<Value> = logs
284 .iter()
285 .filter(|other| {
286 other.get("id").and_then(Value::as_str) == Some(log_id)
287 || (context_key.is_some()
288 && other.get("context_key").and_then(Value::as_str) == context_key)
289 })
290 .cloned()
291 .collect();
292 let prompt_tokens = estimate_distill_prompt_tokens(log, &related_logs);
293 let chunks = chunks_by_log.remove(log_id).unwrap_or_default();
294 let completion_tokens = chunks
295 .iter()
296 .map(estimate_distilled_chunk_tokens)
297 .sum::<i64>();
298 if chunks.is_empty() {
299 self.finish_distill_log(
300 log_id,
301 "discarded",
302 Some("insufficient_material"),
303 prompt_tokens,
304 completion_tokens,
305 )?;
306 continue;
307 }
308
309 struct PreparedChunk {
314 row: ChunkRow,
315 cvec_bytes: Vec<u8>,
316 tvec_bytes: Vec<u8>,
317 }
318 let mut prepared: Vec<PreparedChunk> = Vec::with_capacity(chunks.len());
319 let mut embedding_failures = 0_usize;
320 for dc in chunks {
321 let (content, action) = self.sanitize_content(&dc.content);
322 if action == SanitizeAction::Discard {
323 continue; }
325 let h = content_hash(&content);
326 if self.storage.is_hash_invalidated(&h)? {
327 continue; }
329 let redacted = action == SanitizeAction::Redact;
330 let conf = if redacted { 0.4 } else { 0.55 };
331 let now2 = utc_now_iso();
332 let chunk_id = gen_uuid();
333 let tokens = estimate_tokens(&content) as i64;
334 let skill_name = dc
337 .skill_name
338 .clone()
339 .or_else(|| dc.trigger_desc.clone())
340 .filter(|s| !s.trim().is_empty());
341 let distill_provider = dc
344 .provider_override
345 .clone()
346 .or_else(|| provenance.provider.clone());
347 let row = ChunkRow {
348 id: chunk_id,
349 skill_name,
350 content: content.clone(),
351 trigger_desc: dc.trigger_desc.clone(),
352 anti_trigger_desc: dc.anti_trigger_desc,
353 content_hash: h,
354 token_count: Some(tokens),
355 origin: "distilled".to_string(),
356 distilled_from: Some(dc.source_log_id),
357 distill_provider,
358 distill_model: provenance.model.clone(),
359 distill_prompt_version: provenance.prompt_version.clone(),
360 state: "pending".to_string(),
361 state_reason: Some("init:distilled".to_string()),
362 confidence: conf,
363 confidence_reason: Some("init:distilled".to_string()),
364 version: 1,
365 embed_version: 1,
366 created_at: now2.clone(),
367 updated_at: now2,
368 ..Default::default()
369 };
370 let cvec = match self.embedding.embed_content(&content) {
371 Ok(v) if v.len() == self.embedding.content_dim() => v,
372 _ => {
376 embedding_failures += 1;
377 continue;
378 }
379 };
380 let tvec = match self
381 .embedding
382 .embed_trigger(row.trigger_desc.as_deref().unwrap_or(&content))
383 {
384 Ok(v) if v.len() == self.embedding.trigger_dim() => v,
385 _ => {
386 embedding_failures += 1;
387 continue;
388 }
389 };
390 prepared.push(PreparedChunk {
391 row,
392 cvec_bytes: pack_embedding(&cvec),
393 tvec_bytes: pack_embedding(&tvec),
394 });
395 }
396
397 if prepared.is_empty() {
398 let note = if embedding_failures > 0 {
399 "embedding_failed"
400 } else {
401 "all_chunks_filtered"
402 };
403 self.finish_distill_log(
404 log_id,
405 if embedding_failures > 0 {
406 "failed"
407 } else {
408 "discarded"
409 },
410 Some(note),
411 prompt_tokens,
412 completion_tokens,
413 )?;
414 if embedding_failures > 0 {
415 failed_logs.insert(log_id.to_string());
416 }
417 continue;
418 }
419
420 let accounted_at = utc_now_iso();
422 self.storage.begin_immediate()?;
423 let write_result = (|| -> Result<()> {
424 for pc in &prepared {
425 self.storage.insert_chunk(&pc.row)?;
426 self.storage
427 .insert_vec_content(&pc.row.id, &pc.cvec_bytes)?;
428 self.storage
429 .insert_vec_trigger(&pc.row.id, &pc.tvec_bytes)?;
430 }
431 let note = (embedding_failures > 0)
432 .then(|| format!("partial_embedding_failures:{embedding_failures}"));
433 self.storage.finish_distill_log(
434 log_id,
435 "distilled",
436 note.as_deref(),
437 prompt_tokens,
438 completion_tokens,
439 &accounted_at,
440 )?;
441 self.storage.commit()
442 })();
443 if let Err(error) = write_result {
444 let _ = self.storage.rollback();
445 let note = format!("distill_write_failed:{error}");
446 self.finish_distill_log(
447 log_id,
448 "failed",
449 Some(¬e),
450 prompt_tokens,
451 completion_tokens,
452 )?;
453 failed_logs.insert(log_id.to_string());
454 continue;
455 }
456 count += 1;
457 }
458 if !distill_errors.is_empty() {
459 eprintln!(
463 "[innate] distillation partial failure ({} log(s)): {}",
464 distill_errors.len(),
465 distill_errors.join("; ")
466 );
467 }
468 Ok(DistillBatchReport {
469 distilled: count,
470 failed: failed_logs.len(),
471 })
472 }
473
474 fn finish_distill_log(
475 &self,
476 log_id: &str,
477 state: &str,
478 note: Option<&str>,
479 prompt_tokens: i64,
480 completion_tokens: i64,
481 ) -> Result<()> {
482 let accounted_at = utc_now_iso();
483 self.storage.begin_immediate()?;
484 let result = (|| -> Result<()> {
485 self.storage.finish_distill_log(
486 log_id,
487 state,
488 note,
489 prompt_tokens,
490 completion_tokens,
491 &accounted_at,
492 )?;
493 self.storage.commit()
494 })();
495 if result.is_err() {
496 let _ = self.storage.rollback();
497 }
498 result
499 }
500
501 pub(super) fn distill_token_period_start(&self, now: &str) -> Result<String> {
502 let window_hours = self
503 .storage
504 .get_meta("evolve.distill_token_window_hours")?
505 .and_then(|value| value.parse::<i64>().ok())
506 .unwrap_or(24)
507 .max(1);
508 Ok(hours_ago(now, window_hours))
509 }
510}