Skip to main content

oris_evokernel/
core.rs

1//! EvoKernel orchestration: mutation capture, validation, capsule construction, and replay-first reuse.
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::fs;
5use std::path::Path;
6use std::process::Command;
7use std::sync::{Arc, Mutex};
8
9use async_trait::async_trait;
10use chrono::{DateTime, Duration, Utc};
11use oris_agent_contract::{
12    AgentRole, CoordinationMessage, CoordinationPlan, CoordinationPrimitive, CoordinationResult,
13    CoordinationTask, ExecutionFeedback, MutationProposal as AgentMutationProposal,
14};
15use oris_economics::{EconomicsSignal, EvuLedger, StakePolicy};
16use oris_evolution::{
17    compute_artifact_hash, next_id, stable_hash_json, AssetState, BlastRadius, CandidateSource,
18    Capsule, CapsuleId, EnvFingerprint, EvolutionError, EvolutionEvent, EvolutionProjection,
19    EvolutionStore, Gene, GeneCandidate, MutationId, PreparedMutation, Selector, SelectorInput,
20    StoreBackedSelector, StoredEvolutionEvent, ValidationSnapshot,
21};
22use oris_evolution_network::{EvolutionEnvelope, NetworkAsset};
23use oris_governor::{DefaultGovernor, Governor, GovernorDecision, GovernorInput};
24use oris_kernel::{Kernel, KernelState, RunId};
25use oris_sandbox::{
26    compute_blast_radius, execute_allowed_command, Sandbox, SandboxPolicy, SandboxReceipt,
27};
28use oris_spec::CompiledMutationPlan;
29use serde::{Deserialize, Serialize};
30use thiserror::Error;
31
32pub use oris_evolution::{
33    default_store_root, ArtifactEncoding, AssetState as EvoAssetState,
34    BlastRadius as EvoBlastRadius, CandidateSource as EvoCandidateSource,
35    EnvFingerprint as EvoEnvFingerprint, EvolutionStore as EvoEvolutionStore, JsonlEvolutionStore,
36    MutationArtifact, MutationIntent, MutationTarget, Outcome, RiskLevel,
37    SelectorInput as EvoSelectorInput,
38};
39pub use oris_evolution_network::{
40    FetchQuery, FetchResponse, MessageType, PublishRequest, RevokeNotice,
41};
42pub use oris_governor::{CoolingWindow, GovernorConfig, RevocationReason};
43pub use oris_sandbox::{LocalProcessSandbox, SandboxPolicy as EvoSandboxPolicy};
44pub use oris_spec::{SpecCompileError, SpecCompiler, SpecDocument};
45
46#[derive(Clone, Debug, Serialize, Deserialize)]
47pub struct ValidationPlan {
48    pub profile: String,
49    pub stages: Vec<ValidationStage>,
50}
51
52impl ValidationPlan {
53    pub fn oris_default() -> Self {
54        Self {
55            profile: "oris-default".into(),
56            stages: vec![
57                ValidationStage::Command {
58                    program: "cargo".into(),
59                    args: vec!["fmt".into(), "--all".into(), "--check".into()],
60                    timeout_ms: 60_000,
61                },
62                ValidationStage::Command {
63                    program: "cargo".into(),
64                    args: vec!["check".into(), "--workspace".into()],
65                    timeout_ms: 180_000,
66                },
67                ValidationStage::Command {
68                    program: "cargo".into(),
69                    args: vec![
70                        "test".into(),
71                        "-p".into(),
72                        "oris-kernel".into(),
73                        "-p".into(),
74                        "oris-evolution".into(),
75                        "-p".into(),
76                        "oris-sandbox".into(),
77                        "-p".into(),
78                        "oris-evokernel".into(),
79                        "--lib".into(),
80                    ],
81                    timeout_ms: 300_000,
82                },
83                ValidationStage::Command {
84                    program: "cargo".into(),
85                    args: vec![
86                        "test".into(),
87                        "-p".into(),
88                        "oris-runtime".into(),
89                        "--lib".into(),
90                    ],
91                    timeout_ms: 300_000,
92                },
93            ],
94        }
95    }
96}
97
98#[derive(Clone, Debug, Serialize, Deserialize)]
99pub enum ValidationStage {
100    Command {
101        program: String,
102        args: Vec<String>,
103        timeout_ms: u64,
104    },
105}
106
107#[derive(Clone, Debug, Serialize, Deserialize)]
108pub struct ValidationStageReport {
109    pub stage: String,
110    pub success: bool,
111    pub exit_code: Option<i32>,
112    pub duration_ms: u64,
113    pub stdout: String,
114    pub stderr: String,
115}
116
117#[derive(Clone, Debug, Serialize, Deserialize)]
118pub struct ValidationReport {
119    pub success: bool,
120    pub duration_ms: u64,
121    pub stages: Vec<ValidationStageReport>,
122    pub logs: String,
123}
124
125#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
126pub struct SignalExtractionInput {
127    pub patch_diff: String,
128    pub intent: String,
129    pub expected_effect: String,
130    pub declared_signals: Vec<String>,
131    pub changed_files: Vec<String>,
132    pub validation_success: bool,
133    pub validation_logs: String,
134    pub stage_outputs: Vec<String>,
135}
136
137#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
138pub struct SignalExtractionOutput {
139    pub values: Vec<String>,
140    pub hash: String,
141}
142
143#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
144pub struct SeedTemplate {
145    pub id: String,
146    pub intent: String,
147    pub signals: Vec<String>,
148    pub diff_payload: String,
149    pub validation_profile: String,
150}
151
152#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
153pub struct BootstrapReport {
154    pub seeded: bool,
155    pub genes_added: usize,
156    pub capsules_added: usize,
157}
158
159impl ValidationReport {
160    pub fn to_snapshot(&self, profile: &str) -> ValidationSnapshot {
161        ValidationSnapshot {
162            success: self.success,
163            profile: profile.to_string(),
164            duration_ms: self.duration_ms,
165            summary: if self.success {
166                "validation passed".into()
167            } else {
168                "validation failed".into()
169            },
170        }
171    }
172}
173
174pub fn extract_deterministic_signals(input: &SignalExtractionInput) -> SignalExtractionOutput {
175    let mut signals = BTreeSet::new();
176
177    for declared in &input.declared_signals {
178        if let Some(phrase) = normalize_signal_phrase(declared) {
179            signals.insert(phrase);
180        }
181        extend_signal_tokens(&mut signals, declared);
182    }
183
184    for text in [
185        input.patch_diff.as_str(),
186        input.intent.as_str(),
187        input.expected_effect.as_str(),
188        input.validation_logs.as_str(),
189    ] {
190        extend_signal_tokens(&mut signals, text);
191    }
192
193    for changed_file in &input.changed_files {
194        extend_signal_tokens(&mut signals, changed_file);
195    }
196
197    for stage_output in &input.stage_outputs {
198        extend_signal_tokens(&mut signals, stage_output);
199    }
200
201    signals.insert(if input.validation_success {
202        "validation passed".into()
203    } else {
204        "validation failed".into()
205    });
206
207    let values = signals.into_iter().take(32).collect::<Vec<_>>();
208    let hash =
209        stable_hash_json(&values).unwrap_or_else(|_| compute_artifact_hash(&values.join("\n")));
210    SignalExtractionOutput { values, hash }
211}
212
213#[derive(Debug, Error)]
214pub enum ValidationError {
215    #[error("validation execution failed: {0}")]
216    Execution(String),
217}
218
219#[async_trait]
220pub trait Validator: Send + Sync {
221    async fn run(
222        &self,
223        receipt: &SandboxReceipt,
224        plan: &ValidationPlan,
225    ) -> Result<ValidationReport, ValidationError>;
226}
227
228pub struct CommandValidator {
229    policy: SandboxPolicy,
230}
231
232impl CommandValidator {
233    pub fn new(policy: SandboxPolicy) -> Self {
234        Self { policy }
235    }
236}
237
238#[async_trait]
239impl Validator for CommandValidator {
240    async fn run(
241        &self,
242        receipt: &SandboxReceipt,
243        plan: &ValidationPlan,
244    ) -> Result<ValidationReport, ValidationError> {
245        let started = std::time::Instant::now();
246        let mut stages = Vec::new();
247        let mut success = true;
248        let mut logs = String::new();
249
250        for stage in &plan.stages {
251            match stage {
252                ValidationStage::Command {
253                    program,
254                    args,
255                    timeout_ms,
256                } => {
257                    let result = execute_allowed_command(
258                        &self.policy,
259                        &receipt.workdir,
260                        program,
261                        args,
262                        *timeout_ms,
263                    )
264                    .await;
265                    let report = match result {
266                        Ok(output) => ValidationStageReport {
267                            stage: format!("{program} {}", args.join(" ")),
268                            success: output.success,
269                            exit_code: output.exit_code,
270                            duration_ms: output.duration_ms,
271                            stdout: output.stdout,
272                            stderr: output.stderr,
273                        },
274                        Err(err) => ValidationStageReport {
275                            stage: format!("{program} {}", args.join(" ")),
276                            success: false,
277                            exit_code: None,
278                            duration_ms: 0,
279                            stdout: String::new(),
280                            stderr: err.to_string(),
281                        },
282                    };
283                    if !report.success {
284                        success = false;
285                    }
286                    if !report.stdout.is_empty() {
287                        logs.push_str(&report.stdout);
288                        logs.push('\n');
289                    }
290                    if !report.stderr.is_empty() {
291                        logs.push_str(&report.stderr);
292                        logs.push('\n');
293                    }
294                    stages.push(report);
295                    if !success {
296                        break;
297                    }
298                }
299            }
300        }
301
302        Ok(ValidationReport {
303            success,
304            duration_ms: started.elapsed().as_millis() as u64,
305            stages,
306            logs,
307        })
308    }
309}
310
311#[derive(Clone, Debug)]
312pub struct ReplayDecision {
313    pub used_capsule: bool,
314    pub capsule_id: Option<CapsuleId>,
315    pub fallback_to_planner: bool,
316    pub reason: String,
317}
318
319#[derive(Clone, Copy, Debug, Eq, PartialEq)]
320enum CoordinationTaskState {
321    Ready,
322    Waiting,
323    BlockedByFailure,
324    PermanentlyBlocked,
325}
326
327#[derive(Clone, Debug, Default)]
328pub struct MultiAgentCoordinator;
329
330impl MultiAgentCoordinator {
331    pub fn new() -> Self {
332        Self
333    }
334
335    pub fn coordinate(&self, plan: CoordinationPlan) -> CoordinationResult {
336        let primitive = plan.primitive.clone();
337        let root_goal = plan.root_goal.clone();
338        let timeout_ms = plan.timeout_ms;
339        let max_retries = plan.max_retries;
340        let mut tasks = BTreeMap::new();
341        for task in plan.tasks {
342            tasks.entry(task.id.clone()).or_insert(task);
343        }
344
345        let mut pending = tasks.keys().cloned().collect::<BTreeSet<_>>();
346        let mut completed = BTreeSet::new();
347        let mut failed = BTreeSet::new();
348        let mut completed_order = Vec::new();
349        let mut failed_order = Vec::new();
350        let mut skipped = BTreeSet::new();
351        let mut attempts = BTreeMap::new();
352        let mut messages = Vec::new();
353
354        loop {
355            if matches!(primitive, CoordinationPrimitive::Conditional) {
356                self.apply_conditional_skips(
357                    &tasks,
358                    &mut pending,
359                    &completed,
360                    &failed,
361                    &mut skipped,
362                    &mut messages,
363                );
364            }
365
366            let mut ready = self.ready_task_ids(&tasks, &pending, &completed, &failed, &skipped);
367            if ready.is_empty() {
368                break;
369            }
370            if matches!(primitive, CoordinationPrimitive::Sequential) {
371                ready.truncate(1);
372            }
373
374            for task_id in ready {
375                let Some(task) = tasks.get(&task_id) else {
376                    continue;
377                };
378                if !pending.contains(&task_id) {
379                    continue;
380                }
381                self.record_handoff_messages(task, &tasks, &completed, &failed, &mut messages);
382
383                let prior_failures = attempts.get(&task_id).copied().unwrap_or(0);
384                if Self::simulate_task_failure(task, prior_failures) {
385                    let failure_count = prior_failures + 1;
386                    attempts.insert(task_id.clone(), failure_count);
387                    let will_retry = failure_count <= max_retries;
388                    messages.push(CoordinationMessage {
389                        from_role: task.role.clone(),
390                        to_role: task.role.clone(),
391                        task_id: task_id.clone(),
392                        content: if will_retry {
393                            format!("task {task_id} failed on attempt {failure_count} and will retry")
394                        } else {
395                            format!(
396                                "task {task_id} failed on attempt {failure_count} and exhausted retries"
397                            )
398                        },
399                    });
400                    if !will_retry {
401                        pending.remove(&task_id);
402                        if failed.insert(task_id.clone()) {
403                            failed_order.push(task_id);
404                        }
405                    }
406                    continue;
407                }
408
409                pending.remove(&task_id);
410                if completed.insert(task_id.clone()) {
411                    completed_order.push(task_id);
412                }
413            }
414        }
415
416        let blocked_ids = pending.into_iter().collect::<Vec<_>>();
417        for task_id in blocked_ids {
418            let Some(task) = tasks.get(&task_id) else {
419                continue;
420            };
421            let state = self.classify_task(task, &tasks, &completed, &failed, &skipped);
422            let content = match state {
423                CoordinationTaskState::BlockedByFailure => {
424                    format!("task {task_id} blocked by failed dependencies")
425                }
426                CoordinationTaskState::PermanentlyBlocked => {
427                    format!("task {task_id} has invalid coordination prerequisites")
428                }
429                CoordinationTaskState::Waiting => {
430                    format!("task {task_id} has unresolved dependencies")
431                }
432                CoordinationTaskState::Ready => {
433                    format!("task {task_id} was left pending unexpectedly")
434                }
435            };
436            messages.push(CoordinationMessage {
437                from_role: task.role.clone(),
438                to_role: task.role.clone(),
439                task_id: task_id.clone(),
440                content,
441            });
442            if failed.insert(task_id.clone()) {
443                failed_order.push(task_id);
444            }
445        }
446
447        CoordinationResult {
448            completed_tasks: completed_order,
449            failed_tasks: failed_order,
450            messages,
451            summary: format!(
452                "goal '{}' completed {} tasks, failed {}, skipped {} using {:?} coordination (timeout={}ms, max_retries={})",
453                root_goal,
454                completed.len(),
455                failed.len(),
456                skipped.len(),
457                primitive,
458                timeout_ms,
459                max_retries
460            ),
461        }
462    }
463
464    fn ready_task_ids(
465        &self,
466        tasks: &BTreeMap<String, CoordinationTask>,
467        pending: &BTreeSet<String>,
468        completed: &BTreeSet<String>,
469        failed: &BTreeSet<String>,
470        skipped: &BTreeSet<String>,
471    ) -> Vec<String> {
472        pending
473            .iter()
474            .filter_map(|task_id| {
475                let task = tasks.get(task_id)?;
476                (self.classify_task(task, tasks, completed, failed, skipped)
477                    == CoordinationTaskState::Ready)
478                    .then(|| task_id.clone())
479            })
480            .collect()
481    }
482
483    fn apply_conditional_skips(
484        &self,
485        tasks: &BTreeMap<String, CoordinationTask>,
486        pending: &mut BTreeSet<String>,
487        completed: &BTreeSet<String>,
488        failed: &BTreeSet<String>,
489        skipped: &mut BTreeSet<String>,
490        messages: &mut Vec<CoordinationMessage>,
491    ) {
492        let skip_ids = pending
493            .iter()
494            .filter_map(|task_id| {
495                let task = tasks.get(task_id)?;
496                (self.classify_task(task, tasks, completed, failed, skipped)
497                    == CoordinationTaskState::BlockedByFailure)
498                    .then(|| task_id.clone())
499            })
500            .collect::<Vec<_>>();
501
502        for task_id in skip_ids {
503            let Some(task) = tasks.get(&task_id) else {
504                continue;
505            };
506            pending.remove(&task_id);
507            skipped.insert(task_id.clone());
508            messages.push(CoordinationMessage {
509                from_role: task.role.clone(),
510                to_role: task.role.clone(),
511                task_id: task_id.clone(),
512                content: format!("task {task_id} skipped due to failed dependency chain"),
513            });
514        }
515    }
516
517    fn classify_task(
518        &self,
519        task: &CoordinationTask,
520        tasks: &BTreeMap<String, CoordinationTask>,
521        completed: &BTreeSet<String>,
522        failed: &BTreeSet<String>,
523        skipped: &BTreeSet<String>,
524    ) -> CoordinationTaskState {
525        match task.role {
526            AgentRole::Planner | AgentRole::Coder => {
527                let mut waiting = false;
528                for dependency_id in &task.depends_on {
529                    if !tasks.contains_key(dependency_id) {
530                        return CoordinationTaskState::PermanentlyBlocked;
531                    }
532                    if skipped.contains(dependency_id) || failed.contains(dependency_id) {
533                        return CoordinationTaskState::BlockedByFailure;
534                    }
535                    if !completed.contains(dependency_id) {
536                        waiting = true;
537                    }
538                }
539                if waiting {
540                    CoordinationTaskState::Waiting
541                } else {
542                    CoordinationTaskState::Ready
543                }
544            }
545            AgentRole::Repair => {
546                let mut waiting = false;
547                let mut has_coder_dependency = false;
548                let mut has_failed_coder = false;
549                for dependency_id in &task.depends_on {
550                    let Some(dependency) = tasks.get(dependency_id) else {
551                        return CoordinationTaskState::PermanentlyBlocked;
552                    };
553                    let is_coder = matches!(dependency.role, AgentRole::Coder);
554                    if is_coder {
555                        has_coder_dependency = true;
556                    }
557                    if skipped.contains(dependency_id) {
558                        return CoordinationTaskState::BlockedByFailure;
559                    }
560                    if failed.contains(dependency_id) {
561                        if is_coder {
562                            has_failed_coder = true;
563                        } else {
564                            return CoordinationTaskState::BlockedByFailure;
565                        }
566                        continue;
567                    }
568                    if !completed.contains(dependency_id) {
569                        waiting = true;
570                    }
571                }
572                if !has_coder_dependency {
573                    CoordinationTaskState::PermanentlyBlocked
574                } else if waiting {
575                    CoordinationTaskState::Waiting
576                } else if has_failed_coder {
577                    CoordinationTaskState::Ready
578                } else {
579                    CoordinationTaskState::PermanentlyBlocked
580                }
581            }
582            AgentRole::Optimizer => {
583                let mut waiting = false;
584                let mut has_impl_dependency = false;
585                let mut has_completed_impl = false;
586                let mut has_failed_impl = false;
587                for dependency_id in &task.depends_on {
588                    let Some(dependency) = tasks.get(dependency_id) else {
589                        return CoordinationTaskState::PermanentlyBlocked;
590                    };
591                    let is_impl = matches!(dependency.role, AgentRole::Coder | AgentRole::Repair);
592                    if is_impl {
593                        has_impl_dependency = true;
594                    }
595                    if skipped.contains(dependency_id) || failed.contains(dependency_id) {
596                        if is_impl {
597                            has_failed_impl = true;
598                            continue;
599                        }
600                        return CoordinationTaskState::BlockedByFailure;
601                    }
602                    if completed.contains(dependency_id) {
603                        if is_impl {
604                            has_completed_impl = true;
605                        }
606                        continue;
607                    }
608                    waiting = true;
609                }
610                if !has_impl_dependency {
611                    CoordinationTaskState::PermanentlyBlocked
612                } else if waiting {
613                    CoordinationTaskState::Waiting
614                } else if has_completed_impl {
615                    CoordinationTaskState::Ready
616                } else if has_failed_impl {
617                    CoordinationTaskState::BlockedByFailure
618                } else {
619                    CoordinationTaskState::PermanentlyBlocked
620                }
621            }
622        }
623    }
624
625    fn record_handoff_messages(
626        &self,
627        task: &CoordinationTask,
628        tasks: &BTreeMap<String, CoordinationTask>,
629        completed: &BTreeSet<String>,
630        failed: &BTreeSet<String>,
631        messages: &mut Vec<CoordinationMessage>,
632    ) {
633        let mut dependency_ids = task.depends_on.clone();
634        dependency_ids.sort();
635        dependency_ids.dedup();
636
637        for dependency_id in dependency_ids {
638            let Some(dependency) = tasks.get(&dependency_id) else {
639                continue;
640            };
641            if completed.contains(&dependency_id) {
642                messages.push(CoordinationMessage {
643                    from_role: dependency.role.clone(),
644                    to_role: task.role.clone(),
645                    task_id: task.id.clone(),
646                    content: format!("handoff from {dependency_id} to {}", task.id),
647                });
648            } else if failed.contains(&dependency_id) {
649                messages.push(CoordinationMessage {
650                    from_role: dependency.role.clone(),
651                    to_role: task.role.clone(),
652                    task_id: task.id.clone(),
653                    content: format!("failed dependency {dependency_id} routed to {}", task.id),
654                });
655            }
656        }
657    }
658
659    fn simulate_task_failure(task: &CoordinationTask, prior_failures: u32) -> bool {
660        let normalized = task.description.to_ascii_lowercase();
661        normalized.contains("force-fail")
662            || (normalized.contains("fail-once") && prior_failures == 0)
663    }
664}
665
666#[derive(Debug, Error)]
667pub enum ReplayError {
668    #[error("store error: {0}")]
669    Store(String),
670    #[error("sandbox error: {0}")]
671    Sandbox(String),
672    #[error("validation error: {0}")]
673    Validation(String),
674}
675
676#[async_trait]
677pub trait ReplayExecutor: Send + Sync {
678    async fn try_replay(
679        &self,
680        input: &SelectorInput,
681        policy: &SandboxPolicy,
682        validation: &ValidationPlan,
683    ) -> Result<ReplayDecision, ReplayError>;
684
685    async fn try_replay_for_run(
686        &self,
687        run_id: &RunId,
688        input: &SelectorInput,
689        policy: &SandboxPolicy,
690        validation: &ValidationPlan,
691    ) -> Result<ReplayDecision, ReplayError> {
692        let _ = run_id;
693        self.try_replay(input, policy, validation).await
694    }
695}
696
697pub struct StoreReplayExecutor {
698    pub sandbox: Arc<dyn Sandbox>,
699    pub validator: Arc<dyn Validator>,
700    pub store: Arc<dyn EvolutionStore>,
701    pub selector: Arc<dyn Selector>,
702    pub governor: Arc<dyn Governor>,
703    pub economics: Option<Arc<Mutex<EvuLedger>>>,
704    pub remote_publishers: Option<Arc<Mutex<BTreeMap<String, String>>>>,
705    pub stake_policy: StakePolicy,
706}
707
708struct ReplayCandidates {
709    candidates: Vec<GeneCandidate>,
710    exact_match: bool,
711}
712
713#[async_trait]
714impl ReplayExecutor for StoreReplayExecutor {
715    async fn try_replay(
716        &self,
717        input: &SelectorInput,
718        policy: &SandboxPolicy,
719        validation: &ValidationPlan,
720    ) -> Result<ReplayDecision, ReplayError> {
721        self.try_replay_inner(None, input, policy, validation).await
722    }
723
724    async fn try_replay_for_run(
725        &self,
726        run_id: &RunId,
727        input: &SelectorInput,
728        policy: &SandboxPolicy,
729        validation: &ValidationPlan,
730    ) -> Result<ReplayDecision, ReplayError> {
731        self.try_replay_inner(Some(run_id), input, policy, validation)
732            .await
733    }
734}
735
736impl StoreReplayExecutor {
737    fn collect_replay_candidates(&self, input: &SelectorInput) -> ReplayCandidates {
738        let mut selector_input = input.clone();
739        if self.economics.is_some() && self.remote_publishers.is_some() {
740            selector_input.limit = selector_input.limit.max(4);
741        }
742        let mut candidates = self.selector.select(&selector_input);
743        self.rerank_with_reputation_bias(&mut candidates);
744        let mut exact_match = false;
745        if candidates.is_empty() {
746            let mut exact_candidates = exact_match_candidates(self.store.as_ref(), input);
747            self.rerank_with_reputation_bias(&mut exact_candidates);
748            if !exact_candidates.is_empty() {
749                candidates = exact_candidates;
750                exact_match = true;
751            }
752        }
753        if candidates.is_empty() {
754            let mut remote_candidates =
755                quarantined_remote_exact_match_candidates(self.store.as_ref(), input);
756            self.rerank_with_reputation_bias(&mut remote_candidates);
757            if !remote_candidates.is_empty() {
758                candidates = remote_candidates;
759                exact_match = true;
760            }
761        }
762        candidates.truncate(input.limit.max(1));
763        ReplayCandidates {
764            candidates,
765            exact_match,
766        }
767    }
768
769    async fn try_replay_inner(
770        &self,
771        replay_run_id: Option<&RunId>,
772        input: &SelectorInput,
773        policy: &SandboxPolicy,
774        validation: &ValidationPlan,
775    ) -> Result<ReplayDecision, ReplayError> {
776        let ReplayCandidates {
777            candidates,
778            exact_match,
779        } = self.collect_replay_candidates(input);
780        let Some(best) = candidates.into_iter().next() else {
781            return Ok(ReplayDecision {
782                used_capsule: false,
783                capsule_id: None,
784                fallback_to_planner: true,
785                reason: "no matching gene".into(),
786            });
787        };
788        if !exact_match && best.score < 0.82 {
789            return Ok(ReplayDecision {
790                used_capsule: false,
791                capsule_id: None,
792                fallback_to_planner: true,
793                reason: format!("best gene score {:.3} below replay threshold", best.score),
794            });
795        }
796
797        let Some(capsule) = best.capsules.first().cloned() else {
798            return Ok(ReplayDecision {
799                used_capsule: false,
800                capsule_id: None,
801                fallback_to_planner: true,
802                reason: "candidate gene has no capsule".into(),
803            });
804        };
805        let remote_publisher = self.publisher_for_capsule(&capsule.id);
806
807        let Some(mutation) = find_declared_mutation(self.store.as_ref(), &capsule.mutation_id)
808            .map_err(|err| ReplayError::Store(err.to_string()))?
809        else {
810            return Ok(ReplayDecision {
811                used_capsule: false,
812                capsule_id: None,
813                fallback_to_planner: true,
814                reason: "mutation payload missing from store".into(),
815            });
816        };
817
818        let receipt = match self.sandbox.apply(&mutation, policy).await {
819            Ok(receipt) => receipt,
820            Err(err) => {
821                self.record_reuse_settlement(remote_publisher.as_deref(), false);
822                return Ok(ReplayDecision {
823                    used_capsule: false,
824                    capsule_id: Some(capsule.id.clone()),
825                    fallback_to_planner: true,
826                    reason: format!("replay patch apply failed: {err}"),
827                });
828            }
829        };
830
831        let report = self
832            .validator
833            .run(&receipt, validation)
834            .await
835            .map_err(|err| ReplayError::Validation(err.to_string()))?;
836        if !report.success {
837            self.record_replay_validation_failure(&best, &capsule, validation, &report)?;
838            self.record_reuse_settlement(remote_publisher.as_deref(), false);
839            return Ok(ReplayDecision {
840                used_capsule: false,
841                capsule_id: Some(capsule.id.clone()),
842                fallback_to_planner: true,
843                reason: "replay validation failed".into(),
844            });
845        }
846
847        if matches!(capsule.state, AssetState::Quarantined) {
848            self.store
849                .append_event(EvolutionEvent::ValidationPassed {
850                    mutation_id: capsule.mutation_id.clone(),
851                    report: report.to_snapshot(&validation.profile),
852                    gene_id: Some(best.gene.id.clone()),
853                })
854                .map_err(|err| ReplayError::Store(err.to_string()))?;
855            if matches!(best.gene.state, AssetState::Quarantined) {
856                self.store
857                    .append_event(EvolutionEvent::PromotionEvaluated {
858                        gene_id: best.gene.id.clone(),
859                        state: AssetState::Promoted,
860                        reason: "remote asset locally validated via replay".into(),
861                    })
862                    .map_err(|err| ReplayError::Store(err.to_string()))?;
863                self.store
864                    .append_event(EvolutionEvent::GenePromoted {
865                        gene_id: best.gene.id.clone(),
866                    })
867                    .map_err(|err| ReplayError::Store(err.to_string()))?;
868            }
869            self.store
870                .append_event(EvolutionEvent::CapsuleReleased {
871                    capsule_id: capsule.id.clone(),
872                    state: AssetState::Promoted,
873                })
874                .map_err(|err| ReplayError::Store(err.to_string()))?;
875        }
876
877        self.store
878            .append_event(EvolutionEvent::CapsuleReused {
879                capsule_id: capsule.id.clone(),
880                gene_id: capsule.gene_id.clone(),
881                run_id: capsule.run_id.clone(),
882                replay_run_id: replay_run_id.cloned(),
883            })
884            .map_err(|err| ReplayError::Store(err.to_string()))?;
885        self.record_reuse_settlement(remote_publisher.as_deref(), true);
886
887        Ok(ReplayDecision {
888            used_capsule: true,
889            capsule_id: Some(capsule.id),
890            fallback_to_planner: false,
891            reason: if exact_match {
892                "replayed via cold-start lookup".into()
893            } else {
894                "replayed via selector".into()
895            },
896        })
897    }
898
899    fn rerank_with_reputation_bias(&self, candidates: &mut [GeneCandidate]) {
900        let Some(ledger) = self.economics.as_ref() else {
901            return;
902        };
903        let reputation_bias = ledger
904            .lock()
905            .ok()
906            .map(|locked| locked.selector_reputation_bias())
907            .unwrap_or_default();
908        if reputation_bias.is_empty() {
909            return;
910        }
911        let required_assets = candidates
912            .iter()
913            .filter_map(|candidate| {
914                candidate
915                    .capsules
916                    .first()
917                    .map(|capsule| capsule.id.as_str())
918            })
919            .collect::<Vec<_>>();
920        let publisher_map = self.remote_publishers_snapshot(&required_assets);
921        if publisher_map.is_empty() {
922            return;
923        }
924        candidates.sort_by(|left, right| {
925            effective_candidate_score(right, &publisher_map, &reputation_bias)
926                .partial_cmp(&effective_candidate_score(
927                    left,
928                    &publisher_map,
929                    &reputation_bias,
930                ))
931                .unwrap_or(std::cmp::Ordering::Equal)
932                .then_with(|| left.gene.id.cmp(&right.gene.id))
933        });
934    }
935
936    fn publisher_for_capsule(&self, capsule_id: &str) -> Option<String> {
937        self.remote_publishers_snapshot(&[capsule_id])
938            .get(capsule_id)
939            .cloned()
940    }
941
942    fn remote_publishers_snapshot(&self, required_assets: &[&str]) -> BTreeMap<String, String> {
943        let cached = self
944            .remote_publishers
945            .as_ref()
946            .and_then(|remote_publishers| {
947                remote_publishers.lock().ok().map(|locked| locked.clone())
948            })
949            .unwrap_or_default();
950        if !cached.is_empty()
951            && required_assets
952                .iter()
953                .all(|asset_id| cached.contains_key(*asset_id))
954        {
955            return cached;
956        }
957
958        let persisted = remote_publishers_by_asset_from_store(self.store.as_ref());
959        if persisted.is_empty() {
960            return cached;
961        }
962
963        let mut merged = cached;
964        for (asset_id, sender_id) in persisted {
965            merged.entry(asset_id).or_insert(sender_id);
966        }
967
968        if let Some(remote_publishers) = self.remote_publishers.as_ref() {
969            if let Ok(mut locked) = remote_publishers.lock() {
970                for (asset_id, sender_id) in &merged {
971                    locked.entry(asset_id.clone()).or_insert(sender_id.clone());
972                }
973            }
974        }
975
976        merged
977    }
978
979    fn record_reuse_settlement(&self, publisher_id: Option<&str>, success: bool) {
980        let Some(publisher_id) = publisher_id else {
981            return;
982        };
983        let Some(ledger) = self.economics.as_ref() else {
984            return;
985        };
986        if let Ok(mut locked) = ledger.lock() {
987            locked.settle_remote_reuse(publisher_id, success, &self.stake_policy);
988        }
989    }
990
991    fn record_replay_validation_failure(
992        &self,
993        best: &GeneCandidate,
994        capsule: &Capsule,
995        validation: &ValidationPlan,
996        report: &ValidationReport,
997    ) -> Result<(), ReplayError> {
998        let projection = projection_snapshot(self.store.as_ref())
999            .map_err(|err| ReplayError::Store(err.to_string()))?;
1000        let (current_confidence, historical_peak_confidence, confidence_last_updated_secs) =
1001            Self::confidence_context(&projection, &best.gene.id);
1002
1003        self.store
1004            .append_event(EvolutionEvent::ValidationFailed {
1005                mutation_id: capsule.mutation_id.clone(),
1006                report: report.to_snapshot(&validation.profile),
1007                gene_id: Some(best.gene.id.clone()),
1008            })
1009            .map_err(|err| ReplayError::Store(err.to_string()))?;
1010
1011        let replay_failures = self.replay_failure_count(&best.gene.id)?;
1012        let governor_decision = self.governor.evaluate(GovernorInput {
1013            candidate_source: if self.publisher_for_capsule(&capsule.id).is_some() {
1014                CandidateSource::Remote
1015            } else {
1016                CandidateSource::Local
1017            },
1018            success_count: 0,
1019            blast_radius: BlastRadius {
1020                files_changed: capsule.outcome.changed_files.len(),
1021                lines_changed: capsule.outcome.lines_changed,
1022            },
1023            replay_failures,
1024            recent_mutation_ages_secs: Vec::new(),
1025            current_confidence,
1026            historical_peak_confidence,
1027            confidence_last_updated_secs,
1028        });
1029
1030        if matches!(governor_decision.target_state, AssetState::Revoked) {
1031            self.store
1032                .append_event(EvolutionEvent::PromotionEvaluated {
1033                    gene_id: best.gene.id.clone(),
1034                    state: AssetState::Revoked,
1035                    reason: governor_decision.reason.clone(),
1036                })
1037                .map_err(|err| ReplayError::Store(err.to_string()))?;
1038            self.store
1039                .append_event(EvolutionEvent::GeneRevoked {
1040                    gene_id: best.gene.id.clone(),
1041                    reason: governor_decision.reason,
1042                })
1043                .map_err(|err| ReplayError::Store(err.to_string()))?;
1044            for related in &best.capsules {
1045                self.store
1046                    .append_event(EvolutionEvent::CapsuleQuarantined {
1047                        capsule_id: related.id.clone(),
1048                    })
1049                    .map_err(|err| ReplayError::Store(err.to_string()))?;
1050            }
1051        }
1052
1053        Ok(())
1054    }
1055
1056    fn confidence_context(
1057        projection: &EvolutionProjection,
1058        gene_id: &str,
1059    ) -> (f32, f32, Option<u64>) {
1060        let peak_confidence = projection
1061            .capsules
1062            .iter()
1063            .filter(|capsule| capsule.gene_id == gene_id)
1064            .map(|capsule| capsule.confidence)
1065            .fold(0.0_f32, f32::max);
1066        let age_secs = projection
1067            .last_updated_at
1068            .get(gene_id)
1069            .and_then(|timestamp| Self::seconds_since_timestamp(timestamp, Utc::now()));
1070        (peak_confidence, peak_confidence, age_secs)
1071    }
1072
1073    fn seconds_since_timestamp(timestamp: &str, now: DateTime<Utc>) -> Option<u64> {
1074        let parsed = DateTime::parse_from_rfc3339(timestamp)
1075            .ok()?
1076            .with_timezone(&Utc);
1077        let elapsed = now.signed_duration_since(parsed);
1078        if elapsed < Duration::zero() {
1079            Some(0)
1080        } else {
1081            u64::try_from(elapsed.num_seconds()).ok()
1082        }
1083    }
1084
1085    fn replay_failure_count(&self, gene_id: &str) -> Result<u64, ReplayError> {
1086        Ok(self
1087            .store
1088            .scan(1)
1089            .map_err(|err| ReplayError::Store(err.to_string()))?
1090            .into_iter()
1091            .filter(|stored| {
1092                matches!(
1093                    &stored.event,
1094                    EvolutionEvent::ValidationFailed {
1095                        gene_id: Some(current_gene_id),
1096                        ..
1097                    } if current_gene_id == gene_id
1098                )
1099            })
1100            .count() as u64)
1101    }
1102}
1103
1104#[derive(Debug, Error)]
1105pub enum EvoKernelError {
1106    #[error("sandbox error: {0}")]
1107    Sandbox(String),
1108    #[error("validation error: {0}")]
1109    Validation(String),
1110    #[error("validation failed")]
1111    ValidationFailed(ValidationReport),
1112    #[error("store error: {0}")]
1113    Store(String),
1114}
1115
1116#[derive(Clone, Debug)]
1117pub struct CaptureOutcome {
1118    pub capsule: Capsule,
1119    pub gene: Gene,
1120    pub governor_decision: GovernorDecision,
1121}
1122
1123#[derive(Clone, Debug, Serialize, Deserialize)]
1124pub struct ImportOutcome {
1125    pub imported_asset_ids: Vec<String>,
1126    pub accepted: bool,
1127}
1128
1129#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
1130pub struct EvolutionMetricsSnapshot {
1131    pub replay_attempts_total: u64,
1132    pub replay_success_total: u64,
1133    pub replay_success_rate: f64,
1134    pub mutation_declared_total: u64,
1135    pub promoted_mutations_total: u64,
1136    pub promotion_ratio: f64,
1137    pub gene_revocations_total: u64,
1138    pub mutation_velocity_last_hour: u64,
1139    pub revoke_frequency_last_hour: u64,
1140    pub promoted_genes: u64,
1141    pub promoted_capsules: u64,
1142    pub last_event_seq: u64,
1143}
1144
1145#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
1146pub struct EvolutionHealthSnapshot {
1147    pub status: String,
1148    pub last_event_seq: u64,
1149    pub promoted_genes: u64,
1150    pub promoted_capsules: u64,
1151}
1152
1153#[derive(Clone)]
1154pub struct EvolutionNetworkNode {
1155    pub store: Arc<dyn EvolutionStore>,
1156}
1157
1158impl EvolutionNetworkNode {
1159    pub fn new(store: Arc<dyn EvolutionStore>) -> Self {
1160        Self { store }
1161    }
1162
1163    pub fn with_default_store() -> Self {
1164        Self {
1165            store: Arc::new(JsonlEvolutionStore::new(default_store_root())),
1166        }
1167    }
1168
1169    pub fn accept_publish_request(
1170        &self,
1171        request: &PublishRequest,
1172    ) -> Result<ImportOutcome, EvoKernelError> {
1173        import_remote_envelope_into_store(
1174            self.store.as_ref(),
1175            &EvolutionEnvelope::publish(request.sender_id.clone(), request.assets.clone()),
1176            None,
1177        )
1178    }
1179
1180    pub fn publish_local_assets(
1181        &self,
1182        sender_id: impl Into<String>,
1183    ) -> Result<EvolutionEnvelope, EvoKernelError> {
1184        export_promoted_assets_from_store(self.store.as_ref(), sender_id)
1185    }
1186
1187    pub fn fetch_assets(
1188        &self,
1189        responder_id: impl Into<String>,
1190        query: &FetchQuery,
1191    ) -> Result<FetchResponse, EvoKernelError> {
1192        fetch_assets_from_store(self.store.as_ref(), responder_id, query)
1193    }
1194
1195    pub fn revoke_assets(&self, notice: &RevokeNotice) -> Result<RevokeNotice, EvoKernelError> {
1196        revoke_assets_in_store(self.store.as_ref(), notice)
1197    }
1198
1199    pub fn metrics_snapshot(&self) -> Result<EvolutionMetricsSnapshot, EvoKernelError> {
1200        evolution_metrics_snapshot(self.store.as_ref())
1201    }
1202
1203    pub fn render_metrics_prometheus(&self) -> Result<String, EvoKernelError> {
1204        self.metrics_snapshot().map(|snapshot| {
1205            let health = evolution_health_snapshot(&snapshot);
1206            render_evolution_metrics_prometheus(&snapshot, &health)
1207        })
1208    }
1209
1210    pub fn health_snapshot(&self) -> Result<EvolutionHealthSnapshot, EvoKernelError> {
1211        self.metrics_snapshot()
1212            .map(|snapshot| evolution_health_snapshot(&snapshot))
1213    }
1214}
1215
1216pub struct EvoKernel<S: KernelState> {
1217    pub kernel: Arc<Kernel<S>>,
1218    pub sandbox: Arc<dyn Sandbox>,
1219    pub validator: Arc<dyn Validator>,
1220    pub store: Arc<dyn EvolutionStore>,
1221    pub selector: Arc<dyn Selector>,
1222    pub governor: Arc<dyn Governor>,
1223    pub economics: Arc<Mutex<EvuLedger>>,
1224    pub remote_publishers: Arc<Mutex<BTreeMap<String, String>>>,
1225    pub stake_policy: StakePolicy,
1226    pub sandbox_policy: SandboxPolicy,
1227    pub validation_plan: ValidationPlan,
1228}
1229
1230impl<S: KernelState> EvoKernel<S> {
1231    fn recent_prior_mutation_ages_secs(
1232        &self,
1233        exclude_mutation_id: Option<&str>,
1234    ) -> Result<Vec<u64>, EvolutionError> {
1235        let now = Utc::now();
1236        let mut ages = self
1237            .store
1238            .scan(1)?
1239            .into_iter()
1240            .filter_map(|stored| match stored.event {
1241                EvolutionEvent::MutationDeclared { mutation }
1242                    if exclude_mutation_id != Some(mutation.intent.id.as_str()) =>
1243                {
1244                    Self::seconds_since_timestamp(&stored.timestamp, now)
1245                }
1246                _ => None,
1247            })
1248            .collect::<Vec<_>>();
1249        ages.sort_unstable();
1250        Ok(ages)
1251    }
1252
1253    fn seconds_since_timestamp(timestamp: &str, now: DateTime<Utc>) -> Option<u64> {
1254        let parsed = DateTime::parse_from_rfc3339(timestamp)
1255            .ok()?
1256            .with_timezone(&Utc);
1257        let elapsed = now.signed_duration_since(parsed);
1258        if elapsed < Duration::zero() {
1259            Some(0)
1260        } else {
1261            u64::try_from(elapsed.num_seconds()).ok()
1262        }
1263    }
1264
1265    pub fn new(
1266        kernel: Arc<Kernel<S>>,
1267        sandbox: Arc<dyn Sandbox>,
1268        validator: Arc<dyn Validator>,
1269        store: Arc<dyn EvolutionStore>,
1270    ) -> Self {
1271        let selector: Arc<dyn Selector> = Arc::new(StoreBackedSelector::new(store.clone()));
1272        Self {
1273            kernel,
1274            sandbox,
1275            validator,
1276            store,
1277            selector,
1278            governor: Arc::new(DefaultGovernor::default()),
1279            economics: Arc::new(Mutex::new(EvuLedger::default())),
1280            remote_publishers: Arc::new(Mutex::new(BTreeMap::new())),
1281            stake_policy: StakePolicy::default(),
1282            sandbox_policy: SandboxPolicy::oris_default(),
1283            validation_plan: ValidationPlan::oris_default(),
1284        }
1285    }
1286
1287    pub fn with_selector(mut self, selector: Arc<dyn Selector>) -> Self {
1288        self.selector = selector;
1289        self
1290    }
1291
1292    pub fn with_sandbox_policy(mut self, policy: SandboxPolicy) -> Self {
1293        self.sandbox_policy = policy;
1294        self
1295    }
1296
1297    pub fn with_governor(mut self, governor: Arc<dyn Governor>) -> Self {
1298        self.governor = governor;
1299        self
1300    }
1301
1302    pub fn with_economics(mut self, economics: Arc<Mutex<EvuLedger>>) -> Self {
1303        self.economics = economics;
1304        self
1305    }
1306
1307    pub fn with_stake_policy(mut self, policy: StakePolicy) -> Self {
1308        self.stake_policy = policy;
1309        self
1310    }
1311
1312    pub fn with_validation_plan(mut self, plan: ValidationPlan) -> Self {
1313        self.validation_plan = plan;
1314        self
1315    }
1316
1317    pub fn select_candidates(&self, input: &SelectorInput) -> Vec<GeneCandidate> {
1318        let executor = StoreReplayExecutor {
1319            sandbox: self.sandbox.clone(),
1320            validator: self.validator.clone(),
1321            store: self.store.clone(),
1322            selector: self.selector.clone(),
1323            governor: self.governor.clone(),
1324            economics: Some(self.economics.clone()),
1325            remote_publishers: Some(self.remote_publishers.clone()),
1326            stake_policy: self.stake_policy.clone(),
1327        };
1328        executor.collect_replay_candidates(input).candidates
1329    }
1330
1331    pub fn bootstrap_if_empty(&self, run_id: &RunId) -> Result<BootstrapReport, EvoKernelError> {
1332        let projection = projection_snapshot(self.store.as_ref())?;
1333        if !projection.genes.is_empty() {
1334            return Ok(BootstrapReport::default());
1335        }
1336
1337        let templates = built_in_seed_templates();
1338        for template in &templates {
1339            let mutation = build_seed_mutation(template);
1340            let extracted = extract_seed_signals(template);
1341            let gene = build_bootstrap_gene(template, &extracted)
1342                .map_err(|err| EvoKernelError::Validation(err.to_string()))?;
1343            let capsule = build_bootstrap_capsule(run_id, template, &mutation, &gene)
1344                .map_err(|err| EvoKernelError::Validation(err.to_string()))?;
1345
1346            self.store
1347                .append_event(EvolutionEvent::MutationDeclared {
1348                    mutation: mutation.clone(),
1349                })
1350                .map_err(store_err)?;
1351            self.store
1352                .append_event(EvolutionEvent::SignalsExtracted {
1353                    mutation_id: mutation.intent.id.clone(),
1354                    hash: extracted.hash.clone(),
1355                    signals: extracted.values.clone(),
1356                })
1357                .map_err(store_err)?;
1358            self.store
1359                .append_event(EvolutionEvent::GeneProjected { gene: gene.clone() })
1360                .map_err(store_err)?;
1361            self.store
1362                .append_event(EvolutionEvent::PromotionEvaluated {
1363                    gene_id: gene.id.clone(),
1364                    state: AssetState::Quarantined,
1365                    reason: "bootstrap seeds require local validation before replay".into(),
1366                })
1367                .map_err(store_err)?;
1368            self.store
1369                .append_event(EvolutionEvent::CapsuleCommitted {
1370                    capsule: capsule.clone(),
1371                })
1372                .map_err(store_err)?;
1373            self.store
1374                .append_event(EvolutionEvent::CapsuleQuarantined {
1375                    capsule_id: capsule.id,
1376                })
1377                .map_err(store_err)?;
1378        }
1379
1380        Ok(BootstrapReport {
1381            seeded: true,
1382            genes_added: templates.len(),
1383            capsules_added: templates.len(),
1384        })
1385    }
1386
1387    pub async fn capture_successful_mutation(
1388        &self,
1389        run_id: &RunId,
1390        mutation: PreparedMutation,
1391    ) -> Result<Capsule, EvoKernelError> {
1392        Ok(self
1393            .capture_mutation_with_governor(run_id, mutation)
1394            .await?
1395            .capsule)
1396    }
1397
1398    pub async fn capture_mutation_with_governor(
1399        &self,
1400        run_id: &RunId,
1401        mutation: PreparedMutation,
1402    ) -> Result<CaptureOutcome, EvoKernelError> {
1403        self.store
1404            .append_event(EvolutionEvent::MutationDeclared {
1405                mutation: mutation.clone(),
1406            })
1407            .map_err(store_err)?;
1408
1409        let receipt = match self.sandbox.apply(&mutation, &self.sandbox_policy).await {
1410            Ok(receipt) => receipt,
1411            Err(err) => {
1412                self.store
1413                    .append_event(EvolutionEvent::MutationRejected {
1414                        mutation_id: mutation.intent.id.clone(),
1415                        reason: err.to_string(),
1416                    })
1417                    .map_err(store_err)?;
1418                return Err(EvoKernelError::Sandbox(err.to_string()));
1419            }
1420        };
1421
1422        self.store
1423            .append_event(EvolutionEvent::MutationApplied {
1424                mutation_id: mutation.intent.id.clone(),
1425                patch_hash: receipt.patch_hash.clone(),
1426                changed_files: receipt
1427                    .changed_files
1428                    .iter()
1429                    .map(|path| path.to_string_lossy().to_string())
1430                    .collect(),
1431            })
1432            .map_err(store_err)?;
1433
1434        let report = self
1435            .validator
1436            .run(&receipt, &self.validation_plan)
1437            .await
1438            .map_err(|err| EvoKernelError::Validation(err.to_string()))?;
1439        if !report.success {
1440            self.store
1441                .append_event(EvolutionEvent::ValidationFailed {
1442                    mutation_id: mutation.intent.id.clone(),
1443                    report: report.to_snapshot(&self.validation_plan.profile),
1444                    gene_id: None,
1445                })
1446                .map_err(store_err)?;
1447            return Err(EvoKernelError::ValidationFailed(report));
1448        }
1449
1450        self.store
1451            .append_event(EvolutionEvent::ValidationPassed {
1452                mutation_id: mutation.intent.id.clone(),
1453                report: report.to_snapshot(&self.validation_plan.profile),
1454                gene_id: None,
1455            })
1456            .map_err(store_err)?;
1457
1458        let extracted_signals = extract_deterministic_signals(&SignalExtractionInput {
1459            patch_diff: mutation.artifact.payload.clone(),
1460            intent: mutation.intent.intent.clone(),
1461            expected_effect: mutation.intent.expected_effect.clone(),
1462            declared_signals: mutation.intent.signals.clone(),
1463            changed_files: receipt
1464                .changed_files
1465                .iter()
1466                .map(|path| path.to_string_lossy().to_string())
1467                .collect(),
1468            validation_success: report.success,
1469            validation_logs: report.logs.clone(),
1470            stage_outputs: report
1471                .stages
1472                .iter()
1473                .flat_map(|stage| [stage.stdout.clone(), stage.stderr.clone()])
1474                .filter(|value| !value.is_empty())
1475                .collect(),
1476        });
1477        self.store
1478            .append_event(EvolutionEvent::SignalsExtracted {
1479                mutation_id: mutation.intent.id.clone(),
1480                hash: extracted_signals.hash.clone(),
1481                signals: extracted_signals.values.clone(),
1482            })
1483            .map_err(store_err)?;
1484
1485        let projection = projection_snapshot(self.store.as_ref())?;
1486        let blast_radius = compute_blast_radius(&mutation.artifact.payload);
1487        let recent_mutation_ages_secs = self
1488            .recent_prior_mutation_ages_secs(Some(mutation.intent.id.as_str()))
1489            .map_err(store_err)?;
1490        let mut gene = derive_gene(
1491            &mutation,
1492            &receipt,
1493            &self.validation_plan.profile,
1494            &extracted_signals.values,
1495        );
1496        let (current_confidence, historical_peak_confidence, confidence_last_updated_secs) =
1497            StoreReplayExecutor::confidence_context(&projection, &gene.id);
1498        let success_count = projection
1499            .genes
1500            .iter()
1501            .find(|existing| existing.id == gene.id)
1502            .map(|existing| {
1503                projection
1504                    .capsules
1505                    .iter()
1506                    .filter(|capsule| capsule.gene_id == existing.id)
1507                    .count() as u64
1508            })
1509            .unwrap_or(0)
1510            + 1;
1511        let governor_decision = self.governor.evaluate(GovernorInput {
1512            candidate_source: CandidateSource::Local,
1513            success_count,
1514            blast_radius: blast_radius.clone(),
1515            replay_failures: 0,
1516            recent_mutation_ages_secs,
1517            current_confidence,
1518            historical_peak_confidence,
1519            confidence_last_updated_secs,
1520        });
1521
1522        gene.state = governor_decision.target_state.clone();
1523        self.store
1524            .append_event(EvolutionEvent::GeneProjected { gene: gene.clone() })
1525            .map_err(store_err)?;
1526        self.store
1527            .append_event(EvolutionEvent::PromotionEvaluated {
1528                gene_id: gene.id.clone(),
1529                state: governor_decision.target_state.clone(),
1530                reason: governor_decision.reason.clone(),
1531            })
1532            .map_err(store_err)?;
1533        if matches!(governor_decision.target_state, AssetState::Promoted) {
1534            self.store
1535                .append_event(EvolutionEvent::GenePromoted {
1536                    gene_id: gene.id.clone(),
1537                })
1538                .map_err(store_err)?;
1539        }
1540        if matches!(governor_decision.target_state, AssetState::Revoked) {
1541            self.store
1542                .append_event(EvolutionEvent::GeneRevoked {
1543                    gene_id: gene.id.clone(),
1544                    reason: governor_decision.reason.clone(),
1545                })
1546                .map_err(store_err)?;
1547        }
1548        if let Some(spec_id) = &mutation.intent.spec_id {
1549            self.store
1550                .append_event(EvolutionEvent::SpecLinked {
1551                    mutation_id: mutation.intent.id.clone(),
1552                    spec_id: spec_id.clone(),
1553                })
1554                .map_err(store_err)?;
1555        }
1556
1557        let mut capsule = build_capsule(
1558            run_id,
1559            &mutation,
1560            &receipt,
1561            &report,
1562            &self.validation_plan.profile,
1563            &gene,
1564            &blast_radius,
1565        )
1566        .map_err(|err| EvoKernelError::Validation(err.to_string()))?;
1567        capsule.state = governor_decision.target_state.clone();
1568        self.store
1569            .append_event(EvolutionEvent::CapsuleCommitted {
1570                capsule: capsule.clone(),
1571            })
1572            .map_err(store_err)?;
1573        if matches!(governor_decision.target_state, AssetState::Quarantined) {
1574            self.store
1575                .append_event(EvolutionEvent::CapsuleQuarantined {
1576                    capsule_id: capsule.id.clone(),
1577                })
1578                .map_err(store_err)?;
1579        }
1580
1581        Ok(CaptureOutcome {
1582            capsule,
1583            gene,
1584            governor_decision,
1585        })
1586    }
1587
1588    pub async fn capture_from_proposal(
1589        &self,
1590        run_id: &RunId,
1591        proposal: &AgentMutationProposal,
1592        diff_payload: String,
1593        base_revision: Option<String>,
1594    ) -> Result<CaptureOutcome, EvoKernelError> {
1595        let intent = MutationIntent {
1596            id: next_id("proposal"),
1597            intent: proposal.intent.clone(),
1598            target: MutationTarget::Paths {
1599                allow: proposal.files.clone(),
1600            },
1601            expected_effect: proposal.expected_effect.clone(),
1602            risk: RiskLevel::Low,
1603            signals: proposal.files.clone(),
1604            spec_id: None,
1605        };
1606        self.capture_mutation_with_governor(
1607            run_id,
1608            prepare_mutation(intent, diff_payload, base_revision),
1609        )
1610        .await
1611    }
1612
1613    pub fn feedback_for_agent(outcome: &CaptureOutcome) -> ExecutionFeedback {
1614        ExecutionFeedback {
1615            accepted: !matches!(outcome.governor_decision.target_state, AssetState::Revoked),
1616            asset_state: Some(format!("{:?}", outcome.governor_decision.target_state)),
1617            summary: outcome.governor_decision.reason.clone(),
1618        }
1619    }
1620
1621    pub fn coordinate(&self, plan: CoordinationPlan) -> CoordinationResult {
1622        MultiAgentCoordinator::new().coordinate(plan)
1623    }
1624
1625    pub fn export_promoted_assets(
1626        &self,
1627        sender_id: impl Into<String>,
1628    ) -> Result<EvolutionEnvelope, EvoKernelError> {
1629        let sender_id = sender_id.into();
1630        let envelope = export_promoted_assets_from_store(self.store.as_ref(), sender_id.clone())?;
1631        if !envelope.assets.is_empty() {
1632            let mut ledger = self
1633                .economics
1634                .lock()
1635                .map_err(|_| EvoKernelError::Validation("economics ledger lock poisoned".into()))?;
1636            if ledger
1637                .reserve_publish_stake(&sender_id, &self.stake_policy)
1638                .is_none()
1639            {
1640                return Err(EvoKernelError::Validation(
1641                    "insufficient EVU for remote publish".into(),
1642                ));
1643            }
1644        }
1645        Ok(envelope)
1646    }
1647
1648    pub fn import_remote_envelope(
1649        &self,
1650        envelope: &EvolutionEnvelope,
1651    ) -> Result<ImportOutcome, EvoKernelError> {
1652        import_remote_envelope_into_store(
1653            self.store.as_ref(),
1654            envelope,
1655            Some(self.remote_publishers.as_ref()),
1656        )
1657    }
1658
1659    pub fn fetch_assets(
1660        &self,
1661        responder_id: impl Into<String>,
1662        query: &FetchQuery,
1663    ) -> Result<FetchResponse, EvoKernelError> {
1664        fetch_assets_from_store(self.store.as_ref(), responder_id, query)
1665    }
1666
1667    pub fn revoke_assets(&self, notice: &RevokeNotice) -> Result<RevokeNotice, EvoKernelError> {
1668        revoke_assets_in_store(self.store.as_ref(), notice)
1669    }
1670
1671    pub async fn replay_or_fallback(
1672        &self,
1673        input: SelectorInput,
1674    ) -> Result<ReplayDecision, EvoKernelError> {
1675        let replay_run_id = next_id("replay");
1676        self.replay_or_fallback_for_run(&replay_run_id, input).await
1677    }
1678
1679    pub async fn replay_or_fallback_for_run(
1680        &self,
1681        run_id: &RunId,
1682        input: SelectorInput,
1683    ) -> Result<ReplayDecision, EvoKernelError> {
1684        let executor = StoreReplayExecutor {
1685            sandbox: self.sandbox.clone(),
1686            validator: self.validator.clone(),
1687            store: self.store.clone(),
1688            selector: self.selector.clone(),
1689            governor: self.governor.clone(),
1690            economics: Some(self.economics.clone()),
1691            remote_publishers: Some(self.remote_publishers.clone()),
1692            stake_policy: self.stake_policy.clone(),
1693        };
1694        executor
1695            .try_replay_for_run(run_id, &input, &self.sandbox_policy, &self.validation_plan)
1696            .await
1697            .map_err(|err| EvoKernelError::Validation(err.to_string()))
1698    }
1699
1700    pub fn economics_signal(&self, node_id: &str) -> Option<EconomicsSignal> {
1701        self.economics.lock().ok()?.governor_signal(node_id)
1702    }
1703
1704    pub fn selector_reputation_bias(&self) -> BTreeMap<String, f32> {
1705        self.economics
1706            .lock()
1707            .ok()
1708            .map(|locked| locked.selector_reputation_bias())
1709            .unwrap_or_default()
1710    }
1711
1712    pub fn metrics_snapshot(&self) -> Result<EvolutionMetricsSnapshot, EvoKernelError> {
1713        evolution_metrics_snapshot(self.store.as_ref())
1714    }
1715
1716    pub fn render_metrics_prometheus(&self) -> Result<String, EvoKernelError> {
1717        self.metrics_snapshot().map(|snapshot| {
1718            let health = evolution_health_snapshot(&snapshot);
1719            render_evolution_metrics_prometheus(&snapshot, &health)
1720        })
1721    }
1722
1723    pub fn health_snapshot(&self) -> Result<EvolutionHealthSnapshot, EvoKernelError> {
1724        self.metrics_snapshot()
1725            .map(|snapshot| evolution_health_snapshot(&snapshot))
1726    }
1727}
1728
1729pub fn prepare_mutation(
1730    intent: MutationIntent,
1731    diff_payload: String,
1732    base_revision: Option<String>,
1733) -> PreparedMutation {
1734    PreparedMutation {
1735        intent,
1736        artifact: MutationArtifact {
1737            encoding: ArtifactEncoding::UnifiedDiff,
1738            content_hash: compute_artifact_hash(&diff_payload),
1739            payload: diff_payload,
1740            base_revision,
1741        },
1742    }
1743}
1744
1745pub fn prepare_mutation_from_spec(
1746    plan: CompiledMutationPlan,
1747    diff_payload: String,
1748    base_revision: Option<String>,
1749) -> PreparedMutation {
1750    prepare_mutation(plan.mutation_intent, diff_payload, base_revision)
1751}
1752
1753pub fn default_evolution_store() -> Arc<dyn EvolutionStore> {
1754    Arc::new(oris_evolution::JsonlEvolutionStore::new(
1755        default_store_root(),
1756    ))
1757}
1758
1759fn built_in_seed_templates() -> Vec<SeedTemplate> {
1760    vec![
1761        SeedTemplate {
1762            id: "bootstrap-readme".into(),
1763            intent: "Seed a baseline README recovery pattern".into(),
1764            signals: vec!["bootstrap readme".into(), "missing readme".into()],
1765            diff_payload: "\
1766diff --git a/README.md b/README.md
1767new file mode 100644
1768index 0000000..1111111
1769--- /dev/null
1770+++ b/README.md
1771@@ -0,0 +1,3 @@
1772+# Oris
1773+Bootstrap documentation seed
1774+"
1775            .into(),
1776            validation_profile: "bootstrap-seed".into(),
1777        },
1778        SeedTemplate {
1779            id: "bootstrap-test-fix".into(),
1780            intent: "Seed a deterministic test stabilization pattern".into(),
1781            signals: vec!["bootstrap test fix".into(), "failing tests".into()],
1782            diff_payload: "\
1783diff --git a/src/lib.rs b/src/lib.rs
1784index 1111111..2222222 100644
1785--- a/src/lib.rs
1786+++ b/src/lib.rs
1787@@ -1 +1,2 @@
1788 pub fn demo() -> usize { 1 }
1789+pub fn normalize_test_output() -> bool { true }
1790"
1791            .into(),
1792            validation_profile: "bootstrap-seed".into(),
1793        },
1794        SeedTemplate {
1795            id: "bootstrap-refactor".into(),
1796            intent: "Seed a low-risk refactor capsule".into(),
1797            signals: vec!["bootstrap refactor".into(), "small refactor".into()],
1798            diff_payload: "\
1799diff --git a/src/lib.rs b/src/lib.rs
1800index 2222222..3333333 100644
1801--- a/src/lib.rs
1802+++ b/src/lib.rs
1803@@ -1 +1,3 @@
1804 pub fn demo() -> usize { 1 }
1805+
1806+fn extract_strategy_key(input: &str) -> &str { input }
1807"
1808            .into(),
1809            validation_profile: "bootstrap-seed".into(),
1810        },
1811        SeedTemplate {
1812            id: "bootstrap-logging".into(),
1813            intent: "Seed a baseline structured logging mutation".into(),
1814            signals: vec!["bootstrap logging".into(), "structured logs".into()],
1815            diff_payload: "\
1816diff --git a/src/lib.rs b/src/lib.rs
1817index 3333333..4444444 100644
1818--- a/src/lib.rs
1819+++ b/src/lib.rs
1820@@ -1 +1,3 @@
1821 pub fn demo() -> usize { 1 }
1822+
1823+fn emit_bootstrap_log() { println!(\"bootstrap-log\"); }
1824"
1825            .into(),
1826            validation_profile: "bootstrap-seed".into(),
1827        },
1828    ]
1829}
1830
1831fn build_seed_mutation(template: &SeedTemplate) -> PreparedMutation {
1832    let changed_files = seed_changed_files(&template.diff_payload);
1833    let target = if changed_files.is_empty() {
1834        MutationTarget::WorkspaceRoot
1835    } else {
1836        MutationTarget::Paths {
1837            allow: changed_files,
1838        }
1839    };
1840    prepare_mutation(
1841        MutationIntent {
1842            id: stable_hash_json(&("bootstrap-mutation", &template.id))
1843                .unwrap_or_else(|_| format!("bootstrap-mutation-{}", template.id)),
1844            intent: template.intent.clone(),
1845            target,
1846            expected_effect: format!("seed {}", template.id),
1847            risk: RiskLevel::Low,
1848            signals: template.signals.clone(),
1849            spec_id: None,
1850        },
1851        template.diff_payload.clone(),
1852        None,
1853    )
1854}
1855
1856fn extract_seed_signals(template: &SeedTemplate) -> SignalExtractionOutput {
1857    let mut signals = BTreeSet::new();
1858    for declared in &template.signals {
1859        if let Some(phrase) = normalize_signal_phrase(declared) {
1860            signals.insert(phrase);
1861        }
1862        extend_signal_tokens(&mut signals, declared);
1863    }
1864    extend_signal_tokens(&mut signals, &template.intent);
1865    extend_signal_tokens(&mut signals, &template.diff_payload);
1866    for changed_file in seed_changed_files(&template.diff_payload) {
1867        extend_signal_tokens(&mut signals, &changed_file);
1868    }
1869    let values = signals.into_iter().take(32).collect::<Vec<_>>();
1870    let hash =
1871        stable_hash_json(&values).unwrap_or_else(|_| compute_artifact_hash(&values.join("\n")));
1872    SignalExtractionOutput { values, hash }
1873}
1874
1875fn seed_changed_files(diff_payload: &str) -> Vec<String> {
1876    let mut changed_files = BTreeSet::new();
1877    for line in diff_payload.lines() {
1878        if let Some(path) = line.strip_prefix("+++ b/") {
1879            let normalized = path.trim();
1880            if !normalized.is_empty() {
1881                changed_files.insert(normalized.to_string());
1882            }
1883        }
1884    }
1885    changed_files.into_iter().collect()
1886}
1887
1888fn build_bootstrap_gene(
1889    template: &SeedTemplate,
1890    extracted: &SignalExtractionOutput,
1891) -> Result<Gene, EvolutionError> {
1892    let strategy = vec![template.id.clone(), "bootstrap".into()];
1893    let id = stable_hash_json(&(
1894        "bootstrap-gene",
1895        &template.id,
1896        &extracted.values,
1897        &template.validation_profile,
1898    ))?;
1899    Ok(Gene {
1900        id,
1901        signals: extracted.values.clone(),
1902        strategy,
1903        validation: vec![template.validation_profile.clone()],
1904        state: AssetState::Quarantined,
1905    })
1906}
1907
1908fn build_bootstrap_capsule(
1909    run_id: &RunId,
1910    template: &SeedTemplate,
1911    mutation: &PreparedMutation,
1912    gene: &Gene,
1913) -> Result<Capsule, EvolutionError> {
1914    let cwd = std::env::current_dir().unwrap_or_else(|_| Path::new(".").to_path_buf());
1915    let env = current_env_fingerprint(&cwd);
1916    let diff_hash = mutation.artifact.content_hash.clone();
1917    let changed_files = seed_changed_files(&template.diff_payload);
1918    let validator_hash = stable_hash_json(&(
1919        "bootstrap-validator",
1920        &template.id,
1921        &template.validation_profile,
1922        &diff_hash,
1923    ))?;
1924    let id = stable_hash_json(&(
1925        "bootstrap-capsule",
1926        &template.id,
1927        run_id,
1928        &gene.id,
1929        &diff_hash,
1930        &env,
1931    ))?;
1932    Ok(Capsule {
1933        id,
1934        gene_id: gene.id.clone(),
1935        mutation_id: mutation.intent.id.clone(),
1936        run_id: run_id.clone(),
1937        diff_hash,
1938        confidence: 0.0,
1939        env,
1940        outcome: Outcome {
1941            success: false,
1942            validation_profile: template.validation_profile.clone(),
1943            validation_duration_ms: 0,
1944            changed_files,
1945            validator_hash,
1946            lines_changed: compute_blast_radius(&template.diff_payload).lines_changed,
1947            replay_verified: false,
1948        },
1949        state: AssetState::Quarantined,
1950    })
1951}
1952
1953fn derive_gene(
1954    mutation: &PreparedMutation,
1955    receipt: &SandboxReceipt,
1956    validation_profile: &str,
1957    extracted_signals: &[String],
1958) -> Gene {
1959    let mut strategy = BTreeSet::new();
1960    for file in &receipt.changed_files {
1961        if let Some(component) = file.components().next() {
1962            strategy.insert(component.as_os_str().to_string_lossy().to_string());
1963        }
1964    }
1965    for token in mutation
1966        .artifact
1967        .payload
1968        .split(|ch: char| !ch.is_ascii_alphanumeric())
1969    {
1970        if token.len() == 5
1971            && token.starts_with('E')
1972            && token[1..].chars().all(|ch| ch.is_ascii_digit())
1973        {
1974            strategy.insert(token.to_string());
1975        }
1976    }
1977    for token in mutation.intent.intent.split_whitespace().take(8) {
1978        strategy.insert(token.to_ascii_lowercase());
1979    }
1980    let strategy = strategy.into_iter().collect::<Vec<_>>();
1981    let id = stable_hash_json(&(extracted_signals, &strategy, validation_profile))
1982        .unwrap_or_else(|_| next_id("gene"));
1983    Gene {
1984        id,
1985        signals: extracted_signals.to_vec(),
1986        strategy,
1987        validation: vec![validation_profile.to_string()],
1988        state: AssetState::Promoted,
1989    }
1990}
1991
1992fn build_capsule(
1993    run_id: &RunId,
1994    mutation: &PreparedMutation,
1995    receipt: &SandboxReceipt,
1996    report: &ValidationReport,
1997    validation_profile: &str,
1998    gene: &Gene,
1999    blast_radius: &BlastRadius,
2000) -> Result<Capsule, EvolutionError> {
2001    let env = current_env_fingerprint(&receipt.workdir);
2002    let validator_hash = stable_hash_json(report)?;
2003    let diff_hash = mutation.artifact.content_hash.clone();
2004    let id = stable_hash_json(&(run_id, &gene.id, &diff_hash, &mutation.intent.id))?;
2005    Ok(Capsule {
2006        id,
2007        gene_id: gene.id.clone(),
2008        mutation_id: mutation.intent.id.clone(),
2009        run_id: run_id.clone(),
2010        diff_hash,
2011        confidence: 0.7,
2012        env,
2013        outcome: oris_evolution::Outcome {
2014            success: true,
2015            validation_profile: validation_profile.to_string(),
2016            validation_duration_ms: report.duration_ms,
2017            changed_files: receipt
2018                .changed_files
2019                .iter()
2020                .map(|path| path.to_string_lossy().to_string())
2021                .collect(),
2022            validator_hash,
2023            lines_changed: blast_radius.lines_changed,
2024            replay_verified: false,
2025        },
2026        state: AssetState::Promoted,
2027    })
2028}
2029
2030fn current_env_fingerprint(workdir: &Path) -> EnvFingerprint {
2031    let rustc_version = Command::new("rustc")
2032        .arg("--version")
2033        .output()
2034        .ok()
2035        .filter(|output| output.status.success())
2036        .map(|output| String::from_utf8_lossy(&output.stdout).trim().to_string())
2037        .unwrap_or_else(|| "rustc unknown".into());
2038    let cargo_lock_hash = fs::read(workdir.join("Cargo.lock"))
2039        .ok()
2040        .map(|bytes| {
2041            let value = String::from_utf8_lossy(&bytes);
2042            compute_artifact_hash(&value)
2043        })
2044        .unwrap_or_else(|| "missing-cargo-lock".into());
2045    let target_triple = format!(
2046        "{}-unknown-{}",
2047        std::env::consts::ARCH,
2048        std::env::consts::OS
2049    );
2050    EnvFingerprint {
2051        rustc_version,
2052        cargo_lock_hash,
2053        target_triple,
2054        os: std::env::consts::OS.to_string(),
2055    }
2056}
2057
2058fn extend_signal_tokens(out: &mut BTreeSet<String>, input: &str) {
2059    for raw in input.split(|ch: char| !ch.is_ascii_alphanumeric()) {
2060        let trimmed = raw.trim();
2061        if trimmed.is_empty() {
2062            continue;
2063        }
2064        let normalized = if is_rust_error_code(trimmed) {
2065            let mut chars = trimmed.chars();
2066            let prefix = chars
2067                .next()
2068                .map(|ch| ch.to_ascii_uppercase())
2069                .unwrap_or('E');
2070            format!("{prefix}{}", chars.as_str())
2071        } else {
2072            trimmed.to_ascii_lowercase()
2073        };
2074        if normalized.len() < 3 {
2075            continue;
2076        }
2077        out.insert(normalized);
2078    }
2079}
2080
2081fn normalize_signal_phrase(input: &str) -> Option<String> {
2082    let normalized = input
2083        .split(|ch: char| !ch.is_ascii_alphanumeric())
2084        .filter_map(|raw| {
2085            let trimmed = raw.trim();
2086            if trimmed.is_empty() {
2087                return None;
2088            }
2089            let normalized = if is_rust_error_code(trimmed) {
2090                let mut chars = trimmed.chars();
2091                let prefix = chars
2092                    .next()
2093                    .map(|ch| ch.to_ascii_uppercase())
2094                    .unwrap_or('E');
2095                format!("{prefix}{}", chars.as_str())
2096            } else {
2097                trimmed.to_ascii_lowercase()
2098            };
2099            if normalized.len() < 3 {
2100                None
2101            } else {
2102                Some(normalized)
2103            }
2104        })
2105        .collect::<Vec<_>>()
2106        .join(" ");
2107    if normalized.is_empty() {
2108        None
2109    } else {
2110        Some(normalized)
2111    }
2112}
2113
2114fn is_rust_error_code(value: &str) -> bool {
2115    value.len() == 5
2116        && matches!(value.as_bytes().first(), Some(b'e') | Some(b'E'))
2117        && value[1..].chars().all(|ch| ch.is_ascii_digit())
2118}
2119
2120fn find_declared_mutation(
2121    store: &dyn EvolutionStore,
2122    mutation_id: &MutationId,
2123) -> Result<Option<PreparedMutation>, EvolutionError> {
2124    for stored in store.scan(1)? {
2125        if let EvolutionEvent::MutationDeclared { mutation } = stored.event {
2126            if &mutation.intent.id == mutation_id {
2127                return Ok(Some(mutation));
2128            }
2129        }
2130    }
2131    Ok(None)
2132}
2133
2134fn exact_match_candidates(store: &dyn EvolutionStore, input: &SelectorInput) -> Vec<GeneCandidate> {
2135    let Ok(projection) = projection_snapshot(store) else {
2136        return Vec::new();
2137    };
2138    let capsules = projection.capsules.clone();
2139    let spec_ids_by_gene = projection.spec_ids_by_gene.clone();
2140    let requested_spec_id = input
2141        .spec_id
2142        .as_deref()
2143        .map(str::trim)
2144        .filter(|value| !value.is_empty());
2145    let signal_set = input
2146        .signals
2147        .iter()
2148        .map(|signal| signal.to_ascii_lowercase())
2149        .collect::<BTreeSet<_>>();
2150    let mut candidates = projection
2151        .genes
2152        .into_iter()
2153        .filter_map(|gene| {
2154            if gene.state != AssetState::Promoted {
2155                return None;
2156            }
2157            if let Some(spec_id) = requested_spec_id {
2158                let matches_spec = spec_ids_by_gene
2159                    .get(&gene.id)
2160                    .map(|values| {
2161                        values
2162                            .iter()
2163                            .any(|value| value.eq_ignore_ascii_case(spec_id))
2164                    })
2165                    .unwrap_or(false);
2166                if !matches_spec {
2167                    return None;
2168                }
2169            }
2170            let gene_signals = gene
2171                .signals
2172                .iter()
2173                .map(|signal| signal.to_ascii_lowercase())
2174                .collect::<BTreeSet<_>>();
2175            if gene_signals == signal_set {
2176                let mut matched_capsules = capsules
2177                    .iter()
2178                    .filter(|capsule| {
2179                        capsule.gene_id == gene.id && capsule.state == AssetState::Promoted
2180                    })
2181                    .cloned()
2182                    .collect::<Vec<_>>();
2183                matched_capsules.sort_by(|left, right| {
2184                    replay_environment_match_factor(&input.env, &right.env)
2185                        .partial_cmp(&replay_environment_match_factor(&input.env, &left.env))
2186                        .unwrap_or(std::cmp::Ordering::Equal)
2187                        .then_with(|| {
2188                            right
2189                                .confidence
2190                                .partial_cmp(&left.confidence)
2191                                .unwrap_or(std::cmp::Ordering::Equal)
2192                        })
2193                        .then_with(|| left.id.cmp(&right.id))
2194                });
2195                if matched_capsules.is_empty() {
2196                    None
2197                } else {
2198                    let score = matched_capsules
2199                        .first()
2200                        .map(|capsule| replay_environment_match_factor(&input.env, &capsule.env))
2201                        .unwrap_or(0.0);
2202                    Some(GeneCandidate {
2203                        gene,
2204                        score,
2205                        capsules: matched_capsules,
2206                    })
2207                }
2208            } else {
2209                None
2210            }
2211        })
2212        .collect::<Vec<_>>();
2213    candidates.sort_by(|left, right| {
2214        right
2215            .score
2216            .partial_cmp(&left.score)
2217            .unwrap_or(std::cmp::Ordering::Equal)
2218            .then_with(|| left.gene.id.cmp(&right.gene.id))
2219    });
2220    candidates
2221}
2222
2223fn quarantined_remote_exact_match_candidates(
2224    store: &dyn EvolutionStore,
2225    input: &SelectorInput,
2226) -> Vec<GeneCandidate> {
2227    let remote_asset_ids = store
2228        .scan(1)
2229        .ok()
2230        .map(|events| {
2231            events
2232                .into_iter()
2233                .filter_map(|stored| match stored.event {
2234                    EvolutionEvent::RemoteAssetImported {
2235                        source: CandidateSource::Remote,
2236                        asset_ids,
2237                        ..
2238                    } => Some(asset_ids),
2239                    _ => None,
2240                })
2241                .flatten()
2242                .collect::<BTreeSet<_>>()
2243        })
2244        .unwrap_or_default();
2245    if remote_asset_ids.is_empty() {
2246        return Vec::new();
2247    }
2248
2249    let Ok(projection) = projection_snapshot(store) else {
2250        return Vec::new();
2251    };
2252    let capsules = projection.capsules.clone();
2253    let spec_ids_by_gene = projection.spec_ids_by_gene.clone();
2254    let requested_spec_id = input
2255        .spec_id
2256        .as_deref()
2257        .map(str::trim)
2258        .filter(|value| !value.is_empty());
2259    let normalized_signals = input
2260        .signals
2261        .iter()
2262        .filter_map(|signal| normalize_signal_phrase(signal))
2263        .collect::<BTreeSet<_>>()
2264        .into_iter()
2265        .collect::<Vec<_>>();
2266    if normalized_signals.is_empty() {
2267        return Vec::new();
2268    }
2269    let mut candidates = projection
2270        .genes
2271        .into_iter()
2272        .filter_map(|gene| {
2273            if !matches!(gene.state, AssetState::Promoted | AssetState::Quarantined) {
2274                return None;
2275            }
2276            if let Some(spec_id) = requested_spec_id {
2277                let matches_spec = spec_ids_by_gene
2278                    .get(&gene.id)
2279                    .map(|values| {
2280                        values
2281                            .iter()
2282                            .any(|value| value.eq_ignore_ascii_case(spec_id))
2283                    })
2284                    .unwrap_or(false);
2285                if !matches_spec {
2286                    return None;
2287                }
2288            }
2289            let normalized_gene_signals = gene
2290                .signals
2291                .iter()
2292                .filter_map(|candidate| normalize_signal_phrase(candidate))
2293                .collect::<Vec<_>>();
2294            let matched_query_count = normalized_signals
2295                .iter()
2296                .filter(|signal| {
2297                    normalized_gene_signals.iter().any(|candidate| {
2298                        candidate.contains(signal.as_str()) || signal.contains(candidate)
2299                    })
2300                })
2301                .count();
2302            if matched_query_count == 0 {
2303                return None;
2304            }
2305
2306            let mut matched_capsules = capsules
2307                .iter()
2308                .filter(|capsule| {
2309                    capsule.gene_id == gene.id
2310                        && capsule.state == AssetState::Quarantined
2311                        && remote_asset_ids.contains(&capsule.id)
2312                })
2313                .cloned()
2314                .collect::<Vec<_>>();
2315            matched_capsules.sort_by(|left, right| {
2316                replay_environment_match_factor(&input.env, &right.env)
2317                    .partial_cmp(&replay_environment_match_factor(&input.env, &left.env))
2318                    .unwrap_or(std::cmp::Ordering::Equal)
2319                    .then_with(|| {
2320                        right
2321                            .confidence
2322                            .partial_cmp(&left.confidence)
2323                            .unwrap_or(std::cmp::Ordering::Equal)
2324                    })
2325                    .then_with(|| left.id.cmp(&right.id))
2326            });
2327            if matched_capsules.is_empty() {
2328                None
2329            } else {
2330                let overlap = matched_query_count as f32 / normalized_signals.len() as f32;
2331                let env_score = matched_capsules
2332                    .first()
2333                    .map(|capsule| replay_environment_match_factor(&input.env, &capsule.env))
2334                    .unwrap_or(0.0);
2335                Some(GeneCandidate {
2336                    gene,
2337                    score: overlap.max(env_score),
2338                    capsules: matched_capsules,
2339                })
2340            }
2341        })
2342        .collect::<Vec<_>>();
2343    candidates.sort_by(|left, right| {
2344        right
2345            .score
2346            .partial_cmp(&left.score)
2347            .unwrap_or(std::cmp::Ordering::Equal)
2348            .then_with(|| left.gene.id.cmp(&right.gene.id))
2349    });
2350    candidates
2351}
2352
2353fn replay_environment_match_factor(input: &EnvFingerprint, candidate: &EnvFingerprint) -> f32 {
2354    let fields = [
2355        input
2356            .rustc_version
2357            .eq_ignore_ascii_case(&candidate.rustc_version),
2358        input
2359            .cargo_lock_hash
2360            .eq_ignore_ascii_case(&candidate.cargo_lock_hash),
2361        input
2362            .target_triple
2363            .eq_ignore_ascii_case(&candidate.target_triple),
2364        input.os.eq_ignore_ascii_case(&candidate.os),
2365    ];
2366    let matched_fields = fields.into_iter().filter(|matched| *matched).count() as f32;
2367    0.5 + ((matched_fields / 4.0) * 0.5)
2368}
2369
2370fn effective_candidate_score(
2371    candidate: &GeneCandidate,
2372    publishers_by_asset: &BTreeMap<String, String>,
2373    reputation_bias: &BTreeMap<String, f32>,
2374) -> f32 {
2375    let bias = candidate
2376        .capsules
2377        .first()
2378        .and_then(|capsule| publishers_by_asset.get(&capsule.id))
2379        .and_then(|publisher| reputation_bias.get(publisher))
2380        .copied()
2381        .unwrap_or(0.0)
2382        .clamp(0.0, 1.0);
2383    candidate.score * (1.0 + (bias * 0.1))
2384}
2385
2386fn export_promoted_assets_from_store(
2387    store: &dyn EvolutionStore,
2388    sender_id: impl Into<String>,
2389) -> Result<EvolutionEnvelope, EvoKernelError> {
2390    let (events, projection) = scan_projection(store)?;
2391    let genes = projection
2392        .genes
2393        .into_iter()
2394        .filter(|gene| gene.state == AssetState::Promoted)
2395        .collect::<Vec<_>>();
2396    let capsules = projection
2397        .capsules
2398        .into_iter()
2399        .filter(|capsule| capsule.state == AssetState::Promoted)
2400        .collect::<Vec<_>>();
2401    let assets = replay_export_assets(&events, genes, capsules);
2402    Ok(EvolutionEnvelope::publish(sender_id, assets))
2403}
2404
2405fn scan_projection(
2406    store: &dyn EvolutionStore,
2407) -> Result<(Vec<StoredEvolutionEvent>, EvolutionProjection), EvoKernelError> {
2408    store.scan_projection().map_err(store_err)
2409}
2410
2411fn projection_snapshot(store: &dyn EvolutionStore) -> Result<EvolutionProjection, EvoKernelError> {
2412    scan_projection(store).map(|(_, projection)| projection)
2413}
2414
2415fn replay_export_assets(
2416    events: &[StoredEvolutionEvent],
2417    genes: Vec<Gene>,
2418    capsules: Vec<Capsule>,
2419) -> Vec<NetworkAsset> {
2420    let mutation_ids = capsules
2421        .iter()
2422        .map(|capsule| capsule.mutation_id.clone())
2423        .collect::<BTreeSet<_>>();
2424    let mut assets = replay_export_events_for_mutations(events, &mutation_ids);
2425    for gene in genes {
2426        assets.push(NetworkAsset::Gene { gene });
2427    }
2428    for capsule in capsules {
2429        assets.push(NetworkAsset::Capsule { capsule });
2430    }
2431    assets
2432}
2433
2434fn replay_export_events_for_mutations(
2435    events: &[StoredEvolutionEvent],
2436    mutation_ids: &BTreeSet<String>,
2437) -> Vec<NetworkAsset> {
2438    if mutation_ids.is_empty() {
2439        return Vec::new();
2440    }
2441
2442    let mut assets = Vec::new();
2443    let mut seen_mutations = BTreeSet::new();
2444    let mut seen_spec_links = BTreeSet::new();
2445    for stored in events {
2446        match &stored.event {
2447            EvolutionEvent::MutationDeclared { mutation }
2448                if mutation_ids.contains(mutation.intent.id.as_str())
2449                    && seen_mutations.insert(mutation.intent.id.clone()) =>
2450            {
2451                assets.push(NetworkAsset::EvolutionEvent {
2452                    event: EvolutionEvent::MutationDeclared {
2453                        mutation: mutation.clone(),
2454                    },
2455                });
2456            }
2457            EvolutionEvent::SpecLinked {
2458                mutation_id,
2459                spec_id,
2460            } if mutation_ids.contains(mutation_id.as_str())
2461                && seen_spec_links.insert((mutation_id.clone(), spec_id.clone())) =>
2462            {
2463                assets.push(NetworkAsset::EvolutionEvent {
2464                    event: EvolutionEvent::SpecLinked {
2465                        mutation_id: mutation_id.clone(),
2466                        spec_id: spec_id.clone(),
2467                    },
2468                });
2469            }
2470            _ => {}
2471        }
2472    }
2473
2474    assets
2475}
2476
2477fn import_remote_envelope_into_store(
2478    store: &dyn EvolutionStore,
2479    envelope: &EvolutionEnvelope,
2480    remote_publishers: Option<&Mutex<BTreeMap<String, String>>>,
2481) -> Result<ImportOutcome, EvoKernelError> {
2482    if !envelope.verify_content_hash() {
2483        return Err(EvoKernelError::Validation(
2484            "invalid evolution envelope hash".into(),
2485        ));
2486    }
2487
2488    let sender_id = normalized_sender_id(&envelope.sender_id);
2489    let (events, projection) = scan_projection(store)?;
2490    let mut known_gene_ids = projection
2491        .genes
2492        .into_iter()
2493        .map(|gene| gene.id)
2494        .collect::<BTreeSet<_>>();
2495    let mut known_capsule_ids = projection
2496        .capsules
2497        .into_iter()
2498        .map(|capsule| capsule.id)
2499        .collect::<BTreeSet<_>>();
2500    let mut known_mutation_ids = BTreeSet::new();
2501    let mut known_spec_links = BTreeSet::new();
2502    for stored in &events {
2503        match &stored.event {
2504            EvolutionEvent::MutationDeclared { mutation } => {
2505                known_mutation_ids.insert(mutation.intent.id.clone());
2506            }
2507            EvolutionEvent::SpecLinked {
2508                mutation_id,
2509                spec_id,
2510            } => {
2511                known_spec_links.insert((mutation_id.clone(), spec_id.clone()));
2512            }
2513            _ => {}
2514        }
2515    }
2516    let mut imported_asset_ids = Vec::new();
2517    for asset in &envelope.assets {
2518        match asset {
2519            NetworkAsset::Gene { gene } => {
2520                if !known_gene_ids.insert(gene.id.clone()) {
2521                    continue;
2522                }
2523                imported_asset_ids.push(gene.id.clone());
2524                let mut quarantined_gene = gene.clone();
2525                quarantined_gene.state = AssetState::Quarantined;
2526                store
2527                    .append_event(EvolutionEvent::RemoteAssetImported {
2528                        source: CandidateSource::Remote,
2529                        asset_ids: vec![gene.id.clone()],
2530                        sender_id: sender_id.clone(),
2531                    })
2532                    .map_err(store_err)?;
2533                store
2534                    .append_event(EvolutionEvent::GeneProjected {
2535                        gene: quarantined_gene.clone(),
2536                    })
2537                    .map_err(store_err)?;
2538                record_remote_publisher_for_asset(remote_publishers, &envelope.sender_id, asset);
2539                store
2540                    .append_event(EvolutionEvent::PromotionEvaluated {
2541                        gene_id: quarantined_gene.id,
2542                        state: AssetState::Quarantined,
2543                        reason: "remote asset requires local validation before promotion".into(),
2544                    })
2545                    .map_err(store_err)?;
2546            }
2547            NetworkAsset::Capsule { capsule } => {
2548                if !known_capsule_ids.insert(capsule.id.clone()) {
2549                    continue;
2550                }
2551                imported_asset_ids.push(capsule.id.clone());
2552                store
2553                    .append_event(EvolutionEvent::RemoteAssetImported {
2554                        source: CandidateSource::Remote,
2555                        asset_ids: vec![capsule.id.clone()],
2556                        sender_id: sender_id.clone(),
2557                    })
2558                    .map_err(store_err)?;
2559                let mut quarantined = capsule.clone();
2560                quarantined.state = AssetState::Quarantined;
2561                store
2562                    .append_event(EvolutionEvent::CapsuleCommitted {
2563                        capsule: quarantined.clone(),
2564                    })
2565                    .map_err(store_err)?;
2566                record_remote_publisher_for_asset(remote_publishers, &envelope.sender_id, asset);
2567                store
2568                    .append_event(EvolutionEvent::CapsuleQuarantined {
2569                        capsule_id: quarantined.id,
2570                    })
2571                    .map_err(store_err)?;
2572            }
2573            NetworkAsset::EvolutionEvent { event } => {
2574                let should_append = match event {
2575                    EvolutionEvent::MutationDeclared { mutation } => {
2576                        known_mutation_ids.insert(mutation.intent.id.clone())
2577                    }
2578                    EvolutionEvent::SpecLinked {
2579                        mutation_id,
2580                        spec_id,
2581                    } => known_spec_links.insert((mutation_id.clone(), spec_id.clone())),
2582                    _ if should_import_remote_event(event) => true,
2583                    _ => false,
2584                };
2585                if should_append {
2586                    store.append_event(event.clone()).map_err(store_err)?;
2587                }
2588            }
2589        }
2590    }
2591
2592    Ok(ImportOutcome {
2593        imported_asset_ids,
2594        accepted: true,
2595    })
2596}
2597
2598fn normalized_sender_id(sender_id: &str) -> Option<String> {
2599    let trimmed = sender_id.trim();
2600    if trimmed.is_empty() {
2601        None
2602    } else {
2603        Some(trimmed.to_string())
2604    }
2605}
2606
2607fn record_remote_publisher_for_asset(
2608    remote_publishers: Option<&Mutex<BTreeMap<String, String>>>,
2609    sender_id: &str,
2610    asset: &NetworkAsset,
2611) {
2612    let Some(remote_publishers) = remote_publishers else {
2613        return;
2614    };
2615    let sender_id = sender_id.trim();
2616    if sender_id.is_empty() {
2617        return;
2618    }
2619    let Ok(mut publishers) = remote_publishers.lock() else {
2620        return;
2621    };
2622    match asset {
2623        NetworkAsset::Gene { gene } => {
2624            publishers.insert(gene.id.clone(), sender_id.to_string());
2625        }
2626        NetworkAsset::Capsule { capsule } => {
2627            publishers.insert(capsule.id.clone(), sender_id.to_string());
2628        }
2629        NetworkAsset::EvolutionEvent { .. } => {}
2630    }
2631}
2632
2633fn remote_publishers_by_asset_from_store(store: &dyn EvolutionStore) -> BTreeMap<String, String> {
2634    let Ok(events) = store.scan(1) else {
2635        return BTreeMap::new();
2636    };
2637    remote_publishers_by_asset_from_events(&events)
2638}
2639
2640fn remote_publishers_by_asset_from_events(
2641    events: &[StoredEvolutionEvent],
2642) -> BTreeMap<String, String> {
2643    let mut imported_asset_publishers = BTreeMap::<String, String>::new();
2644    let mut known_gene_ids = BTreeSet::<String>::new();
2645    let mut known_capsule_ids = BTreeSet::<String>::new();
2646    let mut publishers_by_asset = BTreeMap::<String, String>::new();
2647
2648    for stored in events {
2649        match &stored.event {
2650            EvolutionEvent::RemoteAssetImported {
2651                source: CandidateSource::Remote,
2652                asset_ids,
2653                sender_id,
2654            } => {
2655                let Some(sender_id) = sender_id.as_deref().and_then(normalized_sender_id) else {
2656                    continue;
2657                };
2658                for asset_id in asset_ids {
2659                    imported_asset_publishers.insert(asset_id.clone(), sender_id.clone());
2660                    if known_gene_ids.contains(asset_id) || known_capsule_ids.contains(asset_id) {
2661                        publishers_by_asset.insert(asset_id.clone(), sender_id.clone());
2662                    }
2663                }
2664            }
2665            EvolutionEvent::GeneProjected { gene } => {
2666                known_gene_ids.insert(gene.id.clone());
2667                if let Some(sender_id) = imported_asset_publishers.get(&gene.id) {
2668                    publishers_by_asset.insert(gene.id.clone(), sender_id.clone());
2669                }
2670            }
2671            EvolutionEvent::CapsuleCommitted { capsule } => {
2672                known_capsule_ids.insert(capsule.id.clone());
2673                if let Some(sender_id) = imported_asset_publishers.get(&capsule.id) {
2674                    publishers_by_asset.insert(capsule.id.clone(), sender_id.clone());
2675                }
2676            }
2677            _ => {}
2678        }
2679    }
2680
2681    publishers_by_asset
2682}
2683
2684fn should_import_remote_event(event: &EvolutionEvent) -> bool {
2685    matches!(
2686        event,
2687        EvolutionEvent::MutationDeclared { .. } | EvolutionEvent::SpecLinked { .. }
2688    )
2689}
2690
2691fn fetch_assets_from_store(
2692    store: &dyn EvolutionStore,
2693    responder_id: impl Into<String>,
2694    query: &FetchQuery,
2695) -> Result<FetchResponse, EvoKernelError> {
2696    let (events, projection) = scan_projection(store)?;
2697    let normalized_signals: Vec<String> = query
2698        .signals
2699        .iter()
2700        .map(|signal| signal.trim().to_ascii_lowercase())
2701        .filter(|signal| !signal.is_empty())
2702        .collect();
2703    let matches_any_signal = |candidate: &str| {
2704        if normalized_signals.is_empty() {
2705            return true;
2706        }
2707        let candidate = candidate.to_ascii_lowercase();
2708        normalized_signals
2709            .iter()
2710            .any(|signal| candidate.contains(signal) || signal.contains(&candidate))
2711    };
2712
2713    let matched_genes: Vec<Gene> = projection
2714        .genes
2715        .into_iter()
2716        .filter(|gene| gene.state == AssetState::Promoted)
2717        .filter(|gene| gene.signals.iter().any(|signal| matches_any_signal(signal)))
2718        .collect();
2719    let matched_gene_ids: BTreeSet<String> =
2720        matched_genes.iter().map(|gene| gene.id.clone()).collect();
2721    let matched_capsules: Vec<Capsule> = projection
2722        .capsules
2723        .into_iter()
2724        .filter(|capsule| capsule.state == AssetState::Promoted)
2725        .filter(|capsule| matched_gene_ids.contains(&capsule.gene_id))
2726        .collect();
2727
2728    let assets = replay_export_assets(&events, matched_genes, matched_capsules);
2729
2730    Ok(FetchResponse {
2731        sender_id: responder_id.into(),
2732        assets,
2733    })
2734}
2735
2736fn revoke_assets_in_store(
2737    store: &dyn EvolutionStore,
2738    notice: &RevokeNotice,
2739) -> Result<RevokeNotice, EvoKernelError> {
2740    let projection = projection_snapshot(store)?;
2741    let requested: BTreeSet<String> = notice
2742        .asset_ids
2743        .iter()
2744        .map(|asset_id| asset_id.trim().to_string())
2745        .filter(|asset_id| !asset_id.is_empty())
2746        .collect();
2747    let mut revoked_gene_ids = BTreeSet::new();
2748    let mut quarantined_capsule_ids = BTreeSet::new();
2749
2750    for gene in &projection.genes {
2751        if requested.contains(&gene.id) {
2752            revoked_gene_ids.insert(gene.id.clone());
2753        }
2754    }
2755    for capsule in &projection.capsules {
2756        if requested.contains(&capsule.id) {
2757            quarantined_capsule_ids.insert(capsule.id.clone());
2758            revoked_gene_ids.insert(capsule.gene_id.clone());
2759        }
2760    }
2761    for capsule in &projection.capsules {
2762        if revoked_gene_ids.contains(&capsule.gene_id) {
2763            quarantined_capsule_ids.insert(capsule.id.clone());
2764        }
2765    }
2766
2767    for gene_id in &revoked_gene_ids {
2768        store
2769            .append_event(EvolutionEvent::GeneRevoked {
2770                gene_id: gene_id.clone(),
2771                reason: notice.reason.clone(),
2772            })
2773            .map_err(store_err)?;
2774    }
2775    for capsule_id in &quarantined_capsule_ids {
2776        store
2777            .append_event(EvolutionEvent::CapsuleQuarantined {
2778                capsule_id: capsule_id.clone(),
2779            })
2780            .map_err(store_err)?;
2781    }
2782
2783    let mut affected_ids: Vec<String> = revoked_gene_ids.into_iter().collect();
2784    affected_ids.extend(quarantined_capsule_ids);
2785    affected_ids.sort();
2786    affected_ids.dedup();
2787
2788    Ok(RevokeNotice {
2789        sender_id: notice.sender_id.clone(),
2790        asset_ids: affected_ids,
2791        reason: notice.reason.clone(),
2792    })
2793}
2794
2795fn evolution_metrics_snapshot(
2796    store: &dyn EvolutionStore,
2797) -> Result<EvolutionMetricsSnapshot, EvoKernelError> {
2798    let (events, projection) = scan_projection(store)?;
2799    let replay_success_total = events
2800        .iter()
2801        .filter(|stored| matches!(stored.event, EvolutionEvent::CapsuleReused { .. }))
2802        .count() as u64;
2803    let replay_failures_total = events
2804        .iter()
2805        .filter(|stored| is_replay_validation_failure(&stored.event))
2806        .count() as u64;
2807    let replay_attempts_total = replay_success_total + replay_failures_total;
2808    let mutation_declared_total = events
2809        .iter()
2810        .filter(|stored| matches!(stored.event, EvolutionEvent::MutationDeclared { .. }))
2811        .count() as u64;
2812    let promoted_mutations_total = events
2813        .iter()
2814        .filter(|stored| matches!(stored.event, EvolutionEvent::GenePromoted { .. }))
2815        .count() as u64;
2816    let gene_revocations_total = events
2817        .iter()
2818        .filter(|stored| matches!(stored.event, EvolutionEvent::GeneRevoked { .. }))
2819        .count() as u64;
2820    let cutoff = Utc::now() - Duration::hours(1);
2821    let mutation_velocity_last_hour = count_recent_events(&events, cutoff, |event| {
2822        matches!(event, EvolutionEvent::MutationDeclared { .. })
2823    });
2824    let revoke_frequency_last_hour = count_recent_events(&events, cutoff, |event| {
2825        matches!(event, EvolutionEvent::GeneRevoked { .. })
2826    });
2827    let promoted_genes = projection
2828        .genes
2829        .iter()
2830        .filter(|gene| gene.state == AssetState::Promoted)
2831        .count() as u64;
2832    let promoted_capsules = projection
2833        .capsules
2834        .iter()
2835        .filter(|capsule| capsule.state == AssetState::Promoted)
2836        .count() as u64;
2837
2838    Ok(EvolutionMetricsSnapshot {
2839        replay_attempts_total,
2840        replay_success_total,
2841        replay_success_rate: safe_ratio(replay_success_total, replay_attempts_total),
2842        mutation_declared_total,
2843        promoted_mutations_total,
2844        promotion_ratio: safe_ratio(promoted_mutations_total, mutation_declared_total),
2845        gene_revocations_total,
2846        mutation_velocity_last_hour,
2847        revoke_frequency_last_hour,
2848        promoted_genes,
2849        promoted_capsules,
2850        last_event_seq: events.last().map(|stored| stored.seq).unwrap_or(0),
2851    })
2852}
2853
2854fn evolution_health_snapshot(snapshot: &EvolutionMetricsSnapshot) -> EvolutionHealthSnapshot {
2855    EvolutionHealthSnapshot {
2856        status: "ok".into(),
2857        last_event_seq: snapshot.last_event_seq,
2858        promoted_genes: snapshot.promoted_genes,
2859        promoted_capsules: snapshot.promoted_capsules,
2860    }
2861}
2862
2863fn render_evolution_metrics_prometheus(
2864    snapshot: &EvolutionMetricsSnapshot,
2865    health: &EvolutionHealthSnapshot,
2866) -> String {
2867    let mut out = String::new();
2868    out.push_str(
2869        "# HELP oris_evolution_replay_attempts_total Total replay attempts that reached validation.\n",
2870    );
2871    out.push_str("# TYPE oris_evolution_replay_attempts_total counter\n");
2872    out.push_str(&format!(
2873        "oris_evolution_replay_attempts_total {}\n",
2874        snapshot.replay_attempts_total
2875    ));
2876    out.push_str("# HELP oris_evolution_replay_success_total Total replay attempts that reused a capsule successfully.\n");
2877    out.push_str("# TYPE oris_evolution_replay_success_total counter\n");
2878    out.push_str(&format!(
2879        "oris_evolution_replay_success_total {}\n",
2880        snapshot.replay_success_total
2881    ));
2882    out.push_str("# HELP oris_evolution_replay_success_rate Successful replay attempts divided by replay attempts that reached validation.\n");
2883    out.push_str("# TYPE oris_evolution_replay_success_rate gauge\n");
2884    out.push_str(&format!(
2885        "oris_evolution_replay_success_rate {:.6}\n",
2886        snapshot.replay_success_rate
2887    ));
2888    out.push_str(
2889        "# HELP oris_evolution_mutation_declared_total Total declared mutations recorded in the evolution log.\n",
2890    );
2891    out.push_str("# TYPE oris_evolution_mutation_declared_total counter\n");
2892    out.push_str(&format!(
2893        "oris_evolution_mutation_declared_total {}\n",
2894        snapshot.mutation_declared_total
2895    ));
2896    out.push_str("# HELP oris_evolution_promoted_mutations_total Total mutations promoted by the governor.\n");
2897    out.push_str("# TYPE oris_evolution_promoted_mutations_total counter\n");
2898    out.push_str(&format!(
2899        "oris_evolution_promoted_mutations_total {}\n",
2900        snapshot.promoted_mutations_total
2901    ));
2902    out.push_str(
2903        "# HELP oris_evolution_promotion_ratio Promoted mutations divided by declared mutations.\n",
2904    );
2905    out.push_str("# TYPE oris_evolution_promotion_ratio gauge\n");
2906    out.push_str(&format!(
2907        "oris_evolution_promotion_ratio {:.6}\n",
2908        snapshot.promotion_ratio
2909    ));
2910    out.push_str("# HELP oris_evolution_gene_revocations_total Total gene revocations recorded in the evolution log.\n");
2911    out.push_str("# TYPE oris_evolution_gene_revocations_total counter\n");
2912    out.push_str(&format!(
2913        "oris_evolution_gene_revocations_total {}\n",
2914        snapshot.gene_revocations_total
2915    ));
2916    out.push_str("# HELP oris_evolution_mutation_velocity_last_hour Declared mutations observed in the last hour.\n");
2917    out.push_str("# TYPE oris_evolution_mutation_velocity_last_hour gauge\n");
2918    out.push_str(&format!(
2919        "oris_evolution_mutation_velocity_last_hour {}\n",
2920        snapshot.mutation_velocity_last_hour
2921    ));
2922    out.push_str("# HELP oris_evolution_revoke_frequency_last_hour Gene revocations observed in the last hour.\n");
2923    out.push_str("# TYPE oris_evolution_revoke_frequency_last_hour gauge\n");
2924    out.push_str(&format!(
2925        "oris_evolution_revoke_frequency_last_hour {}\n",
2926        snapshot.revoke_frequency_last_hour
2927    ));
2928    out.push_str("# HELP oris_evolution_promoted_genes Current promoted genes in the evolution projection.\n");
2929    out.push_str("# TYPE oris_evolution_promoted_genes gauge\n");
2930    out.push_str(&format!(
2931        "oris_evolution_promoted_genes {}\n",
2932        snapshot.promoted_genes
2933    ));
2934    out.push_str("# HELP oris_evolution_promoted_capsules Current promoted capsules in the evolution projection.\n");
2935    out.push_str("# TYPE oris_evolution_promoted_capsules gauge\n");
2936    out.push_str(&format!(
2937        "oris_evolution_promoted_capsules {}\n",
2938        snapshot.promoted_capsules
2939    ));
2940    out.push_str("# HELP oris_evolution_store_last_event_seq Last visible append-only evolution event sequence.\n");
2941    out.push_str("# TYPE oris_evolution_store_last_event_seq gauge\n");
2942    out.push_str(&format!(
2943        "oris_evolution_store_last_event_seq {}\n",
2944        snapshot.last_event_seq
2945    ));
2946    out.push_str(
2947        "# HELP oris_evolution_health Evolution observability store health (1 = healthy).\n",
2948    );
2949    out.push_str("# TYPE oris_evolution_health gauge\n");
2950    out.push_str(&format!(
2951        "oris_evolution_health {}\n",
2952        u8::from(health.status == "ok")
2953    ));
2954    out
2955}
2956
2957fn count_recent_events(
2958    events: &[StoredEvolutionEvent],
2959    cutoff: DateTime<Utc>,
2960    predicate: impl Fn(&EvolutionEvent) -> bool,
2961) -> u64 {
2962    events
2963        .iter()
2964        .filter(|stored| {
2965            predicate(&stored.event)
2966                && parse_event_timestamp(&stored.timestamp)
2967                    .map(|timestamp| timestamp >= cutoff)
2968                    .unwrap_or(false)
2969        })
2970        .count() as u64
2971}
2972
2973fn parse_event_timestamp(raw: &str) -> Option<DateTime<Utc>> {
2974    DateTime::parse_from_rfc3339(raw)
2975        .ok()
2976        .map(|parsed| parsed.with_timezone(&Utc))
2977}
2978
2979fn is_replay_validation_failure(event: &EvolutionEvent) -> bool {
2980    matches!(
2981        event,
2982        EvolutionEvent::ValidationFailed {
2983            gene_id: Some(_),
2984            ..
2985        }
2986    )
2987}
2988
2989fn safe_ratio(numerator: u64, denominator: u64) -> f64 {
2990    if denominator == 0 {
2991        0.0
2992    } else {
2993        numerator as f64 / denominator as f64
2994    }
2995}
2996
2997fn store_err(err: EvolutionError) -> EvoKernelError {
2998    EvoKernelError::Store(err.to_string())
2999}
3000
3001#[cfg(test)]
3002mod tests {
3003    use super::*;
3004    use oris_agent_contract::{
3005        AgentRole, CoordinationPlan, CoordinationPrimitive, CoordinationTask,
3006    };
3007    use oris_kernel::{
3008        AllowAllPolicy, InMemoryEventStore, KernelMode, KernelState, NoopActionExecutor,
3009        NoopStepFn, StateUpdatedOnlyReducer,
3010    };
3011    use serde::{Deserialize, Serialize};
3012
3013    #[derive(Clone, Debug, Default, Serialize, Deserialize)]
3014    struct TestState;
3015
3016    impl KernelState for TestState {
3017        fn version(&self) -> u32 {
3018            1
3019        }
3020    }
3021
3022    fn temp_workspace(name: &str) -> std::path::PathBuf {
3023        let root =
3024            std::env::temp_dir().join(format!("oris-evokernel-{name}-{}", std::process::id()));
3025        if root.exists() {
3026            fs::remove_dir_all(&root).unwrap();
3027        }
3028        fs::create_dir_all(root.join("src")).unwrap();
3029        fs::write(
3030            root.join("Cargo.toml"),
3031            "[package]\nname = \"sample\"\nversion = \"0.1.0\"\nedition = \"2021\"\n",
3032        )
3033        .unwrap();
3034        fs::write(root.join("Cargo.lock"), "# lock\n").unwrap();
3035        fs::write(root.join("src/lib.rs"), "pub fn demo() -> usize { 1 }\n").unwrap();
3036        root
3037    }
3038
3039    fn test_kernel() -> Arc<Kernel<TestState>> {
3040        Arc::new(Kernel::<TestState> {
3041            events: Box::new(InMemoryEventStore::new()),
3042            snaps: None,
3043            reducer: Box::new(StateUpdatedOnlyReducer),
3044            exec: Box::new(NoopActionExecutor),
3045            step: Box::new(NoopStepFn),
3046            policy: Box::new(AllowAllPolicy),
3047            effect_sink: None,
3048            mode: KernelMode::Normal,
3049        })
3050    }
3051
3052    fn lightweight_plan() -> ValidationPlan {
3053        ValidationPlan {
3054            profile: "test".into(),
3055            stages: vec![ValidationStage::Command {
3056                program: "git".into(),
3057                args: vec!["--version".into()],
3058                timeout_ms: 5_000,
3059            }],
3060        }
3061    }
3062
3063    fn sample_mutation() -> PreparedMutation {
3064        prepare_mutation(
3065            MutationIntent {
3066                id: "mutation-1".into(),
3067                intent: "add README".into(),
3068                target: MutationTarget::Paths {
3069                    allow: vec!["README.md".into()],
3070                },
3071                expected_effect: "repo still builds".into(),
3072                risk: RiskLevel::Low,
3073                signals: vec!["missing readme".into()],
3074                spec_id: None,
3075            },
3076            "\
3077diff --git a/README.md b/README.md
3078new file mode 100644
3079index 0000000..1111111
3080--- /dev/null
3081+++ b/README.md
3082@@ -0,0 +1 @@
3083+# sample
3084"
3085            .into(),
3086            Some("HEAD".into()),
3087        )
3088    }
3089
3090    fn base_sandbox_policy() -> SandboxPolicy {
3091        SandboxPolicy {
3092            allowed_programs: vec!["git".into()],
3093            max_duration_ms: 60_000,
3094            max_output_bytes: 1024 * 1024,
3095            denied_env_prefixes: Vec::new(),
3096        }
3097    }
3098
3099    fn command_validator() -> Arc<dyn Validator> {
3100        Arc::new(CommandValidator::new(base_sandbox_policy()))
3101    }
3102
3103    fn replay_input(signal: &str) -> SelectorInput {
3104        let rustc_version = std::process::Command::new("rustc")
3105            .arg("--version")
3106            .output()
3107            .ok()
3108            .filter(|output| output.status.success())
3109            .map(|output| String::from_utf8_lossy(&output.stdout).trim().to_string())
3110            .unwrap_or_else(|| "rustc unknown".into());
3111        SelectorInput {
3112            signals: vec![signal.into()],
3113            env: EnvFingerprint {
3114                rustc_version,
3115                cargo_lock_hash: compute_artifact_hash("# lock\n"),
3116                target_triple: format!(
3117                    "{}-unknown-{}",
3118                    std::env::consts::ARCH,
3119                    std::env::consts::OS
3120                ),
3121                os: std::env::consts::OS.into(),
3122            },
3123            spec_id: None,
3124            limit: 1,
3125        }
3126    }
3127
3128    fn build_test_evo_with_store(
3129        name: &str,
3130        run_id: &str,
3131        validator: Arc<dyn Validator>,
3132        store: Arc<dyn EvolutionStore>,
3133    ) -> EvoKernel<TestState> {
3134        let workspace = temp_workspace(name);
3135        let sandbox: Arc<dyn Sandbox> = Arc::new(oris_sandbox::LocalProcessSandbox::new(
3136            run_id,
3137            &workspace,
3138            std::env::temp_dir(),
3139        ));
3140        EvoKernel::new(test_kernel(), sandbox, validator, store)
3141            .with_governor(Arc::new(DefaultGovernor::new(
3142                oris_governor::GovernorConfig {
3143                    promote_after_successes: 1,
3144                    ..Default::default()
3145                },
3146            )))
3147            .with_validation_plan(lightweight_plan())
3148            .with_sandbox_policy(base_sandbox_policy())
3149    }
3150
3151    fn build_test_evo(
3152        name: &str,
3153        run_id: &str,
3154        validator: Arc<dyn Validator>,
3155    ) -> (EvoKernel<TestState>, Arc<dyn EvolutionStore>) {
3156        let store_root = std::env::temp_dir().join(format!(
3157            "oris-evokernel-{name}-store-{}",
3158            std::process::id()
3159        ));
3160        if store_root.exists() {
3161            fs::remove_dir_all(&store_root).unwrap();
3162        }
3163        let store: Arc<dyn EvolutionStore> =
3164            Arc::new(oris_evolution::JsonlEvolutionStore::new(&store_root));
3165        let evo = build_test_evo_with_store(name, run_id, validator, store.clone());
3166        (evo, store)
3167    }
3168
3169    fn remote_publish_envelope(
3170        sender_id: &str,
3171        run_id: &str,
3172        gene_id: &str,
3173        capsule_id: &str,
3174        mutation_id: &str,
3175        signal: &str,
3176        file_name: &str,
3177        line: &str,
3178    ) -> EvolutionEnvelope {
3179        remote_publish_envelope_with_env(
3180            sender_id,
3181            run_id,
3182            gene_id,
3183            capsule_id,
3184            mutation_id,
3185            signal,
3186            file_name,
3187            line,
3188            replay_input(signal).env,
3189        )
3190    }
3191
3192    fn remote_publish_envelope_with_env(
3193        sender_id: &str,
3194        run_id: &str,
3195        gene_id: &str,
3196        capsule_id: &str,
3197        mutation_id: &str,
3198        signal: &str,
3199        file_name: &str,
3200        line: &str,
3201        env: EnvFingerprint,
3202    ) -> EvolutionEnvelope {
3203        let mutation = prepare_mutation(
3204            MutationIntent {
3205                id: mutation_id.into(),
3206                intent: format!("add {file_name}"),
3207                target: MutationTarget::Paths {
3208                    allow: vec![file_name.into()],
3209                },
3210                expected_effect: "replay should still validate".into(),
3211                risk: RiskLevel::Low,
3212                signals: vec![signal.into()],
3213                spec_id: None,
3214            },
3215            format!(
3216                "\
3217diff --git a/{file_name} b/{file_name}
3218new file mode 100644
3219index 0000000..1111111
3220--- /dev/null
3221+++ b/{file_name}
3222@@ -0,0 +1 @@
3223+{line}
3224"
3225            ),
3226            Some("HEAD".into()),
3227        );
3228        let gene = Gene {
3229            id: gene_id.into(),
3230            signals: vec![signal.into()],
3231            strategy: vec![file_name.into()],
3232            validation: vec!["test".into()],
3233            state: AssetState::Promoted,
3234        };
3235        let capsule = Capsule {
3236            id: capsule_id.into(),
3237            gene_id: gene_id.into(),
3238            mutation_id: mutation_id.into(),
3239            run_id: run_id.into(),
3240            diff_hash: mutation.artifact.content_hash.clone(),
3241            confidence: 0.9,
3242            env,
3243            outcome: Outcome {
3244                success: true,
3245                validation_profile: "test".into(),
3246                validation_duration_ms: 1,
3247                changed_files: vec![file_name.into()],
3248                validator_hash: "validator-hash".into(),
3249                lines_changed: 1,
3250                replay_verified: false,
3251            },
3252            state: AssetState::Promoted,
3253        };
3254        EvolutionEnvelope::publish(
3255            sender_id,
3256            vec![
3257                NetworkAsset::EvolutionEvent {
3258                    event: EvolutionEvent::MutationDeclared { mutation },
3259                },
3260                NetworkAsset::Gene { gene: gene.clone() },
3261                NetworkAsset::Capsule {
3262                    capsule: capsule.clone(),
3263                },
3264                NetworkAsset::EvolutionEvent {
3265                    event: EvolutionEvent::CapsuleReleased {
3266                        capsule_id: capsule.id.clone(),
3267                        state: AssetState::Promoted,
3268                    },
3269                },
3270            ],
3271        )
3272    }
3273
3274    fn remote_publish_envelope_with_signals(
3275        sender_id: &str,
3276        run_id: &str,
3277        gene_id: &str,
3278        capsule_id: &str,
3279        mutation_id: &str,
3280        mutation_signals: Vec<String>,
3281        gene_signals: Vec<String>,
3282        file_name: &str,
3283        line: &str,
3284        env: EnvFingerprint,
3285    ) -> EvolutionEnvelope {
3286        let mutation = prepare_mutation(
3287            MutationIntent {
3288                id: mutation_id.into(),
3289                intent: format!("add {file_name}"),
3290                target: MutationTarget::Paths {
3291                    allow: vec![file_name.into()],
3292                },
3293                expected_effect: "replay should still validate".into(),
3294                risk: RiskLevel::Low,
3295                signals: mutation_signals,
3296                spec_id: None,
3297            },
3298            format!(
3299                "\
3300diff --git a/{file_name} b/{file_name}
3301new file mode 100644
3302index 0000000..1111111
3303--- /dev/null
3304+++ b/{file_name}
3305@@ -0,0 +1 @@
3306+{line}
3307"
3308            ),
3309            Some("HEAD".into()),
3310        );
3311        let gene = Gene {
3312            id: gene_id.into(),
3313            signals: gene_signals,
3314            strategy: vec![file_name.into()],
3315            validation: vec!["test".into()],
3316            state: AssetState::Promoted,
3317        };
3318        let capsule = Capsule {
3319            id: capsule_id.into(),
3320            gene_id: gene_id.into(),
3321            mutation_id: mutation_id.into(),
3322            run_id: run_id.into(),
3323            diff_hash: mutation.artifact.content_hash.clone(),
3324            confidence: 0.9,
3325            env,
3326            outcome: Outcome {
3327                success: true,
3328                validation_profile: "test".into(),
3329                validation_duration_ms: 1,
3330                changed_files: vec![file_name.into()],
3331                validator_hash: "validator-hash".into(),
3332                lines_changed: 1,
3333                replay_verified: false,
3334            },
3335            state: AssetState::Promoted,
3336        };
3337        EvolutionEnvelope::publish(
3338            sender_id,
3339            vec![
3340                NetworkAsset::EvolutionEvent {
3341                    event: EvolutionEvent::MutationDeclared { mutation },
3342                },
3343                NetworkAsset::Gene { gene: gene.clone() },
3344                NetworkAsset::Capsule {
3345                    capsule: capsule.clone(),
3346                },
3347                NetworkAsset::EvolutionEvent {
3348                    event: EvolutionEvent::CapsuleReleased {
3349                        capsule_id: capsule.id.clone(),
3350                        state: AssetState::Promoted,
3351                    },
3352                },
3353            ],
3354        )
3355    }
3356
3357    struct FixedValidator {
3358        success: bool,
3359    }
3360
3361    #[async_trait]
3362    impl Validator for FixedValidator {
3363        async fn run(
3364            &self,
3365            _receipt: &SandboxReceipt,
3366            plan: &ValidationPlan,
3367        ) -> Result<ValidationReport, ValidationError> {
3368            Ok(ValidationReport {
3369                success: self.success,
3370                duration_ms: 1,
3371                stages: Vec::new(),
3372                logs: if self.success {
3373                    format!("{} ok", plan.profile)
3374                } else {
3375                    format!("{} failed", plan.profile)
3376                },
3377            })
3378        }
3379    }
3380
3381    struct FailOnAppendStore {
3382        inner: JsonlEvolutionStore,
3383        fail_on_call: usize,
3384        call_count: Mutex<usize>,
3385    }
3386
3387    impl FailOnAppendStore {
3388        fn new(root_dir: std::path::PathBuf, fail_on_call: usize) -> Self {
3389            Self {
3390                inner: JsonlEvolutionStore::new(root_dir),
3391                fail_on_call,
3392                call_count: Mutex::new(0),
3393            }
3394        }
3395    }
3396
3397    impl EvolutionStore for FailOnAppendStore {
3398        fn append_event(&self, event: EvolutionEvent) -> Result<u64, EvolutionError> {
3399            let mut call_count = self
3400                .call_count
3401                .lock()
3402                .map_err(|_| EvolutionError::Io("test store lock poisoned".into()))?;
3403            *call_count += 1;
3404            if *call_count == self.fail_on_call {
3405                return Err(EvolutionError::Io("injected append failure".into()));
3406            }
3407            self.inner.append_event(event)
3408        }
3409
3410        fn scan(&self, from_seq: u64) -> Result<Vec<StoredEvolutionEvent>, EvolutionError> {
3411            self.inner.scan(from_seq)
3412        }
3413
3414        fn rebuild_projection(&self) -> Result<EvolutionProjection, EvolutionError> {
3415            self.inner.rebuild_projection()
3416        }
3417    }
3418
3419    #[test]
3420    fn coordination_planner_to_coder_handoff_is_deterministic() {
3421        let result = MultiAgentCoordinator::new().coordinate(CoordinationPlan {
3422            root_goal: "ship feature".into(),
3423            primitive: CoordinationPrimitive::Sequential,
3424            tasks: vec![
3425                CoordinationTask {
3426                    id: "planner".into(),
3427                    role: AgentRole::Planner,
3428                    description: "split the work".into(),
3429                    depends_on: Vec::new(),
3430                },
3431                CoordinationTask {
3432                    id: "coder".into(),
3433                    role: AgentRole::Coder,
3434                    description: "implement the patch".into(),
3435                    depends_on: vec!["planner".into()],
3436                },
3437            ],
3438            timeout_ms: 5_000,
3439            max_retries: 0,
3440        });
3441
3442        assert_eq!(result.completed_tasks, vec!["planner", "coder"]);
3443        assert!(result.failed_tasks.is_empty());
3444        assert!(result.messages.iter().any(|message| {
3445            message.from_role == AgentRole::Planner
3446                && message.to_role == AgentRole::Coder
3447                && message.task_id == "coder"
3448        }));
3449    }
3450
3451    #[test]
3452    fn coordination_repair_runs_only_after_coder_failure() {
3453        let result = MultiAgentCoordinator::new().coordinate(CoordinationPlan {
3454            root_goal: "fix broken implementation".into(),
3455            primitive: CoordinationPrimitive::Sequential,
3456            tasks: vec![
3457                CoordinationTask {
3458                    id: "coder".into(),
3459                    role: AgentRole::Coder,
3460                    description: "force-fail initial implementation".into(),
3461                    depends_on: Vec::new(),
3462                },
3463                CoordinationTask {
3464                    id: "repair".into(),
3465                    role: AgentRole::Repair,
3466                    description: "patch the failed implementation".into(),
3467                    depends_on: vec!["coder".into()],
3468                },
3469            ],
3470            timeout_ms: 5_000,
3471            max_retries: 0,
3472        });
3473
3474        assert_eq!(result.completed_tasks, vec!["repair"]);
3475        assert_eq!(result.failed_tasks, vec!["coder"]);
3476        assert!(result.messages.iter().any(|message| {
3477            message.from_role == AgentRole::Coder
3478                && message.to_role == AgentRole::Repair
3479                && message.task_id == "repair"
3480        }));
3481    }
3482
3483    #[test]
3484    fn coordination_optimizer_runs_after_successful_implementation_step() {
3485        let result = MultiAgentCoordinator::new().coordinate(CoordinationPlan {
3486            root_goal: "ship optimized patch".into(),
3487            primitive: CoordinationPrimitive::Sequential,
3488            tasks: vec![
3489                CoordinationTask {
3490                    id: "coder".into(),
3491                    role: AgentRole::Coder,
3492                    description: "implement a working patch".into(),
3493                    depends_on: Vec::new(),
3494                },
3495                CoordinationTask {
3496                    id: "optimizer".into(),
3497                    role: AgentRole::Optimizer,
3498                    description: "tighten the implementation".into(),
3499                    depends_on: vec!["coder".into()],
3500                },
3501            ],
3502            timeout_ms: 5_000,
3503            max_retries: 0,
3504        });
3505
3506        assert_eq!(result.completed_tasks, vec!["coder", "optimizer"]);
3507        assert!(result.failed_tasks.is_empty());
3508    }
3509
3510    #[test]
3511    fn coordination_parallel_waves_preserve_sorted_merge_order() {
3512        let result = MultiAgentCoordinator::new().coordinate(CoordinationPlan {
3513            root_goal: "parallelize safe tasks".into(),
3514            primitive: CoordinationPrimitive::Parallel,
3515            tasks: vec![
3516                CoordinationTask {
3517                    id: "z-task".into(),
3518                    role: AgentRole::Planner,
3519                    description: "analyze z".into(),
3520                    depends_on: Vec::new(),
3521                },
3522                CoordinationTask {
3523                    id: "a-task".into(),
3524                    role: AgentRole::Coder,
3525                    description: "implement a".into(),
3526                    depends_on: Vec::new(),
3527                },
3528                CoordinationTask {
3529                    id: "mid-task".into(),
3530                    role: AgentRole::Optimizer,
3531                    description: "polish after both".into(),
3532                    depends_on: vec!["z-task".into(), "a-task".into()],
3533                },
3534            ],
3535            timeout_ms: 5_000,
3536            max_retries: 0,
3537        });
3538
3539        assert_eq!(result.completed_tasks, vec!["a-task", "z-task", "mid-task"]);
3540        assert!(result.failed_tasks.is_empty());
3541    }
3542
3543    #[test]
3544    fn coordination_retries_stop_at_max_retries() {
3545        let result = MultiAgentCoordinator::new().coordinate(CoordinationPlan {
3546            root_goal: "retry then stop".into(),
3547            primitive: CoordinationPrimitive::Sequential,
3548            tasks: vec![CoordinationTask {
3549                id: "coder".into(),
3550                role: AgentRole::Coder,
3551                description: "force-fail this task".into(),
3552                depends_on: Vec::new(),
3553            }],
3554            timeout_ms: 5_000,
3555            max_retries: 1,
3556        });
3557
3558        assert!(result.completed_tasks.is_empty());
3559        assert_eq!(result.failed_tasks, vec!["coder"]);
3560        assert_eq!(
3561            result
3562                .messages
3563                .iter()
3564                .filter(|message| message.task_id == "coder" && message.content.contains("failed"))
3565                .count(),
3566            2
3567        );
3568    }
3569
3570    #[test]
3571    fn coordination_conditional_mode_skips_downstream_tasks_on_failure() {
3572        let result = MultiAgentCoordinator::new().coordinate(CoordinationPlan {
3573            root_goal: "skip blocked follow-up work".into(),
3574            primitive: CoordinationPrimitive::Conditional,
3575            tasks: vec![
3576                CoordinationTask {
3577                    id: "coder".into(),
3578                    role: AgentRole::Coder,
3579                    description: "force-fail the implementation".into(),
3580                    depends_on: Vec::new(),
3581                },
3582                CoordinationTask {
3583                    id: "optimizer".into(),
3584                    role: AgentRole::Optimizer,
3585                    description: "only optimize a successful implementation".into(),
3586                    depends_on: vec!["coder".into()],
3587                },
3588            ],
3589            timeout_ms: 5_000,
3590            max_retries: 0,
3591        });
3592
3593        assert!(result.completed_tasks.is_empty());
3594        assert_eq!(result.failed_tasks, vec!["coder"]);
3595        assert!(result.messages.iter().any(|message| {
3596            message.task_id == "optimizer"
3597                && message
3598                    .content
3599                    .contains("skipped due to failed dependency chain")
3600        }));
3601        assert!(!result
3602            .failed_tasks
3603            .iter()
3604            .any(|task_id| task_id == "optimizer"));
3605    }
3606
3607    #[tokio::test]
3608    async fn command_validator_aggregates_stage_reports() {
3609        let workspace = temp_workspace("validator");
3610        let receipt = SandboxReceipt {
3611            mutation_id: "m".into(),
3612            workdir: workspace,
3613            applied: true,
3614            changed_files: Vec::new(),
3615            patch_hash: "hash".into(),
3616            stdout_log: std::env::temp_dir().join("stdout.log"),
3617            stderr_log: std::env::temp_dir().join("stderr.log"),
3618        };
3619        let validator = CommandValidator::new(SandboxPolicy {
3620            allowed_programs: vec!["git".into()],
3621            max_duration_ms: 1_000,
3622            max_output_bytes: 1024,
3623            denied_env_prefixes: Vec::new(),
3624        });
3625        let report = validator
3626            .run(
3627                &receipt,
3628                &ValidationPlan {
3629                    profile: "test".into(),
3630                    stages: vec![ValidationStage::Command {
3631                        program: "git".into(),
3632                        args: vec!["--version".into()],
3633                        timeout_ms: 1_000,
3634                    }],
3635                },
3636            )
3637            .await
3638            .unwrap();
3639        assert_eq!(report.stages.len(), 1);
3640    }
3641
3642    #[tokio::test]
3643    async fn capture_successful_mutation_appends_capsule() {
3644        let (evo, store) = build_test_evo("capture", "run-1", command_validator());
3645        let capsule = evo
3646            .capture_successful_mutation(&"run-1".into(), sample_mutation())
3647            .await
3648            .unwrap();
3649        let events = store.scan(1).unwrap();
3650        assert!(events
3651            .iter()
3652            .any(|stored| matches!(stored.event, EvolutionEvent::CapsuleCommitted { .. })));
3653        assert!(!capsule.id.is_empty());
3654    }
3655
3656    #[tokio::test]
3657    async fn replay_hit_records_capsule_reused() {
3658        let (evo, store) = build_test_evo("replay", "run-2", command_validator());
3659        let capsule = evo
3660            .capture_successful_mutation(&"run-2".into(), sample_mutation())
3661            .await
3662            .unwrap();
3663        let replay_run_id = "run-replay".to_string();
3664        let decision = evo
3665            .replay_or_fallback_for_run(&replay_run_id, replay_input("missing readme"))
3666            .await
3667            .unwrap();
3668        assert!(decision.used_capsule);
3669        assert_eq!(decision.capsule_id, Some(capsule.id));
3670        assert!(store.scan(1).unwrap().iter().any(|stored| matches!(
3671            &stored.event,
3672            EvolutionEvent::CapsuleReused {
3673                run_id,
3674                replay_run_id: Some(current_replay_run_id),
3675                ..
3676            } if run_id == "run-2" && current_replay_run_id == &replay_run_id
3677        )));
3678    }
3679
3680    #[tokio::test]
3681    async fn legacy_replay_executor_api_preserves_original_capsule_run_id() {
3682        let capture_run_id = "run-legacy-capture".to_string();
3683        let (evo, store) = build_test_evo("replay-legacy", &capture_run_id, command_validator());
3684        let capsule = evo
3685            .capture_successful_mutation(&capture_run_id, sample_mutation())
3686            .await
3687            .unwrap();
3688        let executor = StoreReplayExecutor {
3689            sandbox: evo.sandbox.clone(),
3690            validator: evo.validator.clone(),
3691            store: evo.store.clone(),
3692            selector: evo.selector.clone(),
3693            governor: evo.governor.clone(),
3694            economics: Some(evo.economics.clone()),
3695            remote_publishers: Some(evo.remote_publishers.clone()),
3696            stake_policy: evo.stake_policy.clone(),
3697        };
3698
3699        let decision = executor
3700            .try_replay(
3701                &replay_input("missing readme"),
3702                &evo.sandbox_policy,
3703                &evo.validation_plan,
3704            )
3705            .await
3706            .unwrap();
3707
3708        assert!(decision.used_capsule);
3709        assert_eq!(decision.capsule_id, Some(capsule.id));
3710        assert!(store.scan(1).unwrap().iter().any(|stored| matches!(
3711            &stored.event,
3712            EvolutionEvent::CapsuleReused {
3713                run_id,
3714                replay_run_id: None,
3715                ..
3716            } if run_id == &capture_run_id
3717        )));
3718    }
3719
3720    #[tokio::test]
3721    async fn metrics_snapshot_tracks_replay_promotion_and_revocation_signals() {
3722        let (evo, _) = build_test_evo("metrics", "run-metrics", command_validator());
3723        let capsule = evo
3724            .capture_successful_mutation(&"run-metrics".into(), sample_mutation())
3725            .await
3726            .unwrap();
3727        let decision = evo
3728            .replay_or_fallback(replay_input("missing readme"))
3729            .await
3730            .unwrap();
3731        assert!(decision.used_capsule);
3732
3733        evo.revoke_assets(&RevokeNotice {
3734            sender_id: "node-metrics".into(),
3735            asset_ids: vec![capsule.id.clone()],
3736            reason: "manual test revoke".into(),
3737        })
3738        .unwrap();
3739
3740        let snapshot = evo.metrics_snapshot().unwrap();
3741        assert_eq!(snapshot.replay_attempts_total, 1);
3742        assert_eq!(snapshot.replay_success_total, 1);
3743        assert_eq!(snapshot.replay_success_rate, 1.0);
3744        assert_eq!(snapshot.mutation_declared_total, 1);
3745        assert_eq!(snapshot.promoted_mutations_total, 1);
3746        assert_eq!(snapshot.promotion_ratio, 1.0);
3747        assert_eq!(snapshot.gene_revocations_total, 1);
3748        assert_eq!(snapshot.mutation_velocity_last_hour, 1);
3749        assert_eq!(snapshot.revoke_frequency_last_hour, 1);
3750        assert_eq!(snapshot.promoted_genes, 0);
3751        assert_eq!(snapshot.promoted_capsules, 0);
3752
3753        let rendered = evo.render_metrics_prometheus().unwrap();
3754        assert!(rendered.contains("oris_evolution_replay_success_rate 1.000000"));
3755        assert!(rendered.contains("oris_evolution_promotion_ratio 1.000000"));
3756        assert!(rendered.contains("oris_evolution_revoke_frequency_last_hour 1"));
3757        assert!(rendered.contains("oris_evolution_mutation_velocity_last_hour 1"));
3758        assert!(rendered.contains("oris_evolution_health 1"));
3759    }
3760
3761    #[tokio::test]
3762    async fn remote_replay_prefers_closest_environment_match() {
3763        let (evo, _) = build_test_evo("remote-env", "run-remote-env", command_validator());
3764        let input = replay_input("env-signal");
3765
3766        let envelope_a = remote_publish_envelope_with_env(
3767            "node-a",
3768            "run-remote-a",
3769            "gene-a",
3770            "capsule-a",
3771            "mutation-a",
3772            "env-signal",
3773            "A.md",
3774            "# from a",
3775            input.env.clone(),
3776        );
3777        let envelope_b = remote_publish_envelope_with_env(
3778            "node-b",
3779            "run-remote-b",
3780            "gene-b",
3781            "capsule-b",
3782            "mutation-b",
3783            "env-signal",
3784            "B.md",
3785            "# from b",
3786            EnvFingerprint {
3787                rustc_version: "old-rustc".into(),
3788                cargo_lock_hash: "other-lock".into(),
3789                target_triple: "aarch64-apple-darwin".into(),
3790                os: "linux".into(),
3791            },
3792        );
3793
3794        evo.import_remote_envelope(&envelope_a).unwrap();
3795        evo.import_remote_envelope(&envelope_b).unwrap();
3796
3797        let decision = evo.replay_or_fallback(input).await.unwrap();
3798
3799        assert!(decision.used_capsule);
3800        assert_eq!(decision.capsule_id, Some("capsule-a".into()));
3801        assert!(!decision.fallback_to_planner);
3802    }
3803
3804    #[test]
3805    fn remote_cold_start_scoring_caps_distinct_query_coverage() {
3806        let (evo, _) = build_test_evo("remote-score", "run-remote-score", command_validator());
3807        let input = replay_input("missing readme");
3808
3809        let exact = remote_publish_envelope_with_signals(
3810            "node-exact",
3811            "run-remote-exact",
3812            "gene-exact",
3813            "capsule-exact",
3814            "mutation-exact",
3815            vec!["missing readme".into()],
3816            vec!["missing readme".into()],
3817            "EXACT.md",
3818            "# exact",
3819            input.env.clone(),
3820        );
3821        let overlapping = remote_publish_envelope_with_signals(
3822            "node-overlap",
3823            "run-remote-overlap",
3824            "gene-overlap",
3825            "capsule-overlap",
3826            "mutation-overlap",
3827            vec!["missing readme".into()],
3828            vec!["missing".into(), "readme".into()],
3829            "OVERLAP.md",
3830            "# overlap",
3831            input.env.clone(),
3832        );
3833
3834        evo.import_remote_envelope(&exact).unwrap();
3835        evo.import_remote_envelope(&overlapping).unwrap();
3836
3837        let candidates = quarantined_remote_exact_match_candidates(evo.store.as_ref(), &input);
3838        let exact_candidate = candidates
3839            .iter()
3840            .find(|candidate| candidate.gene.id == "gene-exact")
3841            .unwrap();
3842        let overlap_candidate = candidates
3843            .iter()
3844            .find(|candidate| candidate.gene.id == "gene-overlap")
3845            .unwrap();
3846
3847        assert_eq!(exact_candidate.score, 1.0);
3848        assert_eq!(overlap_candidate.score, 1.0);
3849        assert!(candidates.iter().all(|candidate| candidate.score <= 1.0));
3850    }
3851
3852    #[test]
3853    fn exact_match_candidates_respect_spec_linked_events() {
3854        let (evo, _) = build_test_evo(
3855            "spec-linked-filter",
3856            "run-spec-linked-filter",
3857            command_validator(),
3858        );
3859        let mut input = replay_input("missing readme");
3860        input.spec_id = Some("spec-readme".into());
3861
3862        let mut mutation = sample_mutation();
3863        mutation.intent.id = "mutation-spec-linked".into();
3864        mutation.intent.spec_id = None;
3865        let gene = Gene {
3866            id: "gene-spec-linked".into(),
3867            signals: vec!["missing readme".into()],
3868            strategy: vec!["README.md".into()],
3869            validation: vec!["test".into()],
3870            state: AssetState::Promoted,
3871        };
3872        let capsule = Capsule {
3873            id: "capsule-spec-linked".into(),
3874            gene_id: gene.id.clone(),
3875            mutation_id: mutation.intent.id.clone(),
3876            run_id: "run-spec-linked".into(),
3877            diff_hash: mutation.artifact.content_hash.clone(),
3878            confidence: 0.9,
3879            env: input.env.clone(),
3880            outcome: Outcome {
3881                success: true,
3882                validation_profile: "test".into(),
3883                validation_duration_ms: 1,
3884                changed_files: vec!["README.md".into()],
3885                validator_hash: "validator-hash".into(),
3886                lines_changed: 1,
3887                replay_verified: false,
3888            },
3889            state: AssetState::Promoted,
3890        };
3891
3892        evo.store
3893            .append_event(EvolutionEvent::MutationDeclared { mutation })
3894            .unwrap();
3895        evo.store
3896            .append_event(EvolutionEvent::GeneProjected { gene })
3897            .unwrap();
3898        evo.store
3899            .append_event(EvolutionEvent::CapsuleCommitted { capsule })
3900            .unwrap();
3901        evo.store
3902            .append_event(EvolutionEvent::SpecLinked {
3903                mutation_id: "mutation-spec-linked".into(),
3904                spec_id: "spec-readme".into(),
3905            })
3906            .unwrap();
3907
3908        let candidates = exact_match_candidates(evo.store.as_ref(), &input);
3909        assert_eq!(candidates.len(), 1);
3910        assert_eq!(candidates[0].gene.id, "gene-spec-linked");
3911    }
3912
3913    #[tokio::test]
3914    async fn remote_capsule_stays_quarantined_until_first_successful_replay() {
3915        let (evo, store) = build_test_evo(
3916            "remote-quarantine",
3917            "run-remote-quarantine",
3918            command_validator(),
3919        );
3920        let envelope = remote_publish_envelope(
3921            "node-remote",
3922            "run-remote-quarantine",
3923            "gene-remote",
3924            "capsule-remote",
3925            "mutation-remote",
3926            "remote-signal",
3927            "REMOTE.md",
3928            "# from remote",
3929        );
3930
3931        evo.import_remote_envelope(&envelope).unwrap();
3932
3933        let before_replay = store.rebuild_projection().unwrap();
3934        let imported_gene = before_replay
3935            .genes
3936            .iter()
3937            .find(|gene| gene.id == "gene-remote")
3938            .unwrap();
3939        let imported_capsule = before_replay
3940            .capsules
3941            .iter()
3942            .find(|capsule| capsule.id == "capsule-remote")
3943            .unwrap();
3944        assert_eq!(imported_gene.state, AssetState::Quarantined);
3945        assert_eq!(imported_capsule.state, AssetState::Quarantined);
3946        let exported_before_replay =
3947            export_promoted_assets_from_store(store.as_ref(), "node-local").unwrap();
3948        assert!(exported_before_replay.assets.is_empty());
3949
3950        let decision = evo
3951            .replay_or_fallback(replay_input("remote-signal"))
3952            .await
3953            .unwrap();
3954
3955        assert!(decision.used_capsule);
3956        assert_eq!(decision.capsule_id, Some("capsule-remote".into()));
3957
3958        let after_replay = store.rebuild_projection().unwrap();
3959        let promoted_gene = after_replay
3960            .genes
3961            .iter()
3962            .find(|gene| gene.id == "gene-remote")
3963            .unwrap();
3964        let released_capsule = after_replay
3965            .capsules
3966            .iter()
3967            .find(|capsule| capsule.id == "capsule-remote")
3968            .unwrap();
3969        assert_eq!(promoted_gene.state, AssetState::Promoted);
3970        assert_eq!(released_capsule.state, AssetState::Promoted);
3971        let exported_after_replay =
3972            export_promoted_assets_from_store(store.as_ref(), "node-local").unwrap();
3973        assert_eq!(exported_after_replay.assets.len(), 3);
3974        assert!(exported_after_replay.assets.iter().any(|asset| matches!(
3975            asset,
3976            NetworkAsset::EvolutionEvent {
3977                event: EvolutionEvent::MutationDeclared { .. }
3978            }
3979        )));
3980    }
3981
3982    #[tokio::test]
3983    async fn publish_local_assets_include_mutation_payload_for_remote_replay() {
3984        let (source, source_store) = build_test_evo(
3985            "remote-publish-export",
3986            "run-remote-publish-export",
3987            command_validator(),
3988        );
3989        source
3990            .capture_successful_mutation(&"run-remote-publish-export".into(), sample_mutation())
3991            .await
3992            .unwrap();
3993        let envelope = EvolutionNetworkNode::new(source_store.clone())
3994            .publish_local_assets("node-source")
3995            .unwrap();
3996        assert!(envelope.assets.iter().any(|asset| matches!(
3997            asset,
3998            NetworkAsset::EvolutionEvent {
3999                event: EvolutionEvent::MutationDeclared { mutation }
4000            } if mutation.intent.id == "mutation-1"
4001        )));
4002
4003        let (remote, _) = build_test_evo(
4004            "remote-publish-import",
4005            "run-remote-publish-import",
4006            command_validator(),
4007        );
4008        remote.import_remote_envelope(&envelope).unwrap();
4009
4010        let decision = remote
4011            .replay_or_fallback(replay_input("missing readme"))
4012            .await
4013            .unwrap();
4014
4015        assert!(decision.used_capsule);
4016        assert!(!decision.fallback_to_planner);
4017    }
4018
4019    #[tokio::test]
4020    async fn fetch_assets_include_mutation_payload_for_remote_replay() {
4021        let (evo, store) = build_test_evo(
4022            "remote-fetch-export",
4023            "run-remote-fetch",
4024            command_validator(),
4025        );
4026        evo.capture_successful_mutation(&"run-remote-fetch".into(), sample_mutation())
4027            .await
4028            .unwrap();
4029
4030        let response = EvolutionNetworkNode::new(store.clone())
4031            .fetch_assets(
4032                "node-source",
4033                &FetchQuery {
4034                    sender_id: "node-client".into(),
4035                    signals: vec!["missing readme".into()],
4036                },
4037            )
4038            .unwrap();
4039
4040        assert!(response.assets.iter().any(|asset| matches!(
4041            asset,
4042            NetworkAsset::EvolutionEvent {
4043                event: EvolutionEvent::MutationDeclared { mutation }
4044            } if mutation.intent.id == "mutation-1"
4045        )));
4046        assert!(response
4047            .assets
4048            .iter()
4049            .any(|asset| matches!(asset, NetworkAsset::Gene { .. })));
4050        assert!(response
4051            .assets
4052            .iter()
4053            .any(|asset| matches!(asset, NetworkAsset::Capsule { .. })));
4054    }
4055
4056    #[test]
4057    fn partial_remote_import_keeps_publisher_for_already_imported_assets() {
4058        let store_root = std::env::temp_dir().join(format!(
4059            "oris-evokernel-remote-partial-store-{}",
4060            std::process::id()
4061        ));
4062        if store_root.exists() {
4063            fs::remove_dir_all(&store_root).unwrap();
4064        }
4065        let store: Arc<dyn EvolutionStore> = Arc::new(FailOnAppendStore::new(store_root, 4));
4066        let evo = build_test_evo_with_store(
4067            "remote-partial",
4068            "run-remote-partial",
4069            command_validator(),
4070            store.clone(),
4071        );
4072        let envelope = remote_publish_envelope(
4073            "node-partial",
4074            "run-remote-partial",
4075            "gene-partial",
4076            "capsule-partial",
4077            "mutation-partial",
4078            "partial-signal",
4079            "PARTIAL.md",
4080            "# partial",
4081        );
4082
4083        let result = evo.import_remote_envelope(&envelope);
4084
4085        assert!(matches!(result, Err(EvoKernelError::Store(_))));
4086        let projection = store.rebuild_projection().unwrap();
4087        assert!(projection
4088            .genes
4089            .iter()
4090            .any(|gene| gene.id == "gene-partial"));
4091        assert!(projection.capsules.is_empty());
4092        let publishers = evo.remote_publishers.lock().unwrap();
4093        assert_eq!(
4094            publishers.get("gene-partial").map(String::as_str),
4095            Some("node-partial")
4096        );
4097    }
4098
4099    #[test]
4100    fn retry_remote_import_after_partial_failure_only_imports_missing_assets() {
4101        let store_root = std::env::temp_dir().join(format!(
4102            "oris-evokernel-remote-partial-retry-store-{}",
4103            next_id("t")
4104        ));
4105        if store_root.exists() {
4106            fs::remove_dir_all(&store_root).unwrap();
4107        }
4108        let store: Arc<dyn EvolutionStore> = Arc::new(FailOnAppendStore::new(store_root, 4));
4109        let evo = build_test_evo_with_store(
4110            "remote-partial-retry",
4111            "run-remote-partial-retry",
4112            command_validator(),
4113            store.clone(),
4114        );
4115        let envelope = remote_publish_envelope(
4116            "node-partial",
4117            "run-remote-partial-retry",
4118            "gene-partial-retry",
4119            "capsule-partial-retry",
4120            "mutation-partial-retry",
4121            "partial-retry-signal",
4122            "PARTIAL_RETRY.md",
4123            "# partial retry",
4124        );
4125
4126        let first = evo.import_remote_envelope(&envelope);
4127        assert!(matches!(first, Err(EvoKernelError::Store(_))));
4128
4129        let retry = evo.import_remote_envelope(&envelope).unwrap();
4130
4131        assert_eq!(retry.imported_asset_ids, vec!["capsule-partial-retry"]);
4132        let projection = store.rebuild_projection().unwrap();
4133        let gene = projection
4134            .genes
4135            .iter()
4136            .find(|gene| gene.id == "gene-partial-retry")
4137            .unwrap();
4138        assert_eq!(gene.state, AssetState::Quarantined);
4139        let capsule = projection
4140            .capsules
4141            .iter()
4142            .find(|capsule| capsule.id == "capsule-partial-retry")
4143            .unwrap();
4144        assert_eq!(capsule.state, AssetState::Quarantined);
4145        assert_eq!(projection.attempt_counts["gene-partial-retry"], 1);
4146
4147        let events = store.scan(1).unwrap();
4148        assert_eq!(
4149            events
4150                .iter()
4151                .filter(|stored| {
4152                    matches!(
4153                        &stored.event,
4154                        EvolutionEvent::MutationDeclared { mutation }
4155                            if mutation.intent.id == "mutation-partial-retry"
4156                    )
4157                })
4158                .count(),
4159            1
4160        );
4161        assert_eq!(
4162            events
4163                .iter()
4164                .filter(|stored| {
4165                    matches!(
4166                        &stored.event,
4167                        EvolutionEvent::GeneProjected { gene } if gene.id == "gene-partial-retry"
4168                    )
4169                })
4170                .count(),
4171            1
4172        );
4173        assert_eq!(
4174            events
4175                .iter()
4176                .filter(|stored| {
4177                    matches!(
4178                        &stored.event,
4179                        EvolutionEvent::CapsuleCommitted { capsule }
4180                            if capsule.id == "capsule-partial-retry"
4181                    )
4182                })
4183                .count(),
4184            1
4185        );
4186    }
4187
4188    #[tokio::test]
4189    async fn duplicate_remote_import_does_not_requarantine_locally_validated_assets() {
4190        let (evo, store) = build_test_evo(
4191            "remote-idempotent",
4192            "run-remote-idempotent",
4193            command_validator(),
4194        );
4195        let envelope = remote_publish_envelope(
4196            "node-idempotent",
4197            "run-remote-idempotent",
4198            "gene-idempotent",
4199            "capsule-idempotent",
4200            "mutation-idempotent",
4201            "idempotent-signal",
4202            "IDEMPOTENT.md",
4203            "# idempotent",
4204        );
4205
4206        let first = evo.import_remote_envelope(&envelope).unwrap();
4207        assert_eq!(
4208            first.imported_asset_ids,
4209            vec!["gene-idempotent", "capsule-idempotent"]
4210        );
4211
4212        let decision = evo
4213            .replay_or_fallback(replay_input("idempotent-signal"))
4214            .await
4215            .unwrap();
4216        assert!(decision.used_capsule);
4217        assert_eq!(decision.capsule_id, Some("capsule-idempotent".into()));
4218
4219        let projection_before = store.rebuild_projection().unwrap();
4220        let attempts_before = projection_before.attempt_counts["gene-idempotent"];
4221        let gene_before = projection_before
4222            .genes
4223            .iter()
4224            .find(|gene| gene.id == "gene-idempotent")
4225            .unwrap();
4226        assert_eq!(gene_before.state, AssetState::Promoted);
4227        let capsule_before = projection_before
4228            .capsules
4229            .iter()
4230            .find(|capsule| capsule.id == "capsule-idempotent")
4231            .unwrap();
4232        assert_eq!(capsule_before.state, AssetState::Promoted);
4233
4234        let second = evo.import_remote_envelope(&envelope).unwrap();
4235        assert!(second.imported_asset_ids.is_empty());
4236
4237        let projection_after = store.rebuild_projection().unwrap();
4238        assert_eq!(
4239            projection_after.attempt_counts["gene-idempotent"],
4240            attempts_before
4241        );
4242        let gene_after = projection_after
4243            .genes
4244            .iter()
4245            .find(|gene| gene.id == "gene-idempotent")
4246            .unwrap();
4247        assert_eq!(gene_after.state, AssetState::Promoted);
4248        let capsule_after = projection_after
4249            .capsules
4250            .iter()
4251            .find(|capsule| capsule.id == "capsule-idempotent")
4252            .unwrap();
4253        assert_eq!(capsule_after.state, AssetState::Promoted);
4254
4255        let events = store.scan(1).unwrap();
4256        assert_eq!(
4257            events
4258                .iter()
4259                .filter(|stored| {
4260                    matches!(
4261                        &stored.event,
4262                        EvolutionEvent::MutationDeclared { mutation }
4263                            if mutation.intent.id == "mutation-idempotent"
4264                    )
4265                })
4266                .count(),
4267            1
4268        );
4269        assert_eq!(
4270            events
4271                .iter()
4272                .filter(|stored| {
4273                    matches!(
4274                        &stored.event,
4275                        EvolutionEvent::GeneProjected { gene } if gene.id == "gene-idempotent"
4276                    )
4277                })
4278                .count(),
4279            1
4280        );
4281        assert_eq!(
4282            events
4283                .iter()
4284                .filter(|stored| {
4285                    matches!(
4286                        &stored.event,
4287                        EvolutionEvent::CapsuleCommitted { capsule }
4288                            if capsule.id == "capsule-idempotent"
4289                    )
4290                })
4291                .count(),
4292            1
4293        );
4294    }
4295
4296    #[tokio::test]
4297    async fn insufficient_evu_blocks_publish_but_not_local_replay() {
4298        let (evo, _) = build_test_evo("stake-gate", "run-stake", command_validator());
4299        let capsule = evo
4300            .capture_successful_mutation(&"run-stake".into(), sample_mutation())
4301            .await
4302            .unwrap();
4303        let publish = evo.export_promoted_assets("node-local");
4304        assert!(matches!(publish, Err(EvoKernelError::Validation(_))));
4305
4306        let decision = evo
4307            .replay_or_fallback(replay_input("missing readme"))
4308            .await
4309            .unwrap();
4310        assert!(decision.used_capsule);
4311        assert_eq!(decision.capsule_id, Some(capsule.id));
4312    }
4313
4314    #[tokio::test]
4315    async fn second_replay_validation_failure_revokes_gene_immediately() {
4316        let (capturer, store) = build_test_evo("revoke-replay", "run-capture", command_validator());
4317        let capsule = capturer
4318            .capture_successful_mutation(&"run-capture".into(), sample_mutation())
4319            .await
4320            .unwrap();
4321
4322        let failing_validator: Arc<dyn Validator> = Arc::new(FixedValidator { success: false });
4323        let failing_replay = build_test_evo_with_store(
4324            "revoke-replay",
4325            "run-replay-fail",
4326            failing_validator,
4327            store.clone(),
4328        );
4329
4330        let first = failing_replay
4331            .replay_or_fallback(replay_input("missing readme"))
4332            .await
4333            .unwrap();
4334        let second = failing_replay
4335            .replay_or_fallback(replay_input("missing readme"))
4336            .await
4337            .unwrap();
4338
4339        assert!(!first.used_capsule);
4340        assert!(first.fallback_to_planner);
4341        assert!(!second.used_capsule);
4342        assert!(second.fallback_to_planner);
4343
4344        let projection = store.rebuild_projection().unwrap();
4345        let gene = projection
4346            .genes
4347            .iter()
4348            .find(|gene| gene.id == capsule.gene_id)
4349            .unwrap();
4350        assert_eq!(gene.state, AssetState::Promoted);
4351        let committed_capsule = projection
4352            .capsules
4353            .iter()
4354            .find(|current| current.id == capsule.id)
4355            .unwrap();
4356        assert_eq!(committed_capsule.state, AssetState::Promoted);
4357
4358        let events = store.scan(1).unwrap();
4359        assert_eq!(
4360            events
4361                .iter()
4362                .filter(|stored| {
4363                    matches!(
4364                        &stored.event,
4365                        EvolutionEvent::ValidationFailed {
4366                            gene_id: Some(gene_id),
4367                            ..
4368                        } if gene_id == &capsule.gene_id
4369                    )
4370                })
4371                .count(),
4372            1
4373        );
4374        assert!(!events.iter().any(|stored| {
4375            matches!(
4376                &stored.event,
4377                EvolutionEvent::GeneRevoked { gene_id, .. } if gene_id == &capsule.gene_id
4378            )
4379        }));
4380
4381        let recovered = build_test_evo_with_store(
4382            "revoke-replay",
4383            "run-replay-check",
4384            command_validator(),
4385            store.clone(),
4386        );
4387        let after_revoke = recovered
4388            .replay_or_fallback(replay_input("missing readme"))
4389            .await
4390            .unwrap();
4391        assert!(!after_revoke.used_capsule);
4392        assert!(after_revoke.fallback_to_planner);
4393        assert!(after_revoke.reason.contains("below replay threshold"));
4394    }
4395
4396    #[tokio::test]
4397    async fn remote_reuse_success_rewards_publisher_and_biases_selection() {
4398        let ledger = Arc::new(Mutex::new(EvuLedger {
4399            accounts: vec![],
4400            reputations: vec![
4401                oris_economics::ReputationRecord {
4402                    node_id: "node-a".into(),
4403                    publish_success_rate: 0.4,
4404                    validator_accuracy: 0.4,
4405                    reuse_impact: 0,
4406                },
4407                oris_economics::ReputationRecord {
4408                    node_id: "node-b".into(),
4409                    publish_success_rate: 0.95,
4410                    validator_accuracy: 0.95,
4411                    reuse_impact: 8,
4412                },
4413            ],
4414        }));
4415        let (evo, _) = build_test_evo("remote-success", "run-remote", command_validator());
4416        let evo = evo.with_economics(ledger.clone());
4417
4418        let envelope_a = remote_publish_envelope(
4419            "node-a",
4420            "run-remote-a",
4421            "gene-a",
4422            "capsule-a",
4423            "mutation-a",
4424            "shared-signal",
4425            "A.md",
4426            "# from a",
4427        );
4428        let envelope_b = remote_publish_envelope(
4429            "node-b",
4430            "run-remote-b",
4431            "gene-b",
4432            "capsule-b",
4433            "mutation-b",
4434            "shared-signal",
4435            "B.md",
4436            "# from b",
4437        );
4438
4439        evo.import_remote_envelope(&envelope_a).unwrap();
4440        evo.import_remote_envelope(&envelope_b).unwrap();
4441
4442        let decision = evo
4443            .replay_or_fallback(replay_input("shared-signal"))
4444            .await
4445            .unwrap();
4446
4447        assert!(decision.used_capsule);
4448        assert_eq!(decision.capsule_id, Some("capsule-b".into()));
4449        let locked = ledger.lock().unwrap();
4450        let rewarded = locked
4451            .accounts
4452            .iter()
4453            .find(|item| item.node_id == "node-b")
4454            .unwrap();
4455        assert_eq!(rewarded.balance, evo.stake_policy.reuse_reward);
4456        assert!(
4457            locked.selector_reputation_bias()["node-b"]
4458                > locked.selector_reputation_bias()["node-a"]
4459        );
4460    }
4461
4462    #[tokio::test]
4463    async fn remote_reuse_settlement_tracks_selected_capsule_publisher_for_shared_gene() {
4464        let ledger = Arc::new(Mutex::new(EvuLedger::default()));
4465        let (evo, _) = build_test_evo(
4466            "remote-shared-publisher",
4467            "run-remote-shared-publisher",
4468            command_validator(),
4469        );
4470        let evo = evo.with_economics(ledger.clone());
4471        let input = replay_input("shared-signal");
4472        let preferred = remote_publish_envelope_with_env(
4473            "node-a",
4474            "run-remote-a",
4475            "gene-shared",
4476            "capsule-preferred",
4477            "mutation-preferred",
4478            "shared-signal",
4479            "A.md",
4480            "# from a",
4481            input.env.clone(),
4482        );
4483        let fallback = remote_publish_envelope_with_env(
4484            "node-b",
4485            "run-remote-b",
4486            "gene-shared",
4487            "capsule-fallback",
4488            "mutation-fallback",
4489            "shared-signal",
4490            "B.md",
4491            "# from b",
4492            EnvFingerprint {
4493                rustc_version: "old-rustc".into(),
4494                cargo_lock_hash: "other-lock".into(),
4495                target_triple: "aarch64-apple-darwin".into(),
4496                os: "linux".into(),
4497            },
4498        );
4499
4500        evo.import_remote_envelope(&preferred).unwrap();
4501        evo.import_remote_envelope(&fallback).unwrap();
4502
4503        let decision = evo.replay_or_fallback(input).await.unwrap();
4504
4505        assert!(decision.used_capsule);
4506        assert_eq!(decision.capsule_id, Some("capsule-preferred".into()));
4507        let locked = ledger.lock().unwrap();
4508        let rewarded = locked
4509            .accounts
4510            .iter()
4511            .find(|item| item.node_id == "node-a")
4512            .unwrap();
4513        assert_eq!(rewarded.balance, evo.stake_policy.reuse_reward);
4514        assert!(locked.accounts.iter().all(|item| item.node_id != "node-b"));
4515    }
4516
4517    #[test]
4518    fn select_candidates_surfaces_ranked_remote_cold_start_candidates() {
4519        let ledger = Arc::new(Mutex::new(EvuLedger {
4520            accounts: vec![],
4521            reputations: vec![
4522                oris_economics::ReputationRecord {
4523                    node_id: "node-a".into(),
4524                    publish_success_rate: 0.4,
4525                    validator_accuracy: 0.4,
4526                    reuse_impact: 0,
4527                },
4528                oris_economics::ReputationRecord {
4529                    node_id: "node-b".into(),
4530                    publish_success_rate: 0.95,
4531                    validator_accuracy: 0.95,
4532                    reuse_impact: 8,
4533                },
4534            ],
4535        }));
4536        let (evo, _) = build_test_evo("remote-select", "run-remote-select", command_validator());
4537        let evo = evo.with_economics(ledger);
4538
4539        let envelope_a = remote_publish_envelope(
4540            "node-a",
4541            "run-remote-a",
4542            "gene-a",
4543            "capsule-a",
4544            "mutation-a",
4545            "shared-signal",
4546            "A.md",
4547            "# from a",
4548        );
4549        let envelope_b = remote_publish_envelope(
4550            "node-b",
4551            "run-remote-b",
4552            "gene-b",
4553            "capsule-b",
4554            "mutation-b",
4555            "shared-signal",
4556            "B.md",
4557            "# from b",
4558        );
4559
4560        evo.import_remote_envelope(&envelope_a).unwrap();
4561        evo.import_remote_envelope(&envelope_b).unwrap();
4562
4563        let candidates = evo.select_candidates(&replay_input("shared-signal"));
4564
4565        assert_eq!(candidates.len(), 1);
4566        assert_eq!(candidates[0].gene.id, "gene-b");
4567        assert_eq!(candidates[0].capsules[0].id, "capsule-b");
4568    }
4569
4570    #[tokio::test]
4571    async fn remote_reuse_publisher_bias_survives_restart() {
4572        let ledger = Arc::new(Mutex::new(EvuLedger {
4573            accounts: vec![],
4574            reputations: vec![
4575                oris_economics::ReputationRecord {
4576                    node_id: "node-a".into(),
4577                    publish_success_rate: 0.4,
4578                    validator_accuracy: 0.4,
4579                    reuse_impact: 0,
4580                },
4581                oris_economics::ReputationRecord {
4582                    node_id: "node-b".into(),
4583                    publish_success_rate: 0.95,
4584                    validator_accuracy: 0.95,
4585                    reuse_impact: 8,
4586                },
4587            ],
4588        }));
4589        let store_root = std::env::temp_dir().join(format!(
4590            "oris-evokernel-remote-restart-store-{}",
4591            next_id("t")
4592        ));
4593        if store_root.exists() {
4594            fs::remove_dir_all(&store_root).unwrap();
4595        }
4596        let store: Arc<dyn EvolutionStore> =
4597            Arc::new(oris_evolution::JsonlEvolutionStore::new(&store_root));
4598        let evo = build_test_evo_with_store(
4599            "remote-success-restart-source",
4600            "run-remote-restart-source",
4601            command_validator(),
4602            store.clone(),
4603        )
4604        .with_economics(ledger.clone());
4605
4606        let envelope_a = remote_publish_envelope(
4607            "node-a",
4608            "run-remote-a",
4609            "gene-a",
4610            "capsule-a",
4611            "mutation-a",
4612            "shared-signal",
4613            "A.md",
4614            "# from a",
4615        );
4616        let envelope_b = remote_publish_envelope(
4617            "node-b",
4618            "run-remote-b",
4619            "gene-b",
4620            "capsule-b",
4621            "mutation-b",
4622            "shared-signal",
4623            "B.md",
4624            "# from b",
4625        );
4626
4627        evo.import_remote_envelope(&envelope_a).unwrap();
4628        evo.import_remote_envelope(&envelope_b).unwrap();
4629
4630        let recovered = build_test_evo_with_store(
4631            "remote-success-restart-recovered",
4632            "run-remote-restart-recovered",
4633            command_validator(),
4634            store.clone(),
4635        )
4636        .with_economics(ledger.clone());
4637
4638        let decision = recovered
4639            .replay_or_fallback(replay_input("shared-signal"))
4640            .await
4641            .unwrap();
4642
4643        assert!(decision.used_capsule);
4644        assert_eq!(decision.capsule_id, Some("capsule-b".into()));
4645        let locked = ledger.lock().unwrap();
4646        let rewarded = locked
4647            .accounts
4648            .iter()
4649            .find(|item| item.node_id == "node-b")
4650            .unwrap();
4651        assert_eq!(rewarded.balance, recovered.stake_policy.reuse_reward);
4652    }
4653
4654    #[tokio::test]
4655    async fn remote_reuse_failure_penalizes_remote_reputation() {
4656        let ledger = Arc::new(Mutex::new(EvuLedger::default()));
4657        let failing_validator: Arc<dyn Validator> = Arc::new(FixedValidator { success: false });
4658        let (evo, _) = build_test_evo("remote-failure", "run-failure", failing_validator);
4659        let evo = evo.with_economics(ledger.clone());
4660
4661        let envelope = remote_publish_envelope(
4662            "node-remote",
4663            "run-remote-failed",
4664            "gene-remote",
4665            "capsule-remote",
4666            "mutation-remote",
4667            "failure-signal",
4668            "FAILED.md",
4669            "# from remote",
4670        );
4671        evo.import_remote_envelope(&envelope).unwrap();
4672
4673        let decision = evo
4674            .replay_or_fallback(replay_input("failure-signal"))
4675            .await
4676            .unwrap();
4677
4678        assert!(!decision.used_capsule);
4679        assert!(decision.fallback_to_planner);
4680
4681        let signal = evo.economics_signal("node-remote").unwrap();
4682        assert_eq!(signal.available_evu, 0);
4683        assert!(signal.publish_success_rate < 0.5);
4684        assert!(signal.validator_accuracy < 0.5);
4685    }
4686}