use exo_exotic::domain_transfer::EmergentTransferDetector;
use exo_federation::transfer_crdt::{TransferCrdt, TransferPriorSummary};
use exo_manifold::transfer_store::TransferManifold;
use exo_temporal::transfer_timeline::TransferTimeline;
use ruvector_domain_expansion::{
ArmId, ContextBucket, DomainExpansionEngine, DomainId, Solution, TransferPrior,
};
use crate::domain_bridge::{ExoGraphDomain, ExoRetrievalDomain};
/// Summary of a single orchestration cycle, as returned by
/// `ExoTransferOrchestrator::run_cycle`.
#[derive(Debug, Clone)]
pub struct CycleResult {
/// Score of the evaluation performed on the source-domain task this cycle;
/// `run_cycle` falls back to `0.5` when the engine generated no task.
pub eval_score: f32,
/// Value reported by the emergence detector's `emergence_score()` after this
/// cycle's score was recorded.
pub emergence_score: f64,
/// Value reported by the emergence detector's `mean_improvement()` after this
/// cycle's score was recorded.
pub mean_improvement: f64,
/// Number of entries in the transfer manifold (`TransferManifold::len()`)
/// after this cycle's prior was stored.
pub manifold_entries: usize,
/// 1-based index of the cycle that produced this result.
pub cycle: u64,
}
/// Drives repeated transfer cycles between a source and a destination domain,
/// feeding each cycle's outcome into a manifold store, a timeline, a CRDT and
/// an emergence detector.
pub struct ExoTransferOrchestrator {
// Domain expansion engine; `new` registers the retrieval and graph domains on it.
engine: DomainExpansionEngine,
// Source domain id (`"exo-retrieval"`): tasks are generated and evaluated here.
src_id: DomainId,
// Destination domain id (`"exo-graph"`): target of `initiate_transfer`.
dst_id: DomainId,
// Receives one `store_prior` call per cycle; its `len()` is reported back.
manifold: TransferManifold,
// Receives one `record_transfer` call per cycle.
timeline: TransferTimeline,
// Receives one `publish_prior` call per cycle; queried by `best_prior`.
crdt: TransferCrdt,
// Gets the first cycle's score as baseline and later scores as post-transfer.
emergence: EmergentTransferDetector,
// Number of cycles started so far; incremented at the top of `run_cycle`.
cycle: u64,
}
impl ExoTransferOrchestrator {
    /// Creates an orchestrator with the retrieval and graph domains registered
    /// on a fresh engine and every auxiliary store empty.
    ///
    /// `_node_id` is accepted for API compatibility but is not used.
    ///
    /// NOTE(review): the ids `"exo-retrieval"` / `"exo-graph"` presumably match
    /// the ids reported by the registered domains — confirm in `domain_bridge`.
    pub fn new(_node_id: impl Into<String>) -> Self {
        let mut engine = DomainExpansionEngine::new();
        engine.register_domain(Box::new(ExoRetrievalDomain::new()));
        engine.register_domain(Box::new(ExoGraphDomain::new()));

        Self {
            engine,
            src_id: DomainId("exo-retrieval".to_string()),
            dst_id: DomainId("exo-graph".to_string()),
            manifold: TransferManifold::new(),
            timeline: TransferTimeline::new(),
            crdt: TransferCrdt::new(),
            emergence: EmergentTransferDetector::new(),
            cycle: 0,
        }
    }

    /// Runs one full cycle and returns a summary of it.
    ///
    /// Steps, in order:
    /// 1. bump the cycle counter;
    /// 2. ask the engine for one source-domain task, pick an arm (falling back
    ///    to `"approximate"`), evaluate and record it — or use `0.5` as the
    ///    score when no task was produced;
    /// 3. initiate a transfer from the source to the destination domain;
    /// 4. store a uniform prior in the manifold and record the transfer on the
    ///    timeline (both results are deliberately ignored);
    /// 5. publish the score to the CRDT;
    /// 6. feed the score to the emergence detector (baseline on the first
    ///    cycle, post-transfer afterwards).
    pub fn run_cycle(&mut self) -> CycleResult {
        self.cycle += 1;
        let cycle = self.cycle;

        let bucket = ContextBucket {
            difficulty_tier: "medium".to_string(),
            category: "transfer".to_string(),
        };

        let tasks = self.engine.generate_tasks(&self.src_id, 1, 0.5);
        let eval_score = match tasks.first() {
            Some(task) => {
                let arm = self
                    .engine
                    .select_arm(&self.src_id, &bucket)
                    .unwrap_or_else(|| ArmId("approximate".to_string()));
                // The "solution" simply echoes the chosen arm's name.
                let solution = Solution {
                    task_id: task.id.clone(),
                    content: arm.0.clone(),
                    data: serde_json::json!({ "arm": &arm.0 }),
                };
                self.engine
                    .evaluate_and_record(&self.src_id, task, &solution, bucket.clone(), arm)
                    .score
            }
            // No task available: fall back to a mid-range score.
            None => 0.5f32,
        };

        self.engine.initiate_transfer(&self.src_id, &self.dst_id);

        // Best-effort bookkeeping: failures of the two stores below are ignored.
        let prior = TransferPrior::uniform(self.src_id.clone());
        let _ = self
            .manifold
            .store_prior(&self.src_id, &self.dst_id, &prior, cycle);
        let manifold_entries = self.manifold.len();
        let _ = self
            .timeline
            .record_transfer(&self.src_id, &self.dst_id, cycle, eval_score);

        // The score is passed for both float arguments.
        // NOTE(review): the second is presumably a confidence value — confirm
        // against `TransferCrdt::publish_prior`.
        self.crdt
            .publish_prior(&self.src_id, &self.dst_id, eval_score, eval_score, cycle);

        let observed = f64::from(eval_score);
        match cycle {
            1 => {
                self.emergence.record_baseline(observed);
            }
            _ => {
                self.emergence.record_post_transfer(observed);
            }
        }

        CycleResult {
            eval_score,
            emergence_score: self.emergence.emergence_score(),
            mean_improvement: self.emergence.mean_improvement(),
            manifold_entries,
            cycle,
        }
    }

    /// Returns how many cycles have been run so far.
    pub fn cycle(&self) -> u64 {
        self.cycle
    }

    /// Returns the CRDT's best prior for the source → destination pair, if any
    /// has been published.
    pub fn best_prior(&self) -> Option<&TransferPriorSummary> {
        let (from, to) = (&self.src_id, &self.dst_id);
        self.crdt.best_prior_for(from, to)
    }

    /// Serializes the engine state for both domains into RVF bytes.
    ///
    /// Collects the Thompson priors and scoreboard curves available for the
    /// source and destination domains, plus the whole kernel population, and
    /// hands them to `rvf_bridge::assemble_domain_expansion_segments`.
    ///
    /// NOTE(review): the trailing `1` is passed through unchanged; its meaning
    /// (presumably a starting segment id) should be confirmed in `rvf_bridge`.
    #[cfg(feature = "rvf")]
    pub fn package_as_rvf(&self) -> Vec<u8> {
        use ruvector_domain_expansion::rvf_bridge;

        let domain_ids = [&self.src_id, &self.dst_id];

        let priors: Vec<_> = domain_ids
            .iter()
            .filter_map(|id| self.engine.thompson.extract_prior(id))
            .collect();
        let kernels: Vec<_> = self.engine.population.population().to_vec();
        let curves: Vec<_> = domain_ids
            .iter()
            .filter_map(|id| self.engine.scoreboard.curves.get(id))
            .cloned()
            .collect();

        rvf_bridge::assemble_domain_expansion_segments(&priors, &kernels, &curves, 1)
    }

    /// Writes the output of [`Self::package_as_rvf`] to `path`.
    ///
    /// # Errors
    ///
    /// Returns whatever I/O error `std::fs::write` reports.
    #[cfg(feature = "rvf")]
    pub fn save_rvf(&self, path: impl AsRef<std::path::Path>) -> std::io::Result<()> {
        let bytes = self.package_as_rvf();
        std::fs::write(path, bytes)
    }
}
impl Default for ExoTransferOrchestrator {
fn default() -> Self {
Self::new("default_node")
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh orchestrator has run no cycles and published no prior.
    #[test]
    fn test_orchestrator_creation() {
        let orchestrator = ExoTransferOrchestrator::new("test_node");
        assert_eq!(orchestrator.cycle(), 0);
        assert!(orchestrator.best_prior().is_none());
    }

    /// One cycle yields a score in `[0, 1]`, at least one manifold entry and a
    /// published prior.
    #[test]
    fn test_single_cycle() {
        let mut orchestrator = ExoTransferOrchestrator::new("node_1");
        let result = orchestrator.run_cycle();
        assert_eq!(result.cycle, 1);
        assert!(result.eval_score >= 0.0 && result.eval_score <= 1.0);
        assert!(result.manifold_entries >= 1);
        assert!(orchestrator.best_prior().is_some());
    }

    /// The emergence score stays non-negative over several post-baseline cycles.
    #[test]
    fn test_multi_cycle_emergence() {
        let mut orchestrator = ExoTransferOrchestrator::new("node_2");
        let baseline = orchestrator.run_cycle();
        assert_eq!(baseline.cycle, 1);
        for _ in 0..4 {
            let result = orchestrator.run_cycle();
            assert!(result.emergence_score >= 0.0);
        }
        assert_eq!(orchestrator.cycle(), 5);
    }

    /// Packaging before any cycle still produces 64-byte-aligned output.
    #[test]
    #[cfg(feature = "rvf")]
    fn test_package_as_rvf_empty() {
        let orchestrator = ExoTransferOrchestrator::new("rvf_node");
        let bytes = orchestrator.package_as_rvf();
        assert_eq!(bytes.len() % 64, 0, "RVF output must be 64-byte aligned");
    }

    /// After a few cycles the package is non-empty, aligned and starts with
    /// the segment magic.
    #[test]
    #[cfg(feature = "rvf")]
    fn test_package_as_rvf_after_cycles() {
        const SEGMENT_MAGIC: u32 = 0x5256_4653;
        let mut orchestrator = ExoTransferOrchestrator::new("rvf_cycle_node");
        for _ in 0..3 {
            orchestrator.run_cycle();
        }
        let bytes = orchestrator.package_as_rvf();
        assert!(
            !bytes.is_empty(),
            "RVF output must not be empty after cycles"
        );
        assert_eq!(bytes.len() % 64, 0, "RVF output must be 64-byte aligned");
        // Non-empty and a multiple of 64 implies at least four bytes are present.
        let magic = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
        assert_eq!(
            magic, SEGMENT_MAGIC,
            "First segment must have valid RVF magic"
        );
    }

    /// `save_rvf` writes a non-empty, 64-byte-aligned file.
    #[test]
    #[cfg(feature = "rvf")]
    fn test_save_rvf_to_file() {
        let mut orchestrator = ExoTransferOrchestrator::new("rvf_file_node");
        orchestrator.run_cycle();

        // Include the process id so concurrent test processes sharing the
        // system temp directory do not overwrite or delete each other's file.
        let path = std::env::temp_dir().join(format!("exo_test_{}.rvf", std::process::id()));
        orchestrator
            .save_rvf(&path)
            .expect("save_rvf should succeed");

        // Read and clean up first, assert afterwards: a failing assertion must
        // not leave the file behind.
        let read_back = std::fs::read(&path);
        let _ = std::fs::remove_file(&path);

        let written = read_back.expect("file should exist after save_rvf");
        assert!(!written.is_empty());
        assert_eq!(written.len() % 64, 0);
    }
}