use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
use anyhow::Result;
use async_trait::async_trait;
use bytes::Bytes;
use folk_api::{Executor, PluginContext, ServerPlugin};
use folk_plugin_jobs::Driver;
use folk_plugin_jobs::MemoryDriver;
use folk_plugin_jobs::config::{BackoffStrategy, DriverKind, JobsConfig, QueueConfig};
use folk_plugin_jobs::plugin::JobsPlugin;
/// Test double whose only state is an atomic counter.
///
/// The `Executor` impl for this type bumps `count` on every call, which lets
/// tests assert how many times a job was dispatched.
struct CountingExecutor {
    /// Number of executor invocations recorded so far.
    count: AtomicU32,
}

impl CountingExecutor {
    /// Creates a shared executor with its counter starting at zero.
    fn new() -> Arc<Self> {
        let zeroed = CountingExecutor {
            count: AtomicU32::new(0),
        };
        Arc::new(zeroed)
    }
}
#[async_trait]
impl Executor for CountingExecutor {
    /// Records one invocation and replies with an empty payload.
    ///
    /// The method name and request payload are ignored; this never fails.
    async fn execute_method(&self, _method: &str, _payload: Bytes) -> Result<Bytes> {
        // Only the side effect matters; the previous counter value is unused.
        let _previous = self.count.fetch_add(1, Ordering::SeqCst);
        let empty_reply = Bytes::new();
        Ok(empty_reply)
    }
}
/// Test double that fails its first `fail_until` calls.
///
/// The `Executor` impl for this type counts calls in `call_count` and returns
/// an error while the running call number is less than or equal to
/// `fail_until`.
struct FailingExecutor {
    /// Total number of executor invocations observed so far.
    call_count: AtomicU32,
    /// Highest (1-based) call number that still fails.
    fail_until: u32,
}

impl FailingExecutor {
    /// Creates a shared executor with a zeroed call counter and the given
    /// failure threshold.
    fn new(fail_until: u32) -> Arc<Self> {
        let call_count = AtomicU32::new(0);
        Arc::new(FailingExecutor {
            call_count,
            fail_until,
        })
    }
}
#[async_trait]
impl Executor for FailingExecutor {
    /// Counts the call, then fails while the call number is within the
    /// configured failure window.
    ///
    /// # Errors
    ///
    /// Returns a `simulated failure #<n>` error for the first `fail_until`
    /// calls, where `n` is the 1-based call number. Later calls succeed with
    /// an empty payload.
    async fn execute_method(&self, _method: &str, _payload: Bytes) -> Result<Bytes> {
        // `fetch_add` yields the previous value, so add one to get the
        // 1-based number of the current call.
        let n = 1 + self.call_count.fetch_add(1, Ordering::SeqCst);
        if n > self.fail_until {
            return Ok(Bytes::new());
        }
        anyhow::bail!("simulated failure #{n}");
    }
}
/// Assembles a [`PluginContext`] around the given executor and shutdown
/// receiver.
///
/// The RPC registrar, health registry and metrics registry are all left as
/// `None`.
fn make_ctx(
    executor: Arc<dyn Executor>,
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
) -> PluginContext {
    let shutdown = shutdown_rx;
    PluginContext {
        rpc_registrar: None,
        health_registry: None,
        metrics_registry: None,
        shutdown,
        executor,
    }
}
/// Builds the baseline queue configuration shared by the tests.
///
/// The queue is named `default`, runs with concurrency 1, allows 3 retries
/// with a fixed 10 ms retry delay, has a zero job timeout, no dead-letter
/// queue, and priority 10. Tests override individual fields with struct
/// update syntax.
fn default_queue() -> QueueConfig {
    let name = "default".into();
    let retry_delay = Duration::from_millis(10);
    QueueConfig {
        name,
        priority: 10,
        concurrency: 1,
        max_retries: 3,
        retry_backoff: BackoffStrategy::Fixed,
        retry_delay,
        job_timeout: Duration::ZERO,
        dead_letter_queue: None,
    }
}
/// Pushes one job onto a memory driver, pops it in a spawned task, forwards
/// it to the executor as a JSON envelope, and checks the executor ran exactly
/// once. Also checks that the plugin reports the name `jobs`.
#[tokio::test]
async fn jobs_plugin_dispatches_via_executor() {
    let executor = CountingExecutor::new();
    let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
    // The context is built but never handed to the plugin in this test.
    let _ctx = make_ctx(executor.clone(), shutdown_rx);

    let config = JobsConfig {
        driver: DriverKind::Memory,
        queues: vec![default_queue()],
        ..Default::default()
    };
    let plugin = JobsPlugin::new(config);

    let driver = MemoryDriver::new();
    let enqueued = driver.push("default", Bytes::from("test-job")).await;
    enqueued.unwrap();

    // Hand-rolled worker: pop the job, wrap it as `{"payload": ...}`, dispatch.
    let worker_driver = driver.clone();
    let worker_executor = executor.clone();
    let worker = tokio::spawn(async move {
        let queue = default_queue();
        let raw_job = worker_driver.pop(&queue.name).await.unwrap().unwrap();
        let envelope = serde_json::json!({"payload": String::from_utf8_lossy(&raw_job)});
        let request = Bytes::from(serde_json::to_vec(&envelope).unwrap());
        worker_executor
            .execute_method("jobs.process", request)
            .await
            .unwrap();
    });

    let joined = tokio::time::timeout(Duration::from_millis(500), worker)
        .await
        .expect("timed out");
    joined.expect("task panicked");

    let dispatched = executor.count.load(Ordering::SeqCst);
    assert_eq!(dispatched, 1);
    shutdown_tx.send(true).ok();
    assert_eq!(plugin.name(), "jobs");
}
/// Simulates a worker retrying a job whose executor always fails, and checks
/// that the worker stops once the retry budget is spent.
///
/// The hand-rolled loop below counts every failed call against
/// `max_retries`, so with `max_retries: 2` the executor is expected to be
/// called exactly twice before the job is dropped.
#[tokio::test]
async fn retry_exhaustion_discards_job() {
    // A failure threshold of 100 is far above the retry budget, so every
    // call made by this test fails.
    let executor = FailingExecutor::new(100);
    let driver = MemoryDriver::new();
    driver
        .push("default", Bytes::from("doomed-job"))
        .await
        .unwrap();

    let d = driver.clone();
    let exec: Arc<dyn Executor> = executor.clone();
    let handle = tokio::spawn(async move {
        let queue = QueueConfig {
            max_retries: 2,
            ..default_queue()
        };
        let payload = d.pop(&queue.name).await.unwrap().unwrap();
        let json_payload = serde_json::json!({"payload": String::from_utf8_lossy(&payload)});
        let encoded = Bytes::from(serde_json::to_vec(&json_payload).unwrap());
        let mut retries = 0u32;
        loop {
            match exec.execute_method("jobs.process", encoded.clone()).await {
                Ok(_) => break,
                Err(_) => {
                    retries += 1;
                    if retries >= queue.max_retries {
                        // Budget exhausted: give up without re-queueing.
                        break;
                    }
                    // Wait for the queue's configured delay rather than a
                    // duplicated literal, so the two cannot drift apart.
                    tokio::time::sleep(queue.retry_delay).await;
                }
            }
        }
    });

    tokio::time::timeout(Duration::from_secs(2), handle)
        .await
        .expect("timed out")
        .expect("task panicked");
    assert_eq!(executor.call_count.load(Ordering::SeqCst), 2);
}
/// Simulates a worker that moves a job to the `failed` queue once its retry
/// budget is spent, then checks the dead-letter queue holds the original
/// payload.
#[tokio::test]
async fn dead_letter_queue_receives_failed_jobs() {
    // A failure threshold of 100 means every call made by this test fails.
    let executor = FailingExecutor::new(100);
    let driver = MemoryDriver::new();
    let enqueued = driver.push("default", Bytes::from("dlq-job")).await;
    enqueued.unwrap();

    let worker_driver = driver.clone();
    let worker_exec: Arc<dyn Executor> = executor.clone();
    let worker = tokio::spawn(async move {
        let queue = QueueConfig {
            max_retries: 2,
            dead_letter_queue: Some("failed".into()),
            ..default_queue()
        };
        let job = worker_driver.pop(&queue.name).await.unwrap().unwrap();
        let envelope = serde_json::json!({"payload": String::from_utf8_lossy(&job)});
        let request = Bytes::from(serde_json::to_vec(&envelope).unwrap());

        let mut failures = 0u32;
        // Keep calling until the executor succeeds or the budget runs out.
        while worker_exec
            .execute_method("jobs.process", request.clone())
            .await
            .is_err()
        {
            failures += 1;
            if failures >= queue.max_retries {
                // Out of retries: park the raw job on the dead-letter queue.
                let dlq_name = queue.dead_letter_queue.as_ref().unwrap();
                worker_driver.push(dlq_name, job.clone()).await.unwrap();
                break;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    });

    let joined = tokio::time::timeout(Duration::from_secs(2), worker)
        .await
        .expect("timed out");
    joined.expect("task panicked");

    let dlq_depth = driver.depth("failed").await.unwrap();
    assert_eq!(dlq_depth, 1);

    // The pop is bounded by a short timeout.
    let popped = tokio::time::timeout(Duration::from_millis(100), driver.pop("failed"))
        .await
        .unwrap();
    let dlq_job = popped.unwrap().unwrap();
    assert_eq!(dlq_job, Bytes::from("dlq-job"));
}
/// Exponential backoff doubles a 100 ms base on each attempt:
/// 100, 200, 400, 800 ms for attempts 0 through 3.
#[test]
fn backoff_strategy_exponential() {
    let base = Duration::from_millis(100);
    // (attempt, expected delay in milliseconds)
    let cases = [(0, 100), (1, 200), (2, 400), (3, 800)];
    for (attempt, expected_ms) in cases {
        let actual = BackoffStrategy::Exponential.delay(base, attempt);
        assert_eq!(actual, Duration::from_millis(expected_ms));
    }
}
/// Linear backoff adds one 100 ms base per attempt:
/// 100, 200, 300 ms for attempts 0 through 2.
#[test]
fn backoff_strategy_linear() {
    let base = Duration::from_millis(100);
    // (attempt, expected delay in milliseconds)
    let cases = [(0, 100), (1, 200), (2, 300)];
    for (attempt, expected_ms) in cases {
        let actual = BackoffStrategy::Linear.delay(base, attempt);
        assert_eq!(actual, Duration::from_millis(expected_ms));
    }
}
/// Fixed backoff returns the base delay unchanged, checked here for
/// attempts 0 and 5.
#[test]
fn backoff_strategy_fixed() {
    let base = Duration::from_millis(100);
    let expected = Duration::from_millis(100);
    for attempt in [0, 5] {
        let actual = BackoffStrategy::Fixed.delay(base, attempt);
        assert_eq!(actual, expected);
    }
}
/// Checks the Redis URL built from a `JobsConfig`, with and without a
/// password.
///
/// This is a plain synchronous test: nothing here awaits, so no async
/// runtime is needed.
#[test]
fn config_redis_url_generation() {
    // With a password, the URL carries `:<password>@` before the host.
    let config = JobsConfig {
        driver: DriverKind::Redis,
        host: "myhost".into(),
        port: 6380,
        password: "secret".into(),
        db: 2,
        queues: vec![],
    };
    assert_eq!(config.redis_url(), "redis://:secret@myhost:6380/2");

    // With an empty password, the credentials section is omitted entirely.
    let config_no_pass = JobsConfig {
        driver: DriverKind::Redis,
        host: "localhost".into(),
        port: 6379,
        password: String::new(),
        db: 0,
        queues: vec![],
    };
    assert_eq!(config_no_pass.redis_url(), "redis://localhost:6379/0");
}
/// Deserializes a `JobsConfig` from JSON that sets every per-queue field and
/// checks each value round-trips, including the `"2s"` / `"30s"` duration
/// strings and the `"linear"` backoff name.
///
/// This is a plain synchronous test: nothing here awaits, so no async
/// runtime is needed.
#[test]
fn config_deserialization_with_new_fields() {
    let json = serde_json::json!({
        "driver": "memory",
        "queues": [{
            "name": "emails",
            "concurrency": 2,
            "max_retries": 5,
            "retry_delay": "2s",
            "retry_backoff": "linear",
            "job_timeout": "30s",
            "dead_letter_queue": "failed-emails",
            "priority": 1
        }]
    });
    let config: JobsConfig = serde_json::from_value(json).unwrap();
    assert_eq!(config.queues.len(), 1);

    let q = &config.queues[0];
    assert_eq!(q.name, "emails");
    assert_eq!(q.concurrency, 2);
    assert_eq!(q.max_retries, 5);
    assert_eq!(q.retry_delay, Duration::from_secs(2));
    assert_eq!(q.retry_backoff, BackoffStrategy::Linear);
    assert_eq!(q.job_timeout, Duration::from_secs(30));
    assert_eq!(q.dead_letter_queue.as_deref(), Some("failed-emails"));
    assert_eq!(q.priority, 1);
}
/// Deserializes an empty JSON object and checks the defaults: memory driver,
/// host `127.0.0.1`, port 6379, and a single `default` queue with a 1 s
/// exponential retry delay, a 60 s job timeout and no dead-letter queue.
///
/// This is a plain synchronous test: nothing here awaits, so no async
/// runtime is needed.
#[test]
fn config_defaults_are_sensible() {
    let json = serde_json::json!({});
    let config: JobsConfig = serde_json::from_value(json).unwrap();
    assert_eq!(config.driver, DriverKind::Memory);
    assert_eq!(config.host, "127.0.0.1");
    assert_eq!(config.port, 6379);
    assert_eq!(config.queues.len(), 1);

    let q = &config.queues[0];
    assert_eq!(q.name, "default");
    assert_eq!(q.retry_delay, Duration::from_secs(1));
    assert_eq!(q.retry_backoff, BackoffStrategy::Exponential);
    assert_eq!(q.job_timeout, Duration::from_secs(60));
    assert!(q.dead_letter_queue.is_none());
}
/// The plugin factory must build a plugin from an empty JSON object, which is
/// what it receives when no jobs configuration is supplied.
#[test]
fn factory_accepts_empty_config() {
    use folk_plugin_jobs::folk_plugin_factory;

    let empty_config = serde_json::json!({});
    let created = folk_plugin_factory().create(empty_config);
    assert!(
        created.is_ok(),
        "absent [jobs] section must not crash the server"
    );
}