Skip to main content

terraphim_orchestrator/
dual_mode.rs

1//! Dual-mode orchestrator with full integration.
2//!
3//! Manages both time-driven and issue-driven agent execution modes
4//! with shared concurrency control and unified status.
5
6use crate::concurrency::ProjectCaps;
7use crate::{
8    AgentDefinition, AgentOrchestrator, CompoundReviewResult, ConcurrencyController,
9    DispatcherStats, FairnessPolicy, HandoffContext, ModeQuotas, OrchestratorConfig, ScheduleEvent,
10    TimeScheduler, WorkflowConfig,
11};
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Duration;
15use terraphim_tracker::{GiteaTracker, IssueTracker};
16use tokio::sync::{mpsc, watch, Mutex};
17use tracing::{error, info, warn};
18
19/// Shared state between time and issue modes.
20#[derive(Clone)]
21pub struct SharedState {
22    /// Concurrency controller for agent limits.
23    pub concurrency: ConcurrencyController,
24    /// Statistics from both modes.
25    pub stats: Arc<Mutex<DualModeStats>>,
26    /// Shutdown signal.
27    pub shutdown_tx: watch::Sender<bool>,
28}
29
30/// Statistics for dual-mode operation.
31#[derive(Debug, Default)]
32pub struct DualModeStats {
33    /// Time-driven statistics.
34    pub time_stats: Option<DispatcherStats>,
35    /// Issue-driven statistics.
36    pub issue_stats: Option<DispatcherStats>,
37    /// Total agents spawned.
38    pub total_agents_spawned: u64,
39    /// Active agents by mode.
40    pub active_by_mode: HashMap<String, usize>,
41}
42
43/// Agent identifier with mode.
44#[derive(Debug, Clone)]
45pub struct AgentId {
46    /// Agent name or issue identifier.
47    pub name: String,
48    /// Source mode.
49    pub mode: ExecutionMode,
50}
51
/// The two ways an agent can be triggered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionMode {
    /// Triggered by the cron-style time scheduler.
    TimeDriven,
    /// Triggered by an issue fetched from a tracker.
    IssueDriven,
}

/// Renders the short mode label (`"time"` / `"issue"`), which is also used as the key
/// in [`DualModeStats::active_by_mode`].
impl std::fmt::Display for ExecutionMode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Self::TimeDriven => "time",
            Self::IssueDriven => "issue",
        };
        f.write_str(label)
    }
}
69
70/// Task spawned by either mode.
71#[derive(Debug, Clone)]
72pub enum SpawnTask {
73    /// Time-driven agent task.
74    TimeTask { agent: Box<AgentDefinition> },
75    /// Issue-driven agent task.
76    IssueTask { issue_id: String, title: String },
77}
78
79/// Full dual-mode orchestrator.
80pub struct DualModeOrchestrator {
81    /// Configuration.
82    config: OrchestratorConfig,
83    /// Base orchestrator for time-driven mode.
84    base: AgentOrchestrator,
85    /// Shared state.
86    state: SharedState,
87    /// Time mode controller (if enabled).
88    time_mode: Option<TimeModeComponents>,
89    /// Issue mode controller (if enabled).
90    issue_mode: Option<IssueModeComponents>,
91    /// Task receiver from both modes.
92    task_rx: mpsc::Receiver<SpawnTask>,
93    /// Task sender for dispatching from modes.
94    task_tx: mpsc::Sender<SpawnTask>,
95    /// Active agents.
96    active_agents: Arc<Mutex<HashMap<String, AgentId>>>,
97}
98
99/// Components for time mode.
100struct TimeModeComponents {
101    scheduler: TimeScheduler,
102    shutdown_rx: watch::Receiver<bool>,
103}
104
105/// Per-project tracker state for issue mode.
106struct ProjectTracker {
107    tracker: Box<dyn IssueTracker>,
108    workflow: WorkflowConfig,
109}
110
111/// Components for issue mode.
112struct IssueModeComponents {
113    /// Trackers keyed by project id. Legacy single-project mode uses
114    /// [`crate::dispatcher::LEGACY_PROJECT_ID`] as the key.
115    running_trackers: HashMap<String, ProjectTracker>,
116    shutdown_rx: watch::Receiver<bool>,
117}
118
119impl DualModeOrchestrator {
120    /// Create a new dual-mode orchestrator.
121    pub fn new(config: OrchestratorConfig) -> Result<Self, crate::OrchestratorError> {
122        let base = AgentOrchestrator::new(config.clone())?;
123
124        // Collect per-project concurrency caps from project definitions.
125        let project_caps: HashMap<String, ProjectCaps> = config
126            .projects
127            .iter()
128            .filter_map(|p| {
129                p.max_concurrent_agents.map(|max| {
130                    (
131                        p.id.clone(),
132                        ProjectCaps {
133                            max_concurrent_agents: max,
134                            max_concurrent_mention_agents: p.max_concurrent_mention_agents,
135                        },
136                    )
137                })
138            })
139            .collect();
140
141        // Create concurrency controller
142        let concurrency = if let Some(ref workflow) = config.workflow {
143            ConcurrencyController::with_project_caps(
144                workflow.concurrency.global_max,
145                ModeQuotas {
146                    time_max: workflow
147                        .concurrency
148                        .global_max
149                        .saturating_sub(workflow.concurrency.issue_max),
150                    issue_max: workflow.concurrency.issue_max,
151                },
152                workflow
153                    .concurrency
154                    .fairness
155                    .parse()
156                    .unwrap_or(FairnessPolicy::RoundRobin),
157                project_caps,
158            )
159        } else {
160            ConcurrencyController::with_project_caps(
161                10,
162                ModeQuotas::default(),
163                FairnessPolicy::RoundRobin,
164                project_caps,
165            )
166        };
167
168        // Create shared state
169        let (shutdown_tx, _shutdown_rx) = watch::channel(false);
170        let state = SharedState {
171            concurrency,
172            stats: Arc::new(Mutex::new(DualModeStats::default())),
173            shutdown_tx,
174        };
175
176        // Create task channel for modes to send spawned tasks to orchestrator
177        let (task_tx, task_rx) = mpsc::channel(128);
178
179        // Setup time mode
180        let time_mode = {
181            let scheduler =
182                TimeScheduler::new(&config.agents, Some(&config.compound_review.schedule))?;
183            let shutdown_rx = state.shutdown_tx.subscribe();
184            Some(TimeModeComponents {
185                scheduler,
186                shutdown_rx,
187            })
188        };
189
190        // Setup issue mode if configured. Build one tracker per project when
191        // multi-project config is present; otherwise fall back to the top-level
192        // workflow with the legacy global project id.
193        let mut running_trackers: HashMap<String, ProjectTracker> = HashMap::new();
194
195        if !config.projects.is_empty() {
196            for project in &config.projects {
197                let Some(workflow) = project.workflow.as_ref() else {
198                    continue;
199                };
200                if !workflow.enabled {
201                    continue;
202                }
203                match create_tracker(workflow) {
204                    Ok(tracker) => {
205                        running_trackers.insert(
206                            project.id.clone(),
207                            ProjectTracker {
208                                tracker,
209                                workflow: workflow.clone(),
210                            },
211                        );
212                    }
213                    Err(e) => warn!(
214                        project = %project.id,
215                        "failed to create per-project issue tracker: {}",
216                        e
217                    ),
218                }
219            }
220        } else if let Some(ref workflow) = config.workflow {
221            if workflow.enabled {
222                match create_tracker(workflow) {
223                    Ok(tracker) => {
224                        running_trackers.insert(
225                            crate::dispatcher::LEGACY_PROJECT_ID.to_string(),
226                            ProjectTracker {
227                                tracker,
228                                workflow: workflow.clone(),
229                            },
230                        );
231                    }
232                    Err(e) => warn!("failed to create issue tracker: {}", e),
233                }
234            }
235        }
236
237        let issue_mode = if running_trackers.is_empty() {
238            None
239        } else {
240            let shutdown_rx = state.shutdown_tx.subscribe();
241            Some(IssueModeComponents {
242                running_trackers,
243                shutdown_rx,
244            })
245        };
246
247        Ok(Self {
248            config,
249            base,
250            state,
251            time_mode,
252            issue_mode,
253            task_rx,
254            task_tx,
255            active_agents: Arc::new(Mutex::new(HashMap::new())),
256        })
257    }
258
259    /// Access the orchestrator configuration.
260    pub fn config(&self) -> &OrchestratorConfig {
261        &self.config
262    }
263
264    /// Run the dual-mode orchestrator.
265    pub async fn run(&mut self) -> Result<(), crate::OrchestratorError> {
266        info!(
267            agents = self.config.agents.len(),
268            workflow_enabled = self.config.workflow.as_ref().is_some_and(|w| w.enabled),
269            "starting dual-mode orchestrator"
270        );
271
272        // Start time mode task
273        let mut time_handle = if let Some(time_components) = self.time_mode.take() {
274            let state = self.state.clone();
275            Some(tokio::spawn(run_time_mode(time_components, state)))
276        } else {
277            None
278        };
279
280        // Start issue mode task
281        let mut issue_handle = if let Some(issue_components) = self.issue_mode.take() {
282            let state = self.state.clone();
283            Some(tokio::spawn(run_issue_mode(issue_components, state)))
284        } else {
285            None
286        };
287
288        // Wait for shutdown signal, task completion, or spawned tasks
289        let ctrl_c = tokio::signal::ctrl_c();
290        tokio::pin!(ctrl_c);
291
292        // Pin the optional handles for select
293        let mut time_done = false;
294        let mut issue_done = false;
295
296        loop {
297            tokio::select! {
298                // Receive spawned tasks from modes and track them
299                Some(task) = self.task_rx.recv() => {
300                    self.track_spawned_task(task).await;
301                }
302                // Wait for time mode completion
303                result = async {
304                    match &mut time_handle {
305                        Some(h) => h.await,
306                        None => std::future::pending().await,
307                    }
308                }, if !time_done => {
309                    time_done = true;
310                    match result {
311                        Ok(()) => info!("time mode completed"),
312                        Err(e) => error!("time mode panicked: {}", e),
313                    }
314                    if issue_done { break; }
315                }
316                // Wait for issue mode completion
317                result = async {
318                    match &mut issue_handle {
319                        Some(h) => h.await,
320                        None => std::future::pending().await,
321                    }
322                }, if !issue_done => {
323                    issue_done = true;
324                    match result {
325                        Ok(()) => info!("issue mode completed"),
326                        Err(e) => error!("issue mode panicked: {}", e),
327                    }
328                    if time_done { break; }
329                }
330                // Base orchestrator reconciliation loop
331                result = self.base.run() => {
332                    match result {
333                        Ok(()) => info!("base orchestrator completed"),
334                        Err(e) => error!("base orchestrator error: {}", e),
335                    }
336                    break;
337                }
338                _ = &mut ctrl_c => {
339                    info!("shutdown signal received");
340                    let _ = self.state.shutdown_tx.send(true);
341                    break;
342                }
343            }
344        }
345
346        // Graceful shutdown
347        info!("shutting down dual-mode orchestrator");
348        self.shutdown().await;
349
350        Ok(())
351    }
352
353    /// Track a spawned task from either mode.
354    async fn track_spawned_task(&self, task: SpawnTask) {
355        let mut stats = self.state.stats.lock().await;
356        stats.total_agents_spawned += 1;
357        match &task {
358            SpawnTask::TimeTask { agent } => {
359                info!(agent_name = %agent.name, "received time-driven spawn task");
360                let mut agents = self.active_agents.lock().await;
361                agents.insert(
362                    agent.name.clone(),
363                    AgentId {
364                        name: agent.name.clone(),
365                        mode: ExecutionMode::TimeDriven,
366                    },
367                );
368                *stats.active_by_mode.entry("time".into()).or_insert(0) += 1;
369            }
370            SpawnTask::IssueTask { issue_id, title } => {
371                info!(issue_id = %issue_id, title = %title, "received issue-driven spawn task");
372                let mut agents = self.active_agents.lock().await;
373                agents.insert(
374                    issue_id.clone(),
375                    AgentId {
376                        name: issue_id.clone(),
377                        mode: ExecutionMode::IssueDriven,
378                    },
379                );
380                *stats.active_by_mode.entry("issue".into()).or_insert(0) += 1;
381            }
382        }
383    }
384
385    /// Get a clone of the task sender for external dispatch.
386    pub fn task_sender(&self) -> mpsc::Sender<SpawnTask> {
387        self.task_tx.clone()
388    }
389
390    /// Request shutdown.
391    pub fn request_shutdown(&self) {
392        let _ = self.state.shutdown_tx.send(true);
393    }
394
395    /// Shutdown gracefully.
396    async fn shutdown(&mut self) {
397        info!("initiating graceful shutdown");
398
399        // Stop accepting new tasks
400        self.request_shutdown();
401
402        // Wait for active agents to complete
403        let timeout = Duration::from_secs(30);
404        let start = std::time::Instant::now();
405
406        loop {
407            let active_count = {
408                let agents = self.active_agents.lock().await;
409                agents.len()
410            };
411
412            if active_count == 0 {
413                info!("all agents completed");
414                break;
415            }
416
417            if start.elapsed() > timeout {
418                warn!(
419                    "shutdown timeout reached with {} agents still active",
420                    active_count
421                );
422                break;
423            }
424
425            tokio::time::sleep(Duration::from_millis(100)).await;
426        }
427
428        // Shutdown base orchestrator
429        self.base.shutdown();
430
431        info!("shutdown complete");
432    }
433
434    /// Get current statistics.
435    pub async fn stats(&self) -> DualModeStats {
436        let stats = self.state.stats.lock().await;
437        stats.clone()
438    }
439
440    /// Get active agent count.
441    pub async fn active_count(&self) -> usize {
442        let agents = self.active_agents.lock().await;
443        agents.len()
444    }
445
446    /// Trigger compound review.
447    pub async fn trigger_compound_review(
448        &mut self,
449        git_ref: &str,
450        base_ref: &str,
451    ) -> Result<CompoundReviewResult, crate::OrchestratorError> {
452        self.base.trigger_compound_review(git_ref, base_ref).await
453    }
454
455    /// Handoff task between agents.
456    pub async fn handoff(
457        &mut self,
458        from_agent: &str,
459        to_agent: &str,
460        ctx: HandoffContext,
461    ) -> Result<(), crate::OrchestratorError> {
462        self.base.handoff(from_agent, to_agent, ctx).await
463    }
464}
465
466/// Run time mode in background.
467async fn run_time_mode(components: TimeModeComponents, state: SharedState) {
468    info!("starting time mode task");
469
470    let TimeModeComponents {
471        mut scheduler,
472        mut shutdown_rx,
473    } = components;
474
475    // Get immediate agents (Safety layer)
476    let immediate = scheduler.immediate_agents();
477    for agent in immediate {
478        info!(agent_name = %agent.name, "spawning immediate Safety agent");
479        // Safety agents spawn without concurrency limit
480    }
481
482    loop {
483        tokio::select! {
484            event = scheduler.next_event() => {
485                match event {
486                    ScheduleEvent::Spawn(agent) => {
487                        let project = agent
488                            .project
489                            .clone()
490                            .unwrap_or_else(|| crate::dispatcher::LEGACY_PROJECT_ID.to_string());
491                        // Try to acquire time-driven slot for this project
492                        match state.concurrency.acquire_time_driven(&project).await {
493                            Some(permit) => {
494                                info!(agent_name = %agent.name, "spawning time-driven agent");
495                                // Spawn agent here
496                                drop(permit);
497                            }
498                            None => {
499                                warn!(agent_name = %agent.name, "no slot available for time-driven agent");
500                            }
501                        }
502                    }
503                    ScheduleEvent::Stop { agent_name } => {
504                        info!(agent_name = %agent_name, "stopping agent");
505                    }
506                    ScheduleEvent::CompoundReview => {
507                        info!("compound review triggered");
508                    }
509                    ScheduleEvent::Flow(flow) => {
510                        info!(flow_name = %flow.name, "flow triggered");
511                    }
512                }
513            }
514            _ = shutdown_rx.changed() => {
515                if *shutdown_rx.borrow() {
516                    info!("time mode shutting down");
517                    break;
518                }
519            }
520        }
521    }
522}
523
524/// Run issue mode in background. Polls every configured tracker and
525/// dispatches against its owning project id.
526async fn run_issue_mode(components: IssueModeComponents, state: SharedState) {
527    info!(
528        projects = components.running_trackers.len(),
529        "starting issue mode task"
530    );
531
532    let IssueModeComponents {
533        running_trackers,
534        mut shutdown_rx,
535    } = components;
536
537    // Use the shortest configured poll interval across projects so every
538    // tracker gets polled at least as often as its own setting requires. A
539    // per-project cadence could be added later by moving each tracker into
540    // its own task; a single shared interval keeps the loop simple for now.
541    let poll_interval = running_trackers
542        .values()
543        .map(|p| p.workflow.poll_interval_secs)
544        .min()
545        .map(Duration::from_secs)
546        .unwrap_or_else(|| Duration::from_secs(60));
547
548    loop {
549        tokio::select! {
550            _ = tokio::time::sleep(poll_interval) => {
551                for (project_id, project_tracker) in running_trackers.iter() {
552                    let ProjectTracker { tracker, workflow } = project_tracker;
553                    match tracker.fetch_candidate_issues().await {
554                        Ok(issues) => {
555                            info!(
556                                project = %project_id,
557                                count = issues.len(),
558                                "fetched candidate issues"
559                            );
560
561                            for issue in issues {
562                                // Skip blocked issues
563                                if !issue.all_blockers_terminal(&workflow.tracker.states.terminal) {
564                                    continue;
565                                }
566
567                                // Try to acquire issue-driven slot for this project.
568                                match state
569                                    .concurrency
570                                    .acquire_issue_driven(project_id)
571                                    .await
572                                {
573                                    Some(permit) => {
574                                        info!(
575                                            project = %project_id,
576                                            issue_id = %issue.id,
577                                            title = %issue.title,
578                                            "dispatching issue-driven agent"
579                                        );
580                                        // Spawn agent here
581                                        drop(permit);
582                                    }
583                                    None => {
584                                        warn!(
585                                            project = %project_id,
586                                            "no slot available for issue-driven agent"
587                                        );
588                                        break; // Stop trying for this project until slots free up
589                                    }
590                                }
591                            }
592                        }
593                        Err(e) => {
594                            error!(project = %project_id, "failed to fetch issues: {}", e);
595                        }
596                    }
597                }
598            }
599            _ = shutdown_rx.changed() => {
600                if *shutdown_rx.borrow() {
601                    info!("issue mode shutting down");
602                    break;
603                }
604            }
605        }
606    }
607}
608
609/// Create tracker from workflow config.
610fn create_tracker(workflow: &WorkflowConfig) -> Result<Box<dyn IssueTracker>, String> {
611    match workflow.tracker.kind.as_str() {
612        "gitea" => {
613            use terraphim_tracker::gitea::GiteaConfig;
614            let tracker = GiteaTracker::new(GiteaConfig {
615                base_url: workflow.tracker.endpoint.clone(),
616                token: workflow.tracker.api_key.clone(),
617                owner: workflow.tracker.owner.clone(),
618                repo: workflow.tracker.repo.clone(),
619                active_states: workflow.tracker.states.active.clone(),
620                terminal_states: workflow.tracker.states.terminal.clone(),
621                use_robot_api: workflow.tracker.use_robot_api,
622                robot_path: std::path::PathBuf::from("/home/alex/go/bin/gitea-robot"),
623                claim_strategy: terraphim_tracker::gitea::ClaimStrategy::PreferRobot,
624            })
625            .map_err(|e| format!("failed to create Gitea tracker: {}", e))?;
626
627            Ok(Box::new(tracker))
628        }
629        "linear" => {
630            use terraphim_tracker::{LinearConfig, LinearTracker};
631            let project_slug = workflow
632                .tracker
633                .project_slug
634                .clone()
635                .ok_or("project_slug required for linear tracker")?;
636            let tracker = LinearTracker::new(LinearConfig {
637                endpoint: workflow.tracker.endpoint.clone(),
638                api_key: workflow.tracker.api_key.clone(),
639                project_slug,
640                active_states: workflow.tracker.states.active.clone(),
641                terminal_states: workflow.tracker.states.terminal.clone(),
642            })
643            .map_err(|e| format!("failed to create Linear tracker: {}", e))?;
644
645            Ok(Box::new(tracker))
646        }
647        _ => Err(format!(
648            "unsupported tracker kind: {}",
649            workflow.tracker.kind
650        )),
651    }
652}
653
654impl Clone for DualModeStats {
655    fn clone(&self) -> Self {
656        Self {
657            time_stats: self.time_stats.clone(),
658            issue_stats: self.issue_stats.clone(),
659            total_agents_spawned: self.total_agents_spawned,
660            active_by_mode: self.active_by_mode.clone(),
661        }
662    }
663}
664
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_execution_mode_display() {
        assert_eq!(ExecutionMode::TimeDriven.to_string(), "time");
        assert_eq!(ExecutionMode::IssueDriven.to_string(), "issue");
    }

    #[test]
    fn test_execution_mode_equality() {
        assert_eq!(ExecutionMode::TimeDriven, ExecutionMode::TimeDriven);
        assert_ne!(ExecutionMode::TimeDriven, ExecutionMode::IssueDriven);
    }

    #[test]
    fn test_dual_mode_stats_default() {
        let stats = DualModeStats::default();
        assert_eq!(stats.total_agents_spawned, 0);
        assert!(stats.time_stats.is_none());
        assert!(stats.issue_stats.is_none());
        assert!(stats.active_by_mode.is_empty());
    }

    /// The hand-written `Clone` must produce an independent copy.
    #[test]
    fn test_dual_mode_stats_clone_is_independent() {
        let mut original = DualModeStats {
            total_agents_spawned: 3,
            ..Default::default()
        };
        original.active_by_mode.insert("time".to_string(), 2);

        let mut copy = original.clone();
        copy.total_agents_spawned += 1;
        copy.active_by_mode.insert("issue".to_string(), 1);

        assert_eq!(original.total_agents_spawned, 3);
        assert_eq!(original.active_by_mode.len(), 1);
        assert_eq!(copy.total_agents_spawned, 4);
        assert_eq!(copy.active_by_mode.get("time"), Some(&2));
        assert_eq!(copy.active_by_mode.get("issue"), Some(&1));
    }
}