Skip to main content

brainos_orchestrate/
orchestrator.rs

//! Task orchestrator — the execution loop that coordinates decomposition,
//! approval, execution, and outcome synthesis.
//!
//! The `TaskOrchestrator` `impl` is split across sibling modules sharing the
//! `pub(crate)` fields: construction + cancellation-token helpers here,
//! `plan`/queries/state-machine in `crate::lifecycle`, the execution loop in
//! `crate::execute`, corrective replanning in `crate::replan`, and the
//! per-`StepAction` handlers in `crate::actions` / `crate::aggregation`.

10use std::collections::HashMap;
11use std::sync::Arc;
12
13use thiserror::Error;
14use tokio::sync::RwLock;
15use tokio_util::sync::CancellationToken;
16
17use crate::decompose::{DecompositionError, TaskDecomposer};
18use crate::state::{StepState, TaskState};
19
20#[derive(Debug, Error)]
21pub enum OrchestrateError {
22    #[error("Decomposition failed: {0}")]
23    Decomposition(#[from] DecompositionError),
24    #[error("Graph error: {0}")]
25    Graph(#[from] crate::graph::GraphError),
26    #[error("Sandbox error: {0}")]
27    Sandbox(String),
28    #[error("Confirmation error: {0}")]
29    Confirmation(String),
30    #[error("Budget exceeded: {0}")]
31    BudgetExceeded(String),
32    #[error("Audit error: {0}")]
33    Audit(String),
34    #[error("Task not found: {0}")]
35    TaskNotFound(String),
36    #[error("Task cancelled")]
37    Cancelled,
38}
39
40/// The task orchestrator — manages the full lifecycle of task plans.
41///
42/// Fields are `pub(crate)` so per-action handlers (`crate::actions`) and
43/// aggregation helpers (`crate::aggregation`) can split `impl` across
44/// sibling modules. Outside the `orchestrate` crate the struct's surface
45/// is the public methods only.
46pub struct TaskOrchestrator {
47    pub(crate) decomposer: Arc<dyn TaskDecomposer>,
48    pub(crate) audit: Option<Arc<dyn audit::AuditTrail>>,
49    pub(crate) confirm: Option<Arc<dyn confirm::ConfirmationEngine>>,
50    pub(crate) budget: Option<Arc<dyn budget::CostBudget>>,
51    pub(crate) sandbox: Option<Arc<dyn sandbox::SandboxExecutor>>,
52    pub(crate) agents: Option<Arc<delegate::AgentRegistry>>,
53    /// LLM provider for `Research` / `Review` step types.
54    pub(crate) llm: Option<Arc<dyn cortex::LlmProvider>>,
55    /// Channel dispatcher for `Notify` step types.
56    pub(crate) dispatcher: Option<Arc<channel::ChannelDispatcher>>,
57    /// Episodic memory store — captures delegation outcomes so future
58    /// runs can recall them.
59    pub(crate) episodic: Option<Arc<hippocampus::EpisodicStore>>,
60    /// Default fallback chain applied to every delegation. Individual
61    /// step failures follow this chain unless overridden in the future.
62    pub(crate) delegation_policy: delegate::EscalationPolicy,
63    /// Cached binary allowlist used to rebuild a `DecompositionContext`
64    /// inside the replan-on-failure loop. Populated by the wiring
65    /// layer; empty by default (no allowlist constraint surfaced to
66    /// the LLM during replan).
67    pub(crate) available_tools: Vec<String>,
68    /// Active tasks indexed by task ID.
69    pub(crate) tasks: RwLock<HashMap<String, TaskState>>,
70    /// Observer bus for `BrainEvent::TaskStateChange` emissions. When
71    /// unwired, transitions still update the in-memory state and the
72    /// optional persistence pool, but no event goes out — existing tests
73    /// can keep building bare orchestrators.
74    pub(crate) observer: Option<Arc<dyn observe::Observer>>,
75    /// SQLite pool used to append rows to the `task_states` audit table
76    /// (migration v22). When unwired, the state-machine history lives
77    /// only in memory.
78    pub(crate) state_pool: Option<storage::SqlitePool>,
79    /// Per-task cancellation tokens (PR-6b). Created on `plan()`,
80    /// observed at every orchestrator checkpoint (the execute loop, the
81    /// confirmation wait, the per-action future, the replan LLM call),
82    /// and fired by `cancel()` so in-flight child futures abort within
83    /// one polling cycle instead of waiting for the current step to
84    /// finish.
85    pub(crate) cancel_tokens: RwLock<HashMap<String, CancellationToken>>,
86}
87
/// Maximum number of replan-on-failure attempts per task. Bounds LLM
/// cost when the model keeps producing plans the sandbox refuses.
pub(crate) const MAX_REPLAN_ATTEMPTS: u32 = 2;
91
92impl TaskOrchestrator {
93    pub fn new(decomposer: Arc<dyn TaskDecomposer>) -> Self {
94        Self {
95            decomposer,
96            audit: None,
97            confirm: None,
98            budget: None,
99            sandbox: None,
100            agents: None,
101            llm: None,
102            dispatcher: None,
103            episodic: None,
104            delegation_policy: delegate::EscalationPolicy::default(),
105            available_tools: Vec::new(),
106            tasks: RwLock::new(HashMap::new()),
107            observer: None,
108            state_pool: None,
109            cancel_tokens: RwLock::new(HashMap::new()),
110        }
111    }
112
113    /// Look up the per-task cancellation token. Returns a fresh
114    /// (never-cancelled) token for unknown task IDs so callers that pre-
115    /// date the cancel-token map (e.g. tasks constructed by tests that
116    /// inject directly into `self.tasks`) keep their old behavior — they
117    /// just never observe a cancel signal.
118    pub(crate) async fn cancel_token_for(&self, task_id: &str) -> CancellationToken {
119        self.cancel_tokens
120            .read()
121            .await
122            .get(task_id)
123            .cloned()
124            .unwrap_or_else(CancellationToken::new)
125    }
126
127    /// Mark a single step `Cancelled` under a brief write lock. Used by
128    /// the cancellation arms of `execute_step` so the per-step state
129    /// reflects the abort even when `cancel()` raced ahead (which would
130    /// have flipped it to Cancelled already — overwriting Cancelled with
131    /// Cancelled is a no-op).
132    pub(crate) async fn mark_step_cancelled(&self, task_id: &str, step_id: &str) {
133        let mut tasks = self.tasks.write().await;
134        if let Some(task) = tasks.get_mut(task_id) {
135            task.set_step_state(step_id, StepState::Cancelled);
136        }
137    }
138
139    /// Cache the sandbox's binary allowlist so the replan-on-failure
140    /// loop can include it in its corrective LLM call. Without this the
141    /// replan call has no allowlist context and may suggest binaries
142    /// the sandbox would reject.
143    pub fn with_available_tools(mut self, tools: Vec<String>) -> Self {
144        self.available_tools = tools;
145        self
146    }
147
148    pub fn with_audit(mut self, audit: Arc<dyn audit::AuditTrail>) -> Self {
149        self.audit = Some(audit);
150        self
151    }
152
153    pub fn with_confirmation(mut self, confirm: Arc<dyn confirm::ConfirmationEngine>) -> Self {
154        self.confirm = Some(confirm);
155        self
156    }
157
158    pub fn with_budget(mut self, budget: Arc<dyn budget::CostBudget>) -> Self {
159        self.budget = Some(budget);
160        self
161    }
162
163    pub fn with_sandbox(mut self, sandbox: Arc<dyn sandbox::SandboxExecutor>) -> Self {
164        self.sandbox = Some(sandbox);
165        self
166    }
167
168    /// Attach the agent registry — enables `StepAction::Implement`
169    /// dispatch to specialist delegates.
170    pub fn with_agents(mut self, agents: Arc<delegate::AgentRegistry>) -> Self {
171        self.agents = Some(agents);
172        self
173    }
174
175    /// Attach an LLM provider so `Research` and `Review` steps actually
176    /// run a model call instead of returning a no-op string.
177    pub fn with_llm(mut self, llm: Arc<dyn cortex::LlmProvider>) -> Self {
178        self.llm = Some(llm);
179        self
180    }
181
182    /// Attach a channel dispatcher so `Notify` steps actually deliver
183    /// the message to the user's preferred channel.
184    pub fn with_channel_dispatcher(mut self, dispatcher: Arc<channel::ChannelDispatcher>) -> Self {
185        self.dispatcher = Some(dispatcher);
186        self
187    }
188
189    /// Attach an episodic memory store — delegate outcomes are recorded
190    /// so they're searchable in future sessions.
191    pub fn with_episodic(mut self, store: Arc<hippocampus::EpisodicStore>) -> Self {
192        self.episodic = Some(store);
193        self
194    }
195
196    /// Attach an observer bus so phase transitions emit
197    /// [`observe::BrainEvent::TaskStateChange`]. Unwired = silent.
198    pub fn with_observer(mut self, observer: Arc<dyn observe::Observer>) -> Self {
199        self.observer = Some(observer);
200        self
201    }
202
203    /// Attach a SQLite pool so phase transitions append rows to the
204    /// `task_states` audit table (migration v22). Unwired = in-memory
205    /// history only.
206    pub fn with_state_pool(mut self, pool: storage::SqlitePool) -> Self {
207        self.state_pool = Some(pool);
208        self
209    }
210
211    /// Override the default delegation escalation policy.
212    pub fn with_delegation_policy(mut self, policy: delegate::EscalationPolicy) -> Self {
213        self.delegation_policy = policy;
214        self
215    }
216}
217
// Unit tests live in a sibling `tests` module, compiled only for test builds.
#[cfg(test)]
mod tests;