1use std::sync::Arc;
2
3use axum::{
4 extract::{Path, Query, State},
5 http::StatusCode,
6 routing::{delete, get, post, put},
7 Json, Router,
8};
9use serde::{Deserialize, Serialize};
10
11use crate::state::AppState;
12use aster::scheduler::ScheduledJob;
13
14#[derive(Deserialize, Serialize, utoipa::ToSchema)]
15pub struct CreateScheduleRequest {
16 id: String,
17 recipe_source: String,
18 cron: String,
19}
20
21#[derive(Deserialize, Serialize, utoipa::ToSchema)]
22pub struct UpdateScheduleRequest {
23 cron: String,
24}
25
26#[derive(Serialize, utoipa::ToSchema)]
27pub struct ListSchedulesResponse {
28 jobs: Vec<ScheduledJob>,
29}
30
31#[derive(Serialize, utoipa::ToSchema)]
33pub struct KillJobResponse {
34 message: String,
35}
36
37#[derive(Serialize, utoipa::ToSchema)]
38#[serde(rename_all = "camelCase")]
39pub struct InspectJobResponse {
40 session_id: Option<String>,
41 process_start_time: Option<String>,
42 running_duration_seconds: Option<i64>,
43}
44
45#[derive(Serialize, utoipa::ToSchema)]
47pub struct RunNowResponse {
48 session_id: String,
49}
50
51#[derive(Deserialize, utoipa::ToSchema, utoipa::IntoParams)]
52pub struct SessionsQuery {
53 limit: usize,
54}
55
56#[derive(Serialize, utoipa::ToSchema)]
58#[serde(rename_all = "camelCase")]
59pub struct SessionDisplayInfo {
60 id: String,
61 name: String,
62 created_at: String,
63 working_dir: String,
64 schedule_id: Option<String>,
65 message_count: usize,
66 total_tokens: Option<i32>,
67 input_tokens: Option<i32>,
68 output_tokens: Option<i32>,
69 accumulated_total_tokens: Option<i32>,
70 accumulated_input_tokens: Option<i32>,
71 accumulated_output_tokens: Option<i32>,
72}
73
74#[utoipa::path(
75 post,
76 path = "/schedule/create",
77 request_body = CreateScheduleRequest,
78 responses(
79 (status = 200, description = "Scheduled job created successfully", body = ScheduledJob),
80 (status = 400, description = "Invalid cron expression or recipe file"),
81 (status = 409, description = "Job ID already exists"),
82 (status = 500, description = "Internal server error")
83 ),
84 tag = "schedule"
85)]
86#[axum::debug_handler]
87async fn create_schedule(
88 State(state): State<Arc<AppState>>,
89 Json(req): Json<CreateScheduleRequest>,
90) -> Result<Json<ScheduledJob>, StatusCode> {
91 let scheduler = state.scheduler();
92
93 tracing::info!(
94 "Server: Calling scheduler.add_scheduled_job() for job '{}'",
95 req.id
96 );
97 let job = ScheduledJob {
98 id: req.id,
99 source: req.recipe_source,
100 cron: req.cron,
101 last_run: None,
102 currently_running: false,
103 paused: false,
104 current_session_id: None,
105 process_start_time: None,
106 };
107 scheduler
108 .add_scheduled_job(job.clone(), true)
109 .await
110 .map_err(|e| {
111 eprintln!("Error creating schedule: {:?}", e); match e {
113 aster::scheduler::SchedulerError::JobNotFound(_) => StatusCode::NOT_FOUND,
114 aster::scheduler::SchedulerError::CronParseError(_) => StatusCode::BAD_REQUEST,
115 aster::scheduler::SchedulerError::RecipeLoadError(_) => StatusCode::BAD_REQUEST,
116 aster::scheduler::SchedulerError::JobIdExists(_) => StatusCode::CONFLICT,
117 _ => StatusCode::INTERNAL_SERVER_ERROR,
118 }
119 })?;
120 Ok(Json(job))
121}
122
123#[utoipa::path(
124 get,
125 path = "/schedule/list",
126 responses(
127 (status = 200, description = "A list of scheduled jobs", body = ListSchedulesResponse),
128 (status = 500, description = "Internal server error")
129 ),
130 tag = "schedule"
131)]
132#[axum::debug_handler]
133async fn list_schedules(
134 State(state): State<Arc<AppState>>,
135) -> Result<Json<ListSchedulesResponse>, StatusCode> {
136 let scheduler = state.scheduler();
137
138 tracing::info!("Server: Calling scheduler.list_scheduled_jobs()");
139 let jobs = scheduler.list_scheduled_jobs().await;
140 Ok(Json(ListSchedulesResponse { jobs }))
141}
142
143#[utoipa::path(
144 delete,
145 path = "/schedule/delete/{id}",
146 params(
147 ("id" = String, Path, description = "ID of the schedule to delete")
148 ),
149 responses(
150 (status = 204, description = "Scheduled job deleted successfully"),
151 (status = 404, description = "Scheduled job not found"),
152 (status = 500, description = "Internal server error")
153 ),
154 tag = "schedule"
155)]
156#[axum::debug_handler]
157async fn delete_schedule(
158 State(state): State<Arc<AppState>>,
159 Path(id): Path<String>,
160) -> Result<StatusCode, StatusCode> {
161 let scheduler = state.scheduler();
162 scheduler
163 .remove_scheduled_job(&id, true)
164 .await
165 .map_err(|e| {
166 eprintln!("Error deleting schedule '{}': {:?}", id, e);
167 match e {
168 aster::scheduler::SchedulerError::JobNotFound(_) => StatusCode::NOT_FOUND,
169 _ => StatusCode::INTERNAL_SERVER_ERROR,
170 }
171 })?;
172 Ok(StatusCode::NO_CONTENT)
173}
174
175#[utoipa::path(
176 post,
177 path = "/schedule/{id}/run_now",
178 params(
179 ("id" = String, Path, description = "ID of the schedule to run")
180 ),
181 responses(
182 (status = 200, description = "Scheduled job triggered successfully, returns new session ID", body = RunNowResponse),
183 (status = 404, description = "Scheduled job not found"),
184 (status = 500, description = "Internal server error when trying to run the job")
185 ),
186 tag = "schedule"
187)]
188#[axum::debug_handler]
189async fn run_now_handler(
190 State(state): State<Arc<AppState>>,
191 Path(id): Path<String>,
192) -> Result<Json<RunNowResponse>, StatusCode> {
193 let scheduler = state.scheduler();
194
195 let (recipe_display_name, recipe_version_opt) = if let Some(job) = scheduler
196 .list_scheduled_jobs()
197 .await
198 .into_iter()
199 .find(|job| job.id == id)
200 {
201 let recipe_display_name = std::path::Path::new(&job.source)
202 .file_name()
203 .and_then(|name| name.to_str())
204 .map(|s| s.to_string())
205 .unwrap_or_else(|| id.clone());
206
207 let recipe_version_opt =
208 tokio::fs::read_to_string(&job.source)
209 .await
210 .ok()
211 .and_then(|content: String| {
212 aster::recipe::template_recipe::parse_recipe_content(
213 &content,
214 Some(
215 std::path::Path::new(&job.source)
216 .parent()
217 .unwrap_or_else(|| std::path::Path::new(""))
218 .to_string_lossy()
219 .to_string(),
220 ),
221 )
222 .ok()
223 .map(|(r, _)| r.version)
224 });
225
226 (recipe_display_name, recipe_version_opt)
227 } else {
228 (id.clone(), None)
229 };
230
231 let recipe_version_tag = recipe_version_opt.as_deref().unwrap_or("");
232 tracing::info!(
233 counter.aster.recipe_runs = 1,
234 recipe_name = %recipe_display_name,
235 recipe_version = %recipe_version_tag,
236 session_type = "schedule",
237 interface = "server",
238 "Recipe execution started"
239 );
240
241 tracing::info!("Server: Calling scheduler.run_now() for job '{}'", id);
242
243 match scheduler.run_now(&id).await {
244 Ok(session_id) => Ok(Json(RunNowResponse { session_id })),
245 Err(e) => {
246 eprintln!("Error running schedule '{}' now: {:?}", id, e);
247 match e {
248 aster::scheduler::SchedulerError::JobNotFound(_) => Err(StatusCode::NOT_FOUND),
249 aster::scheduler::SchedulerError::AnyhowError(ref err) => {
250 if err.to_string().contains("was successfully cancelled") {
252 Ok(Json(RunNowResponse {
254 session_id: "CANCELLED".to_string(),
255 }))
256 } else {
257 Err(StatusCode::INTERNAL_SERVER_ERROR)
258 }
259 }
260 _ => Err(StatusCode::INTERNAL_SERVER_ERROR),
261 }
262 }
263 }
264}
265
266#[utoipa::path(
267 get,
268 path = "/schedule/{id}/sessions",
269 params(
270 ("id" = String, Path, description = "ID of the schedule"),
271 SessionsQuery ),
273 responses(
274 (status = 200, description = "A list of session display info", body = Vec<SessionDisplayInfo>),
275 (status = 500, description = "Internal server error")
276 ),
277 tag = "schedule"
278)]
279#[axum::debug_handler]
280async fn sessions_handler(
281 State(state): State<Arc<AppState>>,
282 Path(schedule_id_param): Path<String>, Query(query_params): Query<SessionsQuery>,
284) -> Result<Json<Vec<SessionDisplayInfo>>, StatusCode> {
285 let scheduler = state.scheduler();
286
287 match scheduler
288 .sessions(&schedule_id_param, query_params.limit)
289 .await
290 {
291 Ok(session_tuples) => {
292 let mut display_infos = Vec::new();
293 for (session_name, session) in session_tuples {
294 display_infos.push(SessionDisplayInfo {
295 id: session_name.clone(),
296 name: session.name,
297 created_at: session.created_at.to_rfc3339(),
298 working_dir: session.working_dir.to_string_lossy().into_owned(),
299 schedule_id: session.schedule_id,
300 message_count: session.message_count,
301 total_tokens: session.total_tokens,
302 input_tokens: session.input_tokens,
303 output_tokens: session.output_tokens,
304 accumulated_total_tokens: session.accumulated_total_tokens,
305 accumulated_input_tokens: session.accumulated_input_tokens,
306 accumulated_output_tokens: session.accumulated_output_tokens,
307 });
308 }
309 Ok(Json(display_infos))
310 }
311 Err(e) => {
312 eprintln!(
313 "Error fetching sessions for schedule '{}': {:?}",
314 schedule_id_param, e
315 );
316 Err(StatusCode::INTERNAL_SERVER_ERROR)
317 }
318 }
319}
320
321#[utoipa::path(
322 post,
323 path = "/schedule/{id}/pause",
324 params(
325 ("id" = String, Path, description = "ID of the schedule to pause")
326 ),
327 responses(
328 (status = 204, description = "Scheduled job paused successfully"),
329 (status = 404, description = "Scheduled job not found"),
330 (status = 400, description = "Cannot pause a currently running job"),
331 (status = 500, description = "Internal server error")
332 ),
333 tag = "schedule"
334)]
335#[axum::debug_handler]
336async fn pause_schedule(
337 State(state): State<Arc<AppState>>,
338 Path(id): Path<String>,
339) -> Result<StatusCode, StatusCode> {
340 let scheduler = state.scheduler();
341
342 scheduler.pause_schedule(&id).await.map_err(|e| {
343 eprintln!("Error pausing schedule '{}': {:?}", id, e);
344 match e {
345 aster::scheduler::SchedulerError::JobNotFound(_) => StatusCode::NOT_FOUND,
346 aster::scheduler::SchedulerError::AnyhowError(_) => StatusCode::BAD_REQUEST,
347 _ => StatusCode::INTERNAL_SERVER_ERROR,
348 }
349 })?;
350 Ok(StatusCode::NO_CONTENT)
351}
352
353#[utoipa::path(
354 post,
355 path = "/schedule/{id}/unpause",
356 params(
357 ("id" = String, Path, description = "ID of the schedule to unpause")
358 ),
359 responses(
360 (status = 204, description = "Scheduled job unpaused successfully"),
361 (status = 404, description = "Scheduled job not found"),
362 (status = 500, description = "Internal server error")
363 ),
364 tag = "schedule"
365)]
366#[axum::debug_handler]
367async fn unpause_schedule(
368 State(state): State<Arc<AppState>>,
369 Path(id): Path<String>,
370) -> Result<StatusCode, StatusCode> {
371 let scheduler = state.scheduler();
372
373 scheduler.unpause_schedule(&id).await.map_err(|e| {
374 eprintln!("Error unpausing schedule '{}': {:?}", id, e);
375 match e {
376 aster::scheduler::SchedulerError::JobNotFound(_) => StatusCode::NOT_FOUND,
377 _ => StatusCode::INTERNAL_SERVER_ERROR,
378 }
379 })?;
380 Ok(StatusCode::NO_CONTENT)
381}
382
383#[utoipa::path(
384 put,
385 path = "/schedule/{id}",
386 params(
387 ("id" = String, Path, description = "ID of the schedule to update")
388 ),
389 request_body = UpdateScheduleRequest,
390 responses(
391 (status = 200, description = "Scheduled job updated successfully", body = ScheduledJob),
392 (status = 404, description = "Scheduled job not found"),
393 (status = 400, description = "Cannot update a currently running job or invalid request"),
394 (status = 500, description = "Internal server error")
395 ),
396 tag = "schedule"
397)]
398#[axum::debug_handler]
399async fn update_schedule(
400 State(state): State<Arc<AppState>>,
401 Path(id): Path<String>,
402 Json(req): Json<UpdateScheduleRequest>,
403) -> Result<Json<ScheduledJob>, StatusCode> {
404 let scheduler = state.scheduler();
405
406 scheduler
407 .update_schedule(&id, req.cron)
408 .await
409 .map_err(|e| {
410 eprintln!("Error updating schedule '{}': {:?}", id, e);
411 match e {
412 aster::scheduler::SchedulerError::JobNotFound(_) => StatusCode::NOT_FOUND,
413 aster::scheduler::SchedulerError::AnyhowError(_) => StatusCode::BAD_REQUEST,
414 aster::scheduler::SchedulerError::CronParseError(_) => StatusCode::BAD_REQUEST,
415 _ => StatusCode::INTERNAL_SERVER_ERROR,
416 }
417 })?;
418
419 let jobs = scheduler.list_scheduled_jobs().await;
420 let updated_job = jobs
421 .into_iter()
422 .find(|job| job.id == id)
423 .ok_or(StatusCode::INTERNAL_SERVER_ERROR)?;
424
425 Ok(Json(updated_job))
426}
427
428#[utoipa::path(
429 post,
430 path = "/schedule/{id}/kill",
431 responses(
432 (status = 200, description = "Running job killed successfully"),
433 ),
434 tag = "schedule"
435)]
436#[axum::debug_handler]
437pub async fn kill_running_job(
438 State(state): State<Arc<AppState>>,
439 Path(id): Path<String>,
440) -> Result<Json<KillJobResponse>, StatusCode> {
441 let scheduler = state.scheduler();
442
443 scheduler.kill_running_job(&id).await.map_err(|e| {
444 eprintln!("Error killing running job '{}': {:?}", id, e);
445 match e {
446 aster::scheduler::SchedulerError::JobNotFound(_) => StatusCode::NOT_FOUND,
447 aster::scheduler::SchedulerError::AnyhowError(_) => StatusCode::BAD_REQUEST,
448 _ => StatusCode::INTERNAL_SERVER_ERROR,
449 }
450 })?;
451
452 Ok(Json(KillJobResponse {
453 message: format!("Successfully killed running job '{}'", id),
454 }))
455}
456
457#[utoipa::path(
458 get,
459 path = "/schedule/{id}/inspect",
460 params(
461 ("id" = String, Path, description = "ID of the schedule to inspect")
462 ),
463 responses(
464 (status = 200, description = "Running job information", body = InspectJobResponse),
465 (status = 404, description = "Scheduled job not found"),
466 (status = 500, description = "Internal server error")
467 ),
468 tag = "schedule"
469)]
470#[axum::debug_handler]
471pub async fn inspect_running_job(
472 State(state): State<Arc<AppState>>,
473 Path(id): Path<String>,
474) -> Result<Json<InspectJobResponse>, StatusCode> {
475 let scheduler = state.scheduler();
476
477 match scheduler.get_running_job_info(&id).await {
478 Ok(info) => {
479 if let Some((session_id, start_time)) = info {
480 let duration = chrono::Utc::now().signed_duration_since(start_time);
481 Ok(Json(InspectJobResponse {
482 session_id: Some(session_id),
483 process_start_time: Some(start_time.to_rfc3339()),
484 running_duration_seconds: Some(duration.num_seconds()),
485 }))
486 } else {
487 Ok(Json(InspectJobResponse {
488 session_id: None,
489 process_start_time: None,
490 running_duration_seconds: None,
491 }))
492 }
493 }
494 Err(e) => {
495 eprintln!("Error inspecting running job '{}': {:?}", id, e);
496 match e {
497 aster::scheduler::SchedulerError::JobNotFound(_) => Err(StatusCode::NOT_FOUND),
498 _ => Err(StatusCode::INTERNAL_SERVER_ERROR),
499 }
500 }
501 }
502}
503
504pub fn routes(state: Arc<AppState>) -> Router {
505 Router::new()
506 .route("/schedule/create", post(create_schedule))
507 .route("/schedule/list", get(list_schedules))
508 .route("/schedule/delete/{id}", delete(delete_schedule)) .route("/schedule/{id}", put(update_schedule))
510 .route("/schedule/{id}/run_now", post(run_now_handler)) .route("/schedule/{id}/pause", post(pause_schedule))
512 .route("/schedule/{id}/unpause", post(unpause_schedule))
513 .route("/schedule/{id}/kill", post(kill_running_job))
514 .route("/schedule/{id}/inspect", get(inspect_running_job))
515 .route("/schedule/{id}/sessions", get(sessions_handler)) .with_state(state)
517}