1use std::collections::{BTreeMap, BTreeSet};
4use std::fs;
5use std::path::Path;
6use std::process::Command;
7use std::sync::{Arc, Mutex};
8
9use async_trait::async_trait;
10use chrono::{DateTime, Duration, Utc};
11use oris_agent_contract::{
12 AgentRole, CoordinationMessage, CoordinationPlan, CoordinationPrimitive, CoordinationResult,
13 CoordinationTask, ExecutionFeedback, MutationProposal as AgentMutationProposal,
14};
15use oris_economics::{EconomicsSignal, EvuLedger, StakePolicy};
16use oris_evolution::{
17 compute_artifact_hash, next_id, stable_hash_json, AssetState, BlastRadius, CandidateSource,
18 Capsule, CapsuleId, EnvFingerprint, EvolutionError, EvolutionEvent, EvolutionProjection,
19 EvolutionStore, Gene, GeneCandidate, MutationId, PreparedMutation, Selector, SelectorInput,
20 StoreBackedSelector, StoredEvolutionEvent, ValidationSnapshot,
21};
22use oris_evolution_network::{EvolutionEnvelope, NetworkAsset};
23use oris_governor::{DefaultGovernor, Governor, GovernorDecision, GovernorInput};
24use oris_kernel::{Kernel, KernelState, RunId};
25use oris_sandbox::{
26 compute_blast_radius, execute_allowed_command, Sandbox, SandboxPolicy, SandboxReceipt,
27};
28use oris_spec::CompiledMutationPlan;
29use serde::{Deserialize, Serialize};
30use thiserror::Error;
31
32pub use oris_evolution::{
33 default_store_root, ArtifactEncoding, AssetState as EvoAssetState,
34 BlastRadius as EvoBlastRadius, CandidateSource as EvoCandidateSource,
35 EnvFingerprint as EvoEnvFingerprint, EvolutionStore as EvoEvolutionStore, JsonlEvolutionStore,
36 MutationArtifact, MutationIntent, MutationTarget, Outcome, RiskLevel,
37 SelectorInput as EvoSelectorInput,
38};
39pub use oris_evolution_network::{
40 FetchQuery, FetchResponse, MessageType, PublishRequest, RevokeNotice,
41};
42pub use oris_governor::{CoolingWindow, GovernorConfig, RevocationReason};
43pub use oris_sandbox::{LocalProcessSandbox, SandboxPolicy as EvoSandboxPolicy};
44pub use oris_spec::{SpecCompileError, SpecCompiler, SpecDocument};
45
46#[derive(Clone, Debug, Serialize, Deserialize)]
47pub struct ValidationPlan {
48 pub profile: String,
49 pub stages: Vec<ValidationStage>,
50}
51
52impl ValidationPlan {
53 pub fn oris_default() -> Self {
54 Self {
55 profile: "oris-default".into(),
56 stages: vec![
57 ValidationStage::Command {
58 program: "cargo".into(),
59 args: vec!["fmt".into(), "--all".into(), "--check".into()],
60 timeout_ms: 60_000,
61 },
62 ValidationStage::Command {
63 program: "cargo".into(),
64 args: vec!["check".into(), "--workspace".into()],
65 timeout_ms: 180_000,
66 },
67 ValidationStage::Command {
68 program: "cargo".into(),
69 args: vec![
70 "test".into(),
71 "-p".into(),
72 "oris-kernel".into(),
73 "-p".into(),
74 "oris-evolution".into(),
75 "-p".into(),
76 "oris-sandbox".into(),
77 "-p".into(),
78 "oris-evokernel".into(),
79 "--lib".into(),
80 ],
81 timeout_ms: 300_000,
82 },
83 ValidationStage::Command {
84 program: "cargo".into(),
85 args: vec![
86 "test".into(),
87 "-p".into(),
88 "oris-runtime".into(),
89 "--lib".into(),
90 ],
91 timeout_ms: 300_000,
92 },
93 ],
94 }
95 }
96}
97
98#[derive(Clone, Debug, Serialize, Deserialize)]
99pub enum ValidationStage {
100 Command {
101 program: String,
102 args: Vec<String>,
103 timeout_ms: u64,
104 },
105}
106
107#[derive(Clone, Debug, Serialize, Deserialize)]
108pub struct ValidationStageReport {
109 pub stage: String,
110 pub success: bool,
111 pub exit_code: Option<i32>,
112 pub duration_ms: u64,
113 pub stdout: String,
114 pub stderr: String,
115}
116
117#[derive(Clone, Debug, Serialize, Deserialize)]
118pub struct ValidationReport {
119 pub success: bool,
120 pub duration_ms: u64,
121 pub stages: Vec<ValidationStageReport>,
122 pub logs: String,
123}
124
125#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
126pub struct SignalExtractionInput {
127 pub patch_diff: String,
128 pub intent: String,
129 pub expected_effect: String,
130 pub declared_signals: Vec<String>,
131 pub changed_files: Vec<String>,
132 pub validation_success: bool,
133 pub validation_logs: String,
134 pub stage_outputs: Vec<String>,
135}
136
137#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
138pub struct SignalExtractionOutput {
139 pub values: Vec<String>,
140 pub hash: String,
141}
142
143impl ValidationReport {
144 pub fn to_snapshot(&self, profile: &str) -> ValidationSnapshot {
145 ValidationSnapshot {
146 success: self.success,
147 profile: profile.to_string(),
148 duration_ms: self.duration_ms,
149 summary: if self.success {
150 "validation passed".into()
151 } else {
152 "validation failed".into()
153 },
154 }
155 }
156}
157
158pub fn extract_deterministic_signals(input: &SignalExtractionInput) -> SignalExtractionOutput {
159 let mut signals = BTreeSet::new();
160
161 for declared in &input.declared_signals {
162 if let Some(phrase) = normalize_signal_phrase(declared) {
163 signals.insert(phrase);
164 }
165 extend_signal_tokens(&mut signals, declared);
166 }
167
168 for text in [
169 input.patch_diff.as_str(),
170 input.intent.as_str(),
171 input.expected_effect.as_str(),
172 input.validation_logs.as_str(),
173 ] {
174 extend_signal_tokens(&mut signals, text);
175 }
176
177 for changed_file in &input.changed_files {
178 extend_signal_tokens(&mut signals, changed_file);
179 }
180
181 for stage_output in &input.stage_outputs {
182 extend_signal_tokens(&mut signals, stage_output);
183 }
184
185 signals.insert(if input.validation_success {
186 "validation passed".into()
187 } else {
188 "validation failed".into()
189 });
190
191 let values = signals.into_iter().take(32).collect::<Vec<_>>();
192 let hash =
193 stable_hash_json(&values).unwrap_or_else(|_| compute_artifact_hash(&values.join("\n")));
194 SignalExtractionOutput { values, hash }
195}
196
197#[derive(Debug, Error)]
198pub enum ValidationError {
199 #[error("validation execution failed: {0}")]
200 Execution(String),
201}
202
203#[async_trait]
204pub trait Validator: Send + Sync {
205 async fn run(
206 &self,
207 receipt: &SandboxReceipt,
208 plan: &ValidationPlan,
209 ) -> Result<ValidationReport, ValidationError>;
210}
211
212pub struct CommandValidator {
213 policy: SandboxPolicy,
214}
215
216impl CommandValidator {
217 pub fn new(policy: SandboxPolicy) -> Self {
218 Self { policy }
219 }
220}
221
222#[async_trait]
223impl Validator for CommandValidator {
224 async fn run(
225 &self,
226 receipt: &SandboxReceipt,
227 plan: &ValidationPlan,
228 ) -> Result<ValidationReport, ValidationError> {
229 let started = std::time::Instant::now();
230 let mut stages = Vec::new();
231 let mut success = true;
232 let mut logs = String::new();
233
234 for stage in &plan.stages {
235 match stage {
236 ValidationStage::Command {
237 program,
238 args,
239 timeout_ms,
240 } => {
241 let result = execute_allowed_command(
242 &self.policy,
243 &receipt.workdir,
244 program,
245 args,
246 *timeout_ms,
247 )
248 .await;
249 let report = match result {
250 Ok(output) => ValidationStageReport {
251 stage: format!("{program} {}", args.join(" ")),
252 success: output.success,
253 exit_code: output.exit_code,
254 duration_ms: output.duration_ms,
255 stdout: output.stdout,
256 stderr: output.stderr,
257 },
258 Err(err) => ValidationStageReport {
259 stage: format!("{program} {}", args.join(" ")),
260 success: false,
261 exit_code: None,
262 duration_ms: 0,
263 stdout: String::new(),
264 stderr: err.to_string(),
265 },
266 };
267 if !report.success {
268 success = false;
269 }
270 if !report.stdout.is_empty() {
271 logs.push_str(&report.stdout);
272 logs.push('\n');
273 }
274 if !report.stderr.is_empty() {
275 logs.push_str(&report.stderr);
276 logs.push('\n');
277 }
278 stages.push(report);
279 if !success {
280 break;
281 }
282 }
283 }
284 }
285
286 Ok(ValidationReport {
287 success,
288 duration_ms: started.elapsed().as_millis() as u64,
289 stages,
290 logs,
291 })
292 }
293}
294
295#[derive(Clone, Debug)]
296pub struct ReplayDecision {
297 pub used_capsule: bool,
298 pub capsule_id: Option<CapsuleId>,
299 pub fallback_to_planner: bool,
300 pub reason: String,
301}
302
303#[derive(Clone, Copy, Debug, Eq, PartialEq)]
304enum CoordinationTaskState {
305 Ready,
306 Waiting,
307 BlockedByFailure,
308 PermanentlyBlocked,
309}
310
311#[derive(Clone, Debug, Default)]
312pub struct MultiAgentCoordinator;
313
314impl MultiAgentCoordinator {
315 pub fn new() -> Self {
316 Self
317 }
318
319 pub fn coordinate(&self, plan: CoordinationPlan) -> CoordinationResult {
320 let primitive = plan.primitive.clone();
321 let root_goal = plan.root_goal.clone();
322 let timeout_ms = plan.timeout_ms;
323 let max_retries = plan.max_retries;
324 let mut tasks = BTreeMap::new();
325 for task in plan.tasks {
326 tasks.entry(task.id.clone()).or_insert(task);
327 }
328
329 let mut pending = tasks.keys().cloned().collect::<BTreeSet<_>>();
330 let mut completed = BTreeSet::new();
331 let mut failed = BTreeSet::new();
332 let mut completed_order = Vec::new();
333 let mut failed_order = Vec::new();
334 let mut skipped = BTreeSet::new();
335 let mut attempts = BTreeMap::new();
336 let mut messages = Vec::new();
337
338 loop {
339 if matches!(primitive, CoordinationPrimitive::Conditional) {
340 self.apply_conditional_skips(
341 &tasks,
342 &mut pending,
343 &completed,
344 &failed,
345 &mut skipped,
346 &mut messages,
347 );
348 }
349
350 let mut ready = self.ready_task_ids(&tasks, &pending, &completed, &failed, &skipped);
351 if ready.is_empty() {
352 break;
353 }
354 if matches!(primitive, CoordinationPrimitive::Sequential) {
355 ready.truncate(1);
356 }
357
358 for task_id in ready {
359 let Some(task) = tasks.get(&task_id) else {
360 continue;
361 };
362 if !pending.contains(&task_id) {
363 continue;
364 }
365 self.record_handoff_messages(task, &tasks, &completed, &failed, &mut messages);
366
367 let prior_failures = attempts.get(&task_id).copied().unwrap_or(0);
368 if Self::simulate_task_failure(task, prior_failures) {
369 let failure_count = prior_failures + 1;
370 attempts.insert(task_id.clone(), failure_count);
371 let will_retry = failure_count <= max_retries;
372 messages.push(CoordinationMessage {
373 from_role: task.role.clone(),
374 to_role: task.role.clone(),
375 task_id: task_id.clone(),
376 content: if will_retry {
377 format!("task {task_id} failed on attempt {failure_count} and will retry")
378 } else {
379 format!(
380 "task {task_id} failed on attempt {failure_count} and exhausted retries"
381 )
382 },
383 });
384 if !will_retry {
385 pending.remove(&task_id);
386 if failed.insert(task_id.clone()) {
387 failed_order.push(task_id);
388 }
389 }
390 continue;
391 }
392
393 pending.remove(&task_id);
394 if completed.insert(task_id.clone()) {
395 completed_order.push(task_id);
396 }
397 }
398 }
399
400 let blocked_ids = pending.into_iter().collect::<Vec<_>>();
401 for task_id in blocked_ids {
402 let Some(task) = tasks.get(&task_id) else {
403 continue;
404 };
405 let state = self.classify_task(task, &tasks, &completed, &failed, &skipped);
406 let content = match state {
407 CoordinationTaskState::BlockedByFailure => {
408 format!("task {task_id} blocked by failed dependencies")
409 }
410 CoordinationTaskState::PermanentlyBlocked => {
411 format!("task {task_id} has invalid coordination prerequisites")
412 }
413 CoordinationTaskState::Waiting => {
414 format!("task {task_id} has unresolved dependencies")
415 }
416 CoordinationTaskState::Ready => {
417 format!("task {task_id} was left pending unexpectedly")
418 }
419 };
420 messages.push(CoordinationMessage {
421 from_role: task.role.clone(),
422 to_role: task.role.clone(),
423 task_id: task_id.clone(),
424 content,
425 });
426 if failed.insert(task_id.clone()) {
427 failed_order.push(task_id);
428 }
429 }
430
431 CoordinationResult {
432 completed_tasks: completed_order,
433 failed_tasks: failed_order,
434 messages,
435 summary: format!(
436 "goal '{}' completed {} tasks, failed {}, skipped {} using {:?} coordination (timeout={}ms, max_retries={})",
437 root_goal,
438 completed.len(),
439 failed.len(),
440 skipped.len(),
441 primitive,
442 timeout_ms,
443 max_retries
444 ),
445 }
446 }
447
448 fn ready_task_ids(
449 &self,
450 tasks: &BTreeMap<String, CoordinationTask>,
451 pending: &BTreeSet<String>,
452 completed: &BTreeSet<String>,
453 failed: &BTreeSet<String>,
454 skipped: &BTreeSet<String>,
455 ) -> Vec<String> {
456 pending
457 .iter()
458 .filter_map(|task_id| {
459 let task = tasks.get(task_id)?;
460 (self.classify_task(task, tasks, completed, failed, skipped)
461 == CoordinationTaskState::Ready)
462 .then(|| task_id.clone())
463 })
464 .collect()
465 }
466
467 fn apply_conditional_skips(
468 &self,
469 tasks: &BTreeMap<String, CoordinationTask>,
470 pending: &mut BTreeSet<String>,
471 completed: &BTreeSet<String>,
472 failed: &BTreeSet<String>,
473 skipped: &mut BTreeSet<String>,
474 messages: &mut Vec<CoordinationMessage>,
475 ) {
476 let skip_ids = pending
477 .iter()
478 .filter_map(|task_id| {
479 let task = tasks.get(task_id)?;
480 (self.classify_task(task, tasks, completed, failed, skipped)
481 == CoordinationTaskState::BlockedByFailure)
482 .then(|| task_id.clone())
483 })
484 .collect::<Vec<_>>();
485
486 for task_id in skip_ids {
487 let Some(task) = tasks.get(&task_id) else {
488 continue;
489 };
490 pending.remove(&task_id);
491 skipped.insert(task_id.clone());
492 messages.push(CoordinationMessage {
493 from_role: task.role.clone(),
494 to_role: task.role.clone(),
495 task_id: task_id.clone(),
496 content: format!("task {task_id} skipped due to failed dependency chain"),
497 });
498 }
499 }
500
501 fn classify_task(
502 &self,
503 task: &CoordinationTask,
504 tasks: &BTreeMap<String, CoordinationTask>,
505 completed: &BTreeSet<String>,
506 failed: &BTreeSet<String>,
507 skipped: &BTreeSet<String>,
508 ) -> CoordinationTaskState {
509 match task.role {
510 AgentRole::Planner | AgentRole::Coder => {
511 let mut waiting = false;
512 for dependency_id in &task.depends_on {
513 if !tasks.contains_key(dependency_id) {
514 return CoordinationTaskState::PermanentlyBlocked;
515 }
516 if skipped.contains(dependency_id) || failed.contains(dependency_id) {
517 return CoordinationTaskState::BlockedByFailure;
518 }
519 if !completed.contains(dependency_id) {
520 waiting = true;
521 }
522 }
523 if waiting {
524 CoordinationTaskState::Waiting
525 } else {
526 CoordinationTaskState::Ready
527 }
528 }
529 AgentRole::Repair => {
530 let mut waiting = false;
531 let mut has_coder_dependency = false;
532 let mut has_failed_coder = false;
533 for dependency_id in &task.depends_on {
534 let Some(dependency) = tasks.get(dependency_id) else {
535 return CoordinationTaskState::PermanentlyBlocked;
536 };
537 let is_coder = matches!(dependency.role, AgentRole::Coder);
538 if is_coder {
539 has_coder_dependency = true;
540 }
541 if skipped.contains(dependency_id) {
542 return CoordinationTaskState::BlockedByFailure;
543 }
544 if failed.contains(dependency_id) {
545 if is_coder {
546 has_failed_coder = true;
547 } else {
548 return CoordinationTaskState::BlockedByFailure;
549 }
550 continue;
551 }
552 if !completed.contains(dependency_id) {
553 waiting = true;
554 }
555 }
556 if !has_coder_dependency {
557 CoordinationTaskState::PermanentlyBlocked
558 } else if waiting {
559 CoordinationTaskState::Waiting
560 } else if has_failed_coder {
561 CoordinationTaskState::Ready
562 } else {
563 CoordinationTaskState::PermanentlyBlocked
564 }
565 }
566 AgentRole::Optimizer => {
567 let mut waiting = false;
568 let mut has_impl_dependency = false;
569 let mut has_completed_impl = false;
570 let mut has_failed_impl = false;
571 for dependency_id in &task.depends_on {
572 let Some(dependency) = tasks.get(dependency_id) else {
573 return CoordinationTaskState::PermanentlyBlocked;
574 };
575 let is_impl = matches!(dependency.role, AgentRole::Coder | AgentRole::Repair);
576 if is_impl {
577 has_impl_dependency = true;
578 }
579 if skipped.contains(dependency_id) || failed.contains(dependency_id) {
580 if is_impl {
581 has_failed_impl = true;
582 continue;
583 }
584 return CoordinationTaskState::BlockedByFailure;
585 }
586 if completed.contains(dependency_id) {
587 if is_impl {
588 has_completed_impl = true;
589 }
590 continue;
591 }
592 waiting = true;
593 }
594 if !has_impl_dependency {
595 CoordinationTaskState::PermanentlyBlocked
596 } else if waiting {
597 CoordinationTaskState::Waiting
598 } else if has_completed_impl {
599 CoordinationTaskState::Ready
600 } else if has_failed_impl {
601 CoordinationTaskState::BlockedByFailure
602 } else {
603 CoordinationTaskState::PermanentlyBlocked
604 }
605 }
606 }
607 }
608
609 fn record_handoff_messages(
610 &self,
611 task: &CoordinationTask,
612 tasks: &BTreeMap<String, CoordinationTask>,
613 completed: &BTreeSet<String>,
614 failed: &BTreeSet<String>,
615 messages: &mut Vec<CoordinationMessage>,
616 ) {
617 let mut dependency_ids = task.depends_on.clone();
618 dependency_ids.sort();
619 dependency_ids.dedup();
620
621 for dependency_id in dependency_ids {
622 let Some(dependency) = tasks.get(&dependency_id) else {
623 continue;
624 };
625 if completed.contains(&dependency_id) {
626 messages.push(CoordinationMessage {
627 from_role: dependency.role.clone(),
628 to_role: task.role.clone(),
629 task_id: task.id.clone(),
630 content: format!("handoff from {dependency_id} to {}", task.id),
631 });
632 } else if failed.contains(&dependency_id) {
633 messages.push(CoordinationMessage {
634 from_role: dependency.role.clone(),
635 to_role: task.role.clone(),
636 task_id: task.id.clone(),
637 content: format!("failed dependency {dependency_id} routed to {}", task.id),
638 });
639 }
640 }
641 }
642
643 fn simulate_task_failure(task: &CoordinationTask, prior_failures: u32) -> bool {
644 let normalized = task.description.to_ascii_lowercase();
645 normalized.contains("force-fail")
646 || (normalized.contains("fail-once") && prior_failures == 0)
647 }
648}
649
650#[derive(Debug, Error)]
651pub enum ReplayError {
652 #[error("store error: {0}")]
653 Store(String),
654 #[error("sandbox error: {0}")]
655 Sandbox(String),
656 #[error("validation error: {0}")]
657 Validation(String),
658}
659
660#[async_trait]
661pub trait ReplayExecutor: Send + Sync {
662 async fn try_replay(
663 &self,
664 input: &SelectorInput,
665 policy: &SandboxPolicy,
666 validation: &ValidationPlan,
667 ) -> Result<ReplayDecision, ReplayError>;
668}
669
670pub struct StoreReplayExecutor {
671 pub sandbox: Arc<dyn Sandbox>,
672 pub validator: Arc<dyn Validator>,
673 pub store: Arc<dyn EvolutionStore>,
674 pub selector: Arc<dyn Selector>,
675 pub governor: Arc<dyn Governor>,
676 pub economics: Option<Arc<Mutex<EvuLedger>>>,
677 pub remote_publishers: Option<Arc<Mutex<BTreeMap<String, String>>>>,
678 pub stake_policy: StakePolicy,
679}
680
681#[async_trait]
682impl ReplayExecutor for StoreReplayExecutor {
683 async fn try_replay(
684 &self,
685 input: &SelectorInput,
686 policy: &SandboxPolicy,
687 validation: &ValidationPlan,
688 ) -> Result<ReplayDecision, ReplayError> {
689 let mut selector_input = input.clone();
690 if self.economics.is_some() && self.remote_publishers.is_some() {
691 selector_input.limit = selector_input.limit.max(4);
692 }
693 let mut candidates = self.selector.select(&selector_input);
694 self.rerank_with_reputation_bias(&mut candidates);
695 let mut exact_match = false;
696 if candidates.is_empty() {
697 let mut exact_candidates = exact_match_candidates(self.store.as_ref(), input);
698 self.rerank_with_reputation_bias(&mut exact_candidates);
699 if !exact_candidates.is_empty() {
700 candidates = exact_candidates;
701 exact_match = true;
702 }
703 }
704 if candidates.is_empty() {
705 let mut remote_candidates =
706 quarantined_remote_exact_match_candidates(self.store.as_ref(), input);
707 self.rerank_with_reputation_bias(&mut remote_candidates);
708 if !remote_candidates.is_empty() {
709 candidates = remote_candidates;
710 exact_match = true;
711 }
712 }
713 candidates.truncate(input.limit.max(1));
714 let Some(best) = candidates.into_iter().next() else {
715 return Ok(ReplayDecision {
716 used_capsule: false,
717 capsule_id: None,
718 fallback_to_planner: true,
719 reason: "no matching gene".into(),
720 });
721 };
722 let remote_publisher = self.publisher_for_gene(&best.gene.id);
723
724 if !exact_match && best.score < 0.82 {
725 return Ok(ReplayDecision {
726 used_capsule: false,
727 capsule_id: None,
728 fallback_to_planner: true,
729 reason: format!("best gene score {:.3} below replay threshold", best.score),
730 });
731 }
732
733 let Some(capsule) = best.capsules.first().cloned() else {
734 return Ok(ReplayDecision {
735 used_capsule: false,
736 capsule_id: None,
737 fallback_to_planner: true,
738 reason: "candidate gene has no capsule".into(),
739 });
740 };
741
742 let Some(mutation) = find_declared_mutation(self.store.as_ref(), &capsule.mutation_id)
743 .map_err(|err| ReplayError::Store(err.to_string()))?
744 else {
745 return Ok(ReplayDecision {
746 used_capsule: false,
747 capsule_id: None,
748 fallback_to_planner: true,
749 reason: "mutation payload missing from store".into(),
750 });
751 };
752
753 let receipt = match self.sandbox.apply(&mutation, policy).await {
754 Ok(receipt) => receipt,
755 Err(err) => {
756 self.record_reuse_settlement(remote_publisher.as_deref(), false);
757 return Ok(ReplayDecision {
758 used_capsule: false,
759 capsule_id: Some(capsule.id.clone()),
760 fallback_to_planner: true,
761 reason: format!("replay patch apply failed: {err}"),
762 });
763 }
764 };
765
766 let report = self
767 .validator
768 .run(&receipt, validation)
769 .await
770 .map_err(|err| ReplayError::Validation(err.to_string()))?;
771 if !report.success {
772 self.record_replay_validation_failure(&best, &capsule, validation, &report)?;
773 self.record_reuse_settlement(remote_publisher.as_deref(), false);
774 return Ok(ReplayDecision {
775 used_capsule: false,
776 capsule_id: Some(capsule.id.clone()),
777 fallback_to_planner: true,
778 reason: "replay validation failed".into(),
779 });
780 }
781
782 if matches!(capsule.state, AssetState::Quarantined) {
783 self.store
784 .append_event(EvolutionEvent::ValidationPassed {
785 mutation_id: capsule.mutation_id.clone(),
786 report: report.to_snapshot(&validation.profile),
787 gene_id: Some(best.gene.id.clone()),
788 })
789 .map_err(|err| ReplayError::Store(err.to_string()))?;
790 self.store
791 .append_event(EvolutionEvent::CapsuleReleased {
792 capsule_id: capsule.id.clone(),
793 state: AssetState::Promoted,
794 })
795 .map_err(|err| ReplayError::Store(err.to_string()))?;
796 }
797
798 self.store
799 .append_event(EvolutionEvent::CapsuleReused {
800 capsule_id: capsule.id.clone(),
801 gene_id: capsule.gene_id.clone(),
802 run_id: capsule.run_id.clone(),
803 })
804 .map_err(|err| ReplayError::Store(err.to_string()))?;
805 self.record_reuse_settlement(remote_publisher.as_deref(), true);
806
807 Ok(ReplayDecision {
808 used_capsule: true,
809 capsule_id: Some(capsule.id),
810 fallback_to_planner: false,
811 reason: if exact_match {
812 "replayed via exact-match cold-start lookup".into()
813 } else {
814 "replayed via selector".into()
815 },
816 })
817 }
818}
819
820impl StoreReplayExecutor {
821 fn rerank_with_reputation_bias(&self, candidates: &mut [GeneCandidate]) {
822 let Some(ledger) = self.economics.as_ref() else {
823 return;
824 };
825 let Some(remote_publishers) = self.remote_publishers.as_ref() else {
826 return;
827 };
828 let reputation_bias = ledger
829 .lock()
830 .ok()
831 .map(|locked| locked.selector_reputation_bias())
832 .unwrap_or_default();
833 if reputation_bias.is_empty() {
834 return;
835 }
836 let publisher_map = remote_publishers
837 .lock()
838 .ok()
839 .map(|locked| locked.clone())
840 .unwrap_or_default();
841 candidates.sort_by(|left, right| {
842 effective_candidate_score(right, &publisher_map, &reputation_bias)
843 .partial_cmp(&effective_candidate_score(
844 left,
845 &publisher_map,
846 &reputation_bias,
847 ))
848 .unwrap_or(std::cmp::Ordering::Equal)
849 .then_with(|| left.gene.id.cmp(&right.gene.id))
850 });
851 }
852
853 fn publisher_for_gene(&self, gene_id: &str) -> Option<String> {
854 self.remote_publishers
855 .as_ref()?
856 .lock()
857 .ok()?
858 .get(gene_id)
859 .cloned()
860 }
861
862 fn record_reuse_settlement(&self, publisher_id: Option<&str>, success: bool) {
863 let Some(publisher_id) = publisher_id else {
864 return;
865 };
866 let Some(ledger) = self.economics.as_ref() else {
867 return;
868 };
869 if let Ok(mut locked) = ledger.lock() {
870 locked.settle_remote_reuse(publisher_id, success, &self.stake_policy);
871 }
872 }
873
874 fn record_replay_validation_failure(
875 &self,
876 best: &GeneCandidate,
877 capsule: &Capsule,
878 validation: &ValidationPlan,
879 report: &ValidationReport,
880 ) -> Result<(), ReplayError> {
881 let projection = self
882 .store
883 .rebuild_projection()
884 .map_err(|err| ReplayError::Store(err.to_string()))?;
885 let (current_confidence, historical_peak_confidence, confidence_last_updated_secs) =
886 Self::confidence_context(&projection, &best.gene.id);
887
888 self.store
889 .append_event(EvolutionEvent::ValidationFailed {
890 mutation_id: capsule.mutation_id.clone(),
891 report: report.to_snapshot(&validation.profile),
892 gene_id: Some(best.gene.id.clone()),
893 })
894 .map_err(|err| ReplayError::Store(err.to_string()))?;
895
896 let replay_failures = self.replay_failure_count(&best.gene.id)?;
897 let governor_decision = self.governor.evaluate(GovernorInput {
898 candidate_source: if self.publisher_for_gene(&best.gene.id).is_some() {
899 CandidateSource::Remote
900 } else {
901 CandidateSource::Local
902 },
903 success_count: 0,
904 blast_radius: BlastRadius {
905 files_changed: capsule.outcome.changed_files.len(),
906 lines_changed: capsule.outcome.lines_changed,
907 },
908 replay_failures,
909 recent_mutation_ages_secs: Vec::new(),
910 current_confidence,
911 historical_peak_confidence,
912 confidence_last_updated_secs,
913 });
914
915 if matches!(governor_decision.target_state, AssetState::Revoked) {
916 self.store
917 .append_event(EvolutionEvent::PromotionEvaluated {
918 gene_id: best.gene.id.clone(),
919 state: AssetState::Revoked,
920 reason: governor_decision.reason.clone(),
921 })
922 .map_err(|err| ReplayError::Store(err.to_string()))?;
923 self.store
924 .append_event(EvolutionEvent::GeneRevoked {
925 gene_id: best.gene.id.clone(),
926 reason: governor_decision.reason,
927 })
928 .map_err(|err| ReplayError::Store(err.to_string()))?;
929 for related in &best.capsules {
930 self.store
931 .append_event(EvolutionEvent::CapsuleQuarantined {
932 capsule_id: related.id.clone(),
933 })
934 .map_err(|err| ReplayError::Store(err.to_string()))?;
935 }
936 }
937
938 Ok(())
939 }
940
941 fn confidence_context(
942 projection: &EvolutionProjection,
943 gene_id: &str,
944 ) -> (f32, f32, Option<u64>) {
945 let peak_confidence = projection
946 .capsules
947 .iter()
948 .filter(|capsule| capsule.gene_id == gene_id)
949 .map(|capsule| capsule.confidence)
950 .fold(0.0_f32, f32::max);
951 let age_secs = projection
952 .last_updated_at
953 .get(gene_id)
954 .and_then(|timestamp| Self::seconds_since_timestamp(timestamp, Utc::now()));
955 (peak_confidence, peak_confidence, age_secs)
956 }
957
958 fn seconds_since_timestamp(timestamp: &str, now: DateTime<Utc>) -> Option<u64> {
959 let parsed = DateTime::parse_from_rfc3339(timestamp)
960 .ok()?
961 .with_timezone(&Utc);
962 let elapsed = now.signed_duration_since(parsed);
963 if elapsed < Duration::zero() {
964 Some(0)
965 } else {
966 u64::try_from(elapsed.num_seconds()).ok()
967 }
968 }
969
970 fn replay_failure_count(&self, gene_id: &str) -> Result<u64, ReplayError> {
971 Ok(self
972 .store
973 .scan(1)
974 .map_err(|err| ReplayError::Store(err.to_string()))?
975 .into_iter()
976 .filter(|stored| {
977 matches!(
978 &stored.event,
979 EvolutionEvent::ValidationFailed {
980 gene_id: Some(current_gene_id),
981 ..
982 } if current_gene_id == gene_id
983 )
984 })
985 .count() as u64)
986 }
987}
988
989#[derive(Debug, Error)]
990pub enum EvoKernelError {
991 #[error("sandbox error: {0}")]
992 Sandbox(String),
993 #[error("validation error: {0}")]
994 Validation(String),
995 #[error("validation failed")]
996 ValidationFailed(ValidationReport),
997 #[error("store error: {0}")]
998 Store(String),
999}
1000
1001#[derive(Clone, Debug)]
1002pub struct CaptureOutcome {
1003 pub capsule: Capsule,
1004 pub gene: Gene,
1005 pub governor_decision: GovernorDecision,
1006}
1007
1008#[derive(Clone, Debug, Serialize, Deserialize)]
1009pub struct ImportOutcome {
1010 pub imported_asset_ids: Vec<String>,
1011 pub accepted: bool,
1012}
1013
1014#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
1015pub struct EvolutionMetricsSnapshot {
1016 pub replay_attempts_total: u64,
1017 pub replay_success_total: u64,
1018 pub replay_success_rate: f64,
1019 pub mutation_declared_total: u64,
1020 pub promoted_mutations_total: u64,
1021 pub promotion_ratio: f64,
1022 pub gene_revocations_total: u64,
1023 pub mutation_velocity_last_hour: u64,
1024 pub revoke_frequency_last_hour: u64,
1025 pub promoted_genes: u64,
1026 pub promoted_capsules: u64,
1027 pub last_event_seq: u64,
1028}
1029
1030#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
1031pub struct EvolutionHealthSnapshot {
1032 pub status: String,
1033 pub last_event_seq: u64,
1034 pub promoted_genes: u64,
1035 pub promoted_capsules: u64,
1036}
1037
1038#[derive(Clone)]
1039pub struct EvolutionNetworkNode {
1040 pub store: Arc<dyn EvolutionStore>,
1041}
1042
1043impl EvolutionNetworkNode {
1044 pub fn new(store: Arc<dyn EvolutionStore>) -> Self {
1045 Self { store }
1046 }
1047
1048 pub fn with_default_store() -> Self {
1049 Self {
1050 store: Arc::new(JsonlEvolutionStore::new(default_store_root())),
1051 }
1052 }
1053
1054 pub fn accept_publish_request(
1055 &self,
1056 request: &PublishRequest,
1057 ) -> Result<ImportOutcome, EvoKernelError> {
1058 import_remote_envelope_into_store(
1059 self.store.as_ref(),
1060 &EvolutionEnvelope::publish(request.sender_id.clone(), request.assets.clone()),
1061 )
1062 }
1063
1064 pub fn publish_local_assets(
1065 &self,
1066 sender_id: impl Into<String>,
1067 ) -> Result<EvolutionEnvelope, EvoKernelError> {
1068 export_promoted_assets_from_store(self.store.as_ref(), sender_id)
1069 }
1070
1071 pub fn fetch_assets(
1072 &self,
1073 responder_id: impl Into<String>,
1074 query: &FetchQuery,
1075 ) -> Result<FetchResponse, EvoKernelError> {
1076 fetch_assets_from_store(self.store.as_ref(), responder_id, query)
1077 }
1078
1079 pub fn revoke_assets(&self, notice: &RevokeNotice) -> Result<RevokeNotice, EvoKernelError> {
1080 revoke_assets_in_store(self.store.as_ref(), notice)
1081 }
1082
1083 pub fn metrics_snapshot(&self) -> Result<EvolutionMetricsSnapshot, EvoKernelError> {
1084 evolution_metrics_snapshot(self.store.as_ref())
1085 }
1086
1087 pub fn render_metrics_prometheus(&self) -> Result<String, EvoKernelError> {
1088 self.metrics_snapshot().map(|snapshot| {
1089 let health = evolution_health_snapshot(&snapshot);
1090 render_evolution_metrics_prometheus(&snapshot, &health)
1091 })
1092 }
1093
1094 pub fn health_snapshot(&self) -> Result<EvolutionHealthSnapshot, EvoKernelError> {
1095 self.metrics_snapshot()
1096 .map(|snapshot| evolution_health_snapshot(&snapshot))
1097 }
1098}
1099
1100pub struct EvoKernel<S: KernelState> {
1101 pub kernel: Arc<Kernel<S>>,
1102 pub sandbox: Arc<dyn Sandbox>,
1103 pub validator: Arc<dyn Validator>,
1104 pub store: Arc<dyn EvolutionStore>,
1105 pub selector: Arc<dyn Selector>,
1106 pub governor: Arc<dyn Governor>,
1107 pub economics: Arc<Mutex<EvuLedger>>,
1108 pub remote_publishers: Arc<Mutex<BTreeMap<String, String>>>,
1109 pub stake_policy: StakePolicy,
1110 pub sandbox_policy: SandboxPolicy,
1111 pub validation_plan: ValidationPlan,
1112}
1113
1114impl<S: KernelState> EvoKernel<S> {
1115 fn recent_prior_mutation_ages_secs(
1116 &self,
1117 exclude_mutation_id: Option<&str>,
1118 ) -> Result<Vec<u64>, EvolutionError> {
1119 let now = Utc::now();
1120 let mut ages = self
1121 .store
1122 .scan(1)?
1123 .into_iter()
1124 .filter_map(|stored| match stored.event {
1125 EvolutionEvent::MutationDeclared { mutation }
1126 if exclude_mutation_id != Some(mutation.intent.id.as_str()) =>
1127 {
1128 Self::seconds_since_timestamp(&stored.timestamp, now)
1129 }
1130 _ => None,
1131 })
1132 .collect::<Vec<_>>();
1133 ages.sort_unstable();
1134 Ok(ages)
1135 }
1136
1137 fn seconds_since_timestamp(timestamp: &str, now: DateTime<Utc>) -> Option<u64> {
1138 let parsed = DateTime::parse_from_rfc3339(timestamp)
1139 .ok()?
1140 .with_timezone(&Utc);
1141 let elapsed = now.signed_duration_since(parsed);
1142 if elapsed < Duration::zero() {
1143 Some(0)
1144 } else {
1145 u64::try_from(elapsed.num_seconds()).ok()
1146 }
1147 }
1148
1149 pub fn new(
1150 kernel: Arc<Kernel<S>>,
1151 sandbox: Arc<dyn Sandbox>,
1152 validator: Arc<dyn Validator>,
1153 store: Arc<dyn EvolutionStore>,
1154 ) -> Self {
1155 let selector: Arc<dyn Selector> = Arc::new(StoreBackedSelector::new(store.clone()));
1156 Self {
1157 kernel,
1158 sandbox,
1159 validator,
1160 store,
1161 selector,
1162 governor: Arc::new(DefaultGovernor::default()),
1163 economics: Arc::new(Mutex::new(EvuLedger::default())),
1164 remote_publishers: Arc::new(Mutex::new(BTreeMap::new())),
1165 stake_policy: StakePolicy::default(),
1166 sandbox_policy: SandboxPolicy::oris_default(),
1167 validation_plan: ValidationPlan::oris_default(),
1168 }
1169 }
1170
1171 pub fn with_selector(mut self, selector: Arc<dyn Selector>) -> Self {
1172 self.selector = selector;
1173 self
1174 }
1175
1176 pub fn with_sandbox_policy(mut self, policy: SandboxPolicy) -> Self {
1177 self.sandbox_policy = policy;
1178 self
1179 }
1180
1181 pub fn with_governor(mut self, governor: Arc<dyn Governor>) -> Self {
1182 self.governor = governor;
1183 self
1184 }
1185
1186 pub fn with_economics(mut self, economics: Arc<Mutex<EvuLedger>>) -> Self {
1187 self.economics = economics;
1188 self
1189 }
1190
1191 pub fn with_stake_policy(mut self, policy: StakePolicy) -> Self {
1192 self.stake_policy = policy;
1193 self
1194 }
1195
1196 pub fn with_validation_plan(mut self, plan: ValidationPlan) -> Self {
1197 self.validation_plan = plan;
1198 self
1199 }
1200
1201 pub fn select_candidates(&self, input: &SelectorInput) -> Vec<GeneCandidate> {
1202 self.selector.select(input)
1203 }
1204
1205 pub async fn capture_successful_mutation(
1206 &self,
1207 run_id: &RunId,
1208 mutation: PreparedMutation,
1209 ) -> Result<Capsule, EvoKernelError> {
1210 Ok(self
1211 .capture_mutation_with_governor(run_id, mutation)
1212 .await?
1213 .capsule)
1214 }
1215
1216 pub async fn capture_mutation_with_governor(
1217 &self,
1218 run_id: &RunId,
1219 mutation: PreparedMutation,
1220 ) -> Result<CaptureOutcome, EvoKernelError> {
1221 self.store
1222 .append_event(EvolutionEvent::MutationDeclared {
1223 mutation: mutation.clone(),
1224 })
1225 .map_err(store_err)?;
1226
1227 let receipt = match self.sandbox.apply(&mutation, &self.sandbox_policy).await {
1228 Ok(receipt) => receipt,
1229 Err(err) => {
1230 self.store
1231 .append_event(EvolutionEvent::MutationRejected {
1232 mutation_id: mutation.intent.id.clone(),
1233 reason: err.to_string(),
1234 })
1235 .map_err(store_err)?;
1236 return Err(EvoKernelError::Sandbox(err.to_string()));
1237 }
1238 };
1239
1240 self.store
1241 .append_event(EvolutionEvent::MutationApplied {
1242 mutation_id: mutation.intent.id.clone(),
1243 patch_hash: receipt.patch_hash.clone(),
1244 changed_files: receipt
1245 .changed_files
1246 .iter()
1247 .map(|path| path.to_string_lossy().to_string())
1248 .collect(),
1249 })
1250 .map_err(store_err)?;
1251
1252 let report = self
1253 .validator
1254 .run(&receipt, &self.validation_plan)
1255 .await
1256 .map_err(|err| EvoKernelError::Validation(err.to_string()))?;
1257 if !report.success {
1258 self.store
1259 .append_event(EvolutionEvent::ValidationFailed {
1260 mutation_id: mutation.intent.id.clone(),
1261 report: report.to_snapshot(&self.validation_plan.profile),
1262 gene_id: None,
1263 })
1264 .map_err(store_err)?;
1265 return Err(EvoKernelError::ValidationFailed(report));
1266 }
1267
1268 self.store
1269 .append_event(EvolutionEvent::ValidationPassed {
1270 mutation_id: mutation.intent.id.clone(),
1271 report: report.to_snapshot(&self.validation_plan.profile),
1272 gene_id: None,
1273 })
1274 .map_err(store_err)?;
1275
1276 let extracted_signals = extract_deterministic_signals(&SignalExtractionInput {
1277 patch_diff: mutation.artifact.payload.clone(),
1278 intent: mutation.intent.intent.clone(),
1279 expected_effect: mutation.intent.expected_effect.clone(),
1280 declared_signals: mutation.intent.signals.clone(),
1281 changed_files: receipt
1282 .changed_files
1283 .iter()
1284 .map(|path| path.to_string_lossy().to_string())
1285 .collect(),
1286 validation_success: report.success,
1287 validation_logs: report.logs.clone(),
1288 stage_outputs: report
1289 .stages
1290 .iter()
1291 .flat_map(|stage| [stage.stdout.clone(), stage.stderr.clone()])
1292 .filter(|value| !value.is_empty())
1293 .collect(),
1294 });
1295 self.store
1296 .append_event(EvolutionEvent::SignalsExtracted {
1297 mutation_id: mutation.intent.id.clone(),
1298 hash: extracted_signals.hash.clone(),
1299 signals: extracted_signals.values.clone(),
1300 })
1301 .map_err(store_err)?;
1302
1303 let projection = self.store.rebuild_projection().map_err(store_err)?;
1304 let blast_radius = compute_blast_radius(&mutation.artifact.payload);
1305 let recent_mutation_ages_secs = self
1306 .recent_prior_mutation_ages_secs(Some(mutation.intent.id.as_str()))
1307 .map_err(store_err)?;
1308 let mut gene = derive_gene(
1309 &mutation,
1310 &receipt,
1311 &self.validation_plan.profile,
1312 &extracted_signals.values,
1313 );
1314 let success_count = projection
1315 .genes
1316 .iter()
1317 .find(|existing| existing.id == gene.id)
1318 .map(|existing| {
1319 projection
1320 .capsules
1321 .iter()
1322 .filter(|capsule| capsule.gene_id == existing.id)
1323 .count() as u64
1324 })
1325 .unwrap_or(0)
1326 + 1;
1327 let governor_decision = self.governor.evaluate(GovernorInput {
1328 candidate_source: CandidateSource::Local,
1329 success_count,
1330 blast_radius: blast_radius.clone(),
1331 replay_failures: 0,
1332 recent_mutation_ages_secs,
1333 current_confidence: 0.7,
1334 historical_peak_confidence: 0.7,
1335 confidence_last_updated_secs: Some(0),
1336 });
1337
1338 gene.state = governor_decision.target_state.clone();
1339 self.store
1340 .append_event(EvolutionEvent::GeneProjected { gene: gene.clone() })
1341 .map_err(store_err)?;
1342 self.store
1343 .append_event(EvolutionEvent::PromotionEvaluated {
1344 gene_id: gene.id.clone(),
1345 state: governor_decision.target_state.clone(),
1346 reason: governor_decision.reason.clone(),
1347 })
1348 .map_err(store_err)?;
1349 if matches!(governor_decision.target_state, AssetState::Promoted) {
1350 self.store
1351 .append_event(EvolutionEvent::GenePromoted {
1352 gene_id: gene.id.clone(),
1353 })
1354 .map_err(store_err)?;
1355 }
1356 if let Some(spec_id) = &mutation.intent.spec_id {
1357 self.store
1358 .append_event(EvolutionEvent::SpecLinked {
1359 mutation_id: mutation.intent.id.clone(),
1360 spec_id: spec_id.clone(),
1361 })
1362 .map_err(store_err)?;
1363 }
1364
1365 let mut capsule = build_capsule(
1366 run_id,
1367 &mutation,
1368 &receipt,
1369 &report,
1370 &self.validation_plan.profile,
1371 &gene,
1372 &blast_radius,
1373 )
1374 .map_err(|err| EvoKernelError::Validation(err.to_string()))?;
1375 capsule.state = governor_decision.target_state.clone();
1376 self.store
1377 .append_event(EvolutionEvent::CapsuleCommitted {
1378 capsule: capsule.clone(),
1379 })
1380 .map_err(store_err)?;
1381 if matches!(governor_decision.target_state, AssetState::Quarantined) {
1382 self.store
1383 .append_event(EvolutionEvent::CapsuleQuarantined {
1384 capsule_id: capsule.id.clone(),
1385 })
1386 .map_err(store_err)?;
1387 }
1388
1389 Ok(CaptureOutcome {
1390 capsule,
1391 gene,
1392 governor_decision,
1393 })
1394 }
1395
1396 pub async fn capture_from_proposal(
1397 &self,
1398 run_id: &RunId,
1399 proposal: &AgentMutationProposal,
1400 diff_payload: String,
1401 base_revision: Option<String>,
1402 ) -> Result<CaptureOutcome, EvoKernelError> {
1403 let intent = MutationIntent {
1404 id: next_id("proposal"),
1405 intent: proposal.intent.clone(),
1406 target: MutationTarget::Paths {
1407 allow: proposal.files.clone(),
1408 },
1409 expected_effect: proposal.expected_effect.clone(),
1410 risk: RiskLevel::Low,
1411 signals: proposal.files.clone(),
1412 spec_id: None,
1413 };
1414 self.capture_mutation_with_governor(
1415 run_id,
1416 prepare_mutation(intent, diff_payload, base_revision),
1417 )
1418 .await
1419 }
1420
1421 pub fn feedback_for_agent(outcome: &CaptureOutcome) -> ExecutionFeedback {
1422 ExecutionFeedback {
1423 accepted: !matches!(outcome.governor_decision.target_state, AssetState::Revoked),
1424 asset_state: Some(format!("{:?}", outcome.governor_decision.target_state)),
1425 summary: outcome.governor_decision.reason.clone(),
1426 }
1427 }
1428
1429 pub fn coordinate(&self, plan: CoordinationPlan) -> CoordinationResult {
1430 MultiAgentCoordinator::new().coordinate(plan)
1431 }
1432
1433 pub fn export_promoted_assets(
1434 &self,
1435 sender_id: impl Into<String>,
1436 ) -> Result<EvolutionEnvelope, EvoKernelError> {
1437 let sender_id = sender_id.into();
1438 let envelope = export_promoted_assets_from_store(self.store.as_ref(), sender_id.clone())?;
1439 if !envelope.assets.is_empty() {
1440 let mut ledger = self
1441 .economics
1442 .lock()
1443 .map_err(|_| EvoKernelError::Validation("economics ledger lock poisoned".into()))?;
1444 if ledger
1445 .reserve_publish_stake(&sender_id, &self.stake_policy)
1446 .is_none()
1447 {
1448 return Err(EvoKernelError::Validation(
1449 "insufficient EVU for remote publish".into(),
1450 ));
1451 }
1452 }
1453 Ok(envelope)
1454 }
1455
1456 pub fn import_remote_envelope(
1457 &self,
1458 envelope: &EvolutionEnvelope,
1459 ) -> Result<ImportOutcome, EvoKernelError> {
1460 let outcome = import_remote_envelope_into_store(self.store.as_ref(), envelope)?;
1461 self.record_remote_publishers(envelope);
1462 Ok(outcome)
1463 }
1464
1465 pub fn fetch_assets(
1466 &self,
1467 responder_id: impl Into<String>,
1468 query: &FetchQuery,
1469 ) -> Result<FetchResponse, EvoKernelError> {
1470 fetch_assets_from_store(self.store.as_ref(), responder_id, query)
1471 }
1472
1473 pub fn revoke_assets(&self, notice: &RevokeNotice) -> Result<RevokeNotice, EvoKernelError> {
1474 revoke_assets_in_store(self.store.as_ref(), notice)
1475 }
1476
1477 pub async fn replay_or_fallback(
1478 &self,
1479 input: SelectorInput,
1480 ) -> Result<ReplayDecision, EvoKernelError> {
1481 let executor = StoreReplayExecutor {
1482 sandbox: self.sandbox.clone(),
1483 validator: self.validator.clone(),
1484 store: self.store.clone(),
1485 selector: self.selector.clone(),
1486 governor: self.governor.clone(),
1487 economics: Some(self.economics.clone()),
1488 remote_publishers: Some(self.remote_publishers.clone()),
1489 stake_policy: self.stake_policy.clone(),
1490 };
1491 executor
1492 .try_replay(&input, &self.sandbox_policy, &self.validation_plan)
1493 .await
1494 .map_err(|err| EvoKernelError::Validation(err.to_string()))
1495 }
1496
1497 pub fn economics_signal(&self, node_id: &str) -> Option<EconomicsSignal> {
1498 self.economics.lock().ok()?.governor_signal(node_id)
1499 }
1500
1501 pub fn selector_reputation_bias(&self) -> BTreeMap<String, f32> {
1502 self.economics
1503 .lock()
1504 .ok()
1505 .map(|locked| locked.selector_reputation_bias())
1506 .unwrap_or_default()
1507 }
1508
1509 pub fn metrics_snapshot(&self) -> Result<EvolutionMetricsSnapshot, EvoKernelError> {
1510 evolution_metrics_snapshot(self.store.as_ref())
1511 }
1512
1513 pub fn render_metrics_prometheus(&self) -> Result<String, EvoKernelError> {
1514 self.metrics_snapshot().map(|snapshot| {
1515 let health = evolution_health_snapshot(&snapshot);
1516 render_evolution_metrics_prometheus(&snapshot, &health)
1517 })
1518 }
1519
1520 pub fn health_snapshot(&self) -> Result<EvolutionHealthSnapshot, EvoKernelError> {
1521 self.metrics_snapshot()
1522 .map(|snapshot| evolution_health_snapshot(&snapshot))
1523 }
1524
1525 fn record_remote_publishers(&self, envelope: &EvolutionEnvelope) {
1526 let sender_id = envelope.sender_id.trim();
1527 if sender_id.is_empty() {
1528 return;
1529 }
1530 let Ok(mut publishers) = self.remote_publishers.lock() else {
1531 return;
1532 };
1533 for asset in &envelope.assets {
1534 match asset {
1535 NetworkAsset::Gene { gene } => {
1536 publishers.insert(gene.id.clone(), sender_id.to_string());
1537 }
1538 NetworkAsset::Capsule { capsule } => {
1539 publishers.insert(capsule.gene_id.clone(), sender_id.to_string());
1540 }
1541 NetworkAsset::EvolutionEvent { .. } => {}
1542 }
1543 }
1544 }
1545}
1546
1547pub fn prepare_mutation(
1548 intent: MutationIntent,
1549 diff_payload: String,
1550 base_revision: Option<String>,
1551) -> PreparedMutation {
1552 PreparedMutation {
1553 intent,
1554 artifact: MutationArtifact {
1555 encoding: ArtifactEncoding::UnifiedDiff,
1556 content_hash: compute_artifact_hash(&diff_payload),
1557 payload: diff_payload,
1558 base_revision,
1559 },
1560 }
1561}
1562
1563pub fn prepare_mutation_from_spec(
1564 plan: CompiledMutationPlan,
1565 diff_payload: String,
1566 base_revision: Option<String>,
1567) -> PreparedMutation {
1568 prepare_mutation(plan.mutation_intent, diff_payload, base_revision)
1569}
1570
1571pub fn default_evolution_store() -> Arc<dyn EvolutionStore> {
1572 Arc::new(oris_evolution::JsonlEvolutionStore::new(
1573 default_store_root(),
1574 ))
1575}
1576
1577fn derive_gene(
1578 mutation: &PreparedMutation,
1579 receipt: &SandboxReceipt,
1580 validation_profile: &str,
1581 extracted_signals: &[String],
1582) -> Gene {
1583 let mut strategy = BTreeSet::new();
1584 for file in &receipt.changed_files {
1585 if let Some(component) = file.components().next() {
1586 strategy.insert(component.as_os_str().to_string_lossy().to_string());
1587 }
1588 }
1589 for token in mutation
1590 .artifact
1591 .payload
1592 .split(|ch: char| !ch.is_ascii_alphanumeric())
1593 {
1594 if token.len() == 5
1595 && token.starts_with('E')
1596 && token[1..].chars().all(|ch| ch.is_ascii_digit())
1597 {
1598 strategy.insert(token.to_string());
1599 }
1600 }
1601 for token in mutation.intent.intent.split_whitespace().take(8) {
1602 strategy.insert(token.to_ascii_lowercase());
1603 }
1604 let strategy = strategy.into_iter().collect::<Vec<_>>();
1605 let id = stable_hash_json(&(extracted_signals, &strategy, validation_profile))
1606 .unwrap_or_else(|_| next_id("gene"));
1607 Gene {
1608 id,
1609 signals: extracted_signals.to_vec(),
1610 strategy,
1611 validation: vec![validation_profile.to_string()],
1612 state: AssetState::Promoted,
1613 }
1614}
1615
1616fn build_capsule(
1617 run_id: &RunId,
1618 mutation: &PreparedMutation,
1619 receipt: &SandboxReceipt,
1620 report: &ValidationReport,
1621 validation_profile: &str,
1622 gene: &Gene,
1623 blast_radius: &BlastRadius,
1624) -> Result<Capsule, EvolutionError> {
1625 let env = current_env_fingerprint(&receipt.workdir);
1626 let validator_hash = stable_hash_json(report)?;
1627 let diff_hash = mutation.artifact.content_hash.clone();
1628 let id = stable_hash_json(&(run_id, &gene.id, &diff_hash, &mutation.intent.id))?;
1629 Ok(Capsule {
1630 id,
1631 gene_id: gene.id.clone(),
1632 mutation_id: mutation.intent.id.clone(),
1633 run_id: run_id.clone(),
1634 diff_hash,
1635 confidence: 0.7,
1636 env,
1637 outcome: oris_evolution::Outcome {
1638 success: true,
1639 validation_profile: validation_profile.to_string(),
1640 validation_duration_ms: report.duration_ms,
1641 changed_files: receipt
1642 .changed_files
1643 .iter()
1644 .map(|path| path.to_string_lossy().to_string())
1645 .collect(),
1646 validator_hash,
1647 lines_changed: blast_radius.lines_changed,
1648 replay_verified: false,
1649 },
1650 state: AssetState::Promoted,
1651 })
1652}
1653
1654fn current_env_fingerprint(workdir: &Path) -> EnvFingerprint {
1655 let rustc_version = Command::new("rustc")
1656 .arg("--version")
1657 .output()
1658 .ok()
1659 .filter(|output| output.status.success())
1660 .map(|output| String::from_utf8_lossy(&output.stdout).trim().to_string())
1661 .unwrap_or_else(|| "rustc unknown".into());
1662 let cargo_lock_hash = fs::read(workdir.join("Cargo.lock"))
1663 .ok()
1664 .map(|bytes| {
1665 let value = String::from_utf8_lossy(&bytes);
1666 compute_artifact_hash(&value)
1667 })
1668 .unwrap_or_else(|| "missing-cargo-lock".into());
1669 let target_triple = format!(
1670 "{}-unknown-{}",
1671 std::env::consts::ARCH,
1672 std::env::consts::OS
1673 );
1674 EnvFingerprint {
1675 rustc_version,
1676 cargo_lock_hash,
1677 target_triple,
1678 os: std::env::consts::OS.to_string(),
1679 }
1680}
1681
1682fn extend_signal_tokens(out: &mut BTreeSet<String>, input: &str) {
1683 for raw in input.split(|ch: char| !ch.is_ascii_alphanumeric()) {
1684 let trimmed = raw.trim();
1685 if trimmed.is_empty() {
1686 continue;
1687 }
1688 let normalized = if is_rust_error_code(trimmed) {
1689 let mut chars = trimmed.chars();
1690 let prefix = chars
1691 .next()
1692 .map(|ch| ch.to_ascii_uppercase())
1693 .unwrap_or('E');
1694 format!("{prefix}{}", chars.as_str())
1695 } else {
1696 trimmed.to_ascii_lowercase()
1697 };
1698 if normalized.len() < 3 {
1699 continue;
1700 }
1701 out.insert(normalized);
1702 }
1703}
1704
1705fn normalize_signal_phrase(input: &str) -> Option<String> {
1706 let normalized = input
1707 .split(|ch: char| !ch.is_ascii_alphanumeric())
1708 .filter_map(|raw| {
1709 let trimmed = raw.trim();
1710 if trimmed.is_empty() {
1711 return None;
1712 }
1713 let normalized = if is_rust_error_code(trimmed) {
1714 let mut chars = trimmed.chars();
1715 let prefix = chars
1716 .next()
1717 .map(|ch| ch.to_ascii_uppercase())
1718 .unwrap_or('E');
1719 format!("{prefix}{}", chars.as_str())
1720 } else {
1721 trimmed.to_ascii_lowercase()
1722 };
1723 if normalized.len() < 3 {
1724 None
1725 } else {
1726 Some(normalized)
1727 }
1728 })
1729 .collect::<Vec<_>>()
1730 .join(" ");
1731 if normalized.is_empty() {
1732 None
1733 } else {
1734 Some(normalized)
1735 }
1736}
1737
1738fn is_rust_error_code(value: &str) -> bool {
1739 value.len() == 5
1740 && matches!(value.as_bytes().first(), Some(b'e') | Some(b'E'))
1741 && value[1..].chars().all(|ch| ch.is_ascii_digit())
1742}
1743
1744fn find_declared_mutation(
1745 store: &dyn EvolutionStore,
1746 mutation_id: &MutationId,
1747) -> Result<Option<PreparedMutation>, EvolutionError> {
1748 for stored in store.scan(1)? {
1749 if let EvolutionEvent::MutationDeclared { mutation } = stored.event {
1750 if &mutation.intent.id == mutation_id {
1751 return Ok(Some(mutation));
1752 }
1753 }
1754 }
1755 Ok(None)
1756}
1757
1758fn exact_match_candidates(store: &dyn EvolutionStore, input: &SelectorInput) -> Vec<GeneCandidate> {
1759 let Ok(projection) = store.rebuild_projection() else {
1760 return Vec::new();
1761 };
1762 let capsules = projection.capsules.clone();
1763 let spec_ids_by_gene = projection.spec_ids_by_gene.clone();
1764 let requested_spec_id = input
1765 .spec_id
1766 .as_deref()
1767 .map(str::trim)
1768 .filter(|value| !value.is_empty());
1769 let signal_set = input
1770 .signals
1771 .iter()
1772 .map(|signal| signal.to_ascii_lowercase())
1773 .collect::<BTreeSet<_>>();
1774 let mut candidates = projection
1775 .genes
1776 .into_iter()
1777 .filter_map(|gene| {
1778 if gene.state != AssetState::Promoted {
1779 return None;
1780 }
1781 if let Some(spec_id) = requested_spec_id {
1782 let matches_spec = spec_ids_by_gene
1783 .get(&gene.id)
1784 .map(|values| {
1785 values
1786 .iter()
1787 .any(|value| value.eq_ignore_ascii_case(spec_id))
1788 })
1789 .unwrap_or(false);
1790 if !matches_spec {
1791 return None;
1792 }
1793 }
1794 let gene_signals = gene
1795 .signals
1796 .iter()
1797 .map(|signal| signal.to_ascii_lowercase())
1798 .collect::<BTreeSet<_>>();
1799 if gene_signals == signal_set {
1800 let mut matched_capsules = capsules
1801 .iter()
1802 .filter(|capsule| {
1803 capsule.gene_id == gene.id && capsule.state == AssetState::Promoted
1804 })
1805 .cloned()
1806 .collect::<Vec<_>>();
1807 matched_capsules.sort_by(|left, right| {
1808 replay_environment_match_factor(&input.env, &right.env)
1809 .partial_cmp(&replay_environment_match_factor(&input.env, &left.env))
1810 .unwrap_or(std::cmp::Ordering::Equal)
1811 .then_with(|| {
1812 right
1813 .confidence
1814 .partial_cmp(&left.confidence)
1815 .unwrap_or(std::cmp::Ordering::Equal)
1816 })
1817 .then_with(|| left.id.cmp(&right.id))
1818 });
1819 if matched_capsules.is_empty() {
1820 None
1821 } else {
1822 let score = matched_capsules
1823 .first()
1824 .map(|capsule| replay_environment_match_factor(&input.env, &capsule.env))
1825 .unwrap_or(0.0);
1826 Some(GeneCandidate {
1827 gene,
1828 score,
1829 capsules: matched_capsules,
1830 })
1831 }
1832 } else {
1833 None
1834 }
1835 })
1836 .collect::<Vec<_>>();
1837 candidates.sort_by(|left, right| {
1838 right
1839 .score
1840 .partial_cmp(&left.score)
1841 .unwrap_or(std::cmp::Ordering::Equal)
1842 .then_with(|| left.gene.id.cmp(&right.gene.id))
1843 });
1844 candidates
1845}
1846
1847fn quarantined_remote_exact_match_candidates(
1848 store: &dyn EvolutionStore,
1849 input: &SelectorInput,
1850) -> Vec<GeneCandidate> {
1851 let remote_asset_ids = store
1852 .scan(1)
1853 .ok()
1854 .map(|events| {
1855 events
1856 .into_iter()
1857 .filter_map(|stored| match stored.event {
1858 EvolutionEvent::RemoteAssetImported {
1859 source: CandidateSource::Remote,
1860 asset_ids,
1861 } => Some(asset_ids),
1862 _ => None,
1863 })
1864 .flatten()
1865 .collect::<BTreeSet<_>>()
1866 })
1867 .unwrap_or_default();
1868 if remote_asset_ids.is_empty() {
1869 return Vec::new();
1870 }
1871
1872 let Ok(projection) = store.rebuild_projection() else {
1873 return Vec::new();
1874 };
1875 let capsules = projection.capsules.clone();
1876 let spec_ids_by_gene = projection.spec_ids_by_gene.clone();
1877 let requested_spec_id = input
1878 .spec_id
1879 .as_deref()
1880 .map(str::trim)
1881 .filter(|value| !value.is_empty());
1882 let signal_set = input
1883 .signals
1884 .iter()
1885 .map(|signal| signal.to_ascii_lowercase())
1886 .collect::<BTreeSet<_>>();
1887 let mut candidates = projection
1888 .genes
1889 .into_iter()
1890 .filter_map(|gene| {
1891 if gene.state != AssetState::Promoted {
1892 return None;
1893 }
1894 if let Some(spec_id) = requested_spec_id {
1895 let matches_spec = spec_ids_by_gene
1896 .get(&gene.id)
1897 .map(|values| {
1898 values
1899 .iter()
1900 .any(|value| value.eq_ignore_ascii_case(spec_id))
1901 })
1902 .unwrap_or(false);
1903 if !matches_spec {
1904 return None;
1905 }
1906 }
1907 let gene_signals = gene
1908 .signals
1909 .iter()
1910 .map(|signal| signal.to_ascii_lowercase())
1911 .collect::<BTreeSet<_>>();
1912 if gene_signals == signal_set {
1913 let mut matched_capsules = capsules
1914 .iter()
1915 .filter(|capsule| {
1916 capsule.gene_id == gene.id
1917 && capsule.state == AssetState::Quarantined
1918 && remote_asset_ids.contains(&capsule.id)
1919 })
1920 .cloned()
1921 .collect::<Vec<_>>();
1922 matched_capsules.sort_by(|left, right| {
1923 replay_environment_match_factor(&input.env, &right.env)
1924 .partial_cmp(&replay_environment_match_factor(&input.env, &left.env))
1925 .unwrap_or(std::cmp::Ordering::Equal)
1926 .then_with(|| {
1927 right
1928 .confidence
1929 .partial_cmp(&left.confidence)
1930 .unwrap_or(std::cmp::Ordering::Equal)
1931 })
1932 .then_with(|| left.id.cmp(&right.id))
1933 });
1934 if matched_capsules.is_empty() {
1935 None
1936 } else {
1937 let score = matched_capsules
1938 .first()
1939 .map(|capsule| replay_environment_match_factor(&input.env, &capsule.env))
1940 .unwrap_or(0.0);
1941 Some(GeneCandidate {
1942 gene,
1943 score,
1944 capsules: matched_capsules,
1945 })
1946 }
1947 } else {
1948 None
1949 }
1950 })
1951 .collect::<Vec<_>>();
1952 candidates.sort_by(|left, right| {
1953 right
1954 .score
1955 .partial_cmp(&left.score)
1956 .unwrap_or(std::cmp::Ordering::Equal)
1957 .then_with(|| left.gene.id.cmp(&right.gene.id))
1958 });
1959 candidates
1960}
1961
1962fn replay_environment_match_factor(input: &EnvFingerprint, candidate: &EnvFingerprint) -> f32 {
1963 let fields = [
1964 input
1965 .rustc_version
1966 .eq_ignore_ascii_case(&candidate.rustc_version),
1967 input
1968 .cargo_lock_hash
1969 .eq_ignore_ascii_case(&candidate.cargo_lock_hash),
1970 input
1971 .target_triple
1972 .eq_ignore_ascii_case(&candidate.target_triple),
1973 input.os.eq_ignore_ascii_case(&candidate.os),
1974 ];
1975 let matched_fields = fields.into_iter().filter(|matched| *matched).count() as f32;
1976 0.5 + ((matched_fields / 4.0) * 0.5)
1977}
1978
1979fn effective_candidate_score(
1980 candidate: &GeneCandidate,
1981 publishers_by_gene: &BTreeMap<String, String>,
1982 reputation_bias: &BTreeMap<String, f32>,
1983) -> f32 {
1984 let bias = publishers_by_gene
1985 .get(&candidate.gene.id)
1986 .and_then(|publisher| reputation_bias.get(publisher))
1987 .copied()
1988 .unwrap_or(0.0)
1989 .clamp(0.0, 1.0);
1990 candidate.score * (1.0 + (bias * 0.1))
1991}
1992
1993fn export_promoted_assets_from_store(
1994 store: &dyn EvolutionStore,
1995 sender_id: impl Into<String>,
1996) -> Result<EvolutionEnvelope, EvoKernelError> {
1997 let projection = store.rebuild_projection().map_err(store_err)?;
1998 let mut assets = Vec::new();
1999 for gene in projection
2000 .genes
2001 .into_iter()
2002 .filter(|gene| gene.state == AssetState::Promoted)
2003 {
2004 assets.push(NetworkAsset::Gene { gene });
2005 }
2006 for capsule in projection
2007 .capsules
2008 .into_iter()
2009 .filter(|capsule| capsule.state == AssetState::Promoted)
2010 {
2011 assets.push(NetworkAsset::Capsule { capsule });
2012 }
2013 Ok(EvolutionEnvelope::publish(sender_id, assets))
2014}
2015
2016fn import_remote_envelope_into_store(
2017 store: &dyn EvolutionStore,
2018 envelope: &EvolutionEnvelope,
2019) -> Result<ImportOutcome, EvoKernelError> {
2020 if !envelope.verify_content_hash() {
2021 return Err(EvoKernelError::Validation(
2022 "invalid evolution envelope hash".into(),
2023 ));
2024 }
2025
2026 let mut imported_asset_ids = Vec::new();
2027 for asset in &envelope.assets {
2028 match asset {
2029 NetworkAsset::Gene { gene } => {
2030 imported_asset_ids.push(gene.id.clone());
2031 store
2032 .append_event(EvolutionEvent::RemoteAssetImported {
2033 source: CandidateSource::Remote,
2034 asset_ids: vec![gene.id.clone()],
2035 })
2036 .map_err(store_err)?;
2037 store
2038 .append_event(EvolutionEvent::GeneProjected { gene: gene.clone() })
2039 .map_err(store_err)?;
2040 }
2041 NetworkAsset::Capsule { capsule } => {
2042 imported_asset_ids.push(capsule.id.clone());
2043 store
2044 .append_event(EvolutionEvent::RemoteAssetImported {
2045 source: CandidateSource::Remote,
2046 asset_ids: vec![capsule.id.clone()],
2047 })
2048 .map_err(store_err)?;
2049 let mut quarantined = capsule.clone();
2050 quarantined.state = AssetState::Quarantined;
2051 store
2052 .append_event(EvolutionEvent::CapsuleCommitted {
2053 capsule: quarantined.clone(),
2054 })
2055 .map_err(store_err)?;
2056 store
2057 .append_event(EvolutionEvent::CapsuleQuarantined {
2058 capsule_id: quarantined.id,
2059 })
2060 .map_err(store_err)?;
2061 }
2062 NetworkAsset::EvolutionEvent { event } => {
2063 if should_import_remote_event(event) {
2064 store.append_event(event.clone()).map_err(store_err)?;
2065 }
2066 }
2067 }
2068 }
2069
2070 Ok(ImportOutcome {
2071 imported_asset_ids,
2072 accepted: true,
2073 })
2074}
2075
2076fn should_import_remote_event(event: &EvolutionEvent) -> bool {
2077 matches!(
2078 event,
2079 EvolutionEvent::MutationDeclared { .. } | EvolutionEvent::SpecLinked { .. }
2080 )
2081}
2082
2083fn fetch_assets_from_store(
2084 store: &dyn EvolutionStore,
2085 responder_id: impl Into<String>,
2086 query: &FetchQuery,
2087) -> Result<FetchResponse, EvoKernelError> {
2088 let projection = store.rebuild_projection().map_err(store_err)?;
2089 let normalized_signals: Vec<String> = query
2090 .signals
2091 .iter()
2092 .map(|signal| signal.trim().to_ascii_lowercase())
2093 .filter(|signal| !signal.is_empty())
2094 .collect();
2095 let matches_any_signal = |candidate: &str| {
2096 if normalized_signals.is_empty() {
2097 return true;
2098 }
2099 let candidate = candidate.to_ascii_lowercase();
2100 normalized_signals
2101 .iter()
2102 .any(|signal| candidate.contains(signal) || signal.contains(&candidate))
2103 };
2104
2105 let matched_genes: Vec<Gene> = projection
2106 .genes
2107 .into_iter()
2108 .filter(|gene| gene.state == AssetState::Promoted)
2109 .filter(|gene| gene.signals.iter().any(|signal| matches_any_signal(signal)))
2110 .collect();
2111 let matched_gene_ids: BTreeSet<String> =
2112 matched_genes.iter().map(|gene| gene.id.clone()).collect();
2113 let matched_capsules: Vec<Capsule> = projection
2114 .capsules
2115 .into_iter()
2116 .filter(|capsule| capsule.state == AssetState::Promoted)
2117 .filter(|capsule| matched_gene_ids.contains(&capsule.gene_id))
2118 .collect();
2119
2120 let mut assets = Vec::new();
2121 for gene in matched_genes {
2122 assets.push(NetworkAsset::Gene { gene });
2123 }
2124 for capsule in matched_capsules {
2125 assets.push(NetworkAsset::Capsule { capsule });
2126 }
2127
2128 Ok(FetchResponse {
2129 sender_id: responder_id.into(),
2130 assets,
2131 })
2132}
2133
2134fn revoke_assets_in_store(
2135 store: &dyn EvolutionStore,
2136 notice: &RevokeNotice,
2137) -> Result<RevokeNotice, EvoKernelError> {
2138 let projection = store.rebuild_projection().map_err(store_err)?;
2139 let requested: BTreeSet<String> = notice
2140 .asset_ids
2141 .iter()
2142 .map(|asset_id| asset_id.trim().to_string())
2143 .filter(|asset_id| !asset_id.is_empty())
2144 .collect();
2145 let mut revoked_gene_ids = BTreeSet::new();
2146 let mut quarantined_capsule_ids = BTreeSet::new();
2147
2148 for gene in &projection.genes {
2149 if requested.contains(&gene.id) {
2150 revoked_gene_ids.insert(gene.id.clone());
2151 }
2152 }
2153 for capsule in &projection.capsules {
2154 if requested.contains(&capsule.id) {
2155 quarantined_capsule_ids.insert(capsule.id.clone());
2156 revoked_gene_ids.insert(capsule.gene_id.clone());
2157 }
2158 }
2159 for capsule in &projection.capsules {
2160 if revoked_gene_ids.contains(&capsule.gene_id) {
2161 quarantined_capsule_ids.insert(capsule.id.clone());
2162 }
2163 }
2164
2165 for gene_id in &revoked_gene_ids {
2166 store
2167 .append_event(EvolutionEvent::GeneRevoked {
2168 gene_id: gene_id.clone(),
2169 reason: notice.reason.clone(),
2170 })
2171 .map_err(store_err)?;
2172 }
2173 for capsule_id in &quarantined_capsule_ids {
2174 store
2175 .append_event(EvolutionEvent::CapsuleQuarantined {
2176 capsule_id: capsule_id.clone(),
2177 })
2178 .map_err(store_err)?;
2179 }
2180
2181 let mut affected_ids: Vec<String> = revoked_gene_ids.into_iter().collect();
2182 affected_ids.extend(quarantined_capsule_ids);
2183 affected_ids.sort();
2184 affected_ids.dedup();
2185
2186 Ok(RevokeNotice {
2187 sender_id: notice.sender_id.clone(),
2188 asset_ids: affected_ids,
2189 reason: notice.reason.clone(),
2190 })
2191}
2192
2193fn evolution_metrics_snapshot(
2194 store: &dyn EvolutionStore,
2195) -> Result<EvolutionMetricsSnapshot, EvoKernelError> {
2196 let events = store.scan(1).map_err(store_err)?;
2197 let projection = store.rebuild_projection().map_err(store_err)?;
2198 let replay_success_total = events
2199 .iter()
2200 .filter(|stored| matches!(stored.event, EvolutionEvent::CapsuleReused { .. }))
2201 .count() as u64;
2202 let replay_failures_total = events
2203 .iter()
2204 .filter(|stored| is_replay_validation_failure(&stored.event))
2205 .count() as u64;
2206 let replay_attempts_total = replay_success_total + replay_failures_total;
2207 let mutation_declared_total = events
2208 .iter()
2209 .filter(|stored| matches!(stored.event, EvolutionEvent::MutationDeclared { .. }))
2210 .count() as u64;
2211 let promoted_mutations_total = events
2212 .iter()
2213 .filter(|stored| matches!(stored.event, EvolutionEvent::GenePromoted { .. }))
2214 .count() as u64;
2215 let gene_revocations_total = events
2216 .iter()
2217 .filter(|stored| matches!(stored.event, EvolutionEvent::GeneRevoked { .. }))
2218 .count() as u64;
2219 let cutoff = Utc::now() - Duration::hours(1);
2220 let mutation_velocity_last_hour = count_recent_events(&events, cutoff, |event| {
2221 matches!(event, EvolutionEvent::MutationDeclared { .. })
2222 });
2223 let revoke_frequency_last_hour = count_recent_events(&events, cutoff, |event| {
2224 matches!(event, EvolutionEvent::GeneRevoked { .. })
2225 });
2226 let promoted_genes = projection
2227 .genes
2228 .iter()
2229 .filter(|gene| gene.state == AssetState::Promoted)
2230 .count() as u64;
2231 let promoted_capsules = projection
2232 .capsules
2233 .iter()
2234 .filter(|capsule| capsule.state == AssetState::Promoted)
2235 .count() as u64;
2236
2237 Ok(EvolutionMetricsSnapshot {
2238 replay_attempts_total,
2239 replay_success_total,
2240 replay_success_rate: safe_ratio(replay_success_total, replay_attempts_total),
2241 mutation_declared_total,
2242 promoted_mutations_total,
2243 promotion_ratio: safe_ratio(promoted_mutations_total, mutation_declared_total),
2244 gene_revocations_total,
2245 mutation_velocity_last_hour,
2246 revoke_frequency_last_hour,
2247 promoted_genes,
2248 promoted_capsules,
2249 last_event_seq: events.last().map(|stored| stored.seq).unwrap_or(0),
2250 })
2251}
2252
2253fn evolution_health_snapshot(snapshot: &EvolutionMetricsSnapshot) -> EvolutionHealthSnapshot {
2254 EvolutionHealthSnapshot {
2255 status: "ok".into(),
2256 last_event_seq: snapshot.last_event_seq,
2257 promoted_genes: snapshot.promoted_genes,
2258 promoted_capsules: snapshot.promoted_capsules,
2259 }
2260}
2261
2262fn render_evolution_metrics_prometheus(
2263 snapshot: &EvolutionMetricsSnapshot,
2264 health: &EvolutionHealthSnapshot,
2265) -> String {
2266 let mut out = String::new();
2267 out.push_str(
2268 "# HELP oris_evolution_replay_attempts_total Total replay attempts that reached validation.\n",
2269 );
2270 out.push_str("# TYPE oris_evolution_replay_attempts_total counter\n");
2271 out.push_str(&format!(
2272 "oris_evolution_replay_attempts_total {}\n",
2273 snapshot.replay_attempts_total
2274 ));
2275 out.push_str("# HELP oris_evolution_replay_success_total Total replay attempts that reused a capsule successfully.\n");
2276 out.push_str("# TYPE oris_evolution_replay_success_total counter\n");
2277 out.push_str(&format!(
2278 "oris_evolution_replay_success_total {}\n",
2279 snapshot.replay_success_total
2280 ));
2281 out.push_str("# HELP oris_evolution_replay_success_rate Successful replay attempts divided by replay attempts that reached validation.\n");
2282 out.push_str("# TYPE oris_evolution_replay_success_rate gauge\n");
2283 out.push_str(&format!(
2284 "oris_evolution_replay_success_rate {:.6}\n",
2285 snapshot.replay_success_rate
2286 ));
2287 out.push_str(
2288 "# HELP oris_evolution_mutation_declared_total Total declared mutations recorded in the evolution log.\n",
2289 );
2290 out.push_str("# TYPE oris_evolution_mutation_declared_total counter\n");
2291 out.push_str(&format!(
2292 "oris_evolution_mutation_declared_total {}\n",
2293 snapshot.mutation_declared_total
2294 ));
2295 out.push_str("# HELP oris_evolution_promoted_mutations_total Total mutations promoted by the governor.\n");
2296 out.push_str("# TYPE oris_evolution_promoted_mutations_total counter\n");
2297 out.push_str(&format!(
2298 "oris_evolution_promoted_mutations_total {}\n",
2299 snapshot.promoted_mutations_total
2300 ));
2301 out.push_str(
2302 "# HELP oris_evolution_promotion_ratio Promoted mutations divided by declared mutations.\n",
2303 );
2304 out.push_str("# TYPE oris_evolution_promotion_ratio gauge\n");
2305 out.push_str(&format!(
2306 "oris_evolution_promotion_ratio {:.6}\n",
2307 snapshot.promotion_ratio
2308 ));
2309 out.push_str("# HELP oris_evolution_gene_revocations_total Total gene revocations recorded in the evolution log.\n");
2310 out.push_str("# TYPE oris_evolution_gene_revocations_total counter\n");
2311 out.push_str(&format!(
2312 "oris_evolution_gene_revocations_total {}\n",
2313 snapshot.gene_revocations_total
2314 ));
2315 out.push_str("# HELP oris_evolution_mutation_velocity_last_hour Declared mutations observed in the last hour.\n");
2316 out.push_str("# TYPE oris_evolution_mutation_velocity_last_hour gauge\n");
2317 out.push_str(&format!(
2318 "oris_evolution_mutation_velocity_last_hour {}\n",
2319 snapshot.mutation_velocity_last_hour
2320 ));
2321 out.push_str("# HELP oris_evolution_revoke_frequency_last_hour Gene revocations observed in the last hour.\n");
2322 out.push_str("# TYPE oris_evolution_revoke_frequency_last_hour gauge\n");
2323 out.push_str(&format!(
2324 "oris_evolution_revoke_frequency_last_hour {}\n",
2325 snapshot.revoke_frequency_last_hour
2326 ));
2327 out.push_str("# HELP oris_evolution_promoted_genes Current promoted genes in the evolution projection.\n");
2328 out.push_str("# TYPE oris_evolution_promoted_genes gauge\n");
2329 out.push_str(&format!(
2330 "oris_evolution_promoted_genes {}\n",
2331 snapshot.promoted_genes
2332 ));
2333 out.push_str("# HELP oris_evolution_promoted_capsules Current promoted capsules in the evolution projection.\n");
2334 out.push_str("# TYPE oris_evolution_promoted_capsules gauge\n");
2335 out.push_str(&format!(
2336 "oris_evolution_promoted_capsules {}\n",
2337 snapshot.promoted_capsules
2338 ));
2339 out.push_str("# HELP oris_evolution_store_last_event_seq Last visible append-only evolution event sequence.\n");
2340 out.push_str("# TYPE oris_evolution_store_last_event_seq gauge\n");
2341 out.push_str(&format!(
2342 "oris_evolution_store_last_event_seq {}\n",
2343 snapshot.last_event_seq
2344 ));
2345 out.push_str(
2346 "# HELP oris_evolution_health Evolution observability store health (1 = healthy).\n",
2347 );
2348 out.push_str("# TYPE oris_evolution_health gauge\n");
2349 out.push_str(&format!(
2350 "oris_evolution_health {}\n",
2351 u8::from(health.status == "ok")
2352 ));
2353 out
2354}
2355
2356fn count_recent_events(
2357 events: &[StoredEvolutionEvent],
2358 cutoff: DateTime<Utc>,
2359 predicate: impl Fn(&EvolutionEvent) -> bool,
2360) -> u64 {
2361 events
2362 .iter()
2363 .filter(|stored| {
2364 predicate(&stored.event)
2365 && parse_event_timestamp(&stored.timestamp)
2366 .map(|timestamp| timestamp >= cutoff)
2367 .unwrap_or(false)
2368 })
2369 .count() as u64
2370}
2371
2372fn parse_event_timestamp(raw: &str) -> Option<DateTime<Utc>> {
2373 DateTime::parse_from_rfc3339(raw)
2374 .ok()
2375 .map(|parsed| parsed.with_timezone(&Utc))
2376}
2377
2378fn is_replay_validation_failure(event: &EvolutionEvent) -> bool {
2379 matches!(
2380 event,
2381 EvolutionEvent::ValidationFailed {
2382 gene_id: Some(_),
2383 ..
2384 }
2385 )
2386}
2387
2388fn safe_ratio(numerator: u64, denominator: u64) -> f64 {
2389 if denominator == 0 {
2390 0.0
2391 } else {
2392 numerator as f64 / denominator as f64
2393 }
2394}
2395
2396fn store_err(err: EvolutionError) -> EvoKernelError {
2397 EvoKernelError::Store(err.to_string())
2398}
2399
2400#[cfg(test)]
2401mod tests {
2402 use super::*;
2403 use oris_agent_contract::{
2404 AgentRole, CoordinationPlan, CoordinationPrimitive, CoordinationTask,
2405 };
2406 use oris_kernel::{
2407 AllowAllPolicy, InMemoryEventStore, KernelMode, KernelState, NoopActionExecutor,
2408 NoopStepFn, StateUpdatedOnlyReducer,
2409 };
2410 use serde::{Deserialize, Serialize};
2411
2412 #[derive(Clone, Debug, Default, Serialize, Deserialize)]
2413 struct TestState;
2414
2415 impl KernelState for TestState {
2416 fn version(&self) -> u32 {
2417 1
2418 }
2419 }
2420
2421 fn temp_workspace(name: &str) -> std::path::PathBuf {
2422 let root =
2423 std::env::temp_dir().join(format!("oris-evokernel-{name}-{}", std::process::id()));
2424 if root.exists() {
2425 fs::remove_dir_all(&root).unwrap();
2426 }
2427 fs::create_dir_all(root.join("src")).unwrap();
2428 fs::write(
2429 root.join("Cargo.toml"),
2430 "[package]\nname = \"sample\"\nversion = \"0.1.0\"\nedition = \"2021\"\n",
2431 )
2432 .unwrap();
2433 fs::write(root.join("Cargo.lock"), "# lock\n").unwrap();
2434 fs::write(root.join("src/lib.rs"), "pub fn demo() -> usize { 1 }\n").unwrap();
2435 root
2436 }
2437
2438 fn test_kernel() -> Arc<Kernel<TestState>> {
2439 Arc::new(Kernel::<TestState> {
2440 events: Box::new(InMemoryEventStore::new()),
2441 snaps: None,
2442 reducer: Box::new(StateUpdatedOnlyReducer),
2443 exec: Box::new(NoopActionExecutor),
2444 step: Box::new(NoopStepFn),
2445 policy: Box::new(AllowAllPolicy),
2446 effect_sink: None,
2447 mode: KernelMode::Normal,
2448 })
2449 }
2450
2451 fn lightweight_plan() -> ValidationPlan {
2452 ValidationPlan {
2453 profile: "test".into(),
2454 stages: vec![ValidationStage::Command {
2455 program: "git".into(),
2456 args: vec!["--version".into()],
2457 timeout_ms: 5_000,
2458 }],
2459 }
2460 }
2461
2462 fn sample_mutation() -> PreparedMutation {
2463 prepare_mutation(
2464 MutationIntent {
2465 id: "mutation-1".into(),
2466 intent: "add README".into(),
2467 target: MutationTarget::Paths {
2468 allow: vec!["README.md".into()],
2469 },
2470 expected_effect: "repo still builds".into(),
2471 risk: RiskLevel::Low,
2472 signals: vec!["missing readme".into()],
2473 spec_id: None,
2474 },
2475 "\
2476diff --git a/README.md b/README.md
2477new file mode 100644
2478index 0000000..1111111
2479--- /dev/null
2480+++ b/README.md
2481@@ -0,0 +1 @@
2482+# sample
2483"
2484 .into(),
2485 Some("HEAD".into()),
2486 )
2487 }
2488
2489 fn base_sandbox_policy() -> SandboxPolicy {
2490 SandboxPolicy {
2491 allowed_programs: vec!["git".into()],
2492 max_duration_ms: 60_000,
2493 max_output_bytes: 1024 * 1024,
2494 denied_env_prefixes: Vec::new(),
2495 }
2496 }
2497
2498 fn command_validator() -> Arc<dyn Validator> {
2499 Arc::new(CommandValidator::new(base_sandbox_policy()))
2500 }
2501
2502 fn replay_input(signal: &str) -> SelectorInput {
2503 let rustc_version = std::process::Command::new("rustc")
2504 .arg("--version")
2505 .output()
2506 .ok()
2507 .filter(|output| output.status.success())
2508 .map(|output| String::from_utf8_lossy(&output.stdout).trim().to_string())
2509 .unwrap_or_else(|| "rustc unknown".into());
2510 SelectorInput {
2511 signals: vec![signal.into()],
2512 env: EnvFingerprint {
2513 rustc_version,
2514 cargo_lock_hash: compute_artifact_hash("# lock\n"),
2515 target_triple: format!(
2516 "{}-unknown-{}",
2517 std::env::consts::ARCH,
2518 std::env::consts::OS
2519 ),
2520 os: std::env::consts::OS.into(),
2521 },
2522 spec_id: None,
2523 limit: 1,
2524 }
2525 }
2526
2527 fn build_test_evo_with_store(
2528 name: &str,
2529 run_id: &str,
2530 validator: Arc<dyn Validator>,
2531 store: Arc<dyn EvolutionStore>,
2532 ) -> EvoKernel<TestState> {
2533 let workspace = temp_workspace(name);
2534 let sandbox: Arc<dyn Sandbox> = Arc::new(oris_sandbox::LocalProcessSandbox::new(
2535 run_id,
2536 &workspace,
2537 std::env::temp_dir(),
2538 ));
2539 EvoKernel::new(test_kernel(), sandbox, validator, store)
2540 .with_governor(Arc::new(DefaultGovernor::new(
2541 oris_governor::GovernorConfig {
2542 promote_after_successes: 1,
2543 ..Default::default()
2544 },
2545 )))
2546 .with_validation_plan(lightweight_plan())
2547 .with_sandbox_policy(base_sandbox_policy())
2548 }
2549
2550 fn build_test_evo(
2551 name: &str,
2552 run_id: &str,
2553 validator: Arc<dyn Validator>,
2554 ) -> (EvoKernel<TestState>, Arc<dyn EvolutionStore>) {
2555 let store_root = std::env::temp_dir().join(format!(
2556 "oris-evokernel-{name}-store-{}",
2557 std::process::id()
2558 ));
2559 if store_root.exists() {
2560 fs::remove_dir_all(&store_root).unwrap();
2561 }
2562 let store: Arc<dyn EvolutionStore> =
2563 Arc::new(oris_evolution::JsonlEvolutionStore::new(&store_root));
2564 let evo = build_test_evo_with_store(name, run_id, validator, store.clone());
2565 (evo, store)
2566 }
2567
2568 fn remote_publish_envelope(
2569 sender_id: &str,
2570 run_id: &str,
2571 gene_id: &str,
2572 capsule_id: &str,
2573 mutation_id: &str,
2574 signal: &str,
2575 file_name: &str,
2576 line: &str,
2577 ) -> EvolutionEnvelope {
2578 remote_publish_envelope_with_env(
2579 sender_id,
2580 run_id,
2581 gene_id,
2582 capsule_id,
2583 mutation_id,
2584 signal,
2585 file_name,
2586 line,
2587 replay_input(signal).env,
2588 )
2589 }
2590
2591 fn remote_publish_envelope_with_env(
2592 sender_id: &str,
2593 run_id: &str,
2594 gene_id: &str,
2595 capsule_id: &str,
2596 mutation_id: &str,
2597 signal: &str,
2598 file_name: &str,
2599 line: &str,
2600 env: EnvFingerprint,
2601 ) -> EvolutionEnvelope {
2602 let mutation = prepare_mutation(
2603 MutationIntent {
2604 id: mutation_id.into(),
2605 intent: format!("add {file_name}"),
2606 target: MutationTarget::Paths {
2607 allow: vec![file_name.into()],
2608 },
2609 expected_effect: "replay should still validate".into(),
2610 risk: RiskLevel::Low,
2611 signals: vec![signal.into()],
2612 spec_id: None,
2613 },
2614 format!(
2615 "\
2616diff --git a/{file_name} b/{file_name}
2617new file mode 100644
2618index 0000000..1111111
2619--- /dev/null
2620+++ b/{file_name}
2621@@ -0,0 +1 @@
2622+{line}
2623"
2624 ),
2625 Some("HEAD".into()),
2626 );
2627 let gene = Gene {
2628 id: gene_id.into(),
2629 signals: vec![signal.into()],
2630 strategy: vec![file_name.into()],
2631 validation: vec!["test".into()],
2632 state: AssetState::Promoted,
2633 };
2634 let capsule = Capsule {
2635 id: capsule_id.into(),
2636 gene_id: gene_id.into(),
2637 mutation_id: mutation_id.into(),
2638 run_id: run_id.into(),
2639 diff_hash: mutation.artifact.content_hash.clone(),
2640 confidence: 0.9,
2641 env,
2642 outcome: Outcome {
2643 success: true,
2644 validation_profile: "test".into(),
2645 validation_duration_ms: 1,
2646 changed_files: vec![file_name.into()],
2647 validator_hash: "validator-hash".into(),
2648 lines_changed: 1,
2649 replay_verified: false,
2650 },
2651 state: AssetState::Promoted,
2652 };
2653 EvolutionEnvelope::publish(
2654 sender_id,
2655 vec![
2656 NetworkAsset::EvolutionEvent {
2657 event: EvolutionEvent::MutationDeclared { mutation },
2658 },
2659 NetworkAsset::Gene { gene: gene.clone() },
2660 NetworkAsset::Capsule {
2661 capsule: capsule.clone(),
2662 },
2663 NetworkAsset::EvolutionEvent {
2664 event: EvolutionEvent::CapsuleReleased {
2665 capsule_id: capsule.id.clone(),
2666 state: AssetState::Promoted,
2667 },
2668 },
2669 ],
2670 )
2671 }
2672
2673 struct FixedValidator {
2674 success: bool,
2675 }
2676
2677 #[async_trait]
2678 impl Validator for FixedValidator {
2679 async fn run(
2680 &self,
2681 _receipt: &SandboxReceipt,
2682 plan: &ValidationPlan,
2683 ) -> Result<ValidationReport, ValidationError> {
2684 Ok(ValidationReport {
2685 success: self.success,
2686 duration_ms: 1,
2687 stages: Vec::new(),
2688 logs: if self.success {
2689 format!("{} ok", plan.profile)
2690 } else {
2691 format!("{} failed", plan.profile)
2692 },
2693 })
2694 }
2695 }
2696
2697 #[test]
2698 fn coordination_planner_to_coder_handoff_is_deterministic() {
2699 let result = MultiAgentCoordinator::new().coordinate(CoordinationPlan {
2700 root_goal: "ship feature".into(),
2701 primitive: CoordinationPrimitive::Sequential,
2702 tasks: vec![
2703 CoordinationTask {
2704 id: "planner".into(),
2705 role: AgentRole::Planner,
2706 description: "split the work".into(),
2707 depends_on: Vec::new(),
2708 },
2709 CoordinationTask {
2710 id: "coder".into(),
2711 role: AgentRole::Coder,
2712 description: "implement the patch".into(),
2713 depends_on: vec!["planner".into()],
2714 },
2715 ],
2716 timeout_ms: 5_000,
2717 max_retries: 0,
2718 });
2719
2720 assert_eq!(result.completed_tasks, vec!["planner", "coder"]);
2721 assert!(result.failed_tasks.is_empty());
2722 assert!(result.messages.iter().any(|message| {
2723 message.from_role == AgentRole::Planner
2724 && message.to_role == AgentRole::Coder
2725 && message.task_id == "coder"
2726 }));
2727 }
2728
2729 #[test]
2730 fn coordination_repair_runs_only_after_coder_failure() {
2731 let result = MultiAgentCoordinator::new().coordinate(CoordinationPlan {
2732 root_goal: "fix broken implementation".into(),
2733 primitive: CoordinationPrimitive::Sequential,
2734 tasks: vec![
2735 CoordinationTask {
2736 id: "coder".into(),
2737 role: AgentRole::Coder,
2738 description: "force-fail initial implementation".into(),
2739 depends_on: Vec::new(),
2740 },
2741 CoordinationTask {
2742 id: "repair".into(),
2743 role: AgentRole::Repair,
2744 description: "patch the failed implementation".into(),
2745 depends_on: vec!["coder".into()],
2746 },
2747 ],
2748 timeout_ms: 5_000,
2749 max_retries: 0,
2750 });
2751
2752 assert_eq!(result.completed_tasks, vec!["repair"]);
2753 assert_eq!(result.failed_tasks, vec!["coder"]);
2754 assert!(result.messages.iter().any(|message| {
2755 message.from_role == AgentRole::Coder
2756 && message.to_role == AgentRole::Repair
2757 && message.task_id == "repair"
2758 }));
2759 }
2760
2761 #[test]
2762 fn coordination_optimizer_runs_after_successful_implementation_step() {
2763 let result = MultiAgentCoordinator::new().coordinate(CoordinationPlan {
2764 root_goal: "ship optimized patch".into(),
2765 primitive: CoordinationPrimitive::Sequential,
2766 tasks: vec![
2767 CoordinationTask {
2768 id: "coder".into(),
2769 role: AgentRole::Coder,
2770 description: "implement a working patch".into(),
2771 depends_on: Vec::new(),
2772 },
2773 CoordinationTask {
2774 id: "optimizer".into(),
2775 role: AgentRole::Optimizer,
2776 description: "tighten the implementation".into(),
2777 depends_on: vec!["coder".into()],
2778 },
2779 ],
2780 timeout_ms: 5_000,
2781 max_retries: 0,
2782 });
2783
2784 assert_eq!(result.completed_tasks, vec!["coder", "optimizer"]);
2785 assert!(result.failed_tasks.is_empty());
2786 }
2787
2788 #[test]
2789 fn coordination_parallel_waves_preserve_sorted_merge_order() {
2790 let result = MultiAgentCoordinator::new().coordinate(CoordinationPlan {
2791 root_goal: "parallelize safe tasks".into(),
2792 primitive: CoordinationPrimitive::Parallel,
2793 tasks: vec![
2794 CoordinationTask {
2795 id: "z-task".into(),
2796 role: AgentRole::Planner,
2797 description: "analyze z".into(),
2798 depends_on: Vec::new(),
2799 },
2800 CoordinationTask {
2801 id: "a-task".into(),
2802 role: AgentRole::Coder,
2803 description: "implement a".into(),
2804 depends_on: Vec::new(),
2805 },
2806 CoordinationTask {
2807 id: "mid-task".into(),
2808 role: AgentRole::Optimizer,
2809 description: "polish after both".into(),
2810 depends_on: vec!["z-task".into(), "a-task".into()],
2811 },
2812 ],
2813 timeout_ms: 5_000,
2814 max_retries: 0,
2815 });
2816
2817 assert_eq!(result.completed_tasks, vec!["a-task", "z-task", "mid-task"]);
2818 assert!(result.failed_tasks.is_empty());
2819 }
2820
2821 #[test]
2822 fn coordination_retries_stop_at_max_retries() {
2823 let result = MultiAgentCoordinator::new().coordinate(CoordinationPlan {
2824 root_goal: "retry then stop".into(),
2825 primitive: CoordinationPrimitive::Sequential,
2826 tasks: vec![CoordinationTask {
2827 id: "coder".into(),
2828 role: AgentRole::Coder,
2829 description: "force-fail this task".into(),
2830 depends_on: Vec::new(),
2831 }],
2832 timeout_ms: 5_000,
2833 max_retries: 1,
2834 });
2835
2836 assert!(result.completed_tasks.is_empty());
2837 assert_eq!(result.failed_tasks, vec!["coder"]);
2838 assert_eq!(
2839 result
2840 .messages
2841 .iter()
2842 .filter(|message| message.task_id == "coder" && message.content.contains("failed"))
2843 .count(),
2844 2
2845 );
2846 }
2847
2848 #[test]
2849 fn coordination_conditional_mode_skips_downstream_tasks_on_failure() {
2850 let result = MultiAgentCoordinator::new().coordinate(CoordinationPlan {
2851 root_goal: "skip blocked follow-up work".into(),
2852 primitive: CoordinationPrimitive::Conditional,
2853 tasks: vec![
2854 CoordinationTask {
2855 id: "coder".into(),
2856 role: AgentRole::Coder,
2857 description: "force-fail the implementation".into(),
2858 depends_on: Vec::new(),
2859 },
2860 CoordinationTask {
2861 id: "optimizer".into(),
2862 role: AgentRole::Optimizer,
2863 description: "only optimize a successful implementation".into(),
2864 depends_on: vec!["coder".into()],
2865 },
2866 ],
2867 timeout_ms: 5_000,
2868 max_retries: 0,
2869 });
2870
2871 assert!(result.completed_tasks.is_empty());
2872 assert_eq!(result.failed_tasks, vec!["coder"]);
2873 assert!(result.messages.iter().any(|message| {
2874 message.task_id == "optimizer"
2875 && message
2876 .content
2877 .contains("skipped due to failed dependency chain")
2878 }));
2879 assert!(!result
2880 .failed_tasks
2881 .iter()
2882 .any(|task_id| task_id == "optimizer"));
2883 }
2884
2885 #[tokio::test]
2886 async fn command_validator_aggregates_stage_reports() {
2887 let workspace = temp_workspace("validator");
2888 let receipt = SandboxReceipt {
2889 mutation_id: "m".into(),
2890 workdir: workspace,
2891 applied: true,
2892 changed_files: Vec::new(),
2893 patch_hash: "hash".into(),
2894 stdout_log: std::env::temp_dir().join("stdout.log"),
2895 stderr_log: std::env::temp_dir().join("stderr.log"),
2896 };
2897 let validator = CommandValidator::new(SandboxPolicy {
2898 allowed_programs: vec!["git".into()],
2899 max_duration_ms: 1_000,
2900 max_output_bytes: 1024,
2901 denied_env_prefixes: Vec::new(),
2902 });
2903 let report = validator
2904 .run(
2905 &receipt,
2906 &ValidationPlan {
2907 profile: "test".into(),
2908 stages: vec![ValidationStage::Command {
2909 program: "git".into(),
2910 args: vec!["--version".into()],
2911 timeout_ms: 1_000,
2912 }],
2913 },
2914 )
2915 .await
2916 .unwrap();
2917 assert_eq!(report.stages.len(), 1);
2918 }
2919
2920 #[tokio::test]
2921 async fn capture_successful_mutation_appends_capsule() {
2922 let (evo, store) = build_test_evo("capture", "run-1", command_validator());
2923 let capsule = evo
2924 .capture_successful_mutation(&"run-1".into(), sample_mutation())
2925 .await
2926 .unwrap();
2927 let events = store.scan(1).unwrap();
2928 assert!(events
2929 .iter()
2930 .any(|stored| matches!(stored.event, EvolutionEvent::CapsuleCommitted { .. })));
2931 assert!(!capsule.id.is_empty());
2932 }
2933
2934 #[tokio::test]
2935 async fn replay_hit_records_capsule_reused() {
2936 let (evo, store) = build_test_evo("replay", "run-2", command_validator());
2937 let capsule = evo
2938 .capture_successful_mutation(&"run-2".into(), sample_mutation())
2939 .await
2940 .unwrap();
2941 let decision = evo
2942 .replay_or_fallback(replay_input("missing readme"))
2943 .await
2944 .unwrap();
2945 assert!(decision.used_capsule);
2946 assert_eq!(decision.capsule_id, Some(capsule.id));
2947 assert!(store
2948 .scan(1)
2949 .unwrap()
2950 .iter()
2951 .any(|stored| matches!(stored.event, EvolutionEvent::CapsuleReused { .. })));
2952 }
2953
2954 #[tokio::test]
2955 async fn metrics_snapshot_tracks_replay_promotion_and_revocation_signals() {
2956 let (evo, _) = build_test_evo("metrics", "run-metrics", command_validator());
2957 let capsule = evo
2958 .capture_successful_mutation(&"run-metrics".into(), sample_mutation())
2959 .await
2960 .unwrap();
2961 let decision = evo
2962 .replay_or_fallback(replay_input("missing readme"))
2963 .await
2964 .unwrap();
2965 assert!(decision.used_capsule);
2966
2967 evo.revoke_assets(&RevokeNotice {
2968 sender_id: "node-metrics".into(),
2969 asset_ids: vec![capsule.id.clone()],
2970 reason: "manual test revoke".into(),
2971 })
2972 .unwrap();
2973
2974 let snapshot = evo.metrics_snapshot().unwrap();
2975 assert_eq!(snapshot.replay_attempts_total, 1);
2976 assert_eq!(snapshot.replay_success_total, 1);
2977 assert_eq!(snapshot.replay_success_rate, 1.0);
2978 assert_eq!(snapshot.mutation_declared_total, 1);
2979 assert_eq!(snapshot.promoted_mutations_total, 1);
2980 assert_eq!(snapshot.promotion_ratio, 1.0);
2981 assert_eq!(snapshot.gene_revocations_total, 1);
2982 assert_eq!(snapshot.mutation_velocity_last_hour, 1);
2983 assert_eq!(snapshot.revoke_frequency_last_hour, 1);
2984 assert_eq!(snapshot.promoted_genes, 0);
2985 assert_eq!(snapshot.promoted_capsules, 0);
2986
2987 let rendered = evo.render_metrics_prometheus().unwrap();
2988 assert!(rendered.contains("oris_evolution_replay_success_rate 1.000000"));
2989 assert!(rendered.contains("oris_evolution_promotion_ratio 1.000000"));
2990 assert!(rendered.contains("oris_evolution_revoke_frequency_last_hour 1"));
2991 assert!(rendered.contains("oris_evolution_mutation_velocity_last_hour 1"));
2992 assert!(rendered.contains("oris_evolution_health 1"));
2993 }
2994
2995 #[tokio::test]
2996 async fn remote_replay_prefers_closest_environment_match() {
2997 let (evo, _) = build_test_evo("remote-env", "run-remote-env", command_validator());
2998 let input = replay_input("env-signal");
2999
3000 let envelope_a = remote_publish_envelope_with_env(
3001 "node-a",
3002 "run-remote-a",
3003 "gene-a",
3004 "capsule-a",
3005 "mutation-a",
3006 "env-signal",
3007 "A.md",
3008 "# from a",
3009 input.env.clone(),
3010 );
3011 let envelope_b = remote_publish_envelope_with_env(
3012 "node-b",
3013 "run-remote-b",
3014 "gene-b",
3015 "capsule-b",
3016 "mutation-b",
3017 "env-signal",
3018 "B.md",
3019 "# from b",
3020 EnvFingerprint {
3021 rustc_version: "old-rustc".into(),
3022 cargo_lock_hash: "other-lock".into(),
3023 target_triple: "aarch64-apple-darwin".into(),
3024 os: "linux".into(),
3025 },
3026 );
3027
3028 evo.import_remote_envelope(&envelope_a).unwrap();
3029 evo.import_remote_envelope(&envelope_b).unwrap();
3030
3031 let decision = evo.replay_or_fallback(input).await.unwrap();
3032
3033 assert!(decision.used_capsule);
3034 assert_eq!(decision.capsule_id, Some("capsule-a".into()));
3035 assert!(!decision.fallback_to_planner);
3036 }
3037
3038 #[tokio::test]
3039 async fn remote_capsule_stays_quarantined_until_first_successful_replay() {
3040 let (evo, store) = build_test_evo(
3041 "remote-quarantine",
3042 "run-remote-quarantine",
3043 command_validator(),
3044 );
3045 let envelope = remote_publish_envelope(
3046 "node-remote",
3047 "run-remote-quarantine",
3048 "gene-remote",
3049 "capsule-remote",
3050 "mutation-remote",
3051 "remote-signal",
3052 "REMOTE.md",
3053 "# from remote",
3054 );
3055
3056 evo.import_remote_envelope(&envelope).unwrap();
3057
3058 let before_replay = store.rebuild_projection().unwrap();
3059 let imported_capsule = before_replay
3060 .capsules
3061 .iter()
3062 .find(|capsule| capsule.id == "capsule-remote")
3063 .unwrap();
3064 assert_eq!(imported_capsule.state, AssetState::Quarantined);
3065
3066 let decision = evo
3067 .replay_or_fallback(replay_input("remote-signal"))
3068 .await
3069 .unwrap();
3070
3071 assert!(decision.used_capsule);
3072 assert_eq!(decision.capsule_id, Some("capsule-remote".into()));
3073
3074 let after_replay = store.rebuild_projection().unwrap();
3075 let released_capsule = after_replay
3076 .capsules
3077 .iter()
3078 .find(|capsule| capsule.id == "capsule-remote")
3079 .unwrap();
3080 assert_eq!(released_capsule.state, AssetState::Promoted);
3081 }
3082
3083 #[tokio::test]
3084 async fn insufficient_evu_blocks_publish_but_not_local_replay() {
3085 let (evo, _) = build_test_evo("stake-gate", "run-stake", command_validator());
3086 let capsule = evo
3087 .capture_successful_mutation(&"run-stake".into(), sample_mutation())
3088 .await
3089 .unwrap();
3090 let publish = evo.export_promoted_assets("node-local");
3091 assert!(matches!(publish, Err(EvoKernelError::Validation(_))));
3092
3093 let decision = evo
3094 .replay_or_fallback(replay_input("missing readme"))
3095 .await
3096 .unwrap();
3097 assert!(decision.used_capsule);
3098 assert_eq!(decision.capsule_id, Some(capsule.id));
3099 }
3100
3101 #[tokio::test]
3102 async fn second_replay_validation_failure_revokes_gene_immediately() {
3103 let (capturer, store) = build_test_evo("revoke-replay", "run-capture", command_validator());
3104 let capsule = capturer
3105 .capture_successful_mutation(&"run-capture".into(), sample_mutation())
3106 .await
3107 .unwrap();
3108
3109 let failing_validator: Arc<dyn Validator> = Arc::new(FixedValidator { success: false });
3110 let failing_replay = build_test_evo_with_store(
3111 "revoke-replay",
3112 "run-replay-fail",
3113 failing_validator,
3114 store.clone(),
3115 );
3116
3117 let first = failing_replay
3118 .replay_or_fallback(replay_input("missing readme"))
3119 .await
3120 .unwrap();
3121 let second = failing_replay
3122 .replay_or_fallback(replay_input("missing readme"))
3123 .await
3124 .unwrap();
3125
3126 assert!(!first.used_capsule);
3127 assert!(first.fallback_to_planner);
3128 assert!(!second.used_capsule);
3129 assert!(second.fallback_to_planner);
3130
3131 let projection = store.rebuild_projection().unwrap();
3132 let gene = projection
3133 .genes
3134 .iter()
3135 .find(|gene| gene.id == capsule.gene_id)
3136 .unwrap();
3137 assert_eq!(gene.state, AssetState::Promoted);
3138 let committed_capsule = projection
3139 .capsules
3140 .iter()
3141 .find(|current| current.id == capsule.id)
3142 .unwrap();
3143 assert_eq!(committed_capsule.state, AssetState::Promoted);
3144
3145 let events = store.scan(1).unwrap();
3146 assert_eq!(
3147 events
3148 .iter()
3149 .filter(|stored| {
3150 matches!(
3151 &stored.event,
3152 EvolutionEvent::ValidationFailed {
3153 gene_id: Some(gene_id),
3154 ..
3155 } if gene_id == &capsule.gene_id
3156 )
3157 })
3158 .count(),
3159 1
3160 );
3161 assert!(!events.iter().any(|stored| {
3162 matches!(
3163 &stored.event,
3164 EvolutionEvent::GeneRevoked { gene_id, .. } if gene_id == &capsule.gene_id
3165 )
3166 }));
3167
3168 let recovered = build_test_evo_with_store(
3169 "revoke-replay",
3170 "run-replay-check",
3171 command_validator(),
3172 store.clone(),
3173 );
3174 let after_revoke = recovered
3175 .replay_or_fallback(replay_input("missing readme"))
3176 .await
3177 .unwrap();
3178 assert!(!after_revoke.used_capsule);
3179 assert!(after_revoke.fallback_to_planner);
3180 assert!(after_revoke.reason.contains("below replay threshold"));
3181 }
3182
3183 #[tokio::test]
3184 async fn remote_reuse_success_rewards_publisher_and_biases_selection() {
3185 let ledger = Arc::new(Mutex::new(EvuLedger {
3186 accounts: vec![],
3187 reputations: vec![
3188 oris_economics::ReputationRecord {
3189 node_id: "node-a".into(),
3190 publish_success_rate: 0.4,
3191 validator_accuracy: 0.4,
3192 reuse_impact: 0,
3193 },
3194 oris_economics::ReputationRecord {
3195 node_id: "node-b".into(),
3196 publish_success_rate: 0.95,
3197 validator_accuracy: 0.95,
3198 reuse_impact: 8,
3199 },
3200 ],
3201 }));
3202 let (evo, _) = build_test_evo("remote-success", "run-remote", command_validator());
3203 let evo = evo.with_economics(ledger.clone());
3204
3205 let envelope_a = remote_publish_envelope(
3206 "node-a",
3207 "run-remote-a",
3208 "gene-a",
3209 "capsule-a",
3210 "mutation-a",
3211 "shared-signal",
3212 "A.md",
3213 "# from a",
3214 );
3215 let envelope_b = remote_publish_envelope(
3216 "node-b",
3217 "run-remote-b",
3218 "gene-b",
3219 "capsule-b",
3220 "mutation-b",
3221 "shared-signal",
3222 "B.md",
3223 "# from b",
3224 );
3225
3226 evo.import_remote_envelope(&envelope_a).unwrap();
3227 evo.import_remote_envelope(&envelope_b).unwrap();
3228
3229 let decision = evo
3230 .replay_or_fallback(replay_input("shared-signal"))
3231 .await
3232 .unwrap();
3233
3234 assert!(decision.used_capsule);
3235 assert_eq!(decision.capsule_id, Some("capsule-b".into()));
3236 let locked = ledger.lock().unwrap();
3237 let rewarded = locked
3238 .accounts
3239 .iter()
3240 .find(|item| item.node_id == "node-b")
3241 .unwrap();
3242 assert_eq!(rewarded.balance, evo.stake_policy.reuse_reward);
3243 assert!(
3244 locked.selector_reputation_bias()["node-b"]
3245 > locked.selector_reputation_bias()["node-a"]
3246 );
3247 }
3248
3249 #[tokio::test]
3250 async fn remote_reuse_failure_penalizes_remote_reputation() {
3251 let ledger = Arc::new(Mutex::new(EvuLedger::default()));
3252 let failing_validator: Arc<dyn Validator> = Arc::new(FixedValidator { success: false });
3253 let (evo, _) = build_test_evo("remote-failure", "run-failure", failing_validator);
3254 let evo = evo.with_economics(ledger.clone());
3255
3256 let envelope = remote_publish_envelope(
3257 "node-remote",
3258 "run-remote-failed",
3259 "gene-remote",
3260 "capsule-remote",
3261 "mutation-remote",
3262 "failure-signal",
3263 "FAILED.md",
3264 "# from remote",
3265 );
3266 evo.import_remote_envelope(&envelope).unwrap();
3267
3268 let decision = evo
3269 .replay_or_fallback(replay_input("failure-signal"))
3270 .await
3271 .unwrap();
3272
3273 assert!(!decision.used_capsule);
3274 assert!(decision.fallback_to_planner);
3275
3276 let signal = evo.economics_signal("node-remote").unwrap();
3277 assert_eq!(signal.available_evu, 0);
3278 assert!(signal.publish_success_rate < 0.5);
3279 assert!(signal.validator_accuracy < 0.5);
3280 }
3281}