// bamboo_server/schedule_app/manager.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use chrono::Utc;
6use tokio::sync::{broadcast, mpsc, RwLock};
7
8use bamboo_agent_core::tools::ToolExecutor;
9use bamboo_agent_core::{AgentEvent, Message, Role, Session};
10use bamboo_domain::reasoning::ReasoningEffort;
11use bamboo_engine::config::GoldConfig;
12use bamboo_engine::execution::{
13    create_event_forwarder, finalize_runner, get_or_create_event_sender, try_reserve_runner,
14    AgentRunner, RunnerReservation,
15};
16use bamboo_engine::{AuxiliaryModelConfig, ExecuteRequest};
17use bamboo_infrastructure::LockedSessionStore;
18
19use super::store::{ClaimedScheduleRun, ScheduleStore};
20use super::trigger_engine::DynTriggerEngine;
21use bamboo_domain::{ScheduleRunConfig, ScheduleRunStatus};
22
23#[derive(Debug, Clone)]
24pub struct ScheduleRunJob {
25    pub run_id: String,
26    pub schedule_id: String,
27    pub schedule_name: String,
28    pub run_config: ScheduleRunConfig,
29    pub scheduled_for: chrono::DateTime<chrono::Utc>,
30    pub claimed_at: chrono::DateTime<chrono::Utc>,
31    pub was_catch_up: bool,
32}
33
34/// Resolved run configuration computed by the adapter layer.
35///
36/// The schedule crate delegates model/prompt/workspace resolution to the
37/// caller via [`ScheduleContext::resolve_run_config`] so that server-specific
38/// concerns (Config, filesystem prompt templates) stay out of the crate.
39#[derive(Clone)]
40pub struct ResolvedRunConfig {
41    pub model: String,
42    pub provider_name: Option<String>,
43    pub provider_type: Option<String>,
44    pub fast_model: Option<String>,
45    pub fast_model_provider: Option<Arc<dyn bamboo_infrastructure::LLMProvider>>,
46    pub background_model: Option<String>,
47    pub background_model_provider: Option<Arc<dyn bamboo_infrastructure::LLMProvider>>,
48    pub summarization_model: Option<String>,
49    pub summarization_model_provider: Option<Arc<dyn bamboo_infrastructure::LLMProvider>>,
50    pub reasoning_effort: Option<ReasoningEffort>,
51    pub gold_config: Option<GoldConfig>,
52    pub system_prompt: String,
53    pub base_system_prompt: String,
54    pub workspace_path: Option<String>,
55}
56
57#[derive(Clone)]
58pub struct ScheduleContext {
59    pub schedule_store: Arc<ScheduleStore>,
60    pub agent: Arc<bamboo_engine::Agent>,
61    pub persistence: Arc<LockedSessionStore>,
62    pub tools: Arc<dyn ToolExecutor>,
63    pub sessions_cache: Arc<RwLock<HashMap<String, Session>>>,
64    pub agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
65    pub session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
66    /// Optional inbox to the account-wide change feed (durable multi-client sync).
67    pub account_feed_inbox: Option<bamboo_engine::execution::AccountFeedInbox>,
68    pub app_data_dir: Option<std::path::PathBuf>,
69    pub trigger_engine: DynTriggerEngine,
70    /// Adapter-provided callback that resolves model, system prompt, workspace path
71    /// and reasoning effort for a schedule run job.
72    pub resolve_run_config: Arc<dyn Fn(&ScheduleRunJob) -> ResolvedRunConfig + Send + Sync>,
73}
74
75#[derive(Debug, Clone, Copy, PartialEq, Eq)]
76enum ScheduleRunLifecycleResult {
77    Terminal(ScheduleRunStatus),
78    BackgroundExecutionInProgress,
79}
80
81#[derive(Clone)]
82pub struct ScheduleManager {
83    tx: mpsc::Sender<ScheduleRunJob>,
84}
85
86impl ScheduleManager {
87    pub fn new(ctx: ScheduleContext) -> Self {
88        let (tx, mut rx) = mpsc::channel::<ScheduleRunJob>(128);
89
90        // Worker: executes jobs sequentially (simple + predictable).
91        tokio::spawn({
92            let ctx = ctx.clone();
93            async move {
94                while let Some(job) = rx.recv().await {
95                    if let Err(error) = ctx
96                        .schedule_store
97                        .mark_run_started(&job.schedule_id, &job.run_id)
98                        .await
99                    {
100                        tracing::warn!(
101                            "failed to mark schedule run started for {} / {}: {}",
102                            job.schedule_id,
103                            job.run_id,
104                            error
105                        );
106                    }
107                    let schedule_id = job.schedule_id.clone();
108                    let run_id = job.run_id.clone();
109                    match run_schedule_job(ctx.clone(), job).await {
110                        Ok(ScheduleRunLifecycleResult::Terminal(status)) => {
111                            if let Err(error) = ctx
112                                .schedule_store
113                                .mark_run_terminal(&schedule_id, &run_id, status, None)
114                                .await
115                            {
116                                tracing::warn!(
117                                    "failed to mark schedule run terminal state for {} / {}: {}",
118                                    schedule_id,
119                                    run_id,
120                                    error
121                                );
122                            }
123                        }
124                        Ok(ScheduleRunLifecycleResult::BackgroundExecutionInProgress) => {}
125                        Err(e) => {
126                            tracing::warn!("schedule job failed: {e}");
127                            if let Err(error) = ctx
128                                .schedule_store
129                                .mark_run_terminal(
130                                    &schedule_id,
131                                    &run_id,
132                                    ScheduleRunStatus::Failed,
133                                    Some(e.clone()),
134                                )
135                                .await
136                            {
137                                tracing::warn!(
138                                    "failed to mark schedule run failed state for {} / {}: {}",
139                                    schedule_id,
140                                    run_id,
141                                    error
142                                );
143                            }
144                        }
145                    }
146                }
147            }
148        });
149
150        // Ticker: claims due schedules and enqueues jobs.
151        tokio::spawn({
152            let tx = tx.clone();
153            let store = ctx.schedule_store.clone();
154            let trigger_engine = ctx.trigger_engine.clone();
155            async move {
156                let mut ticker = tokio::time::interval(Duration::from_secs(15));
157                loop {
158                    ticker.tick().await;
159                    let now = Utc::now();
160                    let claimed: Vec<ClaimedScheduleRun> = match store
161                        .claim_due_runs_with_engine(now, trigger_engine.as_ref())
162                        .await
163                    {
164                        Ok(v) => v,
165                        Err(e) => {
166                            tracing::warn!("claim_due_runs failed: {e}");
167                            continue;
168                        }
169                    };
170                    for c in claimed {
171                        let schedule_id = c.schedule_id.clone();
172                        let run_id = c.run_id.clone();
173                        if tx
174                            .send(ScheduleRunJob {
175                                run_id: c.run_id,
176                                schedule_id: c.schedule_id,
177                                schedule_name: c.schedule_name,
178                                run_config: c.run_config,
179                                scheduled_for: c.scheduled_for,
180                                claimed_at: c.claimed_at,
181                                was_catch_up: c.was_catch_up,
182                            })
183                            .await
184                            .is_err()
185                        {
186                            let _ = store
187                                .mark_run_dequeued_without_start(
188                                    &schedule_id,
189                                    &run_id,
190                                    Some("schedule manager is not running".to_string()),
191                                )
192                                .await;
193                        }
194                    }
195                }
196            }
197        });
198
199        Self { tx }
200    }
201
202    pub async fn enqueue_run_now(&self, job: ScheduleRunJob) -> Result<(), String> {
203        self.tx
204            .send(job)
205            .await
206            .map_err(|_| "schedule manager is not running".to_string())
207    }
208}
209
210async fn run_schedule_job(
211    ctx: ScheduleContext,
212    job: ScheduleRunJob,
213) -> Result<ScheduleRunLifecycleResult, String> {
214    let resolved = (ctx.resolve_run_config)(&job);
215
216    // If the adapter resolved an empty model, skip the run.
217    if resolved.model.trim().is_empty() {
218        tracing::warn!(
219            "[schedule:{}] skipping run: resolved model is empty",
220            job.schedule_id
221        );
222        return Ok(ScheduleRunLifecycleResult::Terminal(
223            ScheduleRunStatus::Skipped,
224        ));
225    }
226
227    let requested_model = job
228        .run_config
229        .model
230        .as_deref()
231        .map(str::trim)
232        .filter(|v| !v.is_empty())
233        .map(|v| v.to_string());
234    let requested_reasoning_effort = job.run_config.reasoning_effort;
235
236    let mut session = super::session_factory::create_schedule_session(
237        &job,
238        &resolved.model,
239        &resolved.system_prompt,
240        &resolved.base_system_prompt,
241        resolved.workspace_path.as_deref(),
242        resolved.reasoning_effort,
243    );
244    let session_id = session.id.clone();
245
246    // Persist session and index entry.
247    ctx.persistence
248        .merge_save_runtime(&mut session)
249        .await
250        .map_err(|e| format!("failed to save scheduled session: {e}"))?;
251    if let Err(error) = ctx
252        .schedule_store
253        .bind_run_session(&job.schedule_id, &job.run_id, &session_id)
254        .await
255    {
256        tracing::warn!(
257            "failed to bind session {} to schedule run {} / {}: {}",
258            session_id,
259            job.schedule_id,
260            job.run_id,
261            error
262        );
263    }
264    {
265        let mut sessions = ctx.sessions_cache.write().await;
266        sessions.insert(session_id.clone(), session.clone());
267    }
268
269    // If no task message (or not configured to execute), we're done.
270    let should_execute = job.run_config.auto_execute
271        && session
272            .messages
273            .last()
274            .map(|m| matches!(m.role, Role::User))
275            .unwrap_or(false);
276
277    tracing::info!(
278        "[schedule:{}] created session {} (auto_execute={}, model={}, model_source={}, reasoning_effort={}, reasoning_source={})",
279        job.schedule_id,
280        session_id,
281        job.run_config.auto_execute,
282        resolved.model,
283        if requested_model.is_some() {
284            "schedule.run_config.model"
285        } else {
286            "resolved"
287        },
288        resolved.reasoning_effort.map(|value| value.as_str()).unwrap_or("none"),
289        if requested_reasoning_effort.is_some() {
290            "schedule.run_config.reasoning_effort"
291        } else {
292            "resolved"
293        }
294    );
295    if !should_execute {
296        return Ok(ScheduleRunLifecycleResult::Terminal(
297            ScheduleRunStatus::Success,
298        ));
299    }
300
301    // Model is required by the provider trait; if resolution failed we'd have returned earlier.
302    if resolved.model.trim().is_empty() {
303        let msg = "resolved model is empty".to_string();
304        session.add_message(Message::assistant(format!("❌ {msg}"), None));
305        let _ = ctx.persistence.merge_save_runtime(&mut session).await;
306        return Err(msg);
307    }
308
309    let session_tx = get_or_create_event_sender(&ctx.session_event_senders, &session_id).await;
310    let schedule_id_for_log = job.schedule_id.clone();
311    let run_id_for_log = job.run_id.clone();
312
313    // Insert runner status (for cancellation/status introspection).
314    let Some(RunnerReservation { cancel_token, .. }) =
315        try_reserve_runner(&ctx.agent_runners, &session_id, &session_tx).await
316    else {
317        return Ok(ScheduleRunLifecycleResult::Terminal(
318            ScheduleRunStatus::Skipped,
319        ));
320    };
321
322    let (mpsc_tx, _forwarder_handle) = create_event_forwarder(
323        session_id.clone(),
324        session_tx.clone(),
325        ctx.agent_runners.clone(),
326        ctx.account_feed_inbox.clone(),
327    );
328
329    // Run agent loop in background.
330    let agent_runtime = ctx.agent.clone();
331    let tools = ctx.tools.clone();
332    let schedule_store = ctx.schedule_store.clone();
333    let persistence = ctx.persistence.clone();
334    let session_id_clone = session_id.clone();
335    let schedule_id_for_state = job.schedule_id.clone();
336    let run_id_for_state = job.run_id.clone();
337    let agent_runners_for_status = ctx.agent_runners.clone();
338    let sessions_cache = ctx.sessions_cache.clone();
339    let model = resolved.model.clone();
340    let provider_name = resolved.provider_name.clone();
341    let provider_type = resolved.provider_type.clone();
342    let fast_model = resolved.fast_model.clone();
343    let fast_model_provider = resolved.fast_model_provider.clone();
344    let background_model = resolved.background_model.clone();
345    let background_model_provider = resolved.background_model_provider.clone();
346    let summarization_model = resolved.summarization_model.clone();
347    let summarization_model_provider = resolved.summarization_model_provider.clone();
348    let reasoning_effort = resolved.reasoning_effort;
349    let gold_config = resolved.gold_config.clone();
350
351    tokio::spawn(async move {
352        let initial_message = session
353            .messages
354            .last()
355            .filter(|m| matches!(m.role, Role::User))
356            .map(|m| m.content.clone())
357            .unwrap_or_default();
358
359        let aux_fast_model = fast_model.clone();
360        let aux_fast_provider = fast_model_provider.clone();
361        let aux_background_model = background_model.clone();
362        let aux_background_provider = background_model_provider.clone();
363        let aux_summarization_model = summarization_model.clone();
364        let aux_summarization_provider = summarization_model_provider.clone();
365        let auxiliary_model_resolver = Arc::new(move || AuxiliaryModelConfig {
366            fast_model_name: aux_fast_model.clone(),
367            fast_model_provider: aux_fast_provider.clone(),
368            background_model_name: aux_background_model.clone(),
369            planning_model_name: None,
370            search_model_name: None,
371            summarization_model_name: aux_summarization_model.clone(),
372            background_model_provider: aux_background_provider.clone(),
373            summarization_model_provider: aux_summarization_provider.clone(),
374        });
375
376        let result = agent_runtime
377            .execute(
378                &mut session,
379                ExecuteRequest {
380                    initial_message,
381                    event_tx: mpsc_tx,
382                    cancel_token,
383                    tools: Some(tools),
384                    provider_override: None,
385                    model: Some(model.clone()),
386                    provider_name,
387                    provider_type,
388                    fast_model,
389                    fast_model_provider,
390                    background_model,
391                    background_model_provider,
392                    summarization_model,
393                    summarization_model_provider,
394                    reasoning_effort,
395                    auxiliary_model_resolver: Some(auxiliary_model_resolver),
396                    disabled_tools: None,
397                    disabled_skill_ids: None,
398                    selected_skill_ids: None,
399                    selected_skill_mode: None,
400                    image_fallback: None,
401                    gold_config,
402                    app_data_dir: ctx.app_data_dir.clone(),
403                },
404            )
405            .await;
406
407        let terminal_status = if let Err(ref e) = result {
408            // Persist a visible failure marker so the user can open the scheduled session
409            // and understand why it didn't produce output.
410            session.add_message(Message::assistant(
411                format!("❌ Scheduled run failed: {e}"),
412                None,
413            ));
414            tracing::warn!(
415                "[schedule:{}][run:{}][session:{}] scheduled run failed: {}",
416                schedule_id_for_log,
417                run_id_for_log,
418                session_id_clone,
419                e
420            );
421            if e.to_string().contains("cancelled") {
422                ScheduleRunStatus::Cancelled
423            } else {
424                ScheduleRunStatus::Failed
425            }
426        } else {
427            tracing::info!(
428                "[schedule:{}][run:{}][session:{}] scheduled run completed",
429                schedule_id_for_log,
430                run_id_for_log,
431                session_id_clone
432            );
433            ScheduleRunStatus::Success
434        };
435
436        if let Err(error) = schedule_store
437            .mark_run_terminal(
438                &schedule_id_for_state,
439                &run_id_for_state,
440                terminal_status,
441                None,
442            )
443            .await
444        {
445            tracing::warn!(
446                "failed to mark schedule run terminal state for {} / {}: {}",
447                schedule_id_for_state,
448                run_id_for_state,
449                error
450            );
451        }
452
453        finalize_runner(&agent_runners_for_status, &session_id_clone, &result).await;
454
455        let _ = persistence.merge_save_runtime(&mut session).await;
456        {
457            let mut sessions = sessions_cache.write().await;
458            sessions.insert(session_id_clone.clone(), session);
459        }
460    });
461
462    Ok(ScheduleRunLifecycleResult::BackgroundExecutionInProgress)
463}