use std::convert::Infallible;
use std::time::Duration;
use axum::{
extract::{Path, Query, State},
http::{HeaderMap, StatusCode},
response::{
sse::{Event, KeepAlive, Sse},
IntoResponse, Json, Response,
},
Extension,
};
use chrono::Utc;
use serde::{Deserialize, Serialize};
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt as _;
use ulid::Ulid;
use gradatum_core::{JobFilter, JobRecord, JobStatus, QueueEvent};
use crate::state::AppState;
use gradatum_core::trust::TrustContext;
use gradatum_db_sqlite::{idempotency_insert, idempotency_lookup};
/// JSON body returned by `list_jobs`: one page of job records.
#[derive(Debug, Serialize)]
pub struct JobListResponse {
/// Job records of the current page.
pub items: Vec<JobRecord>,
/// Id of the last record in `items` when more results exist beyond this
/// page; `None` on the final page.
pub next_cursor: Option<String>,
}
/// Query-string parameters accepted by `list_jobs`. Every field is optional.
#[derive(Debug, Deserialize)]
pub struct JobListQuery {
/// Status name to filter on (matched case-insensitively by `parse_status`);
/// an unrecognized name makes `list_jobs` answer `400 Bad Request`.
pub status: Option<String>,
/// Job kind filter, forwarded unchanged to `JobFilter::kind`.
pub kind: Option<String>,
/// Lower bound on creation time; must parse as `chrono::DateTime<Utc>`.
pub since: Option<String>,
/// Page size; `list_jobs` defaults it to 50 and clamps it to `1..=200`.
pub limit: Option<usize>,
/// Pagination cursor; must parse as a ULID.
pub cursor: Option<String>,
}
/// JSON body accepted by `create_job`.
#[derive(Debug, Deserialize)]
pub struct CreateJobRequest {
/// Free-form job specification. In the code visible here it is only
/// serialized into the fallback `triggered_by` label of the created record.
pub spec: serde_json::Value,
/// Free-form scheduling hints.
// Deserialized so clients may send it, but not read by the code visible
// here — hence the `dead_code` allowance.
#[allow(dead_code)]
pub scheduling: Option<serde_json::Value>,
/// Free-form lineage object; only its string field `triggered_by` is read.
pub lineage: Option<serde_json::Value>,
}
/// JSON body returned by `create_job`.
#[derive(Debug, Serialize)]
pub struct CreateJobResponse {
/// Id of the job, as a string.
pub id: String,
/// `true` when the id was found under the request's `Idempotency-Key`
/// (no new job enqueued); `false` when a new job was enqueued.
pub idempotent: bool,
}
/// JSON body returned by `cancel_job`.
#[derive(Debug, Serialize)]
pub struct CancelJobResponse {
/// Id of the job the request targeted.
pub id: String,
/// Lower-case status name: `"cancelled"` after a successful cancellation,
/// or the job's current status when it was already done, in DLQ or cancelled.
pub status: String,
}
/// Maps a status name to its [`JobStatus`], ignoring letter case.
///
/// Both spellings `"cancelled"` and `"canceled"` are accepted. Any name that
/// is not recognized yields `None`.
fn parse_status(s: &str) -> Option<JobStatus> {
    let normalized = s.to_lowercase();
    let status = match normalized.as_str() {
        "pending" => JobStatus::Pending,
        "running" => JobStatus::Running,
        "waiting" => JobStatus::Waiting,
        "done" => JobStatus::Done,
        "failed" => JobStatus::Failed,
        "dlq" => JobStatus::DLQ,
        "cancelled" | "canceled" => JobStatus::Cancelled,
        _ => return None,
    };
    Some(status)
}
/// Lists jobs matching the query filters, one page at a time.
///
/// The page size defaults to 50 and is clamped to `1..=200`. One extra record
/// is requested from the store so that the presence of a further page can be
/// detected; when it exists, `next_cursor` carries the id of the last record
/// actually returned.
///
/// # Errors
/// - `400 Bad Request` when `status`, `cursor` or `since` cannot be parsed
///   (checked in that order).
/// - `500 Internal Server Error` when the store query fails.
pub async fn list_jobs(
    State(state): State<AppState>,
    Extension(_trust): Extension<TrustContext>,
    Query(query): Query<JobListQuery>,
) -> Result<Json<JobListResponse>, StatusCode> {
    let page_size = query.limit.unwrap_or(50).clamp(1, 200);

    // Each optional parameter is validated only when present.
    let status = query
        .status
        .as_deref()
        .map(|raw| parse_status(raw).ok_or(StatusCode::BAD_REQUEST))
        .transpose()?;
    let cursor = query
        .cursor
        .as_deref()
        .map(|raw| raw.parse::<Ulid>().map_err(|_| StatusCode::BAD_REQUEST))
        .transpose()?;
    let created_after = query
        .since
        .as_deref()
        .map(|raw| {
            raw.parse::<chrono::DateTime<Utc>>()
                .map_err(|_| StatusCode::BAD_REQUEST)
        })
        .transpose()?;

    let filter = JobFilter {
        status,
        kind: query.kind.clone(),
        created_after,
        cursor,
        // Ask for one more than the page size to learn whether another page follows.
        limit: page_size + 1,
        ..Default::default()
    };

    let mut items = state.job_store.list(filter).await.map_err(|e| {
        tracing::error!(error = %e, "list_jobs: QueueStore.list() échoué");
        StatusCode::INTERNAL_SERVER_ERROR
    })?;

    let next_cursor = if items.len() > page_size {
        items.truncate(page_size);
        items.last().map(|record| record.id.to_string())
    } else {
        None
    };

    Ok(Json(JobListResponse { items, next_cursor }))
}
pub async fn get_job_v2(
State(state): State<AppState>,
Extension(_trust): Extension<TrustContext>,
Path(id_str): Path<String>,
) -> Result<Json<JobRecord>, StatusCode> {
let id = match id_str.parse::<Ulid>() {
Ok(u) => u,
Err(_) => return Err(StatusCode::BAD_REQUEST),
};
match state.job_store.get(id).await {
Ok(Some(record)) => Ok(Json(record)),
Ok(None) => Err(StatusCode::NOT_FOUND),
Err(e) => {
tracing::error!(error = %e, job_id = %id, "get_job_v2: QueueStore.get() échoué");
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
/// Enqueues a new job, de-duplicated by the mandatory `Idempotency-Key` header.
///
/// When the key is already known, the previously stored job id is returned
/// with `200 OK` and `idempotent: true`. Otherwise a job is enqueued, the key
/// is recorded, and the new id is returned with `202 Accepted`.
///
/// # Errors
/// - `400 Bad Request` when the header is missing, not valid visible ASCII,
///   empty, or longer than 256 bytes.
/// - `501 Not Implemented` when no jobs pool is configured on the state.
/// - `500 Internal Server Error` when the idempotency lookup or the enqueue fails.
pub async fn create_job(
    State(state): State<AppState>,
    Extension(_trust): Extension<TrustContext>,
    headers: HeaderMap,
    Json(body): Json<CreateJobRequest>,
) -> Result<Response, StatusCode> {
    let idempotency_key = headers
        .get("Idempotency-Key")
        .and_then(|value| value.to_str().ok())
        .filter(|key| !key.is_empty() && key.len() <= 256)
        .map(str::to_owned)
        .ok_or(StatusCode::BAD_REQUEST)?;

    let pool = match state.jobs_pool.as_ref() {
        None => {
            tracing::warn!("create_job: jobs_pool non câblé — Idempotency-Key non supporté");
            return Err(StatusCode::NOT_IMPLEMENTED);
        }
        Some(p) => p.clone(),
    };

    // Replay: a known key short-circuits to the job created earlier.
    let known = idempotency_lookup(&pool, &idempotency_key)
        .await
        .map_err(|e| {
            tracing::error!(error = %e, "create_job: idempotency_lookup échoué");
            StatusCode::INTERNAL_SERVER_ERROR
        })?;
    if let Some(existing_id) = known {
        let replay = CreateJobResponse {
            id: existing_id,
            idempotent: true,
        };
        return Ok((StatusCode::OK, Json(replay)).into_response());
    }

    let job_record = build_job_record_from_spec(body);
    let job_id = state.job_store.enqueue(job_record).await.map_err(|e| {
        tracing::error!(error = %e, "create_job: QueueStore.enqueue() échoué");
        StatusCode::INTERNAL_SERVER_ERROR
    })?;
    let job_id_str = job_id.to_string();

    // Best effort: the job already exists, so a failure to store the key is
    // only logged and does not fail the request.
    if let Err(e) = idempotency_insert(&pool, &idempotency_key, &job_id_str).await {
        tracing::warn!(
            error = %e,
            job_id = %job_id_str,
            "create_job: idempotency_insert échoué — job créé mais clé non stockée"
        );
    }

    let created = CreateJobResponse {
        id: job_id_str,
        idempotent: false,
    };
    Ok((StatusCode::ACCEPTED, Json(created)).into_response())
}
fn build_job_record_from_spec(body: CreateJobRequest) -> JobRecord {
use gradatum_core::{
CurateSpec, Job, JobClass, JobLifecycle, JobLineage, JobMode, JobPriority, JobRecord,
JobRetry, JobScheduling, JobScope, JobSpec, RetryBackoff, TriggerSource,
};
let now = Utc::now();
let triggered_by = body
.lineage
.as_ref()
.and_then(|l| l.get("triggered_by"))
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.or_else(|| {
serde_json::to_string(&body.spec)
.ok()
.map(|s| format!("api-spec:{}", &s[..s.len().min(200)]))
});
let stub_spec = CurateSpec {
note_id: Ulid::new(),
tenant_id: "main".to_string(),
..Default::default()
};
JobRecord {
id: Ulid::new(),
spec: JobSpec {
kind: Job::Curate(stub_spec),
class: JobClass::Api,
mode: JobMode::Batch,
scope: JobScope::VaultWide,
priority: JobPriority::default_for(&JobClass::Api),
},
scheduling: JobScheduling {
trigger: TriggerSource::Demand,
scheduled_at: now,
await_jobs: vec![],
deadline: None,
cron_expr: None,
},
lifecycle: JobLifecycle {
status: JobStatus::Pending,
created_at: now,
started_at: None,
completed_at: None,
lease_until: None,
result: None,
},
retry: JobRetry {
count: 0,
max: 3,
backoff: RetryBackoff::Exponential { base: 5, max: 120 },
last_error: None,
errors: vec![],
},
lineage: JobLineage {
triggered_by,
parent_job: None,
pipeline_id: None,
pipeline_step: None,
children: vec![],
cost_usd: None,
},
}
}
/// Cancels a job that has not started running.
///
/// Jobs that are already done, in the DLQ or cancelled are left untouched and
/// their current status is reported. Pending, waiting and failed jobs are
/// cancelled through the store and reported as `"cancelled"`.
///
/// # Errors
/// - `400 Bad Request` when the path segment is not a valid ULID.
/// - `404 Not Found` when the store has no job with that id.
/// - `409 Conflict` when the job is currently running.
/// - `500 Internal Server Error` when the store lookup or the cancel fails.
pub async fn cancel_job(
    State(state): State<AppState>,
    Extension(_trust): Extension<TrustContext>,
    Path(id_str): Path<String>,
) -> Result<Json<CancelJobResponse>, StatusCode> {
    let id = id_str
        .parse::<Ulid>()
        .map_err(|_| StatusCode::BAD_REQUEST)?;

    let record = state
        .job_store
        .get(id)
        .await
        .map_err(|e| {
            tracing::error!(error = %e, job_id = %id, "cancel_job: get() échoué");
            StatusCode::INTERNAL_SERVER_ERROR
        })?
        .ok_or(StatusCode::NOT_FOUND)?;

    // Exhaustive on purpose: a new status variant must be classified here.
    let already_settled = match record.lifecycle.status {
        JobStatus::Running => return Err(StatusCode::CONFLICT),
        JobStatus::Done | JobStatus::DLQ | JobStatus::Cancelled => true,
        JobStatus::Pending | JobStatus::Waiting | JobStatus::Failed => false,
    };

    let status = if already_settled {
        format!("{:?}", record.lifecycle.status).to_lowercase()
    } else {
        state.job_store.cancel(id).await.map_err(|e| {
            tracing::error!(error = %e, job_id = %id, "cancel_job: QueueStore.cancel() échoué");
            StatusCode::INTERNAL_SERVER_ERROR
        })?;
        "cancelled".to_string()
    };

    Ok(Json(CancelJobResponse {
        id: id.to_string(),
        status,
    }))
}
pub async fn job_events(
State(state): State<AppState>,
Extension(_trust): Extension<TrustContext>,
Path(id_str): Path<String>,
headers: HeaderMap,
) -> Result<impl IntoResponse, StatusCode> {
let id = match id_str.parse::<Ulid>() {
Ok(u) => u,
Err(_) => return Err(StatusCode::BAD_REQUEST),
};
match state.job_store.get(id).await {
Ok(None) => return Err(StatusCode::NOT_FOUND),
Ok(Some(_)) => {}
Err(e) => {
tracing::error!(error = %e, job_id = %id, "job_events: get() échoué");
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}
}
let _last_event_id = headers
.get("Last-Event-ID")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<u64>().ok());
let rx = state.job_store.subscribe();
let target_id = id;
let event_counter: u64 = 0;
let stream = BroadcastStream::new(rx).filter_map(move |result| {
match result {
Err(_) => {
None
}
Ok(event) => {
let matches = matches!(
&event,
QueueEvent::JobInserted(eid) |
QueueEvent::JobFailed(eid, _) |
QueueEvent::JobReady(eid) |
QueueEvent::JobCancelled(eid)
if *eid == target_id
) || matches!(
&event,
QueueEvent::JobCompleted(eid, _, _) if *eid == target_id
);
if !matches {
return None;
}
let event_data = match &event {
QueueEvent::JobCompleted(_, status, _) => {
let status_str = format!("{:?}", status).to_lowercase();
serde_json::json!({
"type": "status",
"status": status_str,
"timestamp": Utc::now().timestamp_millis()
})
}
QueueEvent::JobFailed(_, attempt) => {
serde_json::json!({
"type": "status",
"status": "failed",
"attempts": attempt,
"timestamp": Utc::now().timestamp_millis()
})
}
QueueEvent::JobCancelled(_) => {
serde_json::json!({
"type": "status",
"status": "cancelled",
"timestamp": Utc::now().timestamp_millis()
})
}
QueueEvent::JobReady(_) => {
serde_json::json!({
"type": "status",
"status": "pending",
"timestamp": Utc::now().timestamp_millis()
})
}
_ => {
serde_json::json!({
"type": "status",
"status": "inserted",
"timestamp": Utc::now().timestamp_millis()
})
}
};
let is_terminal = matches!(
&event,
QueueEvent::JobCompleted(eid, _, _) | QueueEvent::JobCancelled(eid)
if *eid == target_id
);
let data_str = serde_json::to_string(&event_data)
.unwrap_or_else(|_| r#"{"type":"error"}"#.to_string());
if is_terminal {
Some(Ok::<Event, Infallible>(Event::default().data(data_str)))
} else {
Some(Ok::<Event, Infallible>(Event::default().data(data_str)))
}
}
}
});
let _ = event_counter;
let sse = Sse::new(stream).keep_alive(
KeepAlive::new()
.interval(Duration::from_secs(30))
.text("heartbeat"),
);
Ok(sse)
}