brainos_orchestrate/
lifecycle.rs1use chrono::Utc;
5use tokio_util::sync::CancellationToken;
6
7use crate::decompose::DecompositionContext;
8use crate::graph::TaskGraph;
9use crate::orchestrator::{OrchestrateError, TaskOrchestrator};
10use crate::state::{StepState, TaskPhase, TaskState};
11use crate::synthesize;
12
13impl TaskOrchestrator {
14 pub async fn plan(
17 &self,
18 request: &str,
19 context: DecompositionContext,
20 ) -> Result<(String, String), OrchestrateError> {
21 tracing::info!(request = %request, "Decomposing task");
22
23 let steps = self.decomposer.decompose(request, context).await?;
24 let graph = TaskGraph::from_steps(steps)?;
25
26 let task_id = uuid::Uuid::new_v4().to_string();
27 let task_state = TaskState::new(task_id.clone(), request.to_string(), graph);
28
29 let plan_text = synthesize::format_plan_for_approval(&task_state);
30
31 if let Some(audit) = &self.audit {
33 let entry = audit::AuditEntry::new(
34 request,
35 "decomposed into task plan",
36 &plan_text,
37 audit::ActionTier::Read,
38 )
39 .with_source("orchestrator");
40 if let Err(e) = audit.record(entry).await {
41 tracing::warn!("Failed to audit task plan: {e}");
42 }
43 }
44
45 self.tasks.write().await.insert(task_id.clone(), task_state);
46 self.cancel_tokens
47 .write()
48 .await
49 .insert(task_id.clone(), CancellationToken::new());
50
51 self.record_initial_planning(&task_id).await;
55 self.transition_phase(&task_id, TaskPhase::AwaitingApproval)
56 .await;
57
58 tracing::info!(task_id = %task_id, "Task plan created");
59 Ok((task_id, plan_text))
60 }
61 pub async fn get_task(&self, task_id: &str) -> Option<TaskState> {
63 self.tasks.read().await.get(task_id).cloned()
64 }
65
66 pub async fn pending_approvals(&self) -> Vec<String> {
70 self.tasks
71 .read()
72 .await
73 .iter()
74 .filter(|(_, t)| t.phase == TaskPhase::AwaitingApproval)
75 .map(|(id, _)| id.clone())
76 .collect()
77 }
78
79 pub async fn list_tasks(&self) -> Vec<(String, String, TaskPhase)> {
81 self.tasks
82 .read()
83 .await
84 .iter()
85 .map(|(id, t)| (id.clone(), t.request.clone(), t.phase))
86 .collect()
87 }
88
89 pub async fn cancel(&self, task_id: &str) -> Result<(), OrchestrateError> {
96 {
97 let mut tasks = self.tasks.write().await;
98 let task = tasks
99 .get_mut(task_id)
100 .ok_or_else(|| OrchestrateError::TaskNotFound(task_id.to_string()))?;
101 for (_, state) in task.step_states.iter_mut() {
102 if !state.is_terminal() {
103 *state = StepState::Cancelled;
104 }
105 }
106 }
107 self.transition_phase(task_id, TaskPhase::Cancelled).await;
108 if let Some(t) = self.cancel_tokens.read().await.get(task_id) {
114 t.cancel();
115 }
116 Ok(())
117 }
118
119 pub(crate) async fn transition_phase(&self, task_id: &str, to: TaskPhase) {
127 let from = {
131 let mut tasks = self.tasks.write().await;
132 let task = match tasks.get_mut(task_id) {
133 Some(t) => t,
134 None => return,
135 };
136 if task.phase.is_terminal() && task.phase != to {
137 tracing::debug!(
141 task_id = %task_id,
142 from = %task.phase.as_str(),
143 to = %to.as_str(),
144 "ignoring transition out of terminal state"
145 );
146 return;
147 }
148 if task.phase == to {
149 return;
150 }
151 let from = task.phase;
152 task.phase = to;
153 if to.is_terminal() {
154 task.completed_at = Some(Utc::now());
155 }
156 from
157 };
158
159 if let Some(pool) = &self.state_pool {
162 let task_id_owned = task_id.to_string();
163 let state_str = to.as_str();
164 let res = pool.with_conn(|conn| {
165 conn.execute(
166 "INSERT INTO task_states (task_id, state) VALUES (?1, ?2)",
167 rusqlite::params![task_id_owned, state_str],
168 )?;
169 Ok(())
170 });
171 if let Err(e) = res {
172 tracing::warn!(
173 task_id = %task_id,
174 state = %to.as_str(),
175 error = %e,
176 "task_states row append failed"
177 );
178 }
179 }
180
181 if let Some(observer) = &self.observer {
183 let event = observe::BrainEvent::TaskStateChange {
184 id: uuid::Uuid::new_v4(),
185 task_id: task_id.to_string(),
186 from: from.as_str().to_string(),
187 to: to.as_str().to_string(),
188 ts: Utc::now(),
189 };
190 let _ = observer.publish(event).await;
191 }
192
193 tracing::info!(
194 task_id = %task_id,
195 from = %from.as_str(),
196 to = %to.as_str(),
197 "task phase transition"
198 );
199 }
200
201 pub(crate) async fn record_initial_planning(&self, task_id: &str) {
206 if let Some(pool) = &self.state_pool {
207 let task_id_owned = task_id.to_string();
208 let res = pool.with_conn(|conn| {
209 conn.execute(
210 "INSERT INTO task_states (task_id, state) VALUES (?1, 'planning')",
211 rusqlite::params![task_id_owned],
212 )?;
213 Ok(())
214 });
215 if let Err(e) = res {
216 tracing::warn!(
217 task_id = %task_id,
218 error = %e,
219 "initial planning state append failed"
220 );
221 }
222 }
223 if let Some(observer) = &self.observer {
224 let event = observe::BrainEvent::TaskStateChange {
225 id: uuid::Uuid::new_v4(),
226 task_id: task_id.to_string(),
227 from: "none".into(),
228 to: "planning".into(),
229 ts: Utc::now(),
230 };
231 let _ = observer.publish(event).await;
232 }
233 }
234}