Skip to main content

vtcode_core/orchestrator/
mod.rs

1//! Distributed orchestration primitives for cloud/edge/on-prem scheduling.
2
3mod executor;
4mod scheduler;
5
6use anyhow::{Context, Result, anyhow};
7use serde_json::Value;
8use std::fmt;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::time::timeout;
12use tracing::{debug, warn};
13
14pub use executor::{ExecutorRegistry, LocalExecutor, WorkExecutor};
15pub use scheduler::Scheduler;
16
17/// Execution target supported by the orchestrator.
18#[derive(Debug, Clone, PartialEq, Eq, Hash)]
19pub enum ExecutionTarget {
20    Cloud,
21    Edge,
22    OnPrem,
23    Custom(String),
24}
25
26impl fmt::Display for ExecutionTarget {
27    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
28        match self {
29            ExecutionTarget::Cloud => write!(f, "cloud"),
30            ExecutionTarget::Edge => write!(f, "edge"),
31            ExecutionTarget::OnPrem => write!(f, "on-prem"),
32            ExecutionTarget::Custom(name) => write!(f, "{name}"),
33        }
34    }
35}
36
37/// Workload scheduled for execution.
38#[derive(Debug, Clone)]
39pub struct ScheduledWork {
40    pub id: String,
41    pub target: ExecutionTarget,
42    pub payload: Value,
43    pub metadata: Value,
44}
45
46impl ScheduledWork {
47    pub fn new(
48        id: impl Into<String>,
49        target: ExecutionTarget,
50        payload: Value,
51        metadata: Value,
52    ) -> Self {
53        Self {
54            id: id.into(),
55            target,
56            payload,
57            metadata,
58        }
59    }
60}
61
62/// Main orchestrator that coordinates scheduling and execution.
63pub struct DistributedOrchestrator {
64    scheduler: Scheduler,
65    executors: ExecutorRegistry,
66}
67
68impl DistributedOrchestrator {
69    pub fn new() -> Self {
70        let mut executors = ExecutorRegistry::default();
71        executors.register("cloud", Arc::new(LocalExecutor));
72        executors.register("edge", Arc::new(LocalExecutor));
73        executors.register("on-prem", Arc::new(LocalExecutor));
74
75        Self {
76            scheduler: Scheduler::new(),
77            executors,
78        }
79    }
80
81    pub fn register_executor(
82        &mut self,
83        target: impl Into<String>,
84        executor: Arc<dyn WorkExecutor>,
85    ) {
86        self.executors.register(target, executor);
87    }
88
89    pub async fn submit(&self, work: ScheduledWork) -> Result<()> {
90        self.scheduler.enqueue(work).await;
91        Ok(())
92    }
93
94    pub async fn tick(&self) -> Result<Option<Value>> {
95        if let Some(work) = self.scheduler.next().await {
96            let target_key = work.target.to_string();
97            let executor = self
98                .executors
99                .get(&target_key)
100                .context("executor not registered for target")?;
101
102            // Prevent long-running executors from blocking the queue forever.
103            let exec_deadline = Duration::from_secs(30);
104            match timeout(exec_deadline, executor.execute(work)).await {
105                Ok(result) => {
106                    debug!(target = %target_key, "executor finished work item");
107                    return Ok(Some(result?));
108                }
109                Err(_) => {
110                    warn!(target = %target_key, "executor timed out after {:?}", exec_deadline);
111                    return Err(anyhow!(
112                        "executor for target {} timed out after {:?}",
113                        target_key,
114                        exec_deadline
115                    ));
116                }
117            }
118        }
119
120        Ok(None)
121    }
122
123    pub async fn queue_depth(&self) -> usize {
124        self.scheduler.queue_depth().await
125    }
126}
127
128impl Default for DistributedOrchestrator {
129    fn default() -> Self {
130        Self::new()
131    }
132}
133
134#[cfg(test)]
135mod tests {
136    use super::*;
137
138    #[tokio::test]
139    async fn schedules_and_executes_work() {
140        let orchestrator = DistributedOrchestrator::new();
141        orchestrator
142            .submit(ScheduledWork::new(
143                "job-1",
144                ExecutionTarget::Cloud,
145                serde_json::json!({"task": "compile"}),
146                Value::Null,
147            ))
148            .await
149            .expect("submit should succeed");
150
151        let result = orchestrator.tick().await.expect("tick should run");
152        assert!(result.is_some());
153    }
154}