use crate::{
AgentDefinition, ConcurrencyController, DispatchTask, Dispatcher, ScheduleEvent, TimeScheduler,
};
use tracing::{error, info, warn};
pub struct TimeMode {
scheduler: TimeScheduler,
dispatcher: Dispatcher,
concurrency: ConcurrencyController,
running: std::collections::HashSet<String>,
}
impl TimeMode {
pub fn new(scheduler: TimeScheduler, concurrency: ConcurrencyController) -> Self {
Self {
scheduler,
dispatcher: Dispatcher::new(),
concurrency,
running: std::collections::HashSet::new(),
}
}
pub fn immediate_agents(&self) -> Vec<AgentDefinition> {
self.scheduler.immediate_agents()
}
pub async fn start_safety_agent(&mut self, agent: AgentDefinition) -> Result<(), String> {
let task = DispatchTask::TimeDriven {
name: agent.name.clone(),
task: agent.task.clone(),
layer: agent.layer,
};
self.dispatcher.enqueue(task);
let name = agent.name.clone();
self.running.insert(name.clone());
info!(
agent_name = %name,
layer = ?agent.layer,
"started Safety agent"
);
Ok(())
}
pub async fn run(mut self, mut shutdown: tokio::sync::watch::Receiver<bool>) {
info!("starting time-driven mode");
for agent in self.immediate_agents() {
if let Err(e) = self.start_safety_agent(agent).await {
error!("failed to start Safety agent: {}", e);
}
}
loop {
tokio::select! {
event = self.scheduler.next_event() => {
match event {
ScheduleEvent::Spawn(agent) => {
if let Err(e) = self.handle_spawn(*agent).await {
error!("failed to spawn agent: {}", e);
}
}
ScheduleEvent::Stop { agent_name } => {
self.handle_stop(agent_name).await;
}
ScheduleEvent::CompoundReview => {
info!("compound review scheduled");
}
ScheduleEvent::Flow(flow) => {
info!(flow_name = %flow.name, "flow scheduled");
}
}
}
_ = shutdown.changed() => {
if *shutdown.borrow() {
info!("shutting down time-driven mode");
break;
}
}
}
}
}
async fn handle_spawn(&mut self, agent: AgentDefinition) -> Result<(), String> {
match self.concurrency.acquire_time_driven().await {
Some(permit) => {
let task = DispatchTask::TimeDriven {
name: agent.name.clone(),
task: agent.task.clone(),
layer: agent.layer,
};
self.dispatcher.enqueue(task);
self.running.insert(agent.name.clone());
info!(
agent_name = %agent.name,
layer = ?agent.layer,
"dispatched time-driven agent"
);
drop(permit);
Ok(())
}
None => {
warn!(
agent_name = %agent.name,
"no concurrency slot available for agent"
);
Err("concurrency limit reached".into())
}
}
}
async fn handle_stop(&mut self, agent_name: String) {
info!(agent_name = %agent_name, "stopping agent");
self.running.remove(&agent_name);
}
pub fn dispatcher_stats(&self) -> &crate::dispatcher::DispatcherStats {
self.dispatcher.stats()
}
pub fn event_sender(&self) -> tokio::sync::mpsc::Sender<ScheduleEvent> {
self.scheduler.event_sender()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{AgentLayer, ModeQuotas};
fn test_agent(name: &str, layer: AgentLayer) -> AgentDefinition {
AgentDefinition {
name: name.into(),
layer,
cli_tool: "echo".into(),
task: "test".into(),
model: None,
schedule: None,
capabilities: vec![],
max_memory_bytes: None,
budget_monthly_cents: None,
provider: None,
persona: None,
terraphim_role: None,
skill_chain: vec![],
sfia_skills: vec![],
fallback_provider: None,
fallback_model: None,
grace_period_secs: None,
max_cpu_seconds: None,
pre_check: None,
gitea_issue: None,
}
}
#[tokio::test]
async fn test_safety_agent_bypasses_concurrency() {
let scheduler =
TimeScheduler::new(&[test_agent("safety", AgentLayer::Safety)], None).unwrap();
let concurrency = ConcurrencyController::new(
10,
ModeQuotas {
time_max: 0, issue_max: 10,
},
crate::FairnessPolicy::RoundRobin,
);
let mut mode = TimeMode::new(scheduler, concurrency);
let agent = test_agent("safety", AgentLayer::Safety);
let result = mode.start_safety_agent(agent).await;
assert!(result.is_ok());
}
}