use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
use oxios_gateway::channel::Channel;
use oxios_gateway::gateway::Gateway;
use oxios_gateway::message::{IncomingMessage, OutgoingMessage};
use oxios_kernel::a2a::A2AProtocol;
use oxios_kernel::access_manager::{AccessManager, AgentPermissions};
use oxios_kernel::agent_lifecycle::AgentLifecycleManager;
use oxios_kernel::config::OrchestratorConfig;
use oxios_kernel::event_bus::{EventBus, KernelEvent};
use oxios_kernel::orchestrator::Orchestrator;
use oxios_kernel::scheduler::AgentScheduler;
use oxios_kernel::state_store::StateStore;
use oxios_kernel::supervisor::Supervisor;
use oxios_ouroboros::evaluation::EvaluationResult;
use oxios_ouroboros::interview::InterviewResult;
use oxios_ouroboros::protocol::{ExecutionResult, OuroborosProtocol, Phase};
use oxios_ouroboros::seed::{AmbiguityScore, Seed};
use oxios_kernel::types::{AgentId, AgentInfo, AgentStatus};
struct MockOuroboros {
interview_called: AtomicUsize,
generate_seed_called: AtomicUsize,
evaluate_called: AtomicUsize,
evolve_called: AtomicUsize,
evaluation_passes: AtomicBool,
}
impl MockOuroboros {
fn new() -> Self {
Self {
interview_called: AtomicUsize::new(0),
generate_seed_called: AtomicUsize::new(0),
evaluate_called: AtomicUsize::new(0),
evolve_called: AtomicUsize::new(0),
evaluation_passes: AtomicBool::new(true),
}
}
fn with_failing_evaluation() -> Self {
let s = Self::new();
s.evaluation_passes.store(false, Ordering::SeqCst);
s
}
}
#[async_trait]
impl OuroborosProtocol for MockOuroboros {
async fn interview(&self, _user_input: &str) -> anyhow::Result<InterviewResult> {
self.interview_called.fetch_add(1, Ordering::SeqCst);
let mut result = InterviewResult::new();
let score = AmbiguityScore::new(1.0, 1.0, 1.0);
result.update_ambiguity(score);
result.add_exchange("What is the goal?", "Test goal");
Ok(result)
}
async fn generate_seed(&self, _interview: &InterviewResult) -> anyhow::Result<Seed> {
self.generate_seed_called.fetch_add(1, Ordering::SeqCst);
let mut seed = Seed::new("Test goal");
seed.constraints = vec!["No errors".into()];
seed.acceptance_criteria = vec!["Output contains 'done'".into()];
Ok(seed)
}
async fn execute(&self, seed: &Seed) -> anyhow::Result<ExecutionResult> {
Ok(ExecutionResult {
output: format!("Executed: {}", seed.goal),
steps_completed: 5,
success: true,
})
}
async fn evaluate(
&self,
_seed: &Seed,
_execution: &ExecutionResult,
) -> anyhow::Result<EvaluationResult> {
self.evaluate_called.fetch_add(1, Ordering::SeqCst);
let passes = self.evaluation_passes.load(Ordering::SeqCst);
self.evaluation_passes.store(true, Ordering::SeqCst);
Ok(EvaluationResult {
mechanical_pass: passes,
semantic_pass: Some(passes),
consensus_pass: None,
score: if passes { 0.95 } else { 0.5 },
notes: vec!["Mock evaluation".into()],
})
}
async fn evolve(
&self,
seed: &Seed,
_evaluation: &EvaluationResult,
) -> anyhow::Result<Option<Seed>> {
self.evolve_called.fetch_add(1, Ordering::SeqCst);
let evolved = Seed::evolved_from(seed);
Ok(Some(evolved))
}
}
struct MockSupervisor {
agents: parking_lot::RwLock<HashMap<AgentId, AgentInfo>>,
fork_called: AtomicUsize,
run_called: AtomicUsize,
event_bus: EventBus,
}
impl MockSupervisor {
fn new(event_bus: EventBus) -> Self {
Self {
agents: parking_lot::RwLock::new(HashMap::new()),
fork_called: AtomicUsize::new(0),
run_called: AtomicUsize::new(0),
event_bus,
}
}
}
#[async_trait]
impl Supervisor for MockSupervisor {
async fn fork(&self, spec: &Seed) -> anyhow::Result<AgentId> {
self.fork_called.fetch_add(1, Ordering::SeqCst);
let id = AgentId::new_v4();
let info = AgentInfo {
id,
name: spec.goal.clone(),
status: AgentStatus::Starting,
created_at: chrono::Utc::now(),
seed_id: Some(spec.id),
};
{
let mut agents = self.agents.write();
agents.insert(id, info);
}
let _ = self.event_bus.publish(KernelEvent::AgentCreated {
id,
name: spec.goal.clone(),
});
Ok(id)
}
async fn exec(&self, id: AgentId) -> anyhow::Result<()> {
let mut agents = self.agents.write();
match agents.get_mut(&id) {
Some(a) => a.status = AgentStatus::Running,
None => anyhow::bail!("Agent {id} not found"),
}
Ok(())
}
async fn run_with_seed(&self, id: AgentId, _seed: &Seed) -> anyhow::Result<ExecutionResult> {
self.run_called.fetch_add(1, Ordering::SeqCst);
{
let mut agents = self.agents.write();
if let Some(a) = agents.get_mut(&id) {
a.status = AgentStatus::Idle;
}
}
let _ = self.event_bus.publish(KernelEvent::AgentStarted { id });
let _ = self.event_bus.publish(KernelEvent::AgentStopped { id });
Ok(ExecutionResult {
output: "Mock agent completed".into(),
steps_completed: 3,
success: true,
})
}
async fn wait(&self, id: AgentId) -> anyhow::Result<AgentStatus> {
let agents = self.agents.read();
match agents.get(&id) {
Some(a) => Ok(a.status),
None => anyhow::bail!("Agent {id} not found"),
}
}
async fn kill(&self, id: AgentId) -> anyhow::Result<()> {
let mut agents = self.agents.write();
if let Some(a) = agents.get_mut(&id) {
a.status = AgentStatus::Stopped;
}
Ok(())
}
async fn list(&self) -> anyhow::Result<Vec<AgentInfo>> {
let agents = self.agents.read();
Ok(agents.values().cloned().collect())
}
}
struct MockChannel {
outgoing: tokio::sync::Mutex<Vec<OutgoingMessage>>,
incoming_rx: tokio::sync::Mutex<Option<tokio::sync::mpsc::Receiver<IncomingMessage>>>,
}
impl MockChannel {
fn new(buffer: usize) -> Self {
let (_tx, rx) = tokio::sync::mpsc::channel(buffer);
Self {
outgoing: tokio::sync::Mutex::new(Vec::new()),
incoming_rx: tokio::sync::Mutex::new(Some(rx)),
}
}
}
#[async_trait]
impl Channel for MockChannel {
fn name(&self) -> &str {
"mock"
}
async fn start(
&self,
_tx: tokio::sync::mpsc::Sender<oxios_gateway::GatewayInbox>,
mut shutdown: tokio::sync::watch::Receiver<bool>,
) -> anyhow::Result<tokio::task::JoinHandle<()>> {
self.incoming_rx.lock().await.take();
let handle = tokio::spawn(async move {
let _ = shutdown.changed().await;
});
Ok(handle)
}
async fn send(&self, msg: OutgoingMessage) -> anyhow::Result<()> {
self.outgoing.lock().await.push(msg);
Ok(())
}
}
#[tokio::test]
async fn test_event_bus_publish_subscribe() {
let bus = EventBus::new(16);
let mut rx = bus.subscribe();
bus.publish(KernelEvent::SeedCreated {
seed_id: uuid::Uuid::new_v4(),
})
.unwrap();
let event = rx.recv().await.unwrap();
match event {
KernelEvent::SeedCreated { .. } => {}
other => panic!("Expected SeedCreated, got {other:?}"),
}
}
#[tokio::test]
async fn test_event_bus_multiple_subscribers() {
let bus = EventBus::new(16);
let mut rx1 = bus.subscribe();
let mut rx2 = bus.subscribe();
let seed_id = uuid::Uuid::new_v4();
bus.publish(KernelEvent::SeedCreated { seed_id }).unwrap();
let e1 = rx1.recv().await.unwrap();
let e2 = rx2.recv().await.unwrap();
assert!(matches!(e1, KernelEvent::SeedCreated { .. }));
assert!(matches!(e2, KernelEvent::SeedCreated { .. }));
}
#[tokio::test]
async fn test_event_bus_no_subscribers_ok() {
let bus = EventBus::new(16);
bus.publish(KernelEvent::SeedCreated {
seed_id: uuid::Uuid::new_v4(),
})
.unwrap();
}
#[tokio::test]
async fn test_state_store_save_load_markdown() {
let tmp = tempfile::tempdir().unwrap();
let store = StateStore::new(tmp.path().to_path_buf()).unwrap();
store
.save_markdown("memory", "test-note", "Hello, world!")
.await
.unwrap();
let loaded = store.load_markdown("memory", "test-note").await.unwrap();
assert_eq!(loaded, Some("Hello, world!".to_string()));
}
#[tokio::test]
async fn test_state_store_load_nonexistent() {
let tmp = tempfile::tempdir().unwrap();
let store = StateStore::new(tmp.path().to_path_buf()).unwrap();
let loaded = store.load_markdown("memory", "nope").await.unwrap();
assert_eq!(loaded, None);
}
#[tokio::test]
async fn test_state_store_list_category() {
let tmp = tempfile::tempdir().unwrap();
let store = StateStore::new(tmp.path().to_path_buf()).unwrap();
store
.save_markdown("seeds", "alpha", "seed alpha content")
.await
.unwrap();
store
.save_markdown("seeds", "beta", "seed beta content")
.await
.unwrap();
let names = store.list_category("seeds").await.unwrap();
assert_eq!(names, vec!["alpha", "beta"]);
}
#[tokio::test]
async fn test_state_store_save_load_json() {
let tmp = tempfile::tempdir().unwrap();
let store = StateStore::new(tmp.path().to_path_buf()).unwrap();
let data = serde_json::json!({
"name": "test",
"value": 42
});
store.save_json("config", "test", &data).await.unwrap();
let loaded: Option<serde_json::Value> = store.load_json("config", "test").await.unwrap();
assert_eq!(loaded, Some(data));
}
#[tokio::test]
async fn test_state_store_path_traversal_blocked() {
let tmp = tempfile::tempdir().unwrap();
let store = StateStore::new(tmp.path().to_path_buf()).unwrap();
let result = store.save_markdown("../etc", "shadow", "hacked").await;
assert!(result.is_err());
let result = store.save_markdown("memory", "../shadow", "hacked").await;
assert!(result.is_err());
let result = store.save_markdown("foo/bar", "test", "content").await;
assert!(result.is_ok());
let result = store.save_markdown("foo\\bar", "test", "content").await;
assert!(result.is_err());
let result = store.save_markdown("", "test", "content").await;
assert!(result.is_err());
let result = store.save_markdown("/foo", "test", "content").await;
assert!(result.is_err());
let result = store.save_markdown("foo/", "test", "content").await;
assert!(result.is_err());
let result = store.save_markdown("foo//bar", "test", "content").await;
assert!(result.is_err());
}
fn make_evolution_config(max_iterations: u32) -> OrchestratorConfig {
OrchestratorConfig {
max_evolution_iterations: max_iterations,
min_evaluation_score: 0.8,
eval_cache_enabled: true,
}
}
#[tokio::test]
async fn test_orchestrator_happy_path() {
let event_bus = EventBus::new(64);
let tmp = tempfile::tempdir().unwrap();
let state_store = Arc::new(StateStore::new(tmp.path().to_path_buf()).unwrap());
let ouroboros = Arc::new(MockOuroboros::new());
let supervisor = Arc::new(MockSupervisor::new(event_bus.clone()));
let a2a = Arc::new(A2AProtocol::new(event_bus.clone()));
let scheduler = Arc::new(AgentScheduler::default());
let access_manager = Arc::new(parking_lot::Mutex::new(AccessManager::new()));
let lifecycle = AgentLifecycleManager::new(
supervisor.clone(),
scheduler.clone(),
access_manager.clone(),
a2a.clone(),
event_bus.clone(),
300,
);
let orchestrator = Orchestrator::with_config(
ouroboros.clone(),
event_bus.clone(),
state_store,
lifecycle,
make_evolution_config(0), );
let result = orchestrator
.handle_message("test-user", "Do something useful", None, None, "test-req")
.await
.unwrap();
assert!(result.session_id.is_some());
assert!(result.seed_id.is_some());
assert_eq!(result.phase_reached, Phase::Evaluate);
assert!(result.evaluation_passed);
assert!(!result.response.is_empty());
assert_eq!(ouroboros.interview_called.load(Ordering::SeqCst), 1);
assert_eq!(ouroboros.generate_seed_called.load(Ordering::SeqCst), 1);
assert_eq!(ouroboros.evaluate_called.load(Ordering::SeqCst), 1); assert_eq!(ouroboros.evolve_called.load(Ordering::SeqCst), 0);
assert_eq!(supervisor.fork_called.load(Ordering::SeqCst), 1);
assert_eq!(supervisor.run_called.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_orchestrator_evolution_loop() {
let event_bus = EventBus::new(64);
let tmp = tempfile::tempdir().unwrap();
let state_store = Arc::new(StateStore::new(tmp.path().to_path_buf()).unwrap());
let ouroboros = Arc::new(MockOuroboros::with_failing_evaluation());
let supervisor = Arc::new(MockSupervisor::new(event_bus.clone()));
let a2a = Arc::new(A2AProtocol::new(event_bus.clone()));
let scheduler = Arc::new(AgentScheduler::default());
let access_manager = Arc::new(parking_lot::Mutex::new(AccessManager::new()));
let lifecycle = AgentLifecycleManager::new(
supervisor.clone(),
scheduler.clone(),
access_manager.clone(),
a2a.clone(),
event_bus.clone(),
300,
);
let orchestrator = Orchestrator::with_config(
ouroboros.clone(),
event_bus.clone(),
state_store,
lifecycle,
make_evolution_config(3), );
let result = orchestrator
.handle_message("test-user", "Do something tricky", None, None, "test-req")
.await
.unwrap();
assert_eq!(result.phase_reached, Phase::Evolve);
assert!(result.evaluation_passed);
assert_eq!(ouroboros.evaluate_called.load(Ordering::SeqCst), 2); assert_eq!(ouroboros.evolve_called.load(Ordering::SeqCst), 1);
assert_eq!(supervisor.fork_called.load(Ordering::SeqCst), 2); assert_eq!(supervisor.run_called.load(Ordering::SeqCst), 2);
}
#[tokio::test]
async fn test_orchestrator_events_published() {
let event_bus = EventBus::new(64);
let mut rx = event_bus.subscribe();
let tmp = tempfile::tempdir().unwrap();
let state_store = Arc::new(StateStore::new(tmp.path().to_path_buf()).unwrap());
let ouroboros = Arc::new(MockOuroboros::new());
let supervisor = Arc::new(MockSupervisor::new(event_bus.clone()));
let a2a = Arc::new(A2AProtocol::new(event_bus.clone()));
let scheduler = Arc::new(AgentScheduler::default());
let access_manager = Arc::new(parking_lot::Mutex::new(AccessManager::new()));
let lifecycle = AgentLifecycleManager::new(
supervisor,
scheduler.clone(),
access_manager.clone(),
a2a.clone(),
event_bus.clone(),
300,
);
let orchestrator = Orchestrator::with_config(
ouroboros,
event_bus.clone(),
state_store,
lifecycle,
make_evolution_config(0),
);
let handle = tokio::spawn(async move {
orchestrator
.handle_message("test-user", "Check events", None, None, "test-req")
.await
.unwrap()
});
let mut phase_events = Vec::new();
let mut seed_events = Vec::new();
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
loop {
let evt = tokio::select! {
evt = rx.recv() => evt.unwrap(),
_ = tokio::time::sleep_until(deadline) => break,
};
match evt {
KernelEvent::PhaseStarted { .. } | KernelEvent::PhaseCompleted { .. } => {
phase_events.push(evt);
}
KernelEvent::SeedCreated { .. } => {
seed_events.push(evt);
}
_ => {}
}
if phase_events.len() >= 8 {
break;
}
}
assert!(
phase_events.len() >= 4,
"Expected at least 4 phase events, got {}",
phase_events.len()
);
assert!(
!seed_events.is_empty(),
"Expected at least one SeedCreated event"
);
let _result = handle.await.unwrap();
}
#[tokio::test]
async fn test_gateway_routes_message_through_orchestrator() {
let event_bus = EventBus::new(64);
let tmp = tempfile::tempdir().unwrap();
let state_store = Arc::new(StateStore::new(tmp.path().to_path_buf()).unwrap());
let ouroboros = Arc::new(MockOuroboros::new());
let supervisor = Arc::new(MockSupervisor::new(event_bus.clone()));
let a2a = Arc::new(A2AProtocol::new(event_bus.clone()));
let scheduler = Arc::new(AgentScheduler::default());
let access_manager = Arc::new(parking_lot::Mutex::new(AccessManager::new()));
let orchestrator = Arc::new({
let lifecycle = AgentLifecycleManager::new(
supervisor,
scheduler.clone(),
access_manager.clone(),
a2a.clone(),
event_bus.clone(),
300,
);
Orchestrator::with_config(
ouroboros,
event_bus.clone(),
state_store,
lifecycle,
make_evolution_config(0),
)
});
let gateway = Gateway::new(orchestrator);
let mock_channel = Box::new(MockChannel::new(16));
gateway.register(mock_channel).await.unwrap();
assert_eq!(gateway.channel_names().await, vec!["mock"]);
let outgoing = OutgoingMessage::new("mock", "test-user", "Hello from gateway");
let result = gateway.send_to("mock", outgoing).await;
assert!(result.is_ok());
gateway.signal_shutdown();
}
#[tokio::test]
async fn test_gateway_unknown_channel() {
let event_bus = EventBus::new(64);
let tmp = tempfile::tempdir().unwrap();
let state_store = Arc::new(StateStore::new(tmp.path().to_path_buf()).unwrap());
let ouroboros = Arc::new(MockOuroboros::new());
let supervisor = Arc::new(MockSupervisor::new(event_bus.clone()));
let a2a = Arc::new(A2AProtocol::new(event_bus.clone()));
let scheduler = Arc::new(AgentScheduler::default());
let access_manager = Arc::new(parking_lot::Mutex::new(AccessManager::new()));
let orchestrator = Arc::new({
let lifecycle = AgentLifecycleManager::new(
supervisor,
scheduler.clone(),
access_manager.clone(),
a2a.clone(),
event_bus.clone(),
300,
);
Orchestrator::with_config(
ouroboros,
event_bus.clone(),
state_store,
lifecycle,
make_evolution_config(0),
)
});
let gateway = Gateway::new(orchestrator);
let outgoing = OutgoingMessage::new("nonexistent", "test-user", "Test");
let result = gateway.send_to("nonexistent", outgoing).await;
assert!(result.is_ok());
}
use oxios_kernel::scheduler::{Priority, ScheduledTask};
use std::sync::atomic::AtomicU32;
struct SchedulerAwareSupervisor {
scheduler: Arc<AgentScheduler>,
agents: parking_lot::RwLock<HashMap<AgentId, AgentInfo>>,
event_bus: EventBus,
tasks_claimed: AtomicU32,
}
impl SchedulerAwareSupervisor {
fn new(scheduler: Arc<AgentScheduler>, event_bus: EventBus) -> Self {
Self {
scheduler,
agents: parking_lot::RwLock::new(HashMap::new()),
event_bus,
tasks_claimed: AtomicU32::new(0),
}
}
}
#[async_trait]
impl Supervisor for SchedulerAwareSupervisor {
async fn fork(&self, spec: &Seed) -> anyhow::Result<AgentId> {
let id = AgentId::new_v4();
let info = AgentInfo {
id,
name: spec.goal.clone(),
status: AgentStatus::Starting,
created_at: chrono::Utc::now(),
seed_id: Some(spec.id),
};
{
let mut agents = self.agents.write();
agents.insert(id, info);
}
let _ = self.event_bus.publish(KernelEvent::AgentCreated {
id,
name: spec.goal.clone(),
});
Ok(id)
}
async fn exec(&self, id: AgentId) -> anyhow::Result<()> {
let mut agents = self.agents.write();
if let Some(a) = agents.get_mut(&id) {
a.status = AgentStatus::Running;
}
Ok(())
}
async fn run_with_seed(&self, id: AgentId, _seed: &Seed) -> anyhow::Result<ExecutionResult> {
let task = ScheduledTask::for_agent(id, format!("Agent {id} execution"), Priority::Normal);
self.scheduler.submit(task)?;
self.tasks_claimed.fetch_add(1, Ordering::SeqCst);
{
let mut agents = self.agents.write();
if let Some(a) = agents.get_mut(&id) {
a.status = AgentStatus::Idle;
}
}
let _ = self.event_bus.publish(KernelEvent::AgentStarted { id });
let _ = self.event_bus.publish(KernelEvent::AgentStopped { id });
Ok(ExecutionResult {
output: "Task completed".into(),
steps_completed: 1,
success: true,
})
}
async fn wait(&self, id: AgentId) -> anyhow::Result<AgentStatus> {
let agents = self.agents.read();
match agents.get(&id) {
Some(a) => Ok(a.status),
None => anyhow::bail!("Agent {id} not found"),
}
}
async fn kill(&self, id: AgentId) -> anyhow::Result<()> {
let mut agents = self.agents.write();
if let Some(a) = agents.get_mut(&id) {
a.status = AgentStatus::Stopped;
}
Ok(())
}
async fn list(&self) -> anyhow::Result<Vec<AgentInfo>> {
let agents = self.agents.read();
Ok(agents.values().cloned().collect())
}
}
#[tokio::test]
async fn test_scheduler_orchestrator_integration() {
let event_bus = EventBus::new(64);
let tmp = tempfile::tempdir().unwrap();
let state_store = Arc::new(StateStore::new(tmp.path().to_path_buf()).unwrap());
let a2a = Arc::new(A2AProtocol::new(event_bus.clone()));
let scheduler = Arc::new(AgentScheduler::new(3, 100, 60));
let ouroboros = Arc::new(MockOuroboros::new());
let supervisor = Arc::new(SchedulerAwareSupervisor::new(
scheduler.clone(),
event_bus.clone(),
));
let access_manager = Arc::new(parking_lot::Mutex::new(AccessManager::new()));
let lifecycle = AgentLifecycleManager::new(
supervisor,
scheduler.clone(),
access_manager.clone(),
a2a.clone(),
event_bus.clone(),
300,
);
let orchestrator = Orchestrator::with_config(
ouroboros,
event_bus.clone(),
state_store,
lifecycle,
make_evolution_config(0),
);
let result = orchestrator
.handle_message("test-user", "Build a simple thing", None, None, "test-req")
.await
.unwrap();
assert!(result.session_id.is_some());
assert!(result.seed_id.is_some());
assert_eq!(result.phase_reached, Phase::Evaluate);
let stats = scheduler.stats();
assert!(
stats.queued + stats.running >= 1,
"Expected at least one task"
);
}
#[tokio::test]
async fn test_scheduler_priority_ordering_in_orchestration() {
let event_bus = EventBus::new(64);
let tmp = tempfile::tempdir().unwrap();
let state_store = Arc::new(StateStore::new(tmp.path().to_path_buf()).unwrap());
let a2a = Arc::new(A2AProtocol::new(event_bus.clone()));
let scheduler = Arc::new(AgentScheduler::new(10, 10_000, 60));
scheduler
.submit(ScheduledTask::new(
"Low priority task".into(),
Priority::Low,
))
.unwrap();
scheduler
.submit(ScheduledTask::new(
"High priority task".into(),
Priority::High,
))
.unwrap();
scheduler
.submit(ScheduledTask::new(
"Normal priority task".into(),
Priority::Normal,
))
.unwrap();
scheduler
.submit(ScheduledTask::new(
"Critical task".into(),
Priority::Critical,
))
.unwrap();
let task1 = scheduler.next_task().unwrap();
assert_eq!(task1.priority, Priority::Critical);
let task2 = scheduler.next_task().unwrap();
assert_eq!(task2.priority, Priority::High);
let task3 = scheduler.next_task().unwrap();
assert_eq!(task3.priority, Priority::Normal);
let task4 = scheduler.next_task().unwrap();
assert_eq!(task4.priority, Priority::Low);
let ouroboros = Arc::new(MockOuroboros::new());
let supervisor = Arc::new(SchedulerAwareSupervisor::new(
scheduler.clone(),
event_bus.clone(),
));
let access_manager = Arc::new(parking_lot::Mutex::new(AccessManager::new()));
let lifecycle = AgentLifecycleManager::new(
supervisor,
scheduler.clone(),
access_manager.clone(),
a2a.clone(),
event_bus.clone(),
300,
);
let _orchestrator = Orchestrator::with_config(
ouroboros,
event_bus,
state_store,
lifecycle,
make_evolution_config(0),
);
}
#[tokio::test]
async fn test_access_manager_blocks_dangerous_tools() {
let mut access = AccessManager::new();
let mut perms = AgentPermissions::for_new_agent("safe-agent");
perms.allowed_tools.clear(); perms.allow_tool("read"); perms.allow_tool("grep");
access.set_permissions(perms);
assert!(!access.can_use_tool("safe-agent", "bash")); assert!(!access.can_use_tool("safe-agent", "rm")); assert!(!access.can_use_tool("safe-agent", "sudo")); assert!(access.can_use_tool("safe-agent", "read")); assert!(access.can_use_tool("safe-agent", "grep"));
assert!(!access.can_use_tool("unknown-agent", "read"));
}
#[tokio::test]
async fn test_access_manager_enforces_path_restrictions() {
let mut access = AccessManager::new();
let mut perms = AgentPermissions::for_new_agent("file-agent");
perms.allowed_paths = vec![
"/workspace/**".to_string(),
"/home/user/projects/**".to_string(),
];
perms.denied_paths = vec![
"/workspace/secrets/**".to_string(),
"/workspace/.oxios/**".to_string(),
];
access.set_permissions(perms);
assert!(access.can_access_path("file-agent", "/workspace/file.txt"));
assert!(access.can_access_path("file-agent", "/workspace/subdir/code.rs"));
assert!(access.can_access_path("file-agent", "/home/user/projects/app/main.rs"));
assert!(!access.can_access_path("file-agent", "/etc/passwd"));
assert!(!access.can_access_path("file-agent", "/root/.ssh/id_rsa"));
assert!(!access.can_access_path("file-agent", "/workspace/secrets/api-key.txt"));
assert!(!access.can_access_path("file-agent", "/workspace/.oxios/config.toml"));
}
#[tokio::test]
async fn test_access_manager_audit_log_on_denied_access() {
let mut access = AccessManager::new();
let perms = AgentPermissions::for_new_agent("audited-agent");
access.set_permissions(perms);
access.can_use_tool("audited-agent", "network"); access.can_access_path("audited-agent", "/etc/shadow");
let log = access.audit_log();
assert_eq!(log.len(), 2);
assert!(!log[0].allowed);
assert!(!log[1].allowed);
assert!(log[0].reason.is_some());
assert!(log[1].reason.is_some());
let denied = access.denied_actions();
assert_eq!(denied.len(), 2);
}
#[tokio::test]
async fn test_access_manager_network_and_fork_permissions() {
let mut access = AccessManager::new();
let mut perms = AgentPermissions::for_new_agent("web-agent");
perms.enable_network();
access.set_permissions(perms);
assert!(access.can_access_network("web-agent"));
assert!(!access.can_fork("web-agent"));
let mut perms2 = AgentPermissions::for_new_agent("fork-agent");
perms2.enable_forking();
access.set_permissions(perms2);
assert!(!access.can_access_network("fork-agent"));
assert!(access.can_fork("fork-agent"));
let mut perms3 = AgentPermissions::for_new_agent("limited-agent");
perms3.max_execution_time_secs = 60;
perms3.max_memory_mb = 256;
access.set_permissions(perms3);
assert!(access.can_execute_for("limited-agent", 30));
assert!(access.can_execute_for("limited-agent", 60));
assert!(!access.can_execute_for("limited-agent", 61));
assert!(access.can_use_memory("limited-agent", 128));
assert!(!access.can_use_memory("limited-agent", 257));
}
#[tokio::test]
async fn test_access_manager_permission_lifecycle() {
let mut access = AccessManager::new();
access.set_permissions(AgentPermissions::for_new_agent("lifecycle-agent"));
assert!(access
.list_agents()
.contains(&"lifecycle-agent".to_string()));
let perms = access.get_or_create_permissions("lifecycle-agent");
perms.allow_tool("custom-tool");
assert!(access.can_use_tool("lifecycle-agent", "custom-tool"));
access.remove_permissions("lifecycle-agent");
assert!(!access
.list_agents()
.contains(&"lifecycle-agent".to_string()));
assert!(!access.can_use_tool("lifecycle-agent", "custom-tool"));
assert!(!access.can_access_path("lifecycle-agent", "/workspace/test.txt"));
assert!(!access.can_access_network("lifecycle-agent"));
}