bamboo_server/schedule_app/
manager.rs1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use chrono::Utc;
6use tokio::sync::{broadcast, mpsc, RwLock};
7
8use bamboo_agent_core::tools::ToolExecutor;
9use bamboo_agent_core::{AgentEvent, Message, Role, Session};
10use bamboo_domain::reasoning::ReasoningEffort;
11use bamboo_engine::execution::{
12 create_event_forwarder, finalize_runner, get_or_create_event_sender, try_reserve_runner,
13 AgentRunner,
14};
15use bamboo_engine::ExecuteRequest;
16
17use super::store::{ClaimedScheduleRun, ScheduleStore};
18use super::trigger_engine::DynTriggerEngine;
19use bamboo_domain::{ScheduleRunConfig, ScheduleRunStatus};
20
21#[derive(Debug, Clone)]
22pub struct ScheduleRunJob {
23 pub run_id: String,
24 pub schedule_id: String,
25 pub schedule_name: String,
26 pub run_config: ScheduleRunConfig,
27 pub scheduled_for: chrono::DateTime<chrono::Utc>,
28 pub claimed_at: chrono::DateTime<chrono::Utc>,
29 pub was_catch_up: bool,
30}
31
32#[derive(Debug, Clone)]
38pub struct ResolvedRunConfig {
39 pub model: String,
40 pub reasoning_effort: Option<ReasoningEffort>,
41 pub system_prompt: String,
42 pub base_system_prompt: String,
43 pub workspace_path: Option<String>,
44}
45
46#[derive(Clone)]
47pub struct ScheduleContext {
48 pub schedule_store: Arc<ScheduleStore>,
49 pub agent: Arc<bamboo_engine::Agent>,
50 pub tools: Arc<dyn ToolExecutor>,
51 pub sessions_cache: Arc<RwLock<HashMap<String, Session>>>,
52 pub agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
53 pub session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
54 pub trigger_engine: DynTriggerEngine,
55 pub resolve_run_config: Arc<dyn Fn(&ScheduleRunJob) -> ResolvedRunConfig + Send + Sync>,
58}
59
60#[derive(Debug, Clone, Copy, PartialEq, Eq)]
61enum ScheduleRunLifecycleResult {
62 Terminal(ScheduleRunStatus),
63 BackgroundExecutionInProgress,
64}
65
66#[derive(Clone)]
67pub struct ScheduleManager {
68 tx: mpsc::Sender<ScheduleRunJob>,
69}
70
71impl ScheduleManager {
72 pub fn new(ctx: ScheduleContext) -> Self {
73 let (tx, mut rx) = mpsc::channel::<ScheduleRunJob>(128);
74
75 tokio::spawn({
77 let ctx = ctx.clone();
78 async move {
79 while let Some(job) = rx.recv().await {
80 if let Err(error) = ctx
81 .schedule_store
82 .mark_run_started(&job.schedule_id, &job.run_id)
83 .await
84 {
85 tracing::warn!(
86 "failed to mark schedule run started for {} / {}: {}",
87 job.schedule_id,
88 job.run_id,
89 error
90 );
91 }
92 let schedule_id = job.schedule_id.clone();
93 let run_id = job.run_id.clone();
94 match run_schedule_job(ctx.clone(), job).await {
95 Ok(ScheduleRunLifecycleResult::Terminal(status)) => {
96 if let Err(error) = ctx
97 .schedule_store
98 .mark_run_terminal(&schedule_id, &run_id, status, None)
99 .await
100 {
101 tracing::warn!(
102 "failed to mark schedule run terminal state for {} / {}: {}",
103 schedule_id,
104 run_id,
105 error
106 );
107 }
108 }
109 Ok(ScheduleRunLifecycleResult::BackgroundExecutionInProgress) => {}
110 Err(e) => {
111 tracing::warn!("schedule job failed: {e}");
112 if let Err(error) = ctx
113 .schedule_store
114 .mark_run_terminal(
115 &schedule_id,
116 &run_id,
117 ScheduleRunStatus::Failed,
118 Some(e.clone()),
119 )
120 .await
121 {
122 tracing::warn!(
123 "failed to mark schedule run failed state for {} / {}: {}",
124 schedule_id,
125 run_id,
126 error
127 );
128 }
129 }
130 }
131 }
132 }
133 });
134
135 tokio::spawn({
137 let tx = tx.clone();
138 let store = ctx.schedule_store.clone();
139 let trigger_engine = ctx.trigger_engine.clone();
140 async move {
141 let mut ticker = tokio::time::interval(Duration::from_secs(15));
142 loop {
143 ticker.tick().await;
144 let now = Utc::now();
145 let claimed: Vec<ClaimedScheduleRun> = match store
146 .claim_due_runs_with_engine(now, trigger_engine.as_ref())
147 .await
148 {
149 Ok(v) => v,
150 Err(e) => {
151 tracing::warn!("claim_due_runs failed: {e}");
152 continue;
153 }
154 };
155 for c in claimed {
156 let schedule_id = c.schedule_id.clone();
157 let run_id = c.run_id.clone();
158 if tx
159 .send(ScheduleRunJob {
160 run_id: c.run_id,
161 schedule_id: c.schedule_id,
162 schedule_name: c.schedule_name,
163 run_config: c.run_config,
164 scheduled_for: c.scheduled_for,
165 claimed_at: c.claimed_at,
166 was_catch_up: c.was_catch_up,
167 })
168 .await
169 .is_err()
170 {
171 let _ = store
172 .mark_run_dequeued_without_start(
173 &schedule_id,
174 &run_id,
175 Some("schedule manager is not running".to_string()),
176 )
177 .await;
178 }
179 }
180 }
181 }
182 });
183
184 Self { tx }
185 }
186
187 pub async fn enqueue_run_now(&self, job: ScheduleRunJob) -> Result<(), String> {
188 self.tx
189 .send(job)
190 .await
191 .map_err(|_| "schedule manager is not running".to_string())
192 }
193}
194
195async fn run_schedule_job(
196 ctx: ScheduleContext,
197 job: ScheduleRunJob,
198) -> Result<ScheduleRunLifecycleResult, String> {
199 let resolved = (ctx.resolve_run_config)(&job);
200
201 if resolved.model.trim().is_empty() {
203 tracing::warn!(
204 "[schedule:{}] skipping run: resolved model is empty",
205 job.schedule_id
206 );
207 return Ok(ScheduleRunLifecycleResult::Terminal(
208 ScheduleRunStatus::Skipped,
209 ));
210 }
211
212 let requested_model = job
213 .run_config
214 .model
215 .as_deref()
216 .map(str::trim)
217 .filter(|v| !v.is_empty())
218 .map(|v| v.to_string());
219 let requested_reasoning_effort = job.run_config.reasoning_effort;
220
221 let mut session = super::session_factory::create_schedule_session(
222 &job,
223 &resolved.model,
224 &resolved.system_prompt,
225 &resolved.base_system_prompt,
226 resolved.workspace_path.as_deref(),
227 resolved.reasoning_effort,
228 );
229 let session_id = session.id.clone();
230
231 ctx.agent
233 .storage()
234 .save_session(&session)
235 .await
236 .map_err(|e| format!("failed to save scheduled session: {e}"))?;
237 if let Err(error) = ctx
238 .schedule_store
239 .bind_run_session(&job.schedule_id, &job.run_id, &session_id)
240 .await
241 {
242 tracing::warn!(
243 "failed to bind session {} to schedule run {} / {}: {}",
244 session_id,
245 job.schedule_id,
246 job.run_id,
247 error
248 );
249 }
250 {
251 let mut sessions = ctx.sessions_cache.write().await;
252 sessions.insert(session_id.clone(), session.clone());
253 }
254
255 let should_execute = job.run_config.auto_execute
257 && session
258 .messages
259 .last()
260 .map(|m| matches!(m.role, Role::User))
261 .unwrap_or(false);
262
263 tracing::info!(
264 "[schedule:{}] created session {} (auto_execute={}, model={}, model_source={}, reasoning_effort={}, reasoning_source={})",
265 job.schedule_id,
266 session_id,
267 job.run_config.auto_execute,
268 resolved.model,
269 if requested_model.is_some() {
270 "schedule.run_config.model"
271 } else {
272 "resolved"
273 },
274 resolved.reasoning_effort.map(|value| value.as_str()).unwrap_or("none"),
275 if requested_reasoning_effort.is_some() {
276 "schedule.run_config.reasoning_effort"
277 } else {
278 "resolved"
279 }
280 );
281 if !should_execute {
282 return Ok(ScheduleRunLifecycleResult::Terminal(
283 ScheduleRunStatus::Success,
284 ));
285 }
286
287 if resolved.model.trim().is_empty() {
289 let msg = "resolved model is empty".to_string();
290 session.add_message(Message::assistant(format!("❌ {msg}"), None));
291 let _ = ctx.agent.storage().save_session(&session).await;
292 return Err(msg);
293 }
294
295 let session_tx = get_or_create_event_sender(&ctx.session_event_senders, &session_id).await;
296 let schedule_id_for_log = job.schedule_id.clone();
297 let run_id_for_log = job.run_id.clone();
298
299 let Some(cancel_token) = try_reserve_runner(&ctx.agent_runners, &session_id, &session_tx).await
301 else {
302 return Ok(ScheduleRunLifecycleResult::Terminal(
303 ScheduleRunStatus::Skipped,
304 ));
305 };
306
307 let (mpsc_tx, _forwarder_handle) = create_event_forwarder(
308 session_id.clone(),
309 session_tx.clone(),
310 ctx.agent_runners.clone(),
311 );
312
313 let agent_runtime = ctx.agent.clone();
315 let tools = ctx.tools.clone();
316 let schedule_store = ctx.schedule_store.clone();
317 let storage = ctx.agent.storage().clone();
318 let session_id_clone = session_id.clone();
319 let schedule_id_for_state = job.schedule_id.clone();
320 let run_id_for_state = job.run_id.clone();
321 let agent_runners_for_status = ctx.agent_runners.clone();
322 let sessions_cache = ctx.sessions_cache.clone();
323 let model = resolved.model.clone();
324 let reasoning_effort = resolved.reasoning_effort;
325
326 tokio::spawn(async move {
327 let initial_message = session
328 .messages
329 .last()
330 .filter(|m| matches!(m.role, Role::User))
331 .map(|m| m.content.clone())
332 .unwrap_or_default();
333
334 let result = agent_runtime
335 .execute(
336 &mut session,
337 ExecuteRequest {
338 initial_message,
339 event_tx: mpsc_tx,
340 cancel_token,
341 tools: Some(tools),
342 provider_override: None,
343 model: Some(model.clone()),
344 provider_name: None,
345 background_model: None,
346 background_model_provider: None,
347 reasoning_effort,
348 disabled_tools: None,
349 disabled_skill_ids: None,
350 selected_skill_ids: None,
351 selected_skill_mode: None,
352 image_fallback: None,
353 },
354 )
355 .await;
356
357 let terminal_status = if let Err(ref e) = result {
358 session.add_message(Message::assistant(
361 format!("❌ Scheduled run failed: {e}"),
362 None,
363 ));
364 tracing::warn!(
365 "[schedule:{}][run:{}][session:{}] scheduled run failed: {}",
366 schedule_id_for_log,
367 run_id_for_log,
368 session_id_clone,
369 e
370 );
371 if e.to_string().contains("cancelled") {
372 ScheduleRunStatus::Cancelled
373 } else {
374 ScheduleRunStatus::Failed
375 }
376 } else {
377 tracing::info!(
378 "[schedule:{}][run:{}][session:{}] scheduled run completed",
379 schedule_id_for_log,
380 run_id_for_log,
381 session_id_clone
382 );
383 ScheduleRunStatus::Success
384 };
385
386 if let Err(error) = schedule_store
387 .mark_run_terminal(
388 &schedule_id_for_state,
389 &run_id_for_state,
390 terminal_status,
391 None,
392 )
393 .await
394 {
395 tracing::warn!(
396 "failed to mark schedule run terminal state for {} / {}: {}",
397 schedule_id_for_state,
398 run_id_for_state,
399 error
400 );
401 }
402
403 finalize_runner(&agent_runners_for_status, &session_id_clone, &result).await;
404
405 let _ = storage.save_session(&session).await;
406 {
407 let mut sessions = sessions_cache.write().await;
408 sessions.insert(session_id_clone.clone(), session);
409 }
410 });
411
412 Ok(ScheduleRunLifecycleResult::BackgroundExecutionInProgress)
413}