1use std::sync::Arc;
7
8use crate::mcp_utils::ToolResult;
9use chrono::Utc;
10use rmcp::model::{Content, ErrorCode, ErrorData};
11
12use super::Agent;
13use crate::recipe::Recipe;
14use crate::scheduler_trait::SchedulerTrait;
15
16impl Agent {
17 pub async fn handle_schedule_management(
19 &self,
20 arguments: serde_json::Value,
21 _request_id: String,
22 ) -> ToolResult<Vec<Content>> {
23 let scheduler = match self.scheduler_service.lock().await.as_ref() {
24 Some(s) => s.clone(),
25 None => {
26 return Err(ErrorData::new(
27 ErrorCode::INTERNAL_ERROR,
28 "Scheduler not available. This tool only works in server mode.".to_string(),
29 None,
30 ))
31 }
32 };
33
34 let action = arguments
35 .get("action")
36 .and_then(|v| v.as_str())
37 .ok_or_else(|| {
38 ErrorData::new(
39 ErrorCode::INVALID_PARAMS,
40 "Missing 'action' parameter".to_string(),
41 None,
42 )
43 })?;
44
45 match action {
46 "list" => self.handle_list_jobs(scheduler).await,
47 "create" => self.handle_create_job(scheduler, arguments).await,
48 "run_now" => self.handle_run_now(scheduler, arguments).await,
49 "pause" => self.handle_pause_job(scheduler, arguments).await,
50 "unpause" => self.handle_unpause_job(scheduler, arguments).await,
51 "delete" => self.handle_delete_job(scheduler, arguments).await,
52 "kill" => self.handle_kill_job(scheduler, arguments).await,
53 "inspect" => self.handle_inspect_job(scheduler, arguments).await,
54 "sessions" => self.handle_list_sessions(scheduler, arguments).await,
55 "session_content" => self.handle_session_content(arguments).await,
56 _ => Err(ErrorData::new(
57 ErrorCode::INTERNAL_ERROR,
58 format!("Unknown action: {}", action),
59 None,
60 )),
61 }
62 }
63
64 async fn handle_list_jobs(
65 &self,
66 scheduler: Arc<dyn SchedulerTrait>,
67 ) -> ToolResult<Vec<Content>> {
68 let jobs = scheduler.list_scheduled_jobs().await;
69 let jobs_json = serde_json::to_string_pretty(&jobs).map_err(|e| {
70 ErrorData::new(
71 ErrorCode::INTERNAL_ERROR,
72 format!("Failed to serialize jobs: {}", e),
73 None,
74 )
75 })?;
76 Ok(vec![Content::text(format!(
77 "Scheduled Jobs:\n{}",
78 jobs_json
79 ))])
80 }
81
82 async fn handle_create_job(
83 &self,
84 scheduler: Arc<dyn SchedulerTrait>,
85 arguments: serde_json::Value,
86 ) -> ToolResult<Vec<Content>> {
87 let recipe_path = arguments
88 .get("recipe_path")
89 .and_then(|v| v.as_str())
90 .ok_or_else(|| {
91 ErrorData::new(
92 ErrorCode::INVALID_PARAMS,
93 "Missing 'recipe_path' parameter".to_string(),
94 None,
95 )
96 })?;
97
98 let cron_expression = arguments
99 .get("cron_expression")
100 .and_then(|v| v.as_str())
101 .ok_or_else(|| {
102 ErrorData::new(
103 ErrorCode::INVALID_PARAMS,
104 "Missing 'cron_expression' parameter".to_string(),
105 None,
106 )
107 })?;
108
109 let execution_mode = arguments
111 .get("execution_mode")
112 .and_then(|v| v.as_str())
113 .unwrap_or("background");
114
115 if !std::path::Path::new(recipe_path).exists() {
116 return Err(ErrorData::new(
117 ErrorCode::INTERNAL_ERROR,
118 format!("Recipe file not found: {}", recipe_path),
119 None,
120 ));
121 }
122
123 match std::fs::read_to_string(recipe_path) {
125 Ok(content) => {
126 if recipe_path.ends_with(".json") {
127 serde_json::from_str::<Recipe>(&content).map_err(|e| {
128 ErrorData::new(
129 ErrorCode::INTERNAL_ERROR,
130 format!("Invalid JSON recipe: {}", e),
131 None,
132 )
133 })?;
134 } else {
135 serde_yaml::from_str::<Recipe>(&content).map_err(|e| {
136 ErrorData::new(
137 ErrorCode::INTERNAL_ERROR,
138 format!("Invalid YAML recipe: {}", e),
139 None,
140 )
141 })?;
142 }
143 }
144 Err(e) => {
145 return Err(ErrorData::new(
146 ErrorCode::INTERNAL_ERROR,
147 format!("Cannot read recipe file: {}", e),
148 None,
149 ))
150 }
151 }
152
153 let job_id = format!("agent_created_{}", Utc::now().timestamp());
155
156 let job = crate::scheduler::ScheduledJob {
157 id: job_id.clone(),
158 source: recipe_path.to_string(),
159 cron: cron_expression.to_string(),
160 last_run: None,
161 currently_running: false,
162 paused: false,
163 current_session_id: None,
164 process_start_time: None,
165 };
166
167 match scheduler.add_scheduled_job(job, true).await {
168 Ok(()) => Ok(vec![Content::text(format!(
169 "Successfully created scheduled job '{}' for recipe '{}' with cron expression '{}' in {} mode",
170 job_id, recipe_path, cron_expression, execution_mode
171 ))]),
172 Err(e) => Err(ErrorData::new(
173 ErrorCode::INTERNAL_ERROR,
174 format!("Failed to create job: {}", e),
175 None,
176 )),
177 }
178 }
179
180 async fn handle_run_now(
182 &self,
183 scheduler: Arc<dyn SchedulerTrait>,
184 arguments: serde_json::Value,
185 ) -> ToolResult<Vec<Content>> {
186 let job_id = arguments
187 .get("job_id")
188 .and_then(|v| v.as_str())
189 .ok_or_else(|| {
190 ErrorData::new(
191 ErrorCode::INVALID_PARAMS,
192 "Missing 'job_id' parameter".to_string(),
193 None,
194 )
195 })?;
196
197 match scheduler.run_now(job_id).await {
198 Ok(session_id) => Ok(vec![Content::text(format!(
199 "Successfully started job '{}'. Session ID: {}",
200 job_id, session_id
201 ))]),
202 Err(e) => Err(ErrorData::new(
203 ErrorCode::INTERNAL_ERROR,
204 format!("Failed to run job: {}", e),
205 None,
206 )),
207 }
208 }
209
210 async fn handle_pause_job(
212 &self,
213 scheduler: Arc<dyn SchedulerTrait>,
214 arguments: serde_json::Value,
215 ) -> ToolResult<Vec<Content>> {
216 let job_id = arguments
217 .get("job_id")
218 .and_then(|v| v.as_str())
219 .ok_or_else(|| {
220 ErrorData::new(
221 ErrorCode::INTERNAL_ERROR,
222 "Missing 'job_id' parameter".to_string(),
223 None,
224 )
225 })?;
226
227 match scheduler.pause_schedule(job_id).await {
228 Ok(()) => Ok(vec![Content::text(format!(
229 "Successfully paused job '{}'",
230 job_id
231 ))]),
232 Err(e) => Err(ErrorData::new(
233 ErrorCode::INTERNAL_ERROR,
234 format!("Failed to pause job: {}", e),
235 None,
236 )),
237 }
238 }
239
240 async fn handle_unpause_job(
242 &self,
243 scheduler: Arc<dyn SchedulerTrait>,
244 arguments: serde_json::Value,
245 ) -> ToolResult<Vec<Content>> {
246 let job_id = arguments
247 .get("job_id")
248 .and_then(|v| v.as_str())
249 .ok_or_else(|| {
250 ErrorData::new(
251 ErrorCode::INTERNAL_ERROR,
252 "Missing 'job_id' parameter".to_string(),
253 None,
254 )
255 })?;
256
257 match scheduler.unpause_schedule(job_id).await {
258 Ok(()) => Ok(vec![Content::text(format!(
259 "Successfully unpaused job '{}'",
260 job_id
261 ))]),
262 Err(e) => Err(ErrorData::new(
263 ErrorCode::INTERNAL_ERROR,
264 format!("Failed to unpause job: {}", e),
265 None,
266 )),
267 }
268 }
269
270 async fn handle_delete_job(
272 &self,
273 scheduler: Arc<dyn SchedulerTrait>,
274 arguments: serde_json::Value,
275 ) -> ToolResult<Vec<Content>> {
276 let job_id = arguments
277 .get("job_id")
278 .and_then(|v| v.as_str())
279 .ok_or_else(|| {
280 ErrorData::new(
281 ErrorCode::INTERNAL_ERROR,
282 "Missing 'job_id' parameter".to_string(),
283 None,
284 )
285 })?;
286
287 match scheduler.remove_scheduled_job(job_id, true).await {
288 Ok(()) => Ok(vec![Content::text(format!(
289 "Successfully deleted job '{}'",
290 job_id
291 ))]),
292 Err(e) => Err(ErrorData::new(
293 ErrorCode::INTERNAL_ERROR,
294 format!("Failed to delete job: {}", e),
295 None,
296 )),
297 }
298 }
299
300 async fn handle_kill_job(
302 &self,
303 scheduler: Arc<dyn SchedulerTrait>,
304 arguments: serde_json::Value,
305 ) -> ToolResult<Vec<Content>> {
306 let job_id = arguments
307 .get("job_id")
308 .and_then(|v| v.as_str())
309 .ok_or_else(|| {
310 ErrorData::new(
311 ErrorCode::INTERNAL_ERROR,
312 "Missing 'job_id' parameter".to_string(),
313 None,
314 )
315 })?;
316
317 match scheduler.kill_running_job(job_id).await {
318 Ok(()) => Ok(vec![Content::text(format!(
319 "Successfully killed running job '{}'",
320 job_id
321 ))]),
322 Err(e) => Err(ErrorData::new(
323 ErrorCode::INTERNAL_ERROR,
324 format!("Failed to kill job: {}", e),
325 None,
326 )),
327 }
328 }
329
330 async fn handle_inspect_job(
332 &self,
333 scheduler: Arc<dyn SchedulerTrait>,
334 arguments: serde_json::Value,
335 ) -> ToolResult<Vec<Content>> {
336 let job_id = arguments
337 .get("job_id")
338 .and_then(|v| v.as_str())
339 .ok_or_else(|| {
340 ErrorData::new(
341 ErrorCode::INTERNAL_ERROR,
342 "Missing 'job_id' parameter".to_string(),
343 None,
344 )
345 })?;
346
347 match scheduler.get_running_job_info(job_id).await {
348 Ok(Some((session_id, start_time))) => {
349 let duration = Utc::now().signed_duration_since(start_time);
350 Ok(vec![Content::text(format!(
351 "Job '{}' is currently running:\n- Session ID: {}\n- Started: {}\n- Duration: {} seconds",
352 job_id, session_id, start_time.to_rfc3339(), duration.num_seconds()
353 ))])
354 }
355 Ok(None) => Ok(vec![Content::text(format!(
356 "Job '{}' is not currently running",
357 job_id
358 ))]),
359 Err(e) => Err(ErrorData::new(
360 ErrorCode::INTERNAL_ERROR,
361 format!("Failed to inspect job: {}", e),
362 None,
363 )),
364 }
365 }
366
367 async fn handle_list_sessions(
369 &self,
370 scheduler: Arc<dyn SchedulerTrait>,
371 arguments: serde_json::Value,
372 ) -> ToolResult<Vec<Content>> {
373 let job_id = arguments
374 .get("job_id")
375 .and_then(|v| v.as_str())
376 .ok_or_else(|| {
377 ErrorData::new(
378 ErrorCode::INVALID_PARAMS,
379 "Missing 'job_id' parameter".to_string(),
380 None,
381 )
382 })?;
383
384 let limit = arguments
385 .get("limit")
386 .and_then(|v| v.as_u64())
387 .unwrap_or(50) as usize;
388
389 match scheduler.sessions(job_id, limit).await {
390 Ok(sessions) => {
391 if sessions.is_empty() {
392 Ok(vec![Content::text(format!(
393 "No sessions found for job '{}'",
394 job_id
395 ))])
396 } else {
397 let sessions_info: Vec<String> = sessions
398 .into_iter()
399 .map(|(session_name, session)| {
400 format!(
401 "- Session: {} (Messages: {}, Working Dir: {})",
402 session_name,
403 session.conversation.unwrap_or_default().len(),
404 session.working_dir.display()
405 )
406 })
407 .collect();
408
409 Ok(vec![Content::text(format!(
410 "Sessions for job '{}':\n{}",
411 job_id,
412 sessions_info.join("\n")
413 ))])
414 }
415 }
416 Err(e) => Err(ErrorData::new(
417 ErrorCode::INTERNAL_ERROR,
418 format!("Failed to list sessions: {}", e),
419 None,
420 )),
421 }
422 }
423
424 async fn handle_session_content(
426 &self,
427 arguments: serde_json::Value,
428 ) -> ToolResult<Vec<Content>> {
429 let session_id = arguments
430 .get("session_id")
431 .and_then(|v| v.as_str())
432 .ok_or_else(|| {
433 ErrorData::new(
434 ErrorCode::INTERNAL_ERROR,
435 "Missing 'session_id' parameter".to_string(),
436 None,
437 )
438 })?;
439
440 let session = match crate::session::SessionManager::get_session(session_id, true).await {
441 Ok(metadata) => metadata,
442 Err(e) => {
443 return Err(ErrorData::new(
444 ErrorCode::INTERNAL_ERROR,
445 format!("Failed to read session for '{}': {}", session_id, e),
446 None,
447 ));
448 }
449 };
450
451 let metadata_json = match serde_json::to_string_pretty(&session) {
453 Ok(json) => json,
454 Err(e) => {
455 return Err(ErrorData::new(
456 ErrorCode::INTERNAL_ERROR,
457 format!("Failed to serialize metadata: {}", e),
458 None,
459 ));
460 }
461 };
462
463 Ok(vec![Content::text(format!(
464 "Session '{}' Content:\n\nSession:\n{}",
465 session_id, metadata_json
466 ))])
467 }
468}