1use devsper_core::{GraphMutation, NodeSpec};
2use devsper_graph::GraphHandle;
3use devsper_scheduler::Scheduler;
4use anyhow::Result;
5use std::sync::Arc;
6use tokio::sync::Semaphore;
7use tokio::time::{sleep, Duration};
8use tracing::{debug, error, info, warn};
9
10pub struct AgentOutput {
12 pub result: serde_json::Value,
13 pub mutations: Vec<GraphMutation>,
14}
15
16pub type AgentFn = Arc<
20 dyn Fn(NodeSpec) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<AgentOutput, String>> + Send>>
21 + Send
22 + Sync,
23>;
24
/// Tuning knobs for [`Executor`].
#[derive(Debug, Clone)]
pub struct ExecutorConfig {
    /// Upper bound on agent calls running at the same time.
    pub worker_count: usize,
    /// Milliseconds the executor sleeps between scheduler polls.
    pub poll_interval_ms: u64,
}

impl Default for ExecutorConfig {
    /// Four concurrent workers, polling every 50 ms.
    fn default() -> Self {
        const DEFAULT_WORKERS: usize = 4;
        const DEFAULT_POLL_MS: u64 = 50;

        ExecutorConfig {
            poll_interval_ms: DEFAULT_POLL_MS,
            worker_count: DEFAULT_WORKERS,
        }
    }
}
42
43pub struct Executor {
45 config: ExecutorConfig,
46 scheduler: Arc<Scheduler>,
47 handle: GraphHandle,
48 agent_fn: AgentFn,
49}
50
51impl Executor {
52 pub fn new(
53 config: ExecutorConfig,
54 scheduler: Arc<Scheduler>,
55 handle: GraphHandle,
56 agent_fn: AgentFn,
57 ) -> Self {
58 Self {
59 config,
60 scheduler,
61 handle,
62 agent_fn,
63 }
64 }
65
66 pub async fn run(self) -> Result<()> {
68 let semaphore = Arc::new(Semaphore::new(self.config.worker_count));
69 let scheduler = self.scheduler.clone();
70 let handle = self.handle.clone();
71 let agent_fn = self.agent_fn.clone();
72 let poll_ms = self.config.poll_interval_ms;
73
74 info!("Executor started (workers={})", self.config.worker_count);
75
76 let mut stall_count = 0u32;
77 const MAX_STALL: u32 = 100; loop {
80 let ready = scheduler.get_ready().await;
81
82 if ready.is_empty() {
83 let snap = scheduler.snapshot().await;
85 if let Some(snap) = snap {
86 let all_terminal = snap.nodes.values().all(|n| n.is_terminal());
87 if all_terminal && !snap.nodes.is_empty() {
88 info!("All tasks complete. Executor done.");
89 break;
90 }
91 stall_count += 1;
93 if stall_count > MAX_STALL {
94 warn!(
95 "Executor stalled: no ready tasks and not all terminal after {MAX_STALL} polls"
96 );
97 break;
98 }
99 }
100 sleep(Duration::from_millis(poll_ms)).await;
101 continue;
102 }
103
104 stall_count = 0;
105
106 for node_id in ready {
107 if !scheduler.claim(node_id.clone()).await {
109 continue;
110 }
111
112 let permit = semaphore.clone().acquire_owned().await?;
113 let sched = scheduler.clone();
114 let h = handle.clone();
115 let agent = agent_fn.clone();
116
117 let spec = {
119 let snap = sched.snapshot().await;
120 snap.and_then(|s| s.nodes.get(&node_id).map(|n| n.spec.clone()))
121 };
122
123 let Some(spec) = spec else {
124 warn!("Could not find spec for claimed node {node_id}");
125 sched.fail(node_id, "spec not found".to_string()).await;
126 drop(permit);
127 continue;
128 };
129
130 debug!(node = %node_id, prompt = %spec.prompt, "Dispatching task");
131
132 tokio::spawn(async move {
133 let _permit = permit; match agent(spec).await {
135 Ok(output) => {
136 for mutation in output.mutations {
138 if let Err(e) = h.mutate(mutation).await {
139 warn!("Mutation rejected: {e}");
140 }
141 }
142 sched.complete(node_id, output.result).await;
143 }
144 Err(e) => {
145 error!(error = %e, "Task failed");
146 sched.fail(node_id, e).await;
147 }
148 }
149 });
150 }
151
152 sleep(Duration::from_millis(poll_ms)).await;
153 }
154
155 Ok(())
156 }
157}
158
#[cfg(test)]
mod tests {
    use super::*;
    use devsper_core::{NodeSpec, RunId};
    use devsper_graph::{GraphActor, GraphConfig};
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Graph configuration shared by every test (a fresh run id per call).
    fn graph_config() -> GraphConfig {
        GraphConfig {
            run_id: RunId::new(),
            snapshot_interval: 100,
            max_depth: 10,
        }
    }

    /// Executor configuration with the given worker count and a 10 ms poll.
    fn executor_config(worker_count: usize) -> ExecutorConfig {
        ExecutorConfig {
            worker_count,
            poll_interval_ms: 10,
        }
    }

    /// Agent that succeeds on every node with a clone of `result` and no mutations.
    fn make_agent(result: serde_json::Value) -> AgentFn {
        Arc::new(move |_spec: NodeSpec| {
            let result = result.clone();
            Box::pin(async move {
                Ok(AgentOutput {
                    result,
                    mutations: vec![],
                })
            })
        })
    }

    /// Agent that fails every node with a fixed error message.
    fn make_failing_agent() -> AgentFn {
        Arc::new(|_spec: NodeSpec| {
            Box::pin(async move { Err("agent failed intentionally".to_string()) })
        })
    }

    #[tokio::test]
    async fn runs_single_task_to_completion() {
        // `_events` stays bound for the whole test so the receiver is not dropped early.
        let (mut actor, handle, _events) = GraphActor::new(graph_config());
        actor.add_initial_nodes(vec![NodeSpec::new("hello task")]);
        tokio::spawn(actor.run());

        let scheduler = Arc::new(Scheduler::new(handle.clone()));
        let agent = make_agent(serde_json::json!({"output": "done"}));

        Executor::new(executor_config(2), scheduler, handle, agent)
            .run()
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn runs_linear_chain() {
        let (mut actor, handle, _events) = GraphActor::new(graph_config());

        // A -> B -> C: each node depends on the one before it.
        let spec_a = NodeSpec::new("A");
        let spec_b = NodeSpec::new("B").depends_on(vec![spec_a.id.clone()]);
        let spec_c = NodeSpec::new("C").depends_on(vec![spec_b.id.clone()]);
        actor.add_initial_nodes(vec![spec_a, spec_b, spec_c]);
        tokio::spawn(actor.run());

        // Counts how many times the agent is invoked.
        let calls = Arc::new(AtomicUsize::new(0));
        let calls_in_agent = Arc::clone(&calls);

        let agent: AgentFn = Arc::new(move |_spec: NodeSpec| {
            let tally = Arc::clone(&calls_in_agent);
            Box::pin(async move {
                tally.fetch_add(1, Ordering::SeqCst);
                Ok(AgentOutput {
                    result: serde_json::json!(null),
                    mutations: vec![],
                })
            })
        });

        let scheduler = Arc::new(Scheduler::new(handle.clone()));
        Executor::new(executor_config(4), scheduler, handle, agent)
            .run()
            .await
            .unwrap();

        assert_eq!(calls.load(Ordering::SeqCst), 3, "All 3 tasks should run");
    }

    #[tokio::test]
    async fn failed_task_marks_node_failed() {
        let (mut actor, handle, _events) = GraphActor::new(graph_config());
        actor.add_initial_nodes(vec![NodeSpec::new("doomed")]);
        tokio::spawn(actor.run());

        let scheduler = Arc::new(Scheduler::new(handle.clone()));
        // Second handle kept for inspecting the graph after the executor returns.
        let observer = handle.clone();

        Executor::new(executor_config(1), scheduler, handle, make_failing_agent())
            .run()
            .await
            .unwrap();

        let snap = observer.snapshot().await.unwrap();
        assert!(
            snap.nodes.values().all(|n| n.is_terminal()),
            "All nodes should be terminal after failure"
        );
    }

    #[tokio::test]
    async fn mutation_from_agent_is_applied() {
        let (mut actor, handle, _events) = GraphActor::new(graph_config());
        actor.add_initial_nodes(vec![NodeSpec::new("planning task")]);
        tokio::spawn(actor.run());

        // The agent ignores its input and always requests this same subtask,
        // so it re-requests it when the injected node itself runs. The test
        // only checks that the node ends up in the graph.
        let injected_spec = NodeSpec::new("injected subtask");
        let injected_id = injected_spec.id.clone();

        let agent: AgentFn = Arc::new(move |_spec: NodeSpec| {
            let subtask = injected_spec.clone();
            Box::pin(async move {
                Ok(AgentOutput {
                    result: serde_json::json!({"planned": true}),
                    mutations: vec![GraphMutation::AddNode { spec: subtask }],
                })
            })
        });

        let scheduler = Arc::new(Scheduler::new(handle.clone()));
        let observer = handle.clone();

        Executor::new(executor_config(2), scheduler, handle, agent)
            .run()
            .await
            .unwrap();

        let snap = observer.snapshot().await.unwrap();
        assert!(
            snap.nodes.contains_key(&injected_id),
            "Injected node should be in the graph"
        );
    }
}