Skip to main content

bamboo_server/schedule_app/
manager.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use chrono::Utc;
6use tokio::sync::{broadcast, mpsc, RwLock};
7
8use bamboo_agent_core::tools::ToolExecutor;
9use bamboo_agent_core::{AgentEvent, Message, Role, Session};
10use bamboo_domain::reasoning::ReasoningEffort;
11use bamboo_engine::execution::{
12    create_event_forwarder, finalize_runner, get_or_create_event_sender, try_reserve_runner,
13    AgentRunner,
14};
15use bamboo_engine::ExecuteRequest;
16
17use super::store::{ClaimedScheduleRun, ScheduleStore};
18use super::trigger_engine::DynTriggerEngine;
19use bamboo_domain::{ScheduleRunConfig, ScheduleRunStatus};
20
/// A single schedule run queued for execution by the schedule manager worker.
///
/// The ticker builds one of these field-for-field from each
/// [`ClaimedScheduleRun`]; callers can also submit one directly through
/// [`ScheduleManager::enqueue_run_now`].
#[derive(Debug, Clone)]
pub struct ScheduleRunJob {
    /// Identifier of this run; passed together with `schedule_id` to every
    /// `ScheduleStore` status update for the run.
    pub run_id: String,
    /// Identifier of the schedule this run belongs to.
    pub schedule_id: String,
    /// Name of the schedule. Not read in this file; presumably used by the
    /// session factory / for display — confirm against callers.
    pub schedule_name: String,
    /// Per-schedule run settings. This file reads `model`, `reasoning_effort`
    /// and `auto_execute` from it.
    pub run_config: ScheduleRunConfig,
    /// Copied from the claimed run. Presumably the instant the run was due —
    /// confirm against `ScheduleStore`.
    pub scheduled_for: chrono::DateTime<chrono::Utc>,
    /// Copied from the claimed run. Presumably the instant the store claimed
    /// the run — confirm against `ScheduleStore`.
    pub claimed_at: chrono::DateTime<chrono::Utc>,
    /// Copied from the claimed run. Presumably marks a run that makes up for a
    /// missed slot — confirm against `ScheduleStore`.
    pub was_catch_up: bool,
}
31
/// Resolved run configuration computed by the adapter layer.
///
/// The schedule crate delegates model/prompt/workspace resolution to the
/// caller via [`ScheduleContext::resolve_run_config`] so that server-specific
/// concerns (Config, filesystem prompt templates) stay out of the crate.
#[derive(Debug, Clone)]
pub struct ResolvedRunConfig {
    /// Model to run the session with. If this is empty after trimming, the
    /// run is skipped without creating a session.
    pub model: String,
    /// Optional reasoning effort, forwarded to the session factory and to the
    /// agent's execute request.
    pub reasoning_effort: Option<ReasoningEffort>,
    /// System prompt handed to the session factory.
    pub system_prompt: String,
    /// Base system prompt handed to the session factory alongside
    /// `system_prompt`. NOTE(review): the relationship between the two is
    /// defined by the session factory, not visible here.
    pub base_system_prompt: String,
    /// Optional workspace path handed to the session factory.
    pub workspace_path: Option<String>,
}
45
/// Shared handles the schedule manager needs to claim, set up and execute
/// schedule runs.
///
/// Every field is reference-counted, so cloning the context is cheap and all
/// clones observe the same underlying state.
#[derive(Clone)]
pub struct ScheduleContext {
    /// Store used to claim due runs and to record run state transitions
    /// (started, session bound, terminal, dequeued without start).
    pub schedule_store: Arc<ScheduleStore>,
    /// Agent used to persist sessions (via its storage) and to execute them.
    pub agent: Arc<bamboo_engine::Agent>,
    /// Tool executor passed to the agent for each scheduled execution.
    pub tools: Arc<dyn ToolExecutor>,
    /// In-memory session map keyed by session id; updated when a scheduled
    /// session is created and again after its execution finishes.
    pub sessions_cache: Arc<RwLock<HashMap<String, Session>>>,
    /// Runner registry keyed by session id; used to reserve a runner before
    /// execution and to finalize it afterwards.
    pub agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
    /// Per-session broadcast senders for agent events, keyed by session id.
    pub session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
    /// Trigger engine handed to the store when claiming due runs.
    pub trigger_engine: DynTriggerEngine,
    /// Adapter-provided callback that resolves model, system prompt, workspace path
    /// and reasoning effort for a schedule run job.
    pub resolve_run_config: Arc<dyn Fn(&ScheduleRunJob) -> ResolvedRunConfig + Send + Sync>,
}
59
/// Outcome of setting up a schedule run, telling the worker who is
/// responsible for recording the run's terminal state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ScheduleRunLifecycleResult {
    /// The run already reached the given final status; the worker records it
    /// in the schedule store.
    Terminal(ScheduleRunStatus),
    /// Agent execution was spawned in the background; the spawned task records
    /// the terminal status itself, so the worker does nothing further.
    BackgroundExecutionInProgress,
}
65
66#[derive(Clone)]
67pub struct ScheduleManager {
68    tx: mpsc::Sender<ScheduleRunJob>,
69}
70
71impl ScheduleManager {
72    pub fn new(ctx: ScheduleContext) -> Self {
73        let (tx, mut rx) = mpsc::channel::<ScheduleRunJob>(128);
74
75        // Worker: executes jobs sequentially (simple + predictable).
76        tokio::spawn({
77            let ctx = ctx.clone();
78            async move {
79                while let Some(job) = rx.recv().await {
80                    if let Err(error) = ctx
81                        .schedule_store
82                        .mark_run_started(&job.schedule_id, &job.run_id)
83                        .await
84                    {
85                        tracing::warn!(
86                            "failed to mark schedule run started for {} / {}: {}",
87                            job.schedule_id,
88                            job.run_id,
89                            error
90                        );
91                    }
92                    let schedule_id = job.schedule_id.clone();
93                    let run_id = job.run_id.clone();
94                    match run_schedule_job(ctx.clone(), job).await {
95                        Ok(ScheduleRunLifecycleResult::Terminal(status)) => {
96                            if let Err(error) = ctx
97                                .schedule_store
98                                .mark_run_terminal(&schedule_id, &run_id, status, None)
99                                .await
100                            {
101                                tracing::warn!(
102                                    "failed to mark schedule run terminal state for {} / {}: {}",
103                                    schedule_id,
104                                    run_id,
105                                    error
106                                );
107                            }
108                        }
109                        Ok(ScheduleRunLifecycleResult::BackgroundExecutionInProgress) => {}
110                        Err(e) => {
111                            tracing::warn!("schedule job failed: {e}");
112                            if let Err(error) = ctx
113                                .schedule_store
114                                .mark_run_terminal(
115                                    &schedule_id,
116                                    &run_id,
117                                    ScheduleRunStatus::Failed,
118                                    Some(e.clone()),
119                                )
120                                .await
121                            {
122                                tracing::warn!(
123                                    "failed to mark schedule run failed state for {} / {}: {}",
124                                    schedule_id,
125                                    run_id,
126                                    error
127                                );
128                            }
129                        }
130                    }
131                }
132            }
133        });
134
135        // Ticker: claims due schedules and enqueues jobs.
136        tokio::spawn({
137            let tx = tx.clone();
138            let store = ctx.schedule_store.clone();
139            let trigger_engine = ctx.trigger_engine.clone();
140            async move {
141                let mut ticker = tokio::time::interval(Duration::from_secs(15));
142                loop {
143                    ticker.tick().await;
144                    let now = Utc::now();
145                    let claimed: Vec<ClaimedScheduleRun> = match store
146                        .claim_due_runs_with_engine(now, trigger_engine.as_ref())
147                        .await
148                    {
149                        Ok(v) => v,
150                        Err(e) => {
151                            tracing::warn!("claim_due_runs failed: {e}");
152                            continue;
153                        }
154                    };
155                    for c in claimed {
156                        let schedule_id = c.schedule_id.clone();
157                        let run_id = c.run_id.clone();
158                        if tx
159                            .send(ScheduleRunJob {
160                                run_id: c.run_id,
161                                schedule_id: c.schedule_id,
162                                schedule_name: c.schedule_name,
163                                run_config: c.run_config,
164                                scheduled_for: c.scheduled_for,
165                                claimed_at: c.claimed_at,
166                                was_catch_up: c.was_catch_up,
167                            })
168                            .await
169                            .is_err()
170                        {
171                            let _ = store
172                                .mark_run_dequeued_without_start(
173                                    &schedule_id,
174                                    &run_id,
175                                    Some("schedule manager is not running".to_string()),
176                                )
177                                .await;
178                        }
179                    }
180                }
181            }
182        });
183
184        Self { tx }
185    }
186
187    pub async fn enqueue_run_now(&self, job: ScheduleRunJob) -> Result<(), String> {
188        self.tx
189            .send(job)
190            .await
191            .map_err(|_| "schedule manager is not running".to_string())
192    }
193}
194
195async fn run_schedule_job(
196    ctx: ScheduleContext,
197    job: ScheduleRunJob,
198) -> Result<ScheduleRunLifecycleResult, String> {
199    let resolved = (ctx.resolve_run_config)(&job);
200
201    // If the adapter resolved an empty model, skip the run.
202    if resolved.model.trim().is_empty() {
203        tracing::warn!(
204            "[schedule:{}] skipping run: resolved model is empty",
205            job.schedule_id
206        );
207        return Ok(ScheduleRunLifecycleResult::Terminal(
208            ScheduleRunStatus::Skipped,
209        ));
210    }
211
212    let requested_model = job
213        .run_config
214        .model
215        .as_deref()
216        .map(str::trim)
217        .filter(|v| !v.is_empty())
218        .map(|v| v.to_string());
219    let requested_reasoning_effort = job.run_config.reasoning_effort;
220
221    let mut session = super::session_factory::create_schedule_session(
222        &job,
223        &resolved.model,
224        &resolved.system_prompt,
225        &resolved.base_system_prompt,
226        resolved.workspace_path.as_deref(),
227        resolved.reasoning_effort,
228    );
229    let session_id = session.id.clone();
230
231    // Persist session and index entry.
232    ctx.agent
233        .storage()
234        .save_session(&session)
235        .await
236        .map_err(|e| format!("failed to save scheduled session: {e}"))?;
237    if let Err(error) = ctx
238        .schedule_store
239        .bind_run_session(&job.schedule_id, &job.run_id, &session_id)
240        .await
241    {
242        tracing::warn!(
243            "failed to bind session {} to schedule run {} / {}: {}",
244            session_id,
245            job.schedule_id,
246            job.run_id,
247            error
248        );
249    }
250    {
251        let mut sessions = ctx.sessions_cache.write().await;
252        sessions.insert(session_id.clone(), session.clone());
253    }
254
255    // If no task message (or not configured to execute), we're done.
256    let should_execute = job.run_config.auto_execute
257        && session
258            .messages
259            .last()
260            .map(|m| matches!(m.role, Role::User))
261            .unwrap_or(false);
262
263    tracing::info!(
264        "[schedule:{}] created session {} (auto_execute={}, model={}, model_source={}, reasoning_effort={}, reasoning_source={})",
265        job.schedule_id,
266        session_id,
267        job.run_config.auto_execute,
268        resolved.model,
269        if requested_model.is_some() {
270            "schedule.run_config.model"
271        } else {
272            "resolved"
273        },
274        resolved.reasoning_effort.map(|value| value.as_str()).unwrap_or("none"),
275        if requested_reasoning_effort.is_some() {
276            "schedule.run_config.reasoning_effort"
277        } else {
278            "resolved"
279        }
280    );
281    if !should_execute {
282        return Ok(ScheduleRunLifecycleResult::Terminal(
283            ScheduleRunStatus::Success,
284        ));
285    }
286
287    // Model is required by the provider trait; if resolution failed we'd have returned earlier.
288    if resolved.model.trim().is_empty() {
289        let msg = "resolved model is empty".to_string();
290        session.add_message(Message::assistant(format!("❌ {msg}"), None));
291        let _ = ctx.agent.storage().save_session(&session).await;
292        return Err(msg);
293    }
294
295    let session_tx = get_or_create_event_sender(&ctx.session_event_senders, &session_id).await;
296    let schedule_id_for_log = job.schedule_id.clone();
297    let run_id_for_log = job.run_id.clone();
298
299    // Insert runner status (for cancellation/status introspection).
300    let Some(cancel_token) = try_reserve_runner(&ctx.agent_runners, &session_id, &session_tx).await
301    else {
302        return Ok(ScheduleRunLifecycleResult::Terminal(
303            ScheduleRunStatus::Skipped,
304        ));
305    };
306
307    let (mpsc_tx, _forwarder_handle) = create_event_forwarder(
308        session_id.clone(),
309        session_tx.clone(),
310        ctx.agent_runners.clone(),
311    );
312
313    // Run agent loop in background.
314    let agent_runtime = ctx.agent.clone();
315    let tools = ctx.tools.clone();
316    let schedule_store = ctx.schedule_store.clone();
317    let storage = ctx.agent.storage().clone();
318    let session_id_clone = session_id.clone();
319    let schedule_id_for_state = job.schedule_id.clone();
320    let run_id_for_state = job.run_id.clone();
321    let agent_runners_for_status = ctx.agent_runners.clone();
322    let sessions_cache = ctx.sessions_cache.clone();
323    let model = resolved.model.clone();
324    let reasoning_effort = resolved.reasoning_effort;
325
326    tokio::spawn(async move {
327        let initial_message = session
328            .messages
329            .last()
330            .filter(|m| matches!(m.role, Role::User))
331            .map(|m| m.content.clone())
332            .unwrap_or_default();
333
334        let result = agent_runtime
335            .execute(
336                &mut session,
337                ExecuteRequest {
338                    initial_message,
339                    event_tx: mpsc_tx,
340                    cancel_token,
341                    tools: Some(tools),
342                    provider_override: None,
343                    model: Some(model.clone()),
344                    provider_name: None,
345                    background_model: None,
346                    background_model_provider: None,
347                    reasoning_effort,
348                    disabled_tools: None,
349                    disabled_skill_ids: None,
350                    selected_skill_ids: None,
351                    selected_skill_mode: None,
352                    image_fallback: None,
353                },
354            )
355            .await;
356
357        let terminal_status = if let Err(ref e) = result {
358            // Persist a visible failure marker so the user can open the scheduled session
359            // and understand why it didn't produce output.
360            session.add_message(Message::assistant(
361                format!("❌ Scheduled run failed: {e}"),
362                None,
363            ));
364            tracing::warn!(
365                "[schedule:{}][run:{}][session:{}] scheduled run failed: {}",
366                schedule_id_for_log,
367                run_id_for_log,
368                session_id_clone,
369                e
370            );
371            if e.to_string().contains("cancelled") {
372                ScheduleRunStatus::Cancelled
373            } else {
374                ScheduleRunStatus::Failed
375            }
376        } else {
377            tracing::info!(
378                "[schedule:{}][run:{}][session:{}] scheduled run completed",
379                schedule_id_for_log,
380                run_id_for_log,
381                session_id_clone
382            );
383            ScheduleRunStatus::Success
384        };
385
386        if let Err(error) = schedule_store
387            .mark_run_terminal(
388                &schedule_id_for_state,
389                &run_id_for_state,
390                terminal_status,
391                None,
392            )
393            .await
394        {
395            tracing::warn!(
396                "failed to mark schedule run terminal state for {} / {}: {}",
397                schedule_id_for_state,
398                run_id_for_state,
399                error
400            );
401        }
402
403        finalize_runner(&agent_runners_for_status, &session_id_clone, &result).await;
404
405        let _ = storage.save_session(&session).await;
406        {
407            let mut sessions = sessions_cache.write().await;
408            sessions.insert(session_id_clone.clone(), session);
409        }
410    });
411
412    Ok(ScheduleRunLifecycleResult::BackgroundExecutionInProgress)
413}