// aster_server/routes/schedule.rs

1use std::sync::Arc;
2
3use axum::{
4    extract::{Path, Query, State},
5    http::StatusCode,
6    routing::{delete, get, post, put},
7    Json, Router,
8};
9use serde::{Deserialize, Serialize};
10
11use crate::state::AppState;
12use aster::scheduler::ScheduledJob;
13
14#[derive(Deserialize, Serialize, utoipa::ToSchema)]
15pub struct CreateScheduleRequest {
16    id: String,
17    recipe_source: String,
18    cron: String,
19}
20
21#[derive(Deserialize, Serialize, utoipa::ToSchema)]
22pub struct UpdateScheduleRequest {
23    cron: String,
24}
25
26#[derive(Serialize, utoipa::ToSchema)]
27pub struct ListSchedulesResponse {
28    jobs: Vec<ScheduledJob>,
29}
30
31// Response for the kill endpoint
32#[derive(Serialize, utoipa::ToSchema)]
33pub struct KillJobResponse {
34    message: String,
35}
36
37#[derive(Serialize, utoipa::ToSchema)]
38#[serde(rename_all = "camelCase")]
39pub struct InspectJobResponse {
40    session_id: Option<String>,
41    process_start_time: Option<String>,
42    running_duration_seconds: Option<i64>,
43}
44
45// Response for the run_now endpoint
46#[derive(Serialize, utoipa::ToSchema)]
47pub struct RunNowResponse {
48    session_id: String,
49}
50
51#[derive(Deserialize, utoipa::ToSchema, utoipa::IntoParams)]
52pub struct SessionsQuery {
53    limit: usize,
54}
55
56// Struct for the frontend session list
57#[derive(Serialize, utoipa::ToSchema)]
58#[serde(rename_all = "camelCase")]
59pub struct SessionDisplayInfo {
60    id: String,
61    name: String,
62    created_at: String,
63    working_dir: String,
64    schedule_id: Option<String>,
65    message_count: usize,
66    total_tokens: Option<i32>,
67    input_tokens: Option<i32>,
68    output_tokens: Option<i32>,
69    accumulated_total_tokens: Option<i32>,
70    accumulated_input_tokens: Option<i32>,
71    accumulated_output_tokens: Option<i32>,
72}
73
74#[utoipa::path(
75    post,
76    path = "/schedule/create",
77    request_body = CreateScheduleRequest,
78    responses(
79        (status = 200, description = "Scheduled job created successfully", body = ScheduledJob),
80        (status = 400, description = "Invalid cron expression or recipe file"),
81        (status = 409, description = "Job ID already exists"),
82        (status = 500, description = "Internal server error")
83    ),
84    tag = "schedule"
85)]
86#[axum::debug_handler]
87async fn create_schedule(
88    State(state): State<Arc<AppState>>,
89    Json(req): Json<CreateScheduleRequest>,
90) -> Result<Json<ScheduledJob>, StatusCode> {
91    let scheduler = state.scheduler();
92
93    tracing::info!(
94        "Server: Calling scheduler.add_scheduled_job() for job '{}'",
95        req.id
96    );
97    let job = ScheduledJob {
98        id: req.id,
99        source: req.recipe_source,
100        cron: req.cron,
101        last_run: None,
102        currently_running: false,
103        paused: false,
104        current_session_id: None,
105        process_start_time: None,
106    };
107    scheduler
108        .add_scheduled_job(job.clone(), true)
109        .await
110        .map_err(|e| {
111            eprintln!("Error creating schedule: {:?}", e); // Log error
112            match e {
113                aster::scheduler::SchedulerError::JobNotFound(_) => StatusCode::NOT_FOUND,
114                aster::scheduler::SchedulerError::CronParseError(_) => StatusCode::BAD_REQUEST,
115                aster::scheduler::SchedulerError::RecipeLoadError(_) => StatusCode::BAD_REQUEST,
116                aster::scheduler::SchedulerError::JobIdExists(_) => StatusCode::CONFLICT,
117                _ => StatusCode::INTERNAL_SERVER_ERROR,
118            }
119        })?;
120    Ok(Json(job))
121}
122
123#[utoipa::path(
124    get,
125    path = "/schedule/list",
126    responses(
127        (status = 200, description = "A list of scheduled jobs", body = ListSchedulesResponse),
128        (status = 500, description = "Internal server error")
129    ),
130    tag = "schedule"
131)]
132#[axum::debug_handler]
133async fn list_schedules(
134    State(state): State<Arc<AppState>>,
135) -> Result<Json<ListSchedulesResponse>, StatusCode> {
136    let scheduler = state.scheduler();
137
138    tracing::info!("Server: Calling scheduler.list_scheduled_jobs()");
139    let jobs = scheduler.list_scheduled_jobs().await;
140    Ok(Json(ListSchedulesResponse { jobs }))
141}
142
143#[utoipa::path(
144    delete,
145    path = "/schedule/delete/{id}",
146    params(
147        ("id" = String, Path, description = "ID of the schedule to delete")
148    ),
149    responses(
150        (status = 204, description = "Scheduled job deleted successfully"),
151        (status = 404, description = "Scheduled job not found"),
152        (status = 500, description = "Internal server error")
153    ),
154    tag = "schedule"
155)]
156#[axum::debug_handler]
157async fn delete_schedule(
158    State(state): State<Arc<AppState>>,
159    Path(id): Path<String>,
160) -> Result<StatusCode, StatusCode> {
161    let scheduler = state.scheduler();
162    scheduler
163        .remove_scheduled_job(&id, true)
164        .await
165        .map_err(|e| {
166            eprintln!("Error deleting schedule '{}': {:?}", id, e);
167            match e {
168                aster::scheduler::SchedulerError::JobNotFound(_) => StatusCode::NOT_FOUND,
169                _ => StatusCode::INTERNAL_SERVER_ERROR,
170            }
171        })?;
172    Ok(StatusCode::NO_CONTENT)
173}
174
175#[utoipa::path(
176    post,
177    path = "/schedule/{id}/run_now",
178    params(
179        ("id" = String, Path, description = "ID of the schedule to run")
180    ),
181    responses(
182        (status = 200, description = "Scheduled job triggered successfully, returns new session ID", body = RunNowResponse),
183        (status = 404, description = "Scheduled job not found"),
184        (status = 500, description = "Internal server error when trying to run the job")
185    ),
186    tag = "schedule"
187)]
188#[axum::debug_handler]
189async fn run_now_handler(
190    State(state): State<Arc<AppState>>,
191    Path(id): Path<String>,
192) -> Result<Json<RunNowResponse>, StatusCode> {
193    let scheduler = state.scheduler();
194
195    let (recipe_display_name, recipe_version_opt) = if let Some(job) = scheduler
196        .list_scheduled_jobs()
197        .await
198        .into_iter()
199        .find(|job| job.id == id)
200    {
201        let recipe_display_name = std::path::Path::new(&job.source)
202            .file_name()
203            .and_then(|name| name.to_str())
204            .map(|s| s.to_string())
205            .unwrap_or_else(|| id.clone());
206
207        let recipe_version_opt =
208            tokio::fs::read_to_string(&job.source)
209                .await
210                .ok()
211                .and_then(|content: String| {
212                    aster::recipe::template_recipe::parse_recipe_content(
213                        &content,
214                        Some(
215                            std::path::Path::new(&job.source)
216                                .parent()
217                                .unwrap_or_else(|| std::path::Path::new(""))
218                                .to_string_lossy()
219                                .to_string(),
220                        ),
221                    )
222                    .ok()
223                    .map(|(r, _)| r.version)
224                });
225
226        (recipe_display_name, recipe_version_opt)
227    } else {
228        (id.clone(), None)
229    };
230
231    let recipe_version_tag = recipe_version_opt.as_deref().unwrap_or("");
232    tracing::info!(
233        counter.aster.recipe_runs = 1,
234        recipe_name = %recipe_display_name,
235        recipe_version = %recipe_version_tag,
236        session_type = "schedule",
237        interface = "server",
238        "Recipe execution started"
239    );
240
241    tracing::info!("Server: Calling scheduler.run_now() for job '{}'", id);
242
243    match scheduler.run_now(&id).await {
244        Ok(session_id) => Ok(Json(RunNowResponse { session_id })),
245        Err(e) => {
246            eprintln!("Error running schedule '{}' now: {:?}", id, e);
247            match e {
248                aster::scheduler::SchedulerError::JobNotFound(_) => Err(StatusCode::NOT_FOUND),
249                aster::scheduler::SchedulerError::AnyhowError(ref err) => {
250                    // Check if this is a cancellation error
251                    if err.to_string().contains("was successfully cancelled") {
252                        // Return a special session_id to indicate cancellation
253                        Ok(Json(RunNowResponse {
254                            session_id: "CANCELLED".to_string(),
255                        }))
256                    } else {
257                        Err(StatusCode::INTERNAL_SERVER_ERROR)
258                    }
259                }
260                _ => Err(StatusCode::INTERNAL_SERVER_ERROR),
261            }
262        }
263    }
264}
265
266#[utoipa::path(
267    get,
268    path = "/schedule/{id}/sessions",
269    params(
270        ("id" = String, Path, description = "ID of the schedule"),
271        SessionsQuery // This will automatically pick up 'limit' as a query parameter
272    ),
273    responses(
274        (status = 200, description = "A list of session display info", body = Vec<SessionDisplayInfo>),
275        (status = 500, description = "Internal server error")
276    ),
277    tag = "schedule"
278)]
279#[axum::debug_handler]
280async fn sessions_handler(
281    State(state): State<Arc<AppState>>,
282    Path(schedule_id_param): Path<String>, // Renamed to avoid confusion with session_id
283    Query(query_params): Query<SessionsQuery>,
284) -> Result<Json<Vec<SessionDisplayInfo>>, StatusCode> {
285    let scheduler = state.scheduler();
286
287    match scheduler
288        .sessions(&schedule_id_param, query_params.limit)
289        .await
290    {
291        Ok(session_tuples) => {
292            let mut display_infos = Vec::new();
293            for (session_name, session) in session_tuples {
294                display_infos.push(SessionDisplayInfo {
295                    id: session_name.clone(),
296                    name: session.name,
297                    created_at: session.created_at.to_rfc3339(),
298                    working_dir: session.working_dir.to_string_lossy().into_owned(),
299                    schedule_id: session.schedule_id,
300                    message_count: session.message_count,
301                    total_tokens: session.total_tokens,
302                    input_tokens: session.input_tokens,
303                    output_tokens: session.output_tokens,
304                    accumulated_total_tokens: session.accumulated_total_tokens,
305                    accumulated_input_tokens: session.accumulated_input_tokens,
306                    accumulated_output_tokens: session.accumulated_output_tokens,
307                });
308            }
309            Ok(Json(display_infos))
310        }
311        Err(e) => {
312            eprintln!(
313                "Error fetching sessions for schedule '{}': {:?}",
314                schedule_id_param, e
315            );
316            Err(StatusCode::INTERNAL_SERVER_ERROR)
317        }
318    }
319}
320
321#[utoipa::path(
322    post,
323    path = "/schedule/{id}/pause",
324    params(
325        ("id" = String, Path, description = "ID of the schedule to pause")
326    ),
327    responses(
328        (status = 204, description = "Scheduled job paused successfully"),
329        (status = 404, description = "Scheduled job not found"),
330        (status = 400, description = "Cannot pause a currently running job"),
331        (status = 500, description = "Internal server error")
332    ),
333    tag = "schedule"
334)]
335#[axum::debug_handler]
336async fn pause_schedule(
337    State(state): State<Arc<AppState>>,
338    Path(id): Path<String>,
339) -> Result<StatusCode, StatusCode> {
340    let scheduler = state.scheduler();
341
342    scheduler.pause_schedule(&id).await.map_err(|e| {
343        eprintln!("Error pausing schedule '{}': {:?}", id, e);
344        match e {
345            aster::scheduler::SchedulerError::JobNotFound(_) => StatusCode::NOT_FOUND,
346            aster::scheduler::SchedulerError::AnyhowError(_) => StatusCode::BAD_REQUEST,
347            _ => StatusCode::INTERNAL_SERVER_ERROR,
348        }
349    })?;
350    Ok(StatusCode::NO_CONTENT)
351}
352
353#[utoipa::path(
354    post,
355    path = "/schedule/{id}/unpause",
356    params(
357        ("id" = String, Path, description = "ID of the schedule to unpause")
358    ),
359    responses(
360        (status = 204, description = "Scheduled job unpaused successfully"),
361        (status = 404, description = "Scheduled job not found"),
362        (status = 500, description = "Internal server error")
363    ),
364    tag = "schedule"
365)]
366#[axum::debug_handler]
367async fn unpause_schedule(
368    State(state): State<Arc<AppState>>,
369    Path(id): Path<String>,
370) -> Result<StatusCode, StatusCode> {
371    let scheduler = state.scheduler();
372
373    scheduler.unpause_schedule(&id).await.map_err(|e| {
374        eprintln!("Error unpausing schedule '{}': {:?}", id, e);
375        match e {
376            aster::scheduler::SchedulerError::JobNotFound(_) => StatusCode::NOT_FOUND,
377            _ => StatusCode::INTERNAL_SERVER_ERROR,
378        }
379    })?;
380    Ok(StatusCode::NO_CONTENT)
381}
382
383#[utoipa::path(
384    put,
385    path = "/schedule/{id}",
386    params(
387        ("id" = String, Path, description = "ID of the schedule to update")
388    ),
389    request_body = UpdateScheduleRequest,
390    responses(
391        (status = 200, description = "Scheduled job updated successfully", body = ScheduledJob),
392        (status = 404, description = "Scheduled job not found"),
393        (status = 400, description = "Cannot update a currently running job or invalid request"),
394        (status = 500, description = "Internal server error")
395    ),
396    tag = "schedule"
397)]
398#[axum::debug_handler]
399async fn update_schedule(
400    State(state): State<Arc<AppState>>,
401    Path(id): Path<String>,
402    Json(req): Json<UpdateScheduleRequest>,
403) -> Result<Json<ScheduledJob>, StatusCode> {
404    let scheduler = state.scheduler();
405
406    scheduler
407        .update_schedule(&id, req.cron)
408        .await
409        .map_err(|e| {
410            eprintln!("Error updating schedule '{}': {:?}", id, e);
411            match e {
412                aster::scheduler::SchedulerError::JobNotFound(_) => StatusCode::NOT_FOUND,
413                aster::scheduler::SchedulerError::AnyhowError(_) => StatusCode::BAD_REQUEST,
414                aster::scheduler::SchedulerError::CronParseError(_) => StatusCode::BAD_REQUEST,
415                _ => StatusCode::INTERNAL_SERVER_ERROR,
416            }
417        })?;
418
419    let jobs = scheduler.list_scheduled_jobs().await;
420    let updated_job = jobs
421        .into_iter()
422        .find(|job| job.id == id)
423        .ok_or(StatusCode::INTERNAL_SERVER_ERROR)?;
424
425    Ok(Json(updated_job))
426}
427
428#[utoipa::path(
429    post,
430    path = "/schedule/{id}/kill",
431    responses(
432        (status = 200, description = "Running job killed successfully"),
433    ),
434    tag = "schedule"
435)]
436#[axum::debug_handler]
437pub async fn kill_running_job(
438    State(state): State<Arc<AppState>>,
439    Path(id): Path<String>,
440) -> Result<Json<KillJobResponse>, StatusCode> {
441    let scheduler = state.scheduler();
442
443    scheduler.kill_running_job(&id).await.map_err(|e| {
444        eprintln!("Error killing running job '{}': {:?}", id, e);
445        match e {
446            aster::scheduler::SchedulerError::JobNotFound(_) => StatusCode::NOT_FOUND,
447            aster::scheduler::SchedulerError::AnyhowError(_) => StatusCode::BAD_REQUEST,
448            _ => StatusCode::INTERNAL_SERVER_ERROR,
449        }
450    })?;
451
452    Ok(Json(KillJobResponse {
453        message: format!("Successfully killed running job '{}'", id),
454    }))
455}
456
457#[utoipa::path(
458    get,
459    path = "/schedule/{id}/inspect",
460    params(
461        ("id" = String, Path, description = "ID of the schedule to inspect")
462    ),
463    responses(
464        (status = 200, description = "Running job information", body = InspectJobResponse),
465        (status = 404, description = "Scheduled job not found"),
466        (status = 500, description = "Internal server error")
467    ),
468    tag = "schedule"
469)]
470#[axum::debug_handler]
471pub async fn inspect_running_job(
472    State(state): State<Arc<AppState>>,
473    Path(id): Path<String>,
474) -> Result<Json<InspectJobResponse>, StatusCode> {
475    let scheduler = state.scheduler();
476
477    match scheduler.get_running_job_info(&id).await {
478        Ok(info) => {
479            if let Some((session_id, start_time)) = info {
480                let duration = chrono::Utc::now().signed_duration_since(start_time);
481                Ok(Json(InspectJobResponse {
482                    session_id: Some(session_id),
483                    process_start_time: Some(start_time.to_rfc3339()),
484                    running_duration_seconds: Some(duration.num_seconds()),
485                }))
486            } else {
487                Ok(Json(InspectJobResponse {
488                    session_id: None,
489                    process_start_time: None,
490                    running_duration_seconds: None,
491                }))
492            }
493        }
494        Err(e) => {
495            eprintln!("Error inspecting running job '{}': {:?}", id, e);
496            match e {
497                aster::scheduler::SchedulerError::JobNotFound(_) => Err(StatusCode::NOT_FOUND),
498                _ => Err(StatusCode::INTERNAL_SERVER_ERROR),
499            }
500        }
501    }
502}
503
504pub fn routes(state: Arc<AppState>) -> Router {
505    Router::new()
506        .route("/schedule/create", post(create_schedule))
507        .route("/schedule/list", get(list_schedules))
508        .route("/schedule/delete/{id}", delete(delete_schedule)) // Corrected
509        .route("/schedule/{id}", put(update_schedule))
510        .route("/schedule/{id}/run_now", post(run_now_handler)) // Corrected
511        .route("/schedule/{id}/pause", post(pause_schedule))
512        .route("/schedule/{id}/unpause", post(unpause_schedule))
513        .route("/schedule/{id}/kill", post(kill_running_job))
514        .route("/schedule/{id}/inspect", get(inspect_running_job))
515        .route("/schedule/{id}/sessions", get(sessions_handler)) // Corrected
516        .with_state(state)
517}