Skip to main content

brainos_orchestrate/
execute.rs

1//! Task execution loop for [`TaskOrchestrator`].
2//!
3//! Holds `execute` (drive a planned task to a terminal phase) and the
4//! per-step `execute_step` dispatcher. Split out of `orchestrator.rs` to
5//! keep the construction/state-machine core small; the per-`StepAction`
6//! handlers live in `crate::actions`.
7
8use std::collections::HashSet;
9
10use chrono::Utc;
11use tokio_util::sync::CancellationToken;
12
13use crate::orchestrator::{OrchestrateError, TaskOrchestrator};
14use crate::state::{StepOutcome, StepState, TaskPhase};
15use crate::step::StepAction;
16use crate::synthesize;
17
18impl TaskOrchestrator {
19    /// Execute a previously planned task (after user approval).
20    pub async fn execute(&self, task_id: &str) -> Result<String, OrchestrateError> {
21        // Confirm the task exists before any state work so a wrong
22        // task_id never produces a phantom transition event.
23        {
24            let tasks = self.tasks.read().await;
25            if !tasks.contains_key(task_id) {
26                return Err(OrchestrateError::TaskNotFound(task_id.to_string()));
27            }
28        }
29        // PR-6b: clone the task's cancellation token up-front. Every
30        // checkpoint below — top of loop, per-step dispatch, the per-
31        // action future, the confirmation wait, the replan LLM call —
32        // races against `token.cancelled()` so a `cancel()` call mid-
33        // step aborts within one polling cycle.
34        let token = self.cancel_token_for(task_id).await;
35        if token.is_cancelled() {
36            // Cancel fired before execute() even started; honor it.
37            return Ok(synthesize::summarize_task(
38                self.tasks
39                    .read()
40                    .await
41                    .get(task_id)
42                    .expect("invariant: task_id is present (checked above)"),
43            ));
44        }
45        self.transition_phase(task_id, TaskPhase::Executing).await;
46
47        tracing::info!(task_id = %task_id, "Starting task execution");
48
49        // Execute steps in topological order, respecting dependencies.
50        //
51        // `ready_steps` is computed against the *succeeded* set, not the
52        // terminal set — a failed step must NOT unblock its dependents.
53        // Failure cascades are handled below by marking dependents
54        // `Skipped` so the loop still terminates without busy-looping.
55        loop {
56            if token.is_cancelled() {
57                tracing::info!(task_id = %task_id, "execute loop observed cancellation");
58                break;
59            }
60            let ready_steps = {
61                let tasks = self.tasks.read().await;
62                let task = tasks
63                    .get(task_id)
64                    .expect("invariant: task inserted by plan(); only state changes after");
65
66                if task.is_complete() {
67                    break;
68                }
69
70                let succeeded: HashSet<String> = task
71                    .step_states
72                    .iter()
73                    .filter(|(_, s)| s.is_success())
74                    .map(|(id, _)| id.clone())
75                    .collect();
76                // `ready_steps` only checks dep-satisfaction — it does
77                // NOT exclude steps that are already terminal. Without
78                // this filter a Failed step (which is not in `succeeded`
79                // and has no missing deps) would be picked as "ready"
80                // again on the next iteration, re-running the failure
81                // and re-triggering the replan loop. Only steps whose
82                // current state is Pending may be (re)scheduled.
83                task.graph
84                    .ready_steps(&succeeded)
85                    .into_iter()
86                    .filter(|id| {
87                        matches!(
88                            task.step_states.get(id),
89                            Some(StepState::Pending) | Some(StepState::Ready)
90                        )
91                    })
92                    .collect::<Vec<_>>()
93            };
94
95            if ready_steps.is_empty() {
96                // No ready steps but not complete — some steps must be blocked
97                // (running or awaiting confirmation). Break to avoid busy-loop.
98                break;
99            }
100
101            // Execute ready steps (sequentially for now; parallel in future)
102            for step_id in &ready_steps {
103                if token.is_cancelled() {
104                    break;
105                }
106                self.execute_step(task_id, step_id, &token).await?;
107            }
108        }
109
110        // Generate summary
111        let tasks = self.tasks.read().await;
112        let task = tasks
113            .get(task_id)
114            .expect("invariant: task inserted by plan() and never removed");
115        let summary = synthesize::summarize_task(task);
116
117        Ok(summary)
118    }
119
120    /// Execute a single step.
121    async fn execute_step(
122        &self,
123        task_id: &str,
124        step_id: &str,
125        token: &CancellationToken,
126    ) -> Result<(), OrchestrateError> {
127        // Pre-flight: if cancellation already fired (e.g. between the
128        // outer loop's check and us entering this fn), mark the step
129        // cancelled and bail without touching the action handlers.
130        if token.is_cancelled() {
131            self.mark_step_cancelled(task_id, step_id).await;
132            return Ok(());
133        }
134        let (action, tier, description) = {
135            let tasks = self.tasks.read().await;
136            let task = tasks
137                .get(task_id)
138                .expect("invariant: task_id always corresponds to a planned task");
139            let step = task
140                .graph
141                .steps
142                .get(step_id)
143                .expect("invariant: step_id sourced from task.graph.ready_steps()");
144            (step.action.clone(), step.tier, step.description.clone())
145        };
146
147        // Mark as running
148        {
149            let mut tasks = self.tasks.write().await;
150            let task = tasks
151                .get_mut(task_id)
152                .expect("invariant: task_id always corresponds to a planned task");
153            task.set_step_state(
154                step_id,
155                StepState::Running {
156                    started_at: Utc::now(),
157                },
158            );
159        }
160
161        tracing::info!(task_id = %task_id, step_id = %step_id, step = %description, "Executing step");
162
163        // Check confirmation for destructive/external tiers
164        if tier.requires_confirmation() {
165            if let Some(confirm) = &self.confirm {
166                let spec = confirm::ApprovalSpec::new(&description, tier);
167                let nonce = spec.nonce.clone();
168
169                // Mark as awaiting confirmation
170                {
171                    let mut tasks = self.tasks.write().await;
172                    let task = tasks
173                        .get_mut(task_id)
174                        .expect("invariant: task_id always corresponds to a planned task");
175                    task.set_step_state(
176                        step_id,
177                        StepState::AwaitingConfirmation {
178                            nonce: nonce.clone(),
179                            since: Utc::now(),
180                        },
181                    );
182                }
183
184                // PR-6b: race the confirmation wait against the task
185                // token. A `cancel()` mid-prompt aborts the wait so the
186                // step doesn't block forever on a confirmation that will
187                // never come.
188                let confirm_outcome = tokio::select! {
189                    biased;
190                    _ = token.cancelled() => {
191                        self.mark_step_cancelled(task_id, step_id).await;
192                        return Ok(());
193                    }
194                    r = confirm.request(spec) => r,
195                };
196                match confirm_outcome {
197                    Ok(confirm::ApprovalOutcome::Approved) => {
198                        tracing::info!(step = %description, "Step approved");
199                    }
200                    Ok(outcome) => {
201                        let reason = format!("Approval denied: {outcome:?}");
202                        let mut tasks = self.tasks.write().await;
203                        let task = tasks
204                            .get_mut(task_id)
205                            .expect("invariant: task_id always corresponds to a planned task");
206                        task.set_step_state(step_id, StepState::Cancelled);
207                        tracing::info!(step = %description, reason = %reason, "Step cancelled");
208                        return Ok(());
209                    }
210                    Err(e) => {
211                        let mut tasks = self.tasks.write().await;
212                        let task = tasks
213                            .get_mut(task_id)
214                            .expect("invariant: task_id always corresponds to a planned task");
215                        task.set_step_state(
216                            step_id,
217                            StepState::Failed {
218                                error: format!("Confirmation error: {e}"),
219                                retryable: true,
220                                failed_at: Utc::now(),
221                            },
222                        );
223                        return Ok(());
224                    }
225                }
226            }
227        }
228
229        // Execute the action. PR-6b: race against `token.cancelled()`
230        // so an in-flight sandbox/LLM/delegate call aborts mid-flight.
231        // Dropping the action future is cancel-safe — none of the
232        // handlers hold mutable global state past an await.
233        let result = tokio::select! {
234            biased;
235            _ = token.cancelled() => {
236                self.mark_step_cancelled(task_id, step_id).await;
237                return Ok(());
238            }
239            r = async { match &action {
240            StepAction::Execute { command, workdir } | StepAction::Test { command, workdir } => {
241                self.execute_sandbox_step(command, workdir).await
242            }
243            StepAction::Shell { command, workdir } => {
244                self.execute_shell_step(command, workdir).await
245            }
246            StepAction::Research { query } => self.execute_research_step(query).await,
247            StepAction::Plan { output } => {
248                // A `Plan` step that carries no output is effectively a
249                // no-op — the LLM emitted a step the executor cannot
250                // perform but marked it `plan` so it would silently
251                // succeed. Treat that as an honest failure so the user
252                // sees that nothing happened, instead of a "succeeded"
253                // count that masks an empty result.
254                let trimmed = output.trim();
255                if trimmed.is_empty() {
256                    Err(format!(
257                        "Plan step '{description}' had no output to produce — \
258                         the planner did not specify what this step should write. \
259                         Re-plan with concrete steps (research/execute/implement)."
260                    ))
261                } else {
262                    Ok(StepOutcome {
263                        stdout: output.clone(),
264                        stderr: String::new(),
265                        exit_code: None,
266                        artifacts: vec![],
267                        summary: summarize_first_line(trimmed),
268                    })
269                }
270            }
271            StepAction::Implement { spec, agent } => {
272                self.delegate_implement_step(spec, agent).await
273            }
274            StepAction::Review { artifact } => self.execute_review_step(artifact).await,
275            StepAction::Notify { channel, message } => {
276                self.execute_notify_step(channel, message).await
277            }
278        } } => r,
279        };
280
281        // Update step state
282        let mut tasks = self.tasks.write().await;
283        let task = tasks
284            .get_mut(task_id)
285            .expect("invariant: task_id always corresponds to a planned task");
286
287        match result {
288            Ok(outcome) => {
289                // Record in audit trail
290                if let Some(audit) = &self.audit {
291                    let entry = audit::AuditEntry::new(
292                        &description,
293                        "step executed",
294                        &outcome.summary,
295                        tier,
296                    )
297                    .with_source("orchestrator")
298                    .with_execution(
299                        outcome.stdout.clone(),
300                        outcome.stderr.clone(),
301                        outcome.exit_code.unwrap_or(0),
302                        0, // duration tracked elsewhere
303                    );
304                    if let Err(e) = audit.record(entry).await {
305                        tracing::warn!("Failed to audit step outcome: {e}");
306                    }
307                }
308
309                task.set_step_state(
310                    step_id,
311                    StepState::Completed {
312                        outcome,
313                        completed_at: Utc::now(),
314                    },
315                );
316            }
317            Err(error) => {
318                // Mirror the success-path audit write so failed steps
319                // are recorded in the audit trail too — otherwise a
320                // sandbox exit-1 disappears from history once we lifted
321                // it out of the Ok arm.
322                if let Some(audit) = &self.audit {
323                    let entry = audit::AuditEntry::new(&description, "step failed", &error, tier)
324                        .with_source("orchestrator")
325                        .with_outcome(audit::AuditOutcome::Failure);
326                    if let Err(e) = audit.record(entry).await {
327                        tracing::warn!("Failed to audit step failure: {e}");
328                    }
329                }
330
331                task.set_step_state(
332                    step_id,
333                    StepState::Failed {
334                        error: error.clone(),
335                        retryable: true,
336                        failed_at: Utc::now(),
337                    },
338                );
339
340                // Mark all transitive dependents `Skipped` so the loop
341                // terminates and the user sees an honest status instead
342                // of cascading attempts against missing inputs.
343                let dependents = task.graph.transitive_dependents(step_id);
344                let reason = format!("dependency {step_id} failed");
345                for dep_id in dependents {
346                    if let Some(state) = task.step_states.get(&dep_id) {
347                        if !state.is_terminal() {
348                            task.set_step_state(
349                                &dep_id,
350                                StepState::Skipped {
351                                    reason: reason.clone(),
352                                },
353                            );
354                        }
355                    }
356                }
357
358                // Drop the write lock before the (potentially slow) LLM
359                // replan call below. We still own a snapshot of the
360                // fields the replan needs.
361                drop(tasks);
362
363                // Try to repair the plan if we still have replan budget.
364                // Best-effort: a replan failure leaves the task in the
365                // standard "failed step + skipped dependents" state.
366                self.try_replan_after_failure(task_id, step_id, &description, &error, token)
367                    .await;
368
369                // Re-check completion + drive the canonical
370                // Reconciling → (Completed | Failed) shape under the
371                // state-machine helper.
372                let (done, all_succeeded) = {
373                    let tasks = self.tasks.read().await;
374                    let task = tasks
375                        .get(task_id)
376                        .expect("invariant: task_id always corresponds to a planned task");
377                    (task.is_complete(), task.all_succeeded())
378                };
379                if done {
380                    self.transition_phase(task_id, TaskPhase::Reconciling).await;
381                    let terminal = if all_succeeded {
382                        TaskPhase::Completed
383                    } else {
384                        TaskPhase::Failed
385                    };
386                    self.transition_phase(task_id, terminal).await;
387                    tracing::info!(task_id = %task_id, terminal = %terminal.as_str(), "Task complete");
388                }
389                return Ok(());
390            }
391        }
392
393        // Drop the write lock before the I/O-bound transition_phase
394        // calls below — they take their own lock for the brief in-mem
395        // flip and we don't want to hold the executor's lock through
396        // the audit-row write and observer publish.
397        drop(tasks);
398        let (done, all_succeeded) = {
399            let tasks = self.tasks.read().await;
400            let task = tasks
401                .get(task_id)
402                .expect("invariant: task_id always corresponds to a planned task");
403            (task.is_complete(), task.all_succeeded())
404        };
405        if done {
406            self.transition_phase(task_id, TaskPhase::Reconciling).await;
407            let terminal = if all_succeeded {
408                TaskPhase::Completed
409            } else {
410                TaskPhase::Failed
411            };
412            self.transition_phase(task_id, terminal).await;
413            tracing::info!(task_id = %task_id, terminal = %terminal.as_str(), "Task complete");
414        }
415
416        Ok(())
417    }
418}
419
/// Build the short step summary shown in the user-facing task report.
///
/// Takes the first line of `s` that is non-empty after trimming. When no
/// such line exists the literal `"Plan produced"` is used instead. A line
/// longer than 160 characters is cut to its first 157 characters followed
/// by a single `…`; shorter lines are returned unchanged.
fn summarize_first_line(s: &str) -> String {
    const MAX_CHARS: usize = 160;
    const KEPT_CHARS: usize = 157;

    let headline = s
        .lines()
        .find_map(|raw| {
            let trimmed = raw.trim();
            (!trimmed.is_empty()).then(|| trimmed)
        })
        .unwrap_or("Plan produced");

    // A character at index MAX_CHARS exists only when the line is longer
    // than the limit.
    if headline.char_indices().nth(MAX_CHARS).is_none() {
        return headline.to_string();
    }

    // Byte offset where the (KEPT_CHARS + 1)-th character starts; slicing
    // there keeps exactly KEPT_CHARS whole characters.
    let cut = headline
        .char_indices()
        .nth(KEPT_CHARS)
        .map_or(headline.len(), |(offset, _)| offset);

    let mut shortened = String::with_capacity(cut + '…'.len_utf8());
    shortened.push_str(&headline[..cut]);
    shortened.push('…');
    shortened
}