use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use anyhow::Result;
use oxios_kernel::supervisor::Supervisor;
use oxios_kernel::types::{AgentId, AgentInfo, AgentStatus};
use oxios_kernel::{
config::OrchestratorConfig, A2AProtocol, AccessManager, AgentLifecycleManager, EventBus,
KernelEvent, Orchestrator, StateStore,
};
use oxios_ouroboros::{
AmbiguityScore, EvaluationResult, ExecutionResult, InterviewResult, OuroborosProtocol, Phase,
Seed,
};
/// Builds the [`OrchestratorConfig`] used by the tests in this file.
///
/// `max_iterations` is stored as `max_evolution_iterations`; the minimum
/// evaluation score is fixed at `0.8` and the evaluation cache is enabled.
fn make_config(max_iterations: u32) -> OrchestratorConfig {
    OrchestratorConfig {
        eval_cache_enabled: true,
        min_evaluation_score: 0.8,
        max_evolution_iterations: max_iterations,
    }
}
/// Test double for `OuroborosProtocol` that counts how often selected
/// protocol methods are invoked and lets a test script the evaluation verdict.
struct MockOuroboros {
/// Number of `interview` calls observed so far.
interview_called: AtomicUsize,
/// Number of `generate_seed` calls observed so far.
generate_seed_called: AtomicUsize,
/// Number of `evaluate` calls observed so far.
evaluate_called: AtomicUsize,
/// Number of `evolve` calls observed so far.
evolve_called: AtomicUsize,
/// Verdict the next `evaluate` call reports; `evaluate` sets it back to
/// `true` after reading it, so a scripted failure applies once.
evaluation_passes: AtomicBool,
}
impl MockOuroboros {
fn new() -> Self {
Self {
interview_called: AtomicUsize::new(0),
generate_seed_called: AtomicUsize::new(0),
evaluate_called: AtomicUsize::new(0),
evolve_called: AtomicUsize::new(0),
evaluation_passes: AtomicBool::new(true),
}
}
fn with_failing_evaluation() -> Self {
let s = Self::new();
s.evaluation_passes.store(false, Ordering::SeqCst);
s
}
}
#[async_trait]
impl OuroborosProtocol for MockOuroboros {
    /// Records the call and returns a canned interview result with a fixed
    /// ambiguity score and a single exchange. The user input is ignored.
    async fn interview(&self, _user_input: &str) -> Result<InterviewResult> {
        self.interview_called.fetch_add(1, Ordering::SeqCst);
        let mut result = InterviewResult::new();
        result.update_ambiguity(AmbiguityScore::new(0.9, 0.85, 0.8));
        result.add_exchange("Goal confirmed", "User wants to proceed");
        Ok(result)
    }

    /// Records the call and returns a fixed seed carrying one acceptance
    /// criterion. The interview result is ignored.
    async fn generate_seed(&self, _interview: &InterviewResult) -> Result<Seed> {
        self.generate_seed_called.fetch_add(1, Ordering::SeqCst);
        let mut seed = Seed::new("Test task from e2e smoke test");
        seed.acceptance_criteria
            .push("Output contains 'done'".into());
        Ok(seed)
    }

    /// Returns a successful execution result whose output echoes the seed's
    /// goal. This method does not update any call counter.
    async fn execute(&self, seed: &Seed) -> Result<ExecutionResult> {
        Ok(ExecutionResult {
            output: format!("Executed seed: {}", seed.goal),
            steps_completed: 3,
            success: true,
        })
    }

    /// Records the call and reports the currently scripted verdict.
    ///
    /// A failing verdict yields score `0.4`, a passing one `0.95`. After the
    /// verdict is read the mock is re-armed to pass, so a mock built with
    /// `with_failing_evaluation` fails exactly once.
    async fn evaluate(
        &self,
        _seed: &Seed,
        _execution: &ExecutionResult,
    ) -> Result<EvaluationResult> {
        self.evaluate_called.fetch_add(1, Ordering::SeqCst);
        // `swap` reads the verdict and re-arms it to `true` in one atomic
        // step, so two overlapping calls can never both observe the same
        // scripted failure (a separate load + store could).
        let passes = self.evaluation_passes.swap(true, Ordering::SeqCst);
        let score = if passes { 0.95 } else { 0.4 };
        Ok(EvaluationResult {
            mechanical_pass: passes,
            semantic_pass: Some(passes),
            consensus_pass: None,
            score,
            notes: vec!["Mock evaluation".into()],
        })
    }

    /// Records the call and always returns a new seed derived from `seed`
    /// via `Seed::evolved_from`. The evaluation is ignored.
    async fn evolve(&self, seed: &Seed, _evaluation: &EvaluationResult) -> Result<Option<Seed>> {
        self.evolve_called.fetch_add(1, Ordering::SeqCst);
        Ok(Some(Seed::evolved_from(seed)))
    }
}
/// Test double for `Supervisor` that keeps agents in an in-memory map,
/// counts `fork` / `run_with_seed` calls and publishes agent events.
struct MockSupervisor {
/// Agents created by `fork`, keyed by their id.
agents: parking_lot::RwLock<HashMap<AgentId, AgentInfo>>,
/// Number of `fork` calls observed so far.
fork_called: AtomicUsize,
/// Number of `run_with_seed` calls observed so far.
run_called: AtomicUsize,
/// Bus on which the mock publishes agent created/started/stopped events.
event_bus: EventBus,
}
impl MockSupervisor {
    /// Creates a supervisor mock with no registered agents and zeroed call
    /// counters that publishes its events on `event_bus`.
    fn new(event_bus: EventBus) -> Self {
        let agents = parking_lot::RwLock::new(HashMap::new());
        let fork_called = AtomicUsize::new(0);
        let run_called = AtomicUsize::new(0);
        Self {
            agents,
            fork_called,
            run_called,
            event_bus,
        }
    }
}
#[async_trait]
impl Supervisor for MockSupervisor {
    /// Records the call, registers a new agent in `Starting` state named
    /// after the seed's goal, publishes `AgentCreated` and returns the id.
    async fn fork(&self, spec: &Seed) -> Result<AgentId> {
        self.fork_called.fetch_add(1, Ordering::SeqCst);
        let id = AgentId::new_v4();
        let info = AgentInfo {
            id,
            name: spec.goal.clone(),
            status: AgentStatus::Starting,
            created_at: chrono::Utc::now(),
            seed_id: Some(spec.id),
        };
        self.agents.write().insert(id, info);
        // Publishing is best-effort: the result is deliberately ignored.
        let _ = self.event_bus.publish(KernelEvent::AgentCreated {
            id,
            name: spec.goal.clone(),
        });
        Ok(id)
    }

    /// Marks the agent as `Running`; an unknown id is silently ignored.
    async fn exec(&self, id: AgentId) -> Result<()> {
        if let Some(agent) = self.agents.write().get_mut(&id) {
            agent.status = AgentStatus::Running;
        }
        Ok(())
    }

    /// Records the call, marks the agent as `Idle` (if known), publishes
    /// `AgentStarted` followed by `AgentStopped`, and returns a canned
    /// successful execution result. The seed is ignored.
    async fn run_with_seed(&self, id: AgentId, _seed: &Seed) -> Result<ExecutionResult> {
        self.run_called.fetch_add(1, Ordering::SeqCst);
        if let Some(agent) = self.agents.write().get_mut(&id) {
            agent.status = AgentStatus::Idle;
        }
        // Publishing is best-effort: the results are deliberately ignored.
        let _ = self.event_bus.publish(KernelEvent::AgentStarted { id });
        let _ = self.event_bus.publish(KernelEvent::AgentStopped { id });
        Ok(ExecutionResult {
            output: "Mock agent completed successfully".into(),
            steps_completed: 5,
            success: true,
        })
    }

    /// Returns the agent's current status without waiting for anything;
    /// an unknown id is reported as `Stopped`.
    async fn wait(&self, id: AgentId) -> Result<AgentStatus> {
        let status = self
            .agents
            .read()
            .get(&id)
            .map_or(AgentStatus::Stopped, |agent| agent.status);
        Ok(status)
    }

    /// Marks the agent as `Stopped`; an unknown id is silently ignored.
    /// The agent stays in the map.
    async fn kill(&self, id: AgentId) -> Result<()> {
        if let Some(agent) = self.agents.write().get_mut(&id) {
            agent.status = AgentStatus::Stopped;
        }
        Ok(())
    }

    /// Returns a snapshot (clones) of every registered agent.
    async fn list(&self) -> Result<Vec<AgentInfo>> {
        Ok(self.agents.read().values().cloned().collect())
    }
}
/// Builds the shared fixtures most tests need: a passing [`MockOuroboros`],
/// a [`MockSupervisor`] wired to a fresh event bus, that event bus, and a
/// [`StateStore`] created on a fresh temporary directory.
///
/// # Panics
///
/// Panics if the temporary directory or the state store cannot be created.
fn build_orchestrator_parts() -> (
    Arc<MockOuroboros>,
    Arc<MockSupervisor>,
    EventBus,
    Arc<StateStore>,
) {
    let event_bus = EventBus::new(64);
    let tmp = tempfile::tempdir().expect("temporary directory creation failed");
    let state_store =
        Arc::new(StateStore::new(tmp.path().to_path_buf()).expect("StateStore creation failed"));
    // `TempDir` deletes its directory when dropped. The guard cannot be
    // returned without changing this helper's signature, so it is forgotten
    // instead: the directory then outlives this function and stays available
    // to the returned `StateStore`. The cost is a leftover directory under
    // the system temp location per call, which is acceptable for tests.
    std::mem::forget(tmp);
    let ouroboros = Arc::new(MockOuroboros::new());
    let supervisor = Arc::new(MockSupervisor::new(event_bus.clone()));
    (ouroboros, supervisor, event_bus, state_store)
}
/// Assembles an [`AgentLifecycleManager`] around the given mock supervisor
/// and scheduler, with a fresh [`AccessManager`] and an [`A2AProtocol`]
/// bound to a clone of `event_bus`.
///
/// The trailing `300` is passed through unchanged as the manager's last
/// constructor argument.
fn make_lifecycle(
    supervisor: Arc<MockSupervisor>,
    scheduler: Arc<oxios_kernel::scheduler::AgentScheduler>,
    event_bus: &EventBus,
) -> AgentLifecycleManager {
    AgentLifecycleManager::new(
        supervisor,
        scheduler,
        Arc::new(parking_lot::Mutex::new(AccessManager::new())),
        Arc::new(A2AProtocol::new(event_bus.clone())),
        event_bus.clone(),
        300,
    )
}
#[tokio::test]
async fn test_orchestrator_happy_path() {
let (ouroboros, supervisor, event_bus, state_store) = build_orchestrator_parts();
let scheduler = Arc::new(oxios_kernel::scheduler::AgentScheduler::default());
let lifecycle = make_lifecycle(supervisor.clone(), scheduler, &event_bus);
let orchestrator = Orchestrator::with_config(
ouroboros.clone(),
event_bus,
state_store,
lifecycle,
make_config(0), );
let result = orchestrator
.handle_message(
"test-user",
"Fix the bug in main.rs",
None,
None,
"test-req",
)
.await
.unwrap();
assert!(result.session_id.is_some());
assert!(result.seed_id.is_some());
assert_eq!(result.phase_reached, Phase::Evaluate);
assert!(result.evaluation_passed);
assert!(!result.response.is_empty());
assert_eq!(ouroboros.interview_called.load(Ordering::SeqCst), 1);
assert_eq!(ouroboros.generate_seed_called.load(Ordering::SeqCst), 1);
assert_eq!(ouroboros.evaluate_called.load(Ordering::SeqCst), 1);
assert_eq!(ouroboros.evolve_called.load(Ordering::SeqCst), 0);
assert_eq!(supervisor.fork_called.load(Ordering::SeqCst), 1);
assert_eq!(supervisor.run_called.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_orchestrator_evolution_loop() {
let event_bus = EventBus::new(64);
let tmp = tempfile::tempdir().unwrap();
let state_store = Arc::new(StateStore::new(tmp.path().to_path_buf()).unwrap());
let ouroboros = Arc::new(MockOuroboros::with_failing_evaluation());
let supervisor = Arc::new(MockSupervisor::new(event_bus.clone()));
let scheduler = Arc::new(oxios_kernel::scheduler::AgentScheduler::default());
let lifecycle = make_lifecycle(supervisor.clone(), scheduler, &event_bus);
let orchestrator = Orchestrator::with_config(
ouroboros.clone(),
event_bus,
state_store,
lifecycle,
make_config(3), );
let result = orchestrator
.handle_message(
"test-user",
"Something that needs evolution",
None,
None,
"test-req",
)
.await
.unwrap();
assert_eq!(result.phase_reached, Phase::Evolve);
assert!(result.evaluation_passed, "Final evaluation should pass");
assert_eq!(ouroboros.evaluate_called.load(Ordering::SeqCst), 2);
assert_eq!(ouroboros.evolve_called.load(Ordering::SeqCst), 1);
assert_eq!(supervisor.fork_called.load(Ordering::SeqCst), 2);
assert_eq!(supervisor.run_called.load(Ordering::SeqCst), 2);
}
/// Two consecutive messages sent with the same explicit session id are both
/// answered under that session id.
#[tokio::test]
async fn test_session_continuation() {
    let (ouroboros, supervisor, event_bus, state_store) = build_orchestrator_parts();
    let lifecycle = make_lifecycle(
        supervisor.clone(),
        Arc::new(oxios_kernel::scheduler::AgentScheduler::default()),
        &event_bus,
    );
    let orchestrator = Orchestrator::with_config(
        ouroboros.clone(),
        event_bus,
        state_store,
        lifecycle,
        make_config(0),
    );

    let session_id = "test-session-123";
    // Messages are sent one after another, in this order.
    for message in ["Work on the project", "Make it production ready"] {
        let result = orchestrator
            .handle_message("test-user", message, Some(session_id), None, "test-req")
            .await
            .unwrap();
        assert_eq!(result.session_id.as_deref(), Some(session_id));
    }
}
/// Messages from two users with two distinct explicit session ids each come
/// back tagged with their own session id.
#[tokio::test]
async fn test_multiple_sessions_independent() {
    let (ouroboros, supervisor, event_bus, state_store) = build_orchestrator_parts();
    let lifecycle = make_lifecycle(
        supervisor.clone(),
        Arc::new(oxios_kernel::scheduler::AgentScheduler::default()),
        &event_bus,
    );
    let orchestrator = Orchestrator::with_config(
        ouroboros.clone(),
        event_bus,
        state_store,
        lifecycle,
        make_config(0),
    );

    let first = orchestrator
        .handle_message("user-a", "Task A", Some("session-a"), None, "test-req")
        .await
        .unwrap();
    let second = orchestrator
        .handle_message("user-b", "Task B", Some("session-b"), None, "test-req")
        .await
        .unwrap();

    let first_session = first.session_id.as_deref();
    let second_session = second.session_id.as_deref();
    assert_eq!(first_session, Some("session-a"));
    assert_eq!(second_session, Some("session-b"));
    assert_ne!(first_session, second_session);
}
/// After a message handled under an explicit session id, a follow-up message
/// sent without a session id must not be assigned that same id.
#[tokio::test]
async fn test_session_cleaned_after_completion() {
    let (ouroboros, supervisor, event_bus, state_store) = build_orchestrator_parts();
    let lifecycle = make_lifecycle(
        supervisor.clone(),
        Arc::new(oxios_kernel::scheduler::AgentScheduler::default()),
        &event_bus,
    );
    let orchestrator = Orchestrator::with_config(
        ouroboros.clone(),
        event_bus,
        state_store,
        lifecycle,
        make_config(0),
    );

    let session_id = "cleanup-test-session";

    // First request: explicit session id; only success matters here.
    let first = orchestrator.handle_message(
        "test-user",
        "Simple task",
        Some(session_id),
        None,
        "test-req",
    );
    first.await.unwrap();

    // Second request: no session id supplied.
    let second = orchestrator.handle_message("test-user", "Another task", None, None, "test-req");
    let result2 = second.await.unwrap();

    assert_ne!(result2.session_id.as_deref(), Some(session_id));
}
/// Handling one message must publish at least four `PhaseStarted` and at
/// least four `PhaseCompleted` events on the event bus.
///
/// The orchestrator runs in a spawned task while this test drains the
/// subscription. Collection stops as soon as both thresholds are met, or
/// after a one-second deadline, whichever comes first.
#[tokio::test]
async fn test_phase_events_published() {
    // Minimum number of events of each kind the assertions below require.
    const MIN_PHASE_EVENTS: u32 = 4;

    let event_bus = EventBus::new(64);
    // Subscribe before the orchestrator is started so the receiver exists
    // by the time the first event is published.
    let mut rx = event_bus.subscribe();
    // Held in this scope so the directory lives for the whole test.
    let tmp = tempfile::tempdir().unwrap();
    let state_store = Arc::new(StateStore::new(tmp.path().to_path_buf()).unwrap());
    let ouroboros = Arc::new(MockOuroboros::new());
    let supervisor = Arc::new(MockSupervisor::new(event_bus.clone()));
    let scheduler = Arc::new(oxios_kernel::scheduler::AgentScheduler::default());
    let lifecycle = make_lifecycle(supervisor, scheduler, &event_bus);
    let orchestrator = Orchestrator::with_config(
        ouroboros,
        event_bus.clone(),
        state_store,
        lifecycle,
        make_config(0),
    );
    let handle = tokio::spawn(async move {
        orchestrator
            .handle_message("test-user", "Test events", None, None, "test-req")
            .await
            .unwrap()
    });

    let mut phase_started: u32 = 0;
    let mut phase_completed: u32 = 0;
    let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(1);
    // Exit early once enough events of both kinds were seen; there is no
    // point in sleeping out the rest of the deadline on the success path.
    while phase_started < MIN_PHASE_EVENTS || phase_completed < MIN_PHASE_EVENTS {
        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
        if remaining.is_zero() {
            break;
        }
        let evt = tokio::select! {
            evt = rx.recv() => evt.unwrap(),
            _ = tokio::time::sleep(remaining) => break,
        };
        match evt {
            KernelEvent::PhaseStarted { .. } => phase_started += 1,
            KernelEvent::PhaseCompleted { .. } => phase_completed += 1,
            // Agent lifecycle and any other events are irrelevant here.
            _ => {}
        }
    }

    assert!(
        phase_started >= MIN_PHASE_EVENTS,
        "Expected ≥4 PhaseStarted events, got {phase_started}"
    );
    assert!(
        phase_completed >= MIN_PHASE_EVENTS,
        "Expected ≥4 PhaseCompleted events, got {phase_completed}"
    );
    // Surface a panic from the spawned orchestrator task, if any.
    let _ = handle.await.unwrap();
}