use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use apalis::layers::retry::RetryPolicy;
use apalis::layers::WorkerBuilderExt;
use apalis::prelude::AcknowledgementExt;
use apalis::prelude::{Event, EventListenerExt, Monitor, WorkerBuilder};
use apalis_cron::CronStream;
use cron::Schedule;
use serde::{Deserialize, Serialize};
use tracing::info;
use gradatum_core::QueueStore;
use gradatum_curator::CuratorProcess;
use gradatum_embed::Embedder;
use gradatum_index::SqliteIndex;
use gradatum_vault::Vault;
use sqlx::SqlitePool;
use super::apalis_backend::build_gradatum_backend;
use super::apalis_handlers::{handle_curate, handle_embed, handle_reindex};
use super::metrics::WorkerMetrics;
use super::schedules::{handle_cleanup_dlq, ScheduleConfig};
/// Settings for the workers of one queue: parallelism, timeout and retry budget.
///
/// Keys missing from the config section take the values of [`WorkerConfig::default`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct WorkerConfig {
    /// Number of jobs processed concurrently (passed to `WorkerBuilder::concurrency`).
    pub concurrency: usize,
    /// Job timeout, in seconds.
    pub timeout_secs: u64,
    /// Retry count handed to `RetryPolicy::retries`.
    pub max_retries: usize,
}
impl WorkerConfig {
    const DEFAULT_CONCURRENCY: usize = 2;
    const DEFAULT_TIMEOUT_SECS: u64 = 30;
    const DEFAULT_MAX_RETRIES: usize = 3;

    /// Fallback `concurrency` used when the key is absent and by `Default`.
    fn default_concurrency() -> usize {
        Self::DEFAULT_CONCURRENCY
    }

    /// Fallback `timeout_secs` used when the key is absent and by `Default`.
    fn default_timeout_secs() -> u64 {
        Self::DEFAULT_TIMEOUT_SECS
    }

    /// Fallback `max_retries` used when the key is absent and by `Default`.
    fn default_max_retries() -> usize {
        Self::DEFAULT_MAX_RETRIES
    }
}
impl Default for WorkerConfig {
    /// Builds a config from the same per-key fallbacks serde uses for missing keys.
    fn default() -> Self {
        let concurrency = Self::default_concurrency();
        let timeout_secs = Self::default_timeout_secs();
        let max_retries = Self::default_max_retries();
        Self {
            concurrency,
            timeout_secs,
            max_retries,
        }
    }
}
/// Top-level apalis configuration: per-queue worker settings and cron schedules.
///
/// Any missing section falls back to [`ApalisConfig::default`] (default workers, no schedules).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct ApalisConfig {
    /// Worker settings for the `Curate`, `Embed` and `ReIndex` queues.
    pub workers: WorkersConfig,
    /// Cron schedules to register on the monitor.
    pub schedules: Vec<ScheduleConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkersConfig {
#[serde(default = "WorkersConfig::default_curate")]
pub curate: WorkerConfig,
#[serde(default = "WorkersConfig::default_embed")]
pub embed: WorkerConfig,
#[serde(default = "WorkersConfig::default_reindex")]
pub reindex: WorkerConfig,
}
impl WorkersConfig {
    /// Packs one queue's settings into a [`WorkerConfig`].
    const fn worker(concurrency: usize, timeout_secs: u64, max_retries: usize) -> WorkerConfig {
        WorkerConfig {
            concurrency,
            timeout_secs,
            max_retries,
        }
    }

    /// `Curate` queue defaults: 2 concurrent jobs, 30 s timeout, 3 retries.
    fn default_curate() -> WorkerConfig {
        Self::worker(2, 30, 3)
    }

    /// `Embed` queue defaults: 4 concurrent jobs, 60 s timeout, 3 retries.
    fn default_embed() -> WorkerConfig {
        Self::worker(4, 60, 3)
    }

    /// `ReIndex` queue defaults: 4 concurrent jobs, 120 s timeout, 2 retries.
    fn default_reindex() -> WorkerConfig {
        Self::worker(4, 120, 2)
    }
}
impl Default for WorkersConfig {
    /// Every queue takes its own per-queue defaults.
    fn default() -> Self {
        let curate = Self::default_curate();
        let embed = Self::default_embed();
        let reindex = Self::default_reindex();
        Self {
            curate,
            embed,
            reindex,
        }
    }
}
/// Shared services that `build_monitor` injects into job handlers as worker data.
pub struct MonitorDeps {
    /// Vault, given to curate and embed workers.
    pub vault: Arc<Vault>,
    /// SQLite index, given to curate, embed and reindex workers.
    pub index: Arc<SqliteIndex>,
    /// Curation process, given to curate workers.
    pub curator: Arc<dyn CuratorProcess + Send + Sync>,
    /// Embedder, given to embed and reindex workers.
    pub embedder: Arc<dyn Embedder + Send + Sync>,
}
pub fn build_monitor(
store: Arc<dyn QueueStore + Send + Sync>,
pool: Arc<SqlitePool>,
deps: MonitorDeps,
config: &ApalisConfig,
metrics: WorkerMetrics,
shutdown_timeout_secs: u64,
) -> anyhow::Result<Monitor> {
let MonitorDeps {
vault,
curator,
embedder,
index,
} = deps;
let curate_cfg = config.workers.curate.clone();
let embed_cfg = config.workers.embed.clone();
let reindex_cfg = config.workers.reindex.clone();
let (curate_backend, curate_ack) = build_gradatum_backend(Arc::clone(&store), "Curate")?;
let (embed_backend, embed_ack) = build_gradatum_backend(Arc::clone(&store), "Embed")?;
let (reindex_backend, reindex_ack) = build_gradatum_backend(Arc::clone(&store), "ReIndex")?;
let m_curate = metrics.clone();
let m_embed = metrics.clone();
let m_reindex = metrics.clone();
let mut monitor = Monitor::new()
.register({
let cfg = curate_cfg.clone();
let backend = curate_backend;
let ack = curate_ack;
let m = m_curate;
let vault_c = Arc::clone(&vault);
let curator_c = Arc::clone(&curator);
let index_c = Arc::clone(&index);
let queue_c = Arc::clone(&store);
move |idx| {
let worker_name = format!("curate-{idx}");
WorkerBuilder::new(&worker_name)
.backend(backend.clone())
.data(Arc::clone(&vault_c))
.data(Arc::clone(&curator_c) as Arc<dyn CuratorProcess + Send + Sync>)
.data(Arc::clone(&index_c))
.data(Arc::clone(&queue_c) as Arc<dyn QueueStore + Send + Sync>)
.ack_with(ack.clone())
.enable_tracing()
.timeout(Duration::from_secs(cfg.timeout_secs))
.retry(RetryPolicy::retries(cfg.max_retries))
.catch_panic()
.concurrency(cfg.concurrency)
.on_event({
let m = m.clone();
move |_ctx, ev| {
match ev {
Event::Start => m.inc_workers_active("curate"),
Event::Stop => m.dec_workers_active("curate"),
Event::Success => m.inc_jobs_total("curate", "done"),
Event::Error(_) => m.inc_jobs_total("curate", "error"),
_ => {}
}
}
})
.build(handle_curate)
}
})
.register({
let cfg = embed_cfg.clone();
let backend = embed_backend;
let ack = embed_ack;
let m = m_embed;
let vault_e = Arc::clone(&vault);
let embedder_e = Arc::clone(&embedder);
let index_e = Arc::clone(&index);
move |idx| {
let worker_name = format!("embed-{idx}");
WorkerBuilder::new(&worker_name)
.backend(backend.clone())
.data(Arc::clone(&vault_e))
.data(Arc::clone(&embedder_e) as Arc<dyn Embedder + Send + Sync>)
.data(Arc::clone(&index_e))
.ack_with(ack.clone())
.enable_tracing()
.timeout(Duration::from_secs(cfg.timeout_secs))
.retry(RetryPolicy::retries(cfg.max_retries))
.catch_panic()
.concurrency(cfg.concurrency)
.on_event({
let m = m.clone();
move |_ctx, ev| match ev {
Event::Start => m.inc_workers_active("embed"),
Event::Stop => m.dec_workers_active("embed"),
Event::Success => m.inc_jobs_total("embed", "done"),
Event::Error(_) => m.inc_jobs_total("embed", "error"),
_ => {}
}
})
.build(handle_embed)
}
})
.register({
let cfg = reindex_cfg.clone();
let backend = reindex_backend;
let ack = reindex_ack;
let m = m_reindex;
let embedder_r = Arc::clone(&embedder);
let index_r = Arc::clone(&index);
move |idx| {
let worker_name = format!("reindex-{idx}");
WorkerBuilder::new(&worker_name)
.backend(backend.clone())
.data(Arc::clone(&index_r))
.data(Arc::clone(&embedder_r) as Arc<dyn Embedder + Send + Sync>)
.ack_with(ack.clone())
.enable_tracing()
.timeout(Duration::from_secs(cfg.timeout_secs))
.retry(RetryPolicy::retries(cfg.max_retries))
.catch_panic()
.concurrency(cfg.concurrency)
.on_event({
let m = m.clone();
move |_ctx, ev| match ev {
Event::Start => m.inc_workers_active("reindex"),
Event::Stop => m.dec_workers_active("reindex"),
Event::Success => m.inc_jobs_total("reindex", "done"),
Event::Error(_) => m.inc_jobs_total("reindex", "error"),
_ => {}
}
})
.build(handle_reindex)
}
});
for sched_cfg in &config.schedules {
if sched_cfg.name == "cleanup_dlq_daily" {
let retention = sched_cfg.retention_days;
let cron_expr = sched_cfg.cron.clone();
let dlq_pool = Arc::clone(&pool);
Schedule::from_str(&cron_expr).map_err(|e| {
anyhow::anyhow!(
"expression cron invalide pour '{}' ({}): {e}",
sched_cfg.name,
cron_expr
)
})?;
info!(
name = %sched_cfg.name,
cron = %cron_expr,
retention_days = retention,
"schedule cron enregistré"
);
monitor = monitor.register(move |_idx| {
let schedule = Schedule::from_str(&cron_expr).expect(
"expression cron validée avant enregistrement — ne peut pas échouer ici",
);
WorkerBuilder::new("cleanup-dlq-daily")
.backend(CronStream::new(schedule))
.data(Arc::clone(&dlq_pool))
.data(retention)
.enable_tracing()
.catch_panic()
.build(handle_cleanup_dlq)
});
} else {
tracing::warn!(
name = %sched_cfg.name,
"schedule cron inconnu — ignoré (v0.2.0 supporte cleanup_dlq_daily uniquement)"
);
}
}
let monitor = monitor.with_terminator(async move {
tokio::time::sleep(Duration::from_secs(shutdown_timeout_secs)).await;
info!(
secs = shutdown_timeout_secs,
"Monitor : timeout graceful shutdown atteint — arrêt forcé"
);
});
Ok(monitor)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Flattens a worker config so one assertion checks all three fields.
    fn as_tuple(w: &WorkerConfig) -> (usize, u64, usize) {
        (w.concurrency, w.timeout_secs, w.max_retries)
    }

    #[test]
    fn apalis_config_defaults_coherent() {
        let ApalisConfig { workers, schedules } = ApalisConfig::default();
        assert_eq!(as_tuple(&workers.curate), (2, 30, 3));
        assert_eq!(as_tuple(&workers.embed), (4, 60, 3));
        assert_eq!(as_tuple(&workers.reindex), (4, 120, 2));
        assert!(schedules.is_empty());
    }

    #[test]
    fn apalis_config_from_toml() {
        let toml_str = r#"
[workers.curate]
concurrency = 3
timeout_secs = 45
max_retries = 5
[workers.embed]
concurrency = 8
timeout_secs = 90
max_retries = 2
[workers.reindex]
concurrency = 2
timeout_secs = 180
max_retries = 1
[[schedules]]
name = "cleanup_dlq_daily"
cron = "0 3 * * *"
retention_days = 30
"#;
        let parsed: ApalisConfig = toml::from_str(toml_str).expect("parse TOML ApalisConfig");
        let ApalisConfig { workers, schedules } = parsed;
        assert_eq!(workers.curate.concurrency, 3);
        assert_eq!(workers.embed.concurrency, 8);
        assert_eq!(workers.reindex.timeout_secs, 180);
        match schedules.as_slice() {
            [only] => {
                assert_eq!(only.name, "cleanup_dlq_daily");
                assert_eq!(only.retention_days, 30);
            }
            other => panic!("expected exactly one schedule, got {}", other.len()),
        }
    }

    #[test]
    fn worker_config_defaults() {
        let cfg: WorkerConfig = toml::from_str("").expect("parse WorkerConfig vide");
        assert_eq!(as_tuple(&cfg), (2, 30, 3));
    }
}