moadim 0.2.0

Moadim.io MCP/REST server for managing cron jobs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
//! Cron job data model, service functions, and Axum HTTP handlers.

use axum::{
    extract::{Path, State},
    http::StatusCode,
    Json,
};
use croner::Cron;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};
use uuid::Uuid;

use crate::error::AppError;
use crate::paths::job_toml_path;
use crate::storage::{remove_job_dir, write_job};
use crate::utils::time::now_secs;

/// Whether a cron job is owned by this server or discovered from the OS.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema, utoipa::ToSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum CronJobSourceType {
    /// Created and managed by this server.
    Managed,
    /// Read-only entry discovered from the host OS crontab.
    System,
}

impl CronJobSourceType {
    /// Derive from the raw `source` string stored on a [`CronJob`].
    pub fn from_source(source: &str) -> Self {
        if source == "managed" {
            Self::Managed
        } else {
            Self::System
        }
    }
}

/// A persisted cron job with scheduling and metadata.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, utoipa::ToSchema)]
pub struct CronJob {
    /// Unique identifier (UUID v4).
    pub id: String,
    /// Cron expression defining when the job runs.
    pub schedule: String,
    /// Identifier for the handler that processes the job.
    pub handler: String,
    /// Arbitrary JSON metadata attached to the job.
    pub metadata: serde_json::Value,
    /// Whether the job is active.
    pub enabled: bool,
    /// `"managed"` for jobs owned by this server; `"system:*"` for read-only system cron entries.
    pub source: String,
    /// Unix timestamp (seconds) when the job was created.
    pub created_at: u64,
    /// Unix timestamp (seconds) when the job was last updated.
    pub updated_at: u64,
    /// Unix timestamp (seconds) when the job was last manually triggered, if ever.
    pub last_triggered_at: Option<u64>,
}

/// A [`CronJob`] enriched with a flag indicating whether its handler is registered.
#[derive(Debug, Clone, Serialize, JsonSchema, utoipa::ToSchema)]
pub struct CronJobResponse {
    /// The underlying cron job.
    #[serde(flatten)]
    pub job: CronJob,
    /// Whether the job is owned by this server or the host OS.
    pub source_type: CronJobSourceType,
    /// `true` if the job's handler appears in the server's handler registry.
    pub handler_registered: bool,
    /// Absolute path to the job's `job.toml` file on disk.
    pub file_path: String,
    /// Human-readable description of the schedule (e.g. `"At 09:30, Monday through Friday"`).
    /// `null` for expressions that cannot be parsed into a description (e.g. `@reboot`).
    pub schedule_description: Option<String>,
}

impl CronJobResponse {
    /// Build a response from `job`, checking `handlers` for registration status.
    pub fn from_job(job: CronJob, handlers: &HashSet<String>) -> Self {
        let source_type = CronJobSourceType::from_source(&job.source);
        let handler_registered = handlers.contains(&job.handler);
        let file_path = job_toml_path(&job.id).to_string_lossy().into_owned();
        let schedule_description = job.schedule.parse::<Cron>().ok().map(|c| c.describe());
        Self {
            job,
            source_type,
            handler_registered,
            file_path,
            schedule_description,
        }
    }
}

/// Thread-safe shared store of cron jobs keyed by ID.
pub type CronStore = Arc<Mutex<HashMap<String, CronJob>>>;
/// Thread-safe set of registered handler identifiers.
pub type HandlerRegistry = Arc<HashSet<String>>;

/// Combined Axum application state holding the job store and handler registry.
#[derive(Clone)]
pub struct AppState {
    /// Shared cron job store.
    pub store: CronStore,
    /// Registered handler identifiers.
    pub handlers: HandlerRegistry,
    /// Shared routine (agent-driven job) store.
    pub routines: crate::routines::RoutineStore,
    /// Unix timestamp (seconds) when the server started.
    pub uptime_start: u64,
}

impl axum::extract::FromRef<AppState> for CronStore {
    fn from_ref(state: &AppState) -> Self {
        state.store.clone()
    }
}

impl axum::extract::FromRef<AppState> for crate::routines::RoutineStore {
    fn from_ref(state: &AppState) -> Self {
        state.routines.clone()
    }
}

/// Create an empty [`CronStore`].
#[cfg(test)]
pub fn new_store() -> CronStore {
    Arc::new(Mutex::new(HashMap::new()))
}

/// Create an empty [`HandlerRegistry`].
pub fn new_registry() -> HandlerRegistry {
    Arc::new(HashSet::new())
}

/// Normalize `expr` to 5-field OS cron format for consistent storage.
///
/// Strips the seconds (field 0) and year (field 6) from any 7-field expression.
/// `@keyword` schedules and already-5-field expressions are returned unchanged.
pub(crate) fn normalize_schedule(expr: &str) -> String {
    let s = expr.trim();
    if s.starts_with('@') {
        return s.to_string();
    }
    let fields: Vec<&str> = s.split_ascii_whitespace().collect();
    match fields.len() {
        7 => fields[1..6].join(" "),
        _ => s.to_string(),
    }
}

/// Parse `expr` as a cron expression, returning `BadRequest` on failure.
///
/// Accepts standard 5-field (`min hour dom month dow`) and `@keyword` formats.
/// 7-field expressions are first normalized to 5-field via [`normalize_schedule`].
pub(crate) fn validate_cron(expr: &str) -> Result<(), AppError> {
    let normalized = normalize_schedule(expr.trim());
    normalized
        .parse::<Cron>()
        .map_err(|e| AppError::BadRequest(format!("invalid cron expression: {}", e)))?;
    Ok(())
}

/// Request body for creating a new cron job.
#[derive(Deserialize, JsonSchema, utoipa::ToSchema)]
pub struct CreateRequest {
    /// Cron expression for the new job.
    pub schedule: String,
    /// Handler identifier to invoke when the schedule fires.
    pub handler: String,
    /// Optional metadata (defaults to null).
    #[serde(default)]
    #[schemars(schema_with = "crate::utils::schema::metadata_schema")]
    pub metadata: serde_json::Value,
    /// Whether to create the job in an enabled state (defaults to `true`).
    #[serde(default = "bool_true")]
    pub enabled: bool,
}

/// Serde default for boolean fields that should default to `true`.
fn bool_true() -> bool {
    true
}

/// Request body for partially updating an existing cron job.
#[derive(Deserialize, JsonSchema, utoipa::ToSchema)]
pub struct UpdateRequest {
    /// New cron expression, or `None` to keep the existing value.
    pub schedule: Option<String>,
    /// New handler identifier, or `None` to keep the existing value.
    pub handler: Option<String>,
    /// New metadata, or `None` to keep the existing value.
    #[schemars(schema_with = "crate::utils::schema::metadata_schema")]
    pub metadata: Option<serde_json::Value>,
    /// New enabled state, or `None` to keep the existing value.
    pub enabled: Option<bool>,
}

// --- Service layer (no HTTP types) ---

/// Return all jobs sorted by creation time (oldest first).
pub fn svc_list(store: &CronStore, handlers: &HandlerRegistry) -> Vec<CronJobResponse> {
    let lock = store.lock().unwrap();
    let mut jobs: Vec<CronJob> = lock.values().cloned().collect();
    jobs.sort_by_key(|j| j.created_at);
    drop(lock);
    jobs.into_iter()
        .map(|j| CronJobResponse::from_job(j, handlers))
        .collect()
}

/// Look up a job by `id`, returning `NotFound` if it does not exist.
pub fn svc_get(
    store: &CronStore,
    handlers: &HandlerRegistry,
    id: &str,
) -> Result<CronJobResponse, AppError> {
    let job = store
        .lock()
        .unwrap()
        .get(id)
        .cloned()
        .ok_or(AppError::NotFound)?;
    Ok(CronJobResponse::from_job(job, handlers))
}

/// Validate `req`, assign a UUID, persist, and return the new job.
pub fn svc_create(
    store: &CronStore,
    handlers: &HandlerRegistry,
    req: CreateRequest,
) -> Result<CronJobResponse, AppError> {
    validate_cron(&req.schedule)?;
    let now = now_secs();
    let job = CronJob {
        id: Uuid::new_v4().to_string(),
        schedule: normalize_schedule(&req.schedule),
        handler: req.handler,
        metadata: req.metadata,
        enabled: req.enabled,
        source: "managed".to_string(),
        created_at: now,
        updated_at: now,
        last_triggered_at: None,
    };
    write_job(&job).map_err(|_| AppError::Internal)?;
    store.lock().unwrap().insert(job.id.clone(), job.clone());
    if let Err(e) = crate::sync::sync_to_crontab(store) {
        log::warn!("crontab sync after create failed: {e}");
    }
    Ok(CronJobResponse::from_job(job, handlers))
}

/// Apply non-`None` fields from `req` to the job identified by `id`.
pub fn svc_update(
    store: &CronStore,
    handlers: &HandlerRegistry,
    id: &str,
    req: UpdateRequest,
) -> Result<CronJobResponse, AppError> {
    if let Some(ref sched) = req.schedule {
        validate_cron(sched)?;
    }
    let mut lock = store.lock().unwrap();
    let job = lock.get_mut(id).ok_or(AppError::NotFound)?;
    if let Some(s) = req.schedule {
        job.schedule = normalize_schedule(&s);
    }
    if let Some(h) = req.handler {
        job.handler = h;
    }
    if let Some(m) = req.metadata {
        job.metadata = m;
    }
    if let Some(e) = req.enabled {
        job.enabled = e;
    }
    job.updated_at = now_secs();
    let job = job.clone();
    drop(lock);
    write_job(&job).map_err(|_| AppError::Internal)?;
    if let Err(e) = crate::sync::sync_to_crontab(store) {
        log::warn!("crontab sync after update failed: {e}");
    }
    Ok(CronJobResponse::from_job(job, handlers))
}

/// Remove the job with `id` from the store, returning the deleted job or `NotFound`.
pub fn svc_delete(
    store: &CronStore,
    handlers: &HandlerRegistry,
    id: &str,
) -> Result<CronJobResponse, AppError> {
    let job = store.lock().unwrap().remove(id).ok_or(AppError::NotFound)?;
    remove_job_dir(id).map_err(|_| AppError::Internal)?;
    if let Err(e) = crate::sync::sync_to_crontab(store) {
        log::warn!("crontab sync after delete failed: {e}");
    }
    Ok(CronJobResponse::from_job(job, handlers))
}

/// Record a manual trigger for `id`, updating `last_triggered_at` in-store and on disk,
/// then spawn the handler script from the handlers directory if it exists.
pub fn svc_trigger(store: &CronStore, id: &str) -> Result<CronJob, AppError> {
    let mut lock = store.lock().unwrap();
    let job = lock.get_mut(id).ok_or(AppError::NotFound)?;
    job.last_triggered_at = Some(now_secs());
    let job = job.clone();
    drop(lock);
    write_job(&job).map_err(|_| AppError::Internal)?;
    let handler_path = crate::paths::handlers_dir().join(&job.handler);
    if handler_path.exists() {
        if let Err(e) = std::process::Command::new(&handler_path).spawn() {
            log::warn!("trigger: failed to spawn handler {:?}: {e}", handler_path);
        }
    } else {
        log::warn!("trigger: handler script not found at {:?}", handler_path);
    }
    Ok(job)
}

// --- Axum HTTP handlers ---

/// `POST /cron-jobs` — create a new cron job.
#[utoipa::path(post, path = "/cron-jobs",
    request_body = CreateRequest,
    responses((status = 201, body = CronJobResponse), (status = 400, description = "Invalid cron expression")))]
pub async fn create(
    State(state): State<AppState>,
    Json(body): Json<CreateRequest>,
) -> Result<(StatusCode, Json<CronJobResponse>), AppError> {
    Ok((
        StatusCode::CREATED,
        Json(svc_create(&state.store, &state.handlers, body)?),
    ))
}

/// `GET /cron-jobs` — list all cron jobs sorted by creation time.
#[utoipa::path(get, path = "/cron-jobs",
    responses((status = 200, body = Vec<CronJobResponse>)))]
pub async fn list(State(state): State<AppState>) -> Json<Vec<CronJobResponse>> {
    Json(svc_list(&state.store, &state.handlers))
}

/// `GET /cron-jobs/{id}` — retrieve a single cron job by UUID.
#[utoipa::path(get, path = "/cron-jobs/{id}",
    params(("id" = String, Path, description = "Cron job UUID")),
    responses((status = 200, body = CronJobResponse), (status = 404, description = "Not found")))]
pub async fn get(
    State(state): State<AppState>,
    Path(id): Path<String>,
) -> Result<Json<CronJobResponse>, AppError> {
    Ok(Json(svc_get(&state.store, &state.handlers, &id)?))
}

/// `PATCH /cron-jobs/{id}` — partially update a cron job.
#[utoipa::path(patch, path = "/cron-jobs/{id}",
    params(("id" = String, Path, description = "Cron job UUID")),
    request_body = UpdateRequest,
    responses((status = 200, body = CronJobResponse), (status = 400, description = "Invalid"), (status = 404, description = "Not found")))]
pub async fn update(
    State(state): State<AppState>,
    Path(id): Path<String>,
    Json(body): Json<UpdateRequest>,
) -> Result<Json<CronJobResponse>, AppError> {
    Ok(Json(svc_update(&state.store, &state.handlers, &id, body)?))
}

/// `PUT /cron-jobs/{id}` — fully replace a cron job (behaves identically to PATCH).
#[utoipa::path(put, path = "/cron-jobs/{id}",
    params(("id" = String, Path, description = "Cron job UUID")),
    request_body = UpdateRequest,
    responses((status = 200, body = CronJobResponse), (status = 400, description = "Invalid"), (status = 404, description = "Not found")))]
pub async fn replace(
    state: State<AppState>,
    path: Path<String>,
    body: Json<UpdateRequest>,
) -> Result<Json<CronJobResponse>, AppError> {
    update(state, path, body).await
}

/// `DELETE /cron-jobs/{id}` — delete a cron job by UUID.
#[utoipa::path(delete, path = "/cron-jobs/{id}",
    params(("id" = String, Path, description = "Cron job UUID")),
    responses((status = 200, body = CronJobResponse), (status = 404, description = "Not found")))]
pub async fn delete(
    State(state): State<AppState>,
    Path(id): Path<String>,
) -> Result<Json<CronJobResponse>, AppError> {
    Ok(Json(svc_delete(&state.store, &state.handlers, &id)?))
}

/// `POST /cron-jobs/{id}/trigger` — manually trigger a cron job outside its schedule.
#[utoipa::path(post, path = "/cron-jobs/{id}/trigger",
    params(("id" = String, Path, description = "Cron job UUID")),
    responses((status = 200, body = CronJob), (status = 404, description = "Not found")))]
pub async fn trigger(
    State(store): State<CronStore>,
    Path(id): Path<String>,
) -> Result<Json<CronJob>, AppError> {
    Ok(Json(svc_trigger(&store, &id)?))
}

/// Return the log file path for job `id`, or `NotFound` if no such job exists.
pub fn svc_logs_path(store: &CronStore, id: &str) -> Result<std::path::PathBuf, AppError> {
    if !store.lock().unwrap().contains_key(id) {
        return Err(AppError::NotFound);
    }
    Ok(crate::paths::job_log_path(id))
}

/// `GET /cron-jobs/{id}/logs` — return the contents of the job's log file as plain text.
#[utoipa::path(get, path = "/cron-jobs/{id}/logs",
    params(("id" = String, Path, description = "Cron job UUID")),
    responses((status = 200, description = "Log file contents as plain text"), (status = 404, description = "Not found")))]
pub async fn get_logs(
    State(store): State<CronStore>,
    Path(id): Path<String>,
) -> Result<String, AppError> {
    let log_path = svc_logs_path(&store, &id)?;
    if !log_path.exists() {
        return Ok(String::new());
    }
    tokio::fs::read_to_string(&log_path)
        .await
        .map_err(|_| AppError::Internal)
}

#[cfg(test)]
#[path = "cron_jobs_tests.rs"]
mod cron_jobs_tests;