1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use chrono::Utc;
6use tokio::sync::{broadcast, mpsc, RwLock};
7
8use bamboo_agent_core::tools::ToolExecutor;
9use bamboo_agent_core::{AgentEvent, Message, Role, Session};
10use bamboo_domain::reasoning::ReasoningEffort;
11use bamboo_engine::execution::{
12 create_event_forwarder, finalize_runner, get_or_create_event_sender, try_reserve_runner,
13 AgentRunner, RunnerReservation,
14};
15use bamboo_engine::ExecuteRequest;
16use bamboo_infrastructure::LockedSessionStore;
17
18use super::store::{ClaimedScheduleRun, ScheduleStore};
19use super::trigger_engine::DynTriggerEngine;
20use bamboo_domain::{ScheduleRunConfig, ScheduleRunStatus};
21
22#[derive(Debug, Clone)]
23pub struct ScheduleRunJob {
24 pub run_id: String,
25 pub schedule_id: String,
26 pub schedule_name: String,
27 pub run_config: ScheduleRunConfig,
28 pub scheduled_for: chrono::DateTime<chrono::Utc>,
29 pub claimed_at: chrono::DateTime<chrono::Utc>,
30 pub was_catch_up: bool,
31}
32
33#[derive(Clone)]
39pub struct ResolvedRunConfig {
40 pub model: String,
41 pub provider_name: Option<String>,
42 pub provider_type: Option<String>,
43 pub fast_model: Option<String>,
44 pub fast_model_provider: Option<Arc<dyn bamboo_infrastructure::LLMProvider>>,
45 pub background_model: Option<String>,
46 pub background_model_provider: Option<Arc<dyn bamboo_infrastructure::LLMProvider>>,
47 pub summarization_model: Option<String>,
48 pub summarization_model_provider: Option<Arc<dyn bamboo_infrastructure::LLMProvider>>,
49 pub reasoning_effort: Option<ReasoningEffort>,
50 pub system_prompt: String,
51 pub base_system_prompt: String,
52 pub workspace_path: Option<String>,
53}
54
55#[derive(Clone)]
56pub struct ScheduleContext {
57 pub schedule_store: Arc<ScheduleStore>,
58 pub agent: Arc<bamboo_engine::Agent>,
59 pub persistence: Arc<LockedSessionStore>,
60 pub tools: Arc<dyn ToolExecutor>,
61 pub sessions_cache: Arc<RwLock<HashMap<String, Session>>>,
62 pub agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
63 pub session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
64 pub app_data_dir: Option<std::path::PathBuf>,
65 pub trigger_engine: DynTriggerEngine,
66 pub resolve_run_config: Arc<dyn Fn(&ScheduleRunJob) -> ResolvedRunConfig + Send + Sync>,
69}
70
71#[derive(Debug, Clone, Copy, PartialEq, Eq)]
72enum ScheduleRunLifecycleResult {
73 Terminal(ScheduleRunStatus),
74 BackgroundExecutionInProgress,
75}
76
77#[derive(Clone)]
78pub struct ScheduleManager {
79 tx: mpsc::Sender<ScheduleRunJob>,
80}
81
82impl ScheduleManager {
83 pub fn new(ctx: ScheduleContext) -> Self {
84 let (tx, mut rx) = mpsc::channel::<ScheduleRunJob>(128);
85
86 tokio::spawn({
88 let ctx = ctx.clone();
89 async move {
90 while let Some(job) = rx.recv().await {
91 if let Err(error) = ctx
92 .schedule_store
93 .mark_run_started(&job.schedule_id, &job.run_id)
94 .await
95 {
96 tracing::warn!(
97 "failed to mark schedule run started for {} / {}: {}",
98 job.schedule_id,
99 job.run_id,
100 error
101 );
102 }
103 let schedule_id = job.schedule_id.clone();
104 let run_id = job.run_id.clone();
105 match run_schedule_job(ctx.clone(), job).await {
106 Ok(ScheduleRunLifecycleResult::Terminal(status)) => {
107 if let Err(error) = ctx
108 .schedule_store
109 .mark_run_terminal(&schedule_id, &run_id, status, None)
110 .await
111 {
112 tracing::warn!(
113 "failed to mark schedule run terminal state for {} / {}: {}",
114 schedule_id,
115 run_id,
116 error
117 );
118 }
119 }
120 Ok(ScheduleRunLifecycleResult::BackgroundExecutionInProgress) => {}
121 Err(e) => {
122 tracing::warn!("schedule job failed: {e}");
123 if let Err(error) = ctx
124 .schedule_store
125 .mark_run_terminal(
126 &schedule_id,
127 &run_id,
128 ScheduleRunStatus::Failed,
129 Some(e.clone()),
130 )
131 .await
132 {
133 tracing::warn!(
134 "failed to mark schedule run failed state for {} / {}: {}",
135 schedule_id,
136 run_id,
137 error
138 );
139 }
140 }
141 }
142 }
143 }
144 });
145
146 tokio::spawn({
148 let tx = tx.clone();
149 let store = ctx.schedule_store.clone();
150 let trigger_engine = ctx.trigger_engine.clone();
151 async move {
152 let mut ticker = tokio::time::interval(Duration::from_secs(15));
153 loop {
154 ticker.tick().await;
155 let now = Utc::now();
156 let claimed: Vec<ClaimedScheduleRun> = match store
157 .claim_due_runs_with_engine(now, trigger_engine.as_ref())
158 .await
159 {
160 Ok(v) => v,
161 Err(e) => {
162 tracing::warn!("claim_due_runs failed: {e}");
163 continue;
164 }
165 };
166 for c in claimed {
167 let schedule_id = c.schedule_id.clone();
168 let run_id = c.run_id.clone();
169 if tx
170 .send(ScheduleRunJob {
171 run_id: c.run_id,
172 schedule_id: c.schedule_id,
173 schedule_name: c.schedule_name,
174 run_config: c.run_config,
175 scheduled_for: c.scheduled_for,
176 claimed_at: c.claimed_at,
177 was_catch_up: c.was_catch_up,
178 })
179 .await
180 .is_err()
181 {
182 let _ = store
183 .mark_run_dequeued_without_start(
184 &schedule_id,
185 &run_id,
186 Some("schedule manager is not running".to_string()),
187 )
188 .await;
189 }
190 }
191 }
192 }
193 });
194
195 Self { tx }
196 }
197
198 pub async fn enqueue_run_now(&self, job: ScheduleRunJob) -> Result<(), String> {
199 self.tx
200 .send(job)
201 .await
202 .map_err(|_| "schedule manager is not running".to_string())
203 }
204}
205
206async fn run_schedule_job(
207 ctx: ScheduleContext,
208 job: ScheduleRunJob,
209) -> Result<ScheduleRunLifecycleResult, String> {
210 let resolved = (ctx.resolve_run_config)(&job);
211
212 if resolved.model.trim().is_empty() {
214 tracing::warn!(
215 "[schedule:{}] skipping run: resolved model is empty",
216 job.schedule_id
217 );
218 return Ok(ScheduleRunLifecycleResult::Terminal(
219 ScheduleRunStatus::Skipped,
220 ));
221 }
222
223 let requested_model = job
224 .run_config
225 .model
226 .as_deref()
227 .map(str::trim)
228 .filter(|v| !v.is_empty())
229 .map(|v| v.to_string());
230 let requested_reasoning_effort = job.run_config.reasoning_effort;
231
232 let mut session = super::session_factory::create_schedule_session(
233 &job,
234 &resolved.model,
235 &resolved.system_prompt,
236 &resolved.base_system_prompt,
237 resolved.workspace_path.as_deref(),
238 resolved.reasoning_effort,
239 );
240 let session_id = session.id.clone();
241
242 ctx.persistence
244 .merge_save_runtime(&mut session)
245 .await
246 .map_err(|e| format!("failed to save scheduled session: {e}"))?;
247 if let Err(error) = ctx
248 .schedule_store
249 .bind_run_session(&job.schedule_id, &job.run_id, &session_id)
250 .await
251 {
252 tracing::warn!(
253 "failed to bind session {} to schedule run {} / {}: {}",
254 session_id,
255 job.schedule_id,
256 job.run_id,
257 error
258 );
259 }
260 {
261 let mut sessions = ctx.sessions_cache.write().await;
262 sessions.insert(session_id.clone(), session.clone());
263 }
264
265 let should_execute = job.run_config.auto_execute
267 && session
268 .messages
269 .last()
270 .map(|m| matches!(m.role, Role::User))
271 .unwrap_or(false);
272
273 tracing::info!(
274 "[schedule:{}] created session {} (auto_execute={}, model={}, model_source={}, reasoning_effort={}, reasoning_source={})",
275 job.schedule_id,
276 session_id,
277 job.run_config.auto_execute,
278 resolved.model,
279 if requested_model.is_some() {
280 "schedule.run_config.model"
281 } else {
282 "resolved"
283 },
284 resolved.reasoning_effort.map(|value| value.as_str()).unwrap_or("none"),
285 if requested_reasoning_effort.is_some() {
286 "schedule.run_config.reasoning_effort"
287 } else {
288 "resolved"
289 }
290 );
291 if !should_execute {
292 return Ok(ScheduleRunLifecycleResult::Terminal(
293 ScheduleRunStatus::Success,
294 ));
295 }
296
297 if resolved.model.trim().is_empty() {
299 let msg = "resolved model is empty".to_string();
300 session.add_message(Message::assistant(format!("❌ {msg}"), None));
301 let _ = ctx.persistence.merge_save_runtime(&mut session).await;
302 return Err(msg);
303 }
304
305 let session_tx = get_or_create_event_sender(&ctx.session_event_senders, &session_id).await;
306 let schedule_id_for_log = job.schedule_id.clone();
307 let run_id_for_log = job.run_id.clone();
308
309 let Some(RunnerReservation { cancel_token, .. }) =
311 try_reserve_runner(&ctx.agent_runners, &session_id, &session_tx).await
312 else {
313 return Ok(ScheduleRunLifecycleResult::Terminal(
314 ScheduleRunStatus::Skipped,
315 ));
316 };
317
318 let (mpsc_tx, _forwarder_handle) = create_event_forwarder(
319 session_id.clone(),
320 session_tx.clone(),
321 ctx.agent_runners.clone(),
322 );
323
324 let agent_runtime = ctx.agent.clone();
326 let tools = ctx.tools.clone();
327 let schedule_store = ctx.schedule_store.clone();
328 let persistence = ctx.persistence.clone();
329 let session_id_clone = session_id.clone();
330 let schedule_id_for_state = job.schedule_id.clone();
331 let run_id_for_state = job.run_id.clone();
332 let agent_runners_for_status = ctx.agent_runners.clone();
333 let sessions_cache = ctx.sessions_cache.clone();
334 let model = resolved.model.clone();
335 let provider_name = resolved.provider_name.clone();
336 let provider_type = resolved.provider_type.clone();
337 let fast_model = resolved.fast_model.clone();
338 let fast_model_provider = resolved.fast_model_provider.clone();
339 let background_model = resolved.background_model.clone();
340 let background_model_provider = resolved.background_model_provider.clone();
341 let summarization_model = resolved.summarization_model.clone();
342 let summarization_model_provider = resolved.summarization_model_provider.clone();
343 let reasoning_effort = resolved.reasoning_effort;
344
345 tokio::spawn(async move {
346 let initial_message = session
347 .messages
348 .last()
349 .filter(|m| matches!(m.role, Role::User))
350 .map(|m| m.content.clone())
351 .unwrap_or_default();
352
353 let result = agent_runtime
354 .execute(
355 &mut session,
356 ExecuteRequest {
357 initial_message,
358 event_tx: mpsc_tx,
359 cancel_token,
360 tools: Some(tools),
361 provider_override: None,
362 model: Some(model.clone()),
363 provider_name,
364 provider_type,
365 fast_model,
366 fast_model_provider,
367 background_model,
368 background_model_provider,
369 summarization_model,
370 summarization_model_provider,
371 reasoning_effort,
372 disabled_tools: None,
373 disabled_skill_ids: None,
374 selected_skill_ids: None,
375 selected_skill_mode: None,
376 image_fallback: None,
377 app_data_dir: ctx.app_data_dir.clone(),
378 },
379 )
380 .await;
381
382 let terminal_status = if let Err(ref e) = result {
383 session.add_message(Message::assistant(
386 format!("❌ Scheduled run failed: {e}"),
387 None,
388 ));
389 tracing::warn!(
390 "[schedule:{}][run:{}][session:{}] scheduled run failed: {}",
391 schedule_id_for_log,
392 run_id_for_log,
393 session_id_clone,
394 e
395 );
396 if e.to_string().contains("cancelled") {
397 ScheduleRunStatus::Cancelled
398 } else {
399 ScheduleRunStatus::Failed
400 }
401 } else {
402 tracing::info!(
403 "[schedule:{}][run:{}][session:{}] scheduled run completed",
404 schedule_id_for_log,
405 run_id_for_log,
406 session_id_clone
407 );
408 ScheduleRunStatus::Success
409 };
410
411 if let Err(error) = schedule_store
412 .mark_run_terminal(
413 &schedule_id_for_state,
414 &run_id_for_state,
415 terminal_status,
416 None,
417 )
418 .await
419 {
420 tracing::warn!(
421 "failed to mark schedule run terminal state for {} / {}: {}",
422 schedule_id_for_state,
423 run_id_for_state,
424 error
425 );
426 }
427
428 finalize_runner(&agent_runners_for_status, &session_id_clone, &result).await;
429
430 let _ = persistence.merge_save_runtime(&mut session).await;
431 {
432 let mut sessions = sessions_cache.write().await;
433 sessions.insert(session_id_clone.clone(), session);
434 }
435 });
436
437 Ok(ScheduleRunLifecycleResult::BackgroundExecutionInProgress)
438}