1use std::collections::{BTreeMap, BTreeSet};
4use std::fs;
5use std::path::Path;
6use std::process::Command;
7use std::sync::{Arc, Mutex};
8
9use async_trait::async_trait;
10use chrono::{DateTime, Duration, Utc};
11use oris_agent_contract::{
12 AgentRole, CoordinationMessage, CoordinationPlan, CoordinationPrimitive, CoordinationResult,
13 CoordinationTask, ExecutionFeedback, MutationProposal as AgentMutationProposal,
14};
15use oris_economics::{EconomicsSignal, EvuLedger, StakePolicy};
16use oris_evolution::{
17 compute_artifact_hash, next_id, stable_hash_json, AssetState, BlastRadius, CandidateSource,
18 Capsule, CapsuleId, EnvFingerprint, EvolutionError, EvolutionEvent, EvolutionProjection,
19 EvolutionStore, Gene, GeneCandidate, MutationId, PreparedMutation, Selector, SelectorInput,
20 StoreBackedSelector, StoredEvolutionEvent, ValidationSnapshot,
21};
22use oris_evolution_network::{EvolutionEnvelope, NetworkAsset};
23use oris_governor::{DefaultGovernor, Governor, GovernorDecision, GovernorInput};
24use oris_kernel::{Kernel, KernelState, RunId};
25use oris_sandbox::{
26 compute_blast_radius, execute_allowed_command, Sandbox, SandboxPolicy, SandboxReceipt,
27};
28use oris_spec::CompiledMutationPlan;
29use serde::{Deserialize, Serialize};
30use thiserror::Error;
31
32pub use oris_evolution::{
33 default_store_root, ArtifactEncoding, AssetState as EvoAssetState,
34 BlastRadius as EvoBlastRadius, CandidateSource as EvoCandidateSource,
35 EnvFingerprint as EvoEnvFingerprint, EvolutionStore as EvoEvolutionStore, JsonlEvolutionStore,
36 MutationArtifact, MutationIntent, MutationTarget, Outcome, RiskLevel,
37 SelectorInput as EvoSelectorInput,
38};
39pub use oris_evolution_network::{
40 FetchQuery, FetchResponse, MessageType, PublishRequest, RevokeNotice,
41};
42pub use oris_governor::{CoolingWindow, GovernorConfig, RevocationReason};
43pub use oris_sandbox::{LocalProcessSandbox, SandboxPolicy as EvoSandboxPolicy};
44pub use oris_spec::{SpecCompileError, SpecCompiler, SpecDocument};
45
/// An ordered validation pipeline identified by a profile name.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ValidationPlan {
    /// Name of the validation profile (e.g. "oris-default"); recorded in
    /// validation snapshots so events can be traced back to the pipeline used.
    pub profile: String,
    /// Stages executed in order; `CommandValidator` stops at the first failure.
    pub stages: Vec<ValidationStage>,
}
51
52impl ValidationPlan {
53 pub fn oris_default() -> Self {
54 Self {
55 profile: "oris-default".into(),
56 stages: vec![
57 ValidationStage::Command {
58 program: "cargo".into(),
59 args: vec!["fmt".into(), "--all".into(), "--check".into()],
60 timeout_ms: 60_000,
61 },
62 ValidationStage::Command {
63 program: "cargo".into(),
64 args: vec!["check".into(), "--workspace".into()],
65 timeout_ms: 180_000,
66 },
67 ValidationStage::Command {
68 program: "cargo".into(),
69 args: vec![
70 "test".into(),
71 "-p".into(),
72 "oris-kernel".into(),
73 "-p".into(),
74 "oris-evolution".into(),
75 "-p".into(),
76 "oris-sandbox".into(),
77 "-p".into(),
78 "oris-evokernel".into(),
79 "--lib".into(),
80 ],
81 timeout_ms: 300_000,
82 },
83 ValidationStage::Command {
84 program: "cargo".into(),
85 args: vec![
86 "test".into(),
87 "-p".into(),
88 "oris-runtime".into(),
89 "--lib".into(),
90 ],
91 timeout_ms: 300_000,
92 },
93 ],
94 }
95 }
96}
97
/// A single step of a validation plan.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ValidationStage {
    /// Run an external command inside the sandbox workdir.
    Command {
        /// Executable to invoke (e.g. "cargo").
        program: String,
        /// Arguments passed verbatim to the program.
        args: Vec<String>,
        /// Per-stage timeout enforced by the sandbox command executor.
        timeout_ms: u64,
    },
}
106
/// Outcome of one executed validation stage.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ValidationStageReport {
    /// Display label: the program followed by its space-joined arguments.
    pub stage: String,
    /// Whether the stage command succeeded.
    pub success: bool,
    /// Process exit code; `None` when the command failed to launch.
    pub exit_code: Option<i32>,
    /// Wall-clock duration of the stage (zero when launch failed).
    pub duration_ms: u64,
    /// Captured standard output.
    pub stdout: String,
    /// Captured standard error (or the launch error message).
    pub stderr: String,
}
116
/// Aggregated result of running a full validation plan.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ValidationReport {
    /// True only when every executed stage succeeded.
    pub success: bool,
    /// Total wall-clock time across all executed stages.
    pub duration_ms: u64,
    /// Per-stage reports, in execution order (stops after the first failure).
    pub stages: Vec<ValidationStageReport>,
    /// Concatenated stdout/stderr of all stages, newline-separated.
    pub logs: String,
}
124
/// Raw material from which deterministic signals are extracted.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct SignalExtractionInput {
    /// Unified diff of the mutation under consideration.
    pub patch_diff: String,
    /// Stated intent of the mutation.
    pub intent: String,
    /// Expected effect described by the proposer.
    pub expected_effect: String,
    /// Signals explicitly declared by the proposer.
    pub declared_signals: Vec<String>,
    /// Paths touched by the mutation.
    pub changed_files: Vec<String>,
    /// Whether validation passed; always contributes a signal.
    pub validation_success: bool,
    /// Combined validation log text.
    pub validation_logs: String,
    /// Per-stage output text from validation.
    pub stage_outputs: Vec<String>,
}
136
/// Deterministic signal set plus a stable hash over it.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct SignalExtractionOutput {
    /// Up to 32 signals in lexicographic (BTreeSet) order.
    pub values: Vec<String>,
    /// Stable hash of `values` (JSON hash, falling back to an artifact hash).
    pub hash: String,
}
142
/// A pre-packaged mutation used to bootstrap an empty evolution store.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct SeedTemplate {
    /// Stable identifier for the seed.
    pub id: String,
    /// Intent text attached to the seeded mutation.
    pub intent: String,
    /// Signals associated with the seed.
    pub signals: Vec<String>,
    /// Diff payload applied when the seed is replayed.
    pub diff_payload: String,
    /// Validation profile name the seed was validated against.
    pub validation_profile: String,
}
151
/// Summary of a store bootstrap pass.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct BootstrapReport {
    /// Whether any seeding was performed.
    pub seeded: bool,
    /// Number of genes added during bootstrap.
    pub genes_added: usize,
    /// Number of capsules added during bootstrap.
    pub capsules_added: usize,
}
158
159impl ValidationReport {
160 pub fn to_snapshot(&self, profile: &str) -> ValidationSnapshot {
161 ValidationSnapshot {
162 success: self.success,
163 profile: profile.to_string(),
164 duration_ms: self.duration_ms,
165 summary: if self.success {
166 "validation passed".into()
167 } else {
168 "validation failed".into()
169 },
170 }
171 }
172}
173
174pub fn extract_deterministic_signals(input: &SignalExtractionInput) -> SignalExtractionOutput {
175 let mut signals = BTreeSet::new();
176
177 for declared in &input.declared_signals {
178 if let Some(phrase) = normalize_signal_phrase(declared) {
179 signals.insert(phrase);
180 }
181 extend_signal_tokens(&mut signals, declared);
182 }
183
184 for text in [
185 input.patch_diff.as_str(),
186 input.intent.as_str(),
187 input.expected_effect.as_str(),
188 input.validation_logs.as_str(),
189 ] {
190 extend_signal_tokens(&mut signals, text);
191 }
192
193 for changed_file in &input.changed_files {
194 extend_signal_tokens(&mut signals, changed_file);
195 }
196
197 for stage_output in &input.stage_outputs {
198 extend_signal_tokens(&mut signals, stage_output);
199 }
200
201 signals.insert(if input.validation_success {
202 "validation passed".into()
203 } else {
204 "validation failed".into()
205 });
206
207 let values = signals.into_iter().take(32).collect::<Vec<_>>();
208 let hash =
209 stable_hash_json(&values).unwrap_or_else(|_| compute_artifact_hash(&values.join("\n")));
210 SignalExtractionOutput { values, hash }
211}
212
/// Error raised while executing a validation plan.
#[derive(Debug, Error)]
pub enum ValidationError {
    /// The validation machinery itself failed to execute (not a failed stage).
    #[error("validation execution failed: {0}")]
    Execution(String),
}
218
/// Runs a validation plan against the workspace referenced by a sandbox receipt.
#[async_trait]
pub trait Validator: Send + Sync {
    /// Executes `plan` inside the workdir recorded in `receipt`, returning an
    /// aggregated report of every executed stage.
    async fn run(
        &self,
        receipt: &SandboxReceipt,
        plan: &ValidationPlan,
    ) -> Result<ValidationReport, ValidationError>;
}
227
/// `Validator` implementation that runs each stage as a sandboxed command.
pub struct CommandValidator {
    // Sandbox policy applied to every stage command.
    policy: SandboxPolicy,
}
231
232impl CommandValidator {
233 pub fn new(policy: SandboxPolicy) -> Self {
234 Self { policy }
235 }
236}
237
238#[async_trait]
239impl Validator for CommandValidator {
240 async fn run(
241 &self,
242 receipt: &SandboxReceipt,
243 plan: &ValidationPlan,
244 ) -> Result<ValidationReport, ValidationError> {
245 let started = std::time::Instant::now();
246 let mut stages = Vec::new();
247 let mut success = true;
248 let mut logs = String::new();
249
250 for stage in &plan.stages {
251 match stage {
252 ValidationStage::Command {
253 program,
254 args,
255 timeout_ms,
256 } => {
257 let result = execute_allowed_command(
258 &self.policy,
259 &receipt.workdir,
260 program,
261 args,
262 *timeout_ms,
263 )
264 .await;
265 let report = match result {
266 Ok(output) => ValidationStageReport {
267 stage: format!("{program} {}", args.join(" ")),
268 success: output.success,
269 exit_code: output.exit_code,
270 duration_ms: output.duration_ms,
271 stdout: output.stdout,
272 stderr: output.stderr,
273 },
274 Err(err) => ValidationStageReport {
275 stage: format!("{program} {}", args.join(" ")),
276 success: false,
277 exit_code: None,
278 duration_ms: 0,
279 stdout: String::new(),
280 stderr: err.to_string(),
281 },
282 };
283 if !report.success {
284 success = false;
285 }
286 if !report.stdout.is_empty() {
287 logs.push_str(&report.stdout);
288 logs.push('\n');
289 }
290 if !report.stderr.is_empty() {
291 logs.push_str(&report.stderr);
292 logs.push('\n');
293 }
294 stages.push(report);
295 if !success {
296 break;
297 }
298 }
299 }
300 }
301
302 Ok(ValidationReport {
303 success,
304 duration_ms: started.elapsed().as_millis() as u64,
305 stages,
306 logs,
307 })
308 }
309}
310
/// Outcome of a replay attempt against the evolution store.
#[derive(Clone, Debug)]
pub struct ReplayDecision {
    /// Whether a stored capsule was successfully replayed.
    pub used_capsule: bool,
    /// The capsule considered, when one was reached.
    pub capsule_id: Option<CapsuleId>,
    /// Whether the caller should fall back to the planner.
    pub fallback_to_planner: bool,
    /// Human-readable explanation of the decision.
    pub reason: String,
}
318
/// Internal scheduling state of a coordination task.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum CoordinationTaskState {
    /// All prerequisites satisfied; task can run now.
    Ready,
    /// Dependencies exist but have not completed yet.
    Waiting,
    /// A dependency failed or was skipped, blocking this task.
    BlockedByFailure,
    /// Prerequisites can never be satisfied (missing or invalid dependencies).
    PermanentlyBlocked,
}
326
/// Deterministic, in-memory scheduler for multi-agent coordination plans.
#[derive(Clone, Debug, Default)]
pub struct MultiAgentCoordinator;
329
impl MultiAgentCoordinator {
    /// Creates a stateless coordinator.
    pub fn new() -> Self {
        Self
    }

    /// Drives `plan` to completion and returns which tasks completed, which
    /// failed, and the messages exchanged along the way.
    ///
    /// Scheduling loop: each round computes the ready set (Sequential mode
    /// runs at most one task per round; Conditional mode first skips tasks
    /// whose dependency chain failed), simulates execution, and retries
    /// failures up to `max_retries`. Tasks still pending when no task is
    /// ready are reported as failed with a diagnostic message.
    /// NOTE(review): `timeout_ms` is only echoed in the summary here — no
    /// timeout is enforced by this simulation.
    pub fn coordinate(&self, plan: CoordinationPlan) -> CoordinationResult {
        let primitive = plan.primitive.clone();
        let root_goal = plan.root_goal.clone();
        let timeout_ms = plan.timeout_ms;
        let max_retries = plan.max_retries;
        // Deduplicate tasks by id, keeping the first occurrence.
        let mut tasks = BTreeMap::new();
        for task in plan.tasks {
            tasks.entry(task.id.clone()).or_insert(task);
        }

        let mut pending = tasks.keys().cloned().collect::<BTreeSet<_>>();
        let mut completed = BTreeSet::new();
        let mut failed = BTreeSet::new();
        // *_order preserve insertion order for the result; the sets are for
        // membership checks.
        let mut completed_order = Vec::new();
        let mut failed_order = Vec::new();
        let mut skipped = BTreeSet::new();
        // Failure count per task id, for retry accounting.
        let mut attempts = BTreeMap::new();
        let mut messages = Vec::new();

        loop {
            // Conditional plans drop tasks whose dependency chain has failed
            // instead of blocking on them.
            if matches!(primitive, CoordinationPrimitive::Conditional) {
                self.apply_conditional_skips(
                    &tasks,
                    &mut pending,
                    &completed,
                    &failed,
                    &mut skipped,
                    &mut messages,
                );
            }

            let mut ready = self.ready_task_ids(&tasks, &pending, &completed, &failed, &skipped);
            if ready.is_empty() {
                break;
            }
            // Sequential coordination executes one task per round.
            if matches!(primitive, CoordinationPrimitive::Sequential) {
                ready.truncate(1);
            }

            for task_id in ready {
                let Some(task) = tasks.get(&task_id) else {
                    continue;
                };
                if !pending.contains(&task_id) {
                    continue;
                }
                self.record_handoff_messages(task, &tasks, &completed, &failed, &mut messages);

                // Simulated execution: failure is driven by keywords in the
                // task description (see simulate_task_failure).
                let prior_failures = attempts.get(&task_id).copied().unwrap_or(0);
                if Self::simulate_task_failure(task, prior_failures) {
                    let failure_count = prior_failures + 1;
                    attempts.insert(task_id.clone(), failure_count);
                    let will_retry = failure_count <= max_retries;
                    messages.push(CoordinationMessage {
                        from_role: task.role.clone(),
                        to_role: task.role.clone(),
                        task_id: task_id.clone(),
                        content: if will_retry {
                            format!("task {task_id} failed on attempt {failure_count} and will retry")
                        } else {
                            format!(
                                "task {task_id} failed on attempt {failure_count} and exhausted retries"
                            )
                        },
                    });
                    // Only remove from pending once retries are exhausted; a
                    // retried task stays pending for the next round.
                    if !will_retry {
                        pending.remove(&task_id);
                        if failed.insert(task_id.clone()) {
                            failed_order.push(task_id);
                        }
                    }
                    continue;
                }

                pending.remove(&task_id);
                if completed.insert(task_id.clone()) {
                    completed_order.push(task_id);
                }
            }
        }

        // Anything still pending can never run; classify it for a diagnostic
        // message and count it as failed.
        let blocked_ids = pending.into_iter().collect::<Vec<_>>();
        for task_id in blocked_ids {
            let Some(task) = tasks.get(&task_id) else {
                continue;
            };
            let state = self.classify_task(task, &tasks, &completed, &failed, &skipped);
            let content = match state {
                CoordinationTaskState::BlockedByFailure => {
                    format!("task {task_id} blocked by failed dependencies")
                }
                CoordinationTaskState::PermanentlyBlocked => {
                    format!("task {task_id} has invalid coordination prerequisites")
                }
                CoordinationTaskState::Waiting => {
                    format!("task {task_id} has unresolved dependencies")
                }
                // Should be unreachable: ready tasks are drained by the loop.
                CoordinationTaskState::Ready => {
                    format!("task {task_id} was left pending unexpectedly")
                }
            };
            messages.push(CoordinationMessage {
                from_role: task.role.clone(),
                to_role: task.role.clone(),
                task_id: task_id.clone(),
                content,
            });
            if failed.insert(task_id.clone()) {
                failed_order.push(task_id);
            }
        }

        CoordinationResult {
            completed_tasks: completed_order,
            failed_tasks: failed_order,
            messages,
            summary: format!(
                "goal '{}' completed {} tasks, failed {}, skipped {} using {:?} coordination (timeout={}ms, max_retries={})",
                root_goal,
                completed.len(),
                failed.len(),
                skipped.len(),
                primitive,
                timeout_ms,
                max_retries
            ),
        }
    }

    /// Returns the ids of pending tasks currently classified as `Ready`,
    /// in BTreeSet (lexicographic) order.
    fn ready_task_ids(
        &self,
        tasks: &BTreeMap<String, CoordinationTask>,
        pending: &BTreeSet<String>,
        completed: &BTreeSet<String>,
        failed: &BTreeSet<String>,
        skipped: &BTreeSet<String>,
    ) -> Vec<String> {
        pending
            .iter()
            .filter_map(|task_id| {
                let task = tasks.get(task_id)?;
                (self.classify_task(task, tasks, completed, failed, skipped)
                    == CoordinationTaskState::Ready)
                    .then(|| task_id.clone())
            })
            .collect()
    }

    /// Conditional-mode helper: moves pending tasks whose dependency chain
    /// failed from `pending` to `skipped`, emitting a message for each.
    fn apply_conditional_skips(
        &self,
        tasks: &BTreeMap<String, CoordinationTask>,
        pending: &mut BTreeSet<String>,
        completed: &BTreeSet<String>,
        failed: &BTreeSet<String>,
        skipped: &mut BTreeSet<String>,
        messages: &mut Vec<CoordinationMessage>,
    ) {
        // Collect first to avoid mutating `pending` while iterating it.
        let skip_ids = pending
            .iter()
            .filter_map(|task_id| {
                let task = tasks.get(task_id)?;
                (self.classify_task(task, tasks, completed, failed, skipped)
                    == CoordinationTaskState::BlockedByFailure)
                    .then(|| task_id.clone())
            })
            .collect::<Vec<_>>();

        for task_id in skip_ids {
            let Some(task) = tasks.get(&task_id) else {
                continue;
            };
            pending.remove(&task_id);
            skipped.insert(task_id.clone());
            messages.push(CoordinationMessage {
                from_role: task.role.clone(),
                to_role: task.role.clone(),
                task_id: task_id.clone(),
                content: format!("task {task_id} skipped due to failed dependency chain"),
            });
        }
    }

    /// Determines a task's scheduling state from its dependencies, with
    /// role-specific rules:
    /// - Planner/Coder: ready once all dependencies complete; any failed or
    ///   skipped dependency blocks, any unknown dependency is permanent.
    /// - Repair: requires at least one Coder dependency and only becomes
    ///   ready when a Coder dependency has FAILED (repair is pointless if
    ///   everything succeeded); non-Coder failures block it.
    /// - Optimizer: requires an implementation (Coder/Repair) dependency and
    ///   becomes ready only when one has COMPLETED; if implementations only
    ///   failed it is blocked by failure.
    fn classify_task(
        &self,
        task: &CoordinationTask,
        tasks: &BTreeMap<String, CoordinationTask>,
        completed: &BTreeSet<String>,
        failed: &BTreeSet<String>,
        skipped: &BTreeSet<String>,
    ) -> CoordinationTaskState {
        match task.role {
            AgentRole::Planner | AgentRole::Coder => {
                let mut waiting = false;
                for dependency_id in &task.depends_on {
                    if !tasks.contains_key(dependency_id) {
                        return CoordinationTaskState::PermanentlyBlocked;
                    }
                    if skipped.contains(dependency_id) || failed.contains(dependency_id) {
                        return CoordinationTaskState::BlockedByFailure;
                    }
                    if !completed.contains(dependency_id) {
                        waiting = true;
                    }
                }
                if waiting {
                    CoordinationTaskState::Waiting
                } else {
                    CoordinationTaskState::Ready
                }
            }
            AgentRole::Repair => {
                let mut waiting = false;
                let mut has_coder_dependency = false;
                let mut has_failed_coder = false;
                for dependency_id in &task.depends_on {
                    let Some(dependency) = tasks.get(dependency_id) else {
                        return CoordinationTaskState::PermanentlyBlocked;
                    };
                    let is_coder = matches!(dependency.role, AgentRole::Coder);
                    if is_coder {
                        has_coder_dependency = true;
                    }
                    if skipped.contains(dependency_id) {
                        return CoordinationTaskState::BlockedByFailure;
                    }
                    if failed.contains(dependency_id) {
                        if is_coder {
                            // A failed Coder is exactly what Repair exists for.
                            has_failed_coder = true;
                        } else {
                            return CoordinationTaskState::BlockedByFailure;
                        }
                        continue;
                    }
                    if !completed.contains(dependency_id) {
                        waiting = true;
                    }
                }
                if !has_coder_dependency {
                    CoordinationTaskState::PermanentlyBlocked
                } else if waiting {
                    CoordinationTaskState::Waiting
                } else if has_failed_coder {
                    CoordinationTaskState::Ready
                } else {
                    // All Coder dependencies succeeded: nothing to repair.
                    CoordinationTaskState::PermanentlyBlocked
                }
            }
            AgentRole::Optimizer => {
                let mut waiting = false;
                let mut has_impl_dependency = false;
                let mut has_completed_impl = false;
                let mut has_failed_impl = false;
                for dependency_id in &task.depends_on {
                    let Some(dependency) = tasks.get(dependency_id) else {
                        return CoordinationTaskState::PermanentlyBlocked;
                    };
                    let is_impl = matches!(dependency.role, AgentRole::Coder | AgentRole::Repair);
                    if is_impl {
                        has_impl_dependency = true;
                    }
                    if skipped.contains(dependency_id) || failed.contains(dependency_id) {
                        if is_impl {
                            has_failed_impl = true;
                            continue;
                        }
                        return CoordinationTaskState::BlockedByFailure;
                    }
                    if completed.contains(dependency_id) {
                        if is_impl {
                            has_completed_impl = true;
                        }
                        continue;
                    }
                    waiting = true;
                }
                if !has_impl_dependency {
                    CoordinationTaskState::PermanentlyBlocked
                } else if waiting {
                    CoordinationTaskState::Waiting
                } else if has_completed_impl {
                    CoordinationTaskState::Ready
                } else if has_failed_impl {
                    CoordinationTaskState::BlockedByFailure
                } else {
                    CoordinationTaskState::PermanentlyBlocked
                }
            }
        }
    }

    /// Emits a handoff (or failure-routing) message for each resolved
    /// dependency of `task`, deduplicated and in sorted order.
    fn record_handoff_messages(
        &self,
        task: &CoordinationTask,
        tasks: &BTreeMap<String, CoordinationTask>,
        completed: &BTreeSet<String>,
        failed: &BTreeSet<String>,
        messages: &mut Vec<CoordinationMessage>,
    ) {
        let mut dependency_ids = task.depends_on.clone();
        dependency_ids.sort();
        dependency_ids.dedup();

        for dependency_id in dependency_ids {
            let Some(dependency) = tasks.get(&dependency_id) else {
                continue;
            };
            if completed.contains(&dependency_id) {
                messages.push(CoordinationMessage {
                    from_role: dependency.role.clone(),
                    to_role: task.role.clone(),
                    task_id: task.id.clone(),
                    content: format!("handoff from {dependency_id} to {}", task.id),
                });
            } else if failed.contains(&dependency_id) {
                messages.push(CoordinationMessage {
                    from_role: dependency.role.clone(),
                    to_role: task.role.clone(),
                    task_id: task.id.clone(),
                    content: format!("failed dependency {dependency_id} routed to {}", task.id),
                });
            }
        }
    }

    /// Deterministic failure injection: a description containing "force-fail"
    /// always fails; "fail-once" fails only on the first attempt.
    fn simulate_task_failure(task: &CoordinationTask, prior_failures: u32) -> bool {
        let normalized = task.description.to_ascii_lowercase();
        normalized.contains("force-fail")
            || (normalized.contains("fail-once") && prior_failures == 0)
    }
}
665
/// Errors surfaced while attempting a capsule replay.
#[derive(Debug, Error)]
pub enum ReplayError {
    /// The evolution store rejected a read or append.
    #[error("store error: {0}")]
    Store(String),
    /// The sandbox failed outside of a recoverable patch-apply error.
    #[error("sandbox error: {0}")]
    Sandbox(String),
    /// The validator itself failed to execute.
    #[error("validation error: {0}")]
    Validation(String),
}
675
/// Attempts to satisfy a mutation request by replaying a previously captured
/// capsule instead of invoking the planner.
#[async_trait]
pub trait ReplayExecutor: Send + Sync {
    /// Tries to replay the best stored candidate for `input`, applying it
    /// under `policy` and re-validating it with `validation`.
    async fn try_replay(
        &self,
        input: &SelectorInput,
        policy: &SandboxPolicy,
        validation: &ValidationPlan,
    ) -> Result<ReplayDecision, ReplayError>;

    /// Like [`ReplayExecutor::try_replay`], but attributes the replay to a
    /// kernel run. The default implementation ignores `run_id`.
    async fn try_replay_for_run(
        &self,
        run_id: &RunId,
        input: &SelectorInput,
        policy: &SandboxPolicy,
        validation: &ValidationPlan,
    ) -> Result<ReplayDecision, ReplayError> {
        let _ = run_id;
        self.try_replay(input, policy, validation).await
    }
}
696
/// `ReplayExecutor` backed by the evolution store, with optional economics
/// (stake settlement and reputation-biased ranking) for remote assets.
pub struct StoreReplayExecutor {
    /// Applies mutation patches in an isolated workspace.
    pub sandbox: Arc<dyn Sandbox>,
    /// Re-validates replayed mutations.
    pub validator: Arc<dyn Validator>,
    /// Event log of mutations, genes, and capsules.
    pub store: Arc<dyn EvolutionStore>,
    /// Ranks gene candidates for a selector input.
    pub selector: Arc<dyn Selector>,
    /// Decides promotion/revocation after replay failures.
    pub governor: Arc<dyn Governor>,
    /// Optional EVU ledger; enables reputation bias and reuse settlement.
    pub economics: Option<Arc<Mutex<EvuLedger>>>,
    /// Optional gene-id -> publisher-id map for remote assets.
    pub remote_publishers: Option<Arc<Mutex<BTreeMap<String, String>>>>,
    /// Stake rules applied when settling remote reuse outcomes.
    pub stake_policy: StakePolicy,
}
707
#[async_trait]
impl ReplayExecutor for StoreReplayExecutor {
    /// Delegates to the shared replay flow without a run id.
    async fn try_replay(
        &self,
        input: &SelectorInput,
        policy: &SandboxPolicy,
        validation: &ValidationPlan,
    ) -> Result<ReplayDecision, ReplayError> {
        self.try_replay_inner(None, input, policy, validation).await
    }

    /// Delegates to the shared replay flow, tagging events with `run_id`.
    async fn try_replay_for_run(
        &self,
        run_id: &RunId,
        input: &SelectorInput,
        policy: &SandboxPolicy,
        validation: &ValidationPlan,
    ) -> Result<ReplayDecision, ReplayError> {
        self.try_replay_inner(Some(run_id), input, policy, validation)
            .await
    }
}
730
impl StoreReplayExecutor {
    /// Core replay flow shared by `try_replay` and `try_replay_for_run`.
    ///
    /// Candidate selection cascades: selector results, then local exact-match
    /// lookup, then quarantined remote exact matches. The best candidate's
    /// first capsule is patch-applied in the sandbox and re-validated.
    /// Success records `CapsuleReused` (and promotes a quarantined remote
    /// asset that just validated locally); failure records failure events,
    /// consults the governor, and falls back to the planner.
    async fn try_replay_inner(
        &self,
        replay_run_id: Option<&RunId>,
        input: &SelectorInput,
        policy: &SandboxPolicy,
        validation: &ValidationPlan,
    ) -> Result<ReplayDecision, ReplayError> {
        let mut selector_input = input.clone();
        // With economics + remote publishers enabled, widen the candidate
        // pool so reputation re-ranking has alternatives to reorder.
        if self.economics.is_some() && self.remote_publishers.is_some() {
            selector_input.limit = selector_input.limit.max(4);
        }
        let mut candidates = self.selector.select(&selector_input);
        self.rerank_with_reputation_bias(&mut candidates);
        let mut exact_match = false;
        // Cold-start fallback: direct exact-match lookup against the store.
        if candidates.is_empty() {
            let mut exact_candidates = exact_match_candidates(self.store.as_ref(), input);
            self.rerank_with_reputation_bias(&mut exact_candidates);
            if !exact_candidates.is_empty() {
                candidates = exact_candidates;
                exact_match = true;
            }
        }
        // Last resort: quarantined remote assets with an exact match.
        if candidates.is_empty() {
            let mut remote_candidates =
                quarantined_remote_exact_match_candidates(self.store.as_ref(), input);
            self.rerank_with_reputation_bias(&mut remote_candidates);
            if !remote_candidates.is_empty() {
                candidates = remote_candidates;
                exact_match = true;
            }
        }
        candidates.truncate(input.limit.max(1));
        let Some(best) = candidates.into_iter().next() else {
            return Ok(ReplayDecision {
                used_capsule: false,
                capsule_id: None,
                fallback_to_planner: true,
                reason: "no matching gene".into(),
            });
        };
        let remote_publisher = self.publisher_for_gene(&best.gene.id);

        // The score threshold only gates fuzzy selector matches; exact
        // matches replay regardless of score.
        if !exact_match && best.score < 0.82 {
            return Ok(ReplayDecision {
                used_capsule: false,
                capsule_id: None,
                fallback_to_planner: true,
                reason: format!("best gene score {:.3} below replay threshold", best.score),
            });
        }

        let Some(capsule) = best.capsules.first().cloned() else {
            return Ok(ReplayDecision {
                used_capsule: false,
                capsule_id: None,
                fallback_to_planner: true,
                reason: "candidate gene has no capsule".into(),
            });
        };

        let Some(mutation) = find_declared_mutation(self.store.as_ref(), &capsule.mutation_id)
            .map_err(|err| ReplayError::Store(err.to_string()))?
        else {
            return Ok(ReplayDecision {
                used_capsule: false,
                capsule_id: None,
                fallback_to_planner: true,
                reason: "mutation payload missing from store".into(),
            });
        };

        // Apply the stored patch; an apply failure settles the remote
        // publisher's stake as a failed reuse and falls back to the planner.
        let receipt = match self.sandbox.apply(&mutation, policy).await {
            Ok(receipt) => receipt,
            Err(err) => {
                self.record_reuse_settlement(remote_publisher.as_deref(), false);
                return Ok(ReplayDecision {
                    used_capsule: false,
                    capsule_id: Some(capsule.id.clone()),
                    fallback_to_planner: true,
                    reason: format!("replay patch apply failed: {err}"),
                });
            }
        };

        let report = self
            .validator
            .run(&receipt, validation)
            .await
            .map_err(|err| ReplayError::Validation(err.to_string()))?;
        if !report.success {
            self.record_replay_validation_failure(&best, &capsule, validation, &report)?;
            self.record_reuse_settlement(remote_publisher.as_deref(), false);
            return Ok(ReplayDecision {
                used_capsule: false,
                capsule_id: Some(capsule.id.clone()),
                fallback_to_planner: true,
                reason: "replay validation failed".into(),
            });
        }

        // A quarantined (remote) capsule that validated locally is released,
        // and its gene promoted if it too was quarantined.
        if matches!(capsule.state, AssetState::Quarantined) {
            self.store
                .append_event(EvolutionEvent::ValidationPassed {
                    mutation_id: capsule.mutation_id.clone(),
                    report: report.to_snapshot(&validation.profile),
                    gene_id: Some(best.gene.id.clone()),
                })
                .map_err(|err| ReplayError::Store(err.to_string()))?;
            if matches!(best.gene.state, AssetState::Quarantined) {
                self.store
                    .append_event(EvolutionEvent::PromotionEvaluated {
                        gene_id: best.gene.id.clone(),
                        state: AssetState::Promoted,
                        reason: "remote asset locally validated via replay".into(),
                    })
                    .map_err(|err| ReplayError::Store(err.to_string()))?;
                self.store
                    .append_event(EvolutionEvent::GenePromoted {
                        gene_id: best.gene.id.clone(),
                    })
                    .map_err(|err| ReplayError::Store(err.to_string()))?;
            }
            self.store
                .append_event(EvolutionEvent::CapsuleReleased {
                    capsule_id: capsule.id.clone(),
                    state: AssetState::Promoted,
                })
                .map_err(|err| ReplayError::Store(err.to_string()))?;
        }

        self.store
            .append_event(EvolutionEvent::CapsuleReused {
                capsule_id: capsule.id.clone(),
                gene_id: capsule.gene_id.clone(),
                run_id: capsule.run_id.clone(),
                replay_run_id: replay_run_id.cloned(),
            })
            .map_err(|err| ReplayError::Store(err.to_string()))?;
        self.record_reuse_settlement(remote_publisher.as_deref(), true);

        Ok(ReplayDecision {
            used_capsule: true,
            capsule_id: Some(capsule.id),
            fallback_to_planner: false,
            reason: if exact_match {
                "replayed via cold-start lookup".into()
            } else {
                "replayed via selector".into()
            },
        })
    }

    /// Re-sorts candidates by their reputation-adjusted score (descending),
    /// breaking ties by gene id. No-op unless both the economics ledger and
    /// the remote-publisher map are configured and the bias is non-empty.
    fn rerank_with_reputation_bias(&self, candidates: &mut [GeneCandidate]) {
        let Some(ledger) = self.economics.as_ref() else {
            return;
        };
        let Some(remote_publishers) = self.remote_publishers.as_ref() else {
            return;
        };
        // A poisoned lock degrades to an empty bias rather than panicking.
        let reputation_bias = ledger
            .lock()
            .ok()
            .map(|locked| locked.selector_reputation_bias())
            .unwrap_or_default();
        if reputation_bias.is_empty() {
            return;
        }
        let publisher_map = remote_publishers
            .lock()
            .ok()
            .map(|locked| locked.clone())
            .unwrap_or_default();
        candidates.sort_by(|left, right| {
            // right vs left: descending by effective score.
            effective_candidate_score(right, &publisher_map, &reputation_bias)
                .partial_cmp(&effective_candidate_score(
                    left,
                    &publisher_map,
                    &reputation_bias,
                ))
                .unwrap_or(std::cmp::Ordering::Equal)
                .then_with(|| left.gene.id.cmp(&right.gene.id))
        });
    }

    /// Looks up the remote publisher for a gene, if publisher tracking is
    /// configured and the lock is healthy.
    fn publisher_for_gene(&self, gene_id: &str) -> Option<String> {
        self.remote_publishers
            .as_ref()?
            .lock()
            .ok()?
            .get(gene_id)
            .cloned()
    }

    /// Settles a remote reuse outcome against the publisher's stake.
    /// Silently no-ops for local genes or when economics is disabled.
    fn record_reuse_settlement(&self, publisher_id: Option<&str>, success: bool) {
        let Some(publisher_id) = publisher_id else {
            return;
        };
        let Some(ledger) = self.economics.as_ref() else {
            return;
        };
        if let Ok(mut locked) = ledger.lock() {
            locked.settle_remote_reuse(publisher_id, success, &self.stake_policy);
        }
    }

    /// Records a `ValidationFailed` event for a failed replay and asks the
    /// governor whether the gene should be revoked; revocation quarantines
    /// every capsule attached to the gene.
    fn record_replay_validation_failure(
        &self,
        best: &GeneCandidate,
        capsule: &Capsule,
        validation: &ValidationPlan,
        report: &ValidationReport,
    ) -> Result<(), ReplayError> {
        let projection = self
            .store
            .rebuild_projection()
            .map_err(|err| ReplayError::Store(err.to_string()))?;
        let (current_confidence, historical_peak_confidence, confidence_last_updated_secs) =
            Self::confidence_context(&projection, &best.gene.id);

        self.store
            .append_event(EvolutionEvent::ValidationFailed {
                mutation_id: capsule.mutation_id.clone(),
                report: report.to_snapshot(&validation.profile),
                gene_id: Some(best.gene.id.clone()),
            })
            .map_err(|err| ReplayError::Store(err.to_string()))?;

        // Count includes the failure appended just above.
        let replay_failures = self.replay_failure_count(&best.gene.id)?;
        let governor_decision = self.governor.evaluate(GovernorInput {
            candidate_source: if self.publisher_for_gene(&best.gene.id).is_some() {
                CandidateSource::Remote
            } else {
                CandidateSource::Local
            },
            success_count: 0,
            blast_radius: BlastRadius {
                files_changed: capsule.outcome.changed_files.len(),
                lines_changed: capsule.outcome.lines_changed,
            },
            replay_failures,
            recent_mutation_ages_secs: Vec::new(),
            current_confidence,
            historical_peak_confidence,
            confidence_last_updated_secs,
        });

        if matches!(governor_decision.target_state, AssetState::Revoked) {
            self.store
                .append_event(EvolutionEvent::PromotionEvaluated {
                    gene_id: best.gene.id.clone(),
                    state: AssetState::Revoked,
                    reason: governor_decision.reason.clone(),
                })
                .map_err(|err| ReplayError::Store(err.to_string()))?;
            self.store
                .append_event(EvolutionEvent::GeneRevoked {
                    gene_id: best.gene.id.clone(),
                    reason: governor_decision.reason,
                })
                .map_err(|err| ReplayError::Store(err.to_string()))?;
            for related in &best.capsules {
                self.store
                    .append_event(EvolutionEvent::CapsuleQuarantined {
                        capsule_id: related.id.clone(),
                    })
                    .map_err(|err| ReplayError::Store(err.to_string()))?;
            }
        }

        Ok(())
    }

    /// Derives (current, peak, age-in-seconds) confidence inputs for the
    /// governor from the projection.
    ///
    /// NOTE(review): current and peak confidence are both reported as the
    /// gene's peak capsule confidence — confirm this is intentional rather
    /// than a placeholder for a separate "current" value.
    fn confidence_context(
        projection: &EvolutionProjection,
        gene_id: &str,
    ) -> (f32, f32, Option<u64>) {
        let peak_confidence = projection
            .capsules
            .iter()
            .filter(|capsule| capsule.gene_id == gene_id)
            .map(|capsule| capsule.confidence)
            .fold(0.0_f32, f32::max);
        let age_secs = projection
            .last_updated_at
            .get(gene_id)
            .and_then(|timestamp| Self::seconds_since_timestamp(timestamp, Utc::now()));
        (peak_confidence, peak_confidence, age_secs)
    }

    /// Whole seconds between an RFC 3339 `timestamp` and `now`. Future
    /// timestamps (clock skew) clamp to zero; unparsable input yields `None`.
    fn seconds_since_timestamp(timestamp: &str, now: DateTime<Utc>) -> Option<u64> {
        let parsed = DateTime::parse_from_rfc3339(timestamp)
            .ok()?
            .with_timezone(&Utc);
        let elapsed = now.signed_duration_since(parsed);
        if elapsed < Duration::zero() {
            Some(0)
        } else {
            u64::try_from(elapsed.num_seconds()).ok()
        }
    }

    /// Counts `ValidationFailed` events recorded against `gene_id`.
    ///
    /// NOTE(review): `scan(1)` presumably scans the event log starting at
    /// sequence number 1 (i.e. all events) — confirm against the
    /// `EvolutionStore::scan` contract.
    fn replay_failure_count(&self, gene_id: &str) -> Result<u64, ReplayError> {
        Ok(self
            .store
            .scan(1)
            .map_err(|err| ReplayError::Store(err.to_string()))?
            .into_iter()
            .filter(|stored| {
                matches!(
                    &stored.event,
                    EvolutionEvent::ValidationFailed {
                        gene_id: Some(current_gene_id),
                        ..
                    } if current_gene_id == gene_id
                )
            })
            .count() as u64)
    }
}
1051
/// Top-level errors for evokernel capture/import/export operations.
#[derive(Debug, Error)]
pub enum EvoKernelError {
    /// Sandbox apply/execution failure.
    #[error("sandbox error: {0}")]
    Sandbox(String),
    /// The validator failed to execute.
    #[error("validation error: {0}")]
    Validation(String),
    /// Validation executed but did not pass; carries the full report.
    #[error("validation failed")]
    ValidationFailed(ValidationReport),
    /// The evolution store rejected a read or append.
    #[error("store error: {0}")]
    Store(String),
}
1063
/// Result of capturing a validated mutation into the store.
#[derive(Clone, Debug)]
pub struct CaptureOutcome {
    /// The capsule recorded for the mutation.
    pub capsule: Capsule,
    /// The gene the capsule was attached to.
    pub gene: Gene,
    /// The governor's promotion decision for the capture.
    pub governor_decision: GovernorDecision,
}
1070
/// Result of importing a remote evolution envelope.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ImportOutcome {
    /// Identifiers of the assets actually imported.
    pub imported_asset_ids: Vec<String>,
    /// Whether the envelope was accepted.
    pub accepted: bool,
}
1076
/// Point-in-time metrics derived from the evolution event log.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct EvolutionMetricsSnapshot {
    /// Total replay attempts observed.
    pub replay_attempts_total: u64,
    /// Replays that succeeded.
    pub replay_success_total: u64,
    /// replay_success_total / replay_attempts_total.
    pub replay_success_rate: f64,
    /// Total mutations declared.
    pub mutation_declared_total: u64,
    /// Mutations that reached promotion.
    pub promoted_mutations_total: u64,
    /// promoted_mutations_total / mutation_declared_total.
    pub promotion_ratio: f64,
    /// Total gene revocations.
    pub gene_revocations_total: u64,
    /// Mutations declared within the trailing hour.
    pub mutation_velocity_last_hour: u64,
    /// Revocations within the trailing hour.
    pub revoke_frequency_last_hour: u64,
    /// Genes currently promoted.
    pub promoted_genes: u64,
    /// Capsules currently promoted.
    pub promoted_capsules: u64,
    /// Sequence number of the most recent event.
    pub last_event_seq: u64,
}
1092
/// Coarse health view derived from a metrics snapshot.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct EvolutionHealthSnapshot {
    /// Overall health status label.
    pub status: String,
    /// Sequence number of the most recent event.
    pub last_event_seq: u64,
    /// Genes currently promoted.
    pub promoted_genes: u64,
    /// Capsules currently promoted.
    pub promoted_capsules: u64,
}
1100
/// A network-facing facade over an evolution store: publish, fetch, revoke,
/// and metrics endpoints all delegate to store-level helpers.
#[derive(Clone)]
pub struct EvolutionNetworkNode {
    /// Backing event store shared with the rest of the node.
    pub store: Arc<dyn EvolutionStore>,
}
1105
1106impl EvolutionNetworkNode {
1107 pub fn new(store: Arc<dyn EvolutionStore>) -> Self {
1108 Self { store }
1109 }
1110
1111 pub fn with_default_store() -> Self {
1112 Self {
1113 store: Arc::new(JsonlEvolutionStore::new(default_store_root())),
1114 }
1115 }
1116
1117 pub fn accept_publish_request(
1118 &self,
1119 request: &PublishRequest,
1120 ) -> Result<ImportOutcome, EvoKernelError> {
1121 import_remote_envelope_into_store(
1122 self.store.as_ref(),
1123 &EvolutionEnvelope::publish(request.sender_id.clone(), request.assets.clone()),
1124 None,
1125 )
1126 }
1127
1128 pub fn publish_local_assets(
1129 &self,
1130 sender_id: impl Into<String>,
1131 ) -> Result<EvolutionEnvelope, EvoKernelError> {
1132 export_promoted_assets_from_store(self.store.as_ref(), sender_id)
1133 }
1134
1135 pub fn fetch_assets(
1136 &self,
1137 responder_id: impl Into<String>,
1138 query: &FetchQuery,
1139 ) -> Result<FetchResponse, EvoKernelError> {
1140 fetch_assets_from_store(self.store.as_ref(), responder_id, query)
1141 }
1142
1143 pub fn revoke_assets(&self, notice: &RevokeNotice) -> Result<RevokeNotice, EvoKernelError> {
1144 revoke_assets_in_store(self.store.as_ref(), notice)
1145 }
1146
1147 pub fn metrics_snapshot(&self) -> Result<EvolutionMetricsSnapshot, EvoKernelError> {
1148 evolution_metrics_snapshot(self.store.as_ref())
1149 }
1150
1151 pub fn render_metrics_prometheus(&self) -> Result<String, EvoKernelError> {
1152 self.metrics_snapshot().map(|snapshot| {
1153 let health = evolution_health_snapshot(&snapshot);
1154 render_evolution_metrics_prometheus(&snapshot, &health)
1155 })
1156 }
1157
1158 pub fn health_snapshot(&self) -> Result<EvolutionHealthSnapshot, EvoKernelError> {
1159 self.metrics_snapshot()
1160 .map(|snapshot| evolution_health_snapshot(&snapshot))
1161 }
1162}
1163
/// Evolution-aware kernel wrapper: captures mutations through a sandbox,
/// validates them, projects genes/capsules into an event store, and lets a
/// governor decide promotion, quarantine, or revocation.
pub struct EvoKernel<S: KernelState> {
    // Underlying execution kernel.
    pub kernel: Arc<Kernel<S>>,
    // Applies mutation diffs in isolation and reports blast radius.
    pub sandbox: Arc<dyn Sandbox>,
    // Runs the validation plan against sandbox receipts.
    pub validator: Arc<dyn Validator>,
    // Append-only evolution event log + projection source.
    pub store: Arc<dyn EvolutionStore>,
    // Ranks gene candidates for replay.
    pub selector: Arc<dyn Selector>,
    // Decides asset state transitions (promote/quarantine/revoke).
    pub governor: Arc<dyn Governor>,
    // EVU stake/reputation ledger, shared behind a mutex.
    pub economics: Arc<Mutex<EvuLedger>>,
    // Known remote publisher ids -> metadata (exact value semantics live in
    // the import path; map keys are publisher identifiers).
    pub remote_publishers: Arc<Mutex<BTreeMap<String, String>>>,
    pub stake_policy: StakePolicy,
    pub sandbox_policy: SandboxPolicy,
    pub validation_plan: ValidationPlan,
}
1177
impl<S: KernelState> EvoKernel<S> {
    /// Ages (seconds, sorted ascending) of every previously declared mutation
    /// in the event log, optionally excluding one mutation id (normally the
    /// mutation currently being captured). Fed to the governor as its
    /// mutation-velocity input.
    fn recent_prior_mutation_ages_secs(
        &self,
        exclude_mutation_id: Option<&str>,
    ) -> Result<Vec<u64>, EvolutionError> {
        let now = Utc::now();
        let mut ages = self
            .store
            .scan(1)? // scan from sequence 1 — assumed full-log scan; confirm store contract
            .into_iter()
            .filter_map(|stored| match stored.event {
                EvolutionEvent::MutationDeclared { mutation }
                    if exclude_mutation_id != Some(mutation.intent.id.as_str()) =>
                {
                    Self::seconds_since_timestamp(&stored.timestamp, now)
                }
                _ => None,
            })
            .collect::<Vec<_>>();
        ages.sort_unstable();
        Ok(ages)
    }

    /// Seconds elapsed from an RFC 3339 timestamp string to `now`.
    /// Unparsable input yields `None`; timestamps in the future clamp to 0.
    fn seconds_since_timestamp(timestamp: &str, now: DateTime<Utc>) -> Option<u64> {
        let parsed = DateTime::parse_from_rfc3339(timestamp)
            .ok()?
            .with_timezone(&Utc);
        let elapsed = now.signed_duration_since(parsed);
        if elapsed < Duration::zero() {
            Some(0)
        } else {
            u64::try_from(elapsed.num_seconds()).ok()
        }
    }

    /// Assembles an `EvoKernel` with default governor, economics ledger,
    /// stake/sandbox policies, validation plan, and a store-backed selector.
    pub fn new(
        kernel: Arc<Kernel<S>>,
        sandbox: Arc<dyn Sandbox>,
        validator: Arc<dyn Validator>,
        store: Arc<dyn EvolutionStore>,
    ) -> Self {
        let selector: Arc<dyn Selector> = Arc::new(StoreBackedSelector::new(store.clone()));
        Self {
            kernel,
            sandbox,
            validator,
            store,
            selector,
            governor: Arc::new(DefaultGovernor::default()),
            economics: Arc::new(Mutex::new(EvuLedger::default())),
            remote_publishers: Arc::new(Mutex::new(BTreeMap::new())),
            stake_policy: StakePolicy::default(),
            sandbox_policy: SandboxPolicy::oris_default(),
            validation_plan: ValidationPlan::oris_default(),
        }
    }

    /// Builder-style override for the candidate selector.
    pub fn with_selector(mut self, selector: Arc<dyn Selector>) -> Self {
        self.selector = selector;
        self
    }

    /// Builder-style override for the sandbox policy.
    pub fn with_sandbox_policy(mut self, policy: SandboxPolicy) -> Self {
        self.sandbox_policy = policy;
        self
    }

    /// Builder-style override for the governor.
    pub fn with_governor(mut self, governor: Arc<dyn Governor>) -> Self {
        self.governor = governor;
        self
    }

    /// Builder-style override for the shared economics ledger.
    pub fn with_economics(mut self, economics: Arc<Mutex<EvuLedger>>) -> Self {
        self.economics = economics;
        self
    }

    /// Builder-style override for the stake policy.
    pub fn with_stake_policy(mut self, policy: StakePolicy) -> Self {
        self.stake_policy = policy;
        self
    }

    /// Builder-style override for the validation plan.
    pub fn with_validation_plan(mut self, plan: ValidationPlan) -> Self {
        self.validation_plan = plan;
        self
    }

    /// Runs the selector, then deterministically orders candidates by
    /// descending score with gene-id tiebreak, and caps the result at
    /// `input.limit` (minimum one).
    pub fn select_candidates(&self, input: &SelectorInput) -> Vec<GeneCandidate> {
        let mut candidates = self.selector.select(input);
        candidates.sort_by(|left, right| {
            right
                .score
                .partial_cmp(&left.score)
                .unwrap_or(std::cmp::Ordering::Equal)
                .then_with(|| left.gene.id.cmp(&right.gene.id))
        });
        // limit == 0 still returns one candidate by design.
        candidates.truncate(input.limit.max(1));
        candidates
    }

    /// Seeds the store with built-in templates when the projection contains
    /// no genes at all. Each seed emits the full event sequence (declare →
    /// signals → gene → promotion-evaluated → capsule → quarantine); seeds
    /// start quarantined because they have never been validated locally.
    pub fn bootstrap_if_empty(&self, run_id: &RunId) -> Result<BootstrapReport, EvoKernelError> {
        let projection = self.store.rebuild_projection().map_err(store_err)?;
        if !projection.genes.is_empty() {
            return Ok(BootstrapReport::default());
        }

        let templates = built_in_seed_templates();
        for template in &templates {
            let mutation = build_seed_mutation(template);
            let extracted = extract_seed_signals(template);
            let gene = build_bootstrap_gene(template, &extracted)
                .map_err(|err| EvoKernelError::Validation(err.to_string()))?;
            let capsule = build_bootstrap_capsule(run_id, template, &mutation, &gene)
                .map_err(|err| EvoKernelError::Validation(err.to_string()))?;

            self.store
                .append_event(EvolutionEvent::MutationDeclared {
                    mutation: mutation.clone(),
                })
                .map_err(store_err)?;
            self.store
                .append_event(EvolutionEvent::SignalsExtracted {
                    mutation_id: mutation.intent.id.clone(),
                    hash: extracted.hash.clone(),
                    signals: extracted.values.clone(),
                })
                .map_err(store_err)?;
            self.store
                .append_event(EvolutionEvent::GeneProjected { gene: gene.clone() })
                .map_err(store_err)?;
            self.store
                .append_event(EvolutionEvent::PromotionEvaluated {
                    gene_id: gene.id.clone(),
                    state: AssetState::Quarantined,
                    reason: "bootstrap seeds require local validation before replay".into(),
                })
                .map_err(store_err)?;
            self.store
                .append_event(EvolutionEvent::CapsuleCommitted {
                    capsule: capsule.clone(),
                })
                .map_err(store_err)?;
            self.store
                .append_event(EvolutionEvent::CapsuleQuarantined {
                    capsule_id: capsule.id,
                })
                .map_err(store_err)?;
        }

        Ok(BootstrapReport {
            seeded: true,
            genes_added: templates.len(),
            capsules_added: templates.len(),
        })
    }

    /// Convenience wrapper around [`Self::capture_mutation_with_governor`]
    /// that discards everything but the committed capsule.
    pub async fn capture_successful_mutation(
        &self,
        run_id: &RunId,
        mutation: PreparedMutation,
    ) -> Result<Capsule, EvoKernelError> {
        Ok(self
            .capture_mutation_with_governor(run_id, mutation)
            .await?
            .capsule)
    }

    /// Full capture pipeline: declare → sandbox-apply → validate → extract
    /// signals → derive gene → governor verdict → commit capsule. Every
    /// phase appends its event before the next phase runs, so a failure
    /// leaves a truthful partial history in the log.
    pub async fn capture_mutation_with_governor(
        &self,
        run_id: &RunId,
        mutation: PreparedMutation,
    ) -> Result<CaptureOutcome, EvoKernelError> {
        self.store
            .append_event(EvolutionEvent::MutationDeclared {
                mutation: mutation.clone(),
            })
            .map_err(store_err)?;

        // Apply the diff in the sandbox; a rejection is recorded before
        // surfacing the error to the caller.
        let receipt = match self.sandbox.apply(&mutation, &self.sandbox_policy).await {
            Ok(receipt) => receipt,
            Err(err) => {
                self.store
                    .append_event(EvolutionEvent::MutationRejected {
                        mutation_id: mutation.intent.id.clone(),
                        reason: err.to_string(),
                    })
                    .map_err(store_err)?;
                return Err(EvoKernelError::Sandbox(err.to_string()));
            }
        };

        self.store
            .append_event(EvolutionEvent::MutationApplied {
                mutation_id: mutation.intent.id.clone(),
                patch_hash: receipt.patch_hash.clone(),
                changed_files: receipt
                    .changed_files
                    .iter()
                    .map(|path| path.to_string_lossy().to_string())
                    .collect(),
            })
            .map_err(store_err)?;

        // Validate; failed validation is logged as an event and returned as
        // a typed error carrying the full report.
        let report = self
            .validator
            .run(&receipt, &self.validation_plan)
            .await
            .map_err(|err| EvoKernelError::Validation(err.to_string()))?;
        if !report.success {
            self.store
                .append_event(EvolutionEvent::ValidationFailed {
                    mutation_id: mutation.intent.id.clone(),
                    report: report.to_snapshot(&self.validation_plan.profile),
                    gene_id: None,
                })
                .map_err(store_err)?;
            return Err(EvoKernelError::ValidationFailed(report));
        }

        self.store
            .append_event(EvolutionEvent::ValidationPassed {
                mutation_id: mutation.intent.id.clone(),
                report: report.to_snapshot(&self.validation_plan.profile),
                gene_id: None,
            })
            .map_err(store_err)?;

        // Deterministic signal extraction from the diff, the intent text,
        // changed files, and validation output.
        let extracted_signals = extract_deterministic_signals(&SignalExtractionInput {
            patch_diff: mutation.artifact.payload.clone(),
            intent: mutation.intent.intent.clone(),
            expected_effect: mutation.intent.expected_effect.clone(),
            declared_signals: mutation.intent.signals.clone(),
            changed_files: receipt
                .changed_files
                .iter()
                .map(|path| path.to_string_lossy().to_string())
                .collect(),
            validation_success: report.success,
            validation_logs: report.logs.clone(),
            stage_outputs: report
                .stages
                .iter()
                .flat_map(|stage| [stage.stdout.clone(), stage.stderr.clone()])
                .filter(|value| !value.is_empty())
                .collect(),
        });
        self.store
            .append_event(EvolutionEvent::SignalsExtracted {
                mutation_id: mutation.intent.id.clone(),
                hash: extracted_signals.hash.clone(),
                signals: extracted_signals.values.clone(),
            })
            .map_err(store_err)?;

        // Gather governor inputs: blast radius, prior mutation velocity,
        // and confidence history for the (possibly pre-existing) gene.
        let projection = self.store.rebuild_projection().map_err(store_err)?;
        let blast_radius = compute_blast_radius(&mutation.artifact.payload);
        let recent_mutation_ages_secs = self
            .recent_prior_mutation_ages_secs(Some(mutation.intent.id.as_str()))
            .map_err(store_err)?;
        let mut gene = derive_gene(
            &mutation,
            &receipt,
            &self.validation_plan.profile,
            &extracted_signals.values,
        );
        let (current_confidence, historical_peak_confidence, confidence_last_updated_secs) =
            StoreReplayExecutor::confidence_context(&projection, &gene.id);
        // Prior capsule count for this gene, plus one for this capture.
        let success_count = projection
            .genes
            .iter()
            .find(|existing| existing.id == gene.id)
            .map(|existing| {
                projection
                    .capsules
                    .iter()
                    .filter(|capsule| capsule.gene_id == existing.id)
                    .count() as u64
            })
            .unwrap_or(0)
            + 1;
        let governor_decision = self.governor.evaluate(GovernorInput {
            candidate_source: CandidateSource::Local,
            success_count,
            blast_radius: blast_radius.clone(),
            replay_failures: 0,
            recent_mutation_ages_secs,
            current_confidence,
            historical_peak_confidence,
            confidence_last_updated_secs,
        });

        // The governor's target state overrides the gene's default state,
        // and the evaluation/transition events are recorded.
        gene.state = governor_decision.target_state.clone();
        self.store
            .append_event(EvolutionEvent::GeneProjected { gene: gene.clone() })
            .map_err(store_err)?;
        self.store
            .append_event(EvolutionEvent::PromotionEvaluated {
                gene_id: gene.id.clone(),
                state: governor_decision.target_state.clone(),
                reason: governor_decision.reason.clone(),
            })
            .map_err(store_err)?;
        if matches!(governor_decision.target_state, AssetState::Promoted) {
            self.store
                .append_event(EvolutionEvent::GenePromoted {
                    gene_id: gene.id.clone(),
                })
                .map_err(store_err)?;
        }
        if matches!(governor_decision.target_state, AssetState::Revoked) {
            self.store
                .append_event(EvolutionEvent::GeneRevoked {
                    gene_id: gene.id.clone(),
                    reason: governor_decision.reason.clone(),
                })
                .map_err(store_err)?;
        }
        if let Some(spec_id) = &mutation.intent.spec_id {
            self.store
                .append_event(EvolutionEvent::SpecLinked {
                    mutation_id: mutation.intent.id.clone(),
                    spec_id: spec_id.clone(),
                })
                .map_err(store_err)?;
        }

        // Commit the capsule in the governor-decided state; quarantined
        // capsules additionally get an explicit quarantine event.
        let mut capsule = build_capsule(
            run_id,
            &mutation,
            &receipt,
            &report,
            &self.validation_plan.profile,
            &gene,
            &blast_radius,
        )
        .map_err(|err| EvoKernelError::Validation(err.to_string()))?;
        capsule.state = governor_decision.target_state.clone();
        self.store
            .append_event(EvolutionEvent::CapsuleCommitted {
                capsule: capsule.clone(),
            })
            .map_err(store_err)?;
        if matches!(governor_decision.target_state, AssetState::Quarantined) {
            self.store
                .append_event(EvolutionEvent::CapsuleQuarantined {
                    capsule_id: capsule.id.clone(),
                })
                .map_err(store_err)?;
        }

        Ok(CaptureOutcome {
            capsule,
            gene,
            governor_decision,
        })
    }

    /// Builds a low-risk mutation intent from an agent proposal (proposed
    /// files double as target allow-list and declared signals) and runs it
    /// through the full capture pipeline.
    pub async fn capture_from_proposal(
        &self,
        run_id: &RunId,
        proposal: &AgentMutationProposal,
        diff_payload: String,
        base_revision: Option<String>,
    ) -> Result<CaptureOutcome, EvoKernelError> {
        let intent = MutationIntent {
            id: next_id("proposal"),
            intent: proposal.intent.clone(),
            target: MutationTarget::Paths {
                allow: proposal.files.clone(),
            },
            expected_effect: proposal.expected_effect.clone(),
            risk: RiskLevel::Low,
            signals: proposal.files.clone(),
            spec_id: None,
        };
        self.capture_mutation_with_governor(
            run_id,
            prepare_mutation(intent, diff_payload, base_revision),
        )
        .await
    }

    /// Translates a capture outcome into agent-facing feedback. Any state
    /// other than `Revoked` counts as accepted.
    pub fn feedback_for_agent(outcome: &CaptureOutcome) -> ExecutionFeedback {
        ExecutionFeedback {
            accepted: !matches!(outcome.governor_decision.target_state, AssetState::Revoked),
            asset_state: Some(format!("{:?}", outcome.governor_decision.target_state)),
            summary: outcome.governor_decision.reason.clone(),
        }
    }

    /// Delegates a coordination plan to a fresh multi-agent coordinator.
    pub fn coordinate(&self, plan: CoordinationPlan) -> CoordinationResult {
        MultiAgentCoordinator::new().coordinate(plan)
    }

    /// Exports promoted assets as a publish envelope. A non-empty envelope
    /// requires reserving a publish stake from the sender's EVU balance;
    /// insufficient funds abort the export.
    pub fn export_promoted_assets(
        &self,
        sender_id: impl Into<String>,
    ) -> Result<EvolutionEnvelope, EvoKernelError> {
        let sender_id = sender_id.into();
        let envelope = export_promoted_assets_from_store(self.store.as_ref(), sender_id.clone())?;
        if !envelope.assets.is_empty() {
            let mut ledger = self
                .economics
                .lock()
                .map_err(|_| EvoKernelError::Validation("economics ledger lock poisoned".into()))?;
            if ledger
                .reserve_publish_stake(&sender_id, &self.stake_policy)
                .is_none()
            {
                return Err(EvoKernelError::Validation(
                    "insufficient EVU for remote publish".into(),
                ));
            }
        }
        Ok(envelope)
    }

    /// Imports a remote envelope, consulting the known-publisher registry.
    pub fn import_remote_envelope(
        &self,
        envelope: &EvolutionEnvelope,
    ) -> Result<ImportOutcome, EvoKernelError> {
        import_remote_envelope_into_store(
            self.store.as_ref(),
            envelope,
            Some(self.remote_publishers.as_ref()),
        )
    }

    /// Answers a fetch query against the local store.
    pub fn fetch_assets(
        &self,
        responder_id: impl Into<String>,
        query: &FetchQuery,
    ) -> Result<FetchResponse, EvoKernelError> {
        fetch_assets_from_store(self.store.as_ref(), responder_id, query)
    }

    /// Applies a revocation notice to the local store.
    pub fn revoke_assets(&self, notice: &RevokeNotice) -> Result<RevokeNotice, EvoKernelError> {
        revoke_assets_in_store(self.store.as_ref(), notice)
    }

    /// Replay with a freshly generated run id; see
    /// [`Self::replay_or_fallback_for_run`].
    pub async fn replay_or_fallback(
        &self,
        input: SelectorInput,
    ) -> Result<ReplayDecision, EvoKernelError> {
        let replay_run_id = next_id("replay");
        self.replay_or_fallback_for_run(&replay_run_id, input).await
    }

    /// Attempts to replay a stored capsule matching `input` under the current
    /// sandbox policy and validation plan, falling back per the executor's
    /// decision logic. All kernel collaborators are handed to a
    /// `StoreReplayExecutor` for the attempt.
    pub async fn replay_or_fallback_for_run(
        &self,
        run_id: &RunId,
        input: SelectorInput,
    ) -> Result<ReplayDecision, EvoKernelError> {
        let executor = StoreReplayExecutor {
            sandbox: self.sandbox.clone(),
            validator: self.validator.clone(),
            store: self.store.clone(),
            selector: self.selector.clone(),
            governor: self.governor.clone(),
            economics: Some(self.economics.clone()),
            remote_publishers: Some(self.remote_publishers.clone()),
            stake_policy: self.stake_policy.clone(),
        };
        executor
            .try_replay_for_run(run_id, &input, &self.sandbox_policy, &self.validation_plan)
            .await
            .map_err(|err| EvoKernelError::Validation(err.to_string()))
    }

    /// Governor-facing economics signal for a node, or `None` if the ledger
    /// lock is poisoned or no signal exists.
    pub fn economics_signal(&self, node_id: &str) -> Option<EconomicsSignal> {
        self.economics.lock().ok()?.governor_signal(node_id)
    }

    /// Per-publisher reputation bias for the selector; empty when the
    /// ledger lock is poisoned.
    pub fn selector_reputation_bias(&self) -> BTreeMap<String, f32> {
        self.economics
            .lock()
            .ok()
            .map(|locked| locked.selector_reputation_bias())
            .unwrap_or_default()
    }

    /// Current metrics derived from the store's event log.
    pub fn metrics_snapshot(&self) -> Result<EvolutionMetricsSnapshot, EvoKernelError> {
        evolution_metrics_snapshot(self.store.as_ref())
    }

    /// Metrics plus health rendered in Prometheus exposition format.
    pub fn render_metrics_prometheus(&self) -> Result<String, EvoKernelError> {
        self.metrics_snapshot().map(|snapshot| {
            let health = evolution_health_snapshot(&snapshot);
            render_evolution_metrics_prometheus(&snapshot, &health)
        })
    }

    /// Health view derived from the current metrics snapshot.
    pub fn health_snapshot(&self) -> Result<EvolutionHealthSnapshot, EvoKernelError> {
        self.metrics_snapshot()
            .map(|snapshot| evolution_health_snapshot(&snapshot))
    }
}
1675
1676pub fn prepare_mutation(
1677 intent: MutationIntent,
1678 diff_payload: String,
1679 base_revision: Option<String>,
1680) -> PreparedMutation {
1681 PreparedMutation {
1682 intent,
1683 artifact: MutationArtifact {
1684 encoding: ArtifactEncoding::UnifiedDiff,
1685 content_hash: compute_artifact_hash(&diff_payload),
1686 payload: diff_payload,
1687 base_revision,
1688 },
1689 }
1690}
1691
1692pub fn prepare_mutation_from_spec(
1693 plan: CompiledMutationPlan,
1694 diff_payload: String,
1695 base_revision: Option<String>,
1696) -> PreparedMutation {
1697 prepare_mutation(plan.mutation_intent, diff_payload, base_revision)
1698}
1699
1700pub fn default_evolution_store() -> Arc<dyn EvolutionStore> {
1701 Arc::new(oris_evolution::JsonlEvolutionStore::new(
1702 default_store_root(),
1703 ))
1704}
1705
/// Built-in seed templates used by `bootstrap_if_empty` to prime an empty
/// store. Each carries a small, syntactically valid unified diff and the
/// shared "bootstrap-seed" validation profile. The diff payloads are exact
/// string literals — do not reformat them.
fn built_in_seed_templates() -> Vec<SeedTemplate> {
    vec![
        // Seed 1: recover a missing README.
        SeedTemplate {
            id: "bootstrap-readme".into(),
            intent: "Seed a baseline README recovery pattern".into(),
            signals: vec!["bootstrap readme".into(), "missing readme".into()],
            diff_payload: "\
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..1111111
--- /dev/null
+++ b/README.md
@@ -0,0 +1,3 @@
+# Oris
+Bootstrap documentation seed
+"
            .into(),
            validation_profile: "bootstrap-seed".into(),
        },
        // Seed 2: stabilize failing tests.
        SeedTemplate {
            id: "bootstrap-test-fix".into(),
            intent: "Seed a deterministic test stabilization pattern".into(),
            signals: vec!["bootstrap test fix".into(), "failing tests".into()],
            diff_payload: "\
diff --git a/src/lib.rs b/src/lib.rs
index 1111111..2222222 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1 +1,2 @@
 pub fn demo() -> usize { 1 }
+pub fn normalize_test_output() -> bool { true }
"
            .into(),
            validation_profile: "bootstrap-seed".into(),
        },
        // Seed 3: low-risk refactor.
        SeedTemplate {
            id: "bootstrap-refactor".into(),
            intent: "Seed a low-risk refactor capsule".into(),
            signals: vec!["bootstrap refactor".into(), "small refactor".into()],
            diff_payload: "\
diff --git a/src/lib.rs b/src/lib.rs
index 2222222..3333333 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1 +1,3 @@
 pub fn demo() -> usize { 1 }
+
+fn extract_strategy_key(input: &str) -> &str { input }
"
            .into(),
            validation_profile: "bootstrap-seed".into(),
        },
        // Seed 4: structured logging baseline.
        SeedTemplate {
            id: "bootstrap-logging".into(),
            intent: "Seed a baseline structured logging mutation".into(),
            signals: vec!["bootstrap logging".into(), "structured logs".into()],
            diff_payload: "\
diff --git a/src/lib.rs b/src/lib.rs
index 3333333..4444444 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1 +1,3 @@
 pub fn demo() -> usize { 1 }
+
+fn emit_bootstrap_log() { println!(\"bootstrap-log\"); }
"
            .into(),
            validation_profile: "bootstrap-seed".into(),
        },
    ]
}
1777
1778fn build_seed_mutation(template: &SeedTemplate) -> PreparedMutation {
1779 let changed_files = seed_changed_files(&template.diff_payload);
1780 let target = if changed_files.is_empty() {
1781 MutationTarget::WorkspaceRoot
1782 } else {
1783 MutationTarget::Paths {
1784 allow: changed_files,
1785 }
1786 };
1787 prepare_mutation(
1788 MutationIntent {
1789 id: stable_hash_json(&("bootstrap-mutation", &template.id))
1790 .unwrap_or_else(|_| format!("bootstrap-mutation-{}", template.id)),
1791 intent: template.intent.clone(),
1792 target,
1793 expected_effect: format!("seed {}", template.id),
1794 risk: RiskLevel::Low,
1795 signals: template.signals.clone(),
1796 spec_id: None,
1797 },
1798 template.diff_payload.clone(),
1799 None,
1800 )
1801}
1802
1803fn extract_seed_signals(template: &SeedTemplate) -> SignalExtractionOutput {
1804 let mut signals = BTreeSet::new();
1805 for declared in &template.signals {
1806 if let Some(phrase) = normalize_signal_phrase(declared) {
1807 signals.insert(phrase);
1808 }
1809 extend_signal_tokens(&mut signals, declared);
1810 }
1811 extend_signal_tokens(&mut signals, &template.intent);
1812 extend_signal_tokens(&mut signals, &template.diff_payload);
1813 for changed_file in seed_changed_files(&template.diff_payload) {
1814 extend_signal_tokens(&mut signals, &changed_file);
1815 }
1816 let values = signals.into_iter().take(32).collect::<Vec<_>>();
1817 let hash =
1818 stable_hash_json(&values).unwrap_or_else(|_| compute_artifact_hash(&values.join("\n")));
1819 SignalExtractionOutput { values, hash }
1820}
1821
/// Extracts the post-image paths (`+++ b/<path>` headers) from a unified
/// diff, trimmed, deduplicated, and returned in sorted order.
fn seed_changed_files(diff_payload: &str) -> Vec<String> {
    let unique: BTreeSet<String> = diff_payload
        .lines()
        .filter_map(|line| line.strip_prefix("+++ b/"))
        .map(str::trim)
        .filter(|path| !path.is_empty())
        .map(str::to_string)
        .collect();
    unique.into_iter().collect()
}
1834
1835fn build_bootstrap_gene(
1836 template: &SeedTemplate,
1837 extracted: &SignalExtractionOutput,
1838) -> Result<Gene, EvolutionError> {
1839 let strategy = vec![template.id.clone(), "bootstrap".into()];
1840 let id = stable_hash_json(&(
1841 "bootstrap-gene",
1842 &template.id,
1843 &extracted.values,
1844 &template.validation_profile,
1845 ))?;
1846 Ok(Gene {
1847 id,
1848 signals: extracted.values.clone(),
1849 strategy,
1850 validation: vec![template.validation_profile.clone()],
1851 state: AssetState::Quarantined,
1852 })
1853}
1854
/// Builds the quarantined capsule that accompanies a bootstrap gene.
/// Seed capsules have never executed, so the outcome records zero
/// confidence, `success: false`, and `replay_verified: false` until a local
/// replay validates them. Both hash tuples are order-sensitive — do not
/// reorder their elements.
fn build_bootstrap_capsule(
    run_id: &RunId,
    template: &SeedTemplate,
    mutation: &PreparedMutation,
    gene: &Gene,
) -> Result<Capsule, EvolutionError> {
    // Fingerprint the current working directory's environment; "." is the
    // fallback when the cwd cannot be resolved.
    let cwd = std::env::current_dir().unwrap_or_else(|_| Path::new(".").to_path_buf());
    let env = current_env_fingerprint(&cwd);
    let diff_hash = mutation.artifact.content_hash.clone();
    let changed_files = seed_changed_files(&template.diff_payload);
    let validator_hash = stable_hash_json(&(
        "bootstrap-validator",
        &template.id,
        &template.validation_profile,
        &diff_hash,
    ))?;
    // Capsule id incorporates run, gene, diff, and environment so the same
    // seed in a different environment yields a distinct capsule.
    let id = stable_hash_json(&(
        "bootstrap-capsule",
        &template.id,
        run_id,
        &gene.id,
        &diff_hash,
        &env,
    ))?;
    Ok(Capsule {
        id,
        gene_id: gene.id.clone(),
        mutation_id: mutation.intent.id.clone(),
        run_id: run_id.clone(),
        diff_hash,
        confidence: 0.0,
        env,
        outcome: Outcome {
            success: false,
            validation_profile: template.validation_profile.clone(),
            validation_duration_ms: 0,
            changed_files,
            validator_hash,
            lines_changed: compute_blast_radius(&template.diff_payload).lines_changed,
            replay_verified: false,
        },
        state: AssetState::Quarantined,
    })
}
1899
1900fn derive_gene(
1901 mutation: &PreparedMutation,
1902 receipt: &SandboxReceipt,
1903 validation_profile: &str,
1904 extracted_signals: &[String],
1905) -> Gene {
1906 let mut strategy = BTreeSet::new();
1907 for file in &receipt.changed_files {
1908 if let Some(component) = file.components().next() {
1909 strategy.insert(component.as_os_str().to_string_lossy().to_string());
1910 }
1911 }
1912 for token in mutation
1913 .artifact
1914 .payload
1915 .split(|ch: char| !ch.is_ascii_alphanumeric())
1916 {
1917 if token.len() == 5
1918 && token.starts_with('E')
1919 && token[1..].chars().all(|ch| ch.is_ascii_digit())
1920 {
1921 strategy.insert(token.to_string());
1922 }
1923 }
1924 for token in mutation.intent.intent.split_whitespace().take(8) {
1925 strategy.insert(token.to_ascii_lowercase());
1926 }
1927 let strategy = strategy.into_iter().collect::<Vec<_>>();
1928 let id = stable_hash_json(&(extracted_signals, &strategy, validation_profile))
1929 .unwrap_or_else(|_| next_id("gene"));
1930 Gene {
1931 id,
1932 signals: extracted_signals.to_vec(),
1933 strategy,
1934 validation: vec![validation_profile.to_string()],
1935 state: AssetState::Promoted,
1936 }
1937}
1938
/// Builds the capsule recording a successful, validated capture. Confidence
/// starts at a fixed 0.7 and `replay_verified` at false; both are expected
/// to evolve through later replays (assumed — confirm against the replay
/// executor). The capsule id hashes (run, gene, diff, mutation) and is
/// order-sensitive.
fn build_capsule(
    run_id: &RunId,
    mutation: &PreparedMutation,
    receipt: &SandboxReceipt,
    report: &ValidationReport,
    validation_profile: &str,
    gene: &Gene,
    blast_radius: &BlastRadius,
) -> Result<Capsule, EvolutionError> {
    // Fingerprint the sandbox workdir, not the host cwd.
    let env = current_env_fingerprint(&receipt.workdir);
    // Hash of the full validation report ties the outcome to exactly what
    // was validated.
    let validator_hash = stable_hash_json(report)?;
    let diff_hash = mutation.artifact.content_hash.clone();
    let id = stable_hash_json(&(run_id, &gene.id, &diff_hash, &mutation.intent.id))?;
    Ok(Capsule {
        id,
        gene_id: gene.id.clone(),
        mutation_id: mutation.intent.id.clone(),
        run_id: run_id.clone(),
        diff_hash,
        confidence: 0.7,
        env,
        outcome: oris_evolution::Outcome {
            success: true,
            validation_profile: validation_profile.to_string(),
            validation_duration_ms: report.duration_ms,
            changed_files: receipt
                .changed_files
                .iter()
                .map(|path| path.to_string_lossy().to_string())
                .collect(),
            validator_hash,
            lines_changed: blast_radius.lines_changed,
            replay_verified: false,
        },
        state: AssetState::Promoted,
    })
}
1976
1977fn current_env_fingerprint(workdir: &Path) -> EnvFingerprint {
1978 let rustc_version = Command::new("rustc")
1979 .arg("--version")
1980 .output()
1981 .ok()
1982 .filter(|output| output.status.success())
1983 .map(|output| String::from_utf8_lossy(&output.stdout).trim().to_string())
1984 .unwrap_or_else(|| "rustc unknown".into());
1985 let cargo_lock_hash = fs::read(workdir.join("Cargo.lock"))
1986 .ok()
1987 .map(|bytes| {
1988 let value = String::from_utf8_lossy(&bytes);
1989 compute_artifact_hash(&value)
1990 })
1991 .unwrap_or_else(|| "missing-cargo-lock".into());
1992 let target_triple = format!(
1993 "{}-unknown-{}",
1994 std::env::consts::ARCH,
1995 std::env::consts::OS
1996 );
1997 EnvFingerprint {
1998 rustc_version,
1999 cargo_lock_hash,
2000 target_triple,
2001 os: std::env::consts::OS.to_string(),
2002 }
2003}
2004
/// Tokenizes `input` on non-alphanumeric boundaries and inserts each
/// normalized token into `out`. Rustc error codes (`e`/`E` + four digits)
/// are canonicalized to an uppercase `E`; everything else is lowercased.
/// Tokens shorter than three characters are dropped as noise.
fn extend_signal_tokens(out: &mut BTreeSet<String>, input: &str) {
    let tokens = input
        .split(|ch: char| !ch.is_ascii_alphanumeric())
        .filter(|token| !token.is_empty());
    for token in tokens {
        let is_error_code = token.len() == 5
            && matches!(token.as_bytes().first(), Some(b'e') | Some(b'E'))
            && token[1..].chars().all(|ch| ch.is_ascii_digit());
        let normalized = if is_error_code {
            // Canonical form keeps the digits, uppercases the prefix.
            format!("E{}", &token[1..])
        } else {
            token.to_ascii_lowercase()
        };
        if normalized.len() >= 3 {
            out.insert(normalized);
        }
    }
}
2027
/// Normalizes a free-form phrase into a canonical signal: splits on
/// non-alphanumerics, normalizes each token the same way as
/// `extend_signal_tokens` (uppercase-`E` error codes, lowercase otherwise,
/// minimum three characters), and joins survivors with single spaces.
/// Returns `None` when nothing survives normalization.
fn normalize_signal_phrase(input: &str) -> Option<String> {
    let mut parts: Vec<String> = Vec::new();
    for token in input.split(|ch: char| !ch.is_ascii_alphanumeric()) {
        if token.is_empty() {
            continue;
        }
        let is_error_code = token.len() == 5
            && matches!(token.as_bytes().first(), Some(b'e') | Some(b'E'))
            && token[1..].chars().all(|ch| ch.is_ascii_digit());
        let normalized = if is_error_code {
            format!("E{}", &token[1..])
        } else {
            token.to_ascii_lowercase()
        };
        if normalized.len() >= 3 {
            parts.push(normalized);
        }
    }
    if parts.is_empty() {
        None
    } else {
        Some(parts.join(" "))
    }
}
2060
/// True when `value` looks like a rustc diagnostic code: exactly five bytes —
/// a leading `e`/`E` followed by four ASCII digits.
fn is_rust_error_code(value: &str) -> bool {
    match value.as_bytes() {
        [b'e' | b'E', digits @ ..] if digits.len() == 4 => {
            digits.iter().all(u8::is_ascii_digit)
        }
        _ => false,
    }
}
2066
2067fn find_declared_mutation(
2068 store: &dyn EvolutionStore,
2069 mutation_id: &MutationId,
2070) -> Result<Option<PreparedMutation>, EvolutionError> {
2071 for stored in store.scan(1)? {
2072 if let EvolutionEvent::MutationDeclared { mutation } = stored.event {
2073 if &mutation.intent.id == mutation_id {
2074 return Ok(Some(mutation));
2075 }
2076 }
2077 }
2078 Ok(None)
2079}
2080
/// Finds promoted genes whose (case-folded) signal set matches the input
/// signals EXACTLY, optionally constrained to a spec id. Each matching gene
/// carries its promoted capsules, ordered best-first by environment match,
/// then confidence, then capsule id; the candidate score is the best
/// capsule's environment-match factor. Returns an empty list on projection
/// failure rather than propagating the error.
fn exact_match_candidates(store: &dyn EvolutionStore, input: &SelectorInput) -> Vec<GeneCandidate> {
    let Ok(projection) = store.rebuild_projection() else {
        return Vec::new();
    };
    let capsules = projection.capsules.clone();
    let spec_ids_by_gene = projection.spec_ids_by_gene.clone();
    // A blank/whitespace spec id counts as "no spec filter".
    let requested_spec_id = input
        .spec_id
        .as_deref()
        .map(str::trim)
        .filter(|value| !value.is_empty());
    // Case-insensitive comparison via lowercase sets on both sides.
    let signal_set = input
        .signals
        .iter()
        .map(|signal| signal.to_ascii_lowercase())
        .collect::<BTreeSet<_>>();
    let mut candidates = projection
        .genes
        .into_iter()
        .filter_map(|gene| {
            if gene.state != AssetState::Promoted {
                return None;
            }
            // Spec filter: the gene must have at least one linked spec id
            // equal (case-insensitively) to the requested one.
            if let Some(spec_id) = requested_spec_id {
                let matches_spec = spec_ids_by_gene
                    .get(&gene.id)
                    .map(|values| {
                        values
                            .iter()
                            .any(|value| value.eq_ignore_ascii_case(spec_id))
                    })
                    .unwrap_or(false);
                if !matches_spec {
                    return None;
                }
            }
            let gene_signals = gene
                .signals
                .iter()
                .map(|signal| signal.to_ascii_lowercase())
                .collect::<BTreeSet<_>>();
            // Exact set equality — not subset/overlap — is required here.
            if gene_signals == signal_set {
                let mut matched_capsules = capsules
                    .iter()
                    .filter(|capsule| {
                        capsule.gene_id == gene.id && capsule.state == AssetState::Promoted
                    })
                    .cloned()
                    .collect::<Vec<_>>();
                // Best capsule first: env match desc, confidence desc,
                // capsule id asc as a deterministic tiebreak.
                matched_capsules.sort_by(|left, right| {
                    replay_environment_match_factor(&input.env, &right.env)
                        .partial_cmp(&replay_environment_match_factor(&input.env, &left.env))
                        .unwrap_or(std::cmp::Ordering::Equal)
                        .then_with(|| {
                            right
                                .confidence
                                .partial_cmp(&left.confidence)
                                .unwrap_or(std::cmp::Ordering::Equal)
                        })
                        .then_with(|| left.id.cmp(&right.id))
                });
                if matched_capsules.is_empty() {
                    None
                } else {
                    // Score is the best capsule's environment-match factor.
                    let score = matched_capsules
                        .first()
                        .map(|capsule| replay_environment_match_factor(&input.env, &capsule.env))
                        .unwrap_or(0.0);
                    Some(GeneCandidate {
                        gene,
                        score,
                        capsules: matched_capsules,
                    })
                }
            } else {
                None
            }
        })
        .collect::<Vec<_>>();
    // Final candidate ordering mirrors select_candidates: score desc with
    // gene-id tiebreak for determinism.
    candidates.sort_by(|left, right| {
        right
            .score
            .partial_cmp(&left.score)
            .unwrap_or(std::cmp::Ordering::Equal)
            .then_with(|| left.gene.id.cmp(&right.gene.id))
    });
    candidates
}
2169
2170fn quarantined_remote_exact_match_candidates(
2171 store: &dyn EvolutionStore,
2172 input: &SelectorInput,
2173) -> Vec<GeneCandidate> {
2174 let remote_asset_ids = store
2175 .scan(1)
2176 .ok()
2177 .map(|events| {
2178 events
2179 .into_iter()
2180 .filter_map(|stored| match stored.event {
2181 EvolutionEvent::RemoteAssetImported {
2182 source: CandidateSource::Remote,
2183 asset_ids,
2184 } => Some(asset_ids),
2185 _ => None,
2186 })
2187 .flatten()
2188 .collect::<BTreeSet<_>>()
2189 })
2190 .unwrap_or_default();
2191 if remote_asset_ids.is_empty() {
2192 return Vec::new();
2193 }
2194
2195 let Ok(projection) = store.rebuild_projection() else {
2196 return Vec::new();
2197 };
2198 let capsules = projection.capsules.clone();
2199 let spec_ids_by_gene = projection.spec_ids_by_gene.clone();
2200 let requested_spec_id = input
2201 .spec_id
2202 .as_deref()
2203 .map(str::trim)
2204 .filter(|value| !value.is_empty());
2205 let normalized_signals = input
2206 .signals
2207 .iter()
2208 .filter_map(|signal| normalize_signal_phrase(signal))
2209 .collect::<Vec<_>>();
2210 if normalized_signals.is_empty() {
2211 return Vec::new();
2212 }
2213 let mut candidates = projection
2214 .genes
2215 .into_iter()
2216 .filter_map(|gene| {
2217 if !matches!(gene.state, AssetState::Promoted | AssetState::Quarantined) {
2218 return None;
2219 }
2220 if let Some(spec_id) = requested_spec_id {
2221 let matches_spec = spec_ids_by_gene
2222 .get(&gene.id)
2223 .map(|values| {
2224 values
2225 .iter()
2226 .any(|value| value.eq_ignore_ascii_case(spec_id))
2227 })
2228 .unwrap_or(false);
2229 if !matches_spec {
2230 return None;
2231 }
2232 }
2233 let matched_signal_count = gene
2234 .signals
2235 .iter()
2236 .filter(|candidate| {
2237 normalize_signal_phrase(candidate)
2238 .map(|candidate| {
2239 normalized_signals.iter().any(|signal| {
2240 candidate.contains(signal) || signal.contains(&candidate)
2241 })
2242 })
2243 .unwrap_or(false)
2244 })
2245 .count();
2246 if matched_signal_count == 0 {
2247 return None;
2248 }
2249
2250 let mut matched_capsules = capsules
2251 .iter()
2252 .filter(|capsule| {
2253 capsule.gene_id == gene.id
2254 && capsule.state == AssetState::Quarantined
2255 && remote_asset_ids.contains(&capsule.id)
2256 })
2257 .cloned()
2258 .collect::<Vec<_>>();
2259 matched_capsules.sort_by(|left, right| {
2260 replay_environment_match_factor(&input.env, &right.env)
2261 .partial_cmp(&replay_environment_match_factor(&input.env, &left.env))
2262 .unwrap_or(std::cmp::Ordering::Equal)
2263 .then_with(|| {
2264 right
2265 .confidence
2266 .partial_cmp(&left.confidence)
2267 .unwrap_or(std::cmp::Ordering::Equal)
2268 })
2269 .then_with(|| left.id.cmp(&right.id))
2270 });
2271 if matched_capsules.is_empty() {
2272 None
2273 } else {
2274 let overlap = matched_signal_count as f32 / normalized_signals.len() as f32;
2275 let env_score = matched_capsules
2276 .first()
2277 .map(|capsule| replay_environment_match_factor(&input.env, &capsule.env))
2278 .unwrap_or(0.0);
2279 Some(GeneCandidate {
2280 gene,
2281 score: overlap.max(env_score),
2282 capsules: matched_capsules,
2283 })
2284 }
2285 })
2286 .collect::<Vec<_>>();
2287 candidates.sort_by(|left, right| {
2288 right
2289 .score
2290 .partial_cmp(&left.score)
2291 .unwrap_or(std::cmp::Ordering::Equal)
2292 .then_with(|| left.gene.id.cmp(&right.gene.id))
2293 });
2294 candidates
2295}
2296
2297fn replay_environment_match_factor(input: &EnvFingerprint, candidate: &EnvFingerprint) -> f32 {
2298 let fields = [
2299 input
2300 .rustc_version
2301 .eq_ignore_ascii_case(&candidate.rustc_version),
2302 input
2303 .cargo_lock_hash
2304 .eq_ignore_ascii_case(&candidate.cargo_lock_hash),
2305 input
2306 .target_triple
2307 .eq_ignore_ascii_case(&candidate.target_triple),
2308 input.os.eq_ignore_ascii_case(&candidate.os),
2309 ];
2310 let matched_fields = fields.into_iter().filter(|matched| *matched).count() as f32;
2311 0.5 + ((matched_fields / 4.0) * 0.5)
2312}
2313
2314fn effective_candidate_score(
2315 candidate: &GeneCandidate,
2316 publishers_by_gene: &BTreeMap<String, String>,
2317 reputation_bias: &BTreeMap<String, f32>,
2318) -> f32 {
2319 let bias = publishers_by_gene
2320 .get(&candidate.gene.id)
2321 .and_then(|publisher| reputation_bias.get(publisher))
2322 .copied()
2323 .unwrap_or(0.0)
2324 .clamp(0.0, 1.0);
2325 candidate.score * (1.0 + (bias * 0.1))
2326}
2327
2328fn export_promoted_assets_from_store(
2329 store: &dyn EvolutionStore,
2330 sender_id: impl Into<String>,
2331) -> Result<EvolutionEnvelope, EvoKernelError> {
2332 let projection = store.rebuild_projection().map_err(store_err)?;
2333 let genes = projection
2334 .genes
2335 .into_iter()
2336 .filter(|gene| gene.state == AssetState::Promoted)
2337 .collect::<Vec<_>>();
2338 let capsules = projection
2339 .capsules
2340 .into_iter()
2341 .filter(|capsule| capsule.state == AssetState::Promoted)
2342 .collect::<Vec<_>>();
2343 let assets = replay_export_assets(store, genes, capsules)?;
2344 Ok(EvolutionEnvelope::publish(sender_id, assets))
2345}
2346
2347fn replay_export_assets(
2348 store: &dyn EvolutionStore,
2349 genes: Vec<Gene>,
2350 capsules: Vec<Capsule>,
2351) -> Result<Vec<NetworkAsset>, EvoKernelError> {
2352 let mutation_ids = capsules
2353 .iter()
2354 .map(|capsule| capsule.mutation_id.clone())
2355 .collect::<BTreeSet<_>>();
2356 let mut assets = replay_export_events_for_mutations(store, &mutation_ids)?;
2357 for gene in genes {
2358 assets.push(NetworkAsset::Gene { gene });
2359 }
2360 for capsule in capsules {
2361 assets.push(NetworkAsset::Capsule { capsule });
2362 }
2363 Ok(assets)
2364}
2365
/// Replays the evolution log and extracts the `MutationDeclared` and
/// `SpecLinked` events for the given mutation ids (deduplicated, in log
/// order) so a remote peer can reconstruct provenance for exported capsules.
fn replay_export_events_for_mutations(
    store: &dyn EvolutionStore,
    mutation_ids: &BTreeSet<String>,
) -> Result<Vec<NetworkAsset>, EvoKernelError> {
    if mutation_ids.is_empty() {
        return Ok(Vec::new());
    }

    let mut assets = Vec::new();
    // The `insert` calls inside the match guards double as first-seen
    // deduplication: a guard only passes the first time an id (or id pair)
    // is encountered.
    let mut seen_mutations = BTreeSet::new();
    let mut seen_spec_links = BTreeSet::new();
    for stored in store.scan(1).map_err(store_err)? {
        match stored.event {
            EvolutionEvent::MutationDeclared { mutation }
                if mutation_ids.contains(&mutation.intent.id)
                    && seen_mutations.insert(mutation.intent.id.clone()) =>
            {
                assets.push(NetworkAsset::EvolutionEvent {
                    event: EvolutionEvent::MutationDeclared { mutation },
                });
            }
            EvolutionEvent::SpecLinked {
                mutation_id,
                spec_id,
            } if mutation_ids.contains(&mutation_id)
                && seen_spec_links.insert((mutation_id.clone(), spec_id.clone())) =>
            {
                assets.push(NetworkAsset::EvolutionEvent {
                    event: EvolutionEvent::SpecLinked {
                        mutation_id,
                        spec_id,
                    },
                });
            }
            _ => {}
        }
    }

    Ok(assets)
}
2406
/// Imports a remote publish envelope after verifying its content hash.
///
/// Remote genes and capsules are never trusted as-is: each is re-recorded
/// locally in the `Quarantined` state, preceded by a `RemoteAssetImported`
/// provenance event, so it must pass local validation before promotion.
/// Raw events are replayed verbatim only when whitelisted by
/// `should_import_remote_event`. Returns the ids of all imported assets.
fn import_remote_envelope_into_store(
    store: &dyn EvolutionStore,
    envelope: &EvolutionEnvelope,
    remote_publishers: Option<&Mutex<BTreeMap<String, String>>>,
) -> Result<ImportOutcome, EvoKernelError> {
    // Reject tampered or corrupted envelopes up front.
    if !envelope.verify_content_hash() {
        return Err(EvoKernelError::Validation(
            "invalid evolution envelope hash".into(),
        ));
    }

    let mut imported_asset_ids = Vec::new();
    for asset in &envelope.assets {
        match asset {
            NetworkAsset::Gene { gene } => {
                imported_asset_ids.push(gene.id.clone());
                // Force the imported gene into quarantine regardless of the
                // state it was published with.
                let mut quarantined_gene = gene.clone();
                quarantined_gene.state = AssetState::Quarantined;
                store
                    .append_event(EvolutionEvent::RemoteAssetImported {
                        source: CandidateSource::Remote,
                        asset_ids: vec![gene.id.clone()],
                    })
                    .map_err(store_err)?;
                store
                    .append_event(EvolutionEvent::GeneProjected {
                        gene: quarantined_gene.clone(),
                    })
                    .map_err(store_err)?;
                record_remote_publisher_for_asset(remote_publishers, &envelope.sender_id, asset);
                store
                    .append_event(EvolutionEvent::PromotionEvaluated {
                        gene_id: quarantined_gene.id,
                        state: AssetState::Quarantined,
                        reason: "remote asset requires local validation before promotion".into(),
                    })
                    .map_err(store_err)?;
            }
            NetworkAsset::Capsule { capsule } => {
                imported_asset_ids.push(capsule.id.clone());
                store
                    .append_event(EvolutionEvent::RemoteAssetImported {
                        source: CandidateSource::Remote,
                        asset_ids: vec![capsule.id.clone()],
                    })
                    .map_err(store_err)?;
                // Same quarantine-on-import rule as for genes.
                let mut quarantined = capsule.clone();
                quarantined.state = AssetState::Quarantined;
                store
                    .append_event(EvolutionEvent::CapsuleCommitted {
                        capsule: quarantined.clone(),
                    })
                    .map_err(store_err)?;
                record_remote_publisher_for_asset(remote_publishers, &envelope.sender_id, asset);
                store
                    .append_event(EvolutionEvent::CapsuleQuarantined {
                        capsule_id: quarantined.id,
                    })
                    .map_err(store_err)?;
            }
            NetworkAsset::EvolutionEvent { event } => {
                // Only whitelisted provenance events are replayed verbatim.
                if should_import_remote_event(event) {
                    store.append_event(event.clone()).map_err(store_err)?;
                }
            }
        }
    }

    Ok(ImportOutcome {
        imported_asset_ids,
        accepted: true,
    })
}
2480
2481fn record_remote_publisher_for_asset(
2482 remote_publishers: Option<&Mutex<BTreeMap<String, String>>>,
2483 sender_id: &str,
2484 asset: &NetworkAsset,
2485) {
2486 let Some(remote_publishers) = remote_publishers else {
2487 return;
2488 };
2489 let sender_id = sender_id.trim();
2490 if sender_id.is_empty() {
2491 return;
2492 }
2493 let Ok(mut publishers) = remote_publishers.lock() else {
2494 return;
2495 };
2496 match asset {
2497 NetworkAsset::Gene { gene } => {
2498 publishers.insert(gene.id.clone(), sender_id.to_string());
2499 }
2500 NetworkAsset::Capsule { capsule } => {
2501 publishers.insert(capsule.gene_id.clone(), sender_id.to_string());
2502 }
2503 NetworkAsset::EvolutionEvent { .. } => {}
2504 }
2505}
2506
2507fn should_import_remote_event(event: &EvolutionEvent) -> bool {
2508 matches!(
2509 event,
2510 EvolutionEvent::MutationDeclared { .. } | EvolutionEvent::SpecLinked { .. }
2511 )
2512}
2513
2514fn fetch_assets_from_store(
2515 store: &dyn EvolutionStore,
2516 responder_id: impl Into<String>,
2517 query: &FetchQuery,
2518) -> Result<FetchResponse, EvoKernelError> {
2519 let projection = store.rebuild_projection().map_err(store_err)?;
2520 let normalized_signals: Vec<String> = query
2521 .signals
2522 .iter()
2523 .map(|signal| signal.trim().to_ascii_lowercase())
2524 .filter(|signal| !signal.is_empty())
2525 .collect();
2526 let matches_any_signal = |candidate: &str| {
2527 if normalized_signals.is_empty() {
2528 return true;
2529 }
2530 let candidate = candidate.to_ascii_lowercase();
2531 normalized_signals
2532 .iter()
2533 .any(|signal| candidate.contains(signal) || signal.contains(&candidate))
2534 };
2535
2536 let matched_genes: Vec<Gene> = projection
2537 .genes
2538 .into_iter()
2539 .filter(|gene| gene.state == AssetState::Promoted)
2540 .filter(|gene| gene.signals.iter().any(|signal| matches_any_signal(signal)))
2541 .collect();
2542 let matched_gene_ids: BTreeSet<String> =
2543 matched_genes.iter().map(|gene| gene.id.clone()).collect();
2544 let matched_capsules: Vec<Capsule> = projection
2545 .capsules
2546 .into_iter()
2547 .filter(|capsule| capsule.state == AssetState::Promoted)
2548 .filter(|capsule| matched_gene_ids.contains(&capsule.gene_id))
2549 .collect();
2550
2551 let assets = replay_export_assets(store, matched_genes, matched_capsules)?;
2552
2553 Ok(FetchResponse {
2554 sender_id: responder_id.into(),
2555 assets,
2556 })
2557}
2558
2559fn revoke_assets_in_store(
2560 store: &dyn EvolutionStore,
2561 notice: &RevokeNotice,
2562) -> Result<RevokeNotice, EvoKernelError> {
2563 let projection = store.rebuild_projection().map_err(store_err)?;
2564 let requested: BTreeSet<String> = notice
2565 .asset_ids
2566 .iter()
2567 .map(|asset_id| asset_id.trim().to_string())
2568 .filter(|asset_id| !asset_id.is_empty())
2569 .collect();
2570 let mut revoked_gene_ids = BTreeSet::new();
2571 let mut quarantined_capsule_ids = BTreeSet::new();
2572
2573 for gene in &projection.genes {
2574 if requested.contains(&gene.id) {
2575 revoked_gene_ids.insert(gene.id.clone());
2576 }
2577 }
2578 for capsule in &projection.capsules {
2579 if requested.contains(&capsule.id) {
2580 quarantined_capsule_ids.insert(capsule.id.clone());
2581 revoked_gene_ids.insert(capsule.gene_id.clone());
2582 }
2583 }
2584 for capsule in &projection.capsules {
2585 if revoked_gene_ids.contains(&capsule.gene_id) {
2586 quarantined_capsule_ids.insert(capsule.id.clone());
2587 }
2588 }
2589
2590 for gene_id in &revoked_gene_ids {
2591 store
2592 .append_event(EvolutionEvent::GeneRevoked {
2593 gene_id: gene_id.clone(),
2594 reason: notice.reason.clone(),
2595 })
2596 .map_err(store_err)?;
2597 }
2598 for capsule_id in &quarantined_capsule_ids {
2599 store
2600 .append_event(EvolutionEvent::CapsuleQuarantined {
2601 capsule_id: capsule_id.clone(),
2602 })
2603 .map_err(store_err)?;
2604 }
2605
2606 let mut affected_ids: Vec<String> = revoked_gene_ids.into_iter().collect();
2607 affected_ids.extend(quarantined_capsule_ids);
2608 affected_ids.sort();
2609 affected_ids.dedup();
2610
2611 Ok(RevokeNotice {
2612 sender_id: notice.sender_id.clone(),
2613 asset_ids: affected_ids,
2614 reason: notice.reason.clone(),
2615 })
2616}
2617
2618fn evolution_metrics_snapshot(
2619 store: &dyn EvolutionStore,
2620) -> Result<EvolutionMetricsSnapshot, EvoKernelError> {
2621 let events = store.scan(1).map_err(store_err)?;
2622 let projection = store.rebuild_projection().map_err(store_err)?;
2623 let replay_success_total = events
2624 .iter()
2625 .filter(|stored| matches!(stored.event, EvolutionEvent::CapsuleReused { .. }))
2626 .count() as u64;
2627 let replay_failures_total = events
2628 .iter()
2629 .filter(|stored| is_replay_validation_failure(&stored.event))
2630 .count() as u64;
2631 let replay_attempts_total = replay_success_total + replay_failures_total;
2632 let mutation_declared_total = events
2633 .iter()
2634 .filter(|stored| matches!(stored.event, EvolutionEvent::MutationDeclared { .. }))
2635 .count() as u64;
2636 let promoted_mutations_total = events
2637 .iter()
2638 .filter(|stored| matches!(stored.event, EvolutionEvent::GenePromoted { .. }))
2639 .count() as u64;
2640 let gene_revocations_total = events
2641 .iter()
2642 .filter(|stored| matches!(stored.event, EvolutionEvent::GeneRevoked { .. }))
2643 .count() as u64;
2644 let cutoff = Utc::now() - Duration::hours(1);
2645 let mutation_velocity_last_hour = count_recent_events(&events, cutoff, |event| {
2646 matches!(event, EvolutionEvent::MutationDeclared { .. })
2647 });
2648 let revoke_frequency_last_hour = count_recent_events(&events, cutoff, |event| {
2649 matches!(event, EvolutionEvent::GeneRevoked { .. })
2650 });
2651 let promoted_genes = projection
2652 .genes
2653 .iter()
2654 .filter(|gene| gene.state == AssetState::Promoted)
2655 .count() as u64;
2656 let promoted_capsules = projection
2657 .capsules
2658 .iter()
2659 .filter(|capsule| capsule.state == AssetState::Promoted)
2660 .count() as u64;
2661
2662 Ok(EvolutionMetricsSnapshot {
2663 replay_attempts_total,
2664 replay_success_total,
2665 replay_success_rate: safe_ratio(replay_success_total, replay_attempts_total),
2666 mutation_declared_total,
2667 promoted_mutations_total,
2668 promotion_ratio: safe_ratio(promoted_mutations_total, mutation_declared_total),
2669 gene_revocations_total,
2670 mutation_velocity_last_hour,
2671 revoke_frequency_last_hour,
2672 promoted_genes,
2673 promoted_capsules,
2674 last_event_seq: events.last().map(|stored| stored.seq).unwrap_or(0),
2675 })
2676}
2677
2678fn evolution_health_snapshot(snapshot: &EvolutionMetricsSnapshot) -> EvolutionHealthSnapshot {
2679 EvolutionHealthSnapshot {
2680 status: "ok".into(),
2681 last_event_seq: snapshot.last_event_seq,
2682 promoted_genes: snapshot.promoted_genes,
2683 promoted_capsules: snapshot.promoted_capsules,
2684 }
2685}
2686
2687fn render_evolution_metrics_prometheus(
2688 snapshot: &EvolutionMetricsSnapshot,
2689 health: &EvolutionHealthSnapshot,
2690) -> String {
2691 let mut out = String::new();
2692 out.push_str(
2693 "# HELP oris_evolution_replay_attempts_total Total replay attempts that reached validation.\n",
2694 );
2695 out.push_str("# TYPE oris_evolution_replay_attempts_total counter\n");
2696 out.push_str(&format!(
2697 "oris_evolution_replay_attempts_total {}\n",
2698 snapshot.replay_attempts_total
2699 ));
2700 out.push_str("# HELP oris_evolution_replay_success_total Total replay attempts that reused a capsule successfully.\n");
2701 out.push_str("# TYPE oris_evolution_replay_success_total counter\n");
2702 out.push_str(&format!(
2703 "oris_evolution_replay_success_total {}\n",
2704 snapshot.replay_success_total
2705 ));
2706 out.push_str("# HELP oris_evolution_replay_success_rate Successful replay attempts divided by replay attempts that reached validation.\n");
2707 out.push_str("# TYPE oris_evolution_replay_success_rate gauge\n");
2708 out.push_str(&format!(
2709 "oris_evolution_replay_success_rate {:.6}\n",
2710 snapshot.replay_success_rate
2711 ));
2712 out.push_str(
2713 "# HELP oris_evolution_mutation_declared_total Total declared mutations recorded in the evolution log.\n",
2714 );
2715 out.push_str("# TYPE oris_evolution_mutation_declared_total counter\n");
2716 out.push_str(&format!(
2717 "oris_evolution_mutation_declared_total {}\n",
2718 snapshot.mutation_declared_total
2719 ));
2720 out.push_str("# HELP oris_evolution_promoted_mutations_total Total mutations promoted by the governor.\n");
2721 out.push_str("# TYPE oris_evolution_promoted_mutations_total counter\n");
2722 out.push_str(&format!(
2723 "oris_evolution_promoted_mutations_total {}\n",
2724 snapshot.promoted_mutations_total
2725 ));
2726 out.push_str(
2727 "# HELP oris_evolution_promotion_ratio Promoted mutations divided by declared mutations.\n",
2728 );
2729 out.push_str("# TYPE oris_evolution_promotion_ratio gauge\n");
2730 out.push_str(&format!(
2731 "oris_evolution_promotion_ratio {:.6}\n",
2732 snapshot.promotion_ratio
2733 ));
2734 out.push_str("# HELP oris_evolution_gene_revocations_total Total gene revocations recorded in the evolution log.\n");
2735 out.push_str("# TYPE oris_evolution_gene_revocations_total counter\n");
2736 out.push_str(&format!(
2737 "oris_evolution_gene_revocations_total {}\n",
2738 snapshot.gene_revocations_total
2739 ));
2740 out.push_str("# HELP oris_evolution_mutation_velocity_last_hour Declared mutations observed in the last hour.\n");
2741 out.push_str("# TYPE oris_evolution_mutation_velocity_last_hour gauge\n");
2742 out.push_str(&format!(
2743 "oris_evolution_mutation_velocity_last_hour {}\n",
2744 snapshot.mutation_velocity_last_hour
2745 ));
2746 out.push_str("# HELP oris_evolution_revoke_frequency_last_hour Gene revocations observed in the last hour.\n");
2747 out.push_str("# TYPE oris_evolution_revoke_frequency_last_hour gauge\n");
2748 out.push_str(&format!(
2749 "oris_evolution_revoke_frequency_last_hour {}\n",
2750 snapshot.revoke_frequency_last_hour
2751 ));
2752 out.push_str("# HELP oris_evolution_promoted_genes Current promoted genes in the evolution projection.\n");
2753 out.push_str("# TYPE oris_evolution_promoted_genes gauge\n");
2754 out.push_str(&format!(
2755 "oris_evolution_promoted_genes {}\n",
2756 snapshot.promoted_genes
2757 ));
2758 out.push_str("# HELP oris_evolution_promoted_capsules Current promoted capsules in the evolution projection.\n");
2759 out.push_str("# TYPE oris_evolution_promoted_capsules gauge\n");
2760 out.push_str(&format!(
2761 "oris_evolution_promoted_capsules {}\n",
2762 snapshot.promoted_capsules
2763 ));
2764 out.push_str("# HELP oris_evolution_store_last_event_seq Last visible append-only evolution event sequence.\n");
2765 out.push_str("# TYPE oris_evolution_store_last_event_seq gauge\n");
2766 out.push_str(&format!(
2767 "oris_evolution_store_last_event_seq {}\n",
2768 snapshot.last_event_seq
2769 ));
2770 out.push_str(
2771 "# HELP oris_evolution_health Evolution observability store health (1 = healthy).\n",
2772 );
2773 out.push_str("# TYPE oris_evolution_health gauge\n");
2774 out.push_str(&format!(
2775 "oris_evolution_health {}\n",
2776 u8::from(health.status == "ok")
2777 ));
2778 out
2779}
2780
2781fn count_recent_events(
2782 events: &[StoredEvolutionEvent],
2783 cutoff: DateTime<Utc>,
2784 predicate: impl Fn(&EvolutionEvent) -> bool,
2785) -> u64 {
2786 events
2787 .iter()
2788 .filter(|stored| {
2789 predicate(&stored.event)
2790 && parse_event_timestamp(&stored.timestamp)
2791 .map(|timestamp| timestamp >= cutoff)
2792 .unwrap_or(false)
2793 })
2794 .count() as u64
2795}
2796
2797fn parse_event_timestamp(raw: &str) -> Option<DateTime<Utc>> {
2798 DateTime::parse_from_rfc3339(raw)
2799 .ok()
2800 .map(|parsed| parsed.with_timezone(&Utc))
2801}
2802
2803fn is_replay_validation_failure(event: &EvolutionEvent) -> bool {
2804 matches!(
2805 event,
2806 EvolutionEvent::ValidationFailed {
2807 gene_id: Some(_),
2808 ..
2809 }
2810 )
2811}
2812
/// Divides `numerator` by `denominator` as `f64`, yielding 0.0 instead of
/// NaN/infinity when the denominator is zero.
fn safe_ratio(numerator: u64, denominator: u64) -> f64 {
    match denominator {
        0 => 0.0,
        nonzero => numerator as f64 / nonzero as f64,
    }
}
2820
2821fn store_err(err: EvolutionError) -> EvoKernelError {
2822 EvoKernelError::Store(err.to_string())
2823}
2824
2825#[cfg(test)]
2826mod tests {
2827 use super::*;
2828 use oris_agent_contract::{
2829 AgentRole, CoordinationPlan, CoordinationPrimitive, CoordinationTask,
2830 };
2831 use oris_kernel::{
2832 AllowAllPolicy, InMemoryEventStore, KernelMode, KernelState, NoopActionExecutor,
2833 NoopStepFn, StateUpdatedOnlyReducer,
2834 };
2835 use serde::{Deserialize, Serialize};
2836
    /// Minimal kernel state used by the tests below; carries no data.
    #[derive(Clone, Debug, Default, Serialize, Deserialize)]
    struct TestState;

    impl KernelState for TestState {
        fn version(&self) -> u32 {
            1
        }
    }
2845
2846 fn temp_workspace(name: &str) -> std::path::PathBuf {
2847 let root =
2848 std::env::temp_dir().join(format!("oris-evokernel-{name}-{}", std::process::id()));
2849 if root.exists() {
2850 fs::remove_dir_all(&root).unwrap();
2851 }
2852 fs::create_dir_all(root.join("src")).unwrap();
2853 fs::write(
2854 root.join("Cargo.toml"),
2855 "[package]\nname = \"sample\"\nversion = \"0.1.0\"\nedition = \"2021\"\n",
2856 )
2857 .unwrap();
2858 fs::write(root.join("Cargo.lock"), "# lock\n").unwrap();
2859 fs::write(root.join("src/lib.rs"), "pub fn demo() -> usize { 1 }\n").unwrap();
2860 root
2861 }
2862
    /// Builds a bare in-memory kernel with no-op executor/step/policy so
    /// EvoKernel can be exercised without real side effects.
    fn test_kernel() -> Arc<Kernel<TestState>> {
        Arc::new(Kernel::<TestState> {
            events: Box::new(InMemoryEventStore::new()),
            snaps: None,
            reducer: Box::new(StateUpdatedOnlyReducer),
            exec: Box::new(NoopActionExecutor),
            step: Box::new(NoopStepFn),
            policy: Box::new(AllowAllPolicy),
            effect_sink: None,
            mode: KernelMode::Normal,
        })
    }
2875
    /// A fast validation plan: a single `git --version` command stage.
    fn lightweight_plan() -> ValidationPlan {
        ValidationPlan {
            profile: "test".into(),
            stages: vec![ValidationStage::Command {
                program: "git".into(),
                args: vec!["--version".into()],
                timeout_ms: 5_000,
            }],
        }
    }
2886
    /// A low-risk fixture mutation: a unified diff that adds a one-line
    /// README.md, prepared against HEAD.
    fn sample_mutation() -> PreparedMutation {
        prepare_mutation(
            MutationIntent {
                id: "mutation-1".into(),
                intent: "add README".into(),
                target: MutationTarget::Paths {
                    allow: vec!["README.md".into()],
                },
                expected_effect: "repo still builds".into(),
                risk: RiskLevel::Low,
                signals: vec!["missing readme".into()],
                spec_id: None,
            },
            "\
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..1111111
--- /dev/null
+++ b/README.md
@@ -0,0 +1 @@
+# sample
"
            .into(),
            Some("HEAD".into()),
        )
    }
2913
    /// Sandbox policy that only permits `git`, with generous time and output
    /// limits.
    fn base_sandbox_policy() -> SandboxPolicy {
        SandboxPolicy {
            allowed_programs: vec!["git".into()],
            max_duration_ms: 60_000,
            max_output_bytes: 1024 * 1024,
            denied_env_prefixes: Vec::new(),
        }
    }
2922
    /// Validator that actually shells out, constrained by the base policy.
    fn command_validator() -> Arc<dyn Validator> {
        Arc::new(CommandValidator::new(base_sandbox_policy()))
    }
2926
2927 fn replay_input(signal: &str) -> SelectorInput {
2928 let rustc_version = std::process::Command::new("rustc")
2929 .arg("--version")
2930 .output()
2931 .ok()
2932 .filter(|output| output.status.success())
2933 .map(|output| String::from_utf8_lossy(&output.stdout).trim().to_string())
2934 .unwrap_or_else(|| "rustc unknown".into());
2935 SelectorInput {
2936 signals: vec![signal.into()],
2937 env: EnvFingerprint {
2938 rustc_version,
2939 cargo_lock_hash: compute_artifact_hash("# lock\n"),
2940 target_triple: format!(
2941 "{}-unknown-{}",
2942 std::env::consts::ARCH,
2943 std::env::consts::OS
2944 ),
2945 os: std::env::consts::OS.into(),
2946 },
2947 spec_id: None,
2948 limit: 1,
2949 }
2950 }
2951
    /// Wires an EvoKernel around the given store: a local-process sandbox in
    /// a fresh workspace, a governor that promotes after a single success,
    /// the lightweight validation plan, and the git-only sandbox policy.
    fn build_test_evo_with_store(
        name: &str,
        run_id: &str,
        validator: Arc<dyn Validator>,
        store: Arc<dyn EvolutionStore>,
    ) -> EvoKernel<TestState> {
        let workspace = temp_workspace(name);
        let sandbox: Arc<dyn Sandbox> = Arc::new(oris_sandbox::LocalProcessSandbox::new(
            run_id,
            &workspace,
            std::env::temp_dir(),
        ));
        EvoKernel::new(test_kernel(), sandbox, validator, store)
            .with_governor(Arc::new(DefaultGovernor::new(
                oris_governor::GovernorConfig {
                    promote_after_successes: 1,
                    ..Default::default()
                },
            )))
            .with_validation_plan(lightweight_plan())
            .with_sandbox_policy(base_sandbox_policy())
    }
2974
    /// Like `build_test_evo_with_store`, but also creates a fresh JSONL
    /// evolution store under the temp dir and returns it alongside the
    /// kernel so tests can inspect the event log directly.
    fn build_test_evo(
        name: &str,
        run_id: &str,
        validator: Arc<dyn Validator>,
    ) -> (EvoKernel<TestState>, Arc<dyn EvolutionStore>) {
        let store_root = std::env::temp_dir().join(format!(
            "oris-evokernel-{name}-store-{}",
            std::process::id()
        ));
        if store_root.exists() {
            fs::remove_dir_all(&store_root).unwrap();
        }
        let store: Arc<dyn EvolutionStore> =
            Arc::new(oris_evolution::JsonlEvolutionStore::new(&store_root));
        let evo = build_test_evo_with_store(name, run_id, validator, store.clone());
        (evo, store)
    }
2992
    /// Convenience wrapper around `remote_publish_envelope_with_env` that
    /// fingerprints the current machine's environment for the capsule.
    fn remote_publish_envelope(
        sender_id: &str,
        run_id: &str,
        gene_id: &str,
        capsule_id: &str,
        mutation_id: &str,
        signal: &str,
        file_name: &str,
        line: &str,
    ) -> EvolutionEnvelope {
        remote_publish_envelope_with_env(
            sender_id,
            run_id,
            gene_id,
            capsule_id,
            mutation_id,
            signal,
            file_name,
            line,
            replay_input(signal).env,
        )
    }
3015
    /// Builds a complete remote publish envelope: a declared mutation that
    /// adds `file_name` containing `line`, a promoted gene keyed on
    /// `signal`, a promoted capsule tying the mutation and gene together
    /// with the supplied env fingerprint, and a release event for the
    /// capsule.
    fn remote_publish_envelope_with_env(
        sender_id: &str,
        run_id: &str,
        gene_id: &str,
        capsule_id: &str,
        mutation_id: &str,
        signal: &str,
        file_name: &str,
        line: &str,
        env: EnvFingerprint,
    ) -> EvolutionEnvelope {
        let mutation = prepare_mutation(
            MutationIntent {
                id: mutation_id.into(),
                intent: format!("add {file_name}"),
                target: MutationTarget::Paths {
                    allow: vec![file_name.into()],
                },
                expected_effect: "replay should still validate".into(),
                risk: RiskLevel::Low,
                signals: vec![signal.into()],
                spec_id: None,
            },
            format!(
                "\
diff --git a/{file_name} b/{file_name}
new file mode 100644
index 0000000..1111111
--- /dev/null
+++ b/{file_name}
@@ -0,0 +1 @@
+{line}
"
            ),
            Some("HEAD".into()),
        );
        let gene = Gene {
            id: gene_id.into(),
            signals: vec![signal.into()],
            strategy: vec![file_name.into()],
            validation: vec!["test".into()],
            state: AssetState::Promoted,
        };
        let capsule = Capsule {
            id: capsule_id.into(),
            gene_id: gene_id.into(),
            mutation_id: mutation_id.into(),
            run_id: run_id.into(),
            diff_hash: mutation.artifact.content_hash.clone(),
            confidence: 0.9,
            env,
            outcome: Outcome {
                success: true,
                validation_profile: "test".into(),
                validation_duration_ms: 1,
                changed_files: vec![file_name.into()],
                validator_hash: "validator-hash".into(),
                lines_changed: 1,
                replay_verified: false,
            },
            state: AssetState::Promoted,
        };
        EvolutionEnvelope::publish(
            sender_id,
            vec![
                NetworkAsset::EvolutionEvent {
                    event: EvolutionEvent::MutationDeclared { mutation },
                },
                NetworkAsset::Gene { gene: gene.clone() },
                NetworkAsset::Capsule {
                    capsule: capsule.clone(),
                },
                NetworkAsset::EvolutionEvent {
                    event: EvolutionEvent::CapsuleReleased {
                        capsule_id: capsule.id.clone(),
                        state: AssetState::Promoted,
                    },
                },
            ],
        )
    }
3097
    /// Validator stub that always reports the configured success/failure,
    /// ignoring the sandbox receipt entirely.
    struct FixedValidator {
        success: bool,
    }

    #[async_trait]
    impl Validator for FixedValidator {
        async fn run(
            &self,
            _receipt: &SandboxReceipt,
            plan: &ValidationPlan,
        ) -> Result<ValidationReport, ValidationError> {
            Ok(ValidationReport {
                success: self.success,
                duration_ms: 1,
                stages: Vec::new(),
                logs: if self.success {
                    format!("{} ok", plan.profile)
                } else {
                    format!("{} failed", plan.profile)
                },
            })
        }
    }
3121
    /// Store wrapper that delegates to a real JSONL store but injects an I/O
    /// error on exactly the N-th `append_event` call, for failure-path tests.
    struct FailOnAppendStore {
        inner: JsonlEvolutionStore,
        // 1-based index of the append call that should fail.
        fail_on_call: usize,
        call_count: Mutex<usize>,
    }

    impl FailOnAppendStore {
        fn new(root_dir: std::path::PathBuf, fail_on_call: usize) -> Self {
            Self {
                inner: JsonlEvolutionStore::new(root_dir),
                fail_on_call,
                call_count: Mutex::new(0),
            }
        }
    }

    impl EvolutionStore for FailOnAppendStore {
        fn append_event(&self, event: EvolutionEvent) -> Result<u64, EvolutionError> {
            let mut call_count = self
                .call_count
                .lock()
                .map_err(|_| EvolutionError::Io("test store lock poisoned".into()))?;
            *call_count += 1;
            // Fail exactly once, on the configured call; other calls pass
            // straight through to the real store.
            if *call_count == self.fail_on_call {
                return Err(EvolutionError::Io("injected append failure".into()));
            }
            self.inner.append_event(event)
        }

        fn scan(&self, from_seq: u64) -> Result<Vec<StoredEvolutionEvent>, EvolutionError> {
            self.inner.scan(from_seq)
        }

        fn rebuild_projection(&self) -> Result<EvolutionProjection, EvolutionError> {
            self.inner.rebuild_projection()
        }
    }
3159
    #[test]
    fn coordination_planner_to_coder_handoff_is_deterministic() {
        // Sequential plan: planner runs first, coder depends on it.
        let result = MultiAgentCoordinator::new().coordinate(CoordinationPlan {
            root_goal: "ship feature".into(),
            primitive: CoordinationPrimitive::Sequential,
            tasks: vec![
                CoordinationTask {
                    id: "planner".into(),
                    role: AgentRole::Planner,
                    description: "split the work".into(),
                    depends_on: Vec::new(),
                },
                CoordinationTask {
                    id: "coder".into(),
                    role: AgentRole::Coder,
                    description: "implement the patch".into(),
                    depends_on: vec!["planner".into()],
                },
            ],
            timeout_ms: 5_000,
            max_retries: 0,
        });

        // Both tasks complete in dependency order and a planner→coder
        // handoff message is recorded for the coder task.
        assert_eq!(result.completed_tasks, vec!["planner", "coder"]);
        assert!(result.failed_tasks.is_empty());
        assert!(result.messages.iter().any(|message| {
            message.from_role == AgentRole::Planner
                && message.to_role == AgentRole::Coder
                && message.task_id == "coder"
        }));
    }
3191
    #[test]
    fn coordination_repair_runs_only_after_coder_failure() {
        // The coder task description starts with "force-fail", which the
        // coordinator treats as a failing implementation step.
        let result = MultiAgentCoordinator::new().coordinate(CoordinationPlan {
            root_goal: "fix broken implementation".into(),
            primitive: CoordinationPrimitive::Sequential,
            tasks: vec![
                CoordinationTask {
                    id: "coder".into(),
                    role: AgentRole::Coder,
                    description: "force-fail initial implementation".into(),
                    depends_on: Vec::new(),
                },
                CoordinationTask {
                    id: "repair".into(),
                    role: AgentRole::Repair,
                    description: "patch the failed implementation".into(),
                    depends_on: vec!["coder".into()],
                },
            ],
            timeout_ms: 5_000,
            max_retries: 0,
        });

        // Repair completes despite the coder failing, and a coder→repair
        // message is recorded for the repair task.
        assert_eq!(result.completed_tasks, vec!["repair"]);
        assert_eq!(result.failed_tasks, vec!["coder"]);
        assert!(result.messages.iter().any(|message| {
            message.from_role == AgentRole::Coder
                && message.to_role == AgentRole::Repair
                && message.task_id == "repair"
        }));
    }
3223
    #[test]
    fn coordination_optimizer_runs_after_successful_implementation_step() {
        // Sequential plan where the optimizer depends on a succeeding coder.
        let result = MultiAgentCoordinator::new().coordinate(CoordinationPlan {
            root_goal: "ship optimized patch".into(),
            primitive: CoordinationPrimitive::Sequential,
            tasks: vec![
                CoordinationTask {
                    id: "coder".into(),
                    role: AgentRole::Coder,
                    description: "implement a working patch".into(),
                    depends_on: Vec::new(),
                },
                CoordinationTask {
                    id: "optimizer".into(),
                    role: AgentRole::Optimizer,
                    description: "tighten the implementation".into(),
                    depends_on: vec!["coder".into()],
                },
            ],
            timeout_ms: 5_000,
            max_retries: 0,
        });

        assert_eq!(result.completed_tasks, vec!["coder", "optimizer"]);
        assert!(result.failed_tasks.is_empty());
    }
3250
3251 #[test]
3252 fn coordination_parallel_waves_preserve_sorted_merge_order() {
3253 let result = MultiAgentCoordinator::new().coordinate(CoordinationPlan {
3254 root_goal: "parallelize safe tasks".into(),
3255 primitive: CoordinationPrimitive::Parallel,
3256 tasks: vec![
3257 CoordinationTask {
3258 id: "z-task".into(),
3259 role: AgentRole::Planner,
3260 description: "analyze z".into(),
3261 depends_on: Vec::new(),
3262 },
3263 CoordinationTask {
3264 id: "a-task".into(),
3265 role: AgentRole::Coder,
3266 description: "implement a".into(),
3267 depends_on: Vec::new(),
3268 },
3269 CoordinationTask {
3270 id: "mid-task".into(),
3271 role: AgentRole::Optimizer,
3272 description: "polish after both".into(),
3273 depends_on: vec!["z-task".into(), "a-task".into()],
3274 },
3275 ],
3276 timeout_ms: 5_000,
3277 max_retries: 0,
3278 });
3279
3280 assert_eq!(result.completed_tasks, vec!["a-task", "z-task", "mid-task"]);
3281 assert!(result.failed_tasks.is_empty());
3282 }
3283
3284 #[test]
3285 fn coordination_retries_stop_at_max_retries() {
3286 let result = MultiAgentCoordinator::new().coordinate(CoordinationPlan {
3287 root_goal: "retry then stop".into(),
3288 primitive: CoordinationPrimitive::Sequential,
3289 tasks: vec![CoordinationTask {
3290 id: "coder".into(),
3291 role: AgentRole::Coder,
3292 description: "force-fail this task".into(),
3293 depends_on: Vec::new(),
3294 }],
3295 timeout_ms: 5_000,
3296 max_retries: 1,
3297 });
3298
3299 assert!(result.completed_tasks.is_empty());
3300 assert_eq!(result.failed_tasks, vec!["coder"]);
3301 assert_eq!(
3302 result
3303 .messages
3304 .iter()
3305 .filter(|message| message.task_id == "coder" && message.content.contains("failed"))
3306 .count(),
3307 2
3308 );
3309 }
3310
3311 #[test]
3312 fn coordination_conditional_mode_skips_downstream_tasks_on_failure() {
3313 let result = MultiAgentCoordinator::new().coordinate(CoordinationPlan {
3314 root_goal: "skip blocked follow-up work".into(),
3315 primitive: CoordinationPrimitive::Conditional,
3316 tasks: vec![
3317 CoordinationTask {
3318 id: "coder".into(),
3319 role: AgentRole::Coder,
3320 description: "force-fail the implementation".into(),
3321 depends_on: Vec::new(),
3322 },
3323 CoordinationTask {
3324 id: "optimizer".into(),
3325 role: AgentRole::Optimizer,
3326 description: "only optimize a successful implementation".into(),
3327 depends_on: vec!["coder".into()],
3328 },
3329 ],
3330 timeout_ms: 5_000,
3331 max_retries: 0,
3332 });
3333
3334 assert!(result.completed_tasks.is_empty());
3335 assert_eq!(result.failed_tasks, vec!["coder"]);
3336 assert!(result.messages.iter().any(|message| {
3337 message.task_id == "optimizer"
3338 && message
3339 .content
3340 .contains("skipped due to failed dependency chain")
3341 }));
3342 assert!(!result
3343 .failed_tasks
3344 .iter()
3345 .any(|task_id| task_id == "optimizer"));
3346 }
3347
3348 #[tokio::test]
3349 async fn command_validator_aggregates_stage_reports() {
3350 let workspace = temp_workspace("validator");
3351 let receipt = SandboxReceipt {
3352 mutation_id: "m".into(),
3353 workdir: workspace,
3354 applied: true,
3355 changed_files: Vec::new(),
3356 patch_hash: "hash".into(),
3357 stdout_log: std::env::temp_dir().join("stdout.log"),
3358 stderr_log: std::env::temp_dir().join("stderr.log"),
3359 };
3360 let validator = CommandValidator::new(SandboxPolicy {
3361 allowed_programs: vec!["git".into()],
3362 max_duration_ms: 1_000,
3363 max_output_bytes: 1024,
3364 denied_env_prefixes: Vec::new(),
3365 });
3366 let report = validator
3367 .run(
3368 &receipt,
3369 &ValidationPlan {
3370 profile: "test".into(),
3371 stages: vec![ValidationStage::Command {
3372 program: "git".into(),
3373 args: vec!["--version".into()],
3374 timeout_ms: 1_000,
3375 }],
3376 },
3377 )
3378 .await
3379 .unwrap();
3380 assert_eq!(report.stages.len(), 1);
3381 }
3382
3383 #[tokio::test]
3384 async fn capture_successful_mutation_appends_capsule() {
3385 let (evo, store) = build_test_evo("capture", "run-1", command_validator());
3386 let capsule = evo
3387 .capture_successful_mutation(&"run-1".into(), sample_mutation())
3388 .await
3389 .unwrap();
3390 let events = store.scan(1).unwrap();
3391 assert!(events
3392 .iter()
3393 .any(|stored| matches!(stored.event, EvolutionEvent::CapsuleCommitted { .. })));
3394 assert!(!capsule.id.is_empty());
3395 }
3396
3397 #[tokio::test]
3398 async fn replay_hit_records_capsule_reused() {
3399 let (evo, store) = build_test_evo("replay", "run-2", command_validator());
3400 let capsule = evo
3401 .capture_successful_mutation(&"run-2".into(), sample_mutation())
3402 .await
3403 .unwrap();
3404 let replay_run_id = "run-replay".to_string();
3405 let decision = evo
3406 .replay_or_fallback_for_run(&replay_run_id, replay_input("missing readme"))
3407 .await
3408 .unwrap();
3409 assert!(decision.used_capsule);
3410 assert_eq!(decision.capsule_id, Some(capsule.id));
3411 assert!(store.scan(1).unwrap().iter().any(|stored| matches!(
3412 &stored.event,
3413 EvolutionEvent::CapsuleReused {
3414 run_id,
3415 replay_run_id: Some(current_replay_run_id),
3416 ..
3417 } if run_id == "run-2" && current_replay_run_id == &replay_run_id
3418 )));
3419 }
3420
    /// The legacy `StoreReplayExecutor::try_replay` entry point takes no
    /// replay run id, so a reuse event recorded through it must keep the
    /// original capture run id and carry `replay_run_id: None`.
    #[tokio::test]
    async fn legacy_replay_executor_api_preserves_original_capsule_run_id() {
        let capture_run_id = "run-legacy-capture".to_string();
        let (evo, store) = build_test_evo("replay-legacy", &capture_run_id, command_validator());
        let capsule = evo
            .capture_successful_mutation(&capture_run_id, sample_mutation())
            .await
            .unwrap();
        // Rebuild the executor from the evo's own components so the legacy API
        // is exercised against the same store/selector/governor state.
        let executor = StoreReplayExecutor {
            sandbox: evo.sandbox.clone(),
            validator: evo.validator.clone(),
            store: evo.store.clone(),
            selector: evo.selector.clone(),
            governor: evo.governor.clone(),
            economics: Some(evo.economics.clone()),
            remote_publishers: Some(evo.remote_publishers.clone()),
            stake_policy: evo.stake_policy.clone(),
        };

        let decision = executor
            .try_replay(
                &replay_input("missing readme"),
                &evo.sandbox_policy,
                &evo.validation_plan,
            )
            .await
            .unwrap();

        assert!(decision.used_capsule);
        assert_eq!(decision.capsule_id, Some(capsule.id));
        // The reuse event references the capture run and no replay run id.
        assert!(store.scan(1).unwrap().iter().any(|stored| matches!(
            &stored.event,
            EvolutionEvent::CapsuleReused {
                run_id,
                replay_run_id: None,
                ..
            } if run_id == &capture_run_id
        )));
    }
3460
3461 #[tokio::test]
3462 async fn metrics_snapshot_tracks_replay_promotion_and_revocation_signals() {
3463 let (evo, _) = build_test_evo("metrics", "run-metrics", command_validator());
3464 let capsule = evo
3465 .capture_successful_mutation(&"run-metrics".into(), sample_mutation())
3466 .await
3467 .unwrap();
3468 let decision = evo
3469 .replay_or_fallback(replay_input("missing readme"))
3470 .await
3471 .unwrap();
3472 assert!(decision.used_capsule);
3473
3474 evo.revoke_assets(&RevokeNotice {
3475 sender_id: "node-metrics".into(),
3476 asset_ids: vec![capsule.id.clone()],
3477 reason: "manual test revoke".into(),
3478 })
3479 .unwrap();
3480
3481 let snapshot = evo.metrics_snapshot().unwrap();
3482 assert_eq!(snapshot.replay_attempts_total, 1);
3483 assert_eq!(snapshot.replay_success_total, 1);
3484 assert_eq!(snapshot.replay_success_rate, 1.0);
3485 assert_eq!(snapshot.mutation_declared_total, 1);
3486 assert_eq!(snapshot.promoted_mutations_total, 1);
3487 assert_eq!(snapshot.promotion_ratio, 1.0);
3488 assert_eq!(snapshot.gene_revocations_total, 1);
3489 assert_eq!(snapshot.mutation_velocity_last_hour, 1);
3490 assert_eq!(snapshot.revoke_frequency_last_hour, 1);
3491 assert_eq!(snapshot.promoted_genes, 0);
3492 assert_eq!(snapshot.promoted_capsules, 0);
3493
3494 let rendered = evo.render_metrics_prometheus().unwrap();
3495 assert!(rendered.contains("oris_evolution_replay_success_rate 1.000000"));
3496 assert!(rendered.contains("oris_evolution_promotion_ratio 1.000000"));
3497 assert!(rendered.contains("oris_evolution_revoke_frequency_last_hour 1"));
3498 assert!(rendered.contains("oris_evolution_mutation_velocity_last_hour 1"));
3499 assert!(rendered.contains("oris_evolution_health 1"));
3500 }
3501
3502 #[tokio::test]
3503 async fn remote_replay_prefers_closest_environment_match() {
3504 let (evo, _) = build_test_evo("remote-env", "run-remote-env", command_validator());
3505 let input = replay_input("env-signal");
3506
3507 let envelope_a = remote_publish_envelope_with_env(
3508 "node-a",
3509 "run-remote-a",
3510 "gene-a",
3511 "capsule-a",
3512 "mutation-a",
3513 "env-signal",
3514 "A.md",
3515 "# from a",
3516 input.env.clone(),
3517 );
3518 let envelope_b = remote_publish_envelope_with_env(
3519 "node-b",
3520 "run-remote-b",
3521 "gene-b",
3522 "capsule-b",
3523 "mutation-b",
3524 "env-signal",
3525 "B.md",
3526 "# from b",
3527 EnvFingerprint {
3528 rustc_version: "old-rustc".into(),
3529 cargo_lock_hash: "other-lock".into(),
3530 target_triple: "aarch64-apple-darwin".into(),
3531 os: "linux".into(),
3532 },
3533 );
3534
3535 evo.import_remote_envelope(&envelope_a).unwrap();
3536 evo.import_remote_envelope(&envelope_b).unwrap();
3537
3538 let decision = evo.replay_or_fallback(input).await.unwrap();
3539
3540 assert!(decision.used_capsule);
3541 assert_eq!(decision.capsule_id, Some("capsule-a".into()));
3542 assert!(!decision.fallback_to_planner);
3543 }
3544
    /// Imported remote assets must sit in `Quarantined` state — and be
    /// excluded from promoted-asset export — until the first successful local
    /// replay promotes both the gene and the capsule.
    #[tokio::test]
    async fn remote_capsule_stays_quarantined_until_first_successful_replay() {
        let (evo, store) = build_test_evo(
            "remote-quarantine",
            "run-remote-quarantine",
            command_validator(),
        );
        let envelope = remote_publish_envelope(
            "node-remote",
            "run-remote-quarantine",
            "gene-remote",
            "capsule-remote",
            "mutation-remote",
            "remote-signal",
            "REMOTE.md",
            "# from remote",
        );

        evo.import_remote_envelope(&envelope).unwrap();

        // Before any replay: both imported assets are quarantined and the
        // promoted-asset export is empty.
        let before_replay = store.rebuild_projection().unwrap();
        let imported_gene = before_replay
            .genes
            .iter()
            .find(|gene| gene.id == "gene-remote")
            .unwrap();
        let imported_capsule = before_replay
            .capsules
            .iter()
            .find(|capsule| capsule.id == "capsule-remote")
            .unwrap();
        assert_eq!(imported_gene.state, AssetState::Quarantined);
        assert_eq!(imported_capsule.state, AssetState::Quarantined);
        let exported_before_replay =
            export_promoted_assets_from_store(store.as_ref(), "node-local").unwrap();
        assert!(exported_before_replay.assets.is_empty());

        // A successful local replay of the quarantined capsule...
        let decision = evo
            .replay_or_fallback(replay_input("remote-signal"))
            .await
            .unwrap();

        assert!(decision.used_capsule);
        assert_eq!(decision.capsule_id, Some("capsule-remote".into()));

        // ...promotes both assets and makes them exportable.
        let after_replay = store.rebuild_projection().unwrap();
        let promoted_gene = after_replay
            .genes
            .iter()
            .find(|gene| gene.id == "gene-remote")
            .unwrap();
        let released_capsule = after_replay
            .capsules
            .iter()
            .find(|capsule| capsule.id == "capsule-remote")
            .unwrap();
        assert_eq!(promoted_gene.state, AssetState::Promoted);
        assert_eq!(released_capsule.state, AssetState::Promoted);
        let exported_after_replay =
            export_promoted_assets_from_store(store.as_ref(), "node-local").unwrap();
        // Export carries gene + capsule + the mutation-declared event payload
        // (3 assets total); presumably that split — confirm against exporter.
        assert_eq!(exported_after_replay.assets.len(), 3);
        assert!(exported_after_replay.assets.iter().any(|asset| matches!(
            asset,
            NetworkAsset::EvolutionEvent {
                event: EvolutionEvent::MutationDeclared { .. }
            }
        )));
    }
3613
3614 #[tokio::test]
3615 async fn publish_local_assets_include_mutation_payload_for_remote_replay() {
3616 let (source, source_store) = build_test_evo(
3617 "remote-publish-export",
3618 "run-remote-publish-export",
3619 command_validator(),
3620 );
3621 source
3622 .capture_successful_mutation(&"run-remote-publish-export".into(), sample_mutation())
3623 .await
3624 .unwrap();
3625 let envelope = EvolutionNetworkNode::new(source_store.clone())
3626 .publish_local_assets("node-source")
3627 .unwrap();
3628 assert!(envelope.assets.iter().any(|asset| matches!(
3629 asset,
3630 NetworkAsset::EvolutionEvent {
3631 event: EvolutionEvent::MutationDeclared { mutation }
3632 } if mutation.intent.id == "mutation-1"
3633 )));
3634
3635 let (remote, _) = build_test_evo(
3636 "remote-publish-import",
3637 "run-remote-publish-import",
3638 command_validator(),
3639 );
3640 remote.import_remote_envelope(&envelope).unwrap();
3641
3642 let decision = remote
3643 .replay_or_fallback(replay_input("missing readme"))
3644 .await
3645 .unwrap();
3646
3647 assert!(decision.used_capsule);
3648 assert!(!decision.fallback_to_planner);
3649 }
3650
3651 #[tokio::test]
3652 async fn fetch_assets_include_mutation_payload_for_remote_replay() {
3653 let (evo, store) = build_test_evo(
3654 "remote-fetch-export",
3655 "run-remote-fetch",
3656 command_validator(),
3657 );
3658 evo.capture_successful_mutation(&"run-remote-fetch".into(), sample_mutation())
3659 .await
3660 .unwrap();
3661
3662 let response = EvolutionNetworkNode::new(store.clone())
3663 .fetch_assets(
3664 "node-source",
3665 &FetchQuery {
3666 sender_id: "node-client".into(),
3667 signals: vec!["missing readme".into()],
3668 },
3669 )
3670 .unwrap();
3671
3672 assert!(response.assets.iter().any(|asset| matches!(
3673 asset,
3674 NetworkAsset::EvolutionEvent {
3675 event: EvolutionEvent::MutationDeclared { mutation }
3676 } if mutation.intent.id == "mutation-1"
3677 )));
3678 assert!(response
3679 .assets
3680 .iter()
3681 .any(|asset| matches!(asset, NetworkAsset::Gene { .. })));
3682 assert!(response
3683 .assets
3684 .iter()
3685 .any(|asset| matches!(asset, NetworkAsset::Capsule { .. })));
3686 }
3687
    /// An append failure midway through a remote import must leave the store
    /// partially written (gene persisted, capsule not) while still keeping the
    /// publisher attribution for the assets that did make it in.
    #[test]
    fn partial_remote_import_keeps_publisher_for_already_imported_assets() {
        // Fresh per-process temp root so reruns don't see stale state.
        let store_root = std::env::temp_dir().join(format!(
            "oris-evokernel-remote-partial-store-{}",
            std::process::id()
        ));
        if store_root.exists() {
            fs::remove_dir_all(&store_root).unwrap();
        }
        // FailOnAppendStore with threshold 4 — presumably fails on the 4th
        // append, i.e. between the gene write and the capsule write during
        // import; confirm against FailOnAppendStore's definition.
        let store: Arc<dyn EvolutionStore> = Arc::new(FailOnAppendStore::new(store_root, 4));
        let evo = build_test_evo_with_store(
            "remote-partial",
            "run-remote-partial",
            command_validator(),
            store.clone(),
        );
        let envelope = remote_publish_envelope(
            "node-partial",
            "run-remote-partial",
            "gene-partial",
            "capsule-partial",
            "mutation-partial",
            "partial-signal",
            "PARTIAL.md",
            "# partial",
        );

        let result = evo.import_remote_envelope(&envelope);

        // The import surfaces the store error...
        assert!(matches!(result, Err(EvoKernelError::Store(_))));
        // ...with the gene persisted but the capsule append never reached.
        let projection = store.rebuild_projection().unwrap();
        assert!(projection
            .genes
            .iter()
            .any(|gene| gene.id == "gene-partial"));
        assert!(projection.capsules.is_empty());
        // The publisher map must still attribute the persisted gene.
        let publishers = evo.remote_publishers.lock().unwrap();
        assert_eq!(
            publishers.get("gene-partial").map(String::as_str),
            Some("node-partial")
        );
    }
3730
3731 #[tokio::test]
3732 async fn insufficient_evu_blocks_publish_but_not_local_replay() {
3733 let (evo, _) = build_test_evo("stake-gate", "run-stake", command_validator());
3734 let capsule = evo
3735 .capture_successful_mutation(&"run-stake".into(), sample_mutation())
3736 .await
3737 .unwrap();
3738 let publish = evo.export_promoted_assets("node-local");
3739 assert!(matches!(publish, Err(EvoKernelError::Validation(_))));
3740
3741 let decision = evo
3742 .replay_or_fallback(replay_input("missing readme"))
3743 .await
3744 .unwrap();
3745 assert!(decision.used_capsule);
3746 assert_eq!(decision.capsule_id, Some(capsule.id));
3747 }
3748
    /// Repeated replay-validation failures push the capsule's success rate
    /// below the replay threshold so later runs fall back to the planner.
    ///
    /// NOTE(review): the test name says "revokes gene immediately", but the
    /// assertions below pin the gene/capsule staying `Promoted`, the ABSENCE
    /// of any `GeneRevoked` event, and only a single `ValidationFailed` event
    /// despite two failing replays — presumably the second attempt never
    /// selects the capsule again. Confirm whether the name or the policy is
    /// stale.
    #[tokio::test]
    async fn second_replay_validation_failure_revokes_gene_immediately() {
        let (capturer, store) = build_test_evo("revoke-replay", "run-capture", command_validator());
        let capsule = capturer
            .capture_successful_mutation(&"run-capture".into(), sample_mutation())
            .await
            .unwrap();

        // Same store, but every validation is forced to fail.
        let failing_validator: Arc<dyn Validator> = Arc::new(FixedValidator { success: false });
        let failing_replay = build_test_evo_with_store(
            "revoke-replay",
            "run-replay-fail",
            failing_validator,
            store.clone(),
        );

        let first = failing_replay
            .replay_or_fallback(replay_input("missing readme"))
            .await
            .unwrap();
        let second = failing_replay
            .replay_or_fallback(replay_input("missing readme"))
            .await
            .unwrap();

        // Both attempts fall back to the planner.
        assert!(!first.used_capsule);
        assert!(first.fallback_to_planner);
        assert!(!second.used_capsule);
        assert!(second.fallback_to_planner);

        // The assets keep their Promoted state; failure is tracked via the
        // event log, not a state transition.
        let projection = store.rebuild_projection().unwrap();
        let gene = projection
            .genes
            .iter()
            .find(|gene| gene.id == capsule.gene_id)
            .unwrap();
        assert_eq!(gene.state, AssetState::Promoted);
        let committed_capsule = projection
            .capsules
            .iter()
            .find(|current| current.id == capsule.id)
            .unwrap();
        assert_eq!(committed_capsule.state, AssetState::Promoted);

        // Exactly one ValidationFailed event is recorded for the gene.
        let events = store.scan(1).unwrap();
        assert_eq!(
            events
                .iter()
                .filter(|stored| {
                    matches!(
                        &stored.event,
                        EvolutionEvent::ValidationFailed {
                            gene_id: Some(gene_id),
                            ..
                        } if gene_id == &capsule.gene_id
                    )
                })
                .count(),
            1
        );
        // No revocation event is emitted.
        assert!(!events.iter().any(|stored| {
            matches!(
                &stored.event,
                EvolutionEvent::GeneRevoked { gene_id, .. } if gene_id == &capsule.gene_id
            )
        }));

        // A fresh evo over the same store (with a passing validator) still
        // refuses to replay: the recorded failures keep the capsule below the
        // replay threshold.
        let recovered = build_test_evo_with_store(
            "revoke-replay",
            "run-replay-check",
            command_validator(),
            store.clone(),
        );
        let after_revoke = recovered
            .replay_or_fallback(replay_input("missing readme"))
            .await
            .unwrap();
        assert!(!after_revoke.used_capsule);
        assert!(after_revoke.fallback_to_planner);
        assert!(after_revoke.reason.contains("below replay threshold"));
    }
3830
3831 #[tokio::test]
3832 async fn remote_reuse_success_rewards_publisher_and_biases_selection() {
3833 let ledger = Arc::new(Mutex::new(EvuLedger {
3834 accounts: vec![],
3835 reputations: vec![
3836 oris_economics::ReputationRecord {
3837 node_id: "node-a".into(),
3838 publish_success_rate: 0.4,
3839 validator_accuracy: 0.4,
3840 reuse_impact: 0,
3841 },
3842 oris_economics::ReputationRecord {
3843 node_id: "node-b".into(),
3844 publish_success_rate: 0.95,
3845 validator_accuracy: 0.95,
3846 reuse_impact: 8,
3847 },
3848 ],
3849 }));
3850 let (evo, _) = build_test_evo("remote-success", "run-remote", command_validator());
3851 let evo = evo.with_economics(ledger.clone());
3852
3853 let envelope_a = remote_publish_envelope(
3854 "node-a",
3855 "run-remote-a",
3856 "gene-a",
3857 "capsule-a",
3858 "mutation-a",
3859 "shared-signal",
3860 "A.md",
3861 "# from a",
3862 );
3863 let envelope_b = remote_publish_envelope(
3864 "node-b",
3865 "run-remote-b",
3866 "gene-b",
3867 "capsule-b",
3868 "mutation-b",
3869 "shared-signal",
3870 "B.md",
3871 "# from b",
3872 );
3873
3874 evo.import_remote_envelope(&envelope_a).unwrap();
3875 evo.import_remote_envelope(&envelope_b).unwrap();
3876
3877 let decision = evo
3878 .replay_or_fallback(replay_input("shared-signal"))
3879 .await
3880 .unwrap();
3881
3882 assert!(decision.used_capsule);
3883 assert_eq!(decision.capsule_id, Some("capsule-b".into()));
3884 let locked = ledger.lock().unwrap();
3885 let rewarded = locked
3886 .accounts
3887 .iter()
3888 .find(|item| item.node_id == "node-b")
3889 .unwrap();
3890 assert_eq!(rewarded.balance, evo.stake_policy.reuse_reward);
3891 assert!(
3892 locked.selector_reputation_bias()["node-b"]
3893 > locked.selector_reputation_bias()["node-a"]
3894 );
3895 }
3896
3897 #[tokio::test]
3898 async fn remote_reuse_failure_penalizes_remote_reputation() {
3899 let ledger = Arc::new(Mutex::new(EvuLedger::default()));
3900 let failing_validator: Arc<dyn Validator> = Arc::new(FixedValidator { success: false });
3901 let (evo, _) = build_test_evo("remote-failure", "run-failure", failing_validator);
3902 let evo = evo.with_economics(ledger.clone());
3903
3904 let envelope = remote_publish_envelope(
3905 "node-remote",
3906 "run-remote-failed",
3907 "gene-remote",
3908 "capsule-remote",
3909 "mutation-remote",
3910 "failure-signal",
3911 "FAILED.md",
3912 "# from remote",
3913 );
3914 evo.import_remote_envelope(&envelope).unwrap();
3915
3916 let decision = evo
3917 .replay_or_fallback(replay_input("failure-signal"))
3918 .await
3919 .unwrap();
3920
3921 assert!(!decision.used_capsule);
3922 assert!(decision.fallback_to_planner);
3923
3924 let signal = evo.economics_signal("node-remote").unwrap();
3925 assert_eq!(signal.available_evu, 0);
3926 assert!(signal.publish_success_rate < 0.5);
3927 assert!(signal.validator_accuracy < 0.5);
3928 }
3929}