1use crate::concurrency::ProjectCaps;
7use crate::{
8 AgentDefinition, AgentOrchestrator, CompoundReviewResult, ConcurrencyController,
9 DispatcherStats, FairnessPolicy, HandoffContext, ModeQuotas, OrchestratorConfig, ScheduleEvent,
10 TimeScheduler, WorkflowConfig,
11};
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Duration;
15use terraphim_tracker::{GiteaTracker, IssueTracker};
16use tokio::sync::{mpsc, watch, Mutex};
17use tracing::{error, info, warn};
18
/// State shared by the orchestrator and the background mode tasks.
///
/// `DualModeOrchestrator::run` clones this value once for each mode task it
/// spawns.
#[derive(Clone)]
pub struct SharedState {
    /// Controller from which the mode tasks acquire time-driven and
    /// issue-driven slots.
    pub concurrency: ConcurrencyController,
    /// Aggregate statistics, guarded by an async mutex.
    pub stats: Arc<Mutex<DualModeStats>>,
    /// Sending half of the shutdown watch channel; sending `true` asks the
    /// mode tasks to stop.
    pub shutdown_tx: watch::Sender<bool>,
}
29
/// Counters describing orchestrator activity across both execution modes.
#[derive(Debug, Default)]
pub struct DualModeStats {
    /// Dispatcher statistics for time mode.
    /// NOTE(review): never assigned in the visible code — confirm who populates it.
    pub time_stats: Option<DispatcherStats>,
    /// Dispatcher statistics for issue mode.
    /// NOTE(review): never assigned in the visible code — confirm who populates it.
    pub issue_stats: Option<DispatcherStats>,
    /// Incremented once for every `SpawnTask` the orchestrator receives.
    pub total_agents_spawned: u64,
    /// Per-mode agent counters, keyed by the mode label (`"time"` / `"issue"`).
    pub active_by_mode: HashMap<String, usize>,
}
42
/// Identity recorded for an agent tracked by the orchestrator.
#[derive(Debug, Clone)]
pub struct AgentId {
    /// Agent name for time-driven tasks; the issue id for issue-driven tasks.
    pub name: String,
    /// Mode that produced the agent.
    pub mode: ExecutionMode,
}
51
/// Which of the two scheduling paths an agent belongs to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionMode {
    /// The agent was scheduled by the time-based scheduler.
    TimeDriven,
    /// The agent was dispatched for a tracker issue.
    IssueDriven,
}

impl std::fmt::Display for ExecutionMode {
    /// Writes the short lowercase label of the mode: `"time"` or `"issue"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::TimeDriven => "time",
            Self::IssueDriven => "issue",
        };
        f.write_str(label)
    }
}
69
/// Message sent over the orchestrator's task channel to report a spawn.
#[derive(Debug, Clone)]
pub enum SpawnTask {
    /// A time-driven spawn; carries the (boxed) agent definition.
    TimeTask { agent: Box<AgentDefinition> },
    /// An issue-driven spawn; carries the issue id and its title.
    IssueTask { issue_id: String, title: String },
}
78
/// Orchestrator that runs a time-driven mode and an issue-driven mode next to
/// a wrapped [`AgentOrchestrator`].
pub struct DualModeOrchestrator {
    /// Configuration the orchestrator was built from.
    config: OrchestratorConfig,
    /// Wrapped base orchestrator; `run`, `shutdown`, `handoff` and compound
    /// reviews are delegated to it.
    base: AgentOrchestrator,
    /// State cloned into the spawned mode tasks.
    state: SharedState,
    /// Time-mode components; taken (left as `None`) when `run` spawns the task.
    time_mode: Option<TimeModeComponents>,
    /// Issue-mode components; `None` when no tracker could be created, and
    /// taken when `run` spawns the task.
    issue_mode: Option<IssueModeComponents>,
    /// Receiving half of the spawn-task channel, drained by `run`.
    task_rx: mpsc::Receiver<SpawnTask>,
    /// Sending half of the spawn-task channel, handed out by `task_sender`.
    task_tx: mpsc::Sender<SpawnTask>,
    /// Agents recorded from received spawn tasks, keyed by agent name or issue id.
    active_agents: Arc<Mutex<HashMap<String, AgentId>>>,
}
98
/// Everything the time-mode task owns once it has been spawned.
struct TimeModeComponents {
    /// Scheduler that yields the events handled by the time-mode loop.
    scheduler: TimeScheduler,
    /// Receiver of the shutdown flag.
    shutdown_rx: watch::Receiver<bool>,
}
104
/// An issue tracker paired with the workflow configuration it was created from.
struct ProjectTracker {
    /// Tracker client used to fetch candidate issues.
    tracker: Box<dyn IssueTracker>,
    /// Workflow settings (poll interval, terminal states, ...) for this tracker.
    workflow: WorkflowConfig,
}
110
/// Everything the issue-mode task owns once it has been spawned.
struct IssueModeComponents {
    /// Trackers to poll, keyed by project id (or the legacy project id when
    /// only a top-level workflow is configured).
    running_trackers: HashMap<String, ProjectTracker>,
    /// Receiver of the shutdown flag.
    shutdown_rx: watch::Receiver<bool>,
}
118
119impl DualModeOrchestrator {
120 pub fn new(config: OrchestratorConfig) -> Result<Self, crate::OrchestratorError> {
122 let base = AgentOrchestrator::new(config.clone())?;
123
124 let project_caps: HashMap<String, ProjectCaps> = config
126 .projects
127 .iter()
128 .filter_map(|p| {
129 p.max_concurrent_agents.map(|max| {
130 (
131 p.id.clone(),
132 ProjectCaps {
133 max_concurrent_agents: max,
134 max_concurrent_mention_agents: p.max_concurrent_mention_agents,
135 },
136 )
137 })
138 })
139 .collect();
140
141 let concurrency = if let Some(ref workflow) = config.workflow {
143 ConcurrencyController::with_project_caps(
144 workflow.concurrency.global_max,
145 ModeQuotas {
146 time_max: workflow
147 .concurrency
148 .global_max
149 .saturating_sub(workflow.concurrency.issue_max),
150 issue_max: workflow.concurrency.issue_max,
151 },
152 workflow
153 .concurrency
154 .fairness
155 .parse()
156 .unwrap_or(FairnessPolicy::RoundRobin),
157 project_caps,
158 )
159 } else {
160 ConcurrencyController::with_project_caps(
161 10,
162 ModeQuotas::default(),
163 FairnessPolicy::RoundRobin,
164 project_caps,
165 )
166 };
167
168 let (shutdown_tx, _shutdown_rx) = watch::channel(false);
170 let state = SharedState {
171 concurrency,
172 stats: Arc::new(Mutex::new(DualModeStats::default())),
173 shutdown_tx,
174 };
175
176 let (task_tx, task_rx) = mpsc::channel(128);
178
179 let time_mode = {
181 let scheduler =
182 TimeScheduler::new(&config.agents, Some(&config.compound_review.schedule))?;
183 let shutdown_rx = state.shutdown_tx.subscribe();
184 Some(TimeModeComponents {
185 scheduler,
186 shutdown_rx,
187 })
188 };
189
190 let mut running_trackers: HashMap<String, ProjectTracker> = HashMap::new();
194
195 if !config.projects.is_empty() {
196 for project in &config.projects {
197 let Some(workflow) = project.workflow.as_ref() else {
198 continue;
199 };
200 if !workflow.enabled {
201 continue;
202 }
203 match create_tracker(workflow) {
204 Ok(tracker) => {
205 running_trackers.insert(
206 project.id.clone(),
207 ProjectTracker {
208 tracker,
209 workflow: workflow.clone(),
210 },
211 );
212 }
213 Err(e) => warn!(
214 project = %project.id,
215 "failed to create per-project issue tracker: {}",
216 e
217 ),
218 }
219 }
220 } else if let Some(ref workflow) = config.workflow {
221 if workflow.enabled {
222 match create_tracker(workflow) {
223 Ok(tracker) => {
224 running_trackers.insert(
225 crate::dispatcher::LEGACY_PROJECT_ID.to_string(),
226 ProjectTracker {
227 tracker,
228 workflow: workflow.clone(),
229 },
230 );
231 }
232 Err(e) => warn!("failed to create issue tracker: {}", e),
233 }
234 }
235 }
236
237 let issue_mode = if running_trackers.is_empty() {
238 None
239 } else {
240 let shutdown_rx = state.shutdown_tx.subscribe();
241 Some(IssueModeComponents {
242 running_trackers,
243 shutdown_rx,
244 })
245 };
246
247 Ok(Self {
248 config,
249 base,
250 state,
251 time_mode,
252 issue_mode,
253 task_rx,
254 task_tx,
255 active_agents: Arc::new(Mutex::new(HashMap::new())),
256 })
257 }
258
/// Returns the configuration this orchestrator was built from.
pub fn config(&self) -> &OrchestratorConfig {
    &self.config
}
263
264 pub async fn run(&mut self) -> Result<(), crate::OrchestratorError> {
266 info!(
267 agents = self.config.agents.len(),
268 workflow_enabled = self.config.workflow.as_ref().is_some_and(|w| w.enabled),
269 "starting dual-mode orchestrator"
270 );
271
272 let mut time_handle = if let Some(time_components) = self.time_mode.take() {
274 let state = self.state.clone();
275 Some(tokio::spawn(run_time_mode(time_components, state)))
276 } else {
277 None
278 };
279
280 let mut issue_handle = if let Some(issue_components) = self.issue_mode.take() {
282 let state = self.state.clone();
283 Some(tokio::spawn(run_issue_mode(issue_components, state)))
284 } else {
285 None
286 };
287
288 let ctrl_c = tokio::signal::ctrl_c();
290 tokio::pin!(ctrl_c);
291
292 let mut time_done = false;
294 let mut issue_done = false;
295
296 loop {
297 tokio::select! {
298 Some(task) = self.task_rx.recv() => {
300 self.track_spawned_task(task).await;
301 }
302 result = async {
304 match &mut time_handle {
305 Some(h) => h.await,
306 None => std::future::pending().await,
307 }
308 }, if !time_done => {
309 time_done = true;
310 match result {
311 Ok(()) => info!("time mode completed"),
312 Err(e) => error!("time mode panicked: {}", e),
313 }
314 if issue_done { break; }
315 }
316 result = async {
318 match &mut issue_handle {
319 Some(h) => h.await,
320 None => std::future::pending().await,
321 }
322 }, if !issue_done => {
323 issue_done = true;
324 match result {
325 Ok(()) => info!("issue mode completed"),
326 Err(e) => error!("issue mode panicked: {}", e),
327 }
328 if time_done { break; }
329 }
330 result = self.base.run() => {
332 match result {
333 Ok(()) => info!("base orchestrator completed"),
334 Err(e) => error!("base orchestrator error: {}", e),
335 }
336 break;
337 }
338 _ = &mut ctrl_c => {
339 info!("shutdown signal received");
340 let _ = self.state.shutdown_tx.send(true);
341 break;
342 }
343 }
344 }
345
346 info!("shutting down dual-mode orchestrator");
348 self.shutdown().await;
349
350 Ok(())
351 }
352
353 async fn track_spawned_task(&self, task: SpawnTask) {
355 let mut stats = self.state.stats.lock().await;
356 stats.total_agents_spawned += 1;
357 match &task {
358 SpawnTask::TimeTask { agent } => {
359 info!(agent_name = %agent.name, "received time-driven spawn task");
360 let mut agents = self.active_agents.lock().await;
361 agents.insert(
362 agent.name.clone(),
363 AgentId {
364 name: agent.name.clone(),
365 mode: ExecutionMode::TimeDriven,
366 },
367 );
368 *stats.active_by_mode.entry("time".into()).or_insert(0) += 1;
369 }
370 SpawnTask::IssueTask { issue_id, title } => {
371 info!(issue_id = %issue_id, title = %title, "received issue-driven spawn task");
372 let mut agents = self.active_agents.lock().await;
373 agents.insert(
374 issue_id.clone(),
375 AgentId {
376 name: issue_id.clone(),
377 mode: ExecutionMode::IssueDriven,
378 },
379 );
380 *stats.active_by_mode.entry("issue".into()).or_insert(0) += 1;
381 }
382 }
383 }
384
/// Returns a clone of the sending half of the spawn-task channel.
///
/// Tasks sent through it are received and recorded by `run`.
pub fn task_sender(&self) -> mpsc::Sender<SpawnTask> {
    self.task_tx.clone()
}
389
/// Sends `true` on the shutdown watch channel so the mode tasks stop.
///
/// A send error is deliberately ignored.
pub fn request_shutdown(&self) {
    let _ = self.state.shutdown_tx.send(true);
}
394
395 async fn shutdown(&mut self) {
397 info!("initiating graceful shutdown");
398
399 self.request_shutdown();
401
402 let timeout = Duration::from_secs(30);
404 let start = std::time::Instant::now();
405
406 loop {
407 let active_count = {
408 let agents = self.active_agents.lock().await;
409 agents.len()
410 };
411
412 if active_count == 0 {
413 info!("all agents completed");
414 break;
415 }
416
417 if start.elapsed() > timeout {
418 warn!(
419 "shutdown timeout reached with {} agents still active",
420 active_count
421 );
422 break;
423 }
424
425 tokio::time::sleep(Duration::from_millis(100)).await;
426 }
427
428 self.base.shutdown();
430
431 info!("shutdown complete");
432 }
433
434 pub async fn stats(&self) -> DualModeStats {
436 let stats = self.state.stats.lock().await;
437 stats.clone()
438 }
439
440 pub async fn active_count(&self) -> usize {
442 let agents = self.active_agents.lock().await;
443 agents.len()
444 }
445
/// Triggers a compound review on the base orchestrator.
///
/// `git_ref` and `base_ref` are forwarded unchanged, and the base
/// orchestrator's result is returned as is.
pub async fn trigger_compound_review(
    &mut self,
    git_ref: &str,
    base_ref: &str,
) -> Result<CompoundReviewResult, crate::OrchestratorError> {
    self.base.trigger_compound_review(git_ref, base_ref).await
}
454
/// Hands work from `from_agent` over to `to_agent` via the base orchestrator.
///
/// All arguments are forwarded unchanged, and the base orchestrator's result
/// is returned as is.
pub async fn handoff(
    &mut self,
    from_agent: &str,
    to_agent: &str,
    ctx: HandoffContext,
) -> Result<(), crate::OrchestratorError> {
    self.base.handoff(from_agent, to_agent, ctx).await
}
464}
465
466async fn run_time_mode(components: TimeModeComponents, state: SharedState) {
468 info!("starting time mode task");
469
470 let TimeModeComponents {
471 mut scheduler,
472 mut shutdown_rx,
473 } = components;
474
475 let immediate = scheduler.immediate_agents();
477 for agent in immediate {
478 info!(agent_name = %agent.name, "spawning immediate Safety agent");
479 }
481
482 loop {
483 tokio::select! {
484 event = scheduler.next_event() => {
485 match event {
486 ScheduleEvent::Spawn(agent) => {
487 let project = agent
488 .project
489 .clone()
490 .unwrap_or_else(|| crate::dispatcher::LEGACY_PROJECT_ID.to_string());
491 match state.concurrency.acquire_time_driven(&project).await {
493 Some(permit) => {
494 info!(agent_name = %agent.name, "spawning time-driven agent");
495 drop(permit);
497 }
498 None => {
499 warn!(agent_name = %agent.name, "no slot available for time-driven agent");
500 }
501 }
502 }
503 ScheduleEvent::Stop { agent_name } => {
504 info!(agent_name = %agent_name, "stopping agent");
505 }
506 ScheduleEvent::CompoundReview => {
507 info!("compound review triggered");
508 }
509 ScheduleEvent::Flow(flow) => {
510 info!(flow_name = %flow.name, "flow triggered");
511 }
512 }
513 }
514 _ = shutdown_rx.changed() => {
515 if *shutdown_rx.borrow() {
516 info!("time mode shutting down");
517 break;
518 }
519 }
520 }
521 }
522}
523
524async fn run_issue_mode(components: IssueModeComponents, state: SharedState) {
527 info!(
528 projects = components.running_trackers.len(),
529 "starting issue mode task"
530 );
531
532 let IssueModeComponents {
533 running_trackers,
534 mut shutdown_rx,
535 } = components;
536
537 let poll_interval = running_trackers
542 .values()
543 .map(|p| p.workflow.poll_interval_secs)
544 .min()
545 .map(Duration::from_secs)
546 .unwrap_or_else(|| Duration::from_secs(60));
547
548 loop {
549 tokio::select! {
550 _ = tokio::time::sleep(poll_interval) => {
551 for (project_id, project_tracker) in running_trackers.iter() {
552 let ProjectTracker { tracker, workflow } = project_tracker;
553 match tracker.fetch_candidate_issues().await {
554 Ok(issues) => {
555 info!(
556 project = %project_id,
557 count = issues.len(),
558 "fetched candidate issues"
559 );
560
561 for issue in issues {
562 if !issue.all_blockers_terminal(&workflow.tracker.states.terminal) {
564 continue;
565 }
566
567 match state
569 .concurrency
570 .acquire_issue_driven(project_id)
571 .await
572 {
573 Some(permit) => {
574 info!(
575 project = %project_id,
576 issue_id = %issue.id,
577 title = %issue.title,
578 "dispatching issue-driven agent"
579 );
580 drop(permit);
582 }
583 None => {
584 warn!(
585 project = %project_id,
586 "no slot available for issue-driven agent"
587 );
588 break; }
590 }
591 }
592 }
593 Err(e) => {
594 error!(project = %project_id, "failed to fetch issues: {}", e);
595 }
596 }
597 }
598 }
599 _ = shutdown_rx.changed() => {
600 if *shutdown_rx.borrow() {
601 info!("issue mode shutting down");
602 break;
603 }
604 }
605 }
606 }
607}
608
609fn create_tracker(workflow: &WorkflowConfig) -> Result<Box<dyn IssueTracker>, String> {
611 match workflow.tracker.kind.as_str() {
612 "gitea" => {
613 use terraphim_tracker::gitea::GiteaConfig;
614 let tracker = GiteaTracker::new(GiteaConfig {
615 base_url: workflow.tracker.endpoint.clone(),
616 token: workflow.tracker.api_key.clone(),
617 owner: workflow.tracker.owner.clone(),
618 repo: workflow.tracker.repo.clone(),
619 active_states: workflow.tracker.states.active.clone(),
620 terminal_states: workflow.tracker.states.terminal.clone(),
621 use_robot_api: workflow.tracker.use_robot_api,
622 robot_path: std::path::PathBuf::from("/home/alex/go/bin/gitea-robot"),
623 claim_strategy: terraphim_tracker::gitea::ClaimStrategy::PreferRobot,
624 })
625 .map_err(|e| format!("failed to create Gitea tracker: {}", e))?;
626
627 Ok(Box::new(tracker))
628 }
629 "linear" => {
630 use terraphim_tracker::{LinearConfig, LinearTracker};
631 let project_slug = workflow
632 .tracker
633 .project_slug
634 .clone()
635 .ok_or("project_slug required for linear tracker")?;
636 let tracker = LinearTracker::new(LinearConfig {
637 endpoint: workflow.tracker.endpoint.clone(),
638 api_key: workflow.tracker.api_key.clone(),
639 project_slug,
640 active_states: workflow.tracker.states.active.clone(),
641 terminal_states: workflow.tracker.states.terminal.clone(),
642 })
643 .map_err(|e| format!("failed to create Linear tracker: {}", e))?;
644
645 Ok(Box::new(tracker))
646 }
647 _ => Err(format!(
648 "unsupported tracker kind: {}",
649 workflow.tracker.kind
650 )),
651 }
652}
653
654impl Clone for DualModeStats {
655 fn clone(&self) -> Self {
656 Self {
657 time_stats: self.time_stats.clone(),
658 issue_stats: self.issue_stats.clone(),
659 total_agents_spawned: self.total_agents_spawned,
660 active_by_mode: self.active_by_mode.clone(),
661 }
662 }
663}
664
#[cfg(test)]
mod tests {
    use super::*;

    /// `Display` yields the short label of each execution mode.
    #[test]
    fn test_execution_mode_display() {
        let expected = [
            (ExecutionMode::TimeDriven, "time"),
            (ExecutionMode::IssueDriven, "issue"),
        ];
        for (mode, label) in expected {
            assert_eq!(mode.to_string(), label);
        }
    }

    /// Default statistics start with a zero spawn count and no dispatcher stats.
    #[test]
    fn test_dual_mode_stats_default() {
        let DualModeStats {
            time_stats,
            issue_stats,
            total_agents_spawned,
            ..
        } = DualModeStats::default();
        assert_eq!(total_agents_spawned, 0);
        assert!(time_stats.is_none());
        assert!(issue_stats.is_none());
    }
}