bamboo_server/schedule_app/
manager.rs1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use chrono::Utc;
6use tokio::sync::{broadcast, mpsc, RwLock};
7
8use bamboo_agent_core::tools::ToolExecutor;
9use bamboo_agent_core::{AgentEvent, Message, Role, Session};
10use bamboo_domain::reasoning::ReasoningEffort;
11use bamboo_engine::execution::{
12 create_event_forwarder, finalize_runner, get_or_create_event_sender, try_reserve_runner,
13 AgentRunner, RunnerReservation,
14};
15use bamboo_engine::ExecuteRequest;
16use bamboo_infrastructure::LockedSessionStore;
17
18use super::store::{ClaimedScheduleRun, ScheduleStore};
19use super::trigger_engine::DynTriggerEngine;
20use bamboo_domain::{ScheduleRunConfig, ScheduleRunStatus};
21
22#[derive(Debug, Clone)]
23pub struct ScheduleRunJob {
24 pub run_id: String,
25 pub schedule_id: String,
26 pub schedule_name: String,
27 pub run_config: ScheduleRunConfig,
28 pub scheduled_for: chrono::DateTime<chrono::Utc>,
29 pub claimed_at: chrono::DateTime<chrono::Utc>,
30 pub was_catch_up: bool,
31}
32
33#[derive(Debug, Clone)]
39pub struct ResolvedRunConfig {
40 pub model: String,
41 pub reasoning_effort: Option<ReasoningEffort>,
42 pub system_prompt: String,
43 pub base_system_prompt: String,
44 pub workspace_path: Option<String>,
45}
46
47#[derive(Clone)]
48pub struct ScheduleContext {
49 pub schedule_store: Arc<ScheduleStore>,
50 pub agent: Arc<bamboo_engine::Agent>,
51 pub persistence: Arc<LockedSessionStore>,
52 pub tools: Arc<dyn ToolExecutor>,
53 pub sessions_cache: Arc<RwLock<HashMap<String, Session>>>,
54 pub agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
55 pub session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
56 pub app_data_dir: Option<std::path::PathBuf>,
57 pub trigger_engine: DynTriggerEngine,
58 pub resolve_run_config: Arc<dyn Fn(&ScheduleRunJob) -> ResolvedRunConfig + Send + Sync>,
61}
62
63#[derive(Debug, Clone, Copy, PartialEq, Eq)]
64enum ScheduleRunLifecycleResult {
65 Terminal(ScheduleRunStatus),
66 BackgroundExecutionInProgress,
67}
68
69#[derive(Clone)]
70pub struct ScheduleManager {
71 tx: mpsc::Sender<ScheduleRunJob>,
72}
73
74impl ScheduleManager {
75 pub fn new(ctx: ScheduleContext) -> Self {
76 let (tx, mut rx) = mpsc::channel::<ScheduleRunJob>(128);
77
78 tokio::spawn({
80 let ctx = ctx.clone();
81 async move {
82 while let Some(job) = rx.recv().await {
83 if let Err(error) = ctx
84 .schedule_store
85 .mark_run_started(&job.schedule_id, &job.run_id)
86 .await
87 {
88 tracing::warn!(
89 "failed to mark schedule run started for {} / {}: {}",
90 job.schedule_id,
91 job.run_id,
92 error
93 );
94 }
95 let schedule_id = job.schedule_id.clone();
96 let run_id = job.run_id.clone();
97 match run_schedule_job(ctx.clone(), job).await {
98 Ok(ScheduleRunLifecycleResult::Terminal(status)) => {
99 if let Err(error) = ctx
100 .schedule_store
101 .mark_run_terminal(&schedule_id, &run_id, status, None)
102 .await
103 {
104 tracing::warn!(
105 "failed to mark schedule run terminal state for {} / {}: {}",
106 schedule_id,
107 run_id,
108 error
109 );
110 }
111 }
112 Ok(ScheduleRunLifecycleResult::BackgroundExecutionInProgress) => {}
113 Err(e) => {
114 tracing::warn!("schedule job failed: {e}");
115 if let Err(error) = ctx
116 .schedule_store
117 .mark_run_terminal(
118 &schedule_id,
119 &run_id,
120 ScheduleRunStatus::Failed,
121 Some(e.clone()),
122 )
123 .await
124 {
125 tracing::warn!(
126 "failed to mark schedule run failed state for {} / {}: {}",
127 schedule_id,
128 run_id,
129 error
130 );
131 }
132 }
133 }
134 }
135 }
136 });
137
138 tokio::spawn({
140 let tx = tx.clone();
141 let store = ctx.schedule_store.clone();
142 let trigger_engine = ctx.trigger_engine.clone();
143 async move {
144 let mut ticker = tokio::time::interval(Duration::from_secs(15));
145 loop {
146 ticker.tick().await;
147 let now = Utc::now();
148 let claimed: Vec<ClaimedScheduleRun> = match store
149 .claim_due_runs_with_engine(now, trigger_engine.as_ref())
150 .await
151 {
152 Ok(v) => v,
153 Err(e) => {
154 tracing::warn!("claim_due_runs failed: {e}");
155 continue;
156 }
157 };
158 for c in claimed {
159 let schedule_id = c.schedule_id.clone();
160 let run_id = c.run_id.clone();
161 if tx
162 .send(ScheduleRunJob {
163 run_id: c.run_id,
164 schedule_id: c.schedule_id,
165 schedule_name: c.schedule_name,
166 run_config: c.run_config,
167 scheduled_for: c.scheduled_for,
168 claimed_at: c.claimed_at,
169 was_catch_up: c.was_catch_up,
170 })
171 .await
172 .is_err()
173 {
174 let _ = store
175 .mark_run_dequeued_without_start(
176 &schedule_id,
177 &run_id,
178 Some("schedule manager is not running".to_string()),
179 )
180 .await;
181 }
182 }
183 }
184 }
185 });
186
187 Self { tx }
188 }
189
190 pub async fn enqueue_run_now(&self, job: ScheduleRunJob) -> Result<(), String> {
191 self.tx
192 .send(job)
193 .await
194 .map_err(|_| "schedule manager is not running".to_string())
195 }
196}
197
198async fn run_schedule_job(
199 ctx: ScheduleContext,
200 job: ScheduleRunJob,
201) -> Result<ScheduleRunLifecycleResult, String> {
202 let resolved = (ctx.resolve_run_config)(&job);
203
204 if resolved.model.trim().is_empty() {
206 tracing::warn!(
207 "[schedule:{}] skipping run: resolved model is empty",
208 job.schedule_id
209 );
210 return Ok(ScheduleRunLifecycleResult::Terminal(
211 ScheduleRunStatus::Skipped,
212 ));
213 }
214
215 let requested_model = job
216 .run_config
217 .model
218 .as_deref()
219 .map(str::trim)
220 .filter(|v| !v.is_empty())
221 .map(|v| v.to_string());
222 let requested_reasoning_effort = job.run_config.reasoning_effort;
223
224 let mut session = super::session_factory::create_schedule_session(
225 &job,
226 &resolved.model,
227 &resolved.system_prompt,
228 &resolved.base_system_prompt,
229 resolved.workspace_path.as_deref(),
230 resolved.reasoning_effort,
231 );
232 let session_id = session.id.clone();
233
234 ctx.persistence
236 .merge_save_runtime(&mut session)
237 .await
238 .map_err(|e| format!("failed to save scheduled session: {e}"))?;
239 if let Err(error) = ctx
240 .schedule_store
241 .bind_run_session(&job.schedule_id, &job.run_id, &session_id)
242 .await
243 {
244 tracing::warn!(
245 "failed to bind session {} to schedule run {} / {}: {}",
246 session_id,
247 job.schedule_id,
248 job.run_id,
249 error
250 );
251 }
252 {
253 let mut sessions = ctx.sessions_cache.write().await;
254 sessions.insert(session_id.clone(), session.clone());
255 }
256
257 let should_execute = job.run_config.auto_execute
259 && session
260 .messages
261 .last()
262 .map(|m| matches!(m.role, Role::User))
263 .unwrap_or(false);
264
265 tracing::info!(
266 "[schedule:{}] created session {} (auto_execute={}, model={}, model_source={}, reasoning_effort={}, reasoning_source={})",
267 job.schedule_id,
268 session_id,
269 job.run_config.auto_execute,
270 resolved.model,
271 if requested_model.is_some() {
272 "schedule.run_config.model"
273 } else {
274 "resolved"
275 },
276 resolved.reasoning_effort.map(|value| value.as_str()).unwrap_or("none"),
277 if requested_reasoning_effort.is_some() {
278 "schedule.run_config.reasoning_effort"
279 } else {
280 "resolved"
281 }
282 );
283 if !should_execute {
284 return Ok(ScheduleRunLifecycleResult::Terminal(
285 ScheduleRunStatus::Success,
286 ));
287 }
288
289 if resolved.model.trim().is_empty() {
291 let msg = "resolved model is empty".to_string();
292 session.add_message(Message::assistant(format!("❌ {msg}"), None));
293 let _ = ctx.persistence.merge_save_runtime(&mut session).await;
294 return Err(msg);
295 }
296
297 let session_tx = get_or_create_event_sender(&ctx.session_event_senders, &session_id).await;
298 let schedule_id_for_log = job.schedule_id.clone();
299 let run_id_for_log = job.run_id.clone();
300
301 let Some(RunnerReservation { cancel_token, .. }) =
303 try_reserve_runner(&ctx.agent_runners, &session_id, &session_tx).await
304 else {
305 return Ok(ScheduleRunLifecycleResult::Terminal(
306 ScheduleRunStatus::Skipped,
307 ));
308 };
309
310 let (mpsc_tx, _forwarder_handle) = create_event_forwarder(
311 session_id.clone(),
312 session_tx.clone(),
313 ctx.agent_runners.clone(),
314 );
315
316 let agent_runtime = ctx.agent.clone();
318 let tools = ctx.tools.clone();
319 let schedule_store = ctx.schedule_store.clone();
320 let persistence = ctx.persistence.clone();
321 let session_id_clone = session_id.clone();
322 let schedule_id_for_state = job.schedule_id.clone();
323 let run_id_for_state = job.run_id.clone();
324 let agent_runners_for_status = ctx.agent_runners.clone();
325 let sessions_cache = ctx.sessions_cache.clone();
326 let model = resolved.model.clone();
327 let reasoning_effort = resolved.reasoning_effort;
328
329 tokio::spawn(async move {
330 let initial_message = session
331 .messages
332 .last()
333 .filter(|m| matches!(m.role, Role::User))
334 .map(|m| m.content.clone())
335 .unwrap_or_default();
336
337 let result = agent_runtime
338 .execute(
339 &mut session,
340 ExecuteRequest {
341 initial_message,
342 event_tx: mpsc_tx,
343 cancel_token,
344 tools: Some(tools),
345 provider_override: None,
346 model: Some(model.clone()),
347 provider_name: None,
348 background_model: None,
349 background_model_provider: None,
350 reasoning_effort,
351 disabled_tools: None,
352 disabled_skill_ids: None,
353 selected_skill_ids: None,
354 selected_skill_mode: None,
355 image_fallback: None,
356 app_data_dir: ctx.app_data_dir.clone(),
357 },
358 )
359 .await;
360
361 let terminal_status = if let Err(ref e) = result {
362 session.add_message(Message::assistant(
365 format!("❌ Scheduled run failed: {e}"),
366 None,
367 ));
368 tracing::warn!(
369 "[schedule:{}][run:{}][session:{}] scheduled run failed: {}",
370 schedule_id_for_log,
371 run_id_for_log,
372 session_id_clone,
373 e
374 );
375 if e.to_string().contains("cancelled") {
376 ScheduleRunStatus::Cancelled
377 } else {
378 ScheduleRunStatus::Failed
379 }
380 } else {
381 tracing::info!(
382 "[schedule:{}][run:{}][session:{}] scheduled run completed",
383 schedule_id_for_log,
384 run_id_for_log,
385 session_id_clone
386 );
387 ScheduleRunStatus::Success
388 };
389
390 if let Err(error) = schedule_store
391 .mark_run_terminal(
392 &schedule_id_for_state,
393 &run_id_for_state,
394 terminal_status,
395 None,
396 )
397 .await
398 {
399 tracing::warn!(
400 "failed to mark schedule run terminal state for {} / {}: {}",
401 schedule_id_for_state,
402 run_id_for_state,
403 error
404 );
405 }
406
407 finalize_runner(&agent_runners_for_status, &session_id_clone, &result).await;
408
409 let _ = persistence.merge_save_runtime(&mut session).await;
410 {
411 let mut sessions = sessions_cache.write().await;
412 sessions.insert(session_id_clone.clone(), session);
413 }
414 });
415
416 Ok(ScheduleRunLifecycleResult::BackgroundExecutionInProgress)
417}