vtcode_core/orchestrator/
mod.rs1mod executor;
4mod scheduler;
5
6use anyhow::{Context, Result, anyhow};
7use serde_json::Value;
8use std::fmt;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::time::timeout;
12use tracing::{debug, warn};
13
14pub use executor::{ExecutorRegistry, LocalExecutor, WorkExecutor};
15pub use scheduler::Scheduler;
16
17#[derive(Debug, Clone, PartialEq, Eq, Hash)]
19pub enum ExecutionTarget {
20 Cloud,
21 Edge,
22 OnPrem,
23 Custom(String),
24}
25
26impl fmt::Display for ExecutionTarget {
27 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
28 match self {
29 ExecutionTarget::Cloud => write!(f, "cloud"),
30 ExecutionTarget::Edge => write!(f, "edge"),
31 ExecutionTarget::OnPrem => write!(f, "on-prem"),
32 ExecutionTarget::Custom(name) => write!(f, "{name}"),
33 }
34 }
35}
36
37#[derive(Debug, Clone)]
39pub struct ScheduledWork {
40 pub id: String,
41 pub target: ExecutionTarget,
42 pub payload: Value,
43 pub metadata: Value,
44}
45
46impl ScheduledWork {
47 pub fn new(
48 id: impl Into<String>,
49 target: ExecutionTarget,
50 payload: Value,
51 metadata: Value,
52 ) -> Self {
53 Self {
54 id: id.into(),
55 target,
56 payload,
57 metadata,
58 }
59 }
60}
61
62pub struct DistributedOrchestrator {
64 scheduler: Scheduler,
65 executors: ExecutorRegistry,
66}
67
68impl DistributedOrchestrator {
69 pub fn new() -> Self {
70 let mut executors = ExecutorRegistry::default();
71 executors.register("cloud", Arc::new(LocalExecutor));
72 executors.register("edge", Arc::new(LocalExecutor));
73 executors.register("on-prem", Arc::new(LocalExecutor));
74
75 Self {
76 scheduler: Scheduler::new(),
77 executors,
78 }
79 }
80
81 pub fn register_executor(
82 &mut self,
83 target: impl Into<String>,
84 executor: Arc<dyn WorkExecutor>,
85 ) {
86 self.executors.register(target, executor);
87 }
88
89 pub async fn submit(&self, work: ScheduledWork) -> Result<()> {
90 self.scheduler.enqueue(work).await;
91 Ok(())
92 }
93
94 pub async fn tick(&self) -> Result<Option<Value>> {
95 if let Some(work) = self.scheduler.next().await {
96 let target_key = work.target.to_string();
97 let executor = self
98 .executors
99 .get(&target_key)
100 .context("executor not registered for target")?;
101
102 let exec_deadline = Duration::from_secs(30);
104 match timeout(exec_deadline, executor.execute(work)).await {
105 Ok(result) => {
106 debug!(target = %target_key, "executor finished work item");
107 return Ok(Some(result?));
108 }
109 Err(_) => {
110 warn!(target = %target_key, "executor timed out after {:?}", exec_deadline);
111 return Err(anyhow!(
112 "executor for target {} timed out after {:?}",
113 target_key,
114 exec_deadline
115 ));
116 }
117 }
118 }
119
120 Ok(None)
121 }
122
123 pub async fn queue_depth(&self) -> usize {
124 self.scheduler.queue_depth().await
125 }
126}
127
128impl Default for DistributedOrchestrator {
129 fn default() -> Self {
130 Self::new()
131 }
132}
133
134#[cfg(test)]
135mod tests {
136 use super::*;
137
138 #[tokio::test]
139 async fn schedules_and_executes_work() {
140 let orchestrator = DistributedOrchestrator::new();
141 orchestrator
142 .submit(ScheduledWork::new(
143 "job-1",
144 ExecutionTarget::Cloud,
145 serde_json::json!({"task": "compile"}),
146 Value::Null,
147 ))
148 .await
149 .expect("submit should succeed");
150
151 let result = orchestrator.tick().await.expect("tick should run");
152 assert!(result.is_some());
153 }
154}