brainos_orchestrate/execute.rs
//! Task execution loop for [`TaskOrchestrator`].
//!
//! Holds `execute` (drives a planned task through its steps) and the
//! per-step `execute_step` dispatcher. Split out of `orchestrator.rs` to
//! keep the construction/state-machine core small; the per-`StepAction`
//! handlers live in `crate::actions`.
7
8use std::collections::HashSet;
9
10use chrono::Utc;
11use tokio_util::sync::CancellationToken;
12
13use crate::orchestrator::{OrchestrateError, TaskOrchestrator};
14use crate::state::{StepOutcome, StepState, TaskPhase};
15use crate::step::StepAction;
16use crate::synthesize;
17
18impl TaskOrchestrator {
19 /// Execute a previously planned task (after user approval).
20 pub async fn execute(&self, task_id: &str) -> Result<String, OrchestrateError> {
21 // Confirm the task exists before any state work so a wrong
22 // task_id never produces a phantom transition event.
23 {
24 let tasks = self.tasks.read().await;
25 if !tasks.contains_key(task_id) {
26 return Err(OrchestrateError::TaskNotFound(task_id.to_string()));
27 }
28 }
29 // PR-6b: clone the task's cancellation token up-front. Every
30 // checkpoint below — top of loop, per-step dispatch, the per-
31 // action future, the confirmation wait, the replan LLM call —
32 // races against `token.cancelled()` so a `cancel()` call mid-
33 // step aborts within one polling cycle.
34 let token = self.cancel_token_for(task_id).await;
35 if token.is_cancelled() {
36 // Cancel fired before execute() even started; honor it.
37 return Ok(synthesize::summarize_task(
38 self.tasks
39 .read()
40 .await
41 .get(task_id)
42 .expect("invariant: task_id is present (checked above)"),
43 ));
44 }
45 self.transition_phase(task_id, TaskPhase::Executing).await;
46
47 tracing::info!(task_id = %task_id, "Starting task execution");
48
49 // Execute steps in topological order, respecting dependencies.
50 //
51 // `ready_steps` is computed against the *succeeded* set, not the
52 // terminal set — a failed step must NOT unblock its dependents.
53 // Failure cascades are handled below by marking dependents
54 // `Skipped` so the loop still terminates without busy-looping.
55 loop {
56 if token.is_cancelled() {
57 tracing::info!(task_id = %task_id, "execute loop observed cancellation");
58 break;
59 }
60 let ready_steps = {
61 let tasks = self.tasks.read().await;
62 let task = tasks
63 .get(task_id)
64 .expect("invariant: task inserted by plan(); only state changes after");
65
66 if task.is_complete() {
67 break;
68 }
69
70 let succeeded: HashSet<String> = task
71 .step_states
72 .iter()
73 .filter(|(_, s)| s.is_success())
74 .map(|(id, _)| id.clone())
75 .collect();
76 // `ready_steps` only checks dep-satisfaction — it does
77 // NOT exclude steps that are already terminal. Without
78 // this filter a Failed step (which is not in `succeeded`
79 // and has no missing deps) would be picked as "ready"
80 // again on the next iteration, re-running the failure
81 // and re-triggering the replan loop. Only steps whose
82 // current state is Pending may be (re)scheduled.
83 task.graph
84 .ready_steps(&succeeded)
85 .into_iter()
86 .filter(|id| {
87 matches!(
88 task.step_states.get(id),
89 Some(StepState::Pending) | Some(StepState::Ready)
90 )
91 })
92 .collect::<Vec<_>>()
93 };
94
95 if ready_steps.is_empty() {
96 // No ready steps but not complete — some steps must be blocked
97 // (running or awaiting confirmation). Break to avoid busy-loop.
98 break;
99 }
100
101 // Execute ready steps (sequentially for now; parallel in future)
102 for step_id in &ready_steps {
103 if token.is_cancelled() {
104 break;
105 }
106 self.execute_step(task_id, step_id, &token).await?;
107 }
108 }
109
110 // Generate summary
111 let tasks = self.tasks.read().await;
112 let task = tasks
113 .get(task_id)
114 .expect("invariant: task inserted by plan() and never removed");
115 let summary = synthesize::summarize_task(task);
116
117 Ok(summary)
118 }
119
120 /// Execute a single step.
121 async fn execute_step(
122 &self,
123 task_id: &str,
124 step_id: &str,
125 token: &CancellationToken,
126 ) -> Result<(), OrchestrateError> {
127 // Pre-flight: if cancellation already fired (e.g. between the
128 // outer loop's check and us entering this fn), mark the step
129 // cancelled and bail without touching the action handlers.
130 if token.is_cancelled() {
131 self.mark_step_cancelled(task_id, step_id).await;
132 return Ok(());
133 }
134 let (action, tier, description) = {
135 let tasks = self.tasks.read().await;
136 let task = tasks
137 .get(task_id)
138 .expect("invariant: task_id always corresponds to a planned task");
139 let step = task
140 .graph
141 .steps
142 .get(step_id)
143 .expect("invariant: step_id sourced from task.graph.ready_steps()");
144 (step.action.clone(), step.tier, step.description.clone())
145 };
146
147 // Mark as running
148 {
149 let mut tasks = self.tasks.write().await;
150 let task = tasks
151 .get_mut(task_id)
152 .expect("invariant: task_id always corresponds to a planned task");
153 task.set_step_state(
154 step_id,
155 StepState::Running {
156 started_at: Utc::now(),
157 },
158 );
159 }
160
161 tracing::info!(task_id = %task_id, step_id = %step_id, step = %description, "Executing step");
162
163 // Check confirmation for destructive/external tiers
164 if tier.requires_confirmation() {
165 if let Some(confirm) = &self.confirm {
166 let spec = confirm::ApprovalSpec::new(&description, tier);
167 let nonce = spec.nonce.clone();
168
169 // Mark as awaiting confirmation
170 {
171 let mut tasks = self.tasks.write().await;
172 let task = tasks
173 .get_mut(task_id)
174 .expect("invariant: task_id always corresponds to a planned task");
175 task.set_step_state(
176 step_id,
177 StepState::AwaitingConfirmation {
178 nonce: nonce.clone(),
179 since: Utc::now(),
180 },
181 );
182 }
183
184 // PR-6b: race the confirmation wait against the task
185 // token. A `cancel()` mid-prompt aborts the wait so the
186 // step doesn't block forever on a confirmation that will
187 // never come.
188 let confirm_outcome = tokio::select! {
189 biased;
190 _ = token.cancelled() => {
191 self.mark_step_cancelled(task_id, step_id).await;
192 return Ok(());
193 }
194 r = confirm.request(spec) => r,
195 };
196 match confirm_outcome {
197 Ok(confirm::ApprovalOutcome::Approved) => {
198 tracing::info!(step = %description, "Step approved");
199 }
200 Ok(outcome) => {
201 let reason = format!("Approval denied: {outcome:?}");
202 let mut tasks = self.tasks.write().await;
203 let task = tasks
204 .get_mut(task_id)
205 .expect("invariant: task_id always corresponds to a planned task");
206 task.set_step_state(step_id, StepState::Cancelled);
207 tracing::info!(step = %description, reason = %reason, "Step cancelled");
208 return Ok(());
209 }
210 Err(e) => {
211 let mut tasks = self.tasks.write().await;
212 let task = tasks
213 .get_mut(task_id)
214 .expect("invariant: task_id always corresponds to a planned task");
215 task.set_step_state(
216 step_id,
217 StepState::Failed {
218 error: format!("Confirmation error: {e}"),
219 retryable: true,
220 failed_at: Utc::now(),
221 },
222 );
223 return Ok(());
224 }
225 }
226 }
227 }
228
229 // Execute the action. PR-6b: race against `token.cancelled()`
230 // so an in-flight sandbox/LLM/delegate call aborts mid-flight.
231 // Dropping the action future is cancel-safe — none of the
232 // handlers hold mutable global state past an await.
233 let result = tokio::select! {
234 biased;
235 _ = token.cancelled() => {
236 self.mark_step_cancelled(task_id, step_id).await;
237 return Ok(());
238 }
239 r = async { match &action {
240 StepAction::Execute { command, workdir } | StepAction::Test { command, workdir } => {
241 self.execute_sandbox_step(command, workdir).await
242 }
243 StepAction::Shell { command, workdir } => {
244 self.execute_shell_step(command, workdir).await
245 }
246 StepAction::Research { query } => self.execute_research_step(query).await,
247 StepAction::Plan { output } => {
248 // A `Plan` step that carries no output is effectively a
249 // no-op — the LLM emitted a step the executor cannot
250 // perform but marked it `plan` so it would silently
251 // succeed. Treat that as an honest failure so the user
252 // sees that nothing happened, instead of a "succeeded"
253 // count that masks an empty result.
254 let trimmed = output.trim();
255 if trimmed.is_empty() {
256 Err(format!(
257 "Plan step '{description}' had no output to produce — \
258 the planner did not specify what this step should write. \
259 Re-plan with concrete steps (research/execute/implement)."
260 ))
261 } else {
262 Ok(StepOutcome {
263 stdout: output.clone(),
264 stderr: String::new(),
265 exit_code: None,
266 artifacts: vec![],
267 summary: summarize_first_line(trimmed),
268 })
269 }
270 }
271 StepAction::Implement { spec, agent } => {
272 self.delegate_implement_step(spec, agent).await
273 }
274 StepAction::Review { artifact } => self.execute_review_step(artifact).await,
275 StepAction::Notify { channel, message } => {
276 self.execute_notify_step(channel, message).await
277 }
278 } } => r,
279 };
280
281 // Update step state
282 let mut tasks = self.tasks.write().await;
283 let task = tasks
284 .get_mut(task_id)
285 .expect("invariant: task_id always corresponds to a planned task");
286
287 match result {
288 Ok(outcome) => {
289 // Record in audit trail
290 if let Some(audit) = &self.audit {
291 let entry = audit::AuditEntry::new(
292 &description,
293 "step executed",
294 &outcome.summary,
295 tier,
296 )
297 .with_source("orchestrator")
298 .with_execution(
299 outcome.stdout.clone(),
300 outcome.stderr.clone(),
301 outcome.exit_code.unwrap_or(0),
302 0, // duration tracked elsewhere
303 );
304 if let Err(e) = audit.record(entry).await {
305 tracing::warn!("Failed to audit step outcome: {e}");
306 }
307 }
308
309 task.set_step_state(
310 step_id,
311 StepState::Completed {
312 outcome,
313 completed_at: Utc::now(),
314 },
315 );
316 }
317 Err(error) => {
318 // Mirror the success-path audit write so failed steps
319 // are recorded in the audit trail too — otherwise a
320 // sandbox exit-1 disappears from history once we lifted
321 // it out of the Ok arm.
322 if let Some(audit) = &self.audit {
323 let entry = audit::AuditEntry::new(&description, "step failed", &error, tier)
324 .with_source("orchestrator")
325 .with_outcome(audit::AuditOutcome::Failure);
326 if let Err(e) = audit.record(entry).await {
327 tracing::warn!("Failed to audit step failure: {e}");
328 }
329 }
330
331 task.set_step_state(
332 step_id,
333 StepState::Failed {
334 error: error.clone(),
335 retryable: true,
336 failed_at: Utc::now(),
337 },
338 );
339
340 // Mark all transitive dependents `Skipped` so the loop
341 // terminates and the user sees an honest status instead
342 // of cascading attempts against missing inputs.
343 let dependents = task.graph.transitive_dependents(step_id);
344 let reason = format!("dependency {step_id} failed");
345 for dep_id in dependents {
346 if let Some(state) = task.step_states.get(&dep_id) {
347 if !state.is_terminal() {
348 task.set_step_state(
349 &dep_id,
350 StepState::Skipped {
351 reason: reason.clone(),
352 },
353 );
354 }
355 }
356 }
357
358 // Drop the write lock before the (potentially slow) LLM
359 // replan call below. We still own a snapshot of the
360 // fields the replan needs.
361 drop(tasks);
362
363 // Try to repair the plan if we still have replan budget.
364 // Best-effort: a replan failure leaves the task in the
365 // standard "failed step + skipped dependents" state.
366 self.try_replan_after_failure(task_id, step_id, &description, &error, token)
367 .await;
368
369 // Re-check completion + drive the canonical
370 // Reconciling → (Completed | Failed) shape under the
371 // state-machine helper.
372 let (done, all_succeeded) = {
373 let tasks = self.tasks.read().await;
374 let task = tasks
375 .get(task_id)
376 .expect("invariant: task_id always corresponds to a planned task");
377 (task.is_complete(), task.all_succeeded())
378 };
379 if done {
380 self.transition_phase(task_id, TaskPhase::Reconciling).await;
381 let terminal = if all_succeeded {
382 TaskPhase::Completed
383 } else {
384 TaskPhase::Failed
385 };
386 self.transition_phase(task_id, terminal).await;
387 tracing::info!(task_id = %task_id, terminal = %terminal.as_str(), "Task complete");
388 }
389 return Ok(());
390 }
391 }
392
393 // Drop the write lock before the I/O-bound transition_phase
394 // calls below — they take their own lock for the brief in-mem
395 // flip and we don't want to hold the executor's lock through
396 // the audit-row write and observer publish.
397 drop(tasks);
398 let (done, all_succeeded) = {
399 let tasks = self.tasks.read().await;
400 let task = tasks
401 .get(task_id)
402 .expect("invariant: task_id always corresponds to a planned task");
403 (task.is_complete(), task.all_succeeded())
404 };
405 if done {
406 self.transition_phase(task_id, TaskPhase::Reconciling).await;
407 let terminal = if all_succeeded {
408 TaskPhase::Completed
409 } else {
410 TaskPhase::Failed
411 };
412 self.transition_phase(task_id, terminal).await;
413 tracing::info!(task_id = %task_id, terminal = %terminal.as_str(), "Task complete");
414 }
415
416 Ok(())
417 }
418}
419
/// Build a short, single-line summary of `s` for the user-facing task report.
///
/// Uses the first line that is non-empty after trimming, or `"Plan produced"`
/// when there is none. A line longer than 160 characters is cut to its first
/// 157 characters followed by `…`; shorter lines are returned unchanged.
fn summarize_first_line(s: &str) -> String {
    const MAX_CHARS: usize = 160;
    const KEEP_CHARS: usize = 157;

    let first = s
        .lines()
        .map(str::trim)
        .find(|candidate| !candidate.is_empty())
        .unwrap_or("Plan produced");

    // A char at index MAX_CHARS exists only when the line is longer than MAX_CHARS.
    if first.chars().nth(MAX_CHARS).is_none() {
        return first.to_owned();
    }

    // Byte offset where the (KEEP_CHARS + 1)-th char starts: slicing there
    // keeps exactly KEEP_CHARS chars and never splits a UTF-8 sequence.
    let cut = first
        .char_indices()
        .nth(KEEP_CHARS)
        .map_or(first.len(), |(idx, _)| idx);
    format!("{}…", &first[..cut])
}