Skip to main content

innate_core/kb/
evolve.rs

1use super::*;
2
3impl KnowledgeBase {
4    pub fn evolve(&self, trigger: &str) -> Result<Value> {
5        if !matches!(trigger, "manual" | "scheduled" | "threshold") {
6            return Err(InnateError::InvalidState(format!(
7                "invalid evolve trigger: {trigger}"
8            )));
9        }
10        let evolve_started_at = utc_now_iso();
11        let retry_cutoff = minutes_ago(&evolve_started_at, 5);
12        let recovered_failed = self.storage.conn_execute_count(
13            "UPDATE episodic_log
14             SET distill_state='new', distill_note='retry_failed',
15                 distill_locked_at=NULL, distill_run_id=NULL
16             WHERE distill_state='failed'
17               AND distill_attempts < 3
18               AND COALESCE(distill_last_failed_at, distill_accounted_at, ts) < ?",
19            rusqlite::params![retry_cutoff],
20        )?;
21        if recovered_failed > 0 {
22            self.storage
23                .request_evolve(&gen_uuid(), "distill_retry", &evolve_started_at)?;
24        }
25        if trigger == "scheduled" {
26            let age_cutoff = hours_ago(&evolve_started_at, self.evolve_schedule_interval_hours);
27            let aged_new = count_query_params(
28                &self.storage,
29                "SELECT COUNT(*) FROM episodic_log
30                 WHERE distill_state='new' AND ts <= ?",
31                rusqlite::params![age_cutoff],
32            )?;
33            if aged_new > 0 {
34                self.storage
35                    .request_evolve(&gen_uuid(), "scheduled", &evolve_started_at)?;
36            }
37        }
38        let request = self.storage.claim_evolve_request_with_reason(
39            &evolve_started_at,
40            &minutes_ago(&evolve_started_at, self.screening_timeout_minutes),
41        )?;
42        let request_id = request.as_ref().map(|claim| claim.id.as_str());
43        let request_reason = request.as_ref().map(|claim| claim.reason.as_str());
44        // Issue 7: scheduled without pending request → still run curate (time-based maintenance).
45        if trigger == "scheduled" && request_id.is_none() {
46            let curator = Arc::clone(&self.curator);
47            let curate = curator.run(self, &CurateScope::default())?;
48            return Ok(json!({
49                "distilled": 0,
50                "curate": self.format_curate_report(&curate),
51                "skipped": "no_evolve_request"
52            }));
53        }
54
55        // Issue 8: threshold / token-limit gates should not suppress curate.
56        if trigger == "threshold" {
57            let rows = self.storage.query_chunks(
58                "SELECT COUNT(*) AS cnt FROM episodic_log WHERE distill_state='new'",
59            )?;
60            let cnt = rows
61                .first()
62                .and_then(|r| r.get("cnt"))
63                .and_then(Value::as_i64)
64                .unwrap_or(0);
65            if cnt < self.evolve_threshold {
66                let curator = Arc::clone(&self.curator);
67                let curate = curator.run(self, &CurateScope::default())?;
68                if let Some(id) = request_id {
69                    if matches!(request_reason, Some("governance" | "governance_ready")) {
70                        self.storage.finish_evolve_request(
71                            id,
72                            "completed",
73                            Some("curate_only"),
74                            &utc_now_iso(),
75                        )?;
76                    } else {
77                        self.storage.defer_evolve_request(
78                            id,
79                            "below_threshold",
80                            &hours_after(
81                                &evolve_started_at,
82                                self.evolve_schedule_interval_hours.max(1),
83                            ),
84                        )?;
85                    }
86                }
87                return Ok(json!({
88                    "distilled": 0,
89                    "curate": self.format_curate_report(&curate),
90                    "skipped": "below_threshold"
91                }));
92            }
93        }
94
95        if trigger != "manual" {
96            if let Some(limit) = self
97                .storage
98                .get_meta("max_distill_tokens_per_period")?
99                .and_then(|value| value.parse::<i64>().ok())
100                .filter(|value| *value > 0)
101            {
102                let period_start = self.distill_token_period_start(&evolve_started_at)?;
103                let rows = self.storage.query_chunks_params(
104                    "SELECT COALESCE(SUM(prompt_tokens + completion_tokens),0) AS used
105                     FROM distill_token_usage
106                     WHERE accounted_at >= ?",
107                    rusqlite::params![period_start],
108                )?;
109                let used_tokens = rows
110                    .first()
111                    .and_then(|row| row.get("used"))
112                    .and_then(Value::as_i64)
113                    .unwrap_or(0);
114                if used_tokens >= limit {
115                    let curator = Arc::clone(&self.curator);
116                    let curate = curator.run(self, &CurateScope::default())?;
117                    if let Some(id) = request_id {
118                        if matches!(request_reason, Some("governance" | "governance_ready")) {
119                            self.storage.finish_evolve_request(
120                                id,
121                                "completed",
122                                Some("curate_only"),
123                                &utc_now_iso(),
124                            )?;
125                        } else {
126                            self.storage.defer_evolve_request(
127                                id,
128                                "distill_token_limit",
129                                &hours_after(&evolve_started_at, 1),
130                            )?;
131                        }
132                    }
133                    return Ok(json!({
134                        "distilled": 0,
135                        "curate": self.format_curate_report(&curate),
136                        "skipped": "distill_token_limit",
137                        "distill_tokens_used": used_tokens,
138                        "distill_token_limit": limit,
139                        "period_start": period_start,
140                    }));
141                }
142            }
143        }
144
145        let result = (|| -> Result<Value> {
146            let distill = self.distill_batch()?;
147            let curator = Arc::clone(&self.curator);
148            let curate = curator.run(self, &CurateScope::default())?;
149            Ok(json!({
150                "distilled": distill.distilled,
151                "distill_failed": distill.failed,
152                "curate": self.format_curate_report(&curate),
153            }))
154        })();
155        if let Some(id) = request_id {
156            let (state, note) = match &result {
157                Ok(_) => ("completed", None),
158                Err(error) => ("failed", Some(error.to_string())),
159            };
160            self.storage
161                .finish_evolve_request(id, state, note.as_deref(), &utc_now_iso())?;
162        }
163        if result.is_ok() {
164            self.storage
165                .finish_covered_evolve_requests(&evolve_started_at, &utc_now_iso())?;
166        }
167
168        let failed_remaining = count_query(
169            &self.storage,
170            "SELECT COUNT(*) FROM episodic_log
171             WHERE distill_state='failed' AND distill_attempts < 3",
172        )?;
173        if failed_remaining > 0 {
174            self.storage.request_evolve_at(
175                &gen_uuid(),
176                "distill_retry",
177                &utc_now_iso(),
178                Some(&minutes_after(&utc_now_iso(), 5)),
179            )?;
180        }
181
182        // Issue 9: if more 'new' logs remain after this batch, self-queue so the next evolve drains them.
183        if result.is_ok() {
184            let remaining = count_query(
185                &self.storage,
186                "SELECT COUNT(*) FROM episodic_log WHERE distill_state='new'",
187            )?;
188            if remaining > 0 {
189                let _ = self
190                    .storage
191                    .request_evolve(&gen_uuid(), "batch_continue", &utc_now_iso());
192            }
193        }
194        result
195    }
196
197    fn format_curate_report(&self, curate: &CurateReport) -> Value {
198        json!({
199            "archived": curate.archived.len(),
200            "deduped": curate.deduped.len(),
201            "decayed": curate.decayed.len(),
202            "recovered": curate.recovered.len(),
203            "orphans": curate.orphans.len(),
204            "warnings": curate.warnings,
205        })
206    }
207
208    fn distill_batch(&self) -> Result<DistillBatchReport> {
209        let run_id = gen_uuid();
210        let now = utc_now_iso();
211
212        // Atomically claim a batch of 'new' logs → mark 'screening'.
213        self.storage.begin_immediate()?;
214        let logs = match self
215            .storage
216            .claim_distill_batch(&run_id, self.distill_batch_size, &now)
217        {
218            Ok(l) => {
219                self.storage.commit()?;
220                l
221            }
222            Err(e) => {
223                let _ = self.storage.rollback();
224                return Err(e);
225            }
226        };
227
228        let mut chunks_by_log: HashMap<String, Vec<DistilledChunk>> = HashMap::new();
229        let mut failed_logs = HashSet::new();
230        let mut distill_errors = Vec::new();
231        for log in &logs {
232            let log_id = log.get("id").and_then(Value::as_str).unwrap_or("");
233            let context_key = log.get("context_key").and_then(Value::as_str);
234            let related_logs: Vec<Value> = logs
235                .iter()
236                .filter(|other| {
237                    other.get("id").and_then(Value::as_str) == Some(log_id)
238                        || (context_key.is_some()
239                            && other.get("context_key").and_then(Value::as_str) == context_key)
240                })
241                .cloned()
242                .collect();
243            match self.distiller.distill_with_context(log, &related_logs) {
244                Ok(chunks) => {
245                    if chunks.iter().any(|chunk| chunk.source_log_id != log_id) {
246                        let error = "distiller returned a chunk for an unknown source log";
247                        failed_logs.insert(log_id.to_string());
248                        distill_errors.push(format!("{log_id}: {error}"));
249                        self.finish_distill_log(
250                            log_id,
251                            "failed",
252                            Some(&format!("distill_failed:{error}")),
253                            estimate_distill_prompt_tokens(log, &related_logs),
254                            0,
255                        )?;
256                        continue;
257                    }
258                    chunks_by_log.insert(log_id.to_string(), chunks);
259                }
260                Err(error) => {
261                    let note = format!("distill_failed:{error}");
262                    failed_logs.insert(log_id.to_string());
263                    distill_errors.push(format!("{log_id}: {error}"));
264                    self.finish_distill_log(
265                        log_id,
266                        "failed",
267                        Some(&note),
268                        estimate_distill_prompt_tokens(log, &related_logs),
269                        0,
270                    )?;
271                }
272            }
273        }
274
275        let mut count = 0;
276        let provenance = self.distiller.provenance();
277        for log in &logs {
278            let log_id = log.get("id").and_then(Value::as_str).unwrap_or("");
279            if failed_logs.contains(log_id) {
280                continue;
281            }
282            let context_key = log.get("context_key").and_then(Value::as_str);
283            let related_logs: Vec<Value> = logs
284                .iter()
285                .filter(|other| {
286                    other.get("id").and_then(Value::as_str) == Some(log_id)
287                        || (context_key.is_some()
288                            && other.get("context_key").and_then(Value::as_str) == context_key)
289                })
290                .cloned()
291                .collect();
292            let prompt_tokens = estimate_distill_prompt_tokens(log, &related_logs);
293            let chunks = chunks_by_log.remove(log_id).unwrap_or_default();
294            let completion_tokens = chunks
295                .iter()
296                .map(estimate_distilled_chunk_tokens)
297                .sum::<i64>();
298            if chunks.is_empty() {
299                self.finish_distill_log(
300                    log_id,
301                    "discarded",
302                    Some("insufficient_material"),
303                    prompt_tokens,
304                    completion_tokens,
305                )?;
306                continue;
307            }
308
309            // Prepare all chunks + embeddings outside the write transaction so
310            // slow embedding calls do not hold an exclusive SQLite lock.
311            // Supports N >= 1 chunks per log (e.g. multi-concept LLM distillation).
312            // Bad individual chunks are skipped; valid siblings still survive.
313            struct PreparedChunk {
314                row: ChunkRow,
315                cvec_bytes: Vec<u8>,
316                tvec_bytes: Vec<u8>,
317            }
318            let mut prepared: Vec<PreparedChunk> = Vec::with_capacity(chunks.len());
319            let mut embedding_failures = 0_usize;
320            for dc in chunks {
321                let (content, action) = self.sanitize_content(&dc.content);
322                if action == SanitizeAction::Discard {
323                    continue; // skip this chunk, try others
324                }
325                let h = content_hash(&content);
326                if self.storage.is_hash_invalidated(&h)? {
327                    continue; // skip invalidated content, try others
328                }
329                let redacted = action == SanitizeAction::Redact;
330                let conf = if redacted { 0.4 } else { 0.55 };
331                let now2 = utc_now_iso();
332                let chunk_id = gen_uuid();
333                let tokens = estimate_tokens(&content) as i64;
334                // Short label for the web UI's row-skill slot; fall back to the
335                // canonical trigger phrase when the distiller produced none.
336                let skill_name = dc
337                    .skill_name
338                    .clone()
339                    .or_else(|| dc.trigger_desc.clone())
340                    .filter(|s| !s.trim().is_empty());
341                // Per-chunk fallback provenance wins over the batch-level provider,
342                // so heuristic-fallback chunks are tagged accurately.
343                let distill_provider = dc
344                    .provider_override
345                    .clone()
346                    .or_else(|| provenance.provider.clone());
347                let row = ChunkRow {
348                    id: chunk_id,
349                    skill_name,
350                    content: content.clone(),
351                    trigger_desc: dc.trigger_desc.clone(),
352                    anti_trigger_desc: dc.anti_trigger_desc,
353                    content_hash: h,
354                    token_count: Some(tokens),
355                    origin: "distilled".to_string(),
356                    distilled_from: Some(dc.source_log_id),
357                    distill_provider,
358                    distill_model: provenance.model.clone(),
359                    distill_prompt_version: provenance.prompt_version.clone(),
360                    state: "pending".to_string(),
361                    state_reason: Some("init:distilled".to_string()),
362                    confidence: conf,
363                    confidence_reason: Some("init:distilled".to_string()),
364                    version: 1,
365                    embed_version: 1,
366                    created_at: now2.clone(),
367                    updated_at: now2,
368                    ..Default::default()
369                };
370                let cvec = match self.embedding.embed_content(&content) {
371                    Ok(v) if v.len() == self.embedding.content_dim() => v,
372                    // A failed or wrong-dimension embedding is deferred (not
373                    // written): a dim mismatch would be silently dropped at
374                    // search time, so it must never reach storage.
375                    _ => {
376                        embedding_failures += 1;
377                        continue;
378                    }
379                };
380                let tvec = match self
381                    .embedding
382                    .embed_trigger(row.trigger_desc.as_deref().unwrap_or(&content))
383                {
384                    Ok(v) if v.len() == self.embedding.trigger_dim() => v,
385                    _ => {
386                        embedding_failures += 1;
387                        continue;
388                    }
389                };
390                prepared.push(PreparedChunk {
391                    row,
392                    cvec_bytes: pack_embedding(&cvec),
393                    tvec_bytes: pack_embedding(&tvec),
394                });
395            }
396
397            if prepared.is_empty() {
398                let note = if embedding_failures > 0 {
399                    "embedding_failed"
400                } else {
401                    "all_chunks_filtered"
402                };
403                self.finish_distill_log(
404                    log_id,
405                    if embedding_failures > 0 {
406                        "failed"
407                    } else {
408                        "discarded"
409                    },
410                    Some(note),
411                    prompt_tokens,
412                    completion_tokens,
413                )?;
414                if embedding_failures > 0 {
415                    failed_logs.insert(log_id.to_string());
416                }
417                continue;
418            }
419
420            // Write all chunks, vectors, token accounting, and terminal log state atomically.
421            let accounted_at = utc_now_iso();
422            self.storage.begin_immediate()?;
423            let write_result = (|| -> Result<()> {
424                for pc in &prepared {
425                    self.storage.insert_chunk(&pc.row)?;
426                    self.storage
427                        .insert_vec_content(&pc.row.id, &pc.cvec_bytes)?;
428                    self.storage
429                        .insert_vec_trigger(&pc.row.id, &pc.tvec_bytes)?;
430                }
431                let note = (embedding_failures > 0)
432                    .then(|| format!("partial_embedding_failures:{embedding_failures}"));
433                self.storage.finish_distill_log(
434                    log_id,
435                    "distilled",
436                    note.as_deref(),
437                    prompt_tokens,
438                    completion_tokens,
439                    &accounted_at,
440                )?;
441                self.storage.commit()
442            })();
443            if let Err(error) = write_result {
444                let _ = self.storage.rollback();
445                let note = format!("distill_write_failed:{error}");
446                self.finish_distill_log(
447                    log_id,
448                    "failed",
449                    Some(&note),
450                    prompt_tokens,
451                    completion_tokens,
452                )?;
453                failed_logs.insert(log_id.to_string());
454                continue;
455            }
456            count += 1;
457        }
458        if !distill_errors.is_empty() {
459            // Log failures but do not abort: successful chunks are already committed and
460            // failed logs are marked 'failed' for bounded retry. Returning Ok preserves
461            // evolve request state and allows finish_covered_evolve_requests to run.
462            eprintln!(
463                "[innate] distillation partial failure ({} log(s)): {}",
464                distill_errors.len(),
465                distill_errors.join("; ")
466            );
467        }
468        Ok(DistillBatchReport {
469            distilled: count,
470            failed: failed_logs.len(),
471        })
472    }
473
474    fn finish_distill_log(
475        &self,
476        log_id: &str,
477        state: &str,
478        note: Option<&str>,
479        prompt_tokens: i64,
480        completion_tokens: i64,
481    ) -> Result<()> {
482        let accounted_at = utc_now_iso();
483        self.storage.begin_immediate()?;
484        let result = (|| -> Result<()> {
485            self.storage.finish_distill_log(
486                log_id,
487                state,
488                note,
489                prompt_tokens,
490                completion_tokens,
491                &accounted_at,
492            )?;
493            self.storage.commit()
494        })();
495        if result.is_err() {
496            let _ = self.storage.rollback();
497        }
498        result
499    }
500
501    pub(super) fn distill_token_period_start(&self, now: &str) -> Result<String> {
502        let window_hours = self
503            .storage
504            .get_meta("evolve.distill_token_window_hours")?
505            .and_then(|value| value.parse::<i64>().ok())
506            .unwrap_or(24)
507            .max(1);
508        Ok(hours_ago(now, window_hours))
509    }
510}