Skip to main content

aster/agents/
schedule_tool.rs

1//! Schedule tool handlers for the aster agent
2//!
3//! This module contains all the handlers for the schedule management platform tool,
4//! including job creation, execution, monitoring, and session management.
5
6use std::sync::Arc;
7
8use crate::mcp_utils::ToolResult;
9use chrono::Utc;
10use rmcp::model::{Content, ErrorCode, ErrorData};
11
12use super::Agent;
13use crate::recipe::Recipe;
14use crate::scheduler_trait::SchedulerTrait;
15
16impl Agent {
17    /// Handle schedule management tool calls
18    pub async fn handle_schedule_management(
19        &self,
20        arguments: serde_json::Value,
21        _request_id: String,
22    ) -> ToolResult<Vec<Content>> {
23        let scheduler = match self.scheduler_service.lock().await.as_ref() {
24            Some(s) => s.clone(),
25            None => {
26                return Err(ErrorData::new(
27                    ErrorCode::INTERNAL_ERROR,
28                    "Scheduler not available. This tool only works in server mode.".to_string(),
29                    None,
30                ))
31            }
32        };
33
34        let action = arguments
35            .get("action")
36            .and_then(|v| v.as_str())
37            .ok_or_else(|| {
38                ErrorData::new(
39                    ErrorCode::INVALID_PARAMS,
40                    "Missing 'action' parameter".to_string(),
41                    None,
42                )
43            })?;
44
45        match action {
46            "list" => self.handle_list_jobs(scheduler).await,
47            "create" => self.handle_create_job(scheduler, arguments).await,
48            "run_now" => self.handle_run_now(scheduler, arguments).await,
49            "pause" => self.handle_pause_job(scheduler, arguments).await,
50            "unpause" => self.handle_unpause_job(scheduler, arguments).await,
51            "delete" => self.handle_delete_job(scheduler, arguments).await,
52            "kill" => self.handle_kill_job(scheduler, arguments).await,
53            "inspect" => self.handle_inspect_job(scheduler, arguments).await,
54            "sessions" => self.handle_list_sessions(scheduler, arguments).await,
55            "session_content" => self.handle_session_content(arguments).await,
56            _ => Err(ErrorData::new(
57                ErrorCode::INTERNAL_ERROR,
58                format!("Unknown action: {}", action),
59                None,
60            )),
61        }
62    }
63
64    async fn handle_list_jobs(
65        &self,
66        scheduler: Arc<dyn SchedulerTrait>,
67    ) -> ToolResult<Vec<Content>> {
68        let jobs = scheduler.list_scheduled_jobs().await;
69        let jobs_json = serde_json::to_string_pretty(&jobs).map_err(|e| {
70            ErrorData::new(
71                ErrorCode::INTERNAL_ERROR,
72                format!("Failed to serialize jobs: {}", e),
73                None,
74            )
75        })?;
76        Ok(vec![Content::text(format!(
77            "Scheduled Jobs:\n{}",
78            jobs_json
79        ))])
80    }
81
82    async fn handle_create_job(
83        &self,
84        scheduler: Arc<dyn SchedulerTrait>,
85        arguments: serde_json::Value,
86    ) -> ToolResult<Vec<Content>> {
87        let recipe_path = arguments
88            .get("recipe_path")
89            .and_then(|v| v.as_str())
90            .ok_or_else(|| {
91                ErrorData::new(
92                    ErrorCode::INVALID_PARAMS,
93                    "Missing 'recipe_path' parameter".to_string(),
94                    None,
95                )
96            })?;
97
98        let cron_expression = arguments
99            .get("cron_expression")
100            .and_then(|v| v.as_str())
101            .ok_or_else(|| {
102                ErrorData::new(
103                    ErrorCode::INVALID_PARAMS,
104                    "Missing 'cron_expression' parameter".to_string(),
105                    None,
106                )
107            })?;
108
109        // Get the execution_mode parameter, defaulting to "background" if not provided
110        let execution_mode = arguments
111            .get("execution_mode")
112            .and_then(|v| v.as_str())
113            .unwrap_or("background");
114
115        if !std::path::Path::new(recipe_path).exists() {
116            return Err(ErrorData::new(
117                ErrorCode::INTERNAL_ERROR,
118                format!("Recipe file not found: {}", recipe_path),
119                None,
120            ));
121        }
122
123        // Validate it's a valid recipe by trying to parse it
124        match std::fs::read_to_string(recipe_path) {
125            Ok(content) => {
126                if recipe_path.ends_with(".json") {
127                    serde_json::from_str::<Recipe>(&content).map_err(|e| {
128                        ErrorData::new(
129                            ErrorCode::INTERNAL_ERROR,
130                            format!("Invalid JSON recipe: {}", e),
131                            None,
132                        )
133                    })?;
134                } else {
135                    serde_yaml::from_str::<Recipe>(&content).map_err(|e| {
136                        ErrorData::new(
137                            ErrorCode::INTERNAL_ERROR,
138                            format!("Invalid YAML recipe: {}", e),
139                            None,
140                        )
141                    })?;
142                }
143            }
144            Err(e) => {
145                return Err(ErrorData::new(
146                    ErrorCode::INTERNAL_ERROR,
147                    format!("Cannot read recipe file: {}", e),
148                    None,
149                ))
150            }
151        }
152
153        // Generate unique job ID
154        let job_id = format!("agent_created_{}", Utc::now().timestamp());
155
156        let job = crate::scheduler::ScheduledJob {
157            id: job_id.clone(),
158            source: recipe_path.to_string(),
159            cron: cron_expression.to_string(),
160            last_run: None,
161            currently_running: false,
162            paused: false,
163            current_session_id: None,
164            process_start_time: None,
165        };
166
167        match scheduler.add_scheduled_job(job, true).await {
168            Ok(()) => Ok(vec![Content::text(format!(
169                "Successfully created scheduled job '{}' for recipe '{}' with cron expression '{}' in {} mode",
170                job_id, recipe_path, cron_expression, execution_mode
171            ))]),
172            Err(e) => Err(ErrorData::new(
173                ErrorCode::INTERNAL_ERROR,
174                format!("Failed to create job: {}", e),
175                None,
176            )),
177        }
178    }
179
180    /// Run a scheduled job immediately
181    async fn handle_run_now(
182        &self,
183        scheduler: Arc<dyn SchedulerTrait>,
184        arguments: serde_json::Value,
185    ) -> ToolResult<Vec<Content>> {
186        let job_id = arguments
187            .get("job_id")
188            .and_then(|v| v.as_str())
189            .ok_or_else(|| {
190                ErrorData::new(
191                    ErrorCode::INVALID_PARAMS,
192                    "Missing 'job_id' parameter".to_string(),
193                    None,
194                )
195            })?;
196
197        match scheduler.run_now(job_id).await {
198            Ok(session_id) => Ok(vec![Content::text(format!(
199                "Successfully started job '{}'. Session ID: {}",
200                job_id, session_id
201            ))]),
202            Err(e) => Err(ErrorData::new(
203                ErrorCode::INTERNAL_ERROR,
204                format!("Failed to run job: {}", e),
205                None,
206            )),
207        }
208    }
209
210    /// Pause a scheduled job
211    async fn handle_pause_job(
212        &self,
213        scheduler: Arc<dyn SchedulerTrait>,
214        arguments: serde_json::Value,
215    ) -> ToolResult<Vec<Content>> {
216        let job_id = arguments
217            .get("job_id")
218            .and_then(|v| v.as_str())
219            .ok_or_else(|| {
220                ErrorData::new(
221                    ErrorCode::INTERNAL_ERROR,
222                    "Missing 'job_id' parameter".to_string(),
223                    None,
224                )
225            })?;
226
227        match scheduler.pause_schedule(job_id).await {
228            Ok(()) => Ok(vec![Content::text(format!(
229                "Successfully paused job '{}'",
230                job_id
231            ))]),
232            Err(e) => Err(ErrorData::new(
233                ErrorCode::INTERNAL_ERROR,
234                format!("Failed to pause job: {}", e),
235                None,
236            )),
237        }
238    }
239
240    /// Resume a paused scheduled job
241    async fn handle_unpause_job(
242        &self,
243        scheduler: Arc<dyn SchedulerTrait>,
244        arguments: serde_json::Value,
245    ) -> ToolResult<Vec<Content>> {
246        let job_id = arguments
247            .get("job_id")
248            .and_then(|v| v.as_str())
249            .ok_or_else(|| {
250                ErrorData::new(
251                    ErrorCode::INTERNAL_ERROR,
252                    "Missing 'job_id' parameter".to_string(),
253                    None,
254                )
255            })?;
256
257        match scheduler.unpause_schedule(job_id).await {
258            Ok(()) => Ok(vec![Content::text(format!(
259                "Successfully unpaused job '{}'",
260                job_id
261            ))]),
262            Err(e) => Err(ErrorData::new(
263                ErrorCode::INTERNAL_ERROR,
264                format!("Failed to unpause job: {}", e),
265                None,
266            )),
267        }
268    }
269
270    /// Delete a scheduled job
271    async fn handle_delete_job(
272        &self,
273        scheduler: Arc<dyn SchedulerTrait>,
274        arguments: serde_json::Value,
275    ) -> ToolResult<Vec<Content>> {
276        let job_id = arguments
277            .get("job_id")
278            .and_then(|v| v.as_str())
279            .ok_or_else(|| {
280                ErrorData::new(
281                    ErrorCode::INTERNAL_ERROR,
282                    "Missing 'job_id' parameter".to_string(),
283                    None,
284                )
285            })?;
286
287        match scheduler.remove_scheduled_job(job_id, true).await {
288            Ok(()) => Ok(vec![Content::text(format!(
289                "Successfully deleted job '{}'",
290                job_id
291            ))]),
292            Err(e) => Err(ErrorData::new(
293                ErrorCode::INTERNAL_ERROR,
294                format!("Failed to delete job: {}", e),
295                None,
296            )),
297        }
298    }
299
300    /// Terminate a currently running job
301    async fn handle_kill_job(
302        &self,
303        scheduler: Arc<dyn SchedulerTrait>,
304        arguments: serde_json::Value,
305    ) -> ToolResult<Vec<Content>> {
306        let job_id = arguments
307            .get("job_id")
308            .and_then(|v| v.as_str())
309            .ok_or_else(|| {
310                ErrorData::new(
311                    ErrorCode::INTERNAL_ERROR,
312                    "Missing 'job_id' parameter".to_string(),
313                    None,
314                )
315            })?;
316
317        match scheduler.kill_running_job(job_id).await {
318            Ok(()) => Ok(vec![Content::text(format!(
319                "Successfully killed running job '{}'",
320                job_id
321            ))]),
322            Err(e) => Err(ErrorData::new(
323                ErrorCode::INTERNAL_ERROR,
324                format!("Failed to kill job: {}", e),
325                None,
326            )),
327        }
328    }
329
330    /// Get information about a running job
331    async fn handle_inspect_job(
332        &self,
333        scheduler: Arc<dyn SchedulerTrait>,
334        arguments: serde_json::Value,
335    ) -> ToolResult<Vec<Content>> {
336        let job_id = arguments
337            .get("job_id")
338            .and_then(|v| v.as_str())
339            .ok_or_else(|| {
340                ErrorData::new(
341                    ErrorCode::INTERNAL_ERROR,
342                    "Missing 'job_id' parameter".to_string(),
343                    None,
344                )
345            })?;
346
347        match scheduler.get_running_job_info(job_id).await {
348            Ok(Some((session_id, start_time))) => {
349                let duration = Utc::now().signed_duration_since(start_time);
350                Ok(vec![Content::text(format!(
351                    "Job '{}' is currently running:\n- Session ID: {}\n- Started: {}\n- Duration: {} seconds",
352                    job_id, session_id, start_time.to_rfc3339(), duration.num_seconds()
353                ))])
354            }
355            Ok(None) => Ok(vec![Content::text(format!(
356                "Job '{}' is not currently running",
357                job_id
358            ))]),
359            Err(e) => Err(ErrorData::new(
360                ErrorCode::INTERNAL_ERROR,
361                format!("Failed to inspect job: {}", e),
362                None,
363            )),
364        }
365    }
366
367    /// List execution sessions for a job
368    async fn handle_list_sessions(
369        &self,
370        scheduler: Arc<dyn SchedulerTrait>,
371        arguments: serde_json::Value,
372    ) -> ToolResult<Vec<Content>> {
373        let job_id = arguments
374            .get("job_id")
375            .and_then(|v| v.as_str())
376            .ok_or_else(|| {
377                ErrorData::new(
378                    ErrorCode::INVALID_PARAMS,
379                    "Missing 'job_id' parameter".to_string(),
380                    None,
381                )
382            })?;
383
384        let limit = arguments
385            .get("limit")
386            .and_then(|v| v.as_u64())
387            .unwrap_or(50) as usize;
388
389        match scheduler.sessions(job_id, limit).await {
390            Ok(sessions) => {
391                if sessions.is_empty() {
392                    Ok(vec![Content::text(format!(
393                        "No sessions found for job '{}'",
394                        job_id
395                    ))])
396                } else {
397                    let sessions_info: Vec<String> = sessions
398                        .into_iter()
399                        .map(|(session_name, session)| {
400                            format!(
401                                "- Session: {} (Messages: {}, Working Dir: {})",
402                                session_name,
403                                session.conversation.unwrap_or_default().len(),
404                                session.working_dir.display()
405                            )
406                        })
407                        .collect();
408
409                    Ok(vec![Content::text(format!(
410                        "Sessions for job '{}':\n{}",
411                        job_id,
412                        sessions_info.join("\n")
413                    ))])
414                }
415            }
416            Err(e) => Err(ErrorData::new(
417                ErrorCode::INTERNAL_ERROR,
418                format!("Failed to list sessions: {}", e),
419                None,
420            )),
421        }
422    }
423
424    /// Get the full content (metadata and messages) of a specific session
425    async fn handle_session_content(
426        &self,
427        arguments: serde_json::Value,
428    ) -> ToolResult<Vec<Content>> {
429        let session_id = arguments
430            .get("session_id")
431            .and_then(|v| v.as_str())
432            .ok_or_else(|| {
433                ErrorData::new(
434                    ErrorCode::INTERNAL_ERROR,
435                    "Missing 'session_id' parameter".to_string(),
436                    None,
437                )
438            })?;
439
440        let session = match crate::session::SessionManager::get_session(session_id, true).await {
441            Ok(metadata) => metadata,
442            Err(e) => {
443                return Err(ErrorData::new(
444                    ErrorCode::INTERNAL_ERROR,
445                    format!("Failed to read session for '{}': {}", session_id, e),
446                    None,
447                ));
448            }
449        };
450
451        // Format the response with metadata and messages
452        let metadata_json = match serde_json::to_string_pretty(&session) {
453            Ok(json) => json,
454            Err(e) => {
455                return Err(ErrorData::new(
456                    ErrorCode::INTERNAL_ERROR,
457                    format!("Failed to serialize metadata: {}", e),
458                    None,
459                ));
460            }
461        };
462
463        Ok(vec![Content::text(format!(
464            "Session '{}' Content:\n\nSession:\n{}",
465            session_id, metadata_json
466        ))])
467    }
468}