brainos_orchestrate/orchestrator.rs
1//! Task orchestrator — the execution loop that coordinates decomposition,
2//! approval, execution, and outcome synthesis.
3//!
4//! The `TaskOrchestrator` `impl` is split across sibling modules sharing the
5//! `pub(crate)` fields: construction + cancellation-token helpers here,
6//! `plan`/queries/state-machine in `crate::lifecycle`, the execution loop in
7//! `crate::execute`, corrective replanning in `crate::replan`, and the
8//! per-`StepAction` handlers in `crate::actions` / `crate::aggregation`.
9
10use std::collections::HashMap;
11use std::sync::Arc;
12
13use thiserror::Error;
14use tokio::sync::RwLock;
15use tokio_util::sync::CancellationToken;
16
17use crate::decompose::{DecompositionError, TaskDecomposer};
18use crate::state::{StepState, TaskState};
19
20#[derive(Debug, Error)]
21pub enum OrchestrateError {
22 #[error("Decomposition failed: {0}")]
23 Decomposition(#[from] DecompositionError),
24 #[error("Graph error: {0}")]
25 Graph(#[from] crate::graph::GraphError),
26 #[error("Sandbox error: {0}")]
27 Sandbox(String),
28 #[error("Confirmation error: {0}")]
29 Confirmation(String),
30 #[error("Budget exceeded: {0}")]
31 BudgetExceeded(String),
32 #[error("Audit error: {0}")]
33 Audit(String),
34 #[error("Task not found: {0}")]
35 TaskNotFound(String),
36 #[error("Task cancelled")]
37 Cancelled,
38}
39
40/// The task orchestrator — manages the full lifecycle of task plans.
41///
42/// Fields are `pub(crate)` so per-action handlers (`crate::actions`) and
43/// aggregation helpers (`crate::aggregation`) can split `impl` across
44/// sibling modules. Outside the `orchestrate` crate the struct's surface
45/// is the public methods only.
46pub struct TaskOrchestrator {
47 pub(crate) decomposer: Arc<dyn TaskDecomposer>,
48 pub(crate) audit: Option<Arc<dyn audit::AuditTrail>>,
49 pub(crate) confirm: Option<Arc<dyn confirm::ConfirmationEngine>>,
50 pub(crate) budget: Option<Arc<dyn budget::CostBudget>>,
51 pub(crate) sandbox: Option<Arc<dyn sandbox::SandboxExecutor>>,
52 pub(crate) agents: Option<Arc<delegate::AgentRegistry>>,
53 /// LLM provider for `Research` / `Review` step types.
54 pub(crate) llm: Option<Arc<dyn cortex::LlmProvider>>,
55 /// Channel dispatcher for `Notify` step types.
56 pub(crate) dispatcher: Option<Arc<channel::ChannelDispatcher>>,
57 /// Episodic memory store — captures delegation outcomes so future
58 /// runs can recall them.
59 pub(crate) episodic: Option<Arc<hippocampus::EpisodicStore>>,
60 /// Default fallback chain applied to every delegation. Individual
61 /// step failures follow this chain unless overridden in the future.
62 pub(crate) delegation_policy: delegate::EscalationPolicy,
63 /// Cached binary allowlist used to rebuild a `DecompositionContext`
64 /// inside the replan-on-failure loop. Populated by the wiring
65 /// layer; empty by default (no allowlist constraint surfaced to
66 /// the LLM during replan).
67 pub(crate) available_tools: Vec<String>,
68 /// Active tasks indexed by task ID.
69 pub(crate) tasks: RwLock<HashMap<String, TaskState>>,
70 /// Observer bus for `BrainEvent::TaskStateChange` emissions. When
71 /// unwired, transitions still update the in-memory state and the
72 /// optional persistence pool, but no event goes out — existing tests
73 /// can keep building bare orchestrators.
74 pub(crate) observer: Option<Arc<dyn observe::Observer>>,
75 /// SQLite pool used to append rows to the `task_states` audit table
76 /// (migration v22). When unwired, the state-machine history lives
77 /// only in memory.
78 pub(crate) state_pool: Option<storage::SqlitePool>,
79 /// Per-task cancellation tokens (PR-6b). Created on `plan()`,
80 /// observed at every orchestrator checkpoint (the execute loop, the
81 /// confirmation wait, the per-action future, the replan LLM call),
82 /// and fired by `cancel()` so in-flight child futures abort within
83 /// one polling cycle instead of waiting for the current step to
84 /// finish.
85 pub(crate) cancel_tokens: RwLock<HashMap<String, CancellationToken>>,
86}
87
/// Upper bound on replan-on-failure attempts for a single task.
///
/// Capping the retries keeps LLM cost bounded when the model keeps
/// producing plans that the sandbox refuses.
pub(crate) const MAX_REPLAN_ATTEMPTS: u32 = 2;
91
92impl TaskOrchestrator {
93 pub fn new(decomposer: Arc<dyn TaskDecomposer>) -> Self {
94 Self {
95 decomposer,
96 audit: None,
97 confirm: None,
98 budget: None,
99 sandbox: None,
100 agents: None,
101 llm: None,
102 dispatcher: None,
103 episodic: None,
104 delegation_policy: delegate::EscalationPolicy::default(),
105 available_tools: Vec::new(),
106 tasks: RwLock::new(HashMap::new()),
107 observer: None,
108 state_pool: None,
109 cancel_tokens: RwLock::new(HashMap::new()),
110 }
111 }
112
113 /// Look up the per-task cancellation token. Returns a fresh
114 /// (never-cancelled) token for unknown task IDs so callers that pre-
115 /// date the cancel-token map (e.g. tasks constructed by tests that
116 /// inject directly into `self.tasks`) keep their old behavior — they
117 /// just never observe a cancel signal.
118 pub(crate) async fn cancel_token_for(&self, task_id: &str) -> CancellationToken {
119 self.cancel_tokens
120 .read()
121 .await
122 .get(task_id)
123 .cloned()
124 .unwrap_or_else(CancellationToken::new)
125 }
126
127 /// Mark a single step `Cancelled` under a brief write lock. Used by
128 /// the cancellation arms of `execute_step` so the per-step state
129 /// reflects the abort even when `cancel()` raced ahead (which would
130 /// have flipped it to Cancelled already — overwriting Cancelled with
131 /// Cancelled is a no-op).
132 pub(crate) async fn mark_step_cancelled(&self, task_id: &str, step_id: &str) {
133 let mut tasks = self.tasks.write().await;
134 if let Some(task) = tasks.get_mut(task_id) {
135 task.set_step_state(step_id, StepState::Cancelled);
136 }
137 }
138
139 /// Cache the sandbox's binary allowlist so the replan-on-failure
140 /// loop can include it in its corrective LLM call. Without this the
141 /// replan call has no allowlist context and may suggest binaries
142 /// the sandbox would reject.
143 pub fn with_available_tools(mut self, tools: Vec<String>) -> Self {
144 self.available_tools = tools;
145 self
146 }
147
148 pub fn with_audit(mut self, audit: Arc<dyn audit::AuditTrail>) -> Self {
149 self.audit = Some(audit);
150 self
151 }
152
153 pub fn with_confirmation(mut self, confirm: Arc<dyn confirm::ConfirmationEngine>) -> Self {
154 self.confirm = Some(confirm);
155 self
156 }
157
158 pub fn with_budget(mut self, budget: Arc<dyn budget::CostBudget>) -> Self {
159 self.budget = Some(budget);
160 self
161 }
162
163 pub fn with_sandbox(mut self, sandbox: Arc<dyn sandbox::SandboxExecutor>) -> Self {
164 self.sandbox = Some(sandbox);
165 self
166 }
167
168 /// Attach the agent registry — enables `StepAction::Implement`
169 /// dispatch to specialist delegates.
170 pub fn with_agents(mut self, agents: Arc<delegate::AgentRegistry>) -> Self {
171 self.agents = Some(agents);
172 self
173 }
174
175 /// Attach an LLM provider so `Research` and `Review` steps actually
176 /// run a model call instead of returning a no-op string.
177 pub fn with_llm(mut self, llm: Arc<dyn cortex::LlmProvider>) -> Self {
178 self.llm = Some(llm);
179 self
180 }
181
182 /// Attach a channel dispatcher so `Notify` steps actually deliver
183 /// the message to the user's preferred channel.
184 pub fn with_channel_dispatcher(mut self, dispatcher: Arc<channel::ChannelDispatcher>) -> Self {
185 self.dispatcher = Some(dispatcher);
186 self
187 }
188
189 /// Attach an episodic memory store — delegate outcomes are recorded
190 /// so they're searchable in future sessions.
191 pub fn with_episodic(mut self, store: Arc<hippocampus::EpisodicStore>) -> Self {
192 self.episodic = Some(store);
193 self
194 }
195
196 /// Attach an observer bus so phase transitions emit
197 /// [`observe::BrainEvent::TaskStateChange`]. Unwired = silent.
198 pub fn with_observer(mut self, observer: Arc<dyn observe::Observer>) -> Self {
199 self.observer = Some(observer);
200 self
201 }
202
203 /// Attach a SQLite pool so phase transitions append rows to the
204 /// `task_states` audit table (migration v22). Unwired = in-memory
205 /// history only.
206 pub fn with_state_pool(mut self, pool: storage::SqlitePool) -> Self {
207 self.state_pool = Some(pool);
208 self
209 }
210
211 /// Override the default delegation escalation policy.
212 pub fn with_delegation_policy(mut self, policy: delegate::EscalationPolicy) -> Self {
213 self.delegation_policy = policy;
214 self
215 }
216}
217
218#[cfg(test)]
219mod tests;