Skip to main content

bamboo_server/tools/
schedule_tasks.rs

1use async_trait::async_trait;
2use serde::Deserialize;
3use serde_json::json;
4use std::sync::Arc;
5
6use crate::handlers::agent::schedules::ScheduleView;
7use crate::schedules::{
8    ScheduleManager, ScheduleRunConfig, ScheduleRunJob, ScheduleStore, ScheduleTrigger,
9};
10use bamboo_agent_core::storage::Storage;
11use bamboo_agent_core::tools::{Tool, ToolError, ToolExecutionContext, ToolResult};
12use bamboo_agent_core::{Session, SessionKind};
13use bamboo_infrastructure::SessionStoreV2;
14
15/// One tool for schedule CRUD + actions.
16///
17/// We intentionally keep schedule operations as a server-only tool that calls internal
18/// Rust methods (NOT http_request) to avoid SSRF issues and to keep latency low.
19pub struct ScheduleTasksTool {
20    schedule_store: Arc<ScheduleStore>,
21    schedule_manager: Arc<ScheduleManager>,
22    session_store: Arc<SessionStoreV2>,
23    storage: Arc<dyn Storage>,
24}
25
26impl ScheduleTasksTool {
27    pub fn new(
28        schedule_store: Arc<ScheduleStore>,
29        schedule_manager: Arc<ScheduleManager>,
30        session_store: Arc<SessionStoreV2>,
31        storage: Arc<dyn Storage>,
32    ) -> Self {
33        Self {
34            schedule_store,
35            schedule_manager,
36            session_store,
37            storage,
38        }
39    }
40
41    async fn load_caller_session(&self, session_id: &str) -> Result<Session, ToolError> {
42        match self.storage.load_session(session_id).await {
43            Ok(Some(session)) => Ok(session),
44            Ok(None) => Err(ToolError::Execution(format!(
45                "session not found: {session_id}"
46            ))),
47            Err(e) => Err(ToolError::Execution(format!(
48                "failed to load session {session_id}: {e}"
49            ))),
50        }
51    }
52}
53
54#[derive(Debug, Deserialize)]
55#[serde(tag = "action", rename_all = "snake_case")]
56enum ScheduleTasksArgs {
57    List {},
58    Create {
59        name: String,
60        trigger: ScheduleTrigger,
61        #[serde(default)]
62        timezone: Option<String>,
63        #[serde(default)]
64        start_at: Option<chrono::DateTime<chrono::Utc>>,
65        #[serde(default)]
66        end_at: Option<chrono::DateTime<chrono::Utc>>,
67        #[serde(default)]
68        misfire_policy: Option<crate::schedules::MisFirePolicy>,
69        #[serde(default)]
70        overlap_policy: Option<crate::schedules::OverlapPolicy>,
71        #[serde(default)]
72        enabled: Option<bool>,
73        #[serde(default)]
74        run_config: Option<ScheduleRunConfig>,
75    },
76    Patch {
77        schedule_id: String,
78        #[serde(default)]
79        name: Option<String>,
80        #[serde(default)]
81        enabled: Option<bool>,
82        #[serde(default)]
83        trigger: Option<ScheduleTrigger>,
84        #[serde(default)]
85        timezone: Option<String>,
86        #[serde(default)]
87        start_at: Option<chrono::DateTime<chrono::Utc>>,
88        #[serde(default)]
89        end_at: Option<chrono::DateTime<chrono::Utc>>,
90        #[serde(default)]
91        misfire_policy: Option<crate::schedules::MisFirePolicy>,
92        #[serde(default)]
93        overlap_policy: Option<crate::schedules::OverlapPolicy>,
94        #[serde(default)]
95        run_config: Option<ScheduleRunConfig>,
96    },
97    Delete {
98        schedule_id: String,
99    },
100    RunNow {
101        schedule_id: String,
102    },
103    ListSessions {
104        schedule_id: String,
105    },
106}
107
108#[async_trait]
109impl Tool for ScheduleTasksTool {
110    fn name(&self) -> &str {
111        "scheduler"
112    }
113
114    fn description(&self) -> &str {
115        "Manage Bamboo scheduled automation jobs (list/create/patch/delete/run_now/list_sessions). Server-only tool that calls the internal scheduler directly instead of HTTP. Child sessions cannot use this."
116    }
117
118    fn parameters_schema(&self) -> serde_json::Value {
119        // Keep schema permissive; the Rust parser enforces the action-specific requirements.
120        json!({
121            "type": "object",
122            "properties": {
123                "action": {
124                    "type": "string",
125                    "enum": ["list", "create", "patch", "delete", "run_now", "list_sessions"],
126                    "description": "Which schedule operation to perform."
127                },
128                "schedule_id": { "type": "string", "description": "Schedule id for patch/delete/run_now/list_sessions." },
129                "name": { "type": "string", "description": "Schedule name (create/patch)." },
130                "enabled": { "type": "boolean", "description": "Enable/disable schedule (create/patch)." },
131                "trigger": {
132                    "type": "object",
133                    "description": "Canonical schedule trigger definition for create/patch. Required for create."
134                },
135                "timezone": { "type": "string", "description": "Optional IANA timezone for calendar-based triggers." },
136                "start_at": { "type": "string", "description": "Optional RFC3339 inclusive schedule window start." },
137                "end_at": { "type": "string", "description": "Optional RFC3339 exclusive schedule window end." },
138                "misfire_policy": { "type": "object", "description": "Optional misfire handling policy." },
139                "overlap_policy": { "type": "string", "enum": ["allow", "skip", "queue_one"], "description": "Optional overlap policy." },
140                "run_config": {
141                    "type": "object",
142                    "description": "Schedule run configuration (create/patch).",
143                    "properties": {
144                        "system_prompt": { "type": "string" },
145                        "task_message": { "type": "string" },
146                        "model": { "type": "string" },
147                        "reasoning_effort": {
148                            "type": "string",
149                            "enum": ["low", "medium", "high", "xhigh", "max"]
150                        },
151                        "workspace_path": { "type": "string" },
152                        "enhance_prompt": { "type": "string" },
153                        "auto_execute": { "type": "boolean" }
154                    }
155                }
156            },
157            "required": ["action"]
158        })
159    }
160
161    async fn execute(&self, args: serde_json::Value) -> Result<ToolResult, ToolError> {
162        self.execute_with_context(args, ToolExecutionContext::none("tool_call"))
163            .await
164    }
165
166    async fn execute_with_context(
167        &self,
168        args: serde_json::Value,
169        ctx: ToolExecutionContext<'_>,
170    ) -> Result<ToolResult, ToolError> {
171        let caller_session_id = ctx.session_id.ok_or_else(|| {
172            ToolError::Execution("scheduler requires a session_id in tool context".to_string())
173        })?;
174
175        let caller = self.load_caller_session(caller_session_id).await?;
176        if caller.kind != SessionKind::Root {
177            return Err(ToolError::Execution(
178                "scheduler is not allowed inside child sessions".to_string(),
179            ));
180        }
181
182        let parsed: ScheduleTasksArgs = serde_json::from_value(args)
183            .map_err(|e| ToolError::InvalidArguments(format!("Invalid scheduler args: {e}")))?;
184
185        match parsed {
186            ScheduleTasksArgs::List {} => {
187                let items = self
188                    .schedule_store
189                    .list_schedules()
190                    .await
191                    .into_iter()
192                    .map(ScheduleView::from)
193                    .collect::<Vec<_>>();
194                Ok(ToolResult {
195                    success: true,
196                    result: json!({ "schedules": items }).to_string(),
197                    display_preference: Some("Collapsible".to_string()),
198                })
199            }
200            ScheduleTasksArgs::Create {
201                name,
202                trigger,
203                timezone,
204                start_at,
205                end_at,
206                misfire_policy,
207                overlap_policy,
208                enabled,
209                run_config,
210            } => {
211                let name = name.trim().to_string();
212                if name.is_empty() {
213                    return Err(ToolError::InvalidArguments(
214                        "name must be a non-empty string".to_string(),
215                    ));
216                }
217                if matches!(
218                    trigger,
219                    ScheduleTrigger::Interval {
220                        every_seconds: 0,
221                        ..
222                    }
223                ) {
224                    return Err(ToolError::InvalidArguments(
225                        "trigger.every_seconds must be > 0".to_string(),
226                    ));
227                }
228                let run_config = run_config.unwrap_or_default();
229                if run_config.auto_execute {
230                    let has_task = run_config
231                        .task_message
232                        .as_deref()
233                        .map(str::trim)
234                        .filter(|v| !v.is_empty())
235                        .is_some();
236                    if !has_task {
237                        return Err(ToolError::InvalidArguments(
238                            "run_config.task_message is required when auto_execute is true"
239                                .to_string(),
240                        ));
241                    }
242                }
243
244                let created = self
245                    .schedule_store
246                    .create_schedule_with_definition(
247                        name,
248                        enabled.unwrap_or(false),
249                        run_config,
250                        crate::schedules::store::ScheduleDefinitionChanges {
251                            trigger: Some(trigger),
252                            timezone,
253                            start_at,
254                            end_at,
255                            misfire_policy,
256                            overlap_policy,
257                        },
258                    )
259                    .await
260                    .map_err(|e| ToolError::Execution(format!("Failed to create schedule: {e}")))?;
261                Ok(ToolResult {
262                    success: true,
263                    result: json!({
264                        "schedule": ScheduleView::from(created),
265                        "note": "If run_config.auto_execute is false, scheduled runs will only create sessions (they will not run the agent loop)."
266                    })
267                    .to_string(),
268                    display_preference: Some("Collapsible".to_string()),
269                })
270            }
271            ScheduleTasksArgs::Patch {
272                schedule_id,
273                name,
274                enabled,
275                trigger,
276                timezone,
277                start_at,
278                end_at,
279                misfire_policy,
280                overlap_policy,
281                run_config,
282            } => {
283                if schedule_id.trim().is_empty() {
284                    return Err(ToolError::InvalidArguments(
285                        "schedule_id must be a non-empty string".to_string(),
286                    ));
287                }
288                if matches!(
289                    trigger,
290                    Some(ScheduleTrigger::Interval {
291                        every_seconds: 0,
292                        ..
293                    })
294                ) {
295                    return Err(ToolError::InvalidArguments(
296                        "trigger.every_seconds must be > 0".to_string(),
297                    ));
298                }
299
300                if let Some(cfg) = run_config.as_ref() {
301                    if cfg.auto_execute {
302                        let has_task = cfg
303                            .task_message
304                            .as_deref()
305                            .map(str::trim)
306                            .filter(|v| !v.is_empty())
307                            .is_some();
308                        if !has_task {
309                            return Err(ToolError::InvalidArguments(
310                                "run_config.task_message is required when auto_execute is true"
311                                    .to_string(),
312                            ));
313                        }
314                    }
315                }
316
317                let updated = self
318                    .schedule_store
319                    .patch_schedule_with_definition(
320                        schedule_id.trim(),
321                        name.map(|v| v.trim().to_string()).filter(|v| !v.is_empty()),
322                        enabled,
323                        run_config,
324                        crate::schedules::store::ScheduleDefinitionChanges {
325                            trigger,
326                            timezone,
327                            start_at,
328                            end_at,
329                            misfire_policy,
330                            overlap_policy,
331                        },
332                    )
333                    .await
334                    .map_err(|e| ToolError::Execution(format!("Failed to patch schedule: {e}")))?;
335
336                let Some(schedule) = updated else {
337                    return Err(ToolError::Execution(format!(
338                        "Schedule not found: {}",
339                        schedule_id.trim()
340                    )));
341                };
342
343                Ok(ToolResult {
344                    success: true,
345                    result: json!({ "schedule": ScheduleView::from(schedule) }).to_string(),
346                    display_preference: Some("Collapsible".to_string()),
347                })
348            }
349            ScheduleTasksArgs::Delete { schedule_id } => {
350                if schedule_id.trim().is_empty() {
351                    return Err(ToolError::InvalidArguments(
352                        "schedule_id must be a non-empty string".to_string(),
353                    ));
354                }
355                let deleted = self
356                    .schedule_store
357                    .delete_schedule(schedule_id.trim())
358                    .await
359                    .map_err(|e| ToolError::Execution(format!("Failed to delete schedule: {e}")))?;
360                if !deleted {
361                    return Err(ToolError::Execution(format!(
362                        "Schedule not found: {}",
363                        schedule_id.trim()
364                    )));
365                }
366                Ok(ToolResult {
367                    success: true,
368                    result: json!({ "success": true, "schedule_id": schedule_id.trim() })
369                        .to_string(),
370                    display_preference: Some("Default".to_string()),
371                })
372            }
373            ScheduleTasksArgs::RunNow { schedule_id } => {
374                if schedule_id.trim().is_empty() {
375                    return Err(ToolError::InvalidArguments(
376                        "schedule_id must be a non-empty string".to_string(),
377                    ));
378                }
379                let Some(claimed) = self
380                    .schedule_store
381                    .create_run_now(schedule_id.trim())
382                    .await
383                    .map_err(|e| ToolError::Execution(format!("Failed to create run job: {e}")))?
384                else {
385                    return Err(ToolError::Execution(format!(
386                        "Schedule not found: {}",
387                        schedule_id.trim()
388                    )));
389                };
390
391                let enqueued_at = claimed.claimed_at;
392                self.schedule_manager
393                    .enqueue_run_now(ScheduleRunJob {
394                        run_id: claimed.run_id.clone(),
395                        schedule_id: claimed.schedule_id.clone(),
396                        schedule_name: claimed.schedule_name.clone(),
397                        run_config: claimed.run_config.clone(),
398                        scheduled_for: claimed.scheduled_for,
399                        claimed_at: claimed.claimed_at,
400                        was_catch_up: claimed.was_catch_up,
401                    })
402                    .await
403                    .map_err(|e| ToolError::Execution(format!("Failed to enqueue run: {e}")))?;
404
405                Ok(ToolResult {
406                    success: true,
407                    result: json!({
408                        "success": true,
409                        "schedule_id": claimed.schedule_id,
410                        "run_id": claimed.run_id,
411                        "enqueued_at": enqueued_at
412                    })
413                    .to_string(),
414                    display_preference: Some("Default".to_string()),
415                })
416            }
417            ScheduleTasksArgs::ListSessions { schedule_id } => {
418                if schedule_id.trim().is_empty() {
419                    return Err(ToolError::InvalidArguments(
420                        "schedule_id must be a non-empty string".to_string(),
421                    ));
422                }
423                let schedule_id = schedule_id.trim().to_string();
424                let sessions = self
425                    .session_store
426                    .list_index_entries()
427                    .await
428                    .into_iter()
429                    .filter(|e| e.created_by_schedule_id.as_deref() == Some(schedule_id.as_str()))
430                    .map(|e| crate::handlers::agent::sessions::SessionSummary::from_entry(e, false))
431                    .collect::<Vec<_>>();
432
433                Ok(ToolResult {
434                    success: true,
435                    result: json!({ "schedule_id": schedule_id, "sessions": sessions }).to_string(),
436                    display_preference: Some("Collapsible".to_string()),
437                })
438            }
439        }
440    }
441}