use crate::{
AgentDefinition, AgentOrchestrator, CompoundReviewResult, ConcurrencyController,
DispatcherStats, FairnessPolicy, HandoffContext, ModeQuotas, OrchestratorConfig, ScheduleEvent,
TimeScheduler, WorkflowConfig,
};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use terraphim_tracker::{GiteaTracker, IssueTracker};
use tokio::sync::{mpsc, watch, Mutex};
use tracing::{error, info, warn};
/// Handles shared between the orchestrator and its spawned mode tasks.
///
/// A clone is handed to each mode task (`run_time_mode` / `run_issue_mode`)
/// so they can acquire execution slots, update stats, and observe shutdown.
#[derive(Clone)]
pub struct SharedState {
    /// Grants per-mode execution slots (time-driven vs issue-driven).
    pub concurrency: ConcurrencyController,
    /// Aggregated spawn/activity counters, behind an async mutex.
    pub stats: Arc<Mutex<DualModeStats>>,
    /// Broadcasts the shutdown flag; mode tasks `subscribe()` and exit once
    /// the value becomes `true`.
    pub shutdown_tx: watch::Sender<bool>,
}
/// Aggregated counters for both execution modes.
///
/// `Clone` is implemented manually further down in this file.
#[derive(Debug, Default)]
pub struct DualModeStats {
    /// Latest stats reported by the time-driven dispatcher, if any.
    pub time_stats: Option<DispatcherStats>,
    /// Latest stats reported by the issue-driven dispatcher, if any.
    pub issue_stats: Option<DispatcherStats>,
    /// Total number of spawn tasks received over the orchestrator's channel.
    pub total_agents_spawned: u64,
    /// Per-mode active counts, keyed by the mode's `Display` tag
    /// ("time" / "issue").
    pub active_by_mode: HashMap<String, usize>,
}
/// Identifies a tracked agent together with the mode that spawned it.
#[derive(Debug, Clone)]
pub struct AgentId {
    /// Agent name (time-driven) or issue id (issue-driven); also used as the
    /// key in the orchestrator's active-agent map.
    pub name: String,
    /// Which execution mode produced this agent.
    pub mode: ExecutionMode,
}
/// The two ways an agent run can be initiated.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionMode {
    /// Spawned on a schedule produced by the `TimeScheduler`.
    TimeDriven,
    /// Spawned in response to an issue fetched from the tracker.
    IssueDriven,
}
impl std::fmt::Display for ExecutionMode {
    /// Renders the mode as its short lowercase tag ("time" / "issue") —
    /// the same strings used as keys in `DualModeStats::active_by_mode`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let tag = match self {
            ExecutionMode::TimeDriven => "time",
            ExecutionMode::IssueDriven => "issue",
        };
        f.write_str(tag)
    }
}
/// Message sent over the orchestrator's spawn-task channel to report that an
/// agent should be / has been spawned.
#[derive(Debug, Clone)]
pub enum SpawnTask {
    /// A schedule-driven spawn; the definition is boxed so the variant stays
    /// pointer-sized.
    TimeTask { agent: Box<AgentDefinition> },
    /// An issue-driven spawn, identified by tracker issue id and title.
    IssueTask { issue_id: String, title: String },
}
/// Orchestrator that runs time-scheduled agents and issue-driven agents side
/// by side, on top of the base `AgentOrchestrator`.
pub struct DualModeOrchestrator {
    /// Full configuration this orchestrator was built from.
    config: OrchestratorConfig,
    /// Underlying orchestrator; compound reviews and handoffs delegate to it.
    base: AgentOrchestrator,
    /// Shared handles cloned into each spawned mode task.
    state: SharedState,
    /// Time-mode components; consumed (taken) by `run`.
    time_mode: Option<TimeModeComponents>,
    /// Issue-mode components; `None` when the workflow is disabled or the
    /// tracker could not be created. Consumed by `run`.
    issue_mode: Option<IssueModeComponents>,
    /// Receiving end of the spawn-task channel, drained by `run`.
    task_rx: mpsc::Receiver<SpawnTask>,
    /// Kept so `task_sender()` can hand out additional senders (also keeps
    /// the channel open while the orchestrator is alive).
    task_tx: mpsc::Sender<SpawnTask>,
    /// Agents currently considered active, keyed by name / issue id.
    active_agents: Arc<Mutex<HashMap<String, AgentId>>>,
}
/// Everything the time-driven task needs; moved into `run_time_mode`.
struct TimeModeComponents {
    /// Produces schedule events for the configured agents.
    scheduler: TimeScheduler,
    /// Receives the shutdown flag from `SharedState::shutdown_tx`.
    shutdown_rx: watch::Receiver<bool>,
}
/// Everything the issue-driven task needs; moved into `run_issue_mode`.
struct IssueModeComponents {
    /// Tracker backend ("gitea" or "linear") used to poll candidate issues.
    tracker: Box<dyn IssueTracker>,
    /// Workflow settings (poll interval, tracker states, etc.).
    workflow: WorkflowConfig,
    /// Receives the shutdown flag from `SharedState::shutdown_tx`.
    shutdown_rx: watch::Receiver<bool>,
}
impl DualModeOrchestrator {
pub fn new(config: OrchestratorConfig) -> Result<Self, crate::OrchestratorError> {
let base = AgentOrchestrator::new(config.clone())?;
let concurrency = if let Some(ref workflow) = config.workflow {
ConcurrencyController::new(
workflow.concurrency.global_max,
ModeQuotas {
time_max: workflow
.concurrency
.global_max
.saturating_sub(workflow.concurrency.issue_max),
issue_max: workflow.concurrency.issue_max,
},
workflow
.concurrency
.fairness
.parse()
.unwrap_or(FairnessPolicy::RoundRobin),
)
} else {
ConcurrencyController::new(10, ModeQuotas::default(), FairnessPolicy::RoundRobin)
};
let (shutdown_tx, _shutdown_rx) = watch::channel(false);
let state = SharedState {
concurrency,
stats: Arc::new(Mutex::new(DualModeStats::default())),
shutdown_tx,
};
let (task_tx, task_rx) = mpsc::channel(128);
let time_mode = {
let scheduler =
TimeScheduler::new(&config.agents, Some(&config.compound_review.schedule))?;
let shutdown_rx = state.shutdown_tx.subscribe();
Some(TimeModeComponents {
scheduler,
shutdown_rx,
})
};
let issue_mode = if let Some(ref workflow) = config.workflow {
if workflow.enabled {
match create_tracker(workflow) {
Ok(tracker) => {
let shutdown_rx = state.shutdown_tx.subscribe();
Some(IssueModeComponents {
tracker,
workflow: workflow.clone(),
shutdown_rx,
})
}
Err(e) => {
warn!("failed to create issue tracker: {}", e);
None
}
}
} else {
None
}
} else {
None
};
Ok(Self {
config,
base,
state,
time_mode,
issue_mode,
task_rx,
task_tx,
active_agents: Arc::new(Mutex::new(HashMap::new())),
})
}
pub fn config(&self) -> &OrchestratorConfig {
&self.config
}
pub async fn run(&mut self) -> Result<(), crate::OrchestratorError> {
info!(
agents = self.config.agents.len(),
workflow_enabled = self.config.workflow.as_ref().is_some_and(|w| w.enabled),
"starting dual-mode orchestrator"
);
let mut time_handle = if let Some(time_components) = self.time_mode.take() {
let state = self.state.clone();
Some(tokio::spawn(run_time_mode(time_components, state)))
} else {
None
};
let mut issue_handle = if let Some(issue_components) = self.issue_mode.take() {
let state = self.state.clone();
Some(tokio::spawn(run_issue_mode(issue_components, state)))
} else {
None
};
let ctrl_c = tokio::signal::ctrl_c();
tokio::pin!(ctrl_c);
let mut time_done = false;
let mut issue_done = false;
loop {
tokio::select! {
Some(task) = self.task_rx.recv() => {
self.track_spawned_task(task).await;
}
result = async {
match &mut time_handle {
Some(h) => h.await,
None => std::future::pending().await,
}
}, if !time_done => {
time_done = true;
match result {
Ok(()) => info!("time mode completed"),
Err(e) => error!("time mode panicked: {}", e),
}
if issue_done { break; }
}
result = async {
match &mut issue_handle {
Some(h) => h.await,
None => std::future::pending().await,
}
}, if !issue_done => {
issue_done = true;
match result {
Ok(()) => info!("issue mode completed"),
Err(e) => error!("issue mode panicked: {}", e),
}
if time_done { break; }
}
result = self.base.run() => {
match result {
Ok(()) => info!("base orchestrator completed"),
Err(e) => error!("base orchestrator error: {}", e),
}
break;
}
_ = &mut ctrl_c => {
info!("shutdown signal received");
let _ = self.state.shutdown_tx.send(true);
break;
}
}
}
info!("shutting down dual-mode orchestrator");
self.shutdown().await;
Ok(())
}
async fn track_spawned_task(&self, task: SpawnTask) {
let mut stats = self.state.stats.lock().await;
stats.total_agents_spawned += 1;
match &task {
SpawnTask::TimeTask { agent } => {
info!(agent_name = %agent.name, "received time-driven spawn task");
let mut agents = self.active_agents.lock().await;
agents.insert(
agent.name.clone(),
AgentId {
name: agent.name.clone(),
mode: ExecutionMode::TimeDriven,
},
);
*stats.active_by_mode.entry("time".into()).or_insert(0) += 1;
}
SpawnTask::IssueTask { issue_id, title } => {
info!(issue_id = %issue_id, title = %title, "received issue-driven spawn task");
let mut agents = self.active_agents.lock().await;
agents.insert(
issue_id.clone(),
AgentId {
name: issue_id.clone(),
mode: ExecutionMode::IssueDriven,
},
);
*stats.active_by_mode.entry("issue".into()).or_insert(0) += 1;
}
}
}
pub fn task_sender(&self) -> mpsc::Sender<SpawnTask> {
self.task_tx.clone()
}
pub fn request_shutdown(&self) {
let _ = self.state.shutdown_tx.send(true);
}
async fn shutdown(&mut self) {
info!("initiating graceful shutdown");
self.request_shutdown();
let timeout = Duration::from_secs(30);
let start = std::time::Instant::now();
loop {
let active_count = {
let agents = self.active_agents.lock().await;
agents.len()
};
if active_count == 0 {
info!("all agents completed");
break;
}
if start.elapsed() > timeout {
warn!(
"shutdown timeout reached with {} agents still active",
active_count
);
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
self.base.shutdown();
info!("shutdown complete");
}
pub async fn stats(&self) -> DualModeStats {
let stats = self.state.stats.lock().await;
stats.clone()
}
pub async fn active_count(&self) -> usize {
let agents = self.active_agents.lock().await;
agents.len()
}
pub async fn trigger_compound_review(
&mut self,
git_ref: &str,
base_ref: &str,
) -> Result<CompoundReviewResult, crate::OrchestratorError> {
self.base.trigger_compound_review(git_ref, base_ref).await
}
pub async fn handoff(
&mut self,
from_agent: &str,
to_agent: &str,
ctx: HandoffContext,
) -> Result<(), crate::OrchestratorError> {
self.base.handoff(from_agent, to_agent, ctx).await
}
}
/// Task body for time-driven mode: reacts to scheduler events until the
/// shutdown flag flips to `true`.
async fn run_time_mode(components: TimeModeComponents, state: SharedState) {
    info!("starting time mode task");
    let TimeModeComponents {
        mut scheduler,
        mut shutdown_rx,
    } = components;
    // NOTE(review): "immediate" agents are only logged here — no spawn is
    // actually performed; presumably placeholder until real spawning lands.
    let immediate = scheduler.immediate_agents();
    for agent in immediate {
        info!(agent_name = %agent.name, "spawning immediate Safety agent");
    }
    loop {
        tokio::select! {
            event = scheduler.next_event() => {
                match event {
                    ScheduleEvent::Spawn(agent) => {
                        // Try to claim a time-driven slot; skip (with a
                        // warning) when the quota is exhausted.
                        match state.concurrency.acquire_time_driven().await {
                            Some(permit) => {
                                info!(agent_name = %agent.name, "spawning time-driven agent");
                                // NOTE(review): the permit is dropped right
                                // away, releasing the slot before any agent
                                // work happens — confirm this is placeholder
                                // behavior until spawning is implemented.
                                drop(permit);
                            }
                            None => {
                                warn!(agent_name = %agent.name, "no slot available for time-driven agent");
                            }
                        }
                    }
                    ScheduleEvent::Stop { agent_name } => {
                        info!(agent_name = %agent_name, "stopping agent");
                    }
                    ScheduleEvent::CompoundReview => {
                        info!("compound review triggered");
                    }
                    ScheduleEvent::Flow(flow) => {
                        info!(flow_name = %flow.name, "flow triggered");
                    }
                }
            }
            // Exit only when the flag actually becomes `true`; a change
            // notification carrying `false` keeps the loop running.
            _ = shutdown_rx.changed() => {
                if *shutdown_rx.borrow() {
                    info!("time mode shutting down");
                    break;
                }
            }
        }
    }
}
/// Task body for issue-driven mode: polls the tracker on a fixed interval
/// and dispatches eligible issues until the shutdown flag flips to `true`.
async fn run_issue_mode(components: IssueModeComponents, state: SharedState) {
    info!("starting issue mode task");
    let IssueModeComponents {
        tracker,
        workflow,
        mut shutdown_rx,
    } = components;
    let poll_interval = Duration::from_secs(workflow.poll_interval_secs);
    loop {
        tokio::select! {
            // A fresh sleep is created each loop pass, so the poll timer
            // resets whenever any other arm fires first.
            _ = tokio::time::sleep(poll_interval) => {
                match tracker.fetch_candidate_issues().await {
                    Ok(issues) => {
                        info!(count = issues.len(), "fetched candidate issues");
                        for issue in issues {
                            // Only dispatch issues whose blockers have all
                            // reached a terminal tracker state.
                            if !issue.all_blockers_terminal(&workflow.tracker.states.terminal) {
                                continue;
                            }
                            match state.concurrency.acquire_issue_driven().await {
                                Some(permit) => {
                                    info!(
                                        issue_id = %issue.id,
                                        title = %issue.title,
                                        "dispatching issue-driven agent"
                                    );
                                    // NOTE(review): the permit is dropped
                                    // immediately, releasing the slot before
                                    // any agent work happens — confirm this is
                                    // placeholder behavior.
                                    drop(permit);
                                }
                                None => {
                                    warn!("no slot available for issue-driven agent");
                                    // Exits the `for` loop only; polling
                                    // resumes on the next interval.
                                    break; }
                            }
                        }
                    }
                    Err(e) => {
                        // Log and retry on the next interval.
                        error!("failed to fetch issues: {}", e);
                    }
                }
            }
            // Exit only when the flag actually becomes `true`.
            _ = shutdown_rx.changed() => {
                if *shutdown_rx.borrow() {
                    info!("issue mode shutting down");
                    break;
                }
            }
        }
    }
}
/// Instantiates the issue tracker named by `workflow.tracker.kind`
/// (`"gitea"` or `"linear"`).
///
/// # Errors
/// Returns a human-readable `String` when the kind is unknown, a required
/// field is missing, or the backend constructor fails — the caller logs it
/// and degrades to time-only mode.
fn create_tracker(workflow: &WorkflowConfig) -> Result<Box<dyn IssueTracker>, String> {
    match workflow.tracker.kind.as_str() {
        "gitea" => {
            use terraphim_tracker::gitea::GiteaConfig;
            // FIX: the robot binary path was hard-coded to one developer's
            // home directory. Allow overriding via the GITEA_ROBOT_PATH
            // environment variable; the old path stays as the default so
            // existing deployments are unaffected.
            let robot_path = std::env::var("GITEA_ROBOT_PATH")
                .map(std::path::PathBuf::from)
                .unwrap_or_else(|_| std::path::PathBuf::from("/home/alex/go/bin/gitea-robot"));
            let tracker = GiteaTracker::new(GiteaConfig {
                base_url: workflow.tracker.endpoint.clone(),
                token: workflow.tracker.api_key.clone(),
                owner: workflow.tracker.owner.clone(),
                repo: workflow.tracker.repo.clone(),
                active_states: workflow.tracker.states.active.clone(),
                terminal_states: workflow.tracker.states.terminal.clone(),
                use_robot_api: workflow.tracker.use_robot_api,
                robot_path,
                claim_strategy: terraphim_tracker::gitea::ClaimStrategy::PreferRobot,
            })
            .map_err(|e| format!("failed to create Gitea tracker: {}", e))?;
            Ok(Box::new(tracker))
        }
        "linear" => {
            use terraphim_tracker::{LinearConfig, LinearTracker};
            // Linear needs a project slug; fail with a clear message when the
            // config omits it.
            let project_slug = workflow
                .tracker
                .project_slug
                .clone()
                .ok_or("project_slug required for linear tracker")?;
            let tracker = LinearTracker::new(LinearConfig {
                endpoint: workflow.tracker.endpoint.clone(),
                api_key: workflow.tracker.api_key.clone(),
                project_slug,
                active_states: workflow.tracker.states.active.clone(),
                terminal_states: workflow.tracker.states.terminal.clone(),
            })
            .map_err(|e| format!("failed to create Linear tracker: {}", e))?;
            Ok(Box::new(tracker))
        }
        _ => Err(format!(
            "unsupported tracker kind: {}",
            workflow.tracker.kind
        )),
    }
}
impl Clone for DualModeStats {
    /// Deep-copies the stats snapshot, field by field — equivalent to what
    /// `#[derive(Clone)]` would generate.
    fn clone(&self) -> Self {
        let Self {
            time_stats,
            issue_stats,
            total_agents_spawned,
            active_by_mode,
        } = self;
        Self {
            time_stats: time_stats.clone(),
            issue_stats: issue_stats.clone(),
            total_agents_spawned: *total_agents_spawned,
            active_by_mode: active_by_mode.clone(),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The `Display` output doubles as the `active_by_mode` key, so pin the
    /// exact strings for both variants.
    #[test]
    fn test_execution_mode_display() {
        let cases = [
            (ExecutionMode::TimeDriven, "time"),
            (ExecutionMode::IssueDriven, "issue"),
        ];
        for (mode, expected) in cases {
            assert_eq!(mode.to_string(), expected);
        }
    }

    /// A freshly-defaulted stats struct carries no dispatcher stats and a
    /// zero spawn count.
    #[test]
    fn test_dual_mode_stats_default() {
        let DualModeStats {
            time_stats,
            issue_stats,
            total_agents_spawned,
            ..
        } = DualModeStats::default();
        assert_eq!(total_agents_spawned, 0);
        assert!(time_stats.is_none());
        assert!(issue_stats.is_none());
    }
}