// devsper_executor/executor.rs

1use devsper_core::{GraphMutation, NodeSpec};
2use devsper_graph::GraphHandle;
3use devsper_scheduler::Scheduler;
4use anyhow::Result;
5use std::sync::Arc;
6use tokio::sync::Semaphore;
7use tokio::time::{sleep, Duration};
8use tracing::{debug, error, info, warn};
9
10/// Output from an agent function: result value and optional graph mutations.
11pub struct AgentOutput {
12    pub result: serde_json::Value,
13    pub mutations: Vec<GraphMutation>,
14}
15
16/// The agent function signature.
17/// Takes the NodeSpec (task description + metadata).
18/// Returns AgentOutput or an error string.
19pub type AgentFn = Arc<
20    dyn Fn(NodeSpec) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<AgentOutput, String>> + Send>>
21        + Send
22        + Sync,
23>;
24
/// Tuning knobs for the executor run loop.
#[derive(Clone, Debug)]
pub struct ExecutorConfig {
    /// Upper bound on agent tasks running at the same time.
    pub worker_count: usize,
    /// Delay between scheduler polls, in milliseconds.
    pub poll_interval_ms: u64,
}
33
34impl Default for ExecutorConfig {
35    fn default() -> Self {
36        Self {
37            worker_count: 4,
38            poll_interval_ms: 50,
39        }
40    }
41}
42
43/// The executor drives the main run loop.
44pub struct Executor {
45    config: ExecutorConfig,
46    scheduler: Arc<Scheduler>,
47    handle: GraphHandle,
48    agent_fn: AgentFn,
49}
50
51impl Executor {
52    pub fn new(
53        config: ExecutorConfig,
54        scheduler: Arc<Scheduler>,
55        handle: GraphHandle,
56        agent_fn: AgentFn,
57    ) -> Self {
58        Self {
59            config,
60            scheduler,
61            handle,
62            agent_fn,
63        }
64    }
65
66    /// Run the executor until all tasks are complete or no progress can be made.
67    pub async fn run(self) -> Result<()> {
68        let semaphore = Arc::new(Semaphore::new(self.config.worker_count));
69        let scheduler = self.scheduler.clone();
70        let handle = self.handle.clone();
71        let agent_fn = self.agent_fn.clone();
72        let poll_ms = self.config.poll_interval_ms;
73
74        info!("Executor started (workers={})", self.config.worker_count);
75
76        let mut stall_count = 0u32;
77        const MAX_STALL: u32 = 100; // ~5s with 50ms poll
78
79        loop {
80            let ready = scheduler.get_ready().await;
81
82            if ready.is_empty() {
83                // Check if run is complete (no pending or running tasks)
84                let snap = scheduler.snapshot().await;
85                if let Some(snap) = snap {
86                    let all_terminal = snap.nodes.values().all(|n| n.is_terminal());
87                    if all_terminal && !snap.nodes.is_empty() {
88                        info!("All tasks complete. Executor done.");
89                        break;
90                    }
91                    // Some tasks are still running (not terminal), wait for them
92                    stall_count += 1;
93                    if stall_count > MAX_STALL {
94                        warn!(
95                            "Executor stalled: no ready tasks and not all terminal after {MAX_STALL} polls"
96                        );
97                        break;
98                    }
99                }
100                sleep(Duration::from_millis(poll_ms)).await;
101                continue;
102            }
103
104            stall_count = 0;
105
106            for node_id in ready {
107                // Try to claim — only one worker wins the race
108                if !scheduler.claim(node_id.clone()).await {
109                    continue;
110                }
111
112                let permit = semaphore.clone().acquire_owned().await?;
113                let sched = scheduler.clone();
114                let h = handle.clone();
115                let agent = agent_fn.clone();
116
117                // Get the node spec for this task
118                let spec = {
119                    let snap = sched.snapshot().await;
120                    snap.and_then(|s| s.nodes.get(&node_id).map(|n| n.spec.clone()))
121                };
122
123                let Some(spec) = spec else {
124                    warn!("Could not find spec for claimed node {node_id}");
125                    sched.fail(node_id, "spec not found".to_string()).await;
126                    drop(permit);
127                    continue;
128                };
129
130                debug!(node = %node_id, prompt = %spec.prompt, "Dispatching task");
131
132                tokio::spawn(async move {
133                    let _permit = permit; // released when task completes
134                    match agent(spec).await {
135                        Ok(output) => {
136                            // Apply any mutations the agent requested
137                            for mutation in output.mutations {
138                                if let Err(e) = h.mutate(mutation).await {
139                                    warn!("Mutation rejected: {e}");
140                                }
141                            }
142                            sched.complete(node_id, output.result).await;
143                        }
144                        Err(e) => {
145                            error!(error = %e, "Task failed");
146                            sched.fail(node_id, e).await;
147                        }
148                    }
149                });
150            }
151
152            sleep(Duration::from_millis(poll_ms)).await;
153        }
154
155        Ok(())
156    }
157}
158
#[cfg(test)]
mod tests {
    use super::*;
    use devsper_core::{NodeSpec, RunId};
    use devsper_graph::{GraphActor, GraphConfig};
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Graph settings shared by every test; each call gets a fresh run id.
    fn graph_config() -> GraphConfig {
        GraphConfig {
            run_id: RunId::new(),
            snapshot_interval: 100,
            max_depth: 10,
        }
    }

    /// Executor settings with a 10 ms poll so the tests finish quickly.
    fn fast_executor_config(worker_count: usize) -> ExecutorConfig {
        ExecutorConfig {
            worker_count,
            poll_interval_ms: 10,
        }
    }

    /// Agent that always succeeds, returning a copy of `result` and no mutations.
    fn make_agent(result: serde_json::Value) -> AgentFn {
        Arc::new(move |_spec: NodeSpec| {
            let value = result.clone();
            Box::pin(async move {
                Ok(AgentOutput {
                    result: value,
                    mutations: vec![],
                })
            })
        })
    }

    /// Agent that always fails with a fixed message.
    fn make_failing_agent() -> AgentFn {
        Arc::new(|_spec: NodeSpec| {
            Box::pin(async move { Err("agent failed intentionally".to_string()) })
        })
    }

    #[tokio::test]
    async fn runs_single_task_to_completion() {
        let (mut actor, handle, _events) = GraphActor::new(graph_config());
        actor.add_initial_nodes(vec![NodeSpec::new("hello task")]);
        tokio::spawn(actor.run());

        let executor = Executor::new(
            fast_executor_config(2),
            Arc::new(Scheduler::new(handle.clone())),
            handle,
            make_agent(serde_json::json!({"output": "done"})),
        );

        executor.run().await.unwrap();
    }

    #[tokio::test]
    async fn runs_linear_chain() {
        let (mut actor, handle, _events) = GraphActor::new(graph_config());

        // A -> B -> C: each node depends on the previous one.
        let spec_a = NodeSpec::new("A");
        let spec_b = NodeSpec::new("B").depends_on(vec![spec_a.id.clone()]);
        let spec_c = NodeSpec::new("C").depends_on(vec![spec_b.id.clone()]);
        actor.add_initial_nodes(vec![spec_a, spec_b, spec_c]);
        tokio::spawn(actor.run());

        let calls = Arc::new(AtomicUsize::new(0));
        let calls_seen_by_agent = Arc::clone(&calls);
        let agent: AgentFn = Arc::new(move |_spec: NodeSpec| {
            let calls = Arc::clone(&calls_seen_by_agent);
            Box::pin(async move {
                calls.fetch_add(1, Ordering::SeqCst);
                Ok(AgentOutput {
                    result: serde_json::json!(null),
                    mutations: vec![],
                })
            })
        });

        let executor = Executor::new(
            fast_executor_config(4),
            Arc::new(Scheduler::new(handle.clone())),
            handle,
            agent,
        );

        executor.run().await.unwrap();
        assert_eq!(calls.load(Ordering::SeqCst), 3, "All 3 tasks should run");
    }

    #[tokio::test]
    async fn failed_task_marks_node_failed() {
        let (mut actor, handle, _events) = GraphActor::new(graph_config());
        actor.add_initial_nodes(vec![NodeSpec::new("doomed")]);
        tokio::spawn(actor.run());

        let observer = handle.clone();
        let executor = Executor::new(
            fast_executor_config(1),
            Arc::new(Scheduler::new(handle.clone())),
            handle,
            make_failing_agent(),
        );

        executor.run().await.unwrap();

        let snap = observer.snapshot().await.unwrap();
        assert!(
            snap.nodes.values().all(|n| n.is_terminal()),
            "All nodes should be terminal after failure"
        );
    }

    #[tokio::test]
    async fn mutation_from_agent_is_applied() {
        let (mut actor, handle, _events) = GraphActor::new(graph_config());
        actor.add_initial_nodes(vec![NodeSpec::new("planning task")]);
        tokio::spawn(actor.run());

        // Every agent call requests that the same extra node be added to the graph.
        let injected_spec = NodeSpec::new("injected subtask");
        let injected_id = injected_spec.id.clone();
        let agent: AgentFn = Arc::new(move |_spec: NodeSpec| {
            let spec = injected_spec.clone();
            Box::pin(async move {
                Ok(AgentOutput {
                    result: serde_json::json!({"planned": true}),
                    mutations: vec![GraphMutation::AddNode { spec }],
                })
            })
        });

        let observer = handle.clone();
        let executor = Executor::new(
            fast_executor_config(2),
            Arc::new(Scheduler::new(handle.clone())),
            handle,
            agent,
        );

        executor.run().await.unwrap();

        let snap = observer.snapshot().await.unwrap();
        assert!(
            snap.nodes.contains_key(&injected_id),
            "Injected node should be in the graph"
        );
    }
}