// bamboo_server/schedule_app/manager.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use chrono::Utc;
6use tokio::sync::{broadcast, mpsc, RwLock};
7
8use bamboo_agent_core::tools::ToolExecutor;
9use bamboo_agent_core::{AgentEvent, Message, Role, Session};
10use bamboo_domain::reasoning::ReasoningEffort;
11use bamboo_engine::execution::{
12    create_event_forwarder, finalize_runner, get_or_create_event_sender, try_reserve_runner,
13    AgentRunner, RunnerReservation,
14};
15use bamboo_engine::ExecuteRequest;
16use bamboo_infrastructure::LockedSessionStore;
17
18use super::store::{ClaimedScheduleRun, ScheduleStore};
19use super::trigger_engine::DynTriggerEngine;
20use bamboo_domain::{ScheduleRunConfig, ScheduleRunStatus};
21
22#[derive(Debug, Clone)]
23pub struct ScheduleRunJob {
24    pub run_id: String,
25    pub schedule_id: String,
26    pub schedule_name: String,
27    pub run_config: ScheduleRunConfig,
28    pub scheduled_for: chrono::DateTime<chrono::Utc>,
29    pub claimed_at: chrono::DateTime<chrono::Utc>,
30    pub was_catch_up: bool,
31}
32
33/// Resolved run configuration computed by the adapter layer.
34///
35/// The schedule crate delegates model/prompt/workspace resolution to the
36/// caller via [`ScheduleContext::resolve_run_config`] so that server-specific
37/// concerns (Config, filesystem prompt templates) stay out of the crate.
38#[derive(Debug, Clone)]
39pub struct ResolvedRunConfig {
40    pub model: String,
41    pub reasoning_effort: Option<ReasoningEffort>,
42    pub system_prompt: String,
43    pub base_system_prompt: String,
44    pub workspace_path: Option<String>,
45}
46
47#[derive(Clone)]
48pub struct ScheduleContext {
49    pub schedule_store: Arc<ScheduleStore>,
50    pub agent: Arc<bamboo_engine::Agent>,
51    pub persistence: Arc<LockedSessionStore>,
52    pub tools: Arc<dyn ToolExecutor>,
53    pub sessions_cache: Arc<RwLock<HashMap<String, Session>>>,
54    pub agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
55    pub session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
56    pub app_data_dir: Option<std::path::PathBuf>,
57    pub trigger_engine: DynTriggerEngine,
58    /// Adapter-provided callback that resolves model, system prompt, workspace path
59    /// and reasoning effort for a schedule run job.
60    pub resolve_run_config: Arc<dyn Fn(&ScheduleRunJob) -> ResolvedRunConfig + Send + Sync>,
61}
62
63#[derive(Debug, Clone, Copy, PartialEq, Eq)]
64enum ScheduleRunLifecycleResult {
65    Terminal(ScheduleRunStatus),
66    BackgroundExecutionInProgress,
67}
68
69#[derive(Clone)]
70pub struct ScheduleManager {
71    tx: mpsc::Sender<ScheduleRunJob>,
72}
73
74impl ScheduleManager {
75    pub fn new(ctx: ScheduleContext) -> Self {
76        let (tx, mut rx) = mpsc::channel::<ScheduleRunJob>(128);
77
78        // Worker: executes jobs sequentially (simple + predictable).
79        tokio::spawn({
80            let ctx = ctx.clone();
81            async move {
82                while let Some(job) = rx.recv().await {
83                    if let Err(error) = ctx
84                        .schedule_store
85                        .mark_run_started(&job.schedule_id, &job.run_id)
86                        .await
87                    {
88                        tracing::warn!(
89                            "failed to mark schedule run started for {} / {}: {}",
90                            job.schedule_id,
91                            job.run_id,
92                            error
93                        );
94                    }
95                    let schedule_id = job.schedule_id.clone();
96                    let run_id = job.run_id.clone();
97                    match run_schedule_job(ctx.clone(), job).await {
98                        Ok(ScheduleRunLifecycleResult::Terminal(status)) => {
99                            if let Err(error) = ctx
100                                .schedule_store
101                                .mark_run_terminal(&schedule_id, &run_id, status, None)
102                                .await
103                            {
104                                tracing::warn!(
105                                    "failed to mark schedule run terminal state for {} / {}: {}",
106                                    schedule_id,
107                                    run_id,
108                                    error
109                                );
110                            }
111                        }
112                        Ok(ScheduleRunLifecycleResult::BackgroundExecutionInProgress) => {}
113                        Err(e) => {
114                            tracing::warn!("schedule job failed: {e}");
115                            if let Err(error) = ctx
116                                .schedule_store
117                                .mark_run_terminal(
118                                    &schedule_id,
119                                    &run_id,
120                                    ScheduleRunStatus::Failed,
121                                    Some(e.clone()),
122                                )
123                                .await
124                            {
125                                tracing::warn!(
126                                    "failed to mark schedule run failed state for {} / {}: {}",
127                                    schedule_id,
128                                    run_id,
129                                    error
130                                );
131                            }
132                        }
133                    }
134                }
135            }
136        });
137
138        // Ticker: claims due schedules and enqueues jobs.
139        tokio::spawn({
140            let tx = tx.clone();
141            let store = ctx.schedule_store.clone();
142            let trigger_engine = ctx.trigger_engine.clone();
143            async move {
144                let mut ticker = tokio::time::interval(Duration::from_secs(15));
145                loop {
146                    ticker.tick().await;
147                    let now = Utc::now();
148                    let claimed: Vec<ClaimedScheduleRun> = match store
149                        .claim_due_runs_with_engine(now, trigger_engine.as_ref())
150                        .await
151                    {
152                        Ok(v) => v,
153                        Err(e) => {
154                            tracing::warn!("claim_due_runs failed: {e}");
155                            continue;
156                        }
157                    };
158                    for c in claimed {
159                        let schedule_id = c.schedule_id.clone();
160                        let run_id = c.run_id.clone();
161                        if tx
162                            .send(ScheduleRunJob {
163                                run_id: c.run_id,
164                                schedule_id: c.schedule_id,
165                                schedule_name: c.schedule_name,
166                                run_config: c.run_config,
167                                scheduled_for: c.scheduled_for,
168                                claimed_at: c.claimed_at,
169                                was_catch_up: c.was_catch_up,
170                            })
171                            .await
172                            .is_err()
173                        {
174                            let _ = store
175                                .mark_run_dequeued_without_start(
176                                    &schedule_id,
177                                    &run_id,
178                                    Some("schedule manager is not running".to_string()),
179                                )
180                                .await;
181                        }
182                    }
183                }
184            }
185        });
186
187        Self { tx }
188    }
189
190    pub async fn enqueue_run_now(&self, job: ScheduleRunJob) -> Result<(), String> {
191        self.tx
192            .send(job)
193            .await
194            .map_err(|_| "schedule manager is not running".to_string())
195    }
196}
197
198async fn run_schedule_job(
199    ctx: ScheduleContext,
200    job: ScheduleRunJob,
201) -> Result<ScheduleRunLifecycleResult, String> {
202    let resolved = (ctx.resolve_run_config)(&job);
203
204    // If the adapter resolved an empty model, skip the run.
205    if resolved.model.trim().is_empty() {
206        tracing::warn!(
207            "[schedule:{}] skipping run: resolved model is empty",
208            job.schedule_id
209        );
210        return Ok(ScheduleRunLifecycleResult::Terminal(
211            ScheduleRunStatus::Skipped,
212        ));
213    }
214
215    let requested_model = job
216        .run_config
217        .model
218        .as_deref()
219        .map(str::trim)
220        .filter(|v| !v.is_empty())
221        .map(|v| v.to_string());
222    let requested_reasoning_effort = job.run_config.reasoning_effort;
223
224    let mut session = super::session_factory::create_schedule_session(
225        &job,
226        &resolved.model,
227        &resolved.system_prompt,
228        &resolved.base_system_prompt,
229        resolved.workspace_path.as_deref(),
230        resolved.reasoning_effort,
231    );
232    let session_id = session.id.clone();
233
234    // Persist session and index entry.
235    ctx.persistence
236        .merge_save_runtime(&mut session)
237        .await
238        .map_err(|e| format!("failed to save scheduled session: {e}"))?;
239    if let Err(error) = ctx
240        .schedule_store
241        .bind_run_session(&job.schedule_id, &job.run_id, &session_id)
242        .await
243    {
244        tracing::warn!(
245            "failed to bind session {} to schedule run {} / {}: {}",
246            session_id,
247            job.schedule_id,
248            job.run_id,
249            error
250        );
251    }
252    {
253        let mut sessions = ctx.sessions_cache.write().await;
254        sessions.insert(session_id.clone(), session.clone());
255    }
256
257    // If no task message (or not configured to execute), we're done.
258    let should_execute = job.run_config.auto_execute
259        && session
260            .messages
261            .last()
262            .map(|m| matches!(m.role, Role::User))
263            .unwrap_or(false);
264
265    tracing::info!(
266        "[schedule:{}] created session {} (auto_execute={}, model={}, model_source={}, reasoning_effort={}, reasoning_source={})",
267        job.schedule_id,
268        session_id,
269        job.run_config.auto_execute,
270        resolved.model,
271        if requested_model.is_some() {
272            "schedule.run_config.model"
273        } else {
274            "resolved"
275        },
276        resolved.reasoning_effort.map(|value| value.as_str()).unwrap_or("none"),
277        if requested_reasoning_effort.is_some() {
278            "schedule.run_config.reasoning_effort"
279        } else {
280            "resolved"
281        }
282    );
283    if !should_execute {
284        return Ok(ScheduleRunLifecycleResult::Terminal(
285            ScheduleRunStatus::Success,
286        ));
287    }
288
289    // Model is required by the provider trait; if resolution failed we'd have returned earlier.
290    if resolved.model.trim().is_empty() {
291        let msg = "resolved model is empty".to_string();
292        session.add_message(Message::assistant(format!("❌ {msg}"), None));
293        let _ = ctx.persistence.merge_save_runtime(&mut session).await;
294        return Err(msg);
295    }
296
297    let session_tx = get_or_create_event_sender(&ctx.session_event_senders, &session_id).await;
298    let schedule_id_for_log = job.schedule_id.clone();
299    let run_id_for_log = job.run_id.clone();
300
301    // Insert runner status (for cancellation/status introspection).
302    let Some(RunnerReservation { cancel_token, .. }) =
303        try_reserve_runner(&ctx.agent_runners, &session_id, &session_tx).await
304    else {
305        return Ok(ScheduleRunLifecycleResult::Terminal(
306            ScheduleRunStatus::Skipped,
307        ));
308    };
309
310    let (mpsc_tx, _forwarder_handle) = create_event_forwarder(
311        session_id.clone(),
312        session_tx.clone(),
313        ctx.agent_runners.clone(),
314    );
315
316    // Run agent loop in background.
317    let agent_runtime = ctx.agent.clone();
318    let tools = ctx.tools.clone();
319    let schedule_store = ctx.schedule_store.clone();
320    let persistence = ctx.persistence.clone();
321    let session_id_clone = session_id.clone();
322    let schedule_id_for_state = job.schedule_id.clone();
323    let run_id_for_state = job.run_id.clone();
324    let agent_runners_for_status = ctx.agent_runners.clone();
325    let sessions_cache = ctx.sessions_cache.clone();
326    let model = resolved.model.clone();
327    let reasoning_effort = resolved.reasoning_effort;
328
329    tokio::spawn(async move {
330        let initial_message = session
331            .messages
332            .last()
333            .filter(|m| matches!(m.role, Role::User))
334            .map(|m| m.content.clone())
335            .unwrap_or_default();
336
337        let result = agent_runtime
338            .execute(
339                &mut session,
340                ExecuteRequest {
341                    initial_message,
342                    event_tx: mpsc_tx,
343                    cancel_token,
344                    tools: Some(tools),
345                    provider_override: None,
346                    model: Some(model.clone()),
347                    provider_name: None,
348                    background_model: None,
349                    background_model_provider: None,
350                    reasoning_effort,
351                    disabled_tools: None,
352                    disabled_skill_ids: None,
353                    selected_skill_ids: None,
354                    selected_skill_mode: None,
355                    image_fallback: None,
356                    app_data_dir: ctx.app_data_dir.clone(),
357                },
358            )
359            .await;
360
361        let terminal_status = if let Err(ref e) = result {
362            // Persist a visible failure marker so the user can open the scheduled session
363            // and understand why it didn't produce output.
364            session.add_message(Message::assistant(
365                format!("❌ Scheduled run failed: {e}"),
366                None,
367            ));
368            tracing::warn!(
369                "[schedule:{}][run:{}][session:{}] scheduled run failed: {}",
370                schedule_id_for_log,
371                run_id_for_log,
372                session_id_clone,
373                e
374            );
375            if e.to_string().contains("cancelled") {
376                ScheduleRunStatus::Cancelled
377            } else {
378                ScheduleRunStatus::Failed
379            }
380        } else {
381            tracing::info!(
382                "[schedule:{}][run:{}][session:{}] scheduled run completed",
383                schedule_id_for_log,
384                run_id_for_log,
385                session_id_clone
386            );
387            ScheduleRunStatus::Success
388        };
389
390        if let Err(error) = schedule_store
391            .mark_run_terminal(
392                &schedule_id_for_state,
393                &run_id_for_state,
394                terminal_status,
395                None,
396            )
397            .await
398        {
399            tracing::warn!(
400                "failed to mark schedule run terminal state for {} / {}: {}",
401                schedule_id_for_state,
402                run_id_for_state,
403                error
404            );
405        }
406
407        finalize_runner(&agent_runners_for_status, &session_id_clone, &result).await;
408
409        let _ = persistence.merge_save_runtime(&mut session).await;
410        {
411            let mut sessions = sessions_cache.write().await;
412            sessions.insert(session_id_clone.clone(), session);
413        }
414    });
415
416    Ok(ScheduleRunLifecycleResult::BackgroundExecutionInProgress)
417}