1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use chrono::Utc;
6use tokio::sync::{broadcast, mpsc, RwLock};
7
8use bamboo_agent_core::tools::ToolExecutor;
9use bamboo_agent_core::{AgentEvent, Message, Role, Session};
10use bamboo_domain::reasoning::ReasoningEffort;
11use bamboo_engine::config::GoldConfig;
12use bamboo_engine::execution::{
13 create_event_forwarder, finalize_runner, get_or_create_event_sender, try_reserve_runner,
14 AgentRunner, RunnerReservation,
15};
16use bamboo_engine::{AuxiliaryModelConfig, ExecuteRequest};
17use bamboo_infrastructure::LockedSessionStore;
18
19use super::store::{ClaimedScheduleRun, ScheduleStore};
20use super::trigger_engine::DynTriggerEngine;
21use bamboo_domain::{ScheduleRunConfig, ScheduleRunStatus};
22
23#[derive(Debug, Clone)]
24pub struct ScheduleRunJob {
25 pub run_id: String,
26 pub schedule_id: String,
27 pub schedule_name: String,
28 pub run_config: ScheduleRunConfig,
29 pub scheduled_for: chrono::DateTime<chrono::Utc>,
30 pub claimed_at: chrono::DateTime<chrono::Utc>,
31 pub was_catch_up: bool,
32}
33
34#[derive(Clone)]
40pub struct ResolvedRunConfig {
41 pub model: String,
42 pub provider_name: Option<String>,
43 pub provider_type: Option<String>,
44 pub fast_model: Option<String>,
45 pub fast_model_provider: Option<Arc<dyn bamboo_infrastructure::LLMProvider>>,
46 pub background_model: Option<String>,
47 pub background_model_provider: Option<Arc<dyn bamboo_infrastructure::LLMProvider>>,
48 pub summarization_model: Option<String>,
49 pub summarization_model_provider: Option<Arc<dyn bamboo_infrastructure::LLMProvider>>,
50 pub reasoning_effort: Option<ReasoningEffort>,
51 pub gold_config: Option<GoldConfig>,
52 pub system_prompt: String,
53 pub base_system_prompt: String,
54 pub workspace_path: Option<String>,
55}
56
57#[derive(Clone)]
58pub struct ScheduleContext {
59 pub schedule_store: Arc<ScheduleStore>,
60 pub agent: Arc<bamboo_engine::Agent>,
61 pub persistence: Arc<LockedSessionStore>,
62 pub tools: Arc<dyn ToolExecutor>,
63 pub sessions_cache: Arc<RwLock<HashMap<String, Session>>>,
64 pub agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
65 pub session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
66 pub account_feed_inbox: Option<bamboo_engine::execution::AccountFeedInbox>,
68 pub app_data_dir: Option<std::path::PathBuf>,
69 pub trigger_engine: DynTriggerEngine,
70 pub resolve_run_config: Arc<dyn Fn(&ScheduleRunJob) -> ResolvedRunConfig + Send + Sync>,
73}
74
75#[derive(Debug, Clone, Copy, PartialEq, Eq)]
76enum ScheduleRunLifecycleResult {
77 Terminal(ScheduleRunStatus),
78 BackgroundExecutionInProgress,
79}
80
81#[derive(Clone)]
82pub struct ScheduleManager {
83 tx: mpsc::Sender<ScheduleRunJob>,
84}
85
86impl ScheduleManager {
87 pub fn new(ctx: ScheduleContext) -> Self {
88 let (tx, mut rx) = mpsc::channel::<ScheduleRunJob>(128);
89
90 tokio::spawn({
92 let ctx = ctx.clone();
93 async move {
94 while let Some(job) = rx.recv().await {
95 if let Err(error) = ctx
96 .schedule_store
97 .mark_run_started(&job.schedule_id, &job.run_id)
98 .await
99 {
100 tracing::warn!(
101 "failed to mark schedule run started for {} / {}: {}",
102 job.schedule_id,
103 job.run_id,
104 error
105 );
106 }
107 let schedule_id = job.schedule_id.clone();
108 let run_id = job.run_id.clone();
109 match run_schedule_job(ctx.clone(), job).await {
110 Ok(ScheduleRunLifecycleResult::Terminal(status)) => {
111 if let Err(error) = ctx
112 .schedule_store
113 .mark_run_terminal(&schedule_id, &run_id, status, None)
114 .await
115 {
116 tracing::warn!(
117 "failed to mark schedule run terminal state for {} / {}: {}",
118 schedule_id,
119 run_id,
120 error
121 );
122 }
123 }
124 Ok(ScheduleRunLifecycleResult::BackgroundExecutionInProgress) => {}
125 Err(e) => {
126 tracing::warn!("schedule job failed: {e}");
127 if let Err(error) = ctx
128 .schedule_store
129 .mark_run_terminal(
130 &schedule_id,
131 &run_id,
132 ScheduleRunStatus::Failed,
133 Some(e.clone()),
134 )
135 .await
136 {
137 tracing::warn!(
138 "failed to mark schedule run failed state for {} / {}: {}",
139 schedule_id,
140 run_id,
141 error
142 );
143 }
144 }
145 }
146 }
147 }
148 });
149
150 tokio::spawn({
152 let tx = tx.clone();
153 let store = ctx.schedule_store.clone();
154 let trigger_engine = ctx.trigger_engine.clone();
155 async move {
156 let mut ticker = tokio::time::interval(Duration::from_secs(15));
157 loop {
158 ticker.tick().await;
159 let now = Utc::now();
160 let claimed: Vec<ClaimedScheduleRun> = match store
161 .claim_due_runs_with_engine(now, trigger_engine.as_ref())
162 .await
163 {
164 Ok(v) => v,
165 Err(e) => {
166 tracing::warn!("claim_due_runs failed: {e}");
167 continue;
168 }
169 };
170 for c in claimed {
171 let schedule_id = c.schedule_id.clone();
172 let run_id = c.run_id.clone();
173 if tx
174 .send(ScheduleRunJob {
175 run_id: c.run_id,
176 schedule_id: c.schedule_id,
177 schedule_name: c.schedule_name,
178 run_config: c.run_config,
179 scheduled_for: c.scheduled_for,
180 claimed_at: c.claimed_at,
181 was_catch_up: c.was_catch_up,
182 })
183 .await
184 .is_err()
185 {
186 let _ = store
187 .mark_run_dequeued_without_start(
188 &schedule_id,
189 &run_id,
190 Some("schedule manager is not running".to_string()),
191 )
192 .await;
193 }
194 }
195 }
196 }
197 });
198
199 Self { tx }
200 }
201
202 pub async fn enqueue_run_now(&self, job: ScheduleRunJob) -> Result<(), String> {
203 self.tx
204 .send(job)
205 .await
206 .map_err(|_| "schedule manager is not running".to_string())
207 }
208}
209
210async fn run_schedule_job(
211 ctx: ScheduleContext,
212 job: ScheduleRunJob,
213) -> Result<ScheduleRunLifecycleResult, String> {
214 let resolved = (ctx.resolve_run_config)(&job);
215
216 if resolved.model.trim().is_empty() {
218 tracing::warn!(
219 "[schedule:{}] skipping run: resolved model is empty",
220 job.schedule_id
221 );
222 return Ok(ScheduleRunLifecycleResult::Terminal(
223 ScheduleRunStatus::Skipped,
224 ));
225 }
226
227 let requested_model = job
228 .run_config
229 .model
230 .as_deref()
231 .map(str::trim)
232 .filter(|v| !v.is_empty())
233 .map(|v| v.to_string());
234 let requested_reasoning_effort = job.run_config.reasoning_effort;
235
236 let mut session = super::session_factory::create_schedule_session(
237 &job,
238 &resolved.model,
239 &resolved.system_prompt,
240 &resolved.base_system_prompt,
241 resolved.workspace_path.as_deref(),
242 resolved.reasoning_effort,
243 );
244 let session_id = session.id.clone();
245
246 ctx.persistence
248 .merge_save_runtime(&mut session)
249 .await
250 .map_err(|e| format!("failed to save scheduled session: {e}"))?;
251 if let Err(error) = ctx
252 .schedule_store
253 .bind_run_session(&job.schedule_id, &job.run_id, &session_id)
254 .await
255 {
256 tracing::warn!(
257 "failed to bind session {} to schedule run {} / {}: {}",
258 session_id,
259 job.schedule_id,
260 job.run_id,
261 error
262 );
263 }
264 {
265 let mut sessions = ctx.sessions_cache.write().await;
266 sessions.insert(session_id.clone(), session.clone());
267 }
268
269 let should_execute = job.run_config.auto_execute
271 && session
272 .messages
273 .last()
274 .map(|m| matches!(m.role, Role::User))
275 .unwrap_or(false);
276
277 tracing::info!(
278 "[schedule:{}] created session {} (auto_execute={}, model={}, model_source={}, reasoning_effort={}, reasoning_source={})",
279 job.schedule_id,
280 session_id,
281 job.run_config.auto_execute,
282 resolved.model,
283 if requested_model.is_some() {
284 "schedule.run_config.model"
285 } else {
286 "resolved"
287 },
288 resolved.reasoning_effort.map(|value| value.as_str()).unwrap_or("none"),
289 if requested_reasoning_effort.is_some() {
290 "schedule.run_config.reasoning_effort"
291 } else {
292 "resolved"
293 }
294 );
295 if !should_execute {
296 return Ok(ScheduleRunLifecycleResult::Terminal(
297 ScheduleRunStatus::Success,
298 ));
299 }
300
301 if resolved.model.trim().is_empty() {
303 let msg = "resolved model is empty".to_string();
304 session.add_message(Message::assistant(format!("❌ {msg}"), None));
305 let _ = ctx.persistence.merge_save_runtime(&mut session).await;
306 return Err(msg);
307 }
308
309 let session_tx = get_or_create_event_sender(&ctx.session_event_senders, &session_id).await;
310 let schedule_id_for_log = job.schedule_id.clone();
311 let run_id_for_log = job.run_id.clone();
312
313 let Some(RunnerReservation { cancel_token, .. }) =
315 try_reserve_runner(&ctx.agent_runners, &session_id, &session_tx).await
316 else {
317 return Ok(ScheduleRunLifecycleResult::Terminal(
318 ScheduleRunStatus::Skipped,
319 ));
320 };
321
322 let (mpsc_tx, _forwarder_handle) = create_event_forwarder(
323 session_id.clone(),
324 session_tx.clone(),
325 ctx.agent_runners.clone(),
326 ctx.account_feed_inbox.clone(),
327 );
328
329 let agent_runtime = ctx.agent.clone();
331 let tools = ctx.tools.clone();
332 let schedule_store = ctx.schedule_store.clone();
333 let persistence = ctx.persistence.clone();
334 let session_id_clone = session_id.clone();
335 let schedule_id_for_state = job.schedule_id.clone();
336 let run_id_for_state = job.run_id.clone();
337 let agent_runners_for_status = ctx.agent_runners.clone();
338 let sessions_cache = ctx.sessions_cache.clone();
339 let model = resolved.model.clone();
340 let provider_name = resolved.provider_name.clone();
341 let provider_type = resolved.provider_type.clone();
342 let fast_model = resolved.fast_model.clone();
343 let fast_model_provider = resolved.fast_model_provider.clone();
344 let background_model = resolved.background_model.clone();
345 let background_model_provider = resolved.background_model_provider.clone();
346 let summarization_model = resolved.summarization_model.clone();
347 let summarization_model_provider = resolved.summarization_model_provider.clone();
348 let reasoning_effort = resolved.reasoning_effort;
349 let gold_config = resolved.gold_config.clone();
350
351 tokio::spawn(async move {
352 let initial_message = session
353 .messages
354 .last()
355 .filter(|m| matches!(m.role, Role::User))
356 .map(|m| m.content.clone())
357 .unwrap_or_default();
358
359 let aux_fast_model = fast_model.clone();
360 let aux_fast_provider = fast_model_provider.clone();
361 let aux_background_model = background_model.clone();
362 let aux_background_provider = background_model_provider.clone();
363 let aux_summarization_model = summarization_model.clone();
364 let aux_summarization_provider = summarization_model_provider.clone();
365 let auxiliary_model_resolver = Arc::new(move || AuxiliaryModelConfig {
366 fast_model_name: aux_fast_model.clone(),
367 fast_model_provider: aux_fast_provider.clone(),
368 background_model_name: aux_background_model.clone(),
369 planning_model_name: None,
370 search_model_name: None,
371 summarization_model_name: aux_summarization_model.clone(),
372 background_model_provider: aux_background_provider.clone(),
373 summarization_model_provider: aux_summarization_provider.clone(),
374 });
375
376 let result = agent_runtime
377 .execute(
378 &mut session,
379 ExecuteRequest {
380 initial_message,
381 event_tx: mpsc_tx,
382 cancel_token,
383 tools: Some(tools),
384 provider_override: None,
385 model: Some(model.clone()),
386 provider_name,
387 provider_type,
388 fast_model,
389 fast_model_provider,
390 background_model,
391 background_model_provider,
392 summarization_model,
393 summarization_model_provider,
394 reasoning_effort,
395 auxiliary_model_resolver: Some(auxiliary_model_resolver),
396 disabled_tools: None,
397 disabled_skill_ids: None,
398 selected_skill_ids: None,
399 selected_skill_mode: None,
400 image_fallback: None,
401 gold_config,
402 app_data_dir: ctx.app_data_dir.clone(),
403 },
404 )
405 .await;
406
407 let terminal_status = if let Err(ref e) = result {
408 session.add_message(Message::assistant(
411 format!("❌ Scheduled run failed: {e}"),
412 None,
413 ));
414 tracing::warn!(
415 "[schedule:{}][run:{}][session:{}] scheduled run failed: {}",
416 schedule_id_for_log,
417 run_id_for_log,
418 session_id_clone,
419 e
420 );
421 if e.to_string().contains("cancelled") {
422 ScheduleRunStatus::Cancelled
423 } else {
424 ScheduleRunStatus::Failed
425 }
426 } else {
427 tracing::info!(
428 "[schedule:{}][run:{}][session:{}] scheduled run completed",
429 schedule_id_for_log,
430 run_id_for_log,
431 session_id_clone
432 );
433 ScheduleRunStatus::Success
434 };
435
436 if let Err(error) = schedule_store
437 .mark_run_terminal(
438 &schedule_id_for_state,
439 &run_id_for_state,
440 terminal_status,
441 None,
442 )
443 .await
444 {
445 tracing::warn!(
446 "failed to mark schedule run terminal state for {} / {}: {}",
447 schedule_id_for_state,
448 run_id_for_state,
449 error
450 );
451 }
452
453 finalize_runner(&agent_runners_for_status, &session_id_clone, &result).await;
454
455 let _ = persistence.merge_save_runtime(&mut session).await;
456 {
457 let mut sessions = sessions_cache.write().await;
458 sessions.insert(session_id_clone.clone(), session);
459 }
460 });
461
462 Ok(ScheduleRunLifecycleResult::BackgroundExecutionInProgress)
463}