use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, Instant};
use apalis::prelude::{
BackendBuilder, BoxDynError, CustomBackend, Parts, RandomId, Task, TaskBuilder, TaskId,
};
use futures::{sink, stream, StreamExt};
use tracing::{debug, error, warn};
use ulid::Ulid;
use gradatum_core::{GradatumJob, JobOutput, JobRecord, JobResult, QueueStore};
/// Concrete apalis `CustomBackend` type produced by `build_gradatum_backend`.
///
/// Type parameters, in order:
/// - the job payload type (`GradatumJob`);
/// - the shared `QueueStore` handle registered through `.database(...)`;
/// - the fetcher stream, boxed, yielding `Result<Option<Task<..>>, BoxDynError>`;
/// - the sink type, a `futures::sink::Drain` over the same task type;
/// - `RandomId` as the task id type.
pub type GradatumBackend = CustomBackend<
GradatumJob,
Arc<dyn QueueStore + Send + Sync>,
futures::stream::BoxStream<
'static,
Result<Option<Task<GradatumJob, (), RandomId>>, BoxDynError>,
>,
futures::sink::Drain<Task<GradatumJob, (), RandomId>>,
RandomId,
>;
/// Acknowledger that reports task outcomes to the shared `QueueStore`.
///
/// Its `Acknowledge::ack` implementation calls `complete` or `fail` on the store.
/// The only field is an `Arc`, so the derived `Clone` shares the same store.
#[derive(Clone)]
pub struct GradatumAcknowledger {
// Shared handle to the queue store that `ack` writes outcomes to.
store: Arc<dyn QueueStore + Send + Sync>,
}
impl GradatumAcknowledger {
    /// Creates an acknowledger that reports outcomes to `store`.
    ///
    /// The handle is kept as given; nothing is called on the store here.
    #[must_use]
    pub fn new(store: Arc<dyn QueueStore + Send + Sync>) -> Self {
        Self { store }
    }
}
impl fmt::Debug for GradatumAcknowledger {
    /// Writes the struct name with a fixed placeholder string for `store`.
    ///
    /// The store itself is never formatted, so this impl does not require
    /// `QueueStore` implementations to be `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("GradatumAcknowledger");
        repr.field("store", &"Arc<dyn QueueStore>");
        repr.finish()
    }
}
/// Reports the outcome of a handled task to the queue store.
///
/// The job is identified by the `Ulid` found in `parts.data`. A handler `Ok`
/// leads to `store.complete`, a handler `Err` to `store.fail`.
impl apalis::prelude::Acknowledge<JobOutput, (), RandomId> for GradatumAcknowledger {
    /// Acknowledgement never fails from the caller's point of view: store
    /// errors are logged inside the future instead of being returned.
    type Error = std::convert::Infallible;
    type Future = Pin<Box<dyn Future<Output = Result<(), std::convert::Infallible>> + Send>>;

    /// Builds the future that persists the outcome `res` of the task described by `parts`.
    ///
    /// Everything needed from `res` and `parts` is copied out synchronously so the
    /// returned future owns its data. The future always resolves to `Ok(())`:
    /// - if no `Ulid` is present in `parts.data`, an error is logged and nothing is written;
    /// - if `store.complete` / `store.fail` returns an error, it is logged and swallowed.
    fn ack(
        &mut self,
        res: &Result<JobOutput, BoxDynError>,
        parts: &Parts<(), RandomId>,
    ) -> Self::Future {
        let maybe_job_id = parts.data.get::<Ulid>().copied();

        // Elapsed time since the `Instant` stored in `parts.data`; 0 when absent.
        // `as_millis()` is a `u128`: saturate at `u32::MAX` rather than letting an
        // `as` cast wrap around silently.
        let duration_ms = parts
            .data
            .get::<Instant>()
            .map(|start| u32::try_from(start.elapsed().as_millis()).unwrap_or(u32::MAX))
            .unwrap_or(0);

        // Same reasoning for the attempt counter: saturate instead of truncating.
        let attempt = u32::try_from(parts.attempt.current()).unwrap_or(u32::MAX);

        let store = Arc::clone(&self.store);

        // Take an owned snapshot of the outcome: `res` is only borrowed for this call.
        let result = match res {
            Ok(output) => AckResult::Success(output.result_note_md.clone()),
            Err(e) => AckResult::Failure(format!("{e:#}"), attempt),
        };

        Box::pin(async move {
            // Without the job id there is no record to update in the store.
            let Some(job_id) = maybe_job_id else {
                error!("ack: Ulid absent des parts.data — job non retrouvable, status REST Running");
                return Ok(());
            };

            match result {
                AckResult::Success(desc) => {
                    let job_result = JobResult {
                        success: true,
                        duration_ms,
                        cost_usd: None,
                        result_note: None,
                        conflict_payload: None,
                    };
                    if let Err(e) = store.complete(job_id, job_result).await {
                        error!(
                            job_id = %job_id,
                            error = %e,
                            desc = %desc,
                            "ack: store.complete échoué — status restera Running"
                        );
                    } else {
                        tracing::info!(
                            job_id = %job_id,
                            "ack: job marqué Done"
                        );
                    }
                }
                AckResult::Failure(err_msg, att) => {
                    if let Err(e) = store.fail(job_id, &err_msg, att).await {
                        error!(
                            job_id = %job_id,
                            error = %e,
                            handler_error = %err_msg,
                            "ack: store.fail échoué — status restera Running"
                        );
                    } else {
                        warn!(
                            job_id = %job_id,
                            attempt = att,
                            handler_error = %err_msg,
                            "ack: job marqué Failed"
                        );
                    }
                }
            }
            Ok(())
        })
    }
}
// Owned snapshot of a handler outcome, built in `ack` before the async block
// so the returned future does not borrow the handler result.
enum AckResult {
// Handler returned `Ok`; carries a clone of `JobOutput::result_note_md`,
// which is only logged when `store.complete` fails.
Success(String),
// Handler returned `Err`; carries the formatted error message and the attempt number.
Failure(String, u32),
}
pub fn build_gradatum_backend(
store: Arc<dyn QueueStore + Send + Sync>,
kind: &'static str,
) -> Result<(GradatumBackend, GradatumAcknowledger), anyhow::Error> {
let store_for_fetcher = Arc::clone(&store);
let acknowledger = GradatumAcknowledger::new(Arc::clone(&store));
let backend = BackendBuilder::<GradatumJob, _, _, _, RandomId>::new()
.database(store_for_fetcher)
.fetcher(move |db, _, _| {
let db = Arc::clone(db);
stream::unfold(db, move |store| async move {
tokio::time::sleep(Duration::from_millis(200)).await;
match store.dequeue_by_kind(kind).await {
Ok(Some(record)) => {
let task = record_to_task(record);
Some((Ok(Some(task)), store))
}
Ok(None) => {
debug!("gradatum_backend: file vide (Idle), re-poll dans 200ms");
Some((Ok(None), store))
}
Err(e) => {
Some((Err(BoxDynError::from(format!("dequeue error: {e}"))), store))
}
}
})
.boxed()
})
.sink(|_, _| {
sink::drain()
})
.build()
.map_err(|e| anyhow::anyhow!("build_gradatum_backend failed: {e:?}"))?;
Ok((backend, acknowledger))
}
/// Wraps a dequeued `JobRecord` into an apalis `Task`.
///
/// The task arguments are a `GradatumJob` holding the record and its priority as a `u8`.
/// The task gets a fresh `RandomId`, and two values are attached to its data:
/// the record's `Ulid` and an `Instant` taken in this function. `ack` reads both back
/// to identify the job and compute `duration_ms`.
fn record_to_task(record: JobRecord) -> Task<GradatumJob, (), RandomId> {
    // Copy the id out before `record` is moved into the job.
    let ulid = record.id;
    let job = GradatumJob {
        priority: record.spec.priority.as_u8(),
        record,
    };
    let dequeued_at = Instant::now();

    TaskBuilder::new(job)
        .with_task_id(TaskId::new(RandomId::default()))
        .data(ulid)
        .data(dequeued_at)
        .build()
}
// Unit tests for `record_to_task` and for the `Clone` impl of `GradatumAcknowledger`.
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
use gradatum_core::{
CurateSpec, Job, JobClass, JobLifecycle, JobLineage, JobMode, JobPriority, JobRecord,
JobRetry, JobScheduling, JobScope, JobSpec, JobStatus, TriggerSource,
};
use ulid::Ulid;
// Builds a `Pending`, high-priority `Curate` job record with fresh ULIDs,
// timestamps set to the current time and every optional field left empty.
fn make_record() -> JobRecord {
let now = Utc::now();
let class = JobClass::Agent;
JobRecord {
id: Ulid::new(),
spec: JobSpec {
kind: Job::Curate(CurateSpec {
note_id: Ulid::new(),
tenant_id: "main".to_string(),
..Default::default()
}),
class,
mode: JobMode::Batch,
scope: JobScope::VaultWide,
priority: JobPriority::High,
},
scheduling: JobScheduling {
trigger: TriggerSource::Demand,
scheduled_at: now,
await_jobs: vec![],
deadline: None,
cron_expr: None,
},
lifecycle: JobLifecycle {
status: JobStatus::Pending,
created_at: now,
started_at: None,
completed_at: None,
lease_until: None,
result: None,
},
retry: JobRetry::default(),
lineage: JobLineage {
triggered_by: None,
parent_job: None,
pipeline_id: None,
pipeline_step: None,
children: vec![],
cost_usd: None,
},
}
}
// The task arguments must carry the original record id and the priority as a `u8`.
#[test]
fn record_to_task_preserves_fields() {
let record = make_record();
let id = record.id;
let task = record_to_task(record);
assert_eq!(task.args.record.id, id);
assert_eq!(task.args.priority, JobPriority::High.as_u8());
}
// A task id must be set on the built task.
#[test]
fn record_to_task_injects_task_id() {
let record = make_record();
let task = record_to_task(record);
assert!(
task.parts.task_id.is_some(),
"task_id doit être injecté pour TracingLayer"
);
}
// The record's `Ulid` must be retrievable from `parts.data`, where `ack` looks it up.
#[test]
fn record_to_task_injects_ulid_in_data() {
let record = make_record();
let expected_id = record.id;
let task = record_to_task(record);
let retrieved = task.parts.data.get::<Ulid>().copied();
assert_eq!(
retrieved,
Some(expected_id),
"Ulid du JobRecord doit être dans parts.data"
);
}
// `GradatumAcknowledger` must be constructible from any `QueueStore` and cloneable.
#[test]
fn acknowledger_is_clone() {
use std::sync::Arc;
// Minimal store: every method is `unimplemented!()` because this test only
// constructs and clones the acknowledger and never calls the store.
struct MockStore;
#[async_trait::async_trait]
impl QueueStore for MockStore {
async fn enqueue(&self, _: JobRecord) -> Result<Ulid, gradatum_core::QueueError> {
unimplemented!()
}
async fn dequeue(&self) -> Result<Option<JobRecord>, gradatum_core::QueueError> {
unimplemented!()
}
async fn get(&self, _: Ulid) -> Result<Option<JobRecord>, gradatum_core::QueueError> {
unimplemented!()
}
async fn complete(
&self,
_: Ulid,
_: JobResult,
) -> Result<(), gradatum_core::QueueError> {
unimplemented!()
}
async fn fail(
&self,
_: Ulid,
_: &str,
_: u32,
) -> Result<(), gradatum_core::QueueError> {
unimplemented!()
}
async fn cancel(&self, _: Ulid) -> Result<(), gradatum_core::QueueError> {
unimplemented!()
}
async fn fail_dlq(&self, _: Ulid, _: &str) -> Result<(), gradatum_core::QueueError> {
unimplemented!()
}
async fn find_awaiting(
&self,
_: Ulid,
) -> Result<Vec<JobRecord>, gradatum_core::QueueError> {
unimplemented!()
}
async fn set_pending(&self, _: Ulid) -> Result<(), gradatum_core::QueueError> {
unimplemented!()
}
async fn recover_stale_leases(
&self,
_: std::time::Duration,
) -> Result<Vec<Ulid>, gradatum_core::QueueError> {
unimplemented!()
}
async fn cancel_expired_deadlines(
&self,
_: chrono::DateTime<chrono::Utc>,
) -> Result<Vec<Ulid>, gradatum_core::QueueError> {
unimplemented!()
}
async fn promote_retries(
&self,
_: chrono::DateTime<chrono::Utc>,
) -> Result<Vec<Ulid>, gradatum_core::QueueError> {
unimplemented!()
}
async fn schedule_retry(
&self,
_: Ulid,
_: chrono::DateTime<chrono::Utc>,
) -> Result<(), gradatum_core::QueueError> {
unimplemented!()
}
async fn list(
&self,
_: gradatum_core::JobFilter,
) -> Result<Vec<JobRecord>, gradatum_core::QueueError> {
unimplemented!()
}
fn subscribe(&self) -> tokio::sync::broadcast::Receiver<gradatum_core::QueueEvent> {
unimplemented!()
}
}
// Compiling and running these two lines is the whole assertion.
let ack = GradatumAcknowledger::new(Arc::new(MockStore));
let _ack2 = ack.clone(); }
// The `Instant` stored in `parts.data` must have been taken during the
// `record_to_task` call, i.e. between the `before` and `after` samples.
#[test]
fn record_to_task_injects_instant_for_duration_measurement() {
let record = make_record();
let before = Instant::now();
let task = record_to_task(record);
let after = Instant::now();
let injected = task.parts.data.get::<Instant>().copied();
assert!(
injected.is_some(),
"Instant doit être injecté dans parts.data pour mesure duration_ms (D-21)"
);
let start = injected.unwrap();
assert!(
start >= before,
"Instant injecté doit être >= before (créé après before)"
);
assert!(
start <= after,
"Instant injecté doit être <= after (créé avant after)"
);
}
}