Skip to main content

bamboo_server/schedule_app/
manager.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use chrono::Utc;
6use tokio::sync::{broadcast, mpsc, RwLock};
7
8use bamboo_agent_core::tools::ToolExecutor;
9use bamboo_agent_core::{AgentEvent, Message, Role, Session};
10use bamboo_domain::reasoning::ReasoningEffort;
11use bamboo_engine::execution::{
12    create_event_forwarder, finalize_runner, get_or_create_event_sender, try_reserve_runner,
13    AgentRunner, RunnerReservation,
14};
15use bamboo_engine::ExecuteRequest;
16use bamboo_infrastructure::LockedSessionStore;
17
18use super::store::{ClaimedScheduleRun, ScheduleStore};
19use super::trigger_engine::DynTriggerEngine;
20use bamboo_domain::{ScheduleRunConfig, ScheduleRunStatus};
21
/// A single schedule run queued for execution by the [`ScheduleManager`] worker.
///
/// The ticker builds one of these from every claimed run returned by the
/// schedule store; callers can also submit one directly through
/// [`ScheduleManager::enqueue_run_now`].
#[derive(Debug, Clone)]
pub struct ScheduleRunJob {
    /// Identifier of this run; used together with `schedule_id` when updating
    /// run state in the schedule store.
    pub run_id: String,
    /// Identifier of the schedule this run belongs to.
    pub schedule_id: String,
    /// Name of the schedule, copied from the claimed run.
    pub schedule_name: String,
    /// Per-run settings. This file reads `model`, `reasoning_effort` and
    /// `auto_execute` from it.
    pub run_config: ScheduleRunConfig,
    /// Presumably the instant this run was due (copied verbatim from the
    /// claimed run) — confirm against the store.
    pub scheduled_for: chrono::DateTime<chrono::Utc>,
    /// Presumably the instant the store claimed this run (copied verbatim
    /// from the claimed run) — confirm against the store.
    pub claimed_at: chrono::DateTime<chrono::Utc>,
    /// Copied from the claimed run; presumably set when the run makes up for
    /// a missed slot — confirm against the store.
    pub was_catch_up: bool,
}
32
/// Resolved run configuration computed by the adapter layer.
///
/// The schedule crate delegates model/prompt/workspace resolution to the
/// caller via [`ScheduleContext::resolve_run_config`] so that server-specific
/// concerns (Config, filesystem prompt templates) stay out of the crate.
#[derive(Clone)]
pub struct ResolvedRunConfig {
    /// Model used for the run. A value that is empty after trimming causes
    /// the run to be skipped.
    pub model: String,
    /// Forwarded unchanged to the execute request.
    pub provider_name: Option<String>,
    /// Forwarded unchanged to the execute request.
    pub provider_type: Option<String>,
    /// Forwarded unchanged to the execute request.
    pub fast_model: Option<String>,
    /// Forwarded unchanged to the execute request.
    pub fast_model_provider: Option<Arc<dyn bamboo_infrastructure::LLMProvider>>,
    /// Forwarded unchanged to the execute request.
    pub background_model: Option<String>,
    /// Forwarded unchanged to the execute request.
    pub background_model_provider: Option<Arc<dyn bamboo_infrastructure::LLMProvider>>,
    /// Forwarded unchanged to the execute request.
    pub summarization_model: Option<String>,
    /// Forwarded unchanged to the execute request.
    pub summarization_model_provider: Option<Arc<dyn bamboo_infrastructure::LLMProvider>>,
    /// Reasoning effort used both when creating the session and when
    /// executing the agent loop.
    pub reasoning_effort: Option<ReasoningEffort>,
    /// System prompt handed to the session factory.
    pub system_prompt: String,
    /// Base system prompt handed to the session factory alongside
    /// `system_prompt`.
    pub base_system_prompt: String,
    /// Optional workspace path handed to the session factory.
    pub workspace_path: Option<String>,
}
54
/// Shared dependencies needed to claim and execute schedule runs.
///
/// Every field is reference-counted or otherwise cloneable, so the context
/// can be cloned into the manager's background tasks.
#[derive(Clone)]
pub struct ScheduleContext {
    /// Store used to claim due runs and to record run state transitions.
    pub schedule_store: Arc<ScheduleStore>,
    /// Agent runtime that executes the session created for a run.
    pub agent: Arc<bamboo_engine::Agent>,
    /// Session persistence; sessions are saved on creation and again after
    /// execution finishes.
    pub persistence: Arc<LockedSessionStore>,
    /// Tool executor passed to the agent for each run.
    pub tools: Arc<dyn ToolExecutor>,
    /// In-memory session cache keyed by session id.
    pub sessions_cache: Arc<RwLock<HashMap<String, Session>>>,
    /// Runner registry used to reserve and finalize the runner of a session
    /// (presumably keyed by session id — confirm against the engine).
    pub agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
    /// Broadcast senders used to publish agent events for a session
    /// (presumably keyed by session id — confirm against the engine).
    pub session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
    /// Optional application data directory, forwarded to the execute request.
    pub app_data_dir: Option<std::path::PathBuf>,
    /// Trigger engine handed to the store when claiming due runs.
    pub trigger_engine: DynTriggerEngine,
    /// Adapter-provided callback that resolves model, system prompt, workspace path
    /// and reasoning effort for a schedule run job.
    pub resolve_run_config: Arc<dyn Fn(&ScheduleRunJob) -> ResolvedRunConfig + Send + Sync>,
}
70
/// Outcome of handing a job to `run_schedule_job`, telling the worker whether
/// it still has to record a terminal status for the run.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ScheduleRunLifecycleResult {
    /// The run already reached a final status; the worker records it in the
    /// schedule store.
    Terminal(ScheduleRunStatus),
    /// The agent loop was spawned as a background task, which records the
    /// terminal status itself once it finishes.
    BackgroundExecutionInProgress,
}
76
/// Handle to the schedule worker queue.
///
/// Cloning the manager clones the underlying sender, so all clones feed the
/// same worker.
#[derive(Clone)]
pub struct ScheduleManager {
    /// Sending half of the job queue consumed by the worker task.
    tx: mpsc::Sender<ScheduleRunJob>,
}
81
82impl ScheduleManager {
83    pub fn new(ctx: ScheduleContext) -> Self {
84        let (tx, mut rx) = mpsc::channel::<ScheduleRunJob>(128);
85
86        // Worker: executes jobs sequentially (simple + predictable).
87        tokio::spawn({
88            let ctx = ctx.clone();
89            async move {
90                while let Some(job) = rx.recv().await {
91                    if let Err(error) = ctx
92                        .schedule_store
93                        .mark_run_started(&job.schedule_id, &job.run_id)
94                        .await
95                    {
96                        tracing::warn!(
97                            "failed to mark schedule run started for {} / {}: {}",
98                            job.schedule_id,
99                            job.run_id,
100                            error
101                        );
102                    }
103                    let schedule_id = job.schedule_id.clone();
104                    let run_id = job.run_id.clone();
105                    match run_schedule_job(ctx.clone(), job).await {
106                        Ok(ScheduleRunLifecycleResult::Terminal(status)) => {
107                            if let Err(error) = ctx
108                                .schedule_store
109                                .mark_run_terminal(&schedule_id, &run_id, status, None)
110                                .await
111                            {
112                                tracing::warn!(
113                                    "failed to mark schedule run terminal state for {} / {}: {}",
114                                    schedule_id,
115                                    run_id,
116                                    error
117                                );
118                            }
119                        }
120                        Ok(ScheduleRunLifecycleResult::BackgroundExecutionInProgress) => {}
121                        Err(e) => {
122                            tracing::warn!("schedule job failed: {e}");
123                            if let Err(error) = ctx
124                                .schedule_store
125                                .mark_run_terminal(
126                                    &schedule_id,
127                                    &run_id,
128                                    ScheduleRunStatus::Failed,
129                                    Some(e.clone()),
130                                )
131                                .await
132                            {
133                                tracing::warn!(
134                                    "failed to mark schedule run failed state for {} / {}: {}",
135                                    schedule_id,
136                                    run_id,
137                                    error
138                                );
139                            }
140                        }
141                    }
142                }
143            }
144        });
145
146        // Ticker: claims due schedules and enqueues jobs.
147        tokio::spawn({
148            let tx = tx.clone();
149            let store = ctx.schedule_store.clone();
150            let trigger_engine = ctx.trigger_engine.clone();
151            async move {
152                let mut ticker = tokio::time::interval(Duration::from_secs(15));
153                loop {
154                    ticker.tick().await;
155                    let now = Utc::now();
156                    let claimed: Vec<ClaimedScheduleRun> = match store
157                        .claim_due_runs_with_engine(now, trigger_engine.as_ref())
158                        .await
159                    {
160                        Ok(v) => v,
161                        Err(e) => {
162                            tracing::warn!("claim_due_runs failed: {e}");
163                            continue;
164                        }
165                    };
166                    for c in claimed {
167                        let schedule_id = c.schedule_id.clone();
168                        let run_id = c.run_id.clone();
169                        if tx
170                            .send(ScheduleRunJob {
171                                run_id: c.run_id,
172                                schedule_id: c.schedule_id,
173                                schedule_name: c.schedule_name,
174                                run_config: c.run_config,
175                                scheduled_for: c.scheduled_for,
176                                claimed_at: c.claimed_at,
177                                was_catch_up: c.was_catch_up,
178                            })
179                            .await
180                            .is_err()
181                        {
182                            let _ = store
183                                .mark_run_dequeued_without_start(
184                                    &schedule_id,
185                                    &run_id,
186                                    Some("schedule manager is not running".to_string()),
187                                )
188                                .await;
189                        }
190                    }
191                }
192            }
193        });
194
195        Self { tx }
196    }
197
198    pub async fn enqueue_run_now(&self, job: ScheduleRunJob) -> Result<(), String> {
199        self.tx
200            .send(job)
201            .await
202            .map_err(|_| "schedule manager is not running".to_string())
203    }
204}
205
206async fn run_schedule_job(
207    ctx: ScheduleContext,
208    job: ScheduleRunJob,
209) -> Result<ScheduleRunLifecycleResult, String> {
210    let resolved = (ctx.resolve_run_config)(&job);
211
212    // If the adapter resolved an empty model, skip the run.
213    if resolved.model.trim().is_empty() {
214        tracing::warn!(
215            "[schedule:{}] skipping run: resolved model is empty",
216            job.schedule_id
217        );
218        return Ok(ScheduleRunLifecycleResult::Terminal(
219            ScheduleRunStatus::Skipped,
220        ));
221    }
222
223    let requested_model = job
224        .run_config
225        .model
226        .as_deref()
227        .map(str::trim)
228        .filter(|v| !v.is_empty())
229        .map(|v| v.to_string());
230    let requested_reasoning_effort = job.run_config.reasoning_effort;
231
232    let mut session = super::session_factory::create_schedule_session(
233        &job,
234        &resolved.model,
235        &resolved.system_prompt,
236        &resolved.base_system_prompt,
237        resolved.workspace_path.as_deref(),
238        resolved.reasoning_effort,
239    );
240    let session_id = session.id.clone();
241
242    // Persist session and index entry.
243    ctx.persistence
244        .merge_save_runtime(&mut session)
245        .await
246        .map_err(|e| format!("failed to save scheduled session: {e}"))?;
247    if let Err(error) = ctx
248        .schedule_store
249        .bind_run_session(&job.schedule_id, &job.run_id, &session_id)
250        .await
251    {
252        tracing::warn!(
253            "failed to bind session {} to schedule run {} / {}: {}",
254            session_id,
255            job.schedule_id,
256            job.run_id,
257            error
258        );
259    }
260    {
261        let mut sessions = ctx.sessions_cache.write().await;
262        sessions.insert(session_id.clone(), session.clone());
263    }
264
265    // If no task message (or not configured to execute), we're done.
266    let should_execute = job.run_config.auto_execute
267        && session
268            .messages
269            .last()
270            .map(|m| matches!(m.role, Role::User))
271            .unwrap_or(false);
272
273    tracing::info!(
274        "[schedule:{}] created session {} (auto_execute={}, model={}, model_source={}, reasoning_effort={}, reasoning_source={})",
275        job.schedule_id,
276        session_id,
277        job.run_config.auto_execute,
278        resolved.model,
279        if requested_model.is_some() {
280            "schedule.run_config.model"
281        } else {
282            "resolved"
283        },
284        resolved.reasoning_effort.map(|value| value.as_str()).unwrap_or("none"),
285        if requested_reasoning_effort.is_some() {
286            "schedule.run_config.reasoning_effort"
287        } else {
288            "resolved"
289        }
290    );
291    if !should_execute {
292        return Ok(ScheduleRunLifecycleResult::Terminal(
293            ScheduleRunStatus::Success,
294        ));
295    }
296
297    // Model is required by the provider trait; if resolution failed we'd have returned earlier.
298    if resolved.model.trim().is_empty() {
299        let msg = "resolved model is empty".to_string();
300        session.add_message(Message::assistant(format!("❌ {msg}"), None));
301        let _ = ctx.persistence.merge_save_runtime(&mut session).await;
302        return Err(msg);
303    }
304
305    let session_tx = get_or_create_event_sender(&ctx.session_event_senders, &session_id).await;
306    let schedule_id_for_log = job.schedule_id.clone();
307    let run_id_for_log = job.run_id.clone();
308
309    // Insert runner status (for cancellation/status introspection).
310    let Some(RunnerReservation { cancel_token, .. }) =
311        try_reserve_runner(&ctx.agent_runners, &session_id, &session_tx).await
312    else {
313        return Ok(ScheduleRunLifecycleResult::Terminal(
314            ScheduleRunStatus::Skipped,
315        ));
316    };
317
318    let (mpsc_tx, _forwarder_handle) = create_event_forwarder(
319        session_id.clone(),
320        session_tx.clone(),
321        ctx.agent_runners.clone(),
322    );
323
324    // Run agent loop in background.
325    let agent_runtime = ctx.agent.clone();
326    let tools = ctx.tools.clone();
327    let schedule_store = ctx.schedule_store.clone();
328    let persistence = ctx.persistence.clone();
329    let session_id_clone = session_id.clone();
330    let schedule_id_for_state = job.schedule_id.clone();
331    let run_id_for_state = job.run_id.clone();
332    let agent_runners_for_status = ctx.agent_runners.clone();
333    let sessions_cache = ctx.sessions_cache.clone();
334    let model = resolved.model.clone();
335    let provider_name = resolved.provider_name.clone();
336    let provider_type = resolved.provider_type.clone();
337    let fast_model = resolved.fast_model.clone();
338    let fast_model_provider = resolved.fast_model_provider.clone();
339    let background_model = resolved.background_model.clone();
340    let background_model_provider = resolved.background_model_provider.clone();
341    let summarization_model = resolved.summarization_model.clone();
342    let summarization_model_provider = resolved.summarization_model_provider.clone();
343    let reasoning_effort = resolved.reasoning_effort;
344
345    tokio::spawn(async move {
346        let initial_message = session
347            .messages
348            .last()
349            .filter(|m| matches!(m.role, Role::User))
350            .map(|m| m.content.clone())
351            .unwrap_or_default();
352
353        let result = agent_runtime
354            .execute(
355                &mut session,
356                ExecuteRequest {
357                    initial_message,
358                    event_tx: mpsc_tx,
359                    cancel_token,
360                    tools: Some(tools),
361                    provider_override: None,
362                    model: Some(model.clone()),
363                    provider_name,
364                    provider_type,
365                    fast_model,
366                    fast_model_provider,
367                    background_model,
368                    background_model_provider,
369                    summarization_model,
370                    summarization_model_provider,
371                    reasoning_effort,
372                    disabled_tools: None,
373                    disabled_skill_ids: None,
374                    selected_skill_ids: None,
375                    selected_skill_mode: None,
376                    image_fallback: None,
377                    app_data_dir: ctx.app_data_dir.clone(),
378                },
379            )
380            .await;
381
382        let terminal_status = if let Err(ref e) = result {
383            // Persist a visible failure marker so the user can open the scheduled session
384            // and understand why it didn't produce output.
385            session.add_message(Message::assistant(
386                format!("❌ Scheduled run failed: {e}"),
387                None,
388            ));
389            tracing::warn!(
390                "[schedule:{}][run:{}][session:{}] scheduled run failed: {}",
391                schedule_id_for_log,
392                run_id_for_log,
393                session_id_clone,
394                e
395            );
396            if e.to_string().contains("cancelled") {
397                ScheduleRunStatus::Cancelled
398            } else {
399                ScheduleRunStatus::Failed
400            }
401        } else {
402            tracing::info!(
403                "[schedule:{}][run:{}][session:{}] scheduled run completed",
404                schedule_id_for_log,
405                run_id_for_log,
406                session_id_clone
407            );
408            ScheduleRunStatus::Success
409        };
410
411        if let Err(error) = schedule_store
412            .mark_run_terminal(
413                &schedule_id_for_state,
414                &run_id_for_state,
415                terminal_status,
416                None,
417            )
418            .await
419        {
420            tracing::warn!(
421                "failed to mark schedule run terminal state for {} / {}: {}",
422                schedule_id_for_state,
423                run_id_for_state,
424                error
425            );
426        }
427
428        finalize_runner(&agent_runners_for_status, &session_id_clone, &result).await;
429
430        let _ = persistence.merge_save_runtime(&mut session).await;
431        {
432            let mut sessions = sessions_cache.write().await;
433            sessions.insert(session_id_clone.clone(), session);
434        }
435    });
436
437    Ok(ScheduleRunLifecycleResult::BackgroundExecutionInProgress)
438}