1use std::collections::{BTreeMap, BTreeSet};
4use std::fs;
5use std::path::Path;
6use std::process::Command;
7use std::sync::{Arc, Mutex};
8
9use async_trait::async_trait;
10use chrono::{DateTime, Duration, Utc};
11use oris_agent_contract::{
12 AgentRole, CoordinationMessage, CoordinationPlan, CoordinationPrimitive, CoordinationResult,
13 CoordinationTask, ExecutionFeedback, MutationProposal as AgentMutationProposal,
14};
15use oris_economics::{EconomicsSignal, EvuLedger, StakePolicy};
16use oris_evolution::{
17 compute_artifact_hash, next_id, stable_hash_json, AssetState, BlastRadius, CandidateSource,
18 Capsule, CapsuleId, EnvFingerprint, EvolutionError, EvolutionEvent, EvolutionProjection,
19 EvolutionStore, Gene, GeneCandidate, MutationId, PreparedMutation, Selector, SelectorInput,
20 StoreBackedSelector, StoredEvolutionEvent, ValidationSnapshot,
21};
22use oris_evolution_network::{EvolutionEnvelope, NetworkAsset};
23use oris_governor::{DefaultGovernor, Governor, GovernorDecision, GovernorInput};
24use oris_kernel::{Kernel, KernelState, RunId};
25use oris_sandbox::{
26 compute_blast_radius, execute_allowed_command, Sandbox, SandboxPolicy, SandboxReceipt,
27};
28use oris_spec::CompiledMutationPlan;
29use serde::{Deserialize, Serialize};
30use thiserror::Error;
31
32pub use oris_evolution::{
33 default_store_root, ArtifactEncoding, AssetState as EvoAssetState,
34 BlastRadius as EvoBlastRadius, CandidateSource as EvoCandidateSource,
35 EnvFingerprint as EvoEnvFingerprint, EvolutionStore as EvoEvolutionStore, JsonlEvolutionStore,
36 MutationArtifact, MutationIntent, MutationTarget, Outcome, RiskLevel,
37 SelectorInput as EvoSelectorInput,
38};
39pub use oris_evolution_network::{
40 FetchQuery, FetchResponse, MessageType, PublishRequest, RevokeNotice,
41};
42pub use oris_governor::{CoolingWindow, GovernorConfig, RevocationReason};
43pub use oris_sandbox::{LocalProcessSandbox, SandboxPolicy as EvoSandboxPolicy};
44pub use oris_spec::{SpecCompileError, SpecCompiler, SpecDocument};
45
/// Ordered list of validation stages to run after a mutation is applied.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ValidationPlan {
    /// Human-readable profile name; recorded in validation snapshots.
    pub profile: String,
    /// Stages executed in order; `CommandValidator` stops at the first failure.
    pub stages: Vec<ValidationStage>,
}
51
52impl ValidationPlan {
53 pub fn oris_default() -> Self {
54 Self {
55 profile: "oris-default".into(),
56 stages: vec![
57 ValidationStage::Command {
58 program: "cargo".into(),
59 args: vec!["fmt".into(), "--all".into(), "--check".into()],
60 timeout_ms: 60_000,
61 },
62 ValidationStage::Command {
63 program: "cargo".into(),
64 args: vec!["check".into(), "--workspace".into()],
65 timeout_ms: 180_000,
66 },
67 ValidationStage::Command {
68 program: "cargo".into(),
69 args: vec![
70 "test".into(),
71 "-p".into(),
72 "oris-kernel".into(),
73 "-p".into(),
74 "oris-evolution".into(),
75 "-p".into(),
76 "oris-sandbox".into(),
77 "-p".into(),
78 "oris-evokernel".into(),
79 "--lib".into(),
80 ],
81 timeout_ms: 300_000,
82 },
83 ValidationStage::Command {
84 program: "cargo".into(),
85 args: vec![
86 "test".into(),
87 "-p".into(),
88 "oris-runtime".into(),
89 "--lib".into(),
90 ],
91 timeout_ms: 300_000,
92 },
93 ],
94 }
95 }
96}
97
/// A single validation step; currently only external command execution.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ValidationStage {
    /// Run `program` with `args` in the sandbox workdir, bounded by
    /// `timeout_ms` milliseconds (enforced by `execute_allowed_command`).
    Command {
        program: String,
        args: Vec<String>,
        timeout_ms: u64,
    },
}
106
/// Outcome of one validation stage.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ValidationStageReport {
    /// Stage label, formatted as "program arg1 arg2 ...".
    pub stage: String,
    pub success: bool,
    /// Process exit code; `None` when the command could not be executed at all.
    pub exit_code: Option<i32>,
    /// Reported run time; 0 when the command failed to execute.
    pub duration_ms: u64,
    pub stdout: String,
    pub stderr: String,
}
116
/// Aggregate result of running a `ValidationPlan`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ValidationReport {
    /// True only when every executed stage succeeded.
    pub success: bool,
    /// Wall-clock time for the whole plan.
    pub duration_ms: u64,
    /// Per-stage reports, in execution order (may be shorter than the plan
    /// when validation stopped at a failing stage).
    pub stages: Vec<ValidationStageReport>,
    /// Concatenated stdout/stderr of all executed stages.
    pub logs: String,
}
124
/// Textual artifacts of a mutation from which deterministic signals are mined.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct SignalExtractionInput {
    /// Unified diff of the mutation.
    pub patch_diff: String,
    /// Stated intent of the mutation.
    pub intent: String,
    /// Expected effect as declared by the author/planner.
    pub expected_effect: String,
    /// Signals explicitly declared up front; these get both phrase
    /// normalization and token extraction.
    pub declared_signals: Vec<String>,
    pub changed_files: Vec<String>,
    pub validation_success: bool,
    pub validation_logs: String,
    /// Raw per-stage outputs from validation.
    pub stage_outputs: Vec<String>,
}
136
/// Deterministic signal set plus its stable hash, produced by
/// `extract_deterministic_signals`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct SignalExtractionOutput {
    /// Sorted, deduplicated signals (capped at 32 entries).
    pub values: Vec<String>,
    /// Stable hash over `values`, usable as a lookup/dedup key.
    pub hash: String,
}
142
/// Template describing a pre-baked mutation used to seed a fresh store.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct SeedTemplate {
    pub id: String,
    pub intent: String,
    pub signals: Vec<String>,
    /// Diff text applied when the seed is materialized.
    pub diff_payload: String,
    /// Name of the validation profile to associate with the seed.
    pub validation_profile: String,
}
151
/// Summary of a store bootstrap/seeding pass.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct BootstrapReport {
    /// Whether any seeding was performed at all.
    pub seeded: bool,
    pub genes_added: usize,
    pub capsules_added: usize,
}
158
159impl ValidationReport {
160 pub fn to_snapshot(&self, profile: &str) -> ValidationSnapshot {
161 ValidationSnapshot {
162 success: self.success,
163 profile: profile.to_string(),
164 duration_ms: self.duration_ms,
165 summary: if self.success {
166 "validation passed".into()
167 } else {
168 "validation failed".into()
169 },
170 }
171 }
172}
173
174pub fn extract_deterministic_signals(input: &SignalExtractionInput) -> SignalExtractionOutput {
175 let mut signals = BTreeSet::new();
176
177 for declared in &input.declared_signals {
178 if let Some(phrase) = normalize_signal_phrase(declared) {
179 signals.insert(phrase);
180 }
181 extend_signal_tokens(&mut signals, declared);
182 }
183
184 for text in [
185 input.patch_diff.as_str(),
186 input.intent.as_str(),
187 input.expected_effect.as_str(),
188 input.validation_logs.as_str(),
189 ] {
190 extend_signal_tokens(&mut signals, text);
191 }
192
193 for changed_file in &input.changed_files {
194 extend_signal_tokens(&mut signals, changed_file);
195 }
196
197 for stage_output in &input.stage_outputs {
198 extend_signal_tokens(&mut signals, stage_output);
199 }
200
201 signals.insert(if input.validation_success {
202 "validation passed".into()
203 } else {
204 "validation failed".into()
205 });
206
207 let values = signals.into_iter().take(32).collect::<Vec<_>>();
208 let hash =
209 stable_hash_json(&values).unwrap_or_else(|_| compute_artifact_hash(&values.join("\n")));
210 SignalExtractionOutput { values, hash }
211}
212
/// Errors surfaced by `Validator` implementations.
#[derive(Debug, Error)]
pub enum ValidationError {
    /// The validation machinery itself failed; distinct from a stage merely
    /// failing, which is reported inside `ValidationReport`.
    #[error("validation execution failed: {0}")]
    Execution(String),
}
218
/// Runs a `ValidationPlan` against the sandbox workspace described by a
/// `SandboxReceipt`.
#[async_trait]
pub trait Validator: Send + Sync {
    /// Executes the plan and returns the aggregate report. By convention
    /// (see `CommandValidator`), failing stages are reported inside the
    /// `Ok` report; `Err` is reserved for execution problems.
    async fn run(
        &self,
        receipt: &SandboxReceipt,
        plan: &ValidationPlan,
    ) -> Result<ValidationReport, ValidationError>;
}
227
/// `Validator` that runs each stage as an external command permitted by the
/// given sandbox policy.
pub struct CommandValidator {
    // Policy consulted by execute_allowed_command for each stage.
    policy: SandboxPolicy,
}
231
impl CommandValidator {
    /// Creates a validator that enforces `policy` on every stage command.
    pub fn new(policy: SandboxPolicy) -> Self {
        Self { policy }
    }
}
237
238#[async_trait]
239impl Validator for CommandValidator {
240 async fn run(
241 &self,
242 receipt: &SandboxReceipt,
243 plan: &ValidationPlan,
244 ) -> Result<ValidationReport, ValidationError> {
245 let started = std::time::Instant::now();
246 let mut stages = Vec::new();
247 let mut success = true;
248 let mut logs = String::new();
249
250 for stage in &plan.stages {
251 match stage {
252 ValidationStage::Command {
253 program,
254 args,
255 timeout_ms,
256 } => {
257 let result = execute_allowed_command(
258 &self.policy,
259 &receipt.workdir,
260 program,
261 args,
262 *timeout_ms,
263 )
264 .await;
265 let report = match result {
266 Ok(output) => ValidationStageReport {
267 stage: format!("{program} {}", args.join(" ")),
268 success: output.success,
269 exit_code: output.exit_code,
270 duration_ms: output.duration_ms,
271 stdout: output.stdout,
272 stderr: output.stderr,
273 },
274 Err(err) => ValidationStageReport {
275 stage: format!("{program} {}", args.join(" ")),
276 success: false,
277 exit_code: None,
278 duration_ms: 0,
279 stdout: String::new(),
280 stderr: err.to_string(),
281 },
282 };
283 if !report.success {
284 success = false;
285 }
286 if !report.stdout.is_empty() {
287 logs.push_str(&report.stdout);
288 logs.push('\n');
289 }
290 if !report.stderr.is_empty() {
291 logs.push_str(&report.stderr);
292 logs.push('\n');
293 }
294 stages.push(report);
295 if !success {
296 break;
297 }
298 }
299 }
300 }
301
302 Ok(ValidationReport {
303 success,
304 duration_ms: started.elapsed().as_millis() as u64,
305 stages,
306 logs,
307 })
308 }
309}
310
/// Outcome of a replay attempt against the capsule store.
#[derive(Clone, Debug)]
pub struct ReplayDecision {
    /// True when a stored capsule was successfully replayed and revalidated.
    pub used_capsule: bool,
    /// The capsule considered; set even on some failure paths (apply or
    /// validation failure), `None` when no candidate was found.
    pub capsule_id: Option<CapsuleId>,
    /// True when the caller should fall back to full planning.
    pub fallback_to_planner: bool,
    /// Human-readable explanation of how the decision was reached.
    pub reason: String,
}
318
/// Schedulability of a coordination task relative to its dependencies.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum CoordinationTaskState {
    /// All prerequisites satisfied; the task can run now.
    Ready,
    /// Some dependency has not finished yet.
    Waiting,
    /// A dependency failed or was skipped, blocking this task.
    BlockedByFailure,
    /// Prerequisites can never be satisfied (e.g. unknown dependency id, or a
    /// role-specific precondition that no future progress can meet).
    PermanentlyBlocked,
}
326
/// Stateless, deterministic scheduler that executes `CoordinationPlan`s
/// entirely in-memory (see [`MultiAgentCoordinator::coordinate`]).
#[derive(Clone, Debug, Default)]
pub struct MultiAgentCoordinator;
329
impl MultiAgentCoordinator {
    /// Creates a coordinator; the type is stateless, so this is free.
    pub fn new() -> Self {
        Self
    }

    /// Drives `plan` to completion and returns per-task outcomes plus the
    /// message log produced along the way.
    ///
    /// Each scheduling pass:
    ///  1. (Conditional primitive only) skips tasks whose dependency chain
    ///     has already failed;
    ///  2. computes the ready set;
    ///  3. executes ready tasks (Sequential runs at most one per pass),
    ///     with simulated failures retried up to `max_retries` extra times.
    /// Tasks still pending when nothing is ready are classified and counted
    /// as failed. NOTE(review): `timeout_ms` is only echoed in the summary —
    /// no wall-clock timeout is enforced here.
    pub fn coordinate(&self, plan: CoordinationPlan) -> CoordinationResult {
        let primitive = plan.primitive.clone();
        let root_goal = plan.root_goal.clone();
        let timeout_ms = plan.timeout_ms;
        let max_retries = plan.max_retries;
        // First occurrence of a task id wins; duplicate ids are dropped.
        let mut tasks = BTreeMap::new();
        for task in plan.tasks {
            tasks.entry(task.id.clone()).or_insert(task);
        }

        let mut pending = tasks.keys().cloned().collect::<BTreeSet<_>>();
        let mut completed = BTreeSet::new();
        let mut failed = BTreeSet::new();
        // The *_order vecs preserve completion/failure order for the result;
        // the sets above give cheap membership checks.
        let mut completed_order = Vec::new();
        let mut failed_order = Vec::new();
        let mut skipped = BTreeSet::new();
        // Failure count per task id, for retry accounting.
        let mut attempts = BTreeMap::new();
        let mut messages = Vec::new();

        loop {
            if matches!(primitive, CoordinationPrimitive::Conditional) {
                self.apply_conditional_skips(
                    &tasks,
                    &mut pending,
                    &completed,
                    &failed,
                    &mut skipped,
                    &mut messages,
                );
            }

            let mut ready = self.ready_task_ids(&tasks, &pending, &completed, &failed, &skipped);
            if ready.is_empty() {
                break;
            }
            if matches!(primitive, CoordinationPrimitive::Sequential) {
                // Sequential coordination executes one task per pass.
                ready.truncate(1);
            }

            for task_id in ready {
                let Some(task) = tasks.get(&task_id) else {
                    continue;
                };
                // A task executed earlier in this same pass may have been
                // removed from pending already; skip it.
                if !pending.contains(&task_id) {
                    continue;
                }
                self.record_handoff_messages(task, &tasks, &completed, &failed, &mut messages);

                let prior_failures = attempts.get(&task_id).copied().unwrap_or(0);
                if Self::simulate_task_failure(task, prior_failures) {
                    let failure_count = prior_failures + 1;
                    attempts.insert(task_id.clone(), failure_count);
                    // A task may fail max_retries times and still retry; only
                    // the (max_retries + 1)-th failure is terminal.
                    let will_retry = failure_count <= max_retries;
                    messages.push(CoordinationMessage {
                        from_role: task.role.clone(),
                        to_role: task.role.clone(),
                        task_id: task_id.clone(),
                        content: if will_retry {
                            format!("task {task_id} failed on attempt {failure_count} and will retry")
                        } else {
                            format!(
                                "task {task_id} failed on attempt {failure_count} and exhausted retries"
                            )
                        },
                    });
                    if !will_retry {
                        pending.remove(&task_id);
                        if failed.insert(task_id.clone()) {
                            failed_order.push(task_id);
                        }
                    }
                    continue;
                }

                pending.remove(&task_id);
                if completed.insert(task_id.clone()) {
                    completed_order.push(task_id);
                }
            }
        }

        // Anything still pending could never become ready; classify why and
        // record it as failed with an explanatory message.
        let blocked_ids = pending.into_iter().collect::<Vec<_>>();
        for task_id in blocked_ids {
            let Some(task) = tasks.get(&task_id) else {
                continue;
            };
            let state = self.classify_task(task, &tasks, &completed, &failed, &skipped);
            let content = match state {
                CoordinationTaskState::BlockedByFailure => {
                    format!("task {task_id} blocked by failed dependencies")
                }
                CoordinationTaskState::PermanentlyBlocked => {
                    format!("task {task_id} has invalid coordination prerequisites")
                }
                CoordinationTaskState::Waiting => {
                    format!("task {task_id} has unresolved dependencies")
                }
                CoordinationTaskState::Ready => {
                    format!("task {task_id} was left pending unexpectedly")
                }
            };
            messages.push(CoordinationMessage {
                from_role: task.role.clone(),
                to_role: task.role.clone(),
                task_id: task_id.clone(),
                content,
            });
            if failed.insert(task_id.clone()) {
                failed_order.push(task_id);
            }
        }

        CoordinationResult {
            completed_tasks: completed_order,
            failed_tasks: failed_order,
            messages,
            summary: format!(
                "goal '{}' completed {} tasks, failed {}, skipped {} using {:?} coordination (timeout={}ms, max_retries={})",
                root_goal,
                completed.len(),
                failed.len(),
                skipped.len(),
                primitive,
                timeout_ms,
                max_retries
            ),
        }
    }

    /// Returns the ids of pending tasks classified as `Ready`, in the
    /// (sorted) iteration order of the pending set.
    fn ready_task_ids(
        &self,
        tasks: &BTreeMap<String, CoordinationTask>,
        pending: &BTreeSet<String>,
        completed: &BTreeSet<String>,
        failed: &BTreeSet<String>,
        skipped: &BTreeSet<String>,
    ) -> Vec<String> {
        pending
            .iter()
            .filter_map(|task_id| {
                let task = tasks.get(task_id)?;
                (self.classify_task(task, tasks, completed, failed, skipped)
                    == CoordinationTaskState::Ready)
                    .then(|| task_id.clone())
            })
            .collect()
    }

    /// Conditional-primitive pass: moves pending tasks whose dependency chain
    /// has failed into `skipped`, emitting a message for each.
    fn apply_conditional_skips(
        &self,
        tasks: &BTreeMap<String, CoordinationTask>,
        pending: &mut BTreeSet<String>,
        completed: &BTreeSet<String>,
        failed: &BTreeSet<String>,
        skipped: &mut BTreeSet<String>,
        messages: &mut Vec<CoordinationMessage>,
    ) {
        // Collect first so classification sees a consistent `skipped` set;
        // mutation happens afterwards.
        let skip_ids = pending
            .iter()
            .filter_map(|task_id| {
                let task = tasks.get(task_id)?;
                (self.classify_task(task, tasks, completed, failed, skipped)
                    == CoordinationTaskState::BlockedByFailure)
                    .then(|| task_id.clone())
            })
            .collect::<Vec<_>>();

        for task_id in skip_ids {
            let Some(task) = tasks.get(&task_id) else {
                continue;
            };
            pending.remove(&task_id);
            skipped.insert(task_id.clone());
            messages.push(CoordinationMessage {
                from_role: task.role.clone(),
                to_role: task.role.clone(),
                task_id: task_id.clone(),
                content: format!("task {task_id} skipped due to failed dependency chain"),
            });
        }
    }

    /// Role-specific readiness rules:
    /// - Planner/Coder: ready once all dependencies completed; blocked if any
    ///   failed/skipped; permanently blocked on unknown dependency ids.
    /// - Repair: requires at least one Coder dependency, and only becomes
    ///   ready when a Coder dependency actually failed (there is nothing to
    ///   repair otherwise).
    /// - Optimizer: requires at least one implementation (Coder/Repair)
    ///   dependency and at least one of them completed; a failed non-impl
    ///   dependency blocks it.
    fn classify_task(
        &self,
        task: &CoordinationTask,
        tasks: &BTreeMap<String, CoordinationTask>,
        completed: &BTreeSet<String>,
        failed: &BTreeSet<String>,
        skipped: &BTreeSet<String>,
    ) -> CoordinationTaskState {
        match task.role {
            AgentRole::Planner | AgentRole::Coder => {
                let mut waiting = false;
                for dependency_id in &task.depends_on {
                    if !tasks.contains_key(dependency_id) {
                        return CoordinationTaskState::PermanentlyBlocked;
                    }
                    if skipped.contains(dependency_id) || failed.contains(dependency_id) {
                        return CoordinationTaskState::BlockedByFailure;
                    }
                    if !completed.contains(dependency_id) {
                        waiting = true;
                    }
                }
                if waiting {
                    CoordinationTaskState::Waiting
                } else {
                    CoordinationTaskState::Ready
                }
            }
            AgentRole::Repair => {
                let mut waiting = false;
                let mut has_coder_dependency = false;
                let mut has_failed_coder = false;
                for dependency_id in &task.depends_on {
                    let Some(dependency) = tasks.get(dependency_id) else {
                        return CoordinationTaskState::PermanentlyBlocked;
                    };
                    let is_coder = matches!(dependency.role, AgentRole::Coder);
                    if is_coder {
                        has_coder_dependency = true;
                    }
                    if skipped.contains(dependency_id) {
                        return CoordinationTaskState::BlockedByFailure;
                    }
                    if failed.contains(dependency_id) {
                        if is_coder {
                            // A failed coder is the trigger for repair work.
                            has_failed_coder = true;
                        } else {
                            return CoordinationTaskState::BlockedByFailure;
                        }
                        continue;
                    }
                    if !completed.contains(dependency_id) {
                        waiting = true;
                    }
                }
                if !has_coder_dependency {
                    CoordinationTaskState::PermanentlyBlocked
                } else if waiting {
                    CoordinationTaskState::Waiting
                } else if has_failed_coder {
                    CoordinationTaskState::Ready
                } else {
                    // All coder dependencies succeeded: nothing to repair.
                    CoordinationTaskState::PermanentlyBlocked
                }
            }
            AgentRole::Optimizer => {
                let mut waiting = false;
                let mut has_impl_dependency = false;
                let mut has_completed_impl = false;
                let mut has_failed_impl = false;
                for dependency_id in &task.depends_on {
                    let Some(dependency) = tasks.get(dependency_id) else {
                        return CoordinationTaskState::PermanentlyBlocked;
                    };
                    let is_impl = matches!(dependency.role, AgentRole::Coder | AgentRole::Repair);
                    if is_impl {
                        has_impl_dependency = true;
                    }
                    if skipped.contains(dependency_id) || failed.contains(dependency_id) {
                        if is_impl {
                            // Failed impl work is tolerated as long as some
                            // other impl dependency completed.
                            has_failed_impl = true;
                            continue;
                        }
                        return CoordinationTaskState::BlockedByFailure;
                    }
                    if completed.contains(dependency_id) {
                        if is_impl {
                            has_completed_impl = true;
                        }
                        continue;
                    }
                    waiting = true;
                }
                if !has_impl_dependency {
                    CoordinationTaskState::PermanentlyBlocked
                } else if waiting {
                    CoordinationTaskState::Waiting
                } else if has_completed_impl {
                    CoordinationTaskState::Ready
                } else if has_failed_impl {
                    CoordinationTaskState::BlockedByFailure
                } else {
                    CoordinationTaskState::PermanentlyBlocked
                }
            }
        }
    }

    /// Emits a handoff (or failure-routing) message for every resolved
    /// dependency of `task`, deduplicated and in sorted order.
    fn record_handoff_messages(
        &self,
        task: &CoordinationTask,
        tasks: &BTreeMap<String, CoordinationTask>,
        completed: &BTreeSet<String>,
        failed: &BTreeSet<String>,
        messages: &mut Vec<CoordinationMessage>,
    ) {
        let mut dependency_ids = task.depends_on.clone();
        dependency_ids.sort();
        dependency_ids.dedup();

        for dependency_id in dependency_ids {
            let Some(dependency) = tasks.get(&dependency_id) else {
                continue;
            };
            if completed.contains(&dependency_id) {
                messages.push(CoordinationMessage {
                    from_role: dependency.role.clone(),
                    to_role: task.role.clone(),
                    task_id: task.id.clone(),
                    content: format!("handoff from {dependency_id} to {}", task.id),
                });
            } else if failed.contains(&dependency_id) {
                messages.push(CoordinationMessage {
                    from_role: dependency.role.clone(),
                    to_role: task.role.clone(),
                    task_id: task.id.clone(),
                    content: format!("failed dependency {dependency_id} routed to {}", task.id),
                });
            }
        }
    }

    /// Deterministic failure injection driven by markers in the task
    /// description: "force-fail" always fails; "fail-once" fails only the
    /// first attempt. Matching is case-insensitive.
    fn simulate_task_failure(task: &CoordinationTask, prior_failures: u32) -> bool {
        let normalized = task.description.to_ascii_lowercase();
        normalized.contains("force-fail")
            || (normalized.contains("fail-once") && prior_failures == 0)
    }
}
665
/// Errors produced during replay, partitioned by which subsystem failed.
#[derive(Debug, Error)]
pub enum ReplayError {
    #[error("store error: {0}")]
    Store(String),
    #[error("sandbox error: {0}")]
    Sandbox(String),
    #[error("validation error: {0}")]
    Validation(String),
}
675
/// Attempts to satisfy a request by replaying a previously captured mutation
/// instead of invoking the planner.
#[async_trait]
pub trait ReplayExecutor: Send + Sync {
    /// Tries to replay a stored capsule matching `input`; the returned
    /// decision tells the caller whether to fall back to full planning.
    async fn try_replay(
        &self,
        input: &SelectorInput,
        policy: &SandboxPolicy,
        validation: &ValidationPlan,
    ) -> Result<ReplayDecision, ReplayError>;

    /// Like `try_replay`, but attributed to `run_id`. The default
    /// implementation simply ignores the run id.
    async fn try_replay_for_run(
        &self,
        run_id: &RunId,
        input: &SelectorInput,
        policy: &SandboxPolicy,
        validation: &ValidationPlan,
    ) -> Result<ReplayDecision, ReplayError> {
        let _ = run_id;
        self.try_replay(input, policy, validation).await
    }
}
696
/// Replay executor backed by the evolution store: selects a candidate gene,
/// applies its capsule in a sandbox, and re-validates before reuse.
pub struct StoreReplayExecutor {
    pub sandbox: Arc<dyn Sandbox>,
    pub validator: Arc<dyn Validator>,
    pub store: Arc<dyn EvolutionStore>,
    pub selector: Arc<dyn Selector>,
    pub governor: Arc<dyn Governor>,
    /// Optional EVU ledger for reputation-biased ranking and reuse settlement.
    pub economics: Option<Arc<Mutex<EvuLedger>>>,
    /// Optional cache mapping asset (capsule) ids to remote publisher ids;
    /// lazily merged with what the store has persisted.
    pub remote_publishers: Option<Arc<Mutex<BTreeMap<String, String>>>>,
    /// Policy applied when settling remote-reuse outcomes in the ledger.
    pub stake_policy: StakePolicy,
}
707
/// Candidate genes for replay, plus whether they came from an exact-match
/// (cold-start) lookup rather than the scoring selector.
struct ReplayCandidates {
    candidates: Vec<GeneCandidate>,
    // True when candidates bypass the selector score threshold.
    exact_match: bool,
}
712
#[async_trait]
impl ReplayExecutor for StoreReplayExecutor {
    /// Replay with no originating run id attached to the reuse event.
    async fn try_replay(
        &self,
        input: &SelectorInput,
        policy: &SandboxPolicy,
        validation: &ValidationPlan,
    ) -> Result<ReplayDecision, ReplayError> {
        self.try_replay_inner(None, input, policy, validation).await
    }

    /// Replay that tags the emitted `CapsuleReused` event with `run_id`.
    async fn try_replay_for_run(
        &self,
        run_id: &RunId,
        input: &SelectorInput,
        policy: &SandboxPolicy,
        validation: &ValidationPlan,
    ) -> Result<ReplayDecision, ReplayError> {
        self.try_replay_inner(Some(run_id), input, policy, validation)
            .await
    }
}
735
impl StoreReplayExecutor {
    /// Gathers replay candidates through three fallback tiers: scored
    /// selector results, then exact-match local lookups, then quarantined
    /// remote exact matches. Each tier is re-ranked by publisher reputation
    /// when economics data is available, and the final list is trimmed to the
    /// caller's limit.
    fn collect_replay_candidates(&self, input: &SelectorInput) -> ReplayCandidates {
        let mut selector_input = input.clone();
        if self.economics.is_some() && self.remote_publishers.is_some() {
            // Widen the selector window so reputation re-ranking has
            // alternatives to choose between.
            selector_input.limit = selector_input.limit.max(4);
        }
        let mut candidates = self.selector.select(&selector_input);
        self.rerank_with_reputation_bias(&mut candidates);
        let mut exact_match = false;
        if candidates.is_empty() {
            let mut exact_candidates = exact_match_candidates(self.store.as_ref(), input);
            self.rerank_with_reputation_bias(&mut exact_candidates);
            if !exact_candidates.is_empty() {
                candidates = exact_candidates;
                exact_match = true;
            }
        }
        if candidates.is_empty() {
            let mut remote_candidates =
                quarantined_remote_exact_match_candidates(self.store.as_ref(), input);
            self.rerank_with_reputation_bias(&mut remote_candidates);
            if !remote_candidates.is_empty() {
                candidates = remote_candidates;
                exact_match = true;
            }
        }
        // Trim back to the caller's requested limit (at least one survives).
        candidates.truncate(input.limit.max(1));
        ReplayCandidates {
            candidates,
            exact_match,
        }
    }

    /// Core replay flow: pick the best candidate, apply its capsule's
    /// mutation in the sandbox, re-validate, then record reuse (and possible
    /// promotion of quarantined remote assets). All "can't replay" outcomes
    /// are returned as `Ok` decisions with `fallback_to_planner = true`;
    /// `Err` is reserved for store/validator infrastructure failures.
    async fn try_replay_inner(
        &self,
        replay_run_id: Option<&RunId>,
        input: &SelectorInput,
        policy: &SandboxPolicy,
        validation: &ValidationPlan,
    ) -> Result<ReplayDecision, ReplayError> {
        let ReplayCandidates {
            candidates,
            exact_match,
        } = self.collect_replay_candidates(input);
        let Some(best) = candidates.into_iter().next() else {
            return Ok(ReplayDecision {
                used_capsule: false,
                capsule_id: None,
                fallback_to_planner: true,
                reason: "no matching gene".into(),
            });
        };
        // Selector-scored candidates must clear a fixed score threshold;
        // exact-match (cold-start) candidates bypass it.
        if !exact_match && best.score < 0.82 {
            return Ok(ReplayDecision {
                used_capsule: false,
                capsule_id: None,
                fallback_to_planner: true,
                reason: format!("best gene score {:.3} below replay threshold", best.score),
            });
        }

        // Only the first capsule of the winning gene is considered.
        let Some(capsule) = best.capsules.first().cloned() else {
            return Ok(ReplayDecision {
                used_capsule: false,
                capsule_id: None,
                fallback_to_planner: true,
                reason: "candidate gene has no capsule".into(),
            });
        };
        // Resolve the remote publisher (if any) up front so reuse outcomes
        // can be settled against it below.
        let remote_publisher = self.publisher_for_capsule(&capsule.id);

        let Some(mutation) = find_declared_mutation(self.store.as_ref(), &capsule.mutation_id)
            .map_err(|err| ReplayError::Store(err.to_string()))?
        else {
            return Ok(ReplayDecision {
                used_capsule: false,
                capsule_id: None,
                fallback_to_planner: true,
                reason: "mutation payload missing from store".into(),
            });
        };

        let receipt = match self.sandbox.apply(&mutation, policy).await {
            Ok(receipt) => receipt,
            Err(err) => {
                // Patch no longer applies cleanly: settle as a failed reuse
                // and fall back to the planner.
                self.record_reuse_settlement(remote_publisher.as_deref(), false);
                return Ok(ReplayDecision {
                    used_capsule: false,
                    capsule_id: Some(capsule.id.clone()),
                    fallback_to_planner: true,
                    reason: format!("replay patch apply failed: {err}"),
                });
            }
        };

        let report = self
            .validator
            .run(&receipt, validation)
            .await
            .map_err(|err| ReplayError::Validation(err.to_string()))?;
        if !report.success {
            // Persist the failure (possibly revoking the gene via the
            // governor) before falling back to the planner.
            self.record_replay_validation_failure(&best, &capsule, validation, &report)?;
            self.record_reuse_settlement(remote_publisher.as_deref(), false);
            return Ok(ReplayDecision {
                used_capsule: false,
                capsule_id: Some(capsule.id.clone()),
                fallback_to_planner: true,
                reason: "replay validation failed".into(),
            });
        }

        if matches!(capsule.state, AssetState::Quarantined) {
            // A quarantined capsule that validated locally is released:
            // record the validation, promote its gene if that too was
            // quarantined, then release the capsule as Promoted.
            self.store
                .append_event(EvolutionEvent::ValidationPassed {
                    mutation_id: capsule.mutation_id.clone(),
                    report: report.to_snapshot(&validation.profile),
                    gene_id: Some(best.gene.id.clone()),
                })
                .map_err(|err| ReplayError::Store(err.to_string()))?;
            if matches!(best.gene.state, AssetState::Quarantined) {
                self.store
                    .append_event(EvolutionEvent::PromotionEvaluated {
                        gene_id: best.gene.id.clone(),
                        state: AssetState::Promoted,
                        reason: "remote asset locally validated via replay".into(),
                    })
                    .map_err(|err| ReplayError::Store(err.to_string()))?;
                self.store
                    .append_event(EvolutionEvent::GenePromoted {
                        gene_id: best.gene.id.clone(),
                    })
                    .map_err(|err| ReplayError::Store(err.to_string()))?;
            }
            self.store
                .append_event(EvolutionEvent::CapsuleReleased {
                    capsule_id: capsule.id.clone(),
                    state: AssetState::Promoted,
                })
                .map_err(|err| ReplayError::Store(err.to_string()))?;
        }

        self.store
            .append_event(EvolutionEvent::CapsuleReused {
                capsule_id: capsule.id.clone(),
                gene_id: capsule.gene_id.clone(),
                run_id: capsule.run_id.clone(),
                replay_run_id: replay_run_id.cloned(),
            })
            .map_err(|err| ReplayError::Store(err.to_string()))?;
        self.record_reuse_settlement(remote_publisher.as_deref(), true);

        Ok(ReplayDecision {
            used_capsule: true,
            capsule_id: Some(capsule.id),
            fallback_to_planner: false,
            reason: if exact_match {
                "replayed via cold-start lookup".into()
            } else {
                "replayed via selector".into()
            },
        })
    }

    /// Re-sorts candidates descending by reputation-adjusted score. No-op
    /// when economics is disabled, the ledger has no bias data, or none of
    /// the candidates' capsules map to a known remote publisher.
    fn rerank_with_reputation_bias(&self, candidates: &mut [GeneCandidate]) {
        let Some(ledger) = self.economics.as_ref() else {
            return;
        };
        // A poisoned lock degrades to "no bias" rather than panicking.
        let reputation_bias = ledger
            .lock()
            .ok()
            .map(|locked| locked.selector_reputation_bias())
            .unwrap_or_default();
        if reputation_bias.is_empty() {
            return;
        }
        // Only each candidate's first capsule determines its publisher,
        // matching the capsule choice in try_replay_inner.
        let required_assets = candidates
            .iter()
            .filter_map(|candidate| {
                candidate
                    .capsules
                    .first()
                    .map(|capsule| capsule.id.as_str())
            })
            .collect::<Vec<_>>();
        let publisher_map = self.remote_publishers_snapshot(&required_assets);
        if publisher_map.is_empty() {
            return;
        }
        // Descending by effective score; ties broken by gene id so the
        // ordering stays deterministic.
        candidates.sort_by(|left, right| {
            effective_candidate_score(right, &publisher_map, &reputation_bias)
                .partial_cmp(&effective_candidate_score(
                    left,
                    &publisher_map,
                    &reputation_bias,
                ))
                .unwrap_or(std::cmp::Ordering::Equal)
                .then_with(|| left.gene.id.cmp(&right.gene.id))
        });
    }

    /// Looks up the remote publisher id recorded for `capsule_id`, if any.
    fn publisher_for_capsule(&self, capsule_id: &str) -> Option<String> {
        self.remote_publishers_snapshot(&[capsule_id])
            .get(capsule_id)
            .cloned()
    }

    /// Returns an asset-id -> publisher-id map covering `required_assets`.
    /// Serves from the in-memory cache when it already covers every required
    /// asset; otherwise merges in publishers persisted in the store (cache
    /// entries win) and writes the merged view back into the cache.
    fn remote_publishers_snapshot(&self, required_assets: &[&str]) -> BTreeMap<String, String> {
        let cached = self
            .remote_publishers
            .as_ref()
            .and_then(|remote_publishers| {
                remote_publishers.lock().ok().map(|locked| locked.clone())
            })
            .unwrap_or_default();
        if !cached.is_empty()
            && required_assets
                .iter()
                .all(|asset_id| cached.contains_key(*asset_id))
        {
            return cached;
        }

        let persisted = remote_publishers_by_asset_from_store(self.store.as_ref());
        if persisted.is_empty() {
            return cached;
        }

        // Cache entries take precedence over persisted ones.
        let mut merged = cached;
        for (asset_id, sender_id) in persisted {
            merged.entry(asset_id).or_insert(sender_id);
        }

        // Best-effort cache refresh; a poisoned lock is silently skipped.
        if let Some(remote_publishers) = self.remote_publishers.as_ref() {
            if let Ok(mut locked) = remote_publishers.lock() {
                for (asset_id, sender_id) in &merged {
                    locked.entry(asset_id.clone()).or_insert(sender_id.clone());
                }
            }
        }

        merged
    }

    /// Settles a remote-reuse outcome against the publisher in the EVU
    /// ledger. No-op for local assets or when economics is disabled.
    fn record_reuse_settlement(&self, publisher_id: Option<&str>, success: bool) {
        let Some(publisher_id) = publisher_id else {
            return;
        };
        let Some(ledger) = self.economics.as_ref() else {
            return;
        };
        if let Ok(mut locked) = ledger.lock() {
            locked.settle_remote_reuse(publisher_id, success, &self.stake_policy);
        }
    }

    /// Persists a replay validation failure and asks the governor whether
    /// the gene should be revoked; on revocation the gene's capsules are all
    /// quarantined.
    fn record_replay_validation_failure(
        &self,
        best: &GeneCandidate,
        capsule: &Capsule,
        validation: &ValidationPlan,
        report: &ValidationReport,
    ) -> Result<(), ReplayError> {
        let projection = projection_snapshot(self.store.as_ref())
            .map_err(|err| ReplayError::Store(err.to_string()))?;
        let (current_confidence, historical_peak_confidence, confidence_last_updated_secs) =
            Self::confidence_context(&projection, &best.gene.id);

        self.store
            .append_event(EvolutionEvent::ValidationFailed {
                mutation_id: capsule.mutation_id.clone(),
                report: report.to_snapshot(&validation.profile),
                gene_id: Some(best.gene.id.clone()),
            })
            .map_err(|err| ReplayError::Store(err.to_string()))?;

        // Failure count includes the event appended just above.
        let replay_failures = self.replay_failure_count(&best.gene.id)?;
        let governor_decision = self.governor.evaluate(GovernorInput {
            candidate_source: if self.publisher_for_capsule(&capsule.id).is_some() {
                CandidateSource::Remote
            } else {
                CandidateSource::Local
            },
            // NOTE(review): success history is not consulted here — the
            // governor is always told zero successes. Confirm intended.
            success_count: 0,
            blast_radius: BlastRadius {
                files_changed: capsule.outcome.changed_files.len(),
                lines_changed: capsule.outcome.lines_changed,
            },
            replay_failures,
            recent_mutation_ages_secs: Vec::new(),
            current_confidence,
            historical_peak_confidence,
            confidence_last_updated_secs,
        });

        if matches!(governor_decision.target_state, AssetState::Revoked) {
            self.store
                .append_event(EvolutionEvent::PromotionEvaluated {
                    gene_id: best.gene.id.clone(),
                    state: AssetState::Revoked,
                    reason: governor_decision.reason.clone(),
                })
                .map_err(|err| ReplayError::Store(err.to_string()))?;
            self.store
                .append_event(EvolutionEvent::GeneRevoked {
                    gene_id: best.gene.id.clone(),
                    reason: governor_decision.reason,
                })
                .map_err(|err| ReplayError::Store(err.to_string()))?;
            // Revoking a gene quarantines every capsule derived from it.
            for related in &best.capsules {
                self.store
                    .append_event(EvolutionEvent::CapsuleQuarantined {
                        capsule_id: related.id.clone(),
                    })
                    .map_err(|err| ReplayError::Store(err.to_string()))?;
            }
        }

        Ok(())
    }

    /// Returns (current confidence, historical peak confidence, seconds since
    /// last update) for `gene_id`. NOTE(review): current confidence is
    /// approximated by the historical peak across the gene's capsules — both
    /// tuple slots carry the same value.
    fn confidence_context(
        projection: &EvolutionProjection,
        gene_id: &str,
    ) -> (f32, f32, Option<u64>) {
        let peak_confidence = projection
            .capsules
            .iter()
            .filter(|capsule| capsule.gene_id == gene_id)
            .map(|capsule| capsule.confidence)
            .fold(0.0_f32, f32::max);
        let age_secs = projection
            .last_updated_at
            .get(gene_id)
            .and_then(|timestamp| Self::seconds_since_timestamp(timestamp, Utc::now()));
        (peak_confidence, peak_confidence, age_secs)
    }

    /// Seconds elapsed between an RFC 3339 `timestamp` and `now`; `None` for
    /// unparsable timestamps, clamped to 0 for timestamps in the future.
    fn seconds_since_timestamp(timestamp: &str, now: DateTime<Utc>) -> Option<u64> {
        let parsed = DateTime::parse_from_rfc3339(timestamp)
            .ok()?
            .with_timezone(&Utc);
        let elapsed = now.signed_duration_since(parsed);
        if elapsed < Duration::zero() {
            Some(0)
        } else {
            u64::try_from(elapsed.num_seconds()).ok()
        }
    }

    /// Counts `ValidationFailed` events recorded against `gene_id`.
    /// NOTE(review): assumes `scan(1)` returns the full event log from
    /// sequence 1 onward — confirm against the EvolutionStore contract.
    fn replay_failure_count(&self, gene_id: &str) -> Result<u64, ReplayError> {
        Ok(self
            .store
            .scan(1)
            .map_err(|err| ReplayError::Store(err.to_string()))?
            .into_iter()
            .filter(|stored| {
                matches!(
                    &stored.event,
                    EvolutionEvent::ValidationFailed {
                        gene_id: Some(current_gene_id),
                        ..
                    } if current_gene_id == gene_id
                )
            })
            .count() as u64)
    }
}
1103
/// Top-level error type for evokernel operations.
#[derive(Debug, Error)]
pub enum EvoKernelError {
    #[error("sandbox error: {0}")]
    Sandbox(String),
    #[error("validation error: {0}")]
    Validation(String),
    /// Validation ran to completion but did not pass; carries the full report.
    #[error("validation failed")]
    ValidationFailed(ValidationReport),
    #[error("store error: {0}")]
    Store(String),
}
1115
/// Result of capturing a successful mutation as reusable evolution assets.
#[derive(Clone, Debug)]
pub struct CaptureOutcome {
    pub capsule: Capsule,
    pub gene: Gene,
    /// Governor verdict produced while evaluating the captured asset.
    pub governor_decision: GovernorDecision,
}
1122
/// Result of importing remote assets into the local store.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ImportOutcome {
    /// Ids of the assets that were imported.
    pub imported_asset_ids: Vec<String>,
    /// Whether the import as a whole was accepted.
    pub accepted: bool,
}
1128
/// Point-in-time aggregate metrics computed from the evolution event log
/// by `evolution_metrics_snapshot`, and rendered to Prometheus text by
/// `render_evolution_metrics_prometheus`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct EvolutionMetricsSnapshot {
    pub replay_attempts_total: u64,
    pub replay_success_total: u64,
    // The f64 ratio fields are why this struct derives PartialEq but not Eq.
    pub replay_success_rate: f64,
    pub mutation_declared_total: u64,
    pub promoted_mutations_total: u64,
    pub promotion_ratio: f64,
    pub gene_revocations_total: u64,
    pub mutation_velocity_last_hour: u64,
    pub revoke_frequency_last_hour: u64,
    pub promoted_genes: u64,
    pub promoted_capsules: u64,
    pub last_event_seq: u64,
}
1144
/// Minimal health summary derived from an [`EvolutionMetricsSnapshot`]
/// by `evolution_health_snapshot`; integer/string fields only, so `Eq`
/// can be derived.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct EvolutionHealthSnapshot {
    pub status: String,
    pub last_event_seq: u64,
    pub promoted_genes: u64,
    pub promoted_capsules: u64,
}
1152
/// Network-facing facade over an evolution store: accepts publishes from
/// peers, serves fetch queries, applies revocations, and exposes metrics.
#[derive(Clone)]
pub struct EvolutionNetworkNode {
    pub store: Arc<dyn EvolutionStore>,
}
1157
1158impl EvolutionNetworkNode {
1159 pub fn new(store: Arc<dyn EvolutionStore>) -> Self {
1160 Self { store }
1161 }
1162
1163 pub fn with_default_store() -> Self {
1164 Self {
1165 store: Arc::new(JsonlEvolutionStore::new(default_store_root())),
1166 }
1167 }
1168
1169 pub fn accept_publish_request(
1170 &self,
1171 request: &PublishRequest,
1172 ) -> Result<ImportOutcome, EvoKernelError> {
1173 import_remote_envelope_into_store(
1174 self.store.as_ref(),
1175 &EvolutionEnvelope::publish(request.sender_id.clone(), request.assets.clone()),
1176 None,
1177 )
1178 }
1179
1180 pub fn publish_local_assets(
1181 &self,
1182 sender_id: impl Into<String>,
1183 ) -> Result<EvolutionEnvelope, EvoKernelError> {
1184 export_promoted_assets_from_store(self.store.as_ref(), sender_id)
1185 }
1186
1187 pub fn fetch_assets(
1188 &self,
1189 responder_id: impl Into<String>,
1190 query: &FetchQuery,
1191 ) -> Result<FetchResponse, EvoKernelError> {
1192 fetch_assets_from_store(self.store.as_ref(), responder_id, query)
1193 }
1194
1195 pub fn revoke_assets(&self, notice: &RevokeNotice) -> Result<RevokeNotice, EvoKernelError> {
1196 revoke_assets_in_store(self.store.as_ref(), notice)
1197 }
1198
1199 pub fn metrics_snapshot(&self) -> Result<EvolutionMetricsSnapshot, EvoKernelError> {
1200 evolution_metrics_snapshot(self.store.as_ref())
1201 }
1202
1203 pub fn render_metrics_prometheus(&self) -> Result<String, EvoKernelError> {
1204 self.metrics_snapshot().map(|snapshot| {
1205 let health = evolution_health_snapshot(&snapshot);
1206 render_evolution_metrics_prometheus(&snapshot, &health)
1207 })
1208 }
1209
1210 pub fn health_snapshot(&self) -> Result<EvolutionHealthSnapshot, EvoKernelError> {
1211 self.metrics_snapshot()
1212 .map(|snapshot| evolution_health_snapshot(&snapshot))
1213 }
1214}
1215
/// The self-evolution kernel: wires the base kernel together with the
/// sandbox, validator, event store, candidate selector, promotion
/// governor, and EVU economics ledger that drive mutation capture and
/// replay.
pub struct EvoKernel<S: KernelState> {
    pub kernel: Arc<Kernel<S>>,
    pub sandbox: Arc<dyn Sandbox>,
    pub validator: Arc<dyn Validator>,
    pub store: Arc<dyn EvolutionStore>,
    pub selector: Arc<dyn Selector>,
    pub governor: Arc<dyn Governor>,
    pub economics: Arc<Mutex<EvuLedger>>,
    // Shared registry handed to remote imports for publisher attribution.
    // Assumed to map asset ids to publisher ids — TODO confirm against
    // import_remote_envelope_into_store.
    pub remote_publishers: Arc<Mutex<BTreeMap<String, String>>>,
    pub stake_policy: StakePolicy,
    pub sandbox_policy: SandboxPolicy,
    pub validation_plan: ValidationPlan,
}
1229
1230impl<S: KernelState> EvoKernel<S> {
1231 fn recent_prior_mutation_ages_secs(
1232 &self,
1233 exclude_mutation_id: Option<&str>,
1234 ) -> Result<Vec<u64>, EvolutionError> {
1235 let now = Utc::now();
1236 let mut ages = self
1237 .store
1238 .scan(1)?
1239 .into_iter()
1240 .filter_map(|stored| match stored.event {
1241 EvolutionEvent::MutationDeclared { mutation }
1242 if exclude_mutation_id != Some(mutation.intent.id.as_str()) =>
1243 {
1244 Self::seconds_since_timestamp(&stored.timestamp, now)
1245 }
1246 _ => None,
1247 })
1248 .collect::<Vec<_>>();
1249 ages.sort_unstable();
1250 Ok(ages)
1251 }
1252
1253 fn seconds_since_timestamp(timestamp: &str, now: DateTime<Utc>) -> Option<u64> {
1254 let parsed = DateTime::parse_from_rfc3339(timestamp)
1255 .ok()?
1256 .with_timezone(&Utc);
1257 let elapsed = now.signed_duration_since(parsed);
1258 if elapsed < Duration::zero() {
1259 Some(0)
1260 } else {
1261 u64::try_from(elapsed.num_seconds()).ok()
1262 }
1263 }
1264
1265 pub fn new(
1266 kernel: Arc<Kernel<S>>,
1267 sandbox: Arc<dyn Sandbox>,
1268 validator: Arc<dyn Validator>,
1269 store: Arc<dyn EvolutionStore>,
1270 ) -> Self {
1271 let selector: Arc<dyn Selector> = Arc::new(StoreBackedSelector::new(store.clone()));
1272 Self {
1273 kernel,
1274 sandbox,
1275 validator,
1276 store,
1277 selector,
1278 governor: Arc::new(DefaultGovernor::default()),
1279 economics: Arc::new(Mutex::new(EvuLedger::default())),
1280 remote_publishers: Arc::new(Mutex::new(BTreeMap::new())),
1281 stake_policy: StakePolicy::default(),
1282 sandbox_policy: SandboxPolicy::oris_default(),
1283 validation_plan: ValidationPlan::oris_default(),
1284 }
1285 }
1286
    /// Replaces the candidate selector (builder style).
    pub fn with_selector(mut self, selector: Arc<dyn Selector>) -> Self {
        self.selector = selector;
        self
    }

    /// Replaces the sandbox policy used when applying mutations.
    pub fn with_sandbox_policy(mut self, policy: SandboxPolicy) -> Self {
        self.sandbox_policy = policy;
        self
    }

    /// Replaces the promotion governor.
    pub fn with_governor(mut self, governor: Arc<dyn Governor>) -> Self {
        self.governor = governor;
        self
    }

    /// Replaces the shared economics ledger.
    pub fn with_economics(mut self, economics: Arc<Mutex<EvuLedger>>) -> Self {
        self.economics = economics;
        self
    }

    /// Replaces the stake policy applied when publishing assets remotely.
    pub fn with_stake_policy(mut self, policy: StakePolicy) -> Self {
        self.stake_policy = policy;
        self
    }

    /// Replaces the validation plan run against applied mutations.
    pub fn with_validation_plan(mut self, plan: ValidationPlan) -> Self {
        self.validation_plan = plan;
        self
    }
1316
    /// Collects replay candidates for `input` through a store replay
    /// executor wired with this kernel's components.
    pub fn select_candidates(&self, input: &SelectorInput) -> Vec<GeneCandidate> {
        let executor = StoreReplayExecutor {
            sandbox: self.sandbox.clone(),
            validator: self.validator.clone(),
            store: self.store.clone(),
            selector: self.selector.clone(),
            governor: self.governor.clone(),
            economics: Some(self.economics.clone()),
            remote_publishers: Some(self.remote_publishers.clone()),
            stake_policy: self.stake_policy.clone(),
        };
        executor.collect_replay_candidates(input).candidates
    }
1330
    /// Seeds an empty store with the built-in templates.
    ///
    /// No-op (default report) when the store already projects any genes.
    /// Each seed is persisted through the normal event sequence — declare,
    /// extract signals, project gene, evaluate promotion, commit capsule —
    /// and immediately quarantined: seeds require local validation before
    /// they can be replayed or promoted.
    pub fn bootstrap_if_empty(&self, run_id: &RunId) -> Result<BootstrapReport, EvoKernelError> {
        let projection = projection_snapshot(self.store.as_ref())?;
        if !projection.genes.is_empty() {
            return Ok(BootstrapReport::default());
        }

        let templates = built_in_seed_templates();
        for template in &templates {
            // Build all artifacts first so no events are appended for a
            // template that fails hashing.
            let mutation = build_seed_mutation(template);
            let extracted = extract_seed_signals(template);
            let gene = build_bootstrap_gene(template, &extracted)
                .map_err(|err| EvoKernelError::Validation(err.to_string()))?;
            let capsule = build_bootstrap_capsule(run_id, template, &mutation, &gene)
                .map_err(|err| EvoKernelError::Validation(err.to_string()))?;

            self.store
                .append_event(EvolutionEvent::MutationDeclared {
                    mutation: mutation.clone(),
                })
                .map_err(store_err)?;
            self.store
                .append_event(EvolutionEvent::SignalsExtracted {
                    mutation_id: mutation.intent.id.clone(),
                    hash: extracted.hash.clone(),
                    signals: extracted.values.clone(),
                })
                .map_err(store_err)?;
            self.store
                .append_event(EvolutionEvent::GeneProjected { gene: gene.clone() })
                .map_err(store_err)?;
            self.store
                .append_event(EvolutionEvent::PromotionEvaluated {
                    gene_id: gene.id.clone(),
                    state: AssetState::Quarantined,
                    reason: "bootstrap seeds require local validation before replay".into(),
                })
                .map_err(store_err)?;
            self.store
                .append_event(EvolutionEvent::CapsuleCommitted {
                    capsule: capsule.clone(),
                })
                .map_err(store_err)?;
            self.store
                .append_event(EvolutionEvent::CapsuleQuarantined {
                    capsule_id: capsule.id,
                })
                .map_err(store_err)?;
        }

        Ok(BootstrapReport {
            seeded: true,
            genes_added: templates.len(),
            capsules_added: templates.len(),
        })
    }
1386
1387 pub async fn capture_successful_mutation(
1388 &self,
1389 run_id: &RunId,
1390 mutation: PreparedMutation,
1391 ) -> Result<Capsule, EvoKernelError> {
1392 Ok(self
1393 .capture_mutation_with_governor(run_id, mutation)
1394 .await?
1395 .capsule)
1396 }
1397
    /// Full capture pipeline for a prepared mutation.
    ///
    /// Stages, each persisted as events in order: declare → sandbox apply
    /// → validate → extract deterministic signals → derive gene → governor
    /// decision → project gene / record promotion or revocation → commit
    /// capsule (quarantining it if the governor said so).
    ///
    /// # Errors
    /// - [`EvoKernelError::Sandbox`] when the sandbox rejects the patch
    ///   (also recorded as a `MutationRejected` event).
    /// - [`EvoKernelError::ValidationFailed`] when validation runs and
    ///   fails (recorded as `ValidationFailed`).
    /// - [`EvoKernelError::Validation`] / [`EvoKernelError::Store`] for
    ///   infrastructure failures.
    pub async fn capture_mutation_with_governor(
        &self,
        run_id: &RunId,
        mutation: PreparedMutation,
    ) -> Result<CaptureOutcome, EvoKernelError> {
        // 1. Record the declaration before any side effects happen.
        self.store
            .append_event(EvolutionEvent::MutationDeclared {
                mutation: mutation.clone(),
            })
            .map_err(store_err)?;

        // 2. Apply in the sandbox; persist a rejection before surfacing it.
        let receipt = match self.sandbox.apply(&mutation, &self.sandbox_policy).await {
            Ok(receipt) => receipt,
            Err(err) => {
                self.store
                    .append_event(EvolutionEvent::MutationRejected {
                        mutation_id: mutation.intent.id.clone(),
                        reason: err.to_string(),
                    })
                    .map_err(store_err)?;
                return Err(EvoKernelError::Sandbox(err.to_string()));
            }
        };

        self.store
            .append_event(EvolutionEvent::MutationApplied {
                mutation_id: mutation.intent.id.clone(),
                patch_hash: receipt.patch_hash.clone(),
                changed_files: receipt
                    .changed_files
                    .iter()
                    .map(|path| path.to_string_lossy().to_string())
                    .collect(),
            })
            .map_err(store_err)?;

        // 3. Validate the applied mutation; a failed run is persisted and
        //    surfaced with the full report attached.
        let report = self
            .validator
            .run(&receipt, &self.validation_plan)
            .await
            .map_err(|err| EvoKernelError::Validation(err.to_string()))?;
        if !report.success {
            self.store
                .append_event(EvolutionEvent::ValidationFailed {
                    mutation_id: mutation.intent.id.clone(),
                    report: report.to_snapshot(&self.validation_plan.profile),
                    gene_id: None,
                })
                .map_err(store_err)?;
            return Err(EvoKernelError::ValidationFailed(report));
        }

        self.store
            .append_event(EvolutionEvent::ValidationPassed {
                mutation_id: mutation.intent.id.clone(),
                report: report.to_snapshot(&self.validation_plan.profile),
                gene_id: None,
            })
            .map_err(store_err)?;

        // 4. Extract deterministic signals from the diff, intent text,
        //    and validation output (non-empty stage stdout/stderr only).
        let extracted_signals = extract_deterministic_signals(&SignalExtractionInput {
            patch_diff: mutation.artifact.payload.clone(),
            intent: mutation.intent.intent.clone(),
            expected_effect: mutation.intent.expected_effect.clone(),
            declared_signals: mutation.intent.signals.clone(),
            changed_files: receipt
                .changed_files
                .iter()
                .map(|path| path.to_string_lossy().to_string())
                .collect(),
            validation_success: report.success,
            validation_logs: report.logs.clone(),
            stage_outputs: report
                .stages
                .iter()
                .flat_map(|stage| [stage.stdout.clone(), stage.stderr.clone()])
                .filter(|value| !value.is_empty())
                .collect(),
        });
        self.store
            .append_event(EvolutionEvent::SignalsExtracted {
                mutation_id: mutation.intent.id.clone(),
                hash: extracted_signals.hash.clone(),
                signals: extracted_signals.values.clone(),
            })
            .map_err(store_err)?;

        // 5. Gather governor context: projection state, blast radius, and
        //    the cadence of prior mutations (excluding this one).
        let projection = projection_snapshot(self.store.as_ref())?;
        let blast_radius = compute_blast_radius(&mutation.artifact.payload);
        let recent_mutation_ages_secs = self
            .recent_prior_mutation_ages_secs(Some(mutation.intent.id.as_str()))
            .map_err(store_err)?;
        let mut gene = derive_gene(
            &mutation,
            &receipt,
            &self.validation_plan.profile,
            &extracted_signals.values,
        );
        let (current_confidence, historical_peak_confidence, confidence_last_updated_secs) =
            StoreReplayExecutor::confidence_context(&projection, &gene.id);
        // Success count: capsules already recorded for this gene, plus
        // the one being captured now.
        let success_count = projection
            .genes
            .iter()
            .find(|existing| existing.id == gene.id)
            .map(|existing| {
                projection
                    .capsules
                    .iter()
                    .filter(|capsule| capsule.gene_id == existing.id)
                    .count() as u64
            })
            .unwrap_or(0)
            + 1;
        let governor_decision = self.governor.evaluate(GovernorInput {
            candidate_source: CandidateSource::Local,
            success_count,
            blast_radius: blast_radius.clone(),
            replay_failures: 0,
            recent_mutation_ages_secs,
            current_confidence,
            historical_peak_confidence,
            confidence_last_updated_secs,
        });

        // 6. Persist the gene in the governor-decided state, plus the
        //    matching promotion/revocation events.
        gene.state = governor_decision.target_state.clone();
        self.store
            .append_event(EvolutionEvent::GeneProjected { gene: gene.clone() })
            .map_err(store_err)?;
        self.store
            .append_event(EvolutionEvent::PromotionEvaluated {
                gene_id: gene.id.clone(),
                state: governor_decision.target_state.clone(),
                reason: governor_decision.reason.clone(),
            })
            .map_err(store_err)?;
        if matches!(governor_decision.target_state, AssetState::Promoted) {
            self.store
                .append_event(EvolutionEvent::GenePromoted {
                    gene_id: gene.id.clone(),
                })
                .map_err(store_err)?;
        }
        if matches!(governor_decision.target_state, AssetState::Revoked) {
            self.store
                .append_event(EvolutionEvent::GeneRevoked {
                    gene_id: gene.id.clone(),
                    reason: governor_decision.reason.clone(),
                })
                .map_err(store_err)?;
        }
        // Link back to the originating spec when the intent names one.
        if let Some(spec_id) = &mutation.intent.spec_id {
            self.store
                .append_event(EvolutionEvent::SpecLinked {
                    mutation_id: mutation.intent.id.clone(),
                    spec_id: spec_id.clone(),
                })
                .map_err(store_err)?;
        }

        // 7. Commit the capsule in the same governor-decided state.
        let mut capsule = build_capsule(
            run_id,
            &mutation,
            &receipt,
            &report,
            &self.validation_plan.profile,
            &gene,
            &blast_radius,
        )
        .map_err(|err| EvoKernelError::Validation(err.to_string()))?;
        capsule.state = governor_decision.target_state.clone();
        self.store
            .append_event(EvolutionEvent::CapsuleCommitted {
                capsule: capsule.clone(),
            })
            .map_err(store_err)?;
        if matches!(governor_decision.target_state, AssetState::Quarantined) {
            self.store
                .append_event(EvolutionEvent::CapsuleQuarantined {
                    capsule_id: capsule.id.clone(),
                })
                .map_err(store_err)?;
        }

        Ok(CaptureOutcome {
            capsule,
            gene,
            governor_decision,
        })
    }
1587
1588 pub async fn capture_from_proposal(
1589 &self,
1590 run_id: &RunId,
1591 proposal: &AgentMutationProposal,
1592 diff_payload: String,
1593 base_revision: Option<String>,
1594 ) -> Result<CaptureOutcome, EvoKernelError> {
1595 let intent = MutationIntent {
1596 id: next_id("proposal"),
1597 intent: proposal.intent.clone(),
1598 target: MutationTarget::Paths {
1599 allow: proposal.files.clone(),
1600 },
1601 expected_effect: proposal.expected_effect.clone(),
1602 risk: RiskLevel::Low,
1603 signals: proposal.files.clone(),
1604 spec_id: None,
1605 };
1606 self.capture_mutation_with_governor(
1607 run_id,
1608 prepare_mutation(intent, diff_payload, base_revision),
1609 )
1610 .await
1611 }
1612
1613 pub fn feedback_for_agent(outcome: &CaptureOutcome) -> ExecutionFeedback {
1614 ExecutionFeedback {
1615 accepted: !matches!(outcome.governor_decision.target_state, AssetState::Revoked),
1616 asset_state: Some(format!("{:?}", outcome.governor_decision.target_state)),
1617 summary: outcome.governor_decision.reason.clone(),
1618 }
1619 }
1620
1621 pub fn coordinate(&self, plan: CoordinationPlan) -> CoordinationResult {
1622 MultiAgentCoordinator::new().coordinate(plan)
1623 }
1624
1625 pub fn export_promoted_assets(
1626 &self,
1627 sender_id: impl Into<String>,
1628 ) -> Result<EvolutionEnvelope, EvoKernelError> {
1629 let sender_id = sender_id.into();
1630 let envelope = export_promoted_assets_from_store(self.store.as_ref(), sender_id.clone())?;
1631 if !envelope.assets.is_empty() {
1632 let mut ledger = self
1633 .economics
1634 .lock()
1635 .map_err(|_| EvoKernelError::Validation("economics ledger lock poisoned".into()))?;
1636 if ledger
1637 .reserve_publish_stake(&sender_id, &self.stake_policy)
1638 .is_none()
1639 {
1640 return Err(EvoKernelError::Validation(
1641 "insufficient EVU for remote publish".into(),
1642 ));
1643 }
1644 }
1645 Ok(envelope)
1646 }
1647
    /// Imports a remote envelope into the local store, passing the shared
    /// publisher registry so the import can attribute assets.
    pub fn import_remote_envelope(
        &self,
        envelope: &EvolutionEnvelope,
    ) -> Result<ImportOutcome, EvoKernelError> {
        import_remote_envelope_into_store(
            self.store.as_ref(),
            envelope,
            Some(self.remote_publishers.as_ref()),
        )
    }

    /// Answers a fetch query from the local store.
    pub fn fetch_assets(
        &self,
        responder_id: impl Into<String>,
        query: &FetchQuery,
    ) -> Result<FetchResponse, EvoKernelError> {
        fetch_assets_from_store(self.store.as_ref(), responder_id, query)
    }

    /// Applies a revocation notice to the local store.
    pub fn revoke_assets(&self, notice: &RevokeNotice) -> Result<RevokeNotice, EvoKernelError> {
        revoke_assets_in_store(self.store.as_ref(), notice)
    }
1670
1671 pub async fn replay_or_fallback(
1672 &self,
1673 input: SelectorInput,
1674 ) -> Result<ReplayDecision, EvoKernelError> {
1675 let replay_run_id = next_id("replay");
1676 self.replay_or_fallback_for_run(&replay_run_id, input).await
1677 }
1678
1679 pub async fn replay_or_fallback_for_run(
1680 &self,
1681 run_id: &RunId,
1682 input: SelectorInput,
1683 ) -> Result<ReplayDecision, EvoKernelError> {
1684 let executor = StoreReplayExecutor {
1685 sandbox: self.sandbox.clone(),
1686 validator: self.validator.clone(),
1687 store: self.store.clone(),
1688 selector: self.selector.clone(),
1689 governor: self.governor.clone(),
1690 economics: Some(self.economics.clone()),
1691 remote_publishers: Some(self.remote_publishers.clone()),
1692 stake_policy: self.stake_policy.clone(),
1693 };
1694 executor
1695 .try_replay_for_run(run_id, &input, &self.sandbox_policy, &self.validation_plan)
1696 .await
1697 .map_err(|err| EvoKernelError::Validation(err.to_string()))
1698 }
1699
1700 pub fn economics_signal(&self, node_id: &str) -> Option<EconomicsSignal> {
1701 self.economics.lock().ok()?.governor_signal(node_id)
1702 }
1703
1704 pub fn selector_reputation_bias(&self) -> BTreeMap<String, f32> {
1705 self.economics
1706 .lock()
1707 .ok()
1708 .map(|locked| locked.selector_reputation_bias())
1709 .unwrap_or_default()
1710 }
1711
1712 pub fn metrics_snapshot(&self) -> Result<EvolutionMetricsSnapshot, EvoKernelError> {
1713 evolution_metrics_snapshot(self.store.as_ref())
1714 }
1715
1716 pub fn render_metrics_prometheus(&self) -> Result<String, EvoKernelError> {
1717 self.metrics_snapshot().map(|snapshot| {
1718 let health = evolution_health_snapshot(&snapshot);
1719 render_evolution_metrics_prometheus(&snapshot, &health)
1720 })
1721 }
1722
1723 pub fn health_snapshot(&self) -> Result<EvolutionHealthSnapshot, EvoKernelError> {
1724 self.metrics_snapshot()
1725 .map(|snapshot| evolution_health_snapshot(&snapshot))
1726 }
1727}
1728
1729pub fn prepare_mutation(
1730 intent: MutationIntent,
1731 diff_payload: String,
1732 base_revision: Option<String>,
1733) -> PreparedMutation {
1734 PreparedMutation {
1735 intent,
1736 artifact: MutationArtifact {
1737 encoding: ArtifactEncoding::UnifiedDiff,
1738 content_hash: compute_artifact_hash(&diff_payload),
1739 payload: diff_payload,
1740 base_revision,
1741 },
1742 }
1743}
1744
/// Builds a prepared mutation from a compiled spec plan; the diff payload
/// and base revision are supplied by the caller.
pub fn prepare_mutation_from_spec(
    plan: CompiledMutationPlan,
    diff_payload: String,
    base_revision: Option<String>,
) -> PreparedMutation {
    prepare_mutation(plan.mutation_intent, diff_payload, base_revision)
}
1752
1753pub fn default_evolution_store() -> Arc<dyn EvolutionStore> {
1754 Arc::new(oris_evolution::JsonlEvolutionStore::new(
1755 default_store_root(),
1756 ))
1757}
1758
/// The fixed seed templates `bootstrap_if_empty` uses to prime an empty
/// store: README recovery, test stabilization, small refactor, and
/// structured logging — all low-risk unified diffs validated under the
/// `bootstrap-seed` profile.
///
/// The diff payloads are content-hashed downstream, so their exact bytes
/// (including the `\`-continued literal layout) must not change.
fn built_in_seed_templates() -> Vec<SeedTemplate> {
    vec![
        SeedTemplate {
            id: "bootstrap-readme".into(),
            intent: "Seed a baseline README recovery pattern".into(),
            signals: vec!["bootstrap readme".into(), "missing readme".into()],
            diff_payload: "\
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..1111111
--- /dev/null
+++ b/README.md
@@ -0,0 +1,3 @@
+# Oris
+Bootstrap documentation seed
+"
            .into(),
            validation_profile: "bootstrap-seed".into(),
        },
        SeedTemplate {
            id: "bootstrap-test-fix".into(),
            intent: "Seed a deterministic test stabilization pattern".into(),
            signals: vec!["bootstrap test fix".into(), "failing tests".into()],
            diff_payload: "\
diff --git a/src/lib.rs b/src/lib.rs
index 1111111..2222222 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1 +1,2 @@
 pub fn demo() -> usize { 1 }
+pub fn normalize_test_output() -> bool { true }
"
            .into(),
            validation_profile: "bootstrap-seed".into(),
        },
        SeedTemplate {
            id: "bootstrap-refactor".into(),
            intent: "Seed a low-risk refactor capsule".into(),
            signals: vec!["bootstrap refactor".into(), "small refactor".into()],
            diff_payload: "\
diff --git a/src/lib.rs b/src/lib.rs
index 2222222..3333333 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1 +1,3 @@
 pub fn demo() -> usize { 1 }
+
+fn extract_strategy_key(input: &str) -> &str { input }
"
            .into(),
            validation_profile: "bootstrap-seed".into(),
        },
        SeedTemplate {
            id: "bootstrap-logging".into(),
            intent: "Seed a baseline structured logging mutation".into(),
            signals: vec!["bootstrap logging".into(), "structured logs".into()],
            diff_payload: "\
diff --git a/src/lib.rs b/src/lib.rs
index 3333333..4444444 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1 +1,3 @@
 pub fn demo() -> usize { 1 }
+
+fn emit_bootstrap_log() { println!(\"bootstrap-log\"); }
"
            .into(),
            validation_profile: "bootstrap-seed".into(),
        },
    ]
}
1830
1831fn build_seed_mutation(template: &SeedTemplate) -> PreparedMutation {
1832 let changed_files = seed_changed_files(&template.diff_payload);
1833 let target = if changed_files.is_empty() {
1834 MutationTarget::WorkspaceRoot
1835 } else {
1836 MutationTarget::Paths {
1837 allow: changed_files,
1838 }
1839 };
1840 prepare_mutation(
1841 MutationIntent {
1842 id: stable_hash_json(&("bootstrap-mutation", &template.id))
1843 .unwrap_or_else(|_| format!("bootstrap-mutation-{}", template.id)),
1844 intent: template.intent.clone(),
1845 target,
1846 expected_effect: format!("seed {}", template.id),
1847 risk: RiskLevel::Low,
1848 signals: template.signals.clone(),
1849 spec_id: None,
1850 },
1851 template.diff_payload.clone(),
1852 None,
1853 )
1854}
1855
1856fn extract_seed_signals(template: &SeedTemplate) -> SignalExtractionOutput {
1857 let mut signals = BTreeSet::new();
1858 for declared in &template.signals {
1859 if let Some(phrase) = normalize_signal_phrase(declared) {
1860 signals.insert(phrase);
1861 }
1862 extend_signal_tokens(&mut signals, declared);
1863 }
1864 extend_signal_tokens(&mut signals, &template.intent);
1865 extend_signal_tokens(&mut signals, &template.diff_payload);
1866 for changed_file in seed_changed_files(&template.diff_payload) {
1867 extend_signal_tokens(&mut signals, &changed_file);
1868 }
1869 let values = signals.into_iter().take(32).collect::<Vec<_>>();
1870 let hash =
1871 stable_hash_json(&values).unwrap_or_else(|_| compute_artifact_hash(&values.join("\n")));
1872 SignalExtractionOutput { values, hash }
1873}
1874
/// Collects the distinct, non-empty file paths named by `+++ b/<path>`
/// headers in a unified diff, sorted lexicographically.
fn seed_changed_files(diff_payload: &str) -> Vec<String> {
    let unique: BTreeSet<String> = diff_payload
        .lines()
        .filter_map(|line| line.strip_prefix("+++ b/"))
        .map(str::trim)
        .filter(|path| !path.is_empty())
        .map(str::to_string)
        .collect();
    unique.into_iter().collect()
}
1887
1888fn build_bootstrap_gene(
1889 template: &SeedTemplate,
1890 extracted: &SignalExtractionOutput,
1891) -> Result<Gene, EvolutionError> {
1892 let strategy = vec![template.id.clone(), "bootstrap".into()];
1893 let id = stable_hash_json(&(
1894 "bootstrap-gene",
1895 &template.id,
1896 &extracted.values,
1897 &template.validation_profile,
1898 ))?;
1899 Ok(Gene {
1900 id,
1901 signals: extracted.values.clone(),
1902 strategy,
1903 validation: vec![template.validation_profile.clone()],
1904 state: AssetState::Quarantined,
1905 })
1906}
1907
/// Builds the quarantined capsule that records a bootstrap seed mutation.
///
/// Both the validator hash and the capsule id are stable hashes of their
/// input tuples (order-sensitive), keeping bootstrap idempotent.
/// `success: false` and `confidence: 0.0` mark the seed as not yet
/// validated or replayed.
fn build_bootstrap_capsule(
    run_id: &RunId,
    template: &SeedTemplate,
    mutation: &PreparedMutation,
    gene: &Gene,
) -> Result<Capsule, EvolutionError> {
    // Fingerprint the current workspace; fall back to "." when the
    // current directory cannot be resolved.
    let cwd = std::env::current_dir().unwrap_or_else(|_| Path::new(".").to_path_buf());
    let env = current_env_fingerprint(&cwd);
    let diff_hash = mutation.artifact.content_hash.clone();
    let changed_files = seed_changed_files(&template.diff_payload);
    let validator_hash = stable_hash_json(&(
        "bootstrap-validator",
        &template.id,
        &template.validation_profile,
        &diff_hash,
    ))?;
    let id = stable_hash_json(&(
        "bootstrap-capsule",
        &template.id,
        run_id,
        &gene.id,
        &diff_hash,
        &env,
    ))?;
    Ok(Capsule {
        id,
        gene_id: gene.id.clone(),
        mutation_id: mutation.intent.id.clone(),
        run_id: run_id.clone(),
        diff_hash,
        confidence: 0.0,
        env,
        outcome: Outcome {
            success: false,
            validation_profile: template.validation_profile.clone(),
            validation_duration_ms: 0,
            changed_files,
            validator_hash,
            lines_changed: compute_blast_radius(&template.diff_payload).lines_changed,
            replay_verified: false,
        },
        state: AssetState::Quarantined,
    })
}
1952
/// Derives a gene (initially promoted; the governor may downgrade it)
/// from a validated mutation.
///
/// The strategy vocabulary combines three sources: the leading path
/// component of each changed file, rustc-style error codes found in the
/// diff payload, and the first eight whitespace-separated intent words
/// (lowercased). The gene id is a stable hash of
/// (signals, strategy, profile), with a fresh id only if hashing fails.
fn derive_gene(
    mutation: &PreparedMutation,
    receipt: &SandboxReceipt,
    validation_profile: &str,
    extracted_signals: &[String],
) -> Gene {
    let mut strategy = BTreeSet::new();
    // Top-level directory (or bare file name) of every changed path.
    for file in &receipt.changed_files {
        if let Some(component) = file.components().next() {
            strategy.insert(component.as_os_str().to_string_lossy().to_string());
        }
    }
    // Error codes like E0308 mentioned anywhere in the diff payload.
    // NOTE(review): only an uppercase 'E' prefix is accepted here, while
    // `is_rust_error_code` also accepts lowercase — confirm whether that
    // asymmetry is intentional before unifying.
    for token in mutation
        .artifact
        .payload
        .split(|ch: char| !ch.is_ascii_alphanumeric())
    {
        if token.len() == 5
            && token.starts_with('E')
            && token[1..].chars().all(|ch| ch.is_ascii_digit())
        {
            strategy.insert(token.to_string());
        }
    }
    // First eight intent words, lowercased.
    for token in mutation.intent.intent.split_whitespace().take(8) {
        strategy.insert(token.to_ascii_lowercase());
    }
    let strategy = strategy.into_iter().collect::<Vec<_>>();
    let id = stable_hash_json(&(extracted_signals, &strategy, validation_profile))
        .unwrap_or_else(|_| next_id("gene"));
    Gene {
        id,
        signals: extracted_signals.to_vec(),
        strategy,
        validation: vec![validation_profile.to_string()],
        state: AssetState::Promoted,
    }
}
1991
/// Builds a promoted capsule recording a successfully validated mutation.
///
/// The capsule id is a stable hash of (run id, gene id, diff hash,
/// mutation id) — order-sensitive — and the validator hash pins the
/// exact validation report. Confidence starts at the fixed initial value
/// 0.7 with `replay_verified: false` until a replay confirms it.
fn build_capsule(
    run_id: &RunId,
    mutation: &PreparedMutation,
    receipt: &SandboxReceipt,
    report: &ValidationReport,
    validation_profile: &str,
    gene: &Gene,
    blast_radius: &BlastRadius,
) -> Result<Capsule, EvolutionError> {
    // Fingerprint the sandbox working directory the mutation ran in.
    let env = current_env_fingerprint(&receipt.workdir);
    let validator_hash = stable_hash_json(report)?;
    let diff_hash = mutation.artifact.content_hash.clone();
    let id = stable_hash_json(&(run_id, &gene.id, &diff_hash, &mutation.intent.id))?;
    Ok(Capsule {
        id,
        gene_id: gene.id.clone(),
        mutation_id: mutation.intent.id.clone(),
        run_id: run_id.clone(),
        diff_hash,
        confidence: 0.7,
        env,
        outcome: oris_evolution::Outcome {
            success: true,
            validation_profile: validation_profile.to_string(),
            validation_duration_ms: report.duration_ms,
            changed_files: receipt
                .changed_files
                .iter()
                .map(|path| path.to_string_lossy().to_string())
                .collect(),
            validator_hash,
            lines_changed: blast_radius.lines_changed,
            replay_verified: false,
        },
        state: AssetState::Promoted,
    })
}
2029
2030fn current_env_fingerprint(workdir: &Path) -> EnvFingerprint {
2031 let rustc_version = Command::new("rustc")
2032 .arg("--version")
2033 .output()
2034 .ok()
2035 .filter(|output| output.status.success())
2036 .map(|output| String::from_utf8_lossy(&output.stdout).trim().to_string())
2037 .unwrap_or_else(|| "rustc unknown".into());
2038 let cargo_lock_hash = fs::read(workdir.join("Cargo.lock"))
2039 .ok()
2040 .map(|bytes| {
2041 let value = String::from_utf8_lossy(&bytes);
2042 compute_artifact_hash(&value)
2043 })
2044 .unwrap_or_else(|| "missing-cargo-lock".into());
2045 let target_triple = format!(
2046 "{}-unknown-{}",
2047 std::env::consts::ARCH,
2048 std::env::consts::OS
2049 );
2050 EnvFingerprint {
2051 rustc_version,
2052 cargo_lock_hash,
2053 target_triple,
2054 os: std::env::consts::OS.to_string(),
2055 }
2056}
2057
2058fn extend_signal_tokens(out: &mut BTreeSet<String>, input: &str) {
2059 for raw in input.split(|ch: char| !ch.is_ascii_alphanumeric()) {
2060 let trimmed = raw.trim();
2061 if trimmed.is_empty() {
2062 continue;
2063 }
2064 let normalized = if is_rust_error_code(trimmed) {
2065 let mut chars = trimmed.chars();
2066 let prefix = chars
2067 .next()
2068 .map(|ch| ch.to_ascii_uppercase())
2069 .unwrap_or('E');
2070 format!("{prefix}{}", chars.as_str())
2071 } else {
2072 trimmed.to_ascii_lowercase()
2073 };
2074 if normalized.len() < 3 {
2075 continue;
2076 }
2077 out.insert(normalized);
2078 }
2079}
2080
2081fn normalize_signal_phrase(input: &str) -> Option<String> {
2082 let normalized = input
2083 .split(|ch: char| !ch.is_ascii_alphanumeric())
2084 .filter_map(|raw| {
2085 let trimmed = raw.trim();
2086 if trimmed.is_empty() {
2087 return None;
2088 }
2089 let normalized = if is_rust_error_code(trimmed) {
2090 let mut chars = trimmed.chars();
2091 let prefix = chars
2092 .next()
2093 .map(|ch| ch.to_ascii_uppercase())
2094 .unwrap_or('E');
2095 format!("{prefix}{}", chars.as_str())
2096 } else {
2097 trimmed.to_ascii_lowercase()
2098 };
2099 if normalized.len() < 3 {
2100 None
2101 } else {
2102 Some(normalized)
2103 }
2104 })
2105 .collect::<Vec<_>>()
2106 .join(" ");
2107 if normalized.is_empty() {
2108 None
2109 } else {
2110 Some(normalized)
2111 }
2112}
2113
/// True when `value` looks like a rustc error code: the letter `e`/`E`
/// followed by exactly four ASCII digits (e.g. `E0308`).
fn is_rust_error_code(value: &str) -> bool {
    let bytes = value.as_bytes();
    bytes.len() == 5
        && matches!(bytes[0], b'e' | b'E')
        && bytes[1..].iter().all(u8::is_ascii_digit)
}
2119
2120fn find_declared_mutation(
2121 store: &dyn EvolutionStore,
2122 mutation_id: &MutationId,
2123) -> Result<Option<PreparedMutation>, EvolutionError> {
2124 for stored in store.scan(1)? {
2125 if let EvolutionEvent::MutationDeclared { mutation } = stored.event {
2126 if &mutation.intent.id == mutation_id {
2127 return Ok(Some(mutation));
2128 }
2129 }
2130 }
2131 Ok(None)
2132}
2133
/// Selects promoted genes whose signal set equals the replay request's
/// signal set (case-insensitive set equality), pairing each with its
/// promoted capsules ordered best-first.
///
/// Capsule ordering: environment match factor desc, then confidence desc,
/// then capsule id asc. A gene with no promoted capsules is dropped. The
/// candidate score is the best capsule's environment match factor, and the
/// result is sorted by score desc with gene id as the tiebreaker.
fn exact_match_candidates(store: &dyn EvolutionStore, input: &SelectorInput) -> Vec<GeneCandidate> {
    // A store read failure degrades to "no candidates" rather than an error.
    let Ok(projection) = projection_snapshot(store) else {
        return Vec::new();
    };
    let capsules = projection.capsules.clone();
    let spec_ids_by_gene = projection.spec_ids_by_gene.clone();
    // Blank / whitespace-only spec ids are treated as "no spec filter".
    let requested_spec_id = input
        .spec_id
        .as_deref()
        .map(str::trim)
        .filter(|value| !value.is_empty());
    // Lowercase query signals so the set comparison below is case-insensitive.
    let signal_set = input
        .signals
        .iter()
        .map(|signal| signal.to_ascii_lowercase())
        .collect::<BTreeSet<_>>();
    let mut candidates = projection
        .genes
        .into_iter()
        .filter_map(|gene| {
            // Only promoted genes are eligible for exact-match replay.
            if gene.state != AssetState::Promoted {
                return None;
            }
            // When the request names a spec, the gene must be linked to it.
            if let Some(spec_id) = requested_spec_id {
                let matches_spec = spec_ids_by_gene
                    .get(&gene.id)
                    .map(|values| {
                        values
                            .iter()
                            .any(|value| value.eq_ignore_ascii_case(spec_id))
                    })
                    .unwrap_or(false);
                if !matches_spec {
                    return None;
                }
            }
            let gene_signals = gene
                .signals
                .iter()
                .map(|signal| signal.to_ascii_lowercase())
                .collect::<BTreeSet<_>>();
            // Exact match means set equality, not mere overlap.
            if gene_signals == signal_set {
                let mut matched_capsules = capsules
                    .iter()
                    .filter(|capsule| {
                        capsule.gene_id == gene.id && capsule.state == AssetState::Promoted
                    })
                    .cloned()
                    .collect::<Vec<_>>();
                // Best capsule first: env match desc, confidence desc, id asc.
                matched_capsules.sort_by(|left, right| {
                    replay_environment_match_factor(&input.env, &right.env)
                        .partial_cmp(&replay_environment_match_factor(&input.env, &left.env))
                        .unwrap_or(std::cmp::Ordering::Equal)
                        .then_with(|| {
                            right
                                .confidence
                                .partial_cmp(&left.confidence)
                                .unwrap_or(std::cmp::Ordering::Equal)
                        })
                        .then_with(|| left.id.cmp(&right.id))
                });
                if matched_capsules.is_empty() {
                    None
                } else {
                    // Score the candidate by the env fit of its best capsule.
                    let score = matched_capsules
                        .first()
                        .map(|capsule| replay_environment_match_factor(&input.env, &capsule.env))
                        .unwrap_or(0.0);
                    Some(GeneCandidate {
                        gene,
                        score,
                        capsules: matched_capsules,
                    })
                }
            } else {
                None
            }
        })
        .collect::<Vec<_>>();
    // Deterministic ordering: score desc, gene id asc as the tiebreaker.
    candidates.sort_by(|left, right| {
        right
            .score
            .partial_cmp(&left.score)
            .unwrap_or(std::cmp::Ordering::Equal)
            .then_with(|| left.gene.id.cmp(&right.gene.id))
    });
    candidates
}
2222
/// Selects quarantined remote candidates by fuzzy signal overlap.
///
/// Only capsules that arrived via a `RemoteAssetImported` event with source
/// `Remote` are considered, and only while still `Quarantined`. Signals are
/// normalized through `normalize_signal_phrase` and matched by substring
/// containment in either direction. The candidate score is the larger of
/// the query-overlap ratio and the best capsule's environment match factor.
fn quarantined_remote_exact_match_candidates(
    store: &dyn EvolutionStore,
    input: &SelectorInput,
) -> Vec<GeneCandidate> {
    // Collect every asset id ever imported from a remote peer; a store read
    // failure degrades to an empty set (and therefore no candidates).
    let remote_asset_ids = store
        .scan(1)
        .ok()
        .map(|events| {
            events
                .into_iter()
                .filter_map(|stored| match stored.event {
                    EvolutionEvent::RemoteAssetImported {
                        source: CandidateSource::Remote,
                        asset_ids,
                        ..
                    } => Some(asset_ids),
                    _ => None,
                })
                .flatten()
                .collect::<BTreeSet<_>>()
        })
        .unwrap_or_default();
    if remote_asset_ids.is_empty() {
        return Vec::new();
    }

    let Ok(projection) = projection_snapshot(store) else {
        return Vec::new();
    };
    let capsules = projection.capsules.clone();
    let spec_ids_by_gene = projection.spec_ids_by_gene.clone();
    // Blank / whitespace-only spec ids mean "no spec filter".
    let requested_spec_id = input
        .spec_id
        .as_deref()
        .map(str::trim)
        .filter(|value| !value.is_empty());
    // Normalize query signals; signals that normalize to nothing are dropped.
    let normalized_signals = input
        .signals
        .iter()
        .filter_map(|signal| normalize_signal_phrase(signal))
        .collect::<BTreeSet<_>>()
        .into_iter()
        .collect::<Vec<_>>();
    if normalized_signals.is_empty() {
        return Vec::new();
    }
    let mut candidates = projection
        .genes
        .into_iter()
        .filter_map(|gene| {
            // Remote genes may still be quarantined; promoted ones count too.
            if !matches!(gene.state, AssetState::Promoted | AssetState::Quarantined) {
                return None;
            }
            if let Some(spec_id) = requested_spec_id {
                let matches_spec = spec_ids_by_gene
                    .get(&gene.id)
                    .map(|values| {
                        values
                            .iter()
                            .any(|value| value.eq_ignore_ascii_case(spec_id))
                    })
                    .unwrap_or(false);
                if !matches_spec {
                    return None;
                }
            }
            let normalized_gene_signals = gene
                .signals
                .iter()
                .filter_map(|candidate| normalize_signal_phrase(candidate))
                .collect::<Vec<_>>();
            // Fuzzy match: a query signal counts when it contains, or is
            // contained by, any of the gene's normalized signals.
            let matched_query_count = normalized_signals
                .iter()
                .filter(|signal| {
                    normalized_gene_signals.iter().any(|candidate| {
                        candidate.contains(signal.as_str()) || signal.contains(candidate)
                    })
                })
                .count();
            if matched_query_count == 0 {
                return None;
            }

            // Only quarantined capsules imported from a remote peer qualify.
            let mut matched_capsules = capsules
                .iter()
                .filter(|capsule| {
                    capsule.gene_id == gene.id
                        && capsule.state == AssetState::Quarantined
                        && remote_asset_ids.contains(&capsule.id)
                })
                .cloned()
                .collect::<Vec<_>>();
            // Best capsule first: env match desc, confidence desc, id asc.
            matched_capsules.sort_by(|left, right| {
                replay_environment_match_factor(&input.env, &right.env)
                    .partial_cmp(&replay_environment_match_factor(&input.env, &left.env))
                    .unwrap_or(std::cmp::Ordering::Equal)
                    .then_with(|| {
                        right
                            .confidence
                            .partial_cmp(&left.confidence)
                            .unwrap_or(std::cmp::Ordering::Equal)
                    })
                    .then_with(|| left.id.cmp(&right.id))
            });
            if matched_capsules.is_empty() {
                None
            } else {
                // Score by whichever is stronger: signal overlap or env fit.
                let overlap = matched_query_count as f32 / normalized_signals.len() as f32;
                let env_score = matched_capsules
                    .first()
                    .map(|capsule| replay_environment_match_factor(&input.env, &capsule.env))
                    .unwrap_or(0.0);
                Some(GeneCandidate {
                    gene,
                    score: overlap.max(env_score),
                    capsules: matched_capsules,
                })
            }
        })
        .collect::<Vec<_>>();
    // Deterministic ordering: score desc, gene id asc as the tiebreaker.
    candidates.sort_by(|left, right| {
        right
            .score
            .partial_cmp(&left.score)
            .unwrap_or(std::cmp::Ordering::Equal)
            .then_with(|| left.gene.id.cmp(&right.gene.id))
    });
    candidates
}
2352
2353fn replay_environment_match_factor(input: &EnvFingerprint, candidate: &EnvFingerprint) -> f32 {
2354 let fields = [
2355 input
2356 .rustc_version
2357 .eq_ignore_ascii_case(&candidate.rustc_version),
2358 input
2359 .cargo_lock_hash
2360 .eq_ignore_ascii_case(&candidate.cargo_lock_hash),
2361 input
2362 .target_triple
2363 .eq_ignore_ascii_case(&candidate.target_triple),
2364 input.os.eq_ignore_ascii_case(&candidate.os),
2365 ];
2366 let matched_fields = fields.into_iter().filter(|matched| *matched).count() as f32;
2367 0.5 + ((matched_fields / 4.0) * 0.5)
2368}
2369
2370fn effective_candidate_score(
2371 candidate: &GeneCandidate,
2372 publishers_by_asset: &BTreeMap<String, String>,
2373 reputation_bias: &BTreeMap<String, f32>,
2374) -> f32 {
2375 let bias = candidate
2376 .capsules
2377 .first()
2378 .and_then(|capsule| publishers_by_asset.get(&capsule.id))
2379 .and_then(|publisher| reputation_bias.get(publisher))
2380 .copied()
2381 .unwrap_or(0.0)
2382 .clamp(0.0, 1.0);
2383 candidate.score * (1.0 + (bias * 0.1))
2384}
2385
2386fn export_promoted_assets_from_store(
2387 store: &dyn EvolutionStore,
2388 sender_id: impl Into<String>,
2389) -> Result<EvolutionEnvelope, EvoKernelError> {
2390 let (events, projection) = scan_projection(store)?;
2391 let genes = projection
2392 .genes
2393 .into_iter()
2394 .filter(|gene| gene.state == AssetState::Promoted)
2395 .collect::<Vec<_>>();
2396 let capsules = projection
2397 .capsules
2398 .into_iter()
2399 .filter(|capsule| capsule.state == AssetState::Promoted)
2400 .collect::<Vec<_>>();
2401 let assets = replay_export_assets(&events, genes, capsules);
2402 Ok(EvolutionEnvelope::publish(sender_id, assets))
2403}
2404
/// Fetches the full event log together with the projection derived from it,
/// mapping store errors into the kernel error type.
fn scan_projection(
    store: &dyn EvolutionStore,
) -> Result<(Vec<StoredEvolutionEvent>, EvolutionProjection), EvoKernelError> {
    store.scan_projection().map_err(store_err)
}
2410
2411fn projection_snapshot(store: &dyn EvolutionStore) -> Result<EvolutionProjection, EvoKernelError> {
2412 scan_projection(store).map(|(_, projection)| projection)
2413}
2414
2415fn replay_export_assets(
2416 events: &[StoredEvolutionEvent],
2417 genes: Vec<Gene>,
2418 capsules: Vec<Capsule>,
2419) -> Vec<NetworkAsset> {
2420 let mutation_ids = capsules
2421 .iter()
2422 .map(|capsule| capsule.mutation_id.clone())
2423 .collect::<BTreeSet<_>>();
2424 let mut assets = replay_export_events_for_mutations(events, &mutation_ids);
2425 for gene in genes {
2426 assets.push(NetworkAsset::Gene { gene });
2427 }
2428 for capsule in capsules {
2429 assets.push(NetworkAsset::Capsule { capsule });
2430 }
2431 assets
2432}
2433
/// Extracts, from the raw event log, the `MutationDeclared` and `SpecLinked`
/// events belonging to `mutation_ids` — each at most once, in log order.
fn replay_export_events_for_mutations(
    events: &[StoredEvolutionEvent],
    mutation_ids: &BTreeSet<String>,
) -> Vec<NetworkAsset> {
    if mutation_ids.is_empty() {
        return Vec::new();
    }

    let mut assets = Vec::new();
    // Dedup guards: only the first occurrence of each mutation declaration
    // and of each (mutation, spec) link is exported.
    let mut seen_mutations = BTreeSet::new();
    let mut seen_spec_links = BTreeSet::new();
    for stored in events {
        match &stored.event {
            EvolutionEvent::MutationDeclared { mutation }
                if mutation_ids.contains(mutation.intent.id.as_str())
                    && seen_mutations.insert(mutation.intent.id.clone()) =>
            {
                assets.push(NetworkAsset::EvolutionEvent {
                    event: EvolutionEvent::MutationDeclared {
                        mutation: mutation.clone(),
                    },
                });
            }
            EvolutionEvent::SpecLinked {
                mutation_id,
                spec_id,
            } if mutation_ids.contains(mutation_id.as_str())
                && seen_spec_links.insert((mutation_id.clone(), spec_id.clone())) =>
            {
                assets.push(NetworkAsset::EvolutionEvent {
                    event: EvolutionEvent::SpecLinked {
                        mutation_id: mutation_id.clone(),
                        spec_id: spec_id.clone(),
                    },
                });
            }
            _ => {}
        }
    }

    assets
}
2476
/// Imports a remote evolution envelope, quarantining every new asset.
///
/// The envelope's content hash is verified first; a mismatch is rejected as
/// a validation error. Genes and capsules already known locally are skipped
/// (dedup by id), and newly imported ones are forced into the `Quarantined`
/// state regardless of the state the sender claimed, so they cannot be
/// replayed before local validation. Replayable events (`MutationDeclared`,
/// `SpecLinked`) are appended at most once each.
fn import_remote_envelope_into_store(
    store: &dyn EvolutionStore,
    envelope: &EvolutionEnvelope,
    remote_publishers: Option<&Mutex<BTreeMap<String, String>>>,
) -> Result<ImportOutcome, EvoKernelError> {
    if !envelope.verify_content_hash() {
        return Err(EvoKernelError::Validation(
            "invalid evolution envelope hash".into(),
        ));
    }

    let sender_id = normalized_sender_id(&envelope.sender_id);
    // Snapshot what is already known so re-imported assets are ignored.
    let (events, projection) = scan_projection(store)?;
    let mut known_gene_ids = projection
        .genes
        .into_iter()
        .map(|gene| gene.id)
        .collect::<BTreeSet<_>>();
    let mut known_capsule_ids = projection
        .capsules
        .into_iter()
        .map(|capsule| capsule.id)
        .collect::<BTreeSet<_>>();
    let mut known_mutation_ids = BTreeSet::new();
    let mut known_spec_links = BTreeSet::new();
    for stored in &events {
        match &stored.event {
            EvolutionEvent::MutationDeclared { mutation } => {
                known_mutation_ids.insert(mutation.intent.id.clone());
            }
            EvolutionEvent::SpecLinked {
                mutation_id,
                spec_id,
            } => {
                known_spec_links.insert((mutation_id.clone(), spec_id.clone()));
            }
            _ => {}
        }
    }
    let mut imported_asset_ids = Vec::new();
    for asset in &envelope.assets {
        match asset {
            NetworkAsset::Gene { gene } => {
                // `insert` returning false means the gene is already known.
                if !known_gene_ids.insert(gene.id.clone()) {
                    continue;
                }
                imported_asset_ids.push(gene.id.clone());
                // Remote genes always land quarantined.
                let mut quarantined_gene = gene.clone();
                quarantined_gene.state = AssetState::Quarantined;
                store
                    .append_event(EvolutionEvent::RemoteAssetImported {
                        source: CandidateSource::Remote,
                        asset_ids: vec![gene.id.clone()],
                        sender_id: sender_id.clone(),
                    })
                    .map_err(store_err)?;
                store
                    .append_event(EvolutionEvent::GeneProjected {
                        gene: quarantined_gene.clone(),
                    })
                    .map_err(store_err)?;
                record_remote_publisher_for_asset(remote_publishers, &envelope.sender_id, asset);
                store
                    .append_event(EvolutionEvent::PromotionEvaluated {
                        gene_id: quarantined_gene.id,
                        state: AssetState::Quarantined,
                        reason: "remote asset requires local validation before promotion".into(),
                    })
                    .map_err(store_err)?;
            }
            NetworkAsset::Capsule { capsule } => {
                if !known_capsule_ids.insert(capsule.id.clone()) {
                    continue;
                }
                imported_asset_ids.push(capsule.id.clone());
                store
                    .append_event(EvolutionEvent::RemoteAssetImported {
                        source: CandidateSource::Remote,
                        asset_ids: vec![capsule.id.clone()],
                        sender_id: sender_id.clone(),
                    })
                    .map_err(store_err)?;
                // Remote capsules likewise enter quarantined.
                let mut quarantined = capsule.clone();
                quarantined.state = AssetState::Quarantined;
                store
                    .append_event(EvolutionEvent::CapsuleCommitted {
                        capsule: quarantined.clone(),
                    })
                    .map_err(store_err)?;
                record_remote_publisher_for_asset(remote_publishers, &envelope.sender_id, asset);
                store
                    .append_event(EvolutionEvent::CapsuleQuarantined {
                        capsule_id: quarantined.id,
                    })
                    .map_err(store_err)?;
            }
            NetworkAsset::EvolutionEvent { event } => {
                // Append each declared mutation / spec link at most once;
                // anything else falls through to the whitelist predicate.
                let should_append = match event {
                    EvolutionEvent::MutationDeclared { mutation } => {
                        known_mutation_ids.insert(mutation.intent.id.clone())
                    }
                    EvolutionEvent::SpecLinked {
                        mutation_id,
                        spec_id,
                    } => known_spec_links.insert((mutation_id.clone(), spec_id.clone())),
                    _ if should_import_remote_event(event) => true,
                    _ => false,
                };
                if should_append {
                    store.append_event(event.clone()).map_err(store_err)?;
                }
            }
        }
    }

    Ok(ImportOutcome {
        imported_asset_ids,
        accepted: true,
    })
}
2597
/// Trims a sender id, converting blank / whitespace-only values to `None`.
fn normalized_sender_id(sender_id: &str) -> Option<String> {
    let trimmed = sender_id.trim();
    (!trimmed.is_empty()).then(|| trimmed.to_string())
}
2606
2607fn record_remote_publisher_for_asset(
2608 remote_publishers: Option<&Mutex<BTreeMap<String, String>>>,
2609 sender_id: &str,
2610 asset: &NetworkAsset,
2611) {
2612 let Some(remote_publishers) = remote_publishers else {
2613 return;
2614 };
2615 let sender_id = sender_id.trim();
2616 if sender_id.is_empty() {
2617 return;
2618 }
2619 let Ok(mut publishers) = remote_publishers.lock() else {
2620 return;
2621 };
2622 match asset {
2623 NetworkAsset::Gene { gene } => {
2624 publishers.insert(gene.id.clone(), sender_id.to_string());
2625 }
2626 NetworkAsset::Capsule { capsule } => {
2627 publishers.insert(capsule.id.clone(), sender_id.to_string());
2628 }
2629 NetworkAsset::EvolutionEvent { .. } => {}
2630 }
2631}
2632
2633fn remote_publishers_by_asset_from_store(store: &dyn EvolutionStore) -> BTreeMap<String, String> {
2634 let Ok(events) = store.scan(1) else {
2635 return BTreeMap::new();
2636 };
2637 remote_publishers_by_asset_from_events(&events)
2638}
2639
/// Replays the event log to map asset ids to the remote sender that
/// published them.
///
/// An asset is attributed only once both facts are visible: the
/// `RemoteAssetImported` record (with a non-blank sender id) and the event
/// materializing the asset (`GeneProjected` or `CapsuleCommitted`). The two
/// may appear in either order in the log.
fn remote_publishers_by_asset_from_events(
    events: &[StoredEvolutionEvent],
) -> BTreeMap<String, String> {
    // Sender recorded per imported asset id, pending materialization.
    let mut imported_asset_publishers = BTreeMap::<String, String>::new();
    let mut known_gene_ids = BTreeSet::<String>::new();
    let mut known_capsule_ids = BTreeSet::<String>::new();
    let mut publishers_by_asset = BTreeMap::<String, String>::new();

    for stored in events {
        match &stored.event {
            EvolutionEvent::RemoteAssetImported {
                source: CandidateSource::Remote,
                asset_ids,
                sender_id,
            } => {
                // Imports without a usable sender id cannot be attributed.
                let Some(sender_id) = sender_id.as_deref().and_then(normalized_sender_id) else {
                    continue;
                };
                for asset_id in asset_ids {
                    imported_asset_publishers.insert(asset_id.clone(), sender_id.clone());
                    // Asset already materialized: attribute immediately.
                    if known_gene_ids.contains(asset_id) || known_capsule_ids.contains(asset_id) {
                        publishers_by_asset.insert(asset_id.clone(), sender_id.clone());
                    }
                }
            }
            EvolutionEvent::GeneProjected { gene } => {
                known_gene_ids.insert(gene.id.clone());
                // Import seen earlier: attribution completes now.
                if let Some(sender_id) = imported_asset_publishers.get(&gene.id) {
                    publishers_by_asset.insert(gene.id.clone(), sender_id.clone());
                }
            }
            EvolutionEvent::CapsuleCommitted { capsule } => {
                known_capsule_ids.insert(capsule.id.clone());
                if let Some(sender_id) = imported_asset_publishers.get(&capsule.id) {
                    publishers_by_asset.insert(capsule.id.clone(), sender_id.clone());
                }
            }
            _ => {}
        }
    }

    publishers_by_asset
}
2683
2684fn should_import_remote_event(event: &EvolutionEvent) -> bool {
2685 matches!(
2686 event,
2687 EvolutionEvent::MutationDeclared { .. } | EvolutionEvent::SpecLinked { .. }
2688 )
2689}
2690
/// Answers a network fetch query with promoted genes/capsules whose signals
/// overlap the query, plus the replay events backing those capsules.
///
/// An empty query signal list matches everything; otherwise a gene matches
/// when any of its signals contains, or is contained by, any query signal
/// (compared case-insensitively after trimming).
fn fetch_assets_from_store(
    store: &dyn EvolutionStore,
    responder_id: impl Into<String>,
    query: &FetchQuery,
) -> Result<FetchResponse, EvoKernelError> {
    let (events, projection) = scan_projection(store)?;
    let normalized_signals: Vec<String> = query
        .signals
        .iter()
        .map(|signal| signal.trim().to_ascii_lowercase())
        .filter(|signal| !signal.is_empty())
        .collect();
    let matches_any_signal = |candidate: &str| {
        // No query signals means an unfiltered fetch.
        if normalized_signals.is_empty() {
            return true;
        }
        let candidate = candidate.to_ascii_lowercase();
        normalized_signals
            .iter()
            .any(|signal| candidate.contains(signal) || signal.contains(&candidate))
    };

    // Only promoted assets are ever served to remote peers.
    let matched_genes: Vec<Gene> = projection
        .genes
        .into_iter()
        .filter(|gene| gene.state == AssetState::Promoted)
        .filter(|gene| gene.signals.iter().any(|signal| matches_any_signal(signal)))
        .collect();
    let matched_gene_ids: BTreeSet<String> =
        matched_genes.iter().map(|gene| gene.id.clone()).collect();
    // A capsule is included only when its parent gene matched.
    let matched_capsules: Vec<Capsule> = projection
        .capsules
        .into_iter()
        .filter(|capsule| capsule.state == AssetState::Promoted)
        .filter(|capsule| matched_gene_ids.contains(&capsule.gene_id))
        .collect();

    let assets = replay_export_assets(&events, matched_genes, matched_capsules);

    Ok(FetchResponse {
        sender_id: responder_id.into(),
        assets,
    })
}
2735
/// Applies a revoke notice: revokes the named genes, quarantines the named
/// capsules, then widens the blast to every capsule of a revoked gene.
///
/// Revoking a capsule also revokes its parent gene. The returned notice
/// echoes the sender and reason but lists the full set of affected ids.
fn revoke_assets_in_store(
    store: &dyn EvolutionStore,
    notice: &RevokeNotice,
) -> Result<RevokeNotice, EvoKernelError> {
    let projection = projection_snapshot(store)?;
    // Trimmed, de-blanked request set.
    let requested: BTreeSet<String> = notice
        .asset_ids
        .iter()
        .map(|asset_id| asset_id.trim().to_string())
        .filter(|asset_id| !asset_id.is_empty())
        .collect();
    let mut revoked_gene_ids = BTreeSet::new();
    let mut quarantined_capsule_ids = BTreeSet::new();

    // Pass 1: directly named genes.
    for gene in &projection.genes {
        if requested.contains(&gene.id) {
            revoked_gene_ids.insert(gene.id.clone());
        }
    }
    // Pass 2: directly named capsules; their parent gene is revoked too.
    for capsule in &projection.capsules {
        if requested.contains(&capsule.id) {
            quarantined_capsule_ids.insert(capsule.id.clone());
            revoked_gene_ids.insert(capsule.gene_id.clone());
        }
    }
    // Pass 3: every capsule belonging to any revoked gene is quarantined.
    for capsule in &projection.capsules {
        if revoked_gene_ids.contains(&capsule.gene_id) {
            quarantined_capsule_ids.insert(capsule.id.clone());
        }
    }

    for gene_id in &revoked_gene_ids {
        store
            .append_event(EvolutionEvent::GeneRevoked {
                gene_id: gene_id.clone(),
                reason: notice.reason.clone(),
            })
            .map_err(store_err)?;
    }
    for capsule_id in &quarantined_capsule_ids {
        store
            .append_event(EvolutionEvent::CapsuleQuarantined {
                capsule_id: capsule_id.clone(),
            })
            .map_err(store_err)?;
    }

    // Combined, sorted, de-duplicated view of everything touched.
    let mut affected_ids: Vec<String> = revoked_gene_ids.into_iter().collect();
    affected_ids.extend(quarantined_capsule_ids);
    affected_ids.sort();
    affected_ids.dedup();

    Ok(RevokeNotice {
        sender_id: notice.sender_id.clone(),
        asset_ids: affected_ids,
        reason: notice.reason.clone(),
    })
}
2794
2795fn evolution_metrics_snapshot(
2796 store: &dyn EvolutionStore,
2797) -> Result<EvolutionMetricsSnapshot, EvoKernelError> {
2798 let (events, projection) = scan_projection(store)?;
2799 let replay_success_total = events
2800 .iter()
2801 .filter(|stored| matches!(stored.event, EvolutionEvent::CapsuleReused { .. }))
2802 .count() as u64;
2803 let replay_failures_total = events
2804 .iter()
2805 .filter(|stored| is_replay_validation_failure(&stored.event))
2806 .count() as u64;
2807 let replay_attempts_total = replay_success_total + replay_failures_total;
2808 let mutation_declared_total = events
2809 .iter()
2810 .filter(|stored| matches!(stored.event, EvolutionEvent::MutationDeclared { .. }))
2811 .count() as u64;
2812 let promoted_mutations_total = events
2813 .iter()
2814 .filter(|stored| matches!(stored.event, EvolutionEvent::GenePromoted { .. }))
2815 .count() as u64;
2816 let gene_revocations_total = events
2817 .iter()
2818 .filter(|stored| matches!(stored.event, EvolutionEvent::GeneRevoked { .. }))
2819 .count() as u64;
2820 let cutoff = Utc::now() - Duration::hours(1);
2821 let mutation_velocity_last_hour = count_recent_events(&events, cutoff, |event| {
2822 matches!(event, EvolutionEvent::MutationDeclared { .. })
2823 });
2824 let revoke_frequency_last_hour = count_recent_events(&events, cutoff, |event| {
2825 matches!(event, EvolutionEvent::GeneRevoked { .. })
2826 });
2827 let promoted_genes = projection
2828 .genes
2829 .iter()
2830 .filter(|gene| gene.state == AssetState::Promoted)
2831 .count() as u64;
2832 let promoted_capsules = projection
2833 .capsules
2834 .iter()
2835 .filter(|capsule| capsule.state == AssetState::Promoted)
2836 .count() as u64;
2837
2838 Ok(EvolutionMetricsSnapshot {
2839 replay_attempts_total,
2840 replay_success_total,
2841 replay_success_rate: safe_ratio(replay_success_total, replay_attempts_total),
2842 mutation_declared_total,
2843 promoted_mutations_total,
2844 promotion_ratio: safe_ratio(promoted_mutations_total, mutation_declared_total),
2845 gene_revocations_total,
2846 mutation_velocity_last_hour,
2847 revoke_frequency_last_hour,
2848 promoted_genes,
2849 promoted_capsules,
2850 last_event_seq: events.last().map(|stored| stored.seq).unwrap_or(0),
2851 })
2852}
2853
/// Reduces a metrics snapshot to the minimal health view: status plus the
/// promoted-asset counts and last event sequence. Status is always "ok"
/// here because producing the snapshot already required a successful scan.
fn evolution_health_snapshot(snapshot: &EvolutionMetricsSnapshot) -> EvolutionHealthSnapshot {
    EvolutionHealthSnapshot {
        status: "ok".into(),
        last_event_seq: snapshot.last_event_seq,
        promoted_genes: snapshot.promoted_genes,
        promoted_capsules: snapshot.promoted_capsules,
    }
}
2862
2863fn render_evolution_metrics_prometheus(
2864 snapshot: &EvolutionMetricsSnapshot,
2865 health: &EvolutionHealthSnapshot,
2866) -> String {
2867 let mut out = String::new();
2868 out.push_str(
2869 "# HELP oris_evolution_replay_attempts_total Total replay attempts that reached validation.\n",
2870 );
2871 out.push_str("# TYPE oris_evolution_replay_attempts_total counter\n");
2872 out.push_str(&format!(
2873 "oris_evolution_replay_attempts_total {}\n",
2874 snapshot.replay_attempts_total
2875 ));
2876 out.push_str("# HELP oris_evolution_replay_success_total Total replay attempts that reused a capsule successfully.\n");
2877 out.push_str("# TYPE oris_evolution_replay_success_total counter\n");
2878 out.push_str(&format!(
2879 "oris_evolution_replay_success_total {}\n",
2880 snapshot.replay_success_total
2881 ));
2882 out.push_str("# HELP oris_evolution_replay_success_rate Successful replay attempts divided by replay attempts that reached validation.\n");
2883 out.push_str("# TYPE oris_evolution_replay_success_rate gauge\n");
2884 out.push_str(&format!(
2885 "oris_evolution_replay_success_rate {:.6}\n",
2886 snapshot.replay_success_rate
2887 ));
2888 out.push_str(
2889 "# HELP oris_evolution_mutation_declared_total Total declared mutations recorded in the evolution log.\n",
2890 );
2891 out.push_str("# TYPE oris_evolution_mutation_declared_total counter\n");
2892 out.push_str(&format!(
2893 "oris_evolution_mutation_declared_total {}\n",
2894 snapshot.mutation_declared_total
2895 ));
2896 out.push_str("# HELP oris_evolution_promoted_mutations_total Total mutations promoted by the governor.\n");
2897 out.push_str("# TYPE oris_evolution_promoted_mutations_total counter\n");
2898 out.push_str(&format!(
2899 "oris_evolution_promoted_mutations_total {}\n",
2900 snapshot.promoted_mutations_total
2901 ));
2902 out.push_str(
2903 "# HELP oris_evolution_promotion_ratio Promoted mutations divided by declared mutations.\n",
2904 );
2905 out.push_str("# TYPE oris_evolution_promotion_ratio gauge\n");
2906 out.push_str(&format!(
2907 "oris_evolution_promotion_ratio {:.6}\n",
2908 snapshot.promotion_ratio
2909 ));
2910 out.push_str("# HELP oris_evolution_gene_revocations_total Total gene revocations recorded in the evolution log.\n");
2911 out.push_str("# TYPE oris_evolution_gene_revocations_total counter\n");
2912 out.push_str(&format!(
2913 "oris_evolution_gene_revocations_total {}\n",
2914 snapshot.gene_revocations_total
2915 ));
2916 out.push_str("# HELP oris_evolution_mutation_velocity_last_hour Declared mutations observed in the last hour.\n");
2917 out.push_str("# TYPE oris_evolution_mutation_velocity_last_hour gauge\n");
2918 out.push_str(&format!(
2919 "oris_evolution_mutation_velocity_last_hour {}\n",
2920 snapshot.mutation_velocity_last_hour
2921 ));
2922 out.push_str("# HELP oris_evolution_revoke_frequency_last_hour Gene revocations observed in the last hour.\n");
2923 out.push_str("# TYPE oris_evolution_revoke_frequency_last_hour gauge\n");
2924 out.push_str(&format!(
2925 "oris_evolution_revoke_frequency_last_hour {}\n",
2926 snapshot.revoke_frequency_last_hour
2927 ));
2928 out.push_str("# HELP oris_evolution_promoted_genes Current promoted genes in the evolution projection.\n");
2929 out.push_str("# TYPE oris_evolution_promoted_genes gauge\n");
2930 out.push_str(&format!(
2931 "oris_evolution_promoted_genes {}\n",
2932 snapshot.promoted_genes
2933 ));
2934 out.push_str("# HELP oris_evolution_promoted_capsules Current promoted capsules in the evolution projection.\n");
2935 out.push_str("# TYPE oris_evolution_promoted_capsules gauge\n");
2936 out.push_str(&format!(
2937 "oris_evolution_promoted_capsules {}\n",
2938 snapshot.promoted_capsules
2939 ));
2940 out.push_str("# HELP oris_evolution_store_last_event_seq Last visible append-only evolution event sequence.\n");
2941 out.push_str("# TYPE oris_evolution_store_last_event_seq gauge\n");
2942 out.push_str(&format!(
2943 "oris_evolution_store_last_event_seq {}\n",
2944 snapshot.last_event_seq
2945 ));
2946 out.push_str(
2947 "# HELP oris_evolution_health Evolution observability store health (1 = healthy).\n",
2948 );
2949 out.push_str("# TYPE oris_evolution_health gauge\n");
2950 out.push_str(&format!(
2951 "oris_evolution_health {}\n",
2952 u8::from(health.status == "ok")
2953 ));
2954 out
2955}
2956
2957fn count_recent_events(
2958 events: &[StoredEvolutionEvent],
2959 cutoff: DateTime<Utc>,
2960 predicate: impl Fn(&EvolutionEvent) -> bool,
2961) -> u64 {
2962 events
2963 .iter()
2964 .filter(|stored| {
2965 predicate(&stored.event)
2966 && parse_event_timestamp(&stored.timestamp)
2967 .map(|timestamp| timestamp >= cutoff)
2968 .unwrap_or(false)
2969 })
2970 .count() as u64
2971}
2972
2973fn parse_event_timestamp(raw: &str) -> Option<DateTime<Utc>> {
2974 DateTime::parse_from_rfc3339(raw)
2975 .ok()
2976 .map(|parsed| parsed.with_timezone(&Utc))
2977}
2978
2979fn is_replay_validation_failure(event: &EvolutionEvent) -> bool {
2980 matches!(
2981 event,
2982 EvolutionEvent::ValidationFailed {
2983 gene_id: Some(_),
2984 ..
2985 }
2986 )
2987}
2988
/// Division that treats a zero denominator as a 0.0 ratio instead of
/// producing NaN/inf.
fn safe_ratio(numerator: u64, denominator: u64) -> f64 {
    match denominator {
        0 => 0.0,
        nonzero => numerator as f64 / nonzero as f64,
    }
}
2996
2997fn store_err(err: EvolutionError) -> EvoKernelError {
2998 EvoKernelError::Store(err.to_string())
2999}
3000
3001#[cfg(test)]
3002mod tests {
3003 use super::*;
3004 use oris_agent_contract::{
3005 AgentRole, CoordinationPlan, CoordinationPrimitive, CoordinationTask,
3006 };
3007 use oris_kernel::{
3008 AllowAllPolicy, InMemoryEventStore, KernelMode, KernelState, NoopActionExecutor,
3009 NoopStepFn, StateUpdatedOnlyReducer,
3010 };
3011 use serde::{Deserialize, Serialize};
3012
    // Minimal kernel state for tests; carries no data.
    #[derive(Clone, Debug, Default, Serialize, Deserialize)]
    struct TestState;

    impl KernelState for TestState {
        // Fixed schema version; these tests never migrate state.
        fn version(&self) -> u32 {
            1
        }
    }
3021
    /// Creates (or recreates) a throwaway cargo-like workspace under the OS
    /// temp dir, seeded with a Cargo.toml, a stub Cargo.lock, and src/lib.rs.
    fn temp_workspace(name: &str) -> std::path::PathBuf {
        let root =
            std::env::temp_dir().join(format!("oris-evokernel-{name}-{}", std::process::id()));
        // Start from a clean slate if a previous run left the dir behind.
        if root.exists() {
            fs::remove_dir_all(&root).unwrap();
        }
        fs::create_dir_all(root.join("src")).unwrap();
        fs::write(
            root.join("Cargo.toml"),
            "[package]\nname = \"sample\"\nversion = \"0.1.0\"\nedition = \"2021\"\n",
        )
        .unwrap();
        fs::write(root.join("Cargo.lock"), "# lock\n").unwrap();
        fs::write(root.join("src/lib.rs"), "pub fn demo() -> usize { 1 }\n").unwrap();
        root
    }
3038
    /// Builds an inert kernel: in-memory event store, allow-all policy, and
    /// no-op reducer/executor/step, with no snapshots or effect sink.
    fn test_kernel() -> Arc<Kernel<TestState>> {
        Arc::new(Kernel::<TestState> {
            events: Box::new(InMemoryEventStore::new()),
            snaps: None,
            reducer: Box::new(StateUpdatedOnlyReducer),
            exec: Box::new(NoopActionExecutor),
            step: Box::new(NoopStepFn),
            policy: Box::new(AllowAllPolicy),
            effect_sink: None,
            mode: KernelMode::Normal,
        })
    }
3051
    /// Single-stage validation plan that just runs `git --version`, keeping
    /// test validation fast and build-free.
    fn lightweight_plan() -> ValidationPlan {
        ValidationPlan {
            profile: "test".into(),
            stages: vec![ValidationStage::Command {
                program: "git".into(),
                args: vec!["--version".into()],
                timeout_ms: 5_000,
            }],
        }
    }
3062
    /// A low-risk prepared mutation that adds a one-line README via a
    /// unified diff against HEAD, scoped to the README.md path.
    fn sample_mutation() -> PreparedMutation {
        prepare_mutation(
            MutationIntent {
                id: "mutation-1".into(),
                intent: "add README".into(),
                target: MutationTarget::Paths {
                    allow: vec!["README.md".into()],
                },
                expected_effect: "repo still builds".into(),
                risk: RiskLevel::Low,
                signals: vec!["missing readme".into()],
                spec_id: None,
            },
            "\
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..1111111
--- /dev/null
+++ b/README.md
@@ -0,0 +1 @@
+# sample
"
            .into(),
            Some("HEAD".into()),
        )
    }
3089
    /// Sandbox policy for tests: only `git` may run, with generous time and
    /// output limits and no denied environment prefixes.
    fn base_sandbox_policy() -> SandboxPolicy {
        SandboxPolicy {
            allowed_programs: vec!["git".into()],
            max_duration_ms: 60_000,
            max_output_bytes: 1024 * 1024,
            denied_env_prefixes: Vec::new(),
        }
    }
3098
    /// A `CommandValidator` wired to the base sandbox policy.
    fn command_validator() -> Arc<dyn Validator> {
        Arc::new(CommandValidator::new(base_sandbox_policy()))
    }
3102
    /// Builds a `SelectorInput` for `signal` whose fingerprint mirrors the
    /// current host: real `rustc --version` output when available, the hash
    /// of the seeded `# lock` Cargo.lock, and the host arch/OS.
    fn replay_input(signal: &str) -> SelectorInput {
        // Fall back to a placeholder when rustc is missing or fails.
        let rustc_version = std::process::Command::new("rustc")
            .arg("--version")
            .output()
            .ok()
            .filter(|output| output.status.success())
            .map(|output| String::from_utf8_lossy(&output.stdout).trim().to_string())
            .unwrap_or_else(|| "rustc unknown".into());
        SelectorInput {
            signals: vec![signal.into()],
            env: EnvFingerprint {
                rustc_version,
                cargo_lock_hash: compute_artifact_hash("# lock\n"),
                target_triple: format!(
                    "{}-unknown-{}",
                    std::env::consts::ARCH,
                    std::env::consts::OS
                ),
                os: std::env::consts::OS.into(),
            },
            spec_id: None,
            limit: 1,
        }
    }
3127
    /// Assembles an `EvoKernel` over `store`: fresh temp workspace, local
    /// process sandbox, a governor that promotes after a single success, the
    /// lightweight validation plan, and the base sandbox policy.
    fn build_test_evo_with_store(
        name: &str,
        run_id: &str,
        validator: Arc<dyn Validator>,
        store: Arc<dyn EvolutionStore>,
    ) -> EvoKernel<TestState> {
        let workspace = temp_workspace(name);
        let sandbox: Arc<dyn Sandbox> = Arc::new(oris_sandbox::LocalProcessSandbox::new(
            run_id,
            &workspace,
            std::env::temp_dir(),
        ));
        EvoKernel::new(test_kernel(), sandbox, validator, store)
            .with_governor(Arc::new(DefaultGovernor::new(
                oris_governor::GovernorConfig {
                    promote_after_successes: 1,
                    ..Default::default()
                },
            )))
            .with_validation_plan(lightweight_plan())
            .with_sandbox_policy(base_sandbox_policy())
    }
3150
    /// Like `build_test_evo_with_store`, but also provisions a fresh
    /// JSONL-backed store under the OS temp dir and returns it alongside.
    fn build_test_evo(
        name: &str,
        run_id: &str,
        validator: Arc<dyn Validator>,
    ) -> (EvoKernel<TestState>, Arc<dyn EvolutionStore>) {
        let store_root = std::env::temp_dir().join(format!(
            "oris-evokernel-{name}-store-{}",
            std::process::id()
        ));
        // Remove leftovers from a previous run of the same-named test.
        if store_root.exists() {
            fs::remove_dir_all(&store_root).unwrap();
        }
        let store: Arc<dyn EvolutionStore> =
            Arc::new(oris_evolution::JsonlEvolutionStore::new(&store_root));
        let evo = build_test_evo_with_store(name, run_id, validator, store.clone());
        (evo, store)
    }
3168
    /// Builds a remote publish envelope using the local replay fingerprint
    /// for `signal`; see `remote_publish_envelope_with_env` for the payload.
    fn remote_publish_envelope(
        sender_id: &str,
        run_id: &str,
        gene_id: &str,
        capsule_id: &str,
        mutation_id: &str,
        signal: &str,
        file_name: &str,
        line: &str,
    ) -> EvolutionEnvelope {
        remote_publish_envelope_with_env(
            sender_id,
            run_id,
            gene_id,
            capsule_id,
            mutation_id,
            signal,
            file_name,
            line,
            replay_input(signal).env,
        )
    }
3191
    /// Builds a publish envelope that looks like a peer's promoted work: a
    /// declared mutation (one-line file-add diff), a promoted gene, a
    /// promoted capsule whose diff hash matches the mutation artifact, and a
    /// `CapsuleReleased` event for that capsule.
    fn remote_publish_envelope_with_env(
        sender_id: &str,
        run_id: &str,
        gene_id: &str,
        capsule_id: &str,
        mutation_id: &str,
        signal: &str,
        file_name: &str,
        line: &str,
        env: EnvFingerprint,
    ) -> EvolutionEnvelope {
        let mutation = prepare_mutation(
            MutationIntent {
                id: mutation_id.into(),
                intent: format!("add {file_name}"),
                target: MutationTarget::Paths {
                    allow: vec![file_name.into()],
                },
                expected_effect: "replay should still validate".into(),
                risk: RiskLevel::Low,
                signals: vec![signal.into()],
                spec_id: None,
            },
            format!(
                "\
diff --git a/{file_name} b/{file_name}
new file mode 100644
index 0000000..1111111
--- /dev/null
+++ b/{file_name}
@@ -0,0 +1 @@
+{line}
"
            ),
            Some("HEAD".into()),
        );
        let gene = Gene {
            id: gene_id.into(),
            signals: vec![signal.into()],
            strategy: vec![file_name.into()],
            validation: vec!["test".into()],
            state: AssetState::Promoted,
        };
        // Capsule's diff hash must agree with the mutation artifact so the
        // importer/replayer sees a consistent pair.
        let capsule = Capsule {
            id: capsule_id.into(),
            gene_id: gene_id.into(),
            mutation_id: mutation_id.into(),
            run_id: run_id.into(),
            diff_hash: mutation.artifact.content_hash.clone(),
            confidence: 0.9,
            env,
            outcome: Outcome {
                success: true,
                validation_profile: "test".into(),
                validation_duration_ms: 1,
                changed_files: vec![file_name.into()],
                validator_hash: "validator-hash".into(),
                lines_changed: 1,
                replay_verified: false,
            },
            state: AssetState::Promoted,
        };
        EvolutionEnvelope::publish(
            sender_id,
            vec![
                NetworkAsset::EvolutionEvent {
                    event: EvolutionEvent::MutationDeclared { mutation },
                },
                NetworkAsset::Gene { gene: gene.clone() },
                NetworkAsset::Capsule {
                    capsule: capsule.clone(),
                },
                NetworkAsset::EvolutionEvent {
                    event: EvolutionEvent::CapsuleReleased {
                        capsule_id: capsule.id.clone(),
                        state: AssetState::Promoted,
                    },
                },
            ],
        )
    }
3273
/// Canonical builder for a remote publish envelope: declares a mutation that
/// adds `file_name` containing the single `line`, projects a Promoted gene,
/// commits a Promoted capsule tied to that mutation, and releases the capsule.
///
/// Mutation and gene signal lists are supplied independently so tests can
/// exercise partial overlap between query signals and gene signals.
fn remote_publish_envelope_with_signals(
    sender_id: &str,
    run_id: &str,
    gene_id: &str,
    capsule_id: &str,
    mutation_id: &str,
    mutation_signals: Vec<String>,
    gene_signals: Vec<String>,
    file_name: &str,
    line: &str,
    env: EnvFingerprint,
) -> EvolutionEnvelope {
    let mutation = prepare_mutation(
        MutationIntent {
            id: mutation_id.into(),
            intent: format!("add {file_name}"),
            target: MutationTarget::Paths {
                allow: vec![file_name.into()],
            },
            expected_effect: "replay should still validate".into(),
            risk: RiskLevel::Low,
            signals: mutation_signals,
            spec_id: None,
        },
        // Minimal unified diff creating a one-line file. The body is flush
        // with the left margin on purpose: diff text is whitespace-sensitive.
        format!(
            "\
diff --git a/{file_name} b/{file_name}
new file mode 100644
index 0000000..1111111
--- /dev/null
+++ b/{file_name}
@@ -0,0 +1 @@
+{line}
"
        ),
        Some("HEAD".into()),
    );
    let gene = Gene {
        id: gene_id.into(),
        signals: gene_signals,
        strategy: vec![file_name.into()],
        validation: vec!["test".into()],
        state: AssetState::Promoted,
    };
    let capsule = Capsule {
        id: capsule_id.into(),
        gene_id: gene_id.into(),
        mutation_id: mutation_id.into(),
        run_id: run_id.into(),
        // Hash of the prepared diff ties the capsule to the exact artifact.
        diff_hash: mutation.artifact.content_hash.clone(),
        confidence: 0.9,
        env,
        outcome: Outcome {
            success: true,
            validation_profile: "test".into(),
            validation_duration_ms: 1,
            changed_files: vec![file_name.into()],
            validator_hash: "validator-hash".into(),
            lines_changed: 1,
            replay_verified: false,
        },
        state: AssetState::Promoted,
    };
    EvolutionEnvelope::publish(
        sender_id,
        vec![
            NetworkAsset::EvolutionEvent {
                event: EvolutionEvent::MutationDeclared { mutation },
            },
            NetworkAsset::Gene { gene: gene.clone() },
            NetworkAsset::Capsule {
                capsule: capsule.clone(),
            },
            NetworkAsset::EvolutionEvent {
                event: EvolutionEvent::CapsuleReleased {
                    capsule_id: capsule.id.clone(),
                    state: AssetState::Promoted,
                },
            },
        ],
    )
}
3356
/// Validator stub whose report outcome is fixed at construction, independent
/// of the sandbox receipt or validation plan it is handed.
struct FixedValidator {
    // When false, every validation run is reported as failed.
    success: bool,
}
3360
3361 #[async_trait]
3362 impl Validator for FixedValidator {
3363 async fn run(
3364 &self,
3365 _receipt: &SandboxReceipt,
3366 plan: &ValidationPlan,
3367 ) -> Result<ValidationReport, ValidationError> {
3368 Ok(ValidationReport {
3369 success: self.success,
3370 duration_ms: 1,
3371 stages: Vec::new(),
3372 logs: if self.success {
3373 format!("{} ok", plan.profile)
3374 } else {
3375 format!("{} failed", plan.profile)
3376 },
3377 })
3378 }
3379 }
3380
/// Fault-injecting store wrapper: behaves exactly like the wrapped JSONL
/// store except that one specific `append_event` call fails, letting tests
/// exercise partial-import and retry recovery paths.
struct FailOnAppendStore {
    inner: JsonlEvolutionStore,
    // 1-based index of the append_event call that should return an error.
    fail_on_call: usize,
    // Appends observed so far; Mutex because the store trait takes &self.
    call_count: Mutex<usize>,
}
3386
3387 impl FailOnAppendStore {
3388 fn new(root_dir: std::path::PathBuf, fail_on_call: usize) -> Self {
3389 Self {
3390 inner: JsonlEvolutionStore::new(root_dir),
3391 fail_on_call,
3392 call_count: Mutex::new(0),
3393 }
3394 }
3395 }
3396
3397 impl EvolutionStore for FailOnAppendStore {
3398 fn append_event(&self, event: EvolutionEvent) -> Result<u64, EvolutionError> {
3399 let mut call_count = self
3400 .call_count
3401 .lock()
3402 .map_err(|_| EvolutionError::Io("test store lock poisoned".into()))?;
3403 *call_count += 1;
3404 if *call_count == self.fail_on_call {
3405 return Err(EvolutionError::Io("injected append failure".into()));
3406 }
3407 self.inner.append_event(event)
3408 }
3409
3410 fn scan(&self, from_seq: u64) -> Result<Vec<StoredEvolutionEvent>, EvolutionError> {
3411 self.inner.scan(from_seq)
3412 }
3413
3414 fn rebuild_projection(&self) -> Result<EvolutionProjection, EvolutionError> {
3415 self.inner.rebuild_projection()
3416 }
3417 }
3418
/// Sequential planner→coder plan: both tasks complete in order and a
/// planner→coder coordination message addressed to "coder" is emitted.
#[test]
fn coordination_planner_to_coder_handoff_is_deterministic() {
    let task = |id: &str, role: AgentRole, description: &str, depends_on: Vec<String>| {
        CoordinationTask {
            id: id.into(),
            role,
            description: description.into(),
            depends_on,
        }
    };
    let plan = CoordinationPlan {
        root_goal: "ship feature".into(),
        primitive: CoordinationPrimitive::Sequential,
        tasks: vec![
            task("planner", AgentRole::Planner, "split the work", Vec::new()),
            task(
                "coder",
                AgentRole::Coder,
                "implement the patch",
                vec!["planner".into()],
            ),
        ],
        timeout_ms: 5_000,
        max_retries: 0,
    };
    let result = MultiAgentCoordinator::new().coordinate(plan);

    assert_eq!(result.completed_tasks, vec!["planner", "coder"]);
    assert!(result.failed_tasks.is_empty());
    let handoff_seen = result.messages.iter().any(|message| {
        message.from_role == AgentRole::Planner
            && message.to_role == AgentRole::Coder
            && message.task_id == "coder"
    });
    assert!(handoff_seen);
}
3450
/// When the coder task fails, the dependent repair task still runs and
/// succeeds, and a coder→repair handoff message is emitted for "repair".
#[test]
fn coordination_repair_runs_only_after_coder_failure() {
    let result = MultiAgentCoordinator::new().coordinate(CoordinationPlan {
        root_goal: "fix broken implementation".into(),
        primitive: CoordinationPrimitive::Sequential,
        tasks: vec![
            CoordinationTask {
                id: "coder".into(),
                role: AgentRole::Coder,
                // "force-fail" in the description appears to drive the
                // simulated failure — TODO confirm against the coordinator.
                description: "force-fail initial implementation".into(),
                depends_on: Vec::new(),
            },
            CoordinationTask {
                id: "repair".into(),
                role: AgentRole::Repair,
                description: "patch the failed implementation".into(),
                depends_on: vec!["coder".into()],
            },
        ],
        timeout_ms: 5_000,
        max_retries: 0,
    });

    assert_eq!(result.completed_tasks, vec!["repair"]);
    assert_eq!(result.failed_tasks, vec!["coder"]);
    assert!(result.messages.iter().any(|message| {
        message.from_role == AgentRole::Coder
            && message.to_role == AgentRole::Repair
            && message.task_id == "repair"
    }));
}
3482
/// After the coder task succeeds, the dependent optimizer task also runs;
/// both complete and nothing is reported failed.
#[test]
fn coordination_optimizer_runs_after_successful_implementation_step() {
    let task = |id: &str, role: AgentRole, description: &str, depends_on: Vec<String>| {
        CoordinationTask {
            id: id.into(),
            role,
            description: description.into(),
            depends_on,
        }
    };
    let result = MultiAgentCoordinator::new().coordinate(CoordinationPlan {
        root_goal: "ship optimized patch".into(),
        primitive: CoordinationPrimitive::Sequential,
        tasks: vec![
            task(
                "coder",
                AgentRole::Coder,
                "implement a working patch",
                Vec::new(),
            ),
            task(
                "optimizer",
                AgentRole::Optimizer,
                "tighten the implementation",
                vec!["coder".into()],
            ),
        ],
        timeout_ms: 5_000,
        max_retries: 0,
    });

    assert_eq!(result.completed_tasks, vec!["coder", "optimizer"]);
    assert!(result.failed_tasks.is_empty());
}
3509
/// Parallel mode: the two independent tasks in the first wave complete in
/// sorted id order ("a-task" before "z-task" despite declaration order),
/// then the task depending on both runs last.
#[test]
fn coordination_parallel_waves_preserve_sorted_merge_order() {
    let result = MultiAgentCoordinator::new().coordinate(CoordinationPlan {
        root_goal: "parallelize safe tasks".into(),
        primitive: CoordinationPrimitive::Parallel,
        tasks: vec![
            CoordinationTask {
                id: "z-task".into(),
                role: AgentRole::Planner,
                description: "analyze z".into(),
                depends_on: Vec::new(),
            },
            CoordinationTask {
                id: "a-task".into(),
                role: AgentRole::Coder,
                description: "implement a".into(),
                depends_on: Vec::new(),
            },
            CoordinationTask {
                id: "mid-task".into(),
                role: AgentRole::Optimizer,
                description: "polish after both".into(),
                depends_on: vec!["z-task".into(), "a-task".into()],
            },
        ],
        timeout_ms: 5_000,
        max_retries: 0,
    });

    assert_eq!(result.completed_tasks, vec!["a-task", "z-task", "mid-task"]);
    assert!(result.failed_tasks.is_empty());
}
3542
/// A permanently failing task with `max_retries: 1` produces exactly two
/// failure messages (initial attempt plus one retry) and ends up failed.
#[test]
fn coordination_retries_stop_at_max_retries() {
    let plan = CoordinationPlan {
        root_goal: "retry then stop".into(),
        primitive: CoordinationPrimitive::Sequential,
        tasks: vec![CoordinationTask {
            id: "coder".into(),
            role: AgentRole::Coder,
            description: "force-fail this task".into(),
            depends_on: Vec::new(),
        }],
        timeout_ms: 5_000,
        max_retries: 1,
    };
    let result = MultiAgentCoordinator::new().coordinate(plan);

    assert!(result.completed_tasks.is_empty());
    assert_eq!(result.failed_tasks, vec!["coder"]);
    // One failure message per attempt: the initial run plus a single retry.
    let failure_messages = result
        .messages
        .iter()
        .filter(|message| message.task_id == "coder" && message.content.contains("failed"))
        .count();
    assert_eq!(failure_messages, 2);
}
3569
/// Conditional mode: when a dependency fails, downstream tasks are skipped —
/// they emit a "skipped due to failed dependency chain" message and are
/// counted neither as completed nor as failed.
#[test]
fn coordination_conditional_mode_skips_downstream_tasks_on_failure() {
    let result = MultiAgentCoordinator::new().coordinate(CoordinationPlan {
        root_goal: "skip blocked follow-up work".into(),
        primitive: CoordinationPrimitive::Conditional,
        tasks: vec![
            CoordinationTask {
                id: "coder".into(),
                role: AgentRole::Coder,
                description: "force-fail the implementation".into(),
                depends_on: Vec::new(),
            },
            CoordinationTask {
                id: "optimizer".into(),
                role: AgentRole::Optimizer,
                description: "only optimize a successful implementation".into(),
                depends_on: vec!["coder".into()],
            },
        ],
        timeout_ms: 5_000,
        max_retries: 0,
    });

    assert!(result.completed_tasks.is_empty());
    assert_eq!(result.failed_tasks, vec!["coder"]);
    assert!(result.messages.iter().any(|message| {
        message.task_id == "optimizer"
            && message
                .content
                .contains("skipped due to failed dependency chain")
    }));
    // Skipped is distinct from failed: optimizer must not be in failed_tasks.
    assert!(!result
        .failed_tasks
        .iter()
        .any(|task_id| task_id == "optimizer"));
}
3606
/// Running a single command stage (`git --version`) through CommandValidator
/// yields a report containing exactly one stage entry.
#[tokio::test]
async fn command_validator_aggregates_stage_reports() {
    let workspace = temp_workspace("validator");
    // Receipt is synthetic: the validator only needs a workdir to run in.
    let receipt = SandboxReceipt {
        mutation_id: "m".into(),
        workdir: workspace,
        applied: true,
        changed_files: Vec::new(),
        patch_hash: "hash".into(),
        stdout_log: std::env::temp_dir().join("stdout.log"),
        stderr_log: std::env::temp_dir().join("stderr.log"),
    };
    // Policy only allows `git`, matching the single stage below.
    let validator = CommandValidator::new(SandboxPolicy {
        allowed_programs: vec!["git".into()],
        max_duration_ms: 1_000,
        max_output_bytes: 1024,
        denied_env_prefixes: Vec::new(),
    });
    let report = validator
        .run(
            &receipt,
            &ValidationPlan {
                profile: "test".into(),
                stages: vec![ValidationStage::Command {
                    program: "git".into(),
                    args: vec!["--version".into()],
                    timeout_ms: 1_000,
                }],
            },
        )
        .await
        .unwrap();
    assert_eq!(report.stages.len(), 1);
}
3641
/// Capturing a successful mutation appends a CapsuleCommitted event to the
/// store and returns a capsule with a non-empty id.
#[tokio::test]
async fn capture_successful_mutation_appends_capsule() {
    let (evo, store) = build_test_evo("capture", "run-1", command_validator());
    let capsule = evo
        .capture_successful_mutation(&"run-1".into(), sample_mutation())
        .await
        .unwrap();
    let events = store.scan(1).unwrap();
    assert!(events
        .iter()
        .any(|stored| matches!(stored.event, EvolutionEvent::CapsuleCommitted { .. })));
    assert!(!capsule.id.is_empty());
}
3655
/// A replay run matching a previously captured capsule reuses it and records
/// a CapsuleReused event carrying both the original capture run id and the
/// current replay run id.
#[tokio::test]
async fn replay_hit_records_capsule_reused() {
    let (evo, store) = build_test_evo("replay", "run-2", command_validator());
    let capsule = evo
        .capture_successful_mutation(&"run-2".into(), sample_mutation())
        .await
        .unwrap();
    let replay_run_id = "run-replay".to_string();
    let decision = evo
        .replay_or_fallback_for_run(&replay_run_id, replay_input("missing readme"))
        .await
        .unwrap();
    assert!(decision.used_capsule);
    assert_eq!(decision.capsule_id, Some(capsule.id));
    // The reuse event must name the capture run and the replay run distinctly.
    assert!(store.scan(1).unwrap().iter().any(|stored| matches!(
        &stored.event,
        EvolutionEvent::CapsuleReused {
            run_id,
            replay_run_id: Some(current_replay_run_id),
            ..
        } if run_id == "run-2" && current_replay_run_id == &replay_run_id
    )));
}
3679
/// The legacy StoreReplayExecutor API (no explicit replay run id) records
/// CapsuleReused with `replay_run_id: None` while preserving the capsule's
/// original capture run id.
#[tokio::test]
async fn legacy_replay_executor_api_preserves_original_capsule_run_id() {
    let capture_run_id = "run-legacy-capture".to_string();
    let (evo, store) = build_test_evo("replay-legacy", &capture_run_id, command_validator());
    let capsule = evo
        .capture_successful_mutation(&capture_run_id, sample_mutation())
        .await
        .unwrap();
    // Assemble the executor directly from the kernel's components, the way
    // callers predating `replay_or_fallback_for_run` did.
    let executor = StoreReplayExecutor {
        sandbox: evo.sandbox.clone(),
        validator: evo.validator.clone(),
        store: evo.store.clone(),
        selector: evo.selector.clone(),
        governor: evo.governor.clone(),
        economics: Some(evo.economics.clone()),
        remote_publishers: Some(evo.remote_publishers.clone()),
        stake_policy: evo.stake_policy.clone(),
    };

    let decision = executor
        .try_replay(
            &replay_input("missing readme"),
            &evo.sandbox_policy,
            &evo.validation_plan,
        )
        .await
        .unwrap();

    assert!(decision.used_capsule);
    assert_eq!(decision.capsule_id, Some(capsule.id));
    assert!(store.scan(1).unwrap().iter().any(|stored| matches!(
        &stored.event,
        EvolutionEvent::CapsuleReused {
            run_id,
            replay_run_id: None,
            ..
        } if run_id == &capture_run_id
    )));
}
3719
/// After one capture, one successful replay, and one manual revoke, the
/// metrics snapshot and the Prometheus rendering both reflect each signal:
/// replay success rate 1.0, promotion ratio 1.0, one revocation.
#[tokio::test]
async fn metrics_snapshot_tracks_replay_promotion_and_revocation_signals() {
    let (evo, _) = build_test_evo("metrics", "run-metrics", command_validator());
    let capsule = evo
        .capture_successful_mutation(&"run-metrics".into(), sample_mutation())
        .await
        .unwrap();
    let decision = evo
        .replay_or_fallback(replay_input("missing readme"))
        .await
        .unwrap();
    assert!(decision.used_capsule);

    evo.revoke_assets(&RevokeNotice {
        sender_id: "node-metrics".into(),
        asset_ids: vec![capsule.id.clone()],
        reason: "manual test revoke".into(),
    })
    .unwrap();

    let snapshot = evo.metrics_snapshot().unwrap();
    assert_eq!(snapshot.replay_attempts_total, 1);
    assert_eq!(snapshot.replay_success_total, 1);
    assert_eq!(snapshot.replay_success_rate, 1.0);
    assert_eq!(snapshot.mutation_declared_total, 1);
    assert_eq!(snapshot.promoted_mutations_total, 1);
    assert_eq!(snapshot.promotion_ratio, 1.0);
    assert_eq!(snapshot.gene_revocations_total, 1);
    assert_eq!(snapshot.mutation_velocity_last_hour, 1);
    assert_eq!(snapshot.revoke_frequency_last_hour, 1);
    // After the revoke, nothing remains in the Promoted state.
    assert_eq!(snapshot.promoted_genes, 0);
    assert_eq!(snapshot.promoted_capsules, 0);

    let rendered = evo.render_metrics_prometheus().unwrap();
    assert!(rendered.contains("oris_evolution_replay_success_rate 1.000000"));
    assert!(rendered.contains("oris_evolution_promotion_ratio 1.000000"));
    assert!(rendered.contains("oris_evolution_revoke_frequency_last_hour 1"));
    assert!(rendered.contains("oris_evolution_mutation_velocity_last_hour 1"));
    assert!(rendered.contains("oris_evolution_health 1"));
}
3760
/// Given two remote capsules with the same signal, replay picks the one whose
/// environment fingerprint matches the local replay input exactly over one
/// published from a mismatched environment.
#[tokio::test]
async fn remote_replay_prefers_closest_environment_match() {
    let (evo, _) = build_test_evo("remote-env", "run-remote-env", command_validator());
    let input = replay_input("env-signal");

    // Envelope A carries the exact local environment fingerprint.
    let envelope_a = remote_publish_envelope_with_env(
        "node-a",
        "run-remote-a",
        "gene-a",
        "capsule-a",
        "mutation-a",
        "env-signal",
        "A.md",
        "# from a",
        input.env.clone(),
    );
    // Envelope B deliberately differs on every fingerprint field.
    let envelope_b = remote_publish_envelope_with_env(
        "node-b",
        "run-remote-b",
        "gene-b",
        "capsule-b",
        "mutation-b",
        "env-signal",
        "B.md",
        "# from b",
        EnvFingerprint {
            rustc_version: "old-rustc".into(),
            cargo_lock_hash: "other-lock".into(),
            target_triple: "aarch64-apple-darwin".into(),
            os: "linux".into(),
        },
    );

    evo.import_remote_envelope(&envelope_a).unwrap();
    evo.import_remote_envelope(&envelope_b).unwrap();

    let decision = evo.replay_or_fallback(input).await.unwrap();

    assert!(decision.used_capsule);
    assert_eq!(decision.capsule_id, Some("capsule-a".into()));
    assert!(!decision.fallback_to_planner);
}
3803
/// Cold-start scoring of quarantined remote candidates: a gene whose signals
/// exactly equal the query and a gene whose split signals together cover the
/// query both score 1.0 — coverage is capped so overlap cannot exceed an
/// exact match.
#[test]
fn remote_cold_start_scoring_caps_distinct_query_coverage() {
    let (evo, _) = build_test_evo("remote-score", "run-remote-score", command_validator());
    let input = replay_input("missing readme");

    // Gene signals identical to the query signal.
    let exact = remote_publish_envelope_with_signals(
        "node-exact",
        "run-remote-exact",
        "gene-exact",
        "capsule-exact",
        "mutation-exact",
        vec!["missing readme".into()],
        vec!["missing readme".into()],
        "EXACT.md",
        "# exact",
        input.env.clone(),
    );
    // Gene signals split into two tokens that jointly overlap the query.
    let overlapping = remote_publish_envelope_with_signals(
        "node-overlap",
        "run-remote-overlap",
        "gene-overlap",
        "capsule-overlap",
        "mutation-overlap",
        vec!["missing readme".into()],
        vec!["missing".into(), "readme".into()],
        "OVERLAP.md",
        "# overlap",
        input.env.clone(),
    );

    evo.import_remote_envelope(&exact).unwrap();
    evo.import_remote_envelope(&overlapping).unwrap();

    let candidates = quarantined_remote_exact_match_candidates(evo.store.as_ref(), &input);
    let exact_candidate = candidates
        .iter()
        .find(|candidate| candidate.gene.id == "gene-exact")
        .unwrap();
    let overlap_candidate = candidates
        .iter()
        .find(|candidate| candidate.gene.id == "gene-overlap")
        .unwrap();

    assert_eq!(exact_candidate.score, 1.0);
    assert_eq!(overlap_candidate.score, 1.0);
    assert!(candidates.iter().all(|candidate| candidate.score <= 1.0));
}
3851
/// A mutation whose intent carries no spec_id can still satisfy a spec-scoped
/// query when a later SpecLinked event binds it to that spec: the candidate
/// set contains exactly the linked gene.
#[test]
fn exact_match_candidates_respect_spec_linked_events() {
    let (evo, _) = build_test_evo(
        "spec-linked-filter",
        "run-spec-linked-filter",
        command_validator(),
    );
    let mut input = replay_input("missing readme");
    input.spec_id = Some("spec-readme".into());

    // The mutation intent deliberately has spec_id: None — the association
    // comes only from the SpecLinked event appended below.
    let mut mutation = sample_mutation();
    mutation.intent.id = "mutation-spec-linked".into();
    mutation.intent.spec_id = None;
    let gene = Gene {
        id: "gene-spec-linked".into(),
        signals: vec!["missing readme".into()],
        strategy: vec!["README.md".into()],
        validation: vec!["test".into()],
        state: AssetState::Promoted,
    };
    let capsule = Capsule {
        id: "capsule-spec-linked".into(),
        gene_id: gene.id.clone(),
        mutation_id: mutation.intent.id.clone(),
        run_id: "run-spec-linked".into(),
        diff_hash: mutation.artifact.content_hash.clone(),
        confidence: 0.9,
        env: input.env.clone(),
        outcome: Outcome {
            success: true,
            validation_profile: "test".into(),
            validation_duration_ms: 1,
            changed_files: vec!["README.md".into()],
            validator_hash: "validator-hash".into(),
            lines_changed: 1,
            replay_verified: false,
        },
        state: AssetState::Promoted,
    };

    // Seed the store directly with the event sequence under test.
    evo.store
        .append_event(EvolutionEvent::MutationDeclared { mutation })
        .unwrap();
    evo.store
        .append_event(EvolutionEvent::GeneProjected { gene })
        .unwrap();
    evo.store
        .append_event(EvolutionEvent::CapsuleCommitted { capsule })
        .unwrap();
    evo.store
        .append_event(EvolutionEvent::SpecLinked {
            mutation_id: "mutation-spec-linked".into(),
            spec_id: "spec-readme".into(),
        })
        .unwrap();

    let candidates = exact_match_candidates(evo.store.as_ref(), &input);
    assert_eq!(candidates.len(), 1);
    assert_eq!(candidates[0].gene.id, "gene-spec-linked");
}
3912
/// Imported remote assets start Quarantined (and are excluded from promoted
/// exports); the first successful local replay promotes both gene and
/// capsule, after which they are exported.
#[tokio::test]
async fn remote_capsule_stays_quarantined_until_first_successful_replay() {
    let (evo, store) = build_test_evo(
        "remote-quarantine",
        "run-remote-quarantine",
        command_validator(),
    );
    let envelope = remote_publish_envelope(
        "node-remote",
        "run-remote-quarantine",
        "gene-remote",
        "capsule-remote",
        "mutation-remote",
        "remote-signal",
        "REMOTE.md",
        "# from remote",
    );

    evo.import_remote_envelope(&envelope).unwrap();

    // Before any replay: both assets quarantined, nothing exportable.
    let before_replay = store.rebuild_projection().unwrap();
    let imported_gene = before_replay
        .genes
        .iter()
        .find(|gene| gene.id == "gene-remote")
        .unwrap();
    let imported_capsule = before_replay
        .capsules
        .iter()
        .find(|capsule| capsule.id == "capsule-remote")
        .unwrap();
    assert_eq!(imported_gene.state, AssetState::Quarantined);
    assert_eq!(imported_capsule.state, AssetState::Quarantined);
    let exported_before_replay =
        export_promoted_assets_from_store(store.as_ref(), "node-local").unwrap();
    assert!(exported_before_replay.assets.is_empty());

    let decision = evo
        .replay_or_fallback(replay_input("remote-signal"))
        .await
        .unwrap();

    assert!(decision.used_capsule);
    assert_eq!(decision.capsule_id, Some("capsule-remote".into()));

    // After the successful replay: both assets promoted and exported.
    let after_replay = store.rebuild_projection().unwrap();
    let promoted_gene = after_replay
        .genes
        .iter()
        .find(|gene| gene.id == "gene-remote")
        .unwrap();
    let released_capsule = after_replay
        .capsules
        .iter()
        .find(|capsule| capsule.id == "capsule-remote")
        .unwrap();
    assert_eq!(promoted_gene.state, AssetState::Promoted);
    assert_eq!(released_capsule.state, AssetState::Promoted);
    let exported_after_replay =
        export_promoted_assets_from_store(store.as_ref(), "node-local").unwrap();
    assert_eq!(exported_after_replay.assets.len(), 3);
    assert!(exported_after_replay.assets.iter().any(|asset| matches!(
        asset,
        NetworkAsset::EvolutionEvent {
            event: EvolutionEvent::MutationDeclared { .. }
        }
    )));
}
3981
/// A publish envelope built from a node's local store carries the full
/// MutationDeclared payload, so a second node importing it can actually
/// replay the mutation rather than falling back to the planner.
#[tokio::test]
async fn publish_local_assets_include_mutation_payload_for_remote_replay() {
    let (source, source_store) = build_test_evo(
        "remote-publish-export",
        "run-remote-publish-export",
        command_validator(),
    );
    source
        .capture_successful_mutation(&"run-remote-publish-export".into(), sample_mutation())
        .await
        .unwrap();
    let envelope = EvolutionNetworkNode::new(source_store.clone())
        .publish_local_assets("node-source")
        .unwrap();
    // The mutation payload itself must travel in the envelope.
    assert!(envelope.assets.iter().any(|asset| matches!(
        asset,
        NetworkAsset::EvolutionEvent {
            event: EvolutionEvent::MutationDeclared { mutation }
        } if mutation.intent.id == "mutation-1"
    )));

    let (remote, _) = build_test_evo(
        "remote-publish-import",
        "run-remote-publish-import",
        command_validator(),
    );
    remote.import_remote_envelope(&envelope).unwrap();

    let decision = remote
        .replay_or_fallback(replay_input("missing readme"))
        .await
        .unwrap();

    assert!(decision.used_capsule);
    assert!(!decision.fallback_to_planner);
}
4018
/// A fetch response for matching signals includes the MutationDeclared
/// payload alongside the gene and capsule, so the requester has everything
/// needed for a remote replay.
#[tokio::test]
async fn fetch_assets_include_mutation_payload_for_remote_replay() {
    let (evo, store) = build_test_evo(
        "remote-fetch-export",
        "run-remote-fetch",
        command_validator(),
    );
    evo.capture_successful_mutation(&"run-remote-fetch".into(), sample_mutation())
        .await
        .unwrap();

    let response = EvolutionNetworkNode::new(store.clone())
        .fetch_assets(
            "node-source",
            &FetchQuery {
                sender_id: "node-client".into(),
                signals: vec!["missing readme".into()],
            },
        )
        .unwrap();

    assert!(response.assets.iter().any(|asset| matches!(
        asset,
        NetworkAsset::EvolutionEvent {
            event: EvolutionEvent::MutationDeclared { mutation }
        } if mutation.intent.id == "mutation-1"
    )));
    assert!(response
        .assets
        .iter()
        .any(|asset| matches!(asset, NetworkAsset::Gene { .. })));
    assert!(response
        .assets
        .iter()
        .any(|asset| matches!(asset, NetworkAsset::Capsule { .. })));
}
4055
/// When an import fails partway (injected failure on the 4th append), the
/// assets appended before the failure — here the gene, but not the capsule —
/// remain in the store, and the publisher mapping for them is preserved.
#[test]
fn partial_remote_import_keeps_publisher_for_already_imported_assets() {
    let store_root = std::env::temp_dir().join(format!(
        "oris-evokernel-remote-partial-store-{}",
        std::process::id()
    ));
    if store_root.exists() {
        fs::remove_dir_all(&store_root).unwrap();
    }
    // The 4th append_event call is injected to fail, stopping the import
    // after the gene but before the capsule lands (see asserts below).
    let store: Arc<dyn EvolutionStore> = Arc::new(FailOnAppendStore::new(store_root, 4));
    let evo = build_test_evo_with_store(
        "remote-partial",
        "run-remote-partial",
        command_validator(),
        store.clone(),
    );
    let envelope = remote_publish_envelope(
        "node-partial",
        "run-remote-partial",
        "gene-partial",
        "capsule-partial",
        "mutation-partial",
        "partial-signal",
        "PARTIAL.md",
        "# partial",
    );

    let result = evo.import_remote_envelope(&envelope);

    assert!(matches!(result, Err(EvoKernelError::Store(_))));
    let projection = store.rebuild_projection().unwrap();
    assert!(projection
        .genes
        .iter()
        .any(|gene| gene.id == "gene-partial"));
    assert!(projection.capsules.is_empty());
    // The sender is still recorded as publisher of the partially imported gene.
    let publishers = evo.remote_publishers.lock().unwrap();
    assert_eq!(
        publishers.get("gene-partial").map(String::as_str),
        Some("node-partial")
    );
}
4098
/// Retrying an import after a partial failure imports only the assets that
/// are still missing (the capsule), never duplicating the mutation or gene
/// events already in the log, and leaves the attempt count at 1.
#[test]
fn retry_remote_import_after_partial_failure_only_imports_missing_assets() {
    // next_id keeps this store root distinct even across reruns in-process.
    let store_root = std::env::temp_dir().join(format!(
        "oris-evokernel-remote-partial-retry-store-{}",
        next_id("t")
    ));
    if store_root.exists() {
        fs::remove_dir_all(&store_root).unwrap();
    }
    // Fail the 4th append so the first import stops partway.
    let store: Arc<dyn EvolutionStore> = Arc::new(FailOnAppendStore::new(store_root, 4));
    let evo = build_test_evo_with_store(
        "remote-partial-retry",
        "run-remote-partial-retry",
        command_validator(),
        store.clone(),
    );
    let envelope = remote_publish_envelope(
        "node-partial",
        "run-remote-partial-retry",
        "gene-partial-retry",
        "capsule-partial-retry",
        "mutation-partial-retry",
        "partial-retry-signal",
        "PARTIAL_RETRY.md",
        "# partial retry",
    );

    let first = evo.import_remote_envelope(&envelope);
    assert!(matches!(first, Err(EvoKernelError::Store(_))));

    let retry = evo.import_remote_envelope(&envelope).unwrap();

    // Only the capsule was missing after the partial failure.
    assert_eq!(retry.imported_asset_ids, vec!["capsule-partial-retry"]);
    let projection = store.rebuild_projection().unwrap();
    let gene = projection
        .genes
        .iter()
        .find(|gene| gene.id == "gene-partial-retry")
        .unwrap();
    assert_eq!(gene.state, AssetState::Quarantined);
    let capsule = projection
        .capsules
        .iter()
        .find(|capsule| capsule.id == "capsule-partial-retry")
        .unwrap();
    assert_eq!(capsule.state, AssetState::Quarantined);
    assert_eq!(projection.attempt_counts["gene-partial-retry"], 1);

    // Each asset's event must appear exactly once despite two import calls.
    let events = store.scan(1).unwrap();
    assert_eq!(
        events
            .iter()
            .filter(|stored| {
                matches!(
                    &stored.event,
                    EvolutionEvent::MutationDeclared { mutation }
                    if mutation.intent.id == "mutation-partial-retry"
                )
            })
            .count(),
        1
    );
    assert_eq!(
        events
            .iter()
            .filter(|stored| {
                matches!(
                    &stored.event,
                    EvolutionEvent::GeneProjected { gene } if gene.id == "gene-partial-retry"
                )
            })
            .count(),
        1
    );
    assert_eq!(
        events
            .iter()
            .filter(|stored| {
                matches!(
                    &stored.event,
                    EvolutionEvent::CapsuleCommitted { capsule }
                    if capsule.id == "capsule-partial-retry"
                )
            })
            .count(),
        1
    );
}
4187
/// Re-importing an envelope whose assets were already imported and then
/// locally promoted is a no-op: nothing is re-imported, states stay
/// Promoted, attempt counts are unchanged, and no duplicate events land.
#[tokio::test]
async fn duplicate_remote_import_does_not_requarantine_locally_validated_assets() {
    let (evo, store) = build_test_evo(
        "remote-idempotent",
        "run-remote-idempotent",
        command_validator(),
    );
    let envelope = remote_publish_envelope(
        "node-idempotent",
        "run-remote-idempotent",
        "gene-idempotent",
        "capsule-idempotent",
        "mutation-idempotent",
        "idempotent-signal",
        "IDEMPOTENT.md",
        "# idempotent",
    );

    let first = evo.import_remote_envelope(&envelope).unwrap();
    assert_eq!(
        first.imported_asset_ids,
        vec!["gene-idempotent", "capsule-idempotent"]
    );

    // A successful replay promotes the quarantined remote assets.
    let decision = evo
        .replay_or_fallback(replay_input("idempotent-signal"))
        .await
        .unwrap();
    assert!(decision.used_capsule);
    assert_eq!(decision.capsule_id, Some("capsule-idempotent".into()));

    let projection_before = store.rebuild_projection().unwrap();
    let attempts_before = projection_before.attempt_counts["gene-idempotent"];
    let gene_before = projection_before
        .genes
        .iter()
        .find(|gene| gene.id == "gene-idempotent")
        .unwrap();
    assert_eq!(gene_before.state, AssetState::Promoted);
    let capsule_before = projection_before
        .capsules
        .iter()
        .find(|capsule| capsule.id == "capsule-idempotent")
        .unwrap();
    assert_eq!(capsule_before.state, AssetState::Promoted);

    // Second import of the identical envelope must be a no-op.
    let second = evo.import_remote_envelope(&envelope).unwrap();
    assert!(second.imported_asset_ids.is_empty());

    let projection_after = store.rebuild_projection().unwrap();
    assert_eq!(
        projection_after.attempt_counts["gene-idempotent"],
        attempts_before
    );
    let gene_after = projection_after
        .genes
        .iter()
        .find(|gene| gene.id == "gene-idempotent")
        .unwrap();
    assert_eq!(gene_after.state, AssetState::Promoted);
    let capsule_after = projection_after
        .capsules
        .iter()
        .find(|capsule| capsule.id == "capsule-idempotent")
        .unwrap();
    assert_eq!(capsule_after.state, AssetState::Promoted);

    // Each asset's originating event appears exactly once in the log.
    let events = store.scan(1).unwrap();
    assert_eq!(
        events
            .iter()
            .filter(|stored| {
                matches!(
                    &stored.event,
                    EvolutionEvent::MutationDeclared { mutation }
                    if mutation.intent.id == "mutation-idempotent"
                )
            })
            .count(),
        1
    );
    assert_eq!(
        events
            .iter()
            .filter(|stored| {
                matches!(
                    &stored.event,
                    EvolutionEvent::GeneProjected { gene } if gene.id == "gene-idempotent"
                )
            })
            .count(),
        1
    );
    assert_eq!(
        events
            .iter()
            .filter(|stored| {
                matches!(
                    &stored.event,
                    EvolutionEvent::CapsuleCommitted { capsule }
                    if capsule.id == "capsule-idempotent"
                )
            })
            .count(),
        1
    );
}
4295
/// Without sufficient staked EVU, exporting promoted assets fails with a
/// Validation error, but local replay of the captured capsule still works —
/// the stake gate applies only to publishing.
#[tokio::test]
async fn insufficient_evu_blocks_publish_but_not_local_replay() {
    let (evo, _) = build_test_evo("stake-gate", "run-stake", command_validator());
    let capsule = evo
        .capture_successful_mutation(&"run-stake".into(), sample_mutation())
        .await
        .unwrap();
    let publish = evo.export_promoted_assets("node-local");
    assert!(matches!(publish, Err(EvoKernelError::Validation(_))));

    let decision = evo
        .replay_or_fallback(replay_input("missing readme"))
        .await
        .unwrap();
    assert!(decision.used_capsule);
    assert_eq!(decision.capsule_id, Some(capsule.id));
}
4313
    #[tokio::test]
    async fn second_replay_validation_failure_revokes_gene_immediately() {
        // NOTE(review): the test name says the gene is revoked, but the
        // assertions below check the opposite — no GeneRevoked event is
        // emitted and both gene and capsule remain Promoted. It appears
        // failed replays only push the gene's replay score "below replay
        // threshold" rather than revoking it; confirm intent and consider
        // renaming the test to match.

        // Capture a capsule with a validator that succeeds.
        let (capturer, store) = build_test_evo("revoke-replay", "run-capture", command_validator());
        let capsule = capturer
            .capture_successful_mutation(&"run-capture".into(), sample_mutation())
            .await
            .unwrap();

        // Rebuild the kernel over the same store, but with a validator that
        // always fails, so replaying the captured capsule fails validation.
        let failing_validator: Arc<dyn Validator> = Arc::new(FixedValidator { success: false });
        let failing_replay = build_test_evo_with_store(
            "revoke-replay",
            "run-replay-fail",
            failing_validator,
            store.clone(),
        );

        // Two consecutive replay attempts against the failing validator.
        let first = failing_replay
            .replay_or_fallback(replay_input("missing readme"))
            .await
            .unwrap();
        let second = failing_replay
            .replay_or_fallback(replay_input("missing readme"))
            .await
            .unwrap();

        // Both attempts must fall back to the planner rather than reuse.
        assert!(!first.used_capsule);
        assert!(first.fallback_to_planner);
        assert!(!second.used_capsule);
        assert!(second.fallback_to_planner);

        // The assets themselves are not demoted by the failed replays.
        let projection = store.rebuild_projection().unwrap();
        let gene = projection
            .genes
            .iter()
            .find(|gene| gene.id == capsule.gene_id)
            .unwrap();
        assert_eq!(gene.state, AssetState::Promoted);
        let committed_capsule = projection
            .capsules
            .iter()
            .find(|current| current.id == capsule.id)
            .unwrap();
        assert_eq!(committed_capsule.state, AssetState::Promoted);

        // NOTE(review): exactly one ValidationFailed event despite two
        // failing replay calls — presumably the second attempt is rejected
        // before validation runs (score already below threshold); verify.
        let events = store.scan(1).unwrap();
        assert_eq!(
            events
                .iter()
                .filter(|stored| {
                    matches!(
                        &stored.event,
                        EvolutionEvent::ValidationFailed {
                            gene_id: Some(gene_id),
                            ..
                        } if gene_id == &capsule.gene_id
                    )
                })
                .count(),
            1
        );
        // No revocation event is ever written for this gene.
        assert!(!events.iter().any(|stored| {
            matches!(
                &stored.event,
                EvolutionEvent::GeneRevoked { gene_id, .. } if gene_id == &capsule.gene_id
            )
        }));

        // Even with a passing validator, a fresh kernel over the same store
        // refuses to replay: the gene's replay score persisted below the
        // threshold.
        let recovered = build_test_evo_with_store(
            "revoke-replay",
            "run-replay-check",
            command_validator(),
            store.clone(),
        );
        let after_revoke = recovered
            .replay_or_fallback(replay_input("missing readme"))
            .await
            .unwrap();
        assert!(!after_revoke.used_capsule);
        assert!(after_revoke.fallback_to_planner);
        assert!(after_revoke.reason.contains("below replay threshold"));
    }
4395
4396 #[tokio::test]
4397 async fn remote_reuse_success_rewards_publisher_and_biases_selection() {
4398 let ledger = Arc::new(Mutex::new(EvuLedger {
4399 accounts: vec![],
4400 reputations: vec![
4401 oris_economics::ReputationRecord {
4402 node_id: "node-a".into(),
4403 publish_success_rate: 0.4,
4404 validator_accuracy: 0.4,
4405 reuse_impact: 0,
4406 },
4407 oris_economics::ReputationRecord {
4408 node_id: "node-b".into(),
4409 publish_success_rate: 0.95,
4410 validator_accuracy: 0.95,
4411 reuse_impact: 8,
4412 },
4413 ],
4414 }));
4415 let (evo, _) = build_test_evo("remote-success", "run-remote", command_validator());
4416 let evo = evo.with_economics(ledger.clone());
4417
4418 let envelope_a = remote_publish_envelope(
4419 "node-a",
4420 "run-remote-a",
4421 "gene-a",
4422 "capsule-a",
4423 "mutation-a",
4424 "shared-signal",
4425 "A.md",
4426 "# from a",
4427 );
4428 let envelope_b = remote_publish_envelope(
4429 "node-b",
4430 "run-remote-b",
4431 "gene-b",
4432 "capsule-b",
4433 "mutation-b",
4434 "shared-signal",
4435 "B.md",
4436 "# from b",
4437 );
4438
4439 evo.import_remote_envelope(&envelope_a).unwrap();
4440 evo.import_remote_envelope(&envelope_b).unwrap();
4441
4442 let decision = evo
4443 .replay_or_fallback(replay_input("shared-signal"))
4444 .await
4445 .unwrap();
4446
4447 assert!(decision.used_capsule);
4448 assert_eq!(decision.capsule_id, Some("capsule-b".into()));
4449 let locked = ledger.lock().unwrap();
4450 let rewarded = locked
4451 .accounts
4452 .iter()
4453 .find(|item| item.node_id == "node-b")
4454 .unwrap();
4455 assert_eq!(rewarded.balance, evo.stake_policy.reuse_reward);
4456 assert!(
4457 locked.selector_reputation_bias()["node-b"]
4458 > locked.selector_reputation_bias()["node-a"]
4459 );
4460 }
4461
4462 #[tokio::test]
4463 async fn remote_reuse_settlement_tracks_selected_capsule_publisher_for_shared_gene() {
4464 let ledger = Arc::new(Mutex::new(EvuLedger::default()));
4465 let (evo, _) = build_test_evo(
4466 "remote-shared-publisher",
4467 "run-remote-shared-publisher",
4468 command_validator(),
4469 );
4470 let evo = evo.with_economics(ledger.clone());
4471 let input = replay_input("shared-signal");
4472 let preferred = remote_publish_envelope_with_env(
4473 "node-a",
4474 "run-remote-a",
4475 "gene-shared",
4476 "capsule-preferred",
4477 "mutation-preferred",
4478 "shared-signal",
4479 "A.md",
4480 "# from a",
4481 input.env.clone(),
4482 );
4483 let fallback = remote_publish_envelope_with_env(
4484 "node-b",
4485 "run-remote-b",
4486 "gene-shared",
4487 "capsule-fallback",
4488 "mutation-fallback",
4489 "shared-signal",
4490 "B.md",
4491 "# from b",
4492 EnvFingerprint {
4493 rustc_version: "old-rustc".into(),
4494 cargo_lock_hash: "other-lock".into(),
4495 target_triple: "aarch64-apple-darwin".into(),
4496 os: "linux".into(),
4497 },
4498 );
4499
4500 evo.import_remote_envelope(&preferred).unwrap();
4501 evo.import_remote_envelope(&fallback).unwrap();
4502
4503 let decision = evo.replay_or_fallback(input).await.unwrap();
4504
4505 assert!(decision.used_capsule);
4506 assert_eq!(decision.capsule_id, Some("capsule-preferred".into()));
4507 let locked = ledger.lock().unwrap();
4508 let rewarded = locked
4509 .accounts
4510 .iter()
4511 .find(|item| item.node_id == "node-a")
4512 .unwrap();
4513 assert_eq!(rewarded.balance, evo.stake_policy.reuse_reward);
4514 assert!(locked.accounts.iter().all(|item| item.node_id != "node-b"));
4515 }
4516
4517 #[test]
4518 fn select_candidates_surfaces_ranked_remote_cold_start_candidates() {
4519 let ledger = Arc::new(Mutex::new(EvuLedger {
4520 accounts: vec![],
4521 reputations: vec![
4522 oris_economics::ReputationRecord {
4523 node_id: "node-a".into(),
4524 publish_success_rate: 0.4,
4525 validator_accuracy: 0.4,
4526 reuse_impact: 0,
4527 },
4528 oris_economics::ReputationRecord {
4529 node_id: "node-b".into(),
4530 publish_success_rate: 0.95,
4531 validator_accuracy: 0.95,
4532 reuse_impact: 8,
4533 },
4534 ],
4535 }));
4536 let (evo, _) = build_test_evo("remote-select", "run-remote-select", command_validator());
4537 let evo = evo.with_economics(ledger);
4538
4539 let envelope_a = remote_publish_envelope(
4540 "node-a",
4541 "run-remote-a",
4542 "gene-a",
4543 "capsule-a",
4544 "mutation-a",
4545 "shared-signal",
4546 "A.md",
4547 "# from a",
4548 );
4549 let envelope_b = remote_publish_envelope(
4550 "node-b",
4551 "run-remote-b",
4552 "gene-b",
4553 "capsule-b",
4554 "mutation-b",
4555 "shared-signal",
4556 "B.md",
4557 "# from b",
4558 );
4559
4560 evo.import_remote_envelope(&envelope_a).unwrap();
4561 evo.import_remote_envelope(&envelope_b).unwrap();
4562
4563 let candidates = evo.select_candidates(&replay_input("shared-signal"));
4564
4565 assert_eq!(candidates.len(), 1);
4566 assert_eq!(candidates[0].gene.id, "gene-b");
4567 assert_eq!(candidates[0].capsules[0].id, "capsule-b");
4568 }
4569
4570 #[tokio::test]
4571 async fn remote_reuse_publisher_bias_survives_restart() {
4572 let ledger = Arc::new(Mutex::new(EvuLedger {
4573 accounts: vec![],
4574 reputations: vec![
4575 oris_economics::ReputationRecord {
4576 node_id: "node-a".into(),
4577 publish_success_rate: 0.4,
4578 validator_accuracy: 0.4,
4579 reuse_impact: 0,
4580 },
4581 oris_economics::ReputationRecord {
4582 node_id: "node-b".into(),
4583 publish_success_rate: 0.95,
4584 validator_accuracy: 0.95,
4585 reuse_impact: 8,
4586 },
4587 ],
4588 }));
4589 let store_root = std::env::temp_dir().join(format!(
4590 "oris-evokernel-remote-restart-store-{}",
4591 next_id("t")
4592 ));
4593 if store_root.exists() {
4594 fs::remove_dir_all(&store_root).unwrap();
4595 }
4596 let store: Arc<dyn EvolutionStore> =
4597 Arc::new(oris_evolution::JsonlEvolutionStore::new(&store_root));
4598 let evo = build_test_evo_with_store(
4599 "remote-success-restart-source",
4600 "run-remote-restart-source",
4601 command_validator(),
4602 store.clone(),
4603 )
4604 .with_economics(ledger.clone());
4605
4606 let envelope_a = remote_publish_envelope(
4607 "node-a",
4608 "run-remote-a",
4609 "gene-a",
4610 "capsule-a",
4611 "mutation-a",
4612 "shared-signal",
4613 "A.md",
4614 "# from a",
4615 );
4616 let envelope_b = remote_publish_envelope(
4617 "node-b",
4618 "run-remote-b",
4619 "gene-b",
4620 "capsule-b",
4621 "mutation-b",
4622 "shared-signal",
4623 "B.md",
4624 "# from b",
4625 );
4626
4627 evo.import_remote_envelope(&envelope_a).unwrap();
4628 evo.import_remote_envelope(&envelope_b).unwrap();
4629
4630 let recovered = build_test_evo_with_store(
4631 "remote-success-restart-recovered",
4632 "run-remote-restart-recovered",
4633 command_validator(),
4634 store.clone(),
4635 )
4636 .with_economics(ledger.clone());
4637
4638 let decision = recovered
4639 .replay_or_fallback(replay_input("shared-signal"))
4640 .await
4641 .unwrap();
4642
4643 assert!(decision.used_capsule);
4644 assert_eq!(decision.capsule_id, Some("capsule-b".into()));
4645 let locked = ledger.lock().unwrap();
4646 let rewarded = locked
4647 .accounts
4648 .iter()
4649 .find(|item| item.node_id == "node-b")
4650 .unwrap();
4651 assert_eq!(rewarded.balance, recovered.stake_policy.reuse_reward);
4652 }
4653
4654 #[tokio::test]
4655 async fn remote_reuse_failure_penalizes_remote_reputation() {
4656 let ledger = Arc::new(Mutex::new(EvuLedger::default()));
4657 let failing_validator: Arc<dyn Validator> = Arc::new(FixedValidator { success: false });
4658 let (evo, _) = build_test_evo("remote-failure", "run-failure", failing_validator);
4659 let evo = evo.with_economics(ledger.clone());
4660
4661 let envelope = remote_publish_envelope(
4662 "node-remote",
4663 "run-remote-failed",
4664 "gene-remote",
4665 "capsule-remote",
4666 "mutation-remote",
4667 "failure-signal",
4668 "FAILED.md",
4669 "# from remote",
4670 );
4671 evo.import_remote_envelope(&envelope).unwrap();
4672
4673 let decision = evo
4674 .replay_or_fallback(replay_input("failure-signal"))
4675 .await
4676 .unwrap();
4677
4678 assert!(!decision.used_capsule);
4679 assert!(decision.fallback_to_planner);
4680
4681 let signal = evo.economics_signal("node-remote").unwrap();
4682 assert_eq!(signal.available_evu, 0);
4683 assert!(signal.publish_success_rate < 0.5);
4684 assert!(signal.validator_accuracy < 0.5);
4685 }
4686}