brainos-orchestrate 0.5.0

Task orchestrator — decompose, plan, track, and coordinate autonomous execution
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
//! Task execution loop for [`TaskOrchestrator`].
//!
//! Holds `execute` (drive a planned task to a terminal phase) and the
//! per-step `execute_step` dispatcher. Split out of `orchestrator.rs` to
//! keep the construction/state-machine core small; the per-`StepAction`
//! handlers live in `crate::actions`.

use std::collections::HashSet;

use chrono::Utc;
use tokio_util::sync::CancellationToken;

use crate::orchestrator::{OrchestrateError, TaskOrchestrator};
use crate::state::{StepOutcome, StepState, TaskPhase};
use crate::step::StepAction;
use crate::synthesize;

impl TaskOrchestrator {
    /// Drive a previously planned (and user-approved) task and return its
    /// textual summary.
    ///
    /// Repeatedly asks the step graph which steps are ready, runs them one
    /// after another via `execute_step`, and stops when the task reports
    /// itself complete, nothing is runnable, or the task's cancellation
    /// token has fired. The summary is produced by
    /// `synthesize::summarize_task` in every non-error case.
    ///
    /// # Errors
    /// Returns `OrchestrateError::TaskNotFound` when `task_id` is unknown,
    /// and propagates any error returned by `execute_step`.
    ///
    /// # Panics
    /// Panics if the task disappears from the task map after the initial
    /// existence check.
    pub async fn execute(&self, task_id: &str) -> Result<String, OrchestrateError> {
        // Reject unknown ids before touching any state so a typo in
        // `task_id` can never emit a phase-transition event.
        if !self.tasks.read().await.contains_key(task_id) {
            return Err(OrchestrateError::TaskNotFound(task_id.to_string()));
        }

        // The token is fetched once and consulted at every checkpoint
        // below (loop head, before each step, and inside `execute_step`).
        let token = self.cancel_token_for(task_id).await;
        if token.is_cancelled() {
            // Cancelled before we started: report the task as it stands
            // without entering the `Executing` phase.
            let guard = self.tasks.read().await;
            let task = guard
                .get(task_id)
                .expect("invariant: task_id is present (checked above)");
            return Ok(synthesize::summarize_task(task));
        }
        self.transition_phase(task_id, TaskPhase::Executing).await;

        tracing::info!(task_id = %task_id, "Starting task execution");

        loop {
            if token.is_cancelled() {
                tracing::info!(task_id = %task_id, "execute loop observed cancellation");
                break;
            }

            // Compute the next batch under a short-lived read lock.
            // `None` means the task is already complete.
            let batch = {
                let guard = self.tasks.read().await;
                let task = guard
                    .get(task_id)
                    .expect("invariant: task inserted by plan(); only state changes after");

                if task.is_complete() {
                    None
                } else {
                    // Dependencies count as satisfied only when they
                    // *succeeded*; a failed step must not unblock its
                    // dependents.
                    let mut succeeded: HashSet<String> = HashSet::new();
                    for (id, state) in task.step_states.iter() {
                        if state.is_success() {
                            succeeded.insert(id.clone());
                        }
                    }

                    // The graph only checks dependency satisfaction, so a
                    // step that already ran (e.g. one that failed and has
                    // no unmet deps) would be offered again. Keep only
                    // steps still in `Pending` or `Ready`.
                    let mut runnable = Vec::new();
                    for id in task.graph.ready_steps(&succeeded) {
                        let schedulable = matches!(
                            task.step_states.get(&id),
                            Some(StepState::Pending) | Some(StepState::Ready)
                        );
                        if schedulable {
                            runnable.push(id);
                        }
                    }
                    Some(runnable)
                }
            };

            // Stop when the task is complete, or when nothing can be
            // scheduled (breaking here avoids a busy loop).
            let ready_steps = match batch {
                Some(steps) if !steps.is_empty() => steps,
                _ => break,
            };

            // Steps run sequentially for now.
            for step_id in &ready_steps {
                if token.is_cancelled() {
                    break;
                }
                self.execute_step(task_id, step_id, &token).await?;
            }
        }

        // Summarise whatever state the task ended up in.
        let guard = self.tasks.read().await;
        let task = guard
            .get(task_id)
            .expect("invariant: task inserted by plan() and never removed");
        Ok(synthesize::summarize_task(task))
    }

    /// Execute a single step and record its result on the task.
    ///
    /// Sequence:
    /// 1. If `token` has already fired, mark the step cancelled and return.
    /// 2. Snapshot the step's action, tier and description, then mark the
    ///    step `Running`.
    /// 3. If the tier requires confirmation and a confirmer is configured,
    ///    park the step in `AwaitingConfirmation` and wait for the
    ///    decision, racing the wait against `token`.
    /// 4. Run the action, again racing against `token`.
    /// 5. Write the audit row, then store `Completed` / `Failed`. On
    ///    failure, mark still-open transitive dependents `Skipped` and
    ///    attempt a replan.
    /// 6. If every step is now terminal, move the task to its terminal
    ///    phase.
    ///
    /// The task-map write lock is never held across the audit write, the
    /// replan call, or the phase transitions; it is taken only for the
    /// in-memory state flips.
    ///
    /// Step-level failures are recorded in the step state rather than
    /// returned: every path in this function yields `Ok(())`.
    ///
    /// # Panics
    /// Panics if `task_id` is not in the task map or `step_id` is not in
    /// the task's graph.
    async fn execute_step(
        &self,
        task_id: &str,
        step_id: &str,
        token: &CancellationToken,
    ) -> Result<(), OrchestrateError> {
        // Pre-flight: cancellation may have fired between the caller's
        // check and this call. Mark the step cancelled and bail without
        // touching the action handlers.
        if token.is_cancelled() {
            self.mark_step_cancelled(task_id, step_id).await;
            return Ok(());
        }

        // Snapshot what we need under a read lock so nothing below has to
        // hold the lock while awaiting slow work.
        let (action, tier, description) = {
            let tasks = self.tasks.read().await;
            let task = tasks
                .get(task_id)
                .expect("invariant: task_id always corresponds to a planned task");
            let step = task
                .graph
                .steps
                .get(step_id)
                .expect("invariant: step_id sourced from task.graph.ready_steps()");
            (step.action.clone(), step.tier, step.description.clone())
        };

        // Mark as running
        {
            let mut tasks = self.tasks.write().await;
            let task = tasks
                .get_mut(task_id)
                .expect("invariant: task_id always corresponds to a planned task");
            task.set_step_state(
                step_id,
                StepState::Running {
                    started_at: Utc::now(),
                },
            );
        }

        tracing::info!(task_id = %task_id, step_id = %step_id, step = %description, "Executing step");

        // Confirmation gate. Skipped entirely when no confirmer is
        // configured, even if the tier asks for confirmation.
        if tier.requires_confirmation() {
            if let Some(confirm) = &self.confirm {
                let spec = confirm::ApprovalSpec::new(&description, tier);
                let nonce = spec.nonce.clone();

                // Mark as awaiting confirmation
                {
                    let mut tasks = self.tasks.write().await;
                    let task = tasks
                        .get_mut(task_id)
                        .expect("invariant: task_id always corresponds to a planned task");
                    task.set_step_state(
                        step_id,
                        StepState::AwaitingConfirmation {
                            nonce: nonce.clone(),
                            since: Utc::now(),
                        },
                    );
                }

                // Race the confirmation wait against the task token so a
                // `cancel()` mid-prompt does not leave the step blocked
                // on a confirmation that will never come. `biased` makes
                // the cancellation branch win when both are ready.
                let confirm_outcome = tokio::select! {
                    biased;
                    _ = token.cancelled() => {
                        self.mark_step_cancelled(task_id, step_id).await;
                        return Ok(());
                    }
                    r = confirm.request(spec) => r,
                };
                match confirm_outcome {
                    Ok(confirm::ApprovalOutcome::Approved) => {
                        tracing::info!(step = %description, "Step approved");
                    }
                    Ok(outcome) => {
                        // Any non-approved outcome cancels the step.
                        // NOTE(review): unlike the action-failure path
                        // below, this neither skips dependents nor
                        // finalises the task phase — confirm that is
                        // intended.
                        let reason = format!("Approval denied: {outcome:?}");
                        let mut tasks = self.tasks.write().await;
                        let task = tasks
                            .get_mut(task_id)
                            .expect("invariant: task_id always corresponds to a planned task");
                        task.set_step_state(step_id, StepState::Cancelled);
                        tracing::info!(step = %description, reason = %reason, "Step cancelled");
                        return Ok(());
                    }
                    Err(e) => {
                        // NOTE(review): the step is marked `Failed` here
                        // without the dependents-skip / replan / finalise
                        // handling the action-failure path performs —
                        // confirm that is intended.
                        let mut tasks = self.tasks.write().await;
                        let task = tasks
                            .get_mut(task_id)
                            .expect("invariant: task_id always corresponds to a planned task");
                        task.set_step_state(
                            step_id,
                            StepState::Failed {
                                error: format!("Confirmation error: {e}"),
                                retryable: true,
                                failed_at: Utc::now(),
                            },
                        );
                        return Ok(());
                    }
                }
            }
        }

        // Execute the action, racing against `token.cancelled()` so an
        // in-flight handler call is dropped when the task is cancelled.
        let result = tokio::select! {
            biased;
            _ = token.cancelled() => {
                self.mark_step_cancelled(task_id, step_id).await;
                return Ok(());
            }
            r = async {
                match &action {
                    StepAction::Execute { command, workdir }
                    | StepAction::Test { command, workdir } => {
                        self.execute_sandbox_step(command, workdir).await
                    }
                    StepAction::Shell { command, workdir } => {
                        self.execute_shell_step(command, workdir).await
                    }
                    StepAction::Research { query } => self.execute_research_step(query).await,
                    StepAction::Plan { output } => {
                        // A `Plan` step with blank output would otherwise
                        // "succeed" while producing nothing. Report it as
                        // a failure so the user sees that nothing
                        // happened.
                        let trimmed = output.trim();
                        if trimmed.is_empty() {
                            Err(format!(
                                "Plan step '{description}' had no output to produce — \
                                 the planner did not specify what this step should write. \
                                 Re-plan with concrete steps (research/execute/implement)."
                            ))
                        } else {
                            Ok(StepOutcome {
                                stdout: output.clone(),
                                stderr: String::new(),
                                exit_code: None,
                                artifacts: vec![],
                                summary: summarize_first_line(trimmed),
                            })
                        }
                    }
                    StepAction::Implement { spec, agent } => {
                        self.delegate_implement_step(spec, agent).await
                    }
                    StepAction::Review { artifact } => self.execute_review_step(artifact).await,
                    StepAction::Notify { channel, message } => {
                        self.execute_notify_step(channel, message).await
                    }
                }
            } => r,
        };

        match result {
            Ok(outcome) => {
                // Write the audit row *before* taking the task-map write
                // lock: the audit write is awaited I/O and must not stall
                // every other reader/writer of the task map.
                if let Some(audit) = &self.audit {
                    let entry = audit::AuditEntry::new(
                        &description,
                        "step executed",
                        &outcome.summary,
                        tier,
                    )
                    .with_source("orchestrator")
                    .with_execution(
                        outcome.stdout.clone(),
                        outcome.stderr.clone(),
                        outcome.exit_code.unwrap_or(0),
                        0, // duration tracked elsewhere
                    );
                    if let Err(e) = audit.record(entry).await {
                        tracing::warn!("Failed to audit step outcome: {e}");
                    }
                }

                // Brief in-memory flip; the guard is released at the end
                // of this arm.
                let mut tasks = self.tasks.write().await;
                let task = tasks
                    .get_mut(task_id)
                    .expect("invariant: task_id always corresponds to a planned task");
                task.set_step_state(
                    step_id,
                    StepState::Completed {
                        outcome,
                        completed_at: Utc::now(),
                    },
                );
            }
            Err(error) => {
                // Failed steps are audited too, again without holding the
                // task-map lock across the write.
                if let Some(audit) = &self.audit {
                    let entry = audit::AuditEntry::new(&description, "step failed", &error, tier)
                        .with_source("orchestrator")
                        .with_outcome(audit::AuditOutcome::Failure);
                    if let Err(e) = audit.record(entry).await {
                        tracing::warn!("Failed to audit step failure: {e}");
                    }
                }

                // Record the failure and its cascade in one short
                // critical section, released before the replan call.
                {
                    let mut tasks = self.tasks.write().await;
                    let task = tasks
                        .get_mut(task_id)
                        .expect("invariant: task_id always corresponds to a planned task");

                    task.set_step_state(
                        step_id,
                        StepState::Failed {
                            error: error.clone(),
                            retryable: true,
                            failed_at: Utc::now(),
                        },
                    );

                    // Mark all non-terminal transitive dependents
                    // `Skipped` so the caller's loop terminates and the
                    // user sees an honest status instead of attempts
                    // against missing inputs.
                    let dependents = task.graph.transitive_dependents(step_id);
                    let reason = format!("dependency {step_id} failed");
                    for dep_id in dependents {
                        if let Some(state) = task.step_states.get(&dep_id) {
                            if !state.is_terminal() {
                                task.set_step_state(
                                    &dep_id,
                                    StepState::Skipped {
                                        reason: reason.clone(),
                                    },
                                );
                            }
                        }
                    }
                }

                // Try to repair the plan. Best-effort: if the replan does
                // nothing, the task stays in the "failed step + skipped
                // dependents" state recorded above.
                self.try_replan_after_failure(task_id, step_id, &description, &error, token)
                    .await;
            }
        }

        // Both arms end the same way: finalise the task if this step was
        // the last one outstanding.
        self.finish_if_complete(task_id).await;

        Ok(())
    }

    /// If every step of `task_id` is terminal, drive the task through
    /// `Reconciling` into `Completed` (all steps succeeded) or `Failed`.
    ///
    /// Completion is sampled under a short read lock that is released
    /// before the `transition_phase` calls.
    async fn finish_if_complete(&self, task_id: &str) {
        let (done, all_succeeded) = {
            let tasks = self.tasks.read().await;
            let task = tasks
                .get(task_id)
                .expect("invariant: task_id always corresponds to a planned task");
            (task.is_complete(), task.all_succeeded())
        };
        if done {
            self.transition_phase(task_id, TaskPhase::Reconciling).await;
            let terminal = if all_succeeded {
                TaskPhase::Completed
            } else {
                TaskPhase::Failed
            };
            self.transition_phase(task_id, terminal).await;
            tracing::info!(task_id = %task_id, terminal = %terminal.as_str(), "Task complete");
        }
    }
}

/// First non-empty line of `s`, capped at 160 characters — used for short
/// step summaries surfaced in the user-facing task report.
///
/// Each line is trimmed before it is inspected. A line longer than the cap
/// is cut to its first 157 characters followed by `...`, so the result is
/// exactly 160 characters and visibly marked as shortened. Lengths are
/// counted in `char`s, so multi-byte text is never split inside a code
/// point. Returns `"Plan produced"` when `s` contains no non-blank line.
fn summarize_first_line(s: &str) -> String {
    const MAX_CHARS: usize = 160;
    const ELLIPSIS: &str = "...";

    let line = s
        .lines()
        .map(str::trim)
        .find(|l| !l.is_empty())
        .unwrap_or("Plan produced");

    if line.chars().count() <= MAX_CHARS {
        return line.to_string();
    }

    // Reserve room for the marker so the total stays at MAX_CHARS.
    let mut summary: String = line.chars().take(MAX_CHARS - ELLIPSIS.len()).collect();
    summary.push_str(ELLIPSIS);
    summary
}