Skip to main content

brainos_orchestrate/
lifecycle.rs

1//! Task lifecycle for [`TaskOrchestrator`]: planning entry, read queries,
2//! cancellation, and the canonical phase-transition state machine.
3
4use chrono::Utc;
5use tokio_util::sync::CancellationToken;
6
7use crate::decompose::DecompositionContext;
8use crate::graph::TaskGraph;
9use crate::orchestrator::{OrchestrateError, TaskOrchestrator};
10use crate::state::{StepState, TaskPhase, TaskState};
11use crate::synthesize;
12
13impl TaskOrchestrator {
14    /// Decompose a user request into a task plan.
15    /// Returns the task ID and a formatted plan for user review.
16    pub async fn plan(
17        &self,
18        request: &str,
19        context: DecompositionContext,
20    ) -> Result<(String, String), OrchestrateError> {
21        tracing::info!(request = %request, "Decomposing task");
22
23        let steps = self.decomposer.decompose(request, context).await?;
24        let graph = TaskGraph::from_steps(steps)?;
25
26        let task_id = uuid::Uuid::new_v4().to_string();
27        let task_state = TaskState::new(task_id.clone(), request.to_string(), graph);
28
29        let plan_text = synthesize::format_plan_for_approval(&task_state);
30
31        // Record in audit trail
32        if let Some(audit) = &self.audit {
33            let entry = audit::AuditEntry::new(
34                request,
35                "decomposed into task plan",
36                &plan_text,
37                audit::ActionTier::Read,
38            )
39            .with_source("orchestrator");
40            if let Err(e) = audit.record(entry).await {
41                tracing::warn!("Failed to audit task plan: {e}");
42            }
43        }
44
45        self.tasks.write().await.insert(task_id.clone(), task_state);
46        self.cancel_tokens
47            .write()
48            .await
49            .insert(task_id.clone(), CancellationToken::new());
50
51        // State-machine: emit the initial `planning` entry, then
52        // transition to AwaitingApproval. Both events are visible to the
53        // observer and persisted to `task_states` (if a pool is wired).
54        self.record_initial_planning(&task_id).await;
55        self.transition_phase(&task_id, TaskPhase::AwaitingApproval)
56            .await;
57
58        tracing::info!(task_id = %task_id, "Task plan created");
59        Ok((task_id, plan_text))
60    }
61    /// Get the current state of a task.
62    pub async fn get_task(&self, task_id: &str) -> Option<TaskState> {
63        self.tasks.read().await.get(task_id).cloned()
64    }
65
66    /// Return task IDs currently in the `AwaitingApproval` phase. Used by
67    /// the signal pipeline to resolve bare `approve` / `reject` (no id)
68    /// to the single pending plan when there's exactly one.
69    pub async fn pending_approvals(&self) -> Vec<String> {
70        self.tasks
71            .read()
72            .await
73            .iter()
74            .filter(|(_, t)| t.phase == TaskPhase::AwaitingApproval)
75            .map(|(id, _)| id.clone())
76            .collect()
77    }
78
79    /// List all active tasks.
80    pub async fn list_tasks(&self) -> Vec<(String, String, TaskPhase)> {
81        self.tasks
82            .read()
83            .await
84            .iter()
85            .map(|(id, t)| (id.clone(), t.request.clone(), t.phase))
86            .collect()
87    }
88
89    /// Cancel a task. Flips all non-terminal step states to `Cancelled`,
90    /// transitions the task phase to `Cancelled`, and (PR-6b) fires the
91    /// per-task [`CancellationToken`] so any in-flight step future
92    /// observing the token aborts within one polling cycle — without
93    /// PR-6b, cancellation would have to wait for the current step to
94    /// finish on its own.
95    pub async fn cancel(&self, task_id: &str) -> Result<(), OrchestrateError> {
96        {
97            let mut tasks = self.tasks.write().await;
98            let task = tasks
99                .get_mut(task_id)
100                .ok_or_else(|| OrchestrateError::TaskNotFound(task_id.to_string()))?;
101            for (_, state) in task.step_states.iter_mut() {
102                if !state.is_terminal() {
103                    *state = StepState::Cancelled;
104                }
105            }
106        }
107        self.transition_phase(task_id, TaskPhase::Cancelled).await;
108        // Fire the cancellation token AFTER state has already been
109        // flipped to Cancelled — that way a select-loser that races to
110        // overwrite step state with Cancelled is a no-op, not a write
111        // that could clobber a Completed/Failed transition that
112        // legitimately landed first.
113        if let Some(t) = self.cancel_tokens.read().await.get(task_id) {
114            t.cancel();
115        }
116        Ok(())
117    }
118
119    /// State-machine helper. The single canonical mutator of
120    /// [`TaskState::phase`]: takes the write lock just long enough to
121    /// flip the in-memory field, then releases it before doing
122    /// I/O-bound work (audit row write + observer publish). Idempotent
123    /// for terminal transitions — if a task is already in a terminal
124    /// phase, the helper is a no-op so cancel-then-complete races stay
125    /// well-defined.
126    pub(crate) async fn transition_phase(&self, task_id: &str, to: TaskPhase) {
127        // Read prior phase + write the new one under one lock. The
128        // bound block guarantees the guard drops before the async I/O
129        // below so other handlers aren't blocked on the disk write.
130        let from = {
131            let mut tasks = self.tasks.write().await;
132            let task = match tasks.get_mut(task_id) {
133                Some(t) => t,
134                None => return,
135            };
136            if task.phase.is_terminal() && task.phase != to {
137                // Already done — refuse to flip out of a terminal
138                // state so a late completion doesn't overwrite a
139                // cancellation.
140                tracing::debug!(
141                    task_id = %task_id,
142                    from = %task.phase.as_str(),
143                    to = %to.as_str(),
144                    "ignoring transition out of terminal state"
145                );
146                return;
147            }
148            if task.phase == to {
149                return;
150            }
151            let from = task.phase;
152            task.phase = to;
153            if to.is_terminal() {
154                task.completed_at = Some(Utc::now());
155            }
156            from
157        };
158
159        // Audit table append (best-effort — a write failure is logged
160        // and we proceed so the in-memory phase update isn't undone).
161        if let Some(pool) = &self.state_pool {
162            let task_id_owned = task_id.to_string();
163            let state_str = to.as_str();
164            let res = pool.with_conn(|conn| {
165                conn.execute(
166                    "INSERT INTO task_states (task_id, state) VALUES (?1, ?2)",
167                    rusqlite::params![task_id_owned, state_str],
168                )?;
169                Ok(())
170            });
171            if let Err(e) = res {
172                tracing::warn!(
173                    task_id = %task_id,
174                    state = %to.as_str(),
175                    error = %e,
176                    "task_states row append failed"
177                );
178            }
179        }
180
181        // Observer publish (best-effort, same rationale).
182        if let Some(observer) = &self.observer {
183            let event = observe::BrainEvent::TaskStateChange {
184                id: uuid::Uuid::new_v4(),
185                task_id: task_id.to_string(),
186                from: from.as_str().to_string(),
187                to: to.as_str().to_string(),
188                ts: Utc::now(),
189            };
190            let _ = observer.publish(event).await;
191        }
192
193        tracing::info!(
194            task_id = %task_id,
195            from = %from.as_str(),
196            to = %to.as_str(),
197            "task phase transition"
198        );
199    }
200
201    /// Convenience: emit the initial Planning transition (`from = "none"`).
202    /// Called from [`plan`] right after the task is inserted into the
203    /// active map, so the audit table records the task's birth before
204    /// any subsequent state moves.
205    pub(crate) async fn record_initial_planning(&self, task_id: &str) {
206        if let Some(pool) = &self.state_pool {
207            let task_id_owned = task_id.to_string();
208            let res = pool.with_conn(|conn| {
209                conn.execute(
210                    "INSERT INTO task_states (task_id, state) VALUES (?1, 'planning')",
211                    rusqlite::params![task_id_owned],
212                )?;
213                Ok(())
214            });
215            if let Err(e) = res {
216                tracing::warn!(
217                    task_id = %task_id,
218                    error = %e,
219                    "initial planning state append failed"
220                );
221            }
222        }
223        if let Some(observer) = &self.observer {
224            let event = observe::BrainEvent::TaskStateChange {
225                id: uuid::Uuid::new_v4(),
226                task_id: task_id.to_string(),
227                from: "none".into(),
228                to: "planning".into(),
229                ts: Utc::now(),
230            };
231            let _ = observer.publish(event).await;
232        }
233    }
234}