// oris_evokernel/core.rs
1//! EvoKernel orchestration: mutation capture, validation, capsule construction, and replay-first reuse.
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::fs;
5use std::path::Path;
6use std::process::Command;
7use std::sync::{Arc, Mutex};
8
9use async_trait::async_trait;
10use chrono::{DateTime, Duration, Utc};
11use oris_agent_contract::{
12    AgentRole, CoordinationMessage, CoordinationPlan, CoordinationPrimitive, CoordinationResult,
13    CoordinationTask, ExecutionFeedback, MutationProposal as AgentMutationProposal,
14};
15use oris_economics::{EconomicsSignal, EvuLedger, StakePolicy};
16use oris_evolution::{
17    compute_artifact_hash, next_id, stable_hash_json, AssetState, BlastRadius, CandidateSource,
18    Capsule, CapsuleId, EnvFingerprint, EvolutionError, EvolutionEvent, EvolutionProjection,
19    EvolutionStore, Gene, GeneCandidate, MutationId, PreparedMutation, Selector, SelectorInput,
20    StoreBackedSelector, StoredEvolutionEvent, ValidationSnapshot,
21};
22use oris_evolution_network::{EvolutionEnvelope, NetworkAsset};
23use oris_governor::{DefaultGovernor, Governor, GovernorDecision, GovernorInput};
24use oris_kernel::{Kernel, KernelState, RunId};
25use oris_sandbox::{
26    compute_blast_radius, execute_allowed_command, Sandbox, SandboxPolicy, SandboxReceipt,
27};
28use oris_spec::CompiledMutationPlan;
29use serde::{Deserialize, Serialize};
30use thiserror::Error;
31
32pub use oris_evolution::{
33    default_store_root, ArtifactEncoding, AssetState as EvoAssetState,
34    BlastRadius as EvoBlastRadius, CandidateSource as EvoCandidateSource,
35    EnvFingerprint as EvoEnvFingerprint, EvolutionStore as EvoEvolutionStore, JsonlEvolutionStore,
36    MutationArtifact, MutationIntent, MutationTarget, Outcome, RiskLevel,
37    SelectorInput as EvoSelectorInput,
38};
39pub use oris_evolution_network::{
40    FetchQuery, FetchResponse, MessageType, PublishRequest, RevokeNotice,
41};
42pub use oris_governor::{CoolingWindow, GovernorConfig, RevocationReason};
43pub use oris_sandbox::{LocalProcessSandbox, SandboxPolicy as EvoSandboxPolicy};
44pub use oris_spec::{SpecCompileError, SpecCompiler, SpecDocument};
45
/// An ordered list of validation stages to run against a sandboxed mutation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ValidationPlan {
    /// Profile name recorded in `ValidationSnapshot`s produced from this plan.
    pub profile: String,
    /// Stages executed in order; `CommandValidator` stops at the first failure.
    pub stages: Vec<ValidationStage>,
}
51
52impl ValidationPlan {
53    pub fn oris_default() -> Self {
54        Self {
55            profile: "oris-default".into(),
56            stages: vec![
57                ValidationStage::Command {
58                    program: "cargo".into(),
59                    args: vec!["fmt".into(), "--all".into(), "--check".into()],
60                    timeout_ms: 60_000,
61                },
62                ValidationStage::Command {
63                    program: "cargo".into(),
64                    args: vec!["check".into(), "--workspace".into()],
65                    timeout_ms: 180_000,
66                },
67                ValidationStage::Command {
68                    program: "cargo".into(),
69                    args: vec![
70                        "test".into(),
71                        "-p".into(),
72                        "oris-kernel".into(),
73                        "-p".into(),
74                        "oris-evolution".into(),
75                        "-p".into(),
76                        "oris-sandbox".into(),
77                        "-p".into(),
78                        "oris-evokernel".into(),
79                        "--lib".into(),
80                    ],
81                    timeout_ms: 300_000,
82                },
83                ValidationStage::Command {
84                    program: "cargo".into(),
85                    args: vec![
86                        "test".into(),
87                        "-p".into(),
88                        "oris-runtime".into(),
89                        "--lib".into(),
90                    ],
91                    timeout_ms: 300_000,
92                },
93            ],
94        }
95    }
96}
97
/// A single step of a validation plan.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ValidationStage {
    /// Run `program` with `args`, bounded by `timeout_ms` milliseconds.
    Command {
        program: String,
        args: Vec<String>,
        timeout_ms: u64,
    },
}
106
/// Outcome of one validation stage, including captured process output.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ValidationStageReport {
    /// Human-readable stage label (program plus joined arguments).
    pub stage: String,
    /// Whether the stage succeeded.
    pub success: bool,
    /// Process exit code; `None` when the command could not be executed.
    pub exit_code: Option<i32>,
    /// Wall-clock duration of the stage (0 when execution itself failed).
    pub duration_ms: u64,
    /// Captured standard output.
    pub stdout: String,
    /// Captured standard error (or the execution error message).
    pub stderr: String,
}
116
/// Aggregate result of running a full `ValidationPlan`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ValidationReport {
    /// True only when every executed stage succeeded.
    pub success: bool,
    /// Total wall-clock duration across all executed stages.
    pub duration_ms: u64,
    /// Per-stage reports, in execution order.
    pub stages: Vec<ValidationStageReport>,
    /// Concatenated stdout/stderr from all stages, newline-separated.
    pub logs: String,
}
124
/// Inputs from which deterministic mutation signals are extracted.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct SignalExtractionInput {
    /// Unified diff of the mutation.
    pub patch_diff: String,
    /// Stated intent of the mutation.
    pub intent: String,
    /// Expected effect described by the proposer.
    pub expected_effect: String,
    /// Signals explicitly declared by the proposer.
    pub declared_signals: Vec<String>,
    /// Paths of files touched by the mutation.
    pub changed_files: Vec<String>,
    /// Whether validation passed for this mutation.
    pub validation_success: bool,
    /// Raw validation log text.
    pub validation_logs: String,
    /// Per-stage output blobs from validation.
    pub stage_outputs: Vec<String>,
}
136
/// Deterministic signal set plus a stable hash over it.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct SignalExtractionOutput {
    /// At most 32 deduplicated signals, in lexicographic order.
    pub values: Vec<String>,
    /// Stable hash of `values` (JSON hash, falling back to an artifact hash).
    pub hash: String,
}
142
143impl ValidationReport {
144    pub fn to_snapshot(&self, profile: &str) -> ValidationSnapshot {
145        ValidationSnapshot {
146            success: self.success,
147            profile: profile.to_string(),
148            duration_ms: self.duration_ms,
149            summary: if self.success {
150                "validation passed".into()
151            } else {
152                "validation failed".into()
153            },
154        }
155    }
156}
157
158pub fn extract_deterministic_signals(input: &SignalExtractionInput) -> SignalExtractionOutput {
159    let mut signals = BTreeSet::new();
160
161    for declared in &input.declared_signals {
162        if let Some(phrase) = normalize_signal_phrase(declared) {
163            signals.insert(phrase);
164        }
165        extend_signal_tokens(&mut signals, declared);
166    }
167
168    for text in [
169        input.patch_diff.as_str(),
170        input.intent.as_str(),
171        input.expected_effect.as_str(),
172        input.validation_logs.as_str(),
173    ] {
174        extend_signal_tokens(&mut signals, text);
175    }
176
177    for changed_file in &input.changed_files {
178        extend_signal_tokens(&mut signals, changed_file);
179    }
180
181    for stage_output in &input.stage_outputs {
182        extend_signal_tokens(&mut signals, stage_output);
183    }
184
185    signals.insert(if input.validation_success {
186        "validation passed".into()
187    } else {
188        "validation failed".into()
189    });
190
191    let values = signals.into_iter().take(32).collect::<Vec<_>>();
192    let hash =
193        stable_hash_json(&values).unwrap_or_else(|_| compute_artifact_hash(&values.join("\n")));
194    SignalExtractionOutput { values, hash }
195}
196
/// Infrastructure-level validation failure (distinct from a stage that ran
/// and reported a non-zero exit).
#[derive(Debug, Error)]
pub enum ValidationError {
    /// The validation machinery itself could not execute.
    #[error("validation execution failed: {0}")]
    Execution(String),
}
202
/// Runs a validation plan against a sandboxed mutation.
#[async_trait]
pub trait Validator: Send + Sync {
    /// Executes `plan` inside the sandbox workdir identified by `receipt`.
    async fn run(
        &self,
        receipt: &SandboxReceipt,
        plan: &ValidationPlan,
    ) -> Result<ValidationReport, ValidationError>;
}
211
/// `Validator` implementation that runs policy-allowed shell commands.
pub struct CommandValidator {
    // Sandbox policy constraining which commands may be executed.
    policy: SandboxPolicy,
}
215
impl CommandValidator {
    /// Creates a validator bound to the given sandbox `policy`.
    pub fn new(policy: SandboxPolicy) -> Self {
        Self { policy }
    }
}
221
#[async_trait]
impl Validator for CommandValidator {
    /// Runs every stage of `plan` in the sandbox workdir, fail-fast: the loop
    /// stops after the first failing stage. Always returns `Ok` — stage
    /// failures are reported in the `ValidationReport`, not as errors.
    async fn run(
        &self,
        receipt: &SandboxReceipt,
        plan: &ValidationPlan,
    ) -> Result<ValidationReport, ValidationError> {
        let started = std::time::Instant::now();
        let mut stages = Vec::new();
        let mut success = true;
        let mut logs = String::new();

        for stage in &plan.stages {
            match stage {
                ValidationStage::Command {
                    program,
                    args,
                    timeout_ms,
                } => {
                    let result = execute_allowed_command(
                        &self.policy,
                        &receipt.workdir,
                        program,
                        args,
                        *timeout_ms,
                    )
                    .await;
                    // Execution errors (command rejected/failed to spawn) are
                    // folded into a failed stage report rather than aborting
                    // the whole run.
                    let report = match result {
                        Ok(output) => ValidationStageReport {
                            stage: format!("{program} {}", args.join(" ")),
                            success: output.success,
                            exit_code: output.exit_code,
                            duration_ms: output.duration_ms,
                            stdout: output.stdout,
                            stderr: output.stderr,
                        },
                        Err(err) => ValidationStageReport {
                            stage: format!("{program} {}", args.join(" ")),
                            success: false,
                            exit_code: None,
                            duration_ms: 0,
                            stdout: String::new(),
                            stderr: err.to_string(),
                        },
                    };
                    if !report.success {
                        success = false;
                    }
                    // Accumulate both streams into a single log transcript.
                    if !report.stdout.is_empty() {
                        logs.push_str(&report.stdout);
                        logs.push('\n');
                    }
                    if !report.stderr.is_empty() {
                        logs.push_str(&report.stderr);
                        logs.push('\n');
                    }
                    stages.push(report);
                    // Fail-fast: remaining stages are skipped once one fails.
                    if !success {
                        break;
                    }
                }
            }
        }

        Ok(ValidationReport {
            success,
            duration_ms: started.elapsed().as_millis() as u64,
            stages,
            logs,
        })
    }
}
294
/// Outcome of a replay attempt: whether a stored capsule was reused and, if
/// not, why the caller should fall back to the planner.
#[derive(Clone, Debug)]
pub struct ReplayDecision {
    /// True when a capsule was applied and re-validated successfully.
    pub used_capsule: bool,
    /// The capsule considered, when one was selected (set on some failures
    /// too, for traceability).
    pub capsule_id: Option<CapsuleId>,
    /// True when the caller should produce a fresh mutation via the planner.
    pub fallback_to_planner: bool,
    /// Human-readable explanation of the decision.
    pub reason: String,
}
302
/// Scheduling state of a coordination task, derived from the outcomes of its
/// dependencies.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum CoordinationTaskState {
    /// All prerequisites satisfied; the task may run now.
    Ready,
    /// Has incomplete (but not failed or skipped) dependencies.
    Waiting,
    /// A dependency failed or was skipped, blocking this task.
    BlockedByFailure,
    /// Prerequisites can never be satisfied (e.g. unknown dependency id or a
    /// role whose required dependency shape is absent).
    PermanentlyBlocked,
}
310
/// Stateless scheduler that drives a `CoordinationPlan` across agent roles.
#[derive(Clone, Debug, Default)]
pub struct MultiAgentCoordinator;
313
impl MultiAgentCoordinator {
    /// Creates a coordinator; the type is stateless, so this is free.
    pub fn new() -> Self {
        Self
    }

    /// Drives `plan` to quiescence: repeatedly finds ready tasks, simulates
    /// execution (with retries up to `max_retries`), applies conditional
    /// skips, and finally classifies anything still pending as failed.
    /// Returns completion/failure order plus all coordination messages.
    pub fn coordinate(&self, plan: CoordinationPlan) -> CoordinationResult {
        let primitive = plan.primitive.clone();
        let root_goal = plan.root_goal.clone();
        let timeout_ms = plan.timeout_ms;
        let max_retries = plan.max_retries;
        // Deduplicate by task id; the first occurrence of an id wins.
        let mut tasks = BTreeMap::new();
        for task in plan.tasks {
            tasks.entry(task.id.clone()).or_insert(task);
        }

        let mut pending = tasks.keys().cloned().collect::<BTreeSet<_>>();
        let mut completed = BTreeSet::new();
        let mut failed = BTreeSet::new();
        // *_order vecs preserve insertion order; the sets give O(log n) lookup.
        let mut completed_order = Vec::new();
        let mut failed_order = Vec::new();
        let mut skipped = BTreeSet::new();
        // Per-task failure count, used for retry accounting.
        let mut attempts = BTreeMap::new();
        let mut messages = Vec::new();

        loop {
            // Conditional plans drop tasks whose dependency chain has failed
            // instead of retrying or blocking on them.
            if matches!(primitive, CoordinationPrimitive::Conditional) {
                self.apply_conditional_skips(
                    &tasks,
                    &mut pending,
                    &completed,
                    &failed,
                    &mut skipped,
                    &mut messages,
                );
            }

            let mut ready = self.ready_task_ids(&tasks, &pending, &completed, &failed, &skipped);
            if ready.is_empty() {
                break;
            }
            // Sequential coordination runs exactly one task per round.
            if matches!(primitive, CoordinationPrimitive::Sequential) {
                ready.truncate(1);
            }

            for task_id in ready {
                let Some(task) = tasks.get(&task_id) else {
                    continue;
                };
                // A task processed earlier this round may no longer be pending.
                if !pending.contains(&task_id) {
                    continue;
                }
                self.record_handoff_messages(task, &tasks, &completed, &failed, &mut messages);

                let prior_failures = attempts.get(&task_id).copied().unwrap_or(0);
                if Self::simulate_task_failure(task, prior_failures) {
                    let failure_count = prior_failures + 1;
                    attempts.insert(task_id.clone(), failure_count);
                    // A task may fail max_retries + 1 times in total before it
                    // is marked permanently failed.
                    let will_retry = failure_count <= max_retries;
                    messages.push(CoordinationMessage {
                        from_role: task.role.clone(),
                        to_role: task.role.clone(),
                        task_id: task_id.clone(),
                        content: if will_retry {
                            format!("task {task_id} failed on attempt {failure_count} and will retry")
                        } else {
                            format!(
                                "task {task_id} failed on attempt {failure_count} and exhausted retries"
                            )
                        },
                    });
                    if !will_retry {
                        pending.remove(&task_id);
                        if failed.insert(task_id.clone()) {
                            failed_order.push(task_id);
                        }
                    }
                    // Retrying tasks stay pending and are re-picked next round.
                    continue;
                }

                pending.remove(&task_id);
                if completed.insert(task_id.clone()) {
                    completed_order.push(task_id);
                }
            }
        }

        // Whatever is still pending can never run; classify each leftover and
        // record it as failed with an explanatory message.
        let blocked_ids = pending.into_iter().collect::<Vec<_>>();
        for task_id in blocked_ids {
            let Some(task) = tasks.get(&task_id) else {
                continue;
            };
            let state = self.classify_task(task, &tasks, &completed, &failed, &skipped);
            let content = match state {
                CoordinationTaskState::BlockedByFailure => {
                    format!("task {task_id} blocked by failed dependencies")
                }
                CoordinationTaskState::PermanentlyBlocked => {
                    format!("task {task_id} has invalid coordination prerequisites")
                }
                CoordinationTaskState::Waiting => {
                    format!("task {task_id} has unresolved dependencies")
                }
                CoordinationTaskState::Ready => {
                    format!("task {task_id} was left pending unexpectedly")
                }
            };
            messages.push(CoordinationMessage {
                from_role: task.role.clone(),
                to_role: task.role.clone(),
                task_id: task_id.clone(),
                content,
            });
            if failed.insert(task_id.clone()) {
                failed_order.push(task_id);
            }
        }

        CoordinationResult {
            completed_tasks: completed_order,
            failed_tasks: failed_order,
            messages,
            summary: format!(
                "goal '{}' completed {} tasks, failed {}, skipped {} using {:?} coordination (timeout={}ms, max_retries={})",
                root_goal,
                completed.len(),
                failed.len(),
                skipped.len(),
                primitive,
                timeout_ms,
                max_retries
            ),
        }
    }

    /// Returns the pending task ids currently classified as `Ready`, in
    /// BTreeSet (lexicographic) order.
    fn ready_task_ids(
        &self,
        tasks: &BTreeMap<String, CoordinationTask>,
        pending: &BTreeSet<String>,
        completed: &BTreeSet<String>,
        failed: &BTreeSet<String>,
        skipped: &BTreeSet<String>,
    ) -> Vec<String> {
        pending
            .iter()
            .filter_map(|task_id| {
                let task = tasks.get(task_id)?;
                (self.classify_task(task, tasks, completed, failed, skipped)
                    == CoordinationTaskState::Ready)
                    .then(|| task_id.clone())
            })
            .collect()
    }

    /// For Conditional plans: moves every pending task that is blocked by a
    /// failed/skipped dependency into `skipped`, emitting a message per task.
    fn apply_conditional_skips(
        &self,
        tasks: &BTreeMap<String, CoordinationTask>,
        pending: &mut BTreeSet<String>,
        completed: &BTreeSet<String>,
        failed: &BTreeSet<String>,
        skipped: &mut BTreeSet<String>,
        messages: &mut Vec<CoordinationMessage>,
    ) {
        // Collect first so we do not mutate `pending` while iterating it.
        let skip_ids = pending
            .iter()
            .filter_map(|task_id| {
                let task = tasks.get(task_id)?;
                (self.classify_task(task, tasks, completed, failed, skipped)
                    == CoordinationTaskState::BlockedByFailure)
                    .then(|| task_id.clone())
            })
            .collect::<Vec<_>>();

        for task_id in skip_ids {
            let Some(task) = tasks.get(&task_id) else {
                continue;
            };
            pending.remove(&task_id);
            skipped.insert(task_id.clone());
            messages.push(CoordinationMessage {
                from_role: task.role.clone(),
                to_role: task.role.clone(),
                task_id: task_id.clone(),
                content: format!("task {task_id} skipped due to failed dependency chain"),
            });
        }
    }

    /// Classifies a task's scheduling state with role-specific rules:
    /// - Planner/Coder: ready once all dependencies complete; blocked by any
    ///   failed/skipped dependency; unknown dependency ids are fatal.
    /// - Repair: only runs after a *Coder* dependency has failed; any other
    ///   failed/skipped dependency blocks it; no coder dependency is fatal.
    /// - Optimizer: needs a Coder/Repair dependency that *completed*; a
    ///   failed implementation dependency blocks it; non-implementation
    ///   failures block it outright.
    fn classify_task(
        &self,
        task: &CoordinationTask,
        tasks: &BTreeMap<String, CoordinationTask>,
        completed: &BTreeSet<String>,
        failed: &BTreeSet<String>,
        skipped: &BTreeSet<String>,
    ) -> CoordinationTaskState {
        match task.role {
            AgentRole::Planner | AgentRole::Coder => {
                let mut waiting = false;
                for dependency_id in &task.depends_on {
                    if !tasks.contains_key(dependency_id) {
                        return CoordinationTaskState::PermanentlyBlocked;
                    }
                    if skipped.contains(dependency_id) || failed.contains(dependency_id) {
                        return CoordinationTaskState::BlockedByFailure;
                    }
                    if !completed.contains(dependency_id) {
                        waiting = true;
                    }
                }
                if waiting {
                    CoordinationTaskState::Waiting
                } else {
                    CoordinationTaskState::Ready
                }
            }
            AgentRole::Repair => {
                let mut waiting = false;
                let mut has_coder_dependency = false;
                let mut has_failed_coder = false;
                for dependency_id in &task.depends_on {
                    let Some(dependency) = tasks.get(dependency_id) else {
                        return CoordinationTaskState::PermanentlyBlocked;
                    };
                    let is_coder = matches!(dependency.role, AgentRole::Coder);
                    if is_coder {
                        has_coder_dependency = true;
                    }
                    if skipped.contains(dependency_id) {
                        return CoordinationTaskState::BlockedByFailure;
                    }
                    if failed.contains(dependency_id) {
                        // A failed coder is the trigger for repair work; any
                        // other failed dependency blocks the repair task.
                        if is_coder {
                            has_failed_coder = true;
                        } else {
                            return CoordinationTaskState::BlockedByFailure;
                        }
                        continue;
                    }
                    if !completed.contains(dependency_id) {
                        waiting = true;
                    }
                }
                if !has_coder_dependency {
                    CoordinationTaskState::PermanentlyBlocked
                } else if waiting {
                    CoordinationTaskState::Waiting
                } else if has_failed_coder {
                    CoordinationTaskState::Ready
                } else {
                    // All coder dependencies succeeded: nothing to repair.
                    CoordinationTaskState::PermanentlyBlocked
                }
            }
            AgentRole::Optimizer => {
                let mut waiting = false;
                let mut has_impl_dependency = false;
                let mut has_completed_impl = false;
                let mut has_failed_impl = false;
                for dependency_id in &task.depends_on {
                    let Some(dependency) = tasks.get(dependency_id) else {
                        return CoordinationTaskState::PermanentlyBlocked;
                    };
                    let is_impl = matches!(dependency.role, AgentRole::Coder | AgentRole::Repair);
                    if is_impl {
                        has_impl_dependency = true;
                    }
                    if skipped.contains(dependency_id) || failed.contains(dependency_id) {
                        if is_impl {
                            has_failed_impl = true;
                            continue;
                        }
                        return CoordinationTaskState::BlockedByFailure;
                    }
                    if completed.contains(dependency_id) {
                        if is_impl {
                            has_completed_impl = true;
                        }
                        continue;
                    }
                    waiting = true;
                }
                if !has_impl_dependency {
                    CoordinationTaskState::PermanentlyBlocked
                } else if waiting {
                    CoordinationTaskState::Waiting
                } else if has_completed_impl {
                    CoordinationTaskState::Ready
                } else if has_failed_impl {
                    CoordinationTaskState::BlockedByFailure
                } else {
                    CoordinationTaskState::PermanentlyBlocked
                }
            }
        }
    }

    /// Emits a handoff message for each resolved (completed or failed)
    /// dependency of `task`, in sorted, deduplicated dependency order.
    fn record_handoff_messages(
        &self,
        task: &CoordinationTask,
        tasks: &BTreeMap<String, CoordinationTask>,
        completed: &BTreeSet<String>,
        failed: &BTreeSet<String>,
        messages: &mut Vec<CoordinationMessage>,
    ) {
        let mut dependency_ids = task.depends_on.clone();
        dependency_ids.sort();
        dependency_ids.dedup();

        for dependency_id in dependency_ids {
            let Some(dependency) = tasks.get(&dependency_id) else {
                continue;
            };
            if completed.contains(&dependency_id) {
                messages.push(CoordinationMessage {
                    from_role: dependency.role.clone(),
                    to_role: task.role.clone(),
                    task_id: task.id.clone(),
                    content: format!("handoff from {dependency_id} to {}", task.id),
                });
            } else if failed.contains(&dependency_id) {
                messages.push(CoordinationMessage {
                    from_role: dependency.role.clone(),
                    to_role: task.role.clone(),
                    task_id: task.id.clone(),
                    content: format!("failed dependency {dependency_id} routed to {}", task.id),
                });
            }
        }
    }

    /// Deterministic failure simulation driven by marker phrases in the task
    /// description: "force-fail" always fails; "fail-once" fails only on the
    /// first attempt (case-insensitive).
    fn simulate_task_failure(task: &CoordinationTask, prior_failures: u32) -> bool {
        let normalized = task.description.to_ascii_lowercase();
        normalized.contains("force-fail")
            || (normalized.contains("fail-once") && prior_failures == 0)
    }
}
649
/// Errors surfaced while attempting capsule replay.
#[derive(Debug, Error)]
pub enum ReplayError {
    /// Evolution store read/append failed.
    #[error("store error: {0}")]
    Store(String),
    /// Sandbox-level failure.
    #[error("sandbox error: {0}")]
    Sandbox(String),
    /// Validator infrastructure failed (distinct from a failing validation).
    #[error("validation error: {0}")]
    Validation(String),
}
659
/// Attempts to reuse a stored capsule before falling back to the planner.
#[async_trait]
pub trait ReplayExecutor: Send + Sync {
    /// Tries to satisfy `input` by replaying a stored capsule under `policy`
    /// and re-validating it with `validation`.
    async fn try_replay(
        &self,
        input: &SelectorInput,
        policy: &SandboxPolicy,
        validation: &ValidationPlan,
    ) -> Result<ReplayDecision, ReplayError>;
}
669
/// `ReplayExecutor` backed by the evolution store, with optional economics
/// (stake settlement and reputation-biased candidate re-ranking).
pub struct StoreReplayExecutor {
    /// Sandbox used to apply stored mutations.
    pub sandbox: Arc<dyn Sandbox>,
    /// Validator re-run against every replayed capsule.
    pub validator: Arc<dyn Validator>,
    /// Evolution store for candidates, payloads, and event appends.
    pub store: Arc<dyn EvolutionStore>,
    /// Primary candidate selector.
    pub selector: Arc<dyn Selector>,
    /// Governor (held for policy decisions; not used in this chunk).
    pub governor: Arc<dyn Governor>,
    /// Optional EVU ledger; enables reputation bias and reuse settlement.
    pub economics: Option<Arc<Mutex<EvuLedger>>>,
    /// Optional gene-id -> publisher-id map for remote-origin genes.
    pub remote_publishers: Option<Arc<Mutex<BTreeMap<String, String>>>>,
    /// Stake policy applied when settling remote reuse outcomes.
    pub stake_policy: StakePolicy,
}
680
#[async_trait]
impl ReplayExecutor for StoreReplayExecutor {
    /// Replay-first pipeline: select candidates (with cold-start exact-match
    /// fallbacks), gate fuzzy matches on score, apply the stored mutation in
    /// the sandbox, re-validate, record store events, and settle publisher
    /// stakes. Every non-replayable outcome returns a planner fallback with a
    /// reason rather than an error.
    async fn try_replay(
        &self,
        input: &SelectorInput,
        policy: &SandboxPolicy,
        validation: &ValidationPlan,
    ) -> Result<ReplayDecision, ReplayError> {
        let mut selector_input = input.clone();
        // With economics enabled, widen the candidate pool so reputation
        // re-ranking has alternatives to choose from.
        if self.economics.is_some() && self.remote_publishers.is_some() {
            selector_input.limit = selector_input.limit.max(4);
        }
        let mut candidates = self.selector.select(&selector_input);
        self.rerank_with_reputation_bias(&mut candidates);
        let mut exact_match = false;
        // Cold-start fallback #1: exact-match lookup in the local store.
        if candidates.is_empty() {
            let mut exact_candidates = exact_match_candidates(self.store.as_ref(), input);
            self.rerank_with_reputation_bias(&mut exact_candidates);
            if !exact_candidates.is_empty() {
                candidates = exact_candidates;
                exact_match = true;
            }
        }
        // Cold-start fallback #2: quarantined remote capsules with an exact match.
        if candidates.is_empty() {
            let mut remote_candidates =
                quarantined_remote_exact_match_candidates(self.store.as_ref(), input);
            self.rerank_with_reputation_bias(&mut remote_candidates);
            if !remote_candidates.is_empty() {
                candidates = remote_candidates;
                exact_match = true;
            }
        }
        candidates.truncate(input.limit.max(1));
        let Some(best) = candidates.into_iter().next() else {
            return Ok(ReplayDecision {
                used_capsule: false,
                capsule_id: None,
                fallback_to_planner: true,
                reason: "no matching gene".into(),
            });
        };
        let remote_publisher = self.publisher_for_gene(&best.gene.id);

        // Exact matches bypass the score gate; fuzzy matches must clear 0.82.
        if !exact_match && best.score < 0.82 {
            return Ok(ReplayDecision {
                used_capsule: false,
                capsule_id: None,
                fallback_to_planner: true,
                reason: format!("best gene score {:.3} below replay threshold", best.score),
            });
        }

        let Some(capsule) = best.capsules.first().cloned() else {
            return Ok(ReplayDecision {
                used_capsule: false,
                capsule_id: None,
                fallback_to_planner: true,
                reason: "candidate gene has no capsule".into(),
            });
        };

        let Some(mutation) = find_declared_mutation(self.store.as_ref(), &capsule.mutation_id)
            .map_err(|err| ReplayError::Store(err.to_string()))?
        else {
            return Ok(ReplayDecision {
                used_capsule: false,
                capsule_id: None,
                fallback_to_planner: true,
                reason: "mutation payload missing from store".into(),
            });
        };

        // Apply the stored patch; a failed apply settles the remote publisher
        // negatively and falls back to the planner.
        let receipt = match self.sandbox.apply(&mutation, policy).await {
            Ok(receipt) => receipt,
            Err(err) => {
                self.record_reuse_settlement(remote_publisher.as_deref(), false);
                return Ok(ReplayDecision {
                    used_capsule: false,
                    capsule_id: Some(capsule.id.clone()),
                    fallback_to_planner: true,
                    reason: format!("replay patch apply failed: {err}"),
                });
            }
        };

        let report = self
            .validator
            .run(&receipt, validation)
            .await
            .map_err(|err| ReplayError::Validation(err.to_string()))?;
        if !report.success {
            // Record the failure in the store before settling negatively.
            self.record_replay_validation_failure(&best, &capsule, validation, &report)?;
            self.record_reuse_settlement(remote_publisher.as_deref(), false);
            return Ok(ReplayDecision {
                used_capsule: false,
                capsule_id: Some(capsule.id.clone()),
                fallback_to_planner: true,
                reason: "replay validation failed".into(),
            });
        }

        // A quarantined capsule that just re-validated locally is promoted
        // before its reuse is recorded.
        if matches!(capsule.state, AssetState::Quarantined) {
            self.store
                .append_event(EvolutionEvent::ValidationPassed {
                    mutation_id: capsule.mutation_id.clone(),
                    report: report.to_snapshot(&validation.profile),
                    gene_id: Some(best.gene.id.clone()),
                })
                .map_err(|err| ReplayError::Store(err.to_string()))?;
            self.store
                .append_event(EvolutionEvent::CapsuleReleased {
                    capsule_id: capsule.id.clone(),
                    state: AssetState::Promoted,
                })
                .map_err(|err| ReplayError::Store(err.to_string()))?;
        }

        self.store
            .append_event(EvolutionEvent::CapsuleReused {
                capsule_id: capsule.id.clone(),
                gene_id: capsule.gene_id.clone(),
                run_id: capsule.run_id.clone(),
            })
            .map_err(|err| ReplayError::Store(err.to_string()))?;
        self.record_reuse_settlement(remote_publisher.as_deref(), true);

        Ok(ReplayDecision {
            used_capsule: true,
            capsule_id: Some(capsule.id),
            fallback_to_planner: false,
            reason: if exact_match {
                "replayed via exact-match cold-start lookup".into()
            } else {
                "replayed via selector".into()
            },
        })
    }
}
819
820impl StoreReplayExecutor {
    /// Re-sorts `candidates` descending by reputation-biased score. A no-op
    /// unless both the ledger and the remote-publisher map are configured and
    /// the ledger yields at least one bias entry.
    fn rerank_with_reputation_bias(&self, candidates: &mut [GeneCandidate]) {
        let Some(ledger) = self.economics.as_ref() else {
            return;
        };
        let Some(remote_publishers) = self.remote_publishers.as_ref() else {
            return;
        };
        // A poisoned lock degrades to an empty bias map instead of panicking.
        let reputation_bias = ledger
            .lock()
            .ok()
            .map(|locked| locked.selector_reputation_bias())
            .unwrap_or_default();
        if reputation_bias.is_empty() {
            return;
        }
        let publisher_map = remote_publishers
            .lock()
            .ok()
            .map(|locked| locked.clone())
            .unwrap_or_default();
        // Descending by biased score (right vs left); NaN compares Equal, and
        // ties fall back to gene-id order for determinism.
        candidates.sort_by(|left, right| {
            effective_candidate_score(right, &publisher_map, &reputation_bias)
                .partial_cmp(&effective_candidate_score(
                    left,
                    &publisher_map,
                    &reputation_bias,
                ))
                .unwrap_or(std::cmp::Ordering::Equal)
                .then_with(|| left.gene.id.cmp(&right.gene.id))
        });
    }
852
853    fn publisher_for_gene(&self, gene_id: &str) -> Option<String> {
854        self.remote_publishers
855            .as_ref()?
856            .lock()
857            .ok()?
858            .get(gene_id)
859            .cloned()
860    }
861
862    fn record_reuse_settlement(&self, publisher_id: Option<&str>, success: bool) {
863        let Some(publisher_id) = publisher_id else {
864            return;
865        };
866        let Some(ledger) = self.economics.as_ref() else {
867            return;
868        };
869        if let Ok(mut locked) = ledger.lock() {
870            locked.settle_remote_reuse(publisher_id, success, &self.stake_policy);
871        }
872    }
873
874    fn record_replay_validation_failure(
875        &self,
876        best: &GeneCandidate,
877        capsule: &Capsule,
878        validation: &ValidationPlan,
879        report: &ValidationReport,
880    ) -> Result<(), ReplayError> {
881        let projection = self
882            .store
883            .rebuild_projection()
884            .map_err(|err| ReplayError::Store(err.to_string()))?;
885        let (current_confidence, historical_peak_confidence, confidence_last_updated_secs) =
886            Self::confidence_context(&projection, &best.gene.id);
887
888        self.store
889            .append_event(EvolutionEvent::ValidationFailed {
890                mutation_id: capsule.mutation_id.clone(),
891                report: report.to_snapshot(&validation.profile),
892                gene_id: Some(best.gene.id.clone()),
893            })
894            .map_err(|err| ReplayError::Store(err.to_string()))?;
895
896        let replay_failures = self.replay_failure_count(&best.gene.id)?;
897        let governor_decision = self.governor.evaluate(GovernorInput {
898            candidate_source: if self.publisher_for_gene(&best.gene.id).is_some() {
899                CandidateSource::Remote
900            } else {
901                CandidateSource::Local
902            },
903            success_count: 0,
904            blast_radius: BlastRadius {
905                files_changed: capsule.outcome.changed_files.len(),
906                lines_changed: capsule.outcome.lines_changed,
907            },
908            replay_failures,
909            recent_mutation_ages_secs: Vec::new(),
910            current_confidence,
911            historical_peak_confidence,
912            confidence_last_updated_secs,
913        });
914
915        if matches!(governor_decision.target_state, AssetState::Revoked) {
916            self.store
917                .append_event(EvolutionEvent::PromotionEvaluated {
918                    gene_id: best.gene.id.clone(),
919                    state: AssetState::Revoked,
920                    reason: governor_decision.reason.clone(),
921                })
922                .map_err(|err| ReplayError::Store(err.to_string()))?;
923            self.store
924                .append_event(EvolutionEvent::GeneRevoked {
925                    gene_id: best.gene.id.clone(),
926                    reason: governor_decision.reason,
927                })
928                .map_err(|err| ReplayError::Store(err.to_string()))?;
929            for related in &best.capsules {
930                self.store
931                    .append_event(EvolutionEvent::CapsuleQuarantined {
932                        capsule_id: related.id.clone(),
933                    })
934                    .map_err(|err| ReplayError::Store(err.to_string()))?;
935            }
936        }
937
938        Ok(())
939    }
940
941    fn confidence_context(
942        projection: &EvolutionProjection,
943        gene_id: &str,
944    ) -> (f32, f32, Option<u64>) {
945        let peak_confidence = projection
946            .capsules
947            .iter()
948            .filter(|capsule| capsule.gene_id == gene_id)
949            .map(|capsule| capsule.confidence)
950            .fold(0.0_f32, f32::max);
951        let age_secs = projection
952            .last_updated_at
953            .get(gene_id)
954            .and_then(|timestamp| Self::seconds_since_timestamp(timestamp, Utc::now()));
955        (peak_confidence, peak_confidence, age_secs)
956    }
957
958    fn seconds_since_timestamp(timestamp: &str, now: DateTime<Utc>) -> Option<u64> {
959        let parsed = DateTime::parse_from_rfc3339(timestamp)
960            .ok()?
961            .with_timezone(&Utc);
962        let elapsed = now.signed_duration_since(parsed);
963        if elapsed < Duration::zero() {
964            Some(0)
965        } else {
966            u64::try_from(elapsed.num_seconds()).ok()
967        }
968    }
969
970    fn replay_failure_count(&self, gene_id: &str) -> Result<u64, ReplayError> {
971        Ok(self
972            .store
973            .scan(1)
974            .map_err(|err| ReplayError::Store(err.to_string()))?
975            .into_iter()
976            .filter(|stored| {
977                matches!(
978                    &stored.event,
979                    EvolutionEvent::ValidationFailed {
980                        gene_id: Some(current_gene_id),
981                        ..
982                    } if current_gene_id == gene_id
983                )
984            })
985            .count() as u64)
986    }
987}
988
/// Errors surfaced by EvoKernel capture, validation, and network operations.
#[derive(Debug, Error)]
pub enum EvoKernelError {
    /// Sandbox refused or failed to apply a mutation.
    #[error("sandbox error: {0}")]
    Sandbox(String),
    /// Validation infrastructure failed to run (distinct from a failing result).
    #[error("validation error: {0}")]
    Validation(String),
    /// Validation ran to completion but the mutation did not pass; carries the
    /// full report for diagnostics.
    #[error("validation failed")]
    ValidationFailed(ValidationReport),
    /// Event store append/scan/projection failure.
    #[error("store error: {0}")]
    Store(String),
}
1000
/// Result of capturing a successful mutation: the committed capsule, the gene
/// derived from it, and the governor's promotion decision.
#[derive(Clone, Debug)]
pub struct CaptureOutcome {
    pub capsule: Capsule,
    pub gene: Gene,
    pub governor_decision: GovernorDecision,
}
1007
/// Result of importing a remote evolution envelope into the local store.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ImportOutcome {
    // Ids of the assets that were actually written to the store.
    pub imported_asset_ids: Vec<String>,
    // Whether the envelope as a whole was accepted.
    pub accepted: bool,
}
1013
/// Point-in-time metrics derived from the evolution event log; also the input
/// for the Prometheus rendering and the health snapshot.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct EvolutionMetricsSnapshot {
    pub replay_attempts_total: u64,
    pub replay_success_total: u64,
    // replay_success_total / replay_attempts_total (implementation elsewhere).
    pub replay_success_rate: f64,
    pub mutation_declared_total: u64,
    pub promoted_mutations_total: u64,
    // promoted_mutations_total relative to declared mutations.
    pub promotion_ratio: f64,
    pub gene_revocations_total: u64,
    pub mutation_velocity_last_hour: u64,
    pub revoke_frequency_last_hour: u64,
    pub promoted_genes: u64,
    pub promoted_capsules: u64,
    // Sequence number of the most recent event folded into this snapshot.
    pub last_event_seq: u64,
}
1029
/// Condensed health view derived from [`EvolutionMetricsSnapshot`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct EvolutionHealthSnapshot {
    // Human-readable health status string.
    pub status: String,
    pub last_event_seq: u64,
    pub promoted_genes: u64,
    pub promoted_capsules: u64,
}
1037
/// Minimal network-facing node: serves publish/fetch/revoke requests and
/// metrics directly from a shared evolution store, without a kernel attached.
#[derive(Clone)]
pub struct EvolutionNetworkNode {
    pub store: Arc<dyn EvolutionStore>,
}
1042
1043impl EvolutionNetworkNode {
1044    pub fn new(store: Arc<dyn EvolutionStore>) -> Self {
1045        Self { store }
1046    }
1047
1048    pub fn with_default_store() -> Self {
1049        Self {
1050            store: Arc::new(JsonlEvolutionStore::new(default_store_root())),
1051        }
1052    }
1053
1054    pub fn accept_publish_request(
1055        &self,
1056        request: &PublishRequest,
1057    ) -> Result<ImportOutcome, EvoKernelError> {
1058        import_remote_envelope_into_store(
1059            self.store.as_ref(),
1060            &EvolutionEnvelope::publish(request.sender_id.clone(), request.assets.clone()),
1061        )
1062    }
1063
1064    pub fn publish_local_assets(
1065        &self,
1066        sender_id: impl Into<String>,
1067    ) -> Result<EvolutionEnvelope, EvoKernelError> {
1068        export_promoted_assets_from_store(self.store.as_ref(), sender_id)
1069    }
1070
1071    pub fn fetch_assets(
1072        &self,
1073        responder_id: impl Into<String>,
1074        query: &FetchQuery,
1075    ) -> Result<FetchResponse, EvoKernelError> {
1076        fetch_assets_from_store(self.store.as_ref(), responder_id, query)
1077    }
1078
1079    pub fn revoke_assets(&self, notice: &RevokeNotice) -> Result<RevokeNotice, EvoKernelError> {
1080        revoke_assets_in_store(self.store.as_ref(), notice)
1081    }
1082
1083    pub fn metrics_snapshot(&self) -> Result<EvolutionMetricsSnapshot, EvoKernelError> {
1084        evolution_metrics_snapshot(self.store.as_ref())
1085    }
1086
1087    pub fn render_metrics_prometheus(&self) -> Result<String, EvoKernelError> {
1088        self.metrics_snapshot().map(|snapshot| {
1089            let health = evolution_health_snapshot(&snapshot);
1090            render_evolution_metrics_prometheus(&snapshot, &health)
1091        })
1092    }
1093
1094    pub fn health_snapshot(&self) -> Result<EvolutionHealthSnapshot, EvoKernelError> {
1095        self.metrics_snapshot()
1096            .map(|snapshot| evolution_health_snapshot(&snapshot))
1097    }
1098}
1099
/// Kernel wrapper that adds the evolution loop: sandboxed mutation capture,
/// validation, gene/capsule bookkeeping, replay-first reuse, governance, and
/// EVU economics.
pub struct EvoKernel<S: KernelState> {
    pub kernel: Arc<Kernel<S>>,
    /// Applies mutations in isolation and produces receipts.
    pub sandbox: Arc<dyn Sandbox>,
    /// Runs the validation plan against a sandbox receipt.
    pub validator: Arc<dyn Validator>,
    /// Append-only evolution event log and projection source.
    pub store: Arc<dyn EvolutionStore>,
    /// Picks replay candidates from the store.
    pub selector: Arc<dyn Selector>,
    /// Decides promotion / quarantine / revocation of assets.
    pub governor: Arc<dyn Governor>,
    /// Shared EVU ledger for stake and reputation accounting.
    pub economics: Arc<Mutex<EvuLedger>>,
    /// gene id -> publishing node id for remotely imported genes.
    pub remote_publishers: Arc<Mutex<BTreeMap<String, String>>>,
    pub stake_policy: StakePolicy,
    pub sandbox_policy: SandboxPolicy,
    pub validation_plan: ValidationPlan,
}
1113
impl<S: KernelState> EvoKernel<S> {
    /// Ages in seconds (sorted ascending) of previously declared mutations,
    /// optionally excluding the mutation identified by `exclude_mutation_id`
    /// (used so a just-declared mutation does not count against itself).
    fn recent_prior_mutation_ages_secs(
        &self,
        exclude_mutation_id: Option<&str>,
    ) -> Result<Vec<u64>, EvolutionError> {
        let now = Utc::now();
        let mut ages = self
            .store
            .scan(1)?
            .into_iter()
            .filter_map(|stored| match stored.event {
                EvolutionEvent::MutationDeclared { mutation }
                    if exclude_mutation_id != Some(mutation.intent.id.as_str()) =>
                {
                    Self::seconds_since_timestamp(&stored.timestamp, now)
                }
                _ => None,
            })
            .collect::<Vec<_>>();
        ages.sort_unstable();
        Ok(ages)
    }

    /// Seconds elapsed from an RFC 3339 `timestamp` until `now`; future
    /// timestamps clamp to 0, parse/overflow failures yield `None`.
    /// NOTE(review): duplicated on `StoreReplayExecutor` above; consider a
    /// shared helper.
    fn seconds_since_timestamp(timestamp: &str, now: DateTime<Utc>) -> Option<u64> {
        let parsed = DateTime::parse_from_rfc3339(timestamp)
            .ok()?
            .with_timezone(&Utc);
        let elapsed = now.signed_duration_since(parsed);
        if elapsed < Duration::zero() {
            Some(0)
        } else {
            u64::try_from(elapsed.num_seconds()).ok()
        }
    }

    /// Creates an EvoKernel with default governor, economics, stake policy,
    /// sandbox policy, validation plan, and a store-backed selector.
    pub fn new(
        kernel: Arc<Kernel<S>>,
        sandbox: Arc<dyn Sandbox>,
        validator: Arc<dyn Validator>,
        store: Arc<dyn EvolutionStore>,
    ) -> Self {
        let selector: Arc<dyn Selector> = Arc::new(StoreBackedSelector::new(store.clone()));
        Self {
            kernel,
            sandbox,
            validator,
            store,
            selector,
            governor: Arc::new(DefaultGovernor::default()),
            economics: Arc::new(Mutex::new(EvuLedger::default())),
            remote_publishers: Arc::new(Mutex::new(BTreeMap::new())),
            stake_policy: StakePolicy::default(),
            sandbox_policy: SandboxPolicy::oris_default(),
            validation_plan: ValidationPlan::oris_default(),
        }
    }

    // Builder-style overrides; each consumes and returns self.

    /// Replaces the candidate selector.
    pub fn with_selector(mut self, selector: Arc<dyn Selector>) -> Self {
        self.selector = selector;
        self
    }

    /// Replaces the sandbox policy.
    pub fn with_sandbox_policy(mut self, policy: SandboxPolicy) -> Self {
        self.sandbox_policy = policy;
        self
    }

    /// Replaces the governor.
    pub fn with_governor(mut self, governor: Arc<dyn Governor>) -> Self {
        self.governor = governor;
        self
    }

    /// Replaces the shared economics ledger.
    pub fn with_economics(mut self, economics: Arc<Mutex<EvuLedger>>) -> Self {
        self.economics = economics;
        self
    }

    /// Replaces the stake policy.
    pub fn with_stake_policy(mut self, policy: StakePolicy) -> Self {
        self.stake_policy = policy;
        self
    }

    /// Replaces the validation plan.
    pub fn with_validation_plan(mut self, plan: ValidationPlan) -> Self {
        self.validation_plan = plan;
        self
    }

    /// Delegates candidate selection to the configured selector.
    pub fn select_candidates(&self, input: &SelectorInput) -> Vec<GeneCandidate> {
        self.selector.select(input)
    }

    /// Convenience wrapper around [`Self::capture_mutation_with_governor`]
    /// that returns only the committed capsule.
    pub async fn capture_successful_mutation(
        &self,
        run_id: &RunId,
        mutation: PreparedMutation,
    ) -> Result<Capsule, EvoKernelError> {
        Ok(self
            .capture_mutation_with_governor(run_id, mutation)
            .await?
            .capsule)
    }

    /// Runs the full capture pipeline for a prepared mutation.
    ///
    /// Event sequence appended to the store:
    /// `MutationDeclared` → sandbox apply (`MutationRejected` on failure) →
    /// `MutationApplied` → validation (`ValidationFailed` on failure, else
    /// `ValidationPassed`) → `SignalsExtracted` → gene derivation + governor →
    /// `GeneProjected`, `PromotionEvaluated` (and `GenePromoted` when
    /// promoted) → optional `SpecLinked` → `CapsuleCommitted` (and
    /// `CapsuleQuarantined` when quarantined).
    ///
    /// # Errors
    /// `Sandbox` when apply fails, `Validation` when the validator cannot run,
    /// `ValidationFailed` when it runs but fails, `Store` on append/projection
    /// errors.
    pub async fn capture_mutation_with_governor(
        &self,
        run_id: &RunId,
        mutation: PreparedMutation,
    ) -> Result<CaptureOutcome, EvoKernelError> {
        self.store
            .append_event(EvolutionEvent::MutationDeclared {
                mutation: mutation.clone(),
            })
            .map_err(store_err)?;

        let receipt = match self.sandbox.apply(&mutation, &self.sandbox_policy).await {
            Ok(receipt) => receipt,
            Err(err) => {
                // Record the rejection before surfacing the sandbox error.
                self.store
                    .append_event(EvolutionEvent::MutationRejected {
                        mutation_id: mutation.intent.id.clone(),
                        reason: err.to_string(),
                    })
                    .map_err(store_err)?;
                return Err(EvoKernelError::Sandbox(err.to_string()));
            }
        };

        self.store
            .append_event(EvolutionEvent::MutationApplied {
                mutation_id: mutation.intent.id.clone(),
                patch_hash: receipt.patch_hash.clone(),
                changed_files: receipt
                    .changed_files
                    .iter()
                    .map(|path| path.to_string_lossy().to_string())
                    .collect(),
            })
            .map_err(store_err)?;

        let report = self
            .validator
            .run(&receipt, &self.validation_plan)
            .await
            .map_err(|err| EvoKernelError::Validation(err.to_string()))?;
        if !report.success {
            self.store
                .append_event(EvolutionEvent::ValidationFailed {
                    mutation_id: mutation.intent.id.clone(),
                    report: report.to_snapshot(&self.validation_plan.profile),
                    gene_id: None,
                })
                .map_err(store_err)?;
            return Err(EvoKernelError::ValidationFailed(report));
        }

        self.store
            .append_event(EvolutionEvent::ValidationPassed {
                mutation_id: mutation.intent.id.clone(),
                report: report.to_snapshot(&self.validation_plan.profile),
                gene_id: None,
            })
            .map_err(store_err)?;

        // Deterministic signal extraction from the diff, intent text, and
        // validation output; non-empty stage stdout/stderr are included.
        let extracted_signals = extract_deterministic_signals(&SignalExtractionInput {
            patch_diff: mutation.artifact.payload.clone(),
            intent: mutation.intent.intent.clone(),
            expected_effect: mutation.intent.expected_effect.clone(),
            declared_signals: mutation.intent.signals.clone(),
            changed_files: receipt
                .changed_files
                .iter()
                .map(|path| path.to_string_lossy().to_string())
                .collect(),
            validation_success: report.success,
            validation_logs: report.logs.clone(),
            stage_outputs: report
                .stages
                .iter()
                .flat_map(|stage| [stage.stdout.clone(), stage.stderr.clone()])
                .filter(|value| !value.is_empty())
                .collect(),
        });
        self.store
            .append_event(EvolutionEvent::SignalsExtracted {
                mutation_id: mutation.intent.id.clone(),
                hash: extracted_signals.hash.clone(),
                signals: extracted_signals.values.clone(),
            })
            .map_err(store_err)?;

        let projection = self.store.rebuild_projection().map_err(store_err)?;
        let blast_radius = compute_blast_radius(&mutation.artifact.payload);
        let recent_mutation_ages_secs = self
            .recent_prior_mutation_ages_secs(Some(mutation.intent.id.as_str()))
            .map_err(store_err)?;
        let mut gene = derive_gene(
            &mutation,
            &receipt,
            &self.validation_plan.profile,
            &extracted_signals.values,
        );
        // Prior capsule count for this gene (if it already exists), plus one
        // for the success being captured now.
        let success_count = projection
            .genes
            .iter()
            .find(|existing| existing.id == gene.id)
            .map(|existing| {
                projection
                    .capsules
                    .iter()
                    .filter(|capsule| capsule.gene_id == existing.id)
                    .count() as u64
            })
            .unwrap_or(0)
            + 1;
        let governor_decision = self.governor.evaluate(GovernorInput {
            candidate_source: CandidateSource::Local,
            success_count,
            blast_radius: blast_radius.clone(),
            replay_failures: 0,
            recent_mutation_ages_secs,
            // Fixed initial confidence for a fresh capture; mirrors the 0.7
            // default written into the capsule by build_capsule.
            current_confidence: 0.7,
            historical_peak_confidence: 0.7,
            confidence_last_updated_secs: Some(0),
        });

        // The governor's target state overrides the gene's default state.
        gene.state = governor_decision.target_state.clone();
        self.store
            .append_event(EvolutionEvent::GeneProjected { gene: gene.clone() })
            .map_err(store_err)?;
        self.store
            .append_event(EvolutionEvent::PromotionEvaluated {
                gene_id: gene.id.clone(),
                state: governor_decision.target_state.clone(),
                reason: governor_decision.reason.clone(),
            })
            .map_err(store_err)?;
        if matches!(governor_decision.target_state, AssetState::Promoted) {
            self.store
                .append_event(EvolutionEvent::GenePromoted {
                    gene_id: gene.id.clone(),
                })
                .map_err(store_err)?;
        }
        if let Some(spec_id) = &mutation.intent.spec_id {
            self.store
                .append_event(EvolutionEvent::SpecLinked {
                    mutation_id: mutation.intent.id.clone(),
                    spec_id: spec_id.clone(),
                })
                .map_err(store_err)?;
        }

        let mut capsule = build_capsule(
            run_id,
            &mutation,
            &receipt,
            &report,
            &self.validation_plan.profile,
            &gene,
            &blast_radius,
        )
        .map_err(|err| EvoKernelError::Validation(err.to_string()))?;
        capsule.state = governor_decision.target_state.clone();
        self.store
            .append_event(EvolutionEvent::CapsuleCommitted {
                capsule: capsule.clone(),
            })
            .map_err(store_err)?;
        if matches!(governor_decision.target_state, AssetState::Quarantined) {
            self.store
                .append_event(EvolutionEvent::CapsuleQuarantined {
                    capsule_id: capsule.id.clone(),
                })
                .map_err(store_err)?;
        }

        Ok(CaptureOutcome {
            capsule,
            gene,
            governor_decision,
        })
    }

    /// Builds a mutation intent from an agent proposal (fresh id, low risk,
    /// proposal files as both target and signals) and runs the capture
    /// pipeline on it.
    pub async fn capture_from_proposal(
        &self,
        run_id: &RunId,
        proposal: &AgentMutationProposal,
        diff_payload: String,
        base_revision: Option<String>,
    ) -> Result<CaptureOutcome, EvoKernelError> {
        let intent = MutationIntent {
            id: next_id("proposal"),
            intent: proposal.intent.clone(),
            target: MutationTarget::Paths {
                allow: proposal.files.clone(),
            },
            expected_effect: proposal.expected_effect.clone(),
            risk: RiskLevel::Low,
            signals: proposal.files.clone(),
            spec_id: None,
        };
        self.capture_mutation_with_governor(
            run_id,
            prepare_mutation(intent, diff_payload, base_revision),
        )
        .await
    }

    /// Translates a capture outcome into agent-facing feedback; the mutation
    /// counts as accepted unless the governor revoked it.
    pub fn feedback_for_agent(outcome: &CaptureOutcome) -> ExecutionFeedback {
        ExecutionFeedback {
            accepted: !matches!(outcome.governor_decision.target_state, AssetState::Revoked),
            asset_state: Some(format!("{:?}", outcome.governor_decision.target_state)),
            summary: outcome.governor_decision.reason.clone(),
        }
    }

    /// Runs a coordination plan through a fresh multi-agent coordinator.
    pub fn coordinate(&self, plan: CoordinationPlan) -> CoordinationResult {
        MultiAgentCoordinator::new().coordinate(plan)
    }

    /// Exports promoted assets for remote publication, reserving a publish
    /// stake in the EVU ledger when the envelope is non-empty.
    ///
    /// # Errors
    /// `Validation` when the ledger lock is poisoned or the sender lacks
    /// sufficient EVU for the stake.
    pub fn export_promoted_assets(
        &self,
        sender_id: impl Into<String>,
    ) -> Result<EvolutionEnvelope, EvoKernelError> {
        let sender_id = sender_id.into();
        let envelope = export_promoted_assets_from_store(self.store.as_ref(), sender_id.clone())?;
        if !envelope.assets.is_empty() {
            let mut ledger = self
                .economics
                .lock()
                .map_err(|_| EvoKernelError::Validation("economics ledger lock poisoned".into()))?;
            if ledger
                .reserve_publish_stake(&sender_id, &self.stake_policy)
                .is_none()
            {
                return Err(EvoKernelError::Validation(
                    "insufficient EVU for remote publish".into(),
                ));
            }
        }
        Ok(envelope)
    }

    /// Imports a remote envelope into the store and records who published
    /// each imported gene.
    pub fn import_remote_envelope(
        &self,
        envelope: &EvolutionEnvelope,
    ) -> Result<ImportOutcome, EvoKernelError> {
        let outcome = import_remote_envelope_into_store(self.store.as_ref(), envelope)?;
        self.record_remote_publishers(envelope);
        Ok(outcome)
    }

    /// Answers a fetch query against the local store.
    pub fn fetch_assets(
        &self,
        responder_id: impl Into<String>,
        query: &FetchQuery,
    ) -> Result<FetchResponse, EvoKernelError> {
        fetch_assets_from_store(self.store.as_ref(), responder_id, query)
    }

    /// Applies a revoke notice to the local store.
    pub fn revoke_assets(&self, notice: &RevokeNotice) -> Result<RevokeNotice, EvoKernelError> {
        revoke_assets_in_store(self.store.as_ref(), notice)
    }

    /// Attempts a replay-first resolution for `input`, assembling a
    /// `StoreReplayExecutor` from this kernel's components; on failure the
    /// caller is told to fall back to the planner via the decision.
    pub async fn replay_or_fallback(
        &self,
        input: SelectorInput,
    ) -> Result<ReplayDecision, EvoKernelError> {
        let executor = StoreReplayExecutor {
            sandbox: self.sandbox.clone(),
            validator: self.validator.clone(),
            store: self.store.clone(),
            selector: self.selector.clone(),
            governor: self.governor.clone(),
            economics: Some(self.economics.clone()),
            remote_publishers: Some(self.remote_publishers.clone()),
            stake_policy: self.stake_policy.clone(),
        };
        executor
            .try_replay(&input, &self.sandbox_policy, &self.validation_plan)
            .await
            .map_err(|err| EvoKernelError::Validation(err.to_string()))
    }

    /// Economics signal for `node_id`; `None` when the ledger lock is
    /// poisoned or the ledger reports nothing.
    pub fn economics_signal(&self, node_id: &str) -> Option<EconomicsSignal> {
        self.economics.lock().ok()?.governor_signal(node_id)
    }

    /// Per-publisher reputation bias from the ledger; empty on lock poisoning.
    pub fn selector_reputation_bias(&self) -> BTreeMap<String, f32> {
        self.economics
            .lock()
            .ok()
            .map(|locked| locked.selector_reputation_bias())
            .unwrap_or_default()
    }

    /// Computes metrics from the store's event log.
    pub fn metrics_snapshot(&self) -> Result<EvolutionMetricsSnapshot, EvoKernelError> {
        evolution_metrics_snapshot(self.store.as_ref())
    }

    /// Renders current metrics (plus derived health) in Prometheus text
    /// exposition format.
    pub fn render_metrics_prometheus(&self) -> Result<String, EvoKernelError> {
        self.metrics_snapshot().map(|snapshot| {
            let health = evolution_health_snapshot(&snapshot);
            render_evolution_metrics_prometheus(&snapshot, &health)
        })
    }

    /// Derives the condensed health view from current metrics.
    pub fn health_snapshot(&self) -> Result<EvolutionHealthSnapshot, EvoKernelError> {
        self.metrics_snapshot()
            .map(|snapshot| evolution_health_snapshot(&snapshot))
    }

    /// Records the envelope's sender as publisher of every gene it carries
    /// (directly or via a capsule's gene id). Best-effort: skipped for blank
    /// sender ids or a poisoned map lock; bare events carry no gene.
    fn record_remote_publishers(&self, envelope: &EvolutionEnvelope) {
        let sender_id = envelope.sender_id.trim();
        if sender_id.is_empty() {
            return;
        }
        let Ok(mut publishers) = self.remote_publishers.lock() else {
            return;
        };
        for asset in &envelope.assets {
            match asset {
                NetworkAsset::Gene { gene } => {
                    publishers.insert(gene.id.clone(), sender_id.to_string());
                }
                NetworkAsset::Capsule { capsule } => {
                    publishers.insert(capsule.gene_id.clone(), sender_id.to_string());
                }
                NetworkAsset::EvolutionEvent { .. } => {}
            }
        }
    }
}
1546
1547pub fn prepare_mutation(
1548    intent: MutationIntent,
1549    diff_payload: String,
1550    base_revision: Option<String>,
1551) -> PreparedMutation {
1552    PreparedMutation {
1553        intent,
1554        artifact: MutationArtifact {
1555            encoding: ArtifactEncoding::UnifiedDiff,
1556            content_hash: compute_artifact_hash(&diff_payload),
1557            payload: diff_payload,
1558            base_revision,
1559        },
1560    }
1561}
1562
1563pub fn prepare_mutation_from_spec(
1564    plan: CompiledMutationPlan,
1565    diff_payload: String,
1566    base_revision: Option<String>,
1567) -> PreparedMutation {
1568    prepare_mutation(plan.mutation_intent, diff_payload, base_revision)
1569}
1570
1571pub fn default_evolution_store() -> Arc<dyn EvolutionStore> {
1572    Arc::new(oris_evolution::JsonlEvolutionStore::new(
1573        default_store_root(),
1574    ))
1575}
1576
1577fn derive_gene(
1578    mutation: &PreparedMutation,
1579    receipt: &SandboxReceipt,
1580    validation_profile: &str,
1581    extracted_signals: &[String],
1582) -> Gene {
1583    let mut strategy = BTreeSet::new();
1584    for file in &receipt.changed_files {
1585        if let Some(component) = file.components().next() {
1586            strategy.insert(component.as_os_str().to_string_lossy().to_string());
1587        }
1588    }
1589    for token in mutation
1590        .artifact
1591        .payload
1592        .split(|ch: char| !ch.is_ascii_alphanumeric())
1593    {
1594        if token.len() == 5
1595            && token.starts_with('E')
1596            && token[1..].chars().all(|ch| ch.is_ascii_digit())
1597        {
1598            strategy.insert(token.to_string());
1599        }
1600    }
1601    for token in mutation.intent.intent.split_whitespace().take(8) {
1602        strategy.insert(token.to_ascii_lowercase());
1603    }
1604    let strategy = strategy.into_iter().collect::<Vec<_>>();
1605    let id = stable_hash_json(&(extracted_signals, &strategy, validation_profile))
1606        .unwrap_or_else(|_| next_id("gene"));
1607    Gene {
1608        id,
1609        signals: extracted_signals.to_vec(),
1610        strategy,
1611        validation: vec![validation_profile.to_string()],
1612        state: AssetState::Promoted,
1613    }
1614}
1615
1616fn build_capsule(
1617    run_id: &RunId,
1618    mutation: &PreparedMutation,
1619    receipt: &SandboxReceipt,
1620    report: &ValidationReport,
1621    validation_profile: &str,
1622    gene: &Gene,
1623    blast_radius: &BlastRadius,
1624) -> Result<Capsule, EvolutionError> {
1625    let env = current_env_fingerprint(&receipt.workdir);
1626    let validator_hash = stable_hash_json(report)?;
1627    let diff_hash = mutation.artifact.content_hash.clone();
1628    let id = stable_hash_json(&(run_id, &gene.id, &diff_hash, &mutation.intent.id))?;
1629    Ok(Capsule {
1630        id,
1631        gene_id: gene.id.clone(),
1632        mutation_id: mutation.intent.id.clone(),
1633        run_id: run_id.clone(),
1634        diff_hash,
1635        confidence: 0.7,
1636        env,
1637        outcome: oris_evolution::Outcome {
1638            success: true,
1639            validation_profile: validation_profile.to_string(),
1640            validation_duration_ms: report.duration_ms,
1641            changed_files: receipt
1642                .changed_files
1643                .iter()
1644                .map(|path| path.to_string_lossy().to_string())
1645                .collect(),
1646            validator_hash,
1647            lines_changed: blast_radius.lines_changed,
1648            replay_verified: false,
1649        },
1650        state: AssetState::Promoted,
1651    })
1652}
1653
1654fn current_env_fingerprint(workdir: &Path) -> EnvFingerprint {
1655    let rustc_version = Command::new("rustc")
1656        .arg("--version")
1657        .output()
1658        .ok()
1659        .filter(|output| output.status.success())
1660        .map(|output| String::from_utf8_lossy(&output.stdout).trim().to_string())
1661        .unwrap_or_else(|| "rustc unknown".into());
1662    let cargo_lock_hash = fs::read(workdir.join("Cargo.lock"))
1663        .ok()
1664        .map(|bytes| {
1665            let value = String::from_utf8_lossy(&bytes);
1666            compute_artifact_hash(&value)
1667        })
1668        .unwrap_or_else(|| "missing-cargo-lock".into());
1669    let target_triple = format!(
1670        "{}-unknown-{}",
1671        std::env::consts::ARCH,
1672        std::env::consts::OS
1673    );
1674    EnvFingerprint {
1675        rustc_version,
1676        cargo_lock_hash,
1677        target_triple,
1678        os: std::env::consts::OS.to_string(),
1679    }
1680}
1681
/// Normalizes a single raw token: rustc error codes (e.g. `e0502`) are
/// canonicalized to an uppercase-`E` form, everything else is lowercased,
/// and tokens shorter than 3 characters are dropped as noise.
///
/// Shared by [`extend_signal_tokens`] and [`normalize_signal_phrase`], which
/// previously duplicated this logic verbatim.
fn normalize_signal_token(raw: &str) -> Option<String> {
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        return None;
    }
    let normalized = if is_rust_error_code(trimmed) {
        // Uppercase only the leading 'e'/'E'; the digits are kept as-is.
        let mut chars = trimmed.chars();
        let prefix = chars
            .next()
            .map(|ch| ch.to_ascii_uppercase())
            .unwrap_or('E');
        format!("{prefix}{}", chars.as_str())
    } else {
        trimmed.to_ascii_lowercase()
    };
    (normalized.len() >= 3).then_some(normalized)
}

/// Splits `input` on non-alphanumeric boundaries and inserts each normalized
/// token into `out` (a set, so duplicates collapse).
fn extend_signal_tokens(out: &mut BTreeSet<String>, input: &str) {
    out.extend(
        input
            .split(|ch: char| !ch.is_ascii_alphanumeric())
            .filter_map(normalize_signal_token),
    );
}

/// Normalizes `input` into a space-joined phrase of normalized tokens,
/// preserving token order. Returns `None` when no token survives filtering.
fn normalize_signal_phrase(input: &str) -> Option<String> {
    let normalized = input
        .split(|ch: char| !ch.is_ascii_alphanumeric())
        .filter_map(normalize_signal_token)
        .collect::<Vec<_>>()
        .join(" ");
    (!normalized.is_empty()).then_some(normalized)
}

/// True when `value` looks like a rustc diagnostic code: exactly five bytes,
/// a leading `e`/`E`, and four ASCII digits.
fn is_rust_error_code(value: &str) -> bool {
    // The leading byte is ASCII when this matches, so slicing at index 1 is a
    // valid char boundary.
    value.len() == 5
        && matches!(value.as_bytes().first(), Some(b'e' | b'E'))
        && value[1..].chars().all(|ch| ch.is_ascii_digit())
}
1743
1744fn find_declared_mutation(
1745    store: &dyn EvolutionStore,
1746    mutation_id: &MutationId,
1747) -> Result<Option<PreparedMutation>, EvolutionError> {
1748    for stored in store.scan(1)? {
1749        if let EvolutionEvent::MutationDeclared { mutation } = stored.event {
1750            if &mutation.intent.id == mutation_id {
1751                return Ok(Some(mutation));
1752            }
1753        }
1754    }
1755    Ok(None)
1756}
1757
1758fn exact_match_candidates(store: &dyn EvolutionStore, input: &SelectorInput) -> Vec<GeneCandidate> {
1759    let Ok(projection) = store.rebuild_projection() else {
1760        return Vec::new();
1761    };
1762    let capsules = projection.capsules.clone();
1763    let spec_ids_by_gene = projection.spec_ids_by_gene.clone();
1764    let requested_spec_id = input
1765        .spec_id
1766        .as_deref()
1767        .map(str::trim)
1768        .filter(|value| !value.is_empty());
1769    let signal_set = input
1770        .signals
1771        .iter()
1772        .map(|signal| signal.to_ascii_lowercase())
1773        .collect::<BTreeSet<_>>();
1774    let mut candidates = projection
1775        .genes
1776        .into_iter()
1777        .filter_map(|gene| {
1778            if gene.state != AssetState::Promoted {
1779                return None;
1780            }
1781            if let Some(spec_id) = requested_spec_id {
1782                let matches_spec = spec_ids_by_gene
1783                    .get(&gene.id)
1784                    .map(|values| {
1785                        values
1786                            .iter()
1787                            .any(|value| value.eq_ignore_ascii_case(spec_id))
1788                    })
1789                    .unwrap_or(false);
1790                if !matches_spec {
1791                    return None;
1792                }
1793            }
1794            let gene_signals = gene
1795                .signals
1796                .iter()
1797                .map(|signal| signal.to_ascii_lowercase())
1798                .collect::<BTreeSet<_>>();
1799            if gene_signals == signal_set {
1800                let mut matched_capsules = capsules
1801                    .iter()
1802                    .filter(|capsule| {
1803                        capsule.gene_id == gene.id && capsule.state == AssetState::Promoted
1804                    })
1805                    .cloned()
1806                    .collect::<Vec<_>>();
1807                matched_capsules.sort_by(|left, right| {
1808                    replay_environment_match_factor(&input.env, &right.env)
1809                        .partial_cmp(&replay_environment_match_factor(&input.env, &left.env))
1810                        .unwrap_or(std::cmp::Ordering::Equal)
1811                        .then_with(|| {
1812                            right
1813                                .confidence
1814                                .partial_cmp(&left.confidence)
1815                                .unwrap_or(std::cmp::Ordering::Equal)
1816                        })
1817                        .then_with(|| left.id.cmp(&right.id))
1818                });
1819                if matched_capsules.is_empty() {
1820                    None
1821                } else {
1822                    let score = matched_capsules
1823                        .first()
1824                        .map(|capsule| replay_environment_match_factor(&input.env, &capsule.env))
1825                        .unwrap_or(0.0);
1826                    Some(GeneCandidate {
1827                        gene,
1828                        score,
1829                        capsules: matched_capsules,
1830                    })
1831                }
1832            } else {
1833                None
1834            }
1835        })
1836        .collect::<Vec<_>>();
1837    candidates.sort_by(|left, right| {
1838        right
1839            .score
1840            .partial_cmp(&left.score)
1841            .unwrap_or(std::cmp::Ordering::Equal)
1842            .then_with(|| left.gene.id.cmp(&right.gene.id))
1843    });
1844    candidates
1845}
1846
1847fn quarantined_remote_exact_match_candidates(
1848    store: &dyn EvolutionStore,
1849    input: &SelectorInput,
1850) -> Vec<GeneCandidate> {
1851    let remote_asset_ids = store
1852        .scan(1)
1853        .ok()
1854        .map(|events| {
1855            events
1856                .into_iter()
1857                .filter_map(|stored| match stored.event {
1858                    EvolutionEvent::RemoteAssetImported {
1859                        source: CandidateSource::Remote,
1860                        asset_ids,
1861                    } => Some(asset_ids),
1862                    _ => None,
1863                })
1864                .flatten()
1865                .collect::<BTreeSet<_>>()
1866        })
1867        .unwrap_or_default();
1868    if remote_asset_ids.is_empty() {
1869        return Vec::new();
1870    }
1871
1872    let Ok(projection) = store.rebuild_projection() else {
1873        return Vec::new();
1874    };
1875    let capsules = projection.capsules.clone();
1876    let spec_ids_by_gene = projection.spec_ids_by_gene.clone();
1877    let requested_spec_id = input
1878        .spec_id
1879        .as_deref()
1880        .map(str::trim)
1881        .filter(|value| !value.is_empty());
1882    let signal_set = input
1883        .signals
1884        .iter()
1885        .map(|signal| signal.to_ascii_lowercase())
1886        .collect::<BTreeSet<_>>();
1887    let mut candidates = projection
1888        .genes
1889        .into_iter()
1890        .filter_map(|gene| {
1891            if gene.state != AssetState::Promoted {
1892                return None;
1893            }
1894            if let Some(spec_id) = requested_spec_id {
1895                let matches_spec = spec_ids_by_gene
1896                    .get(&gene.id)
1897                    .map(|values| {
1898                        values
1899                            .iter()
1900                            .any(|value| value.eq_ignore_ascii_case(spec_id))
1901                    })
1902                    .unwrap_or(false);
1903                if !matches_spec {
1904                    return None;
1905                }
1906            }
1907            let gene_signals = gene
1908                .signals
1909                .iter()
1910                .map(|signal| signal.to_ascii_lowercase())
1911                .collect::<BTreeSet<_>>();
1912            if gene_signals == signal_set {
1913                let mut matched_capsules = capsules
1914                    .iter()
1915                    .filter(|capsule| {
1916                        capsule.gene_id == gene.id
1917                            && capsule.state == AssetState::Quarantined
1918                            && remote_asset_ids.contains(&capsule.id)
1919                    })
1920                    .cloned()
1921                    .collect::<Vec<_>>();
1922                matched_capsules.sort_by(|left, right| {
1923                    replay_environment_match_factor(&input.env, &right.env)
1924                        .partial_cmp(&replay_environment_match_factor(&input.env, &left.env))
1925                        .unwrap_or(std::cmp::Ordering::Equal)
1926                        .then_with(|| {
1927                            right
1928                                .confidence
1929                                .partial_cmp(&left.confidence)
1930                                .unwrap_or(std::cmp::Ordering::Equal)
1931                        })
1932                        .then_with(|| left.id.cmp(&right.id))
1933                });
1934                if matched_capsules.is_empty() {
1935                    None
1936                } else {
1937                    let score = matched_capsules
1938                        .first()
1939                        .map(|capsule| replay_environment_match_factor(&input.env, &capsule.env))
1940                        .unwrap_or(0.0);
1941                    Some(GeneCandidate {
1942                        gene,
1943                        score,
1944                        capsules: matched_capsules,
1945                    })
1946                }
1947            } else {
1948                None
1949            }
1950        })
1951        .collect::<Vec<_>>();
1952    candidates.sort_by(|left, right| {
1953        right
1954            .score
1955            .partial_cmp(&left.score)
1956            .unwrap_or(std::cmp::Ordering::Equal)
1957            .then_with(|| left.gene.id.cmp(&right.gene.id))
1958    });
1959    candidates
1960}
1961
1962fn replay_environment_match_factor(input: &EnvFingerprint, candidate: &EnvFingerprint) -> f32 {
1963    let fields = [
1964        input
1965            .rustc_version
1966            .eq_ignore_ascii_case(&candidate.rustc_version),
1967        input
1968            .cargo_lock_hash
1969            .eq_ignore_ascii_case(&candidate.cargo_lock_hash),
1970        input
1971            .target_triple
1972            .eq_ignore_ascii_case(&candidate.target_triple),
1973        input.os.eq_ignore_ascii_case(&candidate.os),
1974    ];
1975    let matched_fields = fields.into_iter().filter(|matched| *matched).count() as f32;
1976    0.5 + ((matched_fields / 4.0) * 0.5)
1977}
1978
1979fn effective_candidate_score(
1980    candidate: &GeneCandidate,
1981    publishers_by_gene: &BTreeMap<String, String>,
1982    reputation_bias: &BTreeMap<String, f32>,
1983) -> f32 {
1984    let bias = publishers_by_gene
1985        .get(&candidate.gene.id)
1986        .and_then(|publisher| reputation_bias.get(publisher))
1987        .copied()
1988        .unwrap_or(0.0)
1989        .clamp(0.0, 1.0);
1990    candidate.score * (1.0 + (bias * 0.1))
1991}
1992
1993fn export_promoted_assets_from_store(
1994    store: &dyn EvolutionStore,
1995    sender_id: impl Into<String>,
1996) -> Result<EvolutionEnvelope, EvoKernelError> {
1997    let projection = store.rebuild_projection().map_err(store_err)?;
1998    let mut assets = Vec::new();
1999    for gene in projection
2000        .genes
2001        .into_iter()
2002        .filter(|gene| gene.state == AssetState::Promoted)
2003    {
2004        assets.push(NetworkAsset::Gene { gene });
2005    }
2006    for capsule in projection
2007        .capsules
2008        .into_iter()
2009        .filter(|capsule| capsule.state == AssetState::Promoted)
2010    {
2011        assets.push(NetworkAsset::Capsule { capsule });
2012    }
2013    Ok(EvolutionEnvelope::publish(sender_id, assets))
2014}
2015
/// Imports a remote evolution envelope into the local append-only store.
///
/// For each gene/capsule a `RemoteAssetImported` marker event is appended
/// *first* (later scans use these markers to identify remote-origin assets),
/// then the asset is materialized: genes are projected as-is, while capsules
/// are forced into `Quarantined` state so remote work is never trusted before
/// local re-validation. Raw remote events are replayed only when allow-listed
/// by `should_import_remote_event`.
///
/// Returns the ids of all imported assets. Fails fast on a bad envelope hash
/// or any store append; note the import is NOT atomic — appends made before a
/// failure remain in the log.
fn import_remote_envelope_into_store(
    store: &dyn EvolutionStore,
    envelope: &EvolutionEnvelope,
) -> Result<ImportOutcome, EvoKernelError> {
    // Reject tampered or corrupted envelopes before touching the store.
    if !envelope.verify_content_hash() {
        return Err(EvoKernelError::Validation(
            "invalid evolution envelope hash".into(),
        ));
    }

    let mut imported_asset_ids = Vec::new();
    for asset in &envelope.assets {
        match asset {
            NetworkAsset::Gene { gene } => {
                imported_asset_ids.push(gene.id.clone());
                // Marker before projection: provenance is visible in the log
                // before the asset itself.
                store
                    .append_event(EvolutionEvent::RemoteAssetImported {
                        source: CandidateSource::Remote,
                        asset_ids: vec![gene.id.clone()],
                    })
                    .map_err(store_err)?;
                store
                    .append_event(EvolutionEvent::GeneProjected { gene: gene.clone() })
                    .map_err(store_err)?;
            }
            NetworkAsset::Capsule { capsule } => {
                imported_asset_ids.push(capsule.id.clone());
                store
                    .append_event(EvolutionEvent::RemoteAssetImported {
                        source: CandidateSource::Remote,
                        asset_ids: vec![capsule.id.clone()],
                    })
                    .map_err(store_err)?;
                // Remote capsules are committed but immediately quarantined;
                // they must pass local validation before promotion.
                let mut quarantined = capsule.clone();
                quarantined.state = AssetState::Quarantined;
                store
                    .append_event(EvolutionEvent::CapsuleCommitted {
                        capsule: quarantined.clone(),
                    })
                    .map_err(store_err)?;
                store
                    .append_event(EvolutionEvent::CapsuleQuarantined {
                        capsule_id: quarantined.id,
                    })
                    .map_err(store_err)?;
            }
            NetworkAsset::EvolutionEvent { event } => {
                // Only allow-listed event kinds are replayed into the local log.
                if should_import_remote_event(event) {
                    store.append_event(event.clone()).map_err(store_err)?;
                }
            }
        }
    }

    Ok(ImportOutcome {
        imported_asset_ids,
        accepted: true,
    })
}
2075
2076fn should_import_remote_event(event: &EvolutionEvent) -> bool {
2077    matches!(
2078        event,
2079        EvolutionEvent::MutationDeclared { .. } | EvolutionEvent::SpecLinked { .. }
2080    )
2081}
2082
2083fn fetch_assets_from_store(
2084    store: &dyn EvolutionStore,
2085    responder_id: impl Into<String>,
2086    query: &FetchQuery,
2087) -> Result<FetchResponse, EvoKernelError> {
2088    let projection = store.rebuild_projection().map_err(store_err)?;
2089    let normalized_signals: Vec<String> = query
2090        .signals
2091        .iter()
2092        .map(|signal| signal.trim().to_ascii_lowercase())
2093        .filter(|signal| !signal.is_empty())
2094        .collect();
2095    let matches_any_signal = |candidate: &str| {
2096        if normalized_signals.is_empty() {
2097            return true;
2098        }
2099        let candidate = candidate.to_ascii_lowercase();
2100        normalized_signals
2101            .iter()
2102            .any(|signal| candidate.contains(signal) || signal.contains(&candidate))
2103    };
2104
2105    let matched_genes: Vec<Gene> = projection
2106        .genes
2107        .into_iter()
2108        .filter(|gene| gene.state == AssetState::Promoted)
2109        .filter(|gene| gene.signals.iter().any(|signal| matches_any_signal(signal)))
2110        .collect();
2111    let matched_gene_ids: BTreeSet<String> =
2112        matched_genes.iter().map(|gene| gene.id.clone()).collect();
2113    let matched_capsules: Vec<Capsule> = projection
2114        .capsules
2115        .into_iter()
2116        .filter(|capsule| capsule.state == AssetState::Promoted)
2117        .filter(|capsule| matched_gene_ids.contains(&capsule.gene_id))
2118        .collect();
2119
2120    let mut assets = Vec::new();
2121    for gene in matched_genes {
2122        assets.push(NetworkAsset::Gene { gene });
2123    }
2124    for capsule in matched_capsules {
2125        assets.push(NetworkAsset::Capsule { capsule });
2126    }
2127
2128    Ok(FetchResponse {
2129        sender_id: responder_id.into(),
2130        assets,
2131    })
2132}
2133
2134fn revoke_assets_in_store(
2135    store: &dyn EvolutionStore,
2136    notice: &RevokeNotice,
2137) -> Result<RevokeNotice, EvoKernelError> {
2138    let projection = store.rebuild_projection().map_err(store_err)?;
2139    let requested: BTreeSet<String> = notice
2140        .asset_ids
2141        .iter()
2142        .map(|asset_id| asset_id.trim().to_string())
2143        .filter(|asset_id| !asset_id.is_empty())
2144        .collect();
2145    let mut revoked_gene_ids = BTreeSet::new();
2146    let mut quarantined_capsule_ids = BTreeSet::new();
2147
2148    for gene in &projection.genes {
2149        if requested.contains(&gene.id) {
2150            revoked_gene_ids.insert(gene.id.clone());
2151        }
2152    }
2153    for capsule in &projection.capsules {
2154        if requested.contains(&capsule.id) {
2155            quarantined_capsule_ids.insert(capsule.id.clone());
2156            revoked_gene_ids.insert(capsule.gene_id.clone());
2157        }
2158    }
2159    for capsule in &projection.capsules {
2160        if revoked_gene_ids.contains(&capsule.gene_id) {
2161            quarantined_capsule_ids.insert(capsule.id.clone());
2162        }
2163    }
2164
2165    for gene_id in &revoked_gene_ids {
2166        store
2167            .append_event(EvolutionEvent::GeneRevoked {
2168                gene_id: gene_id.clone(),
2169                reason: notice.reason.clone(),
2170            })
2171            .map_err(store_err)?;
2172    }
2173    for capsule_id in &quarantined_capsule_ids {
2174        store
2175            .append_event(EvolutionEvent::CapsuleQuarantined {
2176                capsule_id: capsule_id.clone(),
2177            })
2178            .map_err(store_err)?;
2179    }
2180
2181    let mut affected_ids: Vec<String> = revoked_gene_ids.into_iter().collect();
2182    affected_ids.extend(quarantined_capsule_ids);
2183    affected_ids.sort();
2184    affected_ids.dedup();
2185
2186    Ok(RevokeNotice {
2187        sender_id: notice.sender_id.clone(),
2188        asset_ids: affected_ids,
2189        reason: notice.reason.clone(),
2190    })
2191}
2192
2193fn evolution_metrics_snapshot(
2194    store: &dyn EvolutionStore,
2195) -> Result<EvolutionMetricsSnapshot, EvoKernelError> {
2196    let events = store.scan(1).map_err(store_err)?;
2197    let projection = store.rebuild_projection().map_err(store_err)?;
2198    let replay_success_total = events
2199        .iter()
2200        .filter(|stored| matches!(stored.event, EvolutionEvent::CapsuleReused { .. }))
2201        .count() as u64;
2202    let replay_failures_total = events
2203        .iter()
2204        .filter(|stored| is_replay_validation_failure(&stored.event))
2205        .count() as u64;
2206    let replay_attempts_total = replay_success_total + replay_failures_total;
2207    let mutation_declared_total = events
2208        .iter()
2209        .filter(|stored| matches!(stored.event, EvolutionEvent::MutationDeclared { .. }))
2210        .count() as u64;
2211    let promoted_mutations_total = events
2212        .iter()
2213        .filter(|stored| matches!(stored.event, EvolutionEvent::GenePromoted { .. }))
2214        .count() as u64;
2215    let gene_revocations_total = events
2216        .iter()
2217        .filter(|stored| matches!(stored.event, EvolutionEvent::GeneRevoked { .. }))
2218        .count() as u64;
2219    let cutoff = Utc::now() - Duration::hours(1);
2220    let mutation_velocity_last_hour = count_recent_events(&events, cutoff, |event| {
2221        matches!(event, EvolutionEvent::MutationDeclared { .. })
2222    });
2223    let revoke_frequency_last_hour = count_recent_events(&events, cutoff, |event| {
2224        matches!(event, EvolutionEvent::GeneRevoked { .. })
2225    });
2226    let promoted_genes = projection
2227        .genes
2228        .iter()
2229        .filter(|gene| gene.state == AssetState::Promoted)
2230        .count() as u64;
2231    let promoted_capsules = projection
2232        .capsules
2233        .iter()
2234        .filter(|capsule| capsule.state == AssetState::Promoted)
2235        .count() as u64;
2236
2237    Ok(EvolutionMetricsSnapshot {
2238        replay_attempts_total,
2239        replay_success_total,
2240        replay_success_rate: safe_ratio(replay_success_total, replay_attempts_total),
2241        mutation_declared_total,
2242        promoted_mutations_total,
2243        promotion_ratio: safe_ratio(promoted_mutations_total, mutation_declared_total),
2244        gene_revocations_total,
2245        mutation_velocity_last_hour,
2246        revoke_frequency_last_hour,
2247        promoted_genes,
2248        promoted_capsules,
2249        last_event_seq: events.last().map(|stored| stored.seq).unwrap_or(0),
2250    })
2251}
2252
2253fn evolution_health_snapshot(snapshot: &EvolutionMetricsSnapshot) -> EvolutionHealthSnapshot {
2254    EvolutionHealthSnapshot {
2255        status: "ok".into(),
2256        last_event_seq: snapshot.last_event_seq,
2257        promoted_genes: snapshot.promoted_genes,
2258        promoted_capsules: snapshot.promoted_capsules,
2259    }
2260}
2261
2262fn render_evolution_metrics_prometheus(
2263    snapshot: &EvolutionMetricsSnapshot,
2264    health: &EvolutionHealthSnapshot,
2265) -> String {
2266    let mut out = String::new();
2267    out.push_str(
2268        "# HELP oris_evolution_replay_attempts_total Total replay attempts that reached validation.\n",
2269    );
2270    out.push_str("# TYPE oris_evolution_replay_attempts_total counter\n");
2271    out.push_str(&format!(
2272        "oris_evolution_replay_attempts_total {}\n",
2273        snapshot.replay_attempts_total
2274    ));
2275    out.push_str("# HELP oris_evolution_replay_success_total Total replay attempts that reused a capsule successfully.\n");
2276    out.push_str("# TYPE oris_evolution_replay_success_total counter\n");
2277    out.push_str(&format!(
2278        "oris_evolution_replay_success_total {}\n",
2279        snapshot.replay_success_total
2280    ));
2281    out.push_str("# HELP oris_evolution_replay_success_rate Successful replay attempts divided by replay attempts that reached validation.\n");
2282    out.push_str("# TYPE oris_evolution_replay_success_rate gauge\n");
2283    out.push_str(&format!(
2284        "oris_evolution_replay_success_rate {:.6}\n",
2285        snapshot.replay_success_rate
2286    ));
2287    out.push_str(
2288        "# HELP oris_evolution_mutation_declared_total Total declared mutations recorded in the evolution log.\n",
2289    );
2290    out.push_str("# TYPE oris_evolution_mutation_declared_total counter\n");
2291    out.push_str(&format!(
2292        "oris_evolution_mutation_declared_total {}\n",
2293        snapshot.mutation_declared_total
2294    ));
2295    out.push_str("# HELP oris_evolution_promoted_mutations_total Total mutations promoted by the governor.\n");
2296    out.push_str("# TYPE oris_evolution_promoted_mutations_total counter\n");
2297    out.push_str(&format!(
2298        "oris_evolution_promoted_mutations_total {}\n",
2299        snapshot.promoted_mutations_total
2300    ));
2301    out.push_str(
2302        "# HELP oris_evolution_promotion_ratio Promoted mutations divided by declared mutations.\n",
2303    );
2304    out.push_str("# TYPE oris_evolution_promotion_ratio gauge\n");
2305    out.push_str(&format!(
2306        "oris_evolution_promotion_ratio {:.6}\n",
2307        snapshot.promotion_ratio
2308    ));
2309    out.push_str("# HELP oris_evolution_gene_revocations_total Total gene revocations recorded in the evolution log.\n");
2310    out.push_str("# TYPE oris_evolution_gene_revocations_total counter\n");
2311    out.push_str(&format!(
2312        "oris_evolution_gene_revocations_total {}\n",
2313        snapshot.gene_revocations_total
2314    ));
2315    out.push_str("# HELP oris_evolution_mutation_velocity_last_hour Declared mutations observed in the last hour.\n");
2316    out.push_str("# TYPE oris_evolution_mutation_velocity_last_hour gauge\n");
2317    out.push_str(&format!(
2318        "oris_evolution_mutation_velocity_last_hour {}\n",
2319        snapshot.mutation_velocity_last_hour
2320    ));
2321    out.push_str("# HELP oris_evolution_revoke_frequency_last_hour Gene revocations observed in the last hour.\n");
2322    out.push_str("# TYPE oris_evolution_revoke_frequency_last_hour gauge\n");
2323    out.push_str(&format!(
2324        "oris_evolution_revoke_frequency_last_hour {}\n",
2325        snapshot.revoke_frequency_last_hour
2326    ));
2327    out.push_str("# HELP oris_evolution_promoted_genes Current promoted genes in the evolution projection.\n");
2328    out.push_str("# TYPE oris_evolution_promoted_genes gauge\n");
2329    out.push_str(&format!(
2330        "oris_evolution_promoted_genes {}\n",
2331        snapshot.promoted_genes
2332    ));
2333    out.push_str("# HELP oris_evolution_promoted_capsules Current promoted capsules in the evolution projection.\n");
2334    out.push_str("# TYPE oris_evolution_promoted_capsules gauge\n");
2335    out.push_str(&format!(
2336        "oris_evolution_promoted_capsules {}\n",
2337        snapshot.promoted_capsules
2338    ));
2339    out.push_str("# HELP oris_evolution_store_last_event_seq Last visible append-only evolution event sequence.\n");
2340    out.push_str("# TYPE oris_evolution_store_last_event_seq gauge\n");
2341    out.push_str(&format!(
2342        "oris_evolution_store_last_event_seq {}\n",
2343        snapshot.last_event_seq
2344    ));
2345    out.push_str(
2346        "# HELP oris_evolution_health Evolution observability store health (1 = healthy).\n",
2347    );
2348    out.push_str("# TYPE oris_evolution_health gauge\n");
2349    out.push_str(&format!(
2350        "oris_evolution_health {}\n",
2351        u8::from(health.status == "ok")
2352    ));
2353    out
2354}
2355
2356fn count_recent_events(
2357    events: &[StoredEvolutionEvent],
2358    cutoff: DateTime<Utc>,
2359    predicate: impl Fn(&EvolutionEvent) -> bool,
2360) -> u64 {
2361    events
2362        .iter()
2363        .filter(|stored| {
2364            predicate(&stored.event)
2365                && parse_event_timestamp(&stored.timestamp)
2366                    .map(|timestamp| timestamp >= cutoff)
2367                    .unwrap_or(false)
2368        })
2369        .count() as u64
2370}
2371
2372fn parse_event_timestamp(raw: &str) -> Option<DateTime<Utc>> {
2373    DateTime::parse_from_rfc3339(raw)
2374        .ok()
2375        .map(|parsed| parsed.with_timezone(&Utc))
2376}
2377
2378fn is_replay_validation_failure(event: &EvolutionEvent) -> bool {
2379    matches!(
2380        event,
2381        EvolutionEvent::ValidationFailed {
2382            gene_id: Some(_),
2383            ..
2384        }
2385    )
2386}
2387
/// Divides as f64, mapping a zero denominator to 0.0 instead of NaN/inf.
fn safe_ratio(numerator: u64, denominator: u64) -> f64 {
    match denominator {
        0 => 0.0,
        nonzero => numerator as f64 / nonzero as f64,
    }
}
2395
/// Maps a storage-layer error into the kernel error type, flattening it to
/// its display string (the structured cause is not preserved).
fn store_err(err: EvolutionError) -> EvoKernelError {
    EvoKernelError::Store(err.to_string())
}
2399
2400#[cfg(test)]
2401mod tests {
2402    use super::*;
2403    use oris_agent_contract::{
2404        AgentRole, CoordinationPlan, CoordinationPrimitive, CoordinationTask,
2405    };
2406    use oris_kernel::{
2407        AllowAllPolicy, InMemoryEventStore, KernelMode, KernelState, NoopActionExecutor,
2408        NoopStepFn, StateUpdatedOnlyReducer,
2409    };
2410    use serde::{Deserialize, Serialize};
2411
    // Minimal unit state; exists only to satisfy the `Kernel` type parameter.
    #[derive(Clone, Debug, Default, Serialize, Deserialize)]
    struct TestState;

    impl KernelState for TestState {
        fn version(&self) -> u32 {
            1
        }
    }
2420
2421    fn temp_workspace(name: &str) -> std::path::PathBuf {
2422        let root =
2423            std::env::temp_dir().join(format!("oris-evokernel-{name}-{}", std::process::id()));
2424        if root.exists() {
2425            fs::remove_dir_all(&root).unwrap();
2426        }
2427        fs::create_dir_all(root.join("src")).unwrap();
2428        fs::write(
2429            root.join("Cargo.toml"),
2430            "[package]\nname = \"sample\"\nversion = \"0.1.0\"\nedition = \"2021\"\n",
2431        )
2432        .unwrap();
2433        fs::write(root.join("Cargo.lock"), "# lock\n").unwrap();
2434        fs::write(root.join("src/lib.rs"), "pub fn demo() -> usize { 1 }\n").unwrap();
2435        root
2436    }
2437
    /// Builds an in-memory kernel wired entirely with no-op components, so
    /// EvoKernel tests exercise evolution logic without real execution.
    fn test_kernel() -> Arc<Kernel<TestState>> {
        Arc::new(Kernel::<TestState> {
            events: Box::new(InMemoryEventStore::new()),
            snaps: None,
            reducer: Box::new(StateUpdatedOnlyReducer),
            exec: Box::new(NoopActionExecutor),
            step: Box::new(NoopStepFn),
            policy: Box::new(AllowAllPolicy),
            effect_sink: None,
            mode: KernelMode::Normal,
        })
    }
2450
    /// Validation plan with a single fast command stage (`git --version`) so
    /// tests exercise the validation path without compiling anything.
    fn lightweight_plan() -> ValidationPlan {
        ValidationPlan {
            profile: "test".into(),
            stages: vec![ValidationStage::Command {
                program: "git".into(),
                args: vec!["--version".into()],
                timeout_ms: 5_000,
            }],
        }
    }
2461
    /// Low-risk sample mutation: a unified diff adding a one-line README,
    /// scoped to `README.md` only, based on `HEAD`.
    fn sample_mutation() -> PreparedMutation {
        prepare_mutation(
            MutationIntent {
                id: "mutation-1".into(),
                intent: "add README".into(),
                target: MutationTarget::Paths {
                    allow: vec!["README.md".into()],
                },
                expected_effect: "repo still builds".into(),
                risk: RiskLevel::Low,
                signals: vec!["missing readme".into()],
                spec_id: None,
            },
            "\
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..1111111
--- /dev/null
+++ b/README.md
@@ -0,0 +1 @@
+# sample
"
            .into(),
            Some("HEAD".into()),
        )
    }
2488
    /// Sandbox policy allowing only `git`, with a 60s duration cap and a
    /// 1 MiB output cap; no env prefixes are denied.
    fn base_sandbox_policy() -> SandboxPolicy {
        SandboxPolicy {
            allowed_programs: vec!["git".into()],
            max_duration_ms: 60_000,
            max_output_bytes: 1024 * 1024,
            denied_env_prefixes: Vec::new(),
        }
    }
2497
    /// Command-based validator constrained by the restrictive test sandbox
    /// policy (git-only).
    fn command_validator() -> Arc<dyn Validator> {
        Arc::new(CommandValidator::new(base_sandbox_policy()))
    }
2501
2502    fn replay_input(signal: &str) -> SelectorInput {
2503        let rustc_version = std::process::Command::new("rustc")
2504            .arg("--version")
2505            .output()
2506            .ok()
2507            .filter(|output| output.status.success())
2508            .map(|output| String::from_utf8_lossy(&output.stdout).trim().to_string())
2509            .unwrap_or_else(|| "rustc unknown".into());
2510        SelectorInput {
2511            signals: vec![signal.into()],
2512            env: EnvFingerprint {
2513                rustc_version,
2514                cargo_lock_hash: compute_artifact_hash("# lock\n"),
2515                target_triple: format!(
2516                    "{}-unknown-{}",
2517                    std::env::consts::ARCH,
2518                    std::env::consts::OS
2519                ),
2520                os: std::env::consts::OS.into(),
2521            },
2522            spec_id: None,
2523            limit: 1,
2524        }
2525    }
2526
2527    fn build_test_evo_with_store(
2528        name: &str,
2529        run_id: &str,
2530        validator: Arc<dyn Validator>,
2531        store: Arc<dyn EvolutionStore>,
2532    ) -> EvoKernel<TestState> {
2533        let workspace = temp_workspace(name);
2534        let sandbox: Arc<dyn Sandbox> = Arc::new(oris_sandbox::LocalProcessSandbox::new(
2535            run_id,
2536            &workspace,
2537            std::env::temp_dir(),
2538        ));
2539        EvoKernel::new(test_kernel(), sandbox, validator, store)
2540            .with_governor(Arc::new(DefaultGovernor::new(
2541                oris_governor::GovernorConfig {
2542                    promote_after_successes: 1,
2543                    ..Default::default()
2544                },
2545            )))
2546            .with_validation_plan(lightweight_plan())
2547            .with_sandbox_policy(base_sandbox_policy())
2548    }
2549
2550    fn build_test_evo(
2551        name: &str,
2552        run_id: &str,
2553        validator: Arc<dyn Validator>,
2554    ) -> (EvoKernel<TestState>, Arc<dyn EvolutionStore>) {
2555        let store_root = std::env::temp_dir().join(format!(
2556            "oris-evokernel-{name}-store-{}",
2557            std::process::id()
2558        ));
2559        if store_root.exists() {
2560            fs::remove_dir_all(&store_root).unwrap();
2561        }
2562        let store: Arc<dyn EvolutionStore> =
2563            Arc::new(oris_evolution::JsonlEvolutionStore::new(&store_root));
2564        let evo = build_test_evo_with_store(name, run_id, validator, store.clone());
2565        (evo, store)
2566    }
2567
2568    fn remote_publish_envelope(
2569        sender_id: &str,
2570        run_id: &str,
2571        gene_id: &str,
2572        capsule_id: &str,
2573        mutation_id: &str,
2574        signal: &str,
2575        file_name: &str,
2576        line: &str,
2577    ) -> EvolutionEnvelope {
2578        remote_publish_envelope_with_env(
2579            sender_id,
2580            run_id,
2581            gene_id,
2582            capsule_id,
2583            mutation_id,
2584            signal,
2585            file_name,
2586            line,
2587            replay_input(signal).env,
2588        )
2589    }
2590
    /// Builds a publish envelope from a remote node advertising one promoted
    /// gene/capsule pair, with an explicit environment fingerprint.
    ///
    /// The envelope carries, in order: the mutation declaration, the gene, the
    /// capsule, and a `CapsuleReleased` event marking the capsule `Promoted`.
    /// The synthetic diff creates `file_name` containing a single `line`.
    fn remote_publish_envelope_with_env(
        sender_id: &str,
        run_id: &str,
        gene_id: &str,
        capsule_id: &str,
        mutation_id: &str,
        signal: &str,
        file_name: &str,
        line: &str,
        env: EnvFingerprint,
    ) -> EvolutionEnvelope {
        // Low-risk mutation adding `file_name` relative to HEAD.
        let mutation = prepare_mutation(
            MutationIntent {
                id: mutation_id.into(),
                intent: format!("add {file_name}"),
                target: MutationTarget::Paths {
                    allow: vec![file_name.into()],
                },
                expected_effect: "replay should still validate".into(),
                risk: RiskLevel::Low,
                signals: vec![signal.into()],
                spec_id: None,
            },
            format!(
                "\
diff --git a/{file_name} b/{file_name}
new file mode 100644
index 0000000..1111111
--- /dev/null
+++ b/{file_name}
@@ -0,0 +1 @@
+{line}
"
            ),
            Some("HEAD".into()),
        );
        // The gene is advertised as already promoted by the remote node.
        let gene = Gene {
            id: gene_id.into(),
            signals: vec![signal.into()],
            strategy: vec![file_name.into()],
            validation: vec!["test".into()],
            state: AssetState::Promoted,
        };
        // Capsule with high confidence; `diff_hash` ties it to the mutation
        // artifact above. `replay_verified` starts false — no local replay yet.
        let capsule = Capsule {
            id: capsule_id.into(),
            gene_id: gene_id.into(),
            mutation_id: mutation_id.into(),
            run_id: run_id.into(),
            diff_hash: mutation.artifact.content_hash.clone(),
            confidence: 0.9,
            env,
            outcome: Outcome {
                success: true,
                validation_profile: "test".into(),
                validation_duration_ms: 1,
                changed_files: vec![file_name.into()],
                validator_hash: "validator-hash".into(),
                lines_changed: 1,
                replay_verified: false,
            },
            state: AssetState::Promoted,
        };
        EvolutionEnvelope::publish(
            sender_id,
            vec![
                NetworkAsset::EvolutionEvent {
                    event: EvolutionEvent::MutationDeclared { mutation },
                },
                NetworkAsset::Gene { gene: gene.clone() },
                NetworkAsset::Capsule {
                    capsule: capsule.clone(),
                },
                NetworkAsset::EvolutionEvent {
                    event: EvolutionEvent::CapsuleReleased {
                        capsule_id: capsule.id.clone(),
                        state: AssetState::Promoted,
                    },
                },
            ],
        )
    }
2672
    /// Test validator that ignores the sandbox receipt entirely and always
    /// reports the configured outcome.
    struct FixedValidator {
        // `true` => every validation run succeeds; `false` => every run fails.
        success: bool,
    }
2676
2677    #[async_trait]
2678    impl Validator for FixedValidator {
2679        async fn run(
2680            &self,
2681            _receipt: &SandboxReceipt,
2682            plan: &ValidationPlan,
2683        ) -> Result<ValidationReport, ValidationError> {
2684            Ok(ValidationReport {
2685                success: self.success,
2686                duration_ms: 1,
2687                stages: Vec::new(),
2688                logs: if self.success {
2689                    format!("{} ok", plan.profile)
2690                } else {
2691                    format!("{} failed", plan.profile)
2692                },
2693            })
2694        }
2695    }
2696
2697    #[test]
2698    fn coordination_planner_to_coder_handoff_is_deterministic() {
2699        let result = MultiAgentCoordinator::new().coordinate(CoordinationPlan {
2700            root_goal: "ship feature".into(),
2701            primitive: CoordinationPrimitive::Sequential,
2702            tasks: vec![
2703                CoordinationTask {
2704                    id: "planner".into(),
2705                    role: AgentRole::Planner,
2706                    description: "split the work".into(),
2707                    depends_on: Vec::new(),
2708                },
2709                CoordinationTask {
2710                    id: "coder".into(),
2711                    role: AgentRole::Coder,
2712                    description: "implement the patch".into(),
2713                    depends_on: vec!["planner".into()],
2714                },
2715            ],
2716            timeout_ms: 5_000,
2717            max_retries: 0,
2718        });
2719
2720        assert_eq!(result.completed_tasks, vec!["planner", "coder"]);
2721        assert!(result.failed_tasks.is_empty());
2722        assert!(result.messages.iter().any(|message| {
2723            message.from_role == AgentRole::Planner
2724                && message.to_role == AgentRole::Coder
2725                && message.task_id == "coder"
2726        }));
2727    }
2728
2729    #[test]
2730    fn coordination_repair_runs_only_after_coder_failure() {
2731        let result = MultiAgentCoordinator::new().coordinate(CoordinationPlan {
2732            root_goal: "fix broken implementation".into(),
2733            primitive: CoordinationPrimitive::Sequential,
2734            tasks: vec![
2735                CoordinationTask {
2736                    id: "coder".into(),
2737                    role: AgentRole::Coder,
2738                    description: "force-fail initial implementation".into(),
2739                    depends_on: Vec::new(),
2740                },
2741                CoordinationTask {
2742                    id: "repair".into(),
2743                    role: AgentRole::Repair,
2744                    description: "patch the failed implementation".into(),
2745                    depends_on: vec!["coder".into()],
2746                },
2747            ],
2748            timeout_ms: 5_000,
2749            max_retries: 0,
2750        });
2751
2752        assert_eq!(result.completed_tasks, vec!["repair"]);
2753        assert_eq!(result.failed_tasks, vec!["coder"]);
2754        assert!(result.messages.iter().any(|message| {
2755            message.from_role == AgentRole::Coder
2756                && message.to_role == AgentRole::Repair
2757                && message.task_id == "repair"
2758        }));
2759    }
2760
2761    #[test]
2762    fn coordination_optimizer_runs_after_successful_implementation_step() {
2763        let result = MultiAgentCoordinator::new().coordinate(CoordinationPlan {
2764            root_goal: "ship optimized patch".into(),
2765            primitive: CoordinationPrimitive::Sequential,
2766            tasks: vec![
2767                CoordinationTask {
2768                    id: "coder".into(),
2769                    role: AgentRole::Coder,
2770                    description: "implement a working patch".into(),
2771                    depends_on: Vec::new(),
2772                },
2773                CoordinationTask {
2774                    id: "optimizer".into(),
2775                    role: AgentRole::Optimizer,
2776                    description: "tighten the implementation".into(),
2777                    depends_on: vec!["coder".into()],
2778                },
2779            ],
2780            timeout_ms: 5_000,
2781            max_retries: 0,
2782        });
2783
2784        assert_eq!(result.completed_tasks, vec!["coder", "optimizer"]);
2785        assert!(result.failed_tasks.is_empty());
2786    }
2787
2788    #[test]
2789    fn coordination_parallel_waves_preserve_sorted_merge_order() {
2790        let result = MultiAgentCoordinator::new().coordinate(CoordinationPlan {
2791            root_goal: "parallelize safe tasks".into(),
2792            primitive: CoordinationPrimitive::Parallel,
2793            tasks: vec![
2794                CoordinationTask {
2795                    id: "z-task".into(),
2796                    role: AgentRole::Planner,
2797                    description: "analyze z".into(),
2798                    depends_on: Vec::new(),
2799                },
2800                CoordinationTask {
2801                    id: "a-task".into(),
2802                    role: AgentRole::Coder,
2803                    description: "implement a".into(),
2804                    depends_on: Vec::new(),
2805                },
2806                CoordinationTask {
2807                    id: "mid-task".into(),
2808                    role: AgentRole::Optimizer,
2809                    description: "polish after both".into(),
2810                    depends_on: vec!["z-task".into(), "a-task".into()],
2811                },
2812            ],
2813            timeout_ms: 5_000,
2814            max_retries: 0,
2815        });
2816
2817        assert_eq!(result.completed_tasks, vec!["a-task", "z-task", "mid-task"]);
2818        assert!(result.failed_tasks.is_empty());
2819    }
2820
2821    #[test]
2822    fn coordination_retries_stop_at_max_retries() {
2823        let result = MultiAgentCoordinator::new().coordinate(CoordinationPlan {
2824            root_goal: "retry then stop".into(),
2825            primitive: CoordinationPrimitive::Sequential,
2826            tasks: vec![CoordinationTask {
2827                id: "coder".into(),
2828                role: AgentRole::Coder,
2829                description: "force-fail this task".into(),
2830                depends_on: Vec::new(),
2831            }],
2832            timeout_ms: 5_000,
2833            max_retries: 1,
2834        });
2835
2836        assert!(result.completed_tasks.is_empty());
2837        assert_eq!(result.failed_tasks, vec!["coder"]);
2838        assert_eq!(
2839            result
2840                .messages
2841                .iter()
2842                .filter(|message| message.task_id == "coder" && message.content.contains("failed"))
2843                .count(),
2844            2
2845        );
2846    }
2847
2848    #[test]
2849    fn coordination_conditional_mode_skips_downstream_tasks_on_failure() {
2850        let result = MultiAgentCoordinator::new().coordinate(CoordinationPlan {
2851            root_goal: "skip blocked follow-up work".into(),
2852            primitive: CoordinationPrimitive::Conditional,
2853            tasks: vec![
2854                CoordinationTask {
2855                    id: "coder".into(),
2856                    role: AgentRole::Coder,
2857                    description: "force-fail the implementation".into(),
2858                    depends_on: Vec::new(),
2859                },
2860                CoordinationTask {
2861                    id: "optimizer".into(),
2862                    role: AgentRole::Optimizer,
2863                    description: "only optimize a successful implementation".into(),
2864                    depends_on: vec!["coder".into()],
2865                },
2866            ],
2867            timeout_ms: 5_000,
2868            max_retries: 0,
2869        });
2870
2871        assert!(result.completed_tasks.is_empty());
2872        assert_eq!(result.failed_tasks, vec!["coder"]);
2873        assert!(result.messages.iter().any(|message| {
2874            message.task_id == "optimizer"
2875                && message
2876                    .content
2877                    .contains("skipped due to failed dependency chain")
2878        }));
2879        assert!(!result
2880            .failed_tasks
2881            .iter()
2882            .any(|task_id| task_id == "optimizer"));
2883    }
2884
2885    #[tokio::test]
2886    async fn command_validator_aggregates_stage_reports() {
2887        let workspace = temp_workspace("validator");
2888        let receipt = SandboxReceipt {
2889            mutation_id: "m".into(),
2890            workdir: workspace,
2891            applied: true,
2892            changed_files: Vec::new(),
2893            patch_hash: "hash".into(),
2894            stdout_log: std::env::temp_dir().join("stdout.log"),
2895            stderr_log: std::env::temp_dir().join("stderr.log"),
2896        };
2897        let validator = CommandValidator::new(SandboxPolicy {
2898            allowed_programs: vec!["git".into()],
2899            max_duration_ms: 1_000,
2900            max_output_bytes: 1024,
2901            denied_env_prefixes: Vec::new(),
2902        });
2903        let report = validator
2904            .run(
2905                &receipt,
2906                &ValidationPlan {
2907                    profile: "test".into(),
2908                    stages: vec![ValidationStage::Command {
2909                        program: "git".into(),
2910                        args: vec!["--version".into()],
2911                        timeout_ms: 1_000,
2912                    }],
2913                },
2914            )
2915            .await
2916            .unwrap();
2917        assert_eq!(report.stages.len(), 1);
2918    }
2919
2920    #[tokio::test]
2921    async fn capture_successful_mutation_appends_capsule() {
2922        let (evo, store) = build_test_evo("capture", "run-1", command_validator());
2923        let capsule = evo
2924            .capture_successful_mutation(&"run-1".into(), sample_mutation())
2925            .await
2926            .unwrap();
2927        let events = store.scan(1).unwrap();
2928        assert!(events
2929            .iter()
2930            .any(|stored| matches!(stored.event, EvolutionEvent::CapsuleCommitted { .. })));
2931        assert!(!capsule.id.is_empty());
2932    }
2933
2934    #[tokio::test]
2935    async fn replay_hit_records_capsule_reused() {
2936        let (evo, store) = build_test_evo("replay", "run-2", command_validator());
2937        let capsule = evo
2938            .capture_successful_mutation(&"run-2".into(), sample_mutation())
2939            .await
2940            .unwrap();
2941        let decision = evo
2942            .replay_or_fallback(replay_input("missing readme"))
2943            .await
2944            .unwrap();
2945        assert!(decision.used_capsule);
2946        assert_eq!(decision.capsule_id, Some(capsule.id));
2947        assert!(store
2948            .scan(1)
2949            .unwrap()
2950            .iter()
2951            .any(|stored| matches!(stored.event, EvolutionEvent::CapsuleReused { .. })));
2952    }
2953
    /// End-to-end metrics accounting: one capture, one successful replay, and
    /// one manual revoke should all be visible in both the snapshot and the
    /// Prometheus text rendering.
    #[tokio::test]
    async fn metrics_snapshot_tracks_replay_promotion_and_revocation_signals() {
        let (evo, _) = build_test_evo("metrics", "run-metrics", command_validator());
        let capsule = evo
            .capture_successful_mutation(&"run-metrics".into(), sample_mutation())
            .await
            .unwrap();
        let decision = evo
            .replay_or_fallback(replay_input("missing readme"))
            .await
            .unwrap();
        assert!(decision.used_capsule);

        // Revoke the just-reused capsule so the revocation counters tick.
        evo.revoke_assets(&RevokeNotice {
            sender_id: "node-metrics".into(),
            asset_ids: vec![capsule.id.clone()],
            reason: "manual test revoke".into(),
        })
        .unwrap();

        let snapshot = evo.metrics_snapshot().unwrap();
        // One replay attempt, one success => perfect success rate.
        assert_eq!(snapshot.replay_attempts_total, 1);
        assert_eq!(snapshot.replay_success_total, 1);
        assert_eq!(snapshot.replay_success_rate, 1.0);
        // One declared mutation that was promoted => perfect promotion ratio.
        assert_eq!(snapshot.mutation_declared_total, 1);
        assert_eq!(snapshot.promoted_mutations_total, 1);
        assert_eq!(snapshot.promotion_ratio, 1.0);
        // NOTE(review): revoking the capsule appears to bump
        // gene_revocations_total — confirm that capsule revokes are counted
        // there rather than in a capsule-specific counter.
        assert_eq!(snapshot.gene_revocations_total, 1);
        assert_eq!(snapshot.mutation_velocity_last_hour, 1);
        assert_eq!(snapshot.revoke_frequency_last_hour, 1);
        // After the revoke nothing remains promoted.
        assert_eq!(snapshot.promoted_genes, 0);
        assert_eq!(snapshot.promoted_capsules, 0);

        // The Prometheus rendering mirrors the snapshot values.
        let rendered = evo.render_metrics_prometheus().unwrap();
        assert!(rendered.contains("oris_evolution_replay_success_rate 1.000000"));
        assert!(rendered.contains("oris_evolution_promotion_ratio 1.000000"));
        assert!(rendered.contains("oris_evolution_revoke_frequency_last_hour 1"));
        assert!(rendered.contains("oris_evolution_mutation_velocity_last_hour 1"));
        assert!(rendered.contains("oris_evolution_health 1"));
    }
2994
2995    #[tokio::test]
2996    async fn remote_replay_prefers_closest_environment_match() {
2997        let (evo, _) = build_test_evo("remote-env", "run-remote-env", command_validator());
2998        let input = replay_input("env-signal");
2999
3000        let envelope_a = remote_publish_envelope_with_env(
3001            "node-a",
3002            "run-remote-a",
3003            "gene-a",
3004            "capsule-a",
3005            "mutation-a",
3006            "env-signal",
3007            "A.md",
3008            "# from a",
3009            input.env.clone(),
3010        );
3011        let envelope_b = remote_publish_envelope_with_env(
3012            "node-b",
3013            "run-remote-b",
3014            "gene-b",
3015            "capsule-b",
3016            "mutation-b",
3017            "env-signal",
3018            "B.md",
3019            "# from b",
3020            EnvFingerprint {
3021                rustc_version: "old-rustc".into(),
3022                cargo_lock_hash: "other-lock".into(),
3023                target_triple: "aarch64-apple-darwin".into(),
3024                os: "linux".into(),
3025            },
3026        );
3027
3028        evo.import_remote_envelope(&envelope_a).unwrap();
3029        evo.import_remote_envelope(&envelope_b).unwrap();
3030
3031        let decision = evo.replay_or_fallback(input).await.unwrap();
3032
3033        assert!(decision.used_capsule);
3034        assert_eq!(decision.capsule_id, Some("capsule-a".into()));
3035        assert!(!decision.fallback_to_planner);
3036    }
3037
3038    #[tokio::test]
3039    async fn remote_capsule_stays_quarantined_until_first_successful_replay() {
3040        let (evo, store) = build_test_evo(
3041            "remote-quarantine",
3042            "run-remote-quarantine",
3043            command_validator(),
3044        );
3045        let envelope = remote_publish_envelope(
3046            "node-remote",
3047            "run-remote-quarantine",
3048            "gene-remote",
3049            "capsule-remote",
3050            "mutation-remote",
3051            "remote-signal",
3052            "REMOTE.md",
3053            "# from remote",
3054        );
3055
3056        evo.import_remote_envelope(&envelope).unwrap();
3057
3058        let before_replay = store.rebuild_projection().unwrap();
3059        let imported_capsule = before_replay
3060            .capsules
3061            .iter()
3062            .find(|capsule| capsule.id == "capsule-remote")
3063            .unwrap();
3064        assert_eq!(imported_capsule.state, AssetState::Quarantined);
3065
3066        let decision = evo
3067            .replay_or_fallback(replay_input("remote-signal"))
3068            .await
3069            .unwrap();
3070
3071        assert!(decision.used_capsule);
3072        assert_eq!(decision.capsule_id, Some("capsule-remote".into()));
3073
3074        let after_replay = store.rebuild_projection().unwrap();
3075        let released_capsule = after_replay
3076            .capsules
3077            .iter()
3078            .find(|capsule| capsule.id == "capsule-remote")
3079            .unwrap();
3080        assert_eq!(released_capsule.state, AssetState::Promoted);
3081    }
3082
3083    #[tokio::test]
3084    async fn insufficient_evu_blocks_publish_but_not_local_replay() {
3085        let (evo, _) = build_test_evo("stake-gate", "run-stake", command_validator());
3086        let capsule = evo
3087            .capture_successful_mutation(&"run-stake".into(), sample_mutation())
3088            .await
3089            .unwrap();
3090        let publish = evo.export_promoted_assets("node-local");
3091        assert!(matches!(publish, Err(EvoKernelError::Validation(_))));
3092
3093        let decision = evo
3094            .replay_or_fallback(replay_input("missing readme"))
3095            .await
3096            .unwrap();
3097        assert!(decision.used_capsule);
3098        assert_eq!(decision.capsule_id, Some(capsule.id));
3099    }
3100
3101    #[tokio::test]
3102    async fn second_replay_validation_failure_revokes_gene_immediately() {
3103        let (capturer, store) = build_test_evo("revoke-replay", "run-capture", command_validator());
3104        let capsule = capturer
3105            .capture_successful_mutation(&"run-capture".into(), sample_mutation())
3106            .await
3107            .unwrap();
3108
3109        let failing_validator: Arc<dyn Validator> = Arc::new(FixedValidator { success: false });
3110        let failing_replay = build_test_evo_with_store(
3111            "revoke-replay",
3112            "run-replay-fail",
3113            failing_validator,
3114            store.clone(),
3115        );
3116
3117        let first = failing_replay
3118            .replay_or_fallback(replay_input("missing readme"))
3119            .await
3120            .unwrap();
3121        let second = failing_replay
3122            .replay_or_fallback(replay_input("missing readme"))
3123            .await
3124            .unwrap();
3125
3126        assert!(!first.used_capsule);
3127        assert!(first.fallback_to_planner);
3128        assert!(!second.used_capsule);
3129        assert!(second.fallback_to_planner);
3130
3131        let projection = store.rebuild_projection().unwrap();
3132        let gene = projection
3133            .genes
3134            .iter()
3135            .find(|gene| gene.id == capsule.gene_id)
3136            .unwrap();
3137        assert_eq!(gene.state, AssetState::Promoted);
3138        let committed_capsule = projection
3139            .capsules
3140            .iter()
3141            .find(|current| current.id == capsule.id)
3142            .unwrap();
3143        assert_eq!(committed_capsule.state, AssetState::Promoted);
3144
3145        let events = store.scan(1).unwrap();
3146        assert_eq!(
3147            events
3148                .iter()
3149                .filter(|stored| {
3150                    matches!(
3151                        &stored.event,
3152                        EvolutionEvent::ValidationFailed {
3153                            gene_id: Some(gene_id),
3154                            ..
3155                        } if gene_id == &capsule.gene_id
3156                    )
3157                })
3158                .count(),
3159            1
3160        );
3161        assert!(!events.iter().any(|stored| {
3162            matches!(
3163                &stored.event,
3164                EvolutionEvent::GeneRevoked { gene_id, .. } if gene_id == &capsule.gene_id
3165            )
3166        }));
3167
3168        let recovered = build_test_evo_with_store(
3169            "revoke-replay",
3170            "run-replay-check",
3171            command_validator(),
3172            store.clone(),
3173        );
3174        let after_revoke = recovered
3175            .replay_or_fallback(replay_input("missing readme"))
3176            .await
3177            .unwrap();
3178        assert!(!after_revoke.used_capsule);
3179        assert!(after_revoke.fallback_to_planner);
3180        assert!(after_revoke.reason.contains("below replay threshold"));
3181    }
3182
3183    #[tokio::test]
3184    async fn remote_reuse_success_rewards_publisher_and_biases_selection() {
3185        let ledger = Arc::new(Mutex::new(EvuLedger {
3186            accounts: vec![],
3187            reputations: vec![
3188                oris_economics::ReputationRecord {
3189                    node_id: "node-a".into(),
3190                    publish_success_rate: 0.4,
3191                    validator_accuracy: 0.4,
3192                    reuse_impact: 0,
3193                },
3194                oris_economics::ReputationRecord {
3195                    node_id: "node-b".into(),
3196                    publish_success_rate: 0.95,
3197                    validator_accuracy: 0.95,
3198                    reuse_impact: 8,
3199                },
3200            ],
3201        }));
3202        let (evo, _) = build_test_evo("remote-success", "run-remote", command_validator());
3203        let evo = evo.with_economics(ledger.clone());
3204
3205        let envelope_a = remote_publish_envelope(
3206            "node-a",
3207            "run-remote-a",
3208            "gene-a",
3209            "capsule-a",
3210            "mutation-a",
3211            "shared-signal",
3212            "A.md",
3213            "# from a",
3214        );
3215        let envelope_b = remote_publish_envelope(
3216            "node-b",
3217            "run-remote-b",
3218            "gene-b",
3219            "capsule-b",
3220            "mutation-b",
3221            "shared-signal",
3222            "B.md",
3223            "# from b",
3224        );
3225
3226        evo.import_remote_envelope(&envelope_a).unwrap();
3227        evo.import_remote_envelope(&envelope_b).unwrap();
3228
3229        let decision = evo
3230            .replay_or_fallback(replay_input("shared-signal"))
3231            .await
3232            .unwrap();
3233
3234        assert!(decision.used_capsule);
3235        assert_eq!(decision.capsule_id, Some("capsule-b".into()));
3236        let locked = ledger.lock().unwrap();
3237        let rewarded = locked
3238            .accounts
3239            .iter()
3240            .find(|item| item.node_id == "node-b")
3241            .unwrap();
3242        assert_eq!(rewarded.balance, evo.stake_policy.reuse_reward);
3243        assert!(
3244            locked.selector_reputation_bias()["node-b"]
3245                > locked.selector_reputation_bias()["node-a"]
3246        );
3247    }
3248
3249    #[tokio::test]
3250    async fn remote_reuse_failure_penalizes_remote_reputation() {
3251        let ledger = Arc::new(Mutex::new(EvuLedger::default()));
3252        let failing_validator: Arc<dyn Validator> = Arc::new(FixedValidator { success: false });
3253        let (evo, _) = build_test_evo("remote-failure", "run-failure", failing_validator);
3254        let evo = evo.with_economics(ledger.clone());
3255
3256        let envelope = remote_publish_envelope(
3257            "node-remote",
3258            "run-remote-failed",
3259            "gene-remote",
3260            "capsule-remote",
3261            "mutation-remote",
3262            "failure-signal",
3263            "FAILED.md",
3264            "# from remote",
3265        );
3266        evo.import_remote_envelope(&envelope).unwrap();
3267
3268        let decision = evo
3269            .replay_or_fallback(replay_input("failure-signal"))
3270            .await
3271            .unwrap();
3272
3273        assert!(!decision.used_capsule);
3274        assert!(decision.fallback_to_planner);
3275
3276        let signal = evo.economics_signal("node-remote").unwrap();
3277        assert_eq!(signal.available_evu, 0);
3278        assert!(signal.publish_success_rate < 0.5);
3279        assert!(signal.validator_accuracy < 0.5);
3280    }
3281}