use crate::{QueueConfig, WorkerConfigKind, context::Context, job_envelope::JobConflictStrategy};
use std::panic::UnwindSafe;
pub type BoxedWorker<DT, ET> = Box<dyn Worker<Context = DT, Error = ET>>;
#[async_trait::async_trait]
pub trait Worker: Send + Sync + UnwindSafe {
type Context: Clone + Send + Sync;
type Error: std::error::Error + Send + Sync;
async fn process(&self, data: &Context<Self::Context>) -> Result<(), Self::Error>;
fn max_retries(&self) -> u32 {
2
}
fn retry_delay(&self, retries: u32) -> u64 {
u64::pow(5, retries + 2)
}
fn unique_id(&self) -> Option<String> {
None
}
fn on_conflict(&self) -> JobConflictStrategy {
JobConflictStrategy::Skip
}
fn cron_schedule() -> Option<String>
where
Self: Sized,
{
None
}
fn cron_queue_config() -> Option<QueueConfig>
where
Self: Sized,
{
None
}
fn should_resurrect() -> bool
where
Self: Sized,
{
true
}
fn throttle_cost(&self) -> Option<u64> {
None
}
fn to_config() -> WorkerConfigKind
where
Self: Sized,
{
#[allow(clippy::collapsible_if)] if let (Some(schedule), Some(queue_config)) =
(Self::cron_schedule(), Self::cron_queue_config())
{
if let Some(queue_key) = queue_config.static_key() {
return WorkerConfigKind::Cron {
schedule,
queue_key,
resurrect: Self::should_resurrect(),
};
}
}
WorkerConfigKind::Normal
}
}
#[cfg(feature = "macros")]
#[cfg(test)]
mod tests {
use super::{JobConflictStrategy, Worker};
use crate as oxanus;
use crate::Context;
use crate::test_helper::create_worker_context;
use serde::{Deserialize, Serialize};
use std::io::Error as WorkerError;
use std::sync::{Arc, Mutex};
#[derive(Clone, Default)]
struct WorkerContext {
count: Arc<Mutex<usize>>,
}
#[derive(oxanus::Registry)]
#[allow(dead_code)]
struct ComponentRegistry(oxanus::ComponentRegistry<WorkerContext, WorkerError>);
#[derive(oxanus::Registry)]
#[allow(dead_code)]
struct ComponentRegistryFmt(oxanus::ComponentRegistry<WorkerContext, std::fmt::Error>);
#[tokio::test]
async fn test_define_worker_with_macro() {
#[derive(Serialize, Deserialize, oxanus::Worker)]
struct TestWorker {}
impl TestWorker {
async fn process(&self, ctx: &Context<WorkerContext>) -> Result<(), WorkerError> {
*ctx.ctx
.count
.lock()
.map_err(|e| std::io::Error::other(e.to_string()))? += 1;
Ok(())
}
}
assert_eq!(TestWorker {}.max_retries(), 2);
assert_eq!(TestWorker {}.on_conflict(), JobConflictStrategy::Skip);
let ctx = WorkerContext::default();
let context = create_worker_context(ctx.clone(), TestWorker {}).await;
Worker::process(&TestWorker {}, &context).await.unwrap();
assert_eq!(*ctx.count.lock().unwrap(), 1);
Worker::process(&TestWorker {}, &context).await.unwrap();
assert_eq!(*ctx.count.lock().unwrap(), 2);
#[derive(Serialize, Deserialize, oxanus::Worker)]
#[oxanus(error = std::fmt::Error, registry = ComponentRegistryFmt)]
#[oxanus(max_retries = 3, retry_delay = 10)]
#[oxanus(on_conflict = Replace)]
struct TestWorkerCustomError {}
impl TestWorkerCustomError {
async fn process(&self, _: &Context<WorkerContext>) -> Result<(), std::fmt::Error> {
use std::fmt::Write;
let mut s = String::new();
write!(&mut s, "hi")
}
}
assert_eq!(TestWorkerCustomError {}.unique_id(), None);
assert_eq!(TestWorkerCustomError {}.max_retries(), 3);
assert_eq!(TestWorkerCustomError {}.retry_delay(1), 10);
assert_eq!(
TestWorkerCustomError {}.on_conflict(),
JobConflictStrategy::Replace
);
#[derive(Serialize, Deserialize, oxanus::Worker)]
#[oxanus(unique_id = "test_worker_{id}")]
struct TestWorkerUniqueId {
id: i32,
_1: i32,
}
impl TestWorkerUniqueId {
async fn process(&self, _: &Context<WorkerContext>) -> Result<(), WorkerError> {
Ok(())
}
}
assert_eq!(TestWorkerUniqueId { id: 1, _1: 0 }.max_retries(), 2);
assert_eq!(
TestWorkerUniqueId { id: 1, _1: 0 }.unique_id().unwrap(),
"test_worker_1"
);
assert_eq!(
TestWorkerUniqueId { id: 12, _1: 0 }.unique_id().unwrap(),
"test_worker_12"
);
#[derive(Serialize, Deserialize, oxanus::Worker)]
#[oxanus(unique_id(fmt = "test_worker_{id}_{task}", id = self.id, task = self.task.name))]
struct TestWorkerNestedUniqueId {
id: i32,
task: TestWorkerNestedTask,
}
#[derive(Serialize, Deserialize, Default)]
struct TestWorkerNestedTask {
name: String,
}
impl TestWorkerNestedUniqueId {
async fn process(&self, _: &Context<WorkerContext>) -> Result<(), WorkerError> {
Ok(())
}
}
assert_eq!(
TestWorkerNestedUniqueId {
id: 1,
task: Default::default()
}
.max_retries(),
2
);
assert_eq!(
TestWorkerNestedUniqueId {
id: 1,
task: TestWorkerNestedTask {
name: "task1".to_owned(),
}
}
.unique_id()
.unwrap(),
"test_worker_1_task1"
);
assert_eq!(
TestWorkerNestedUniqueId {
id: 2,
task: TestWorkerNestedTask {
name: "task2".to_owned(),
}
}
.unique_id()
.unwrap(),
"test_worker_2_task2"
);
#[derive(Serialize, Deserialize, oxanus::Worker)]
#[oxanus(unique_id = Self::unique_id)]
#[oxanus(retry_delay = Self::retry_delay)]
#[oxanus(max_retries = Self::max_retries)]
struct TestWorkerCustomUniqueId {
id: i32,
task: TestWorkerNestedTask,
}
impl TestWorkerCustomUniqueId {
async fn process(&self, _: &Context<WorkerContext>) -> Result<(), WorkerError> {
Ok(())
}
fn unique_id(&self) -> Option<String> {
Some(format!("worker_id_{}_task_{}", self.id, self.task.name))
}
fn retry_delay(&self, retries: u32) -> u64 {
retries as u64 * 2
}
fn max_retries(&self) -> u32 {
9
}
}
assert_eq!(
Worker::unique_id(&TestWorkerCustomUniqueId {
id: 1,
task: TestWorkerNestedTask {
name: "11".to_owned(),
}
})
.unwrap(),
"worker_id_1_task_11"
);
let worker2 = TestWorkerCustomUniqueId {
id: 2,
task: TestWorkerNestedTask {
name: "22".to_owned(),
},
};
assert_eq!(Worker::unique_id(&worker2).unwrap(), "worker_id_2_task_22");
assert_eq!(worker2.retry_delay(1), 2);
assert_eq!(worker2.retry_delay(2), 4);
assert_eq!(worker2.max_retries(), 9);
}
#[tokio::test]
async fn test_define_cron_worker_with_macro() {
use crate as oxanus; use crate::Queue;
use std::io::Error as WorkerError;
#[derive(Serialize, oxanus::Queue)]
struct DefaultQueue;
#[derive(Serialize, Deserialize, oxanus::Worker)]
#[oxanus(cron(schedule = "*/1 * * * * *", queue = DefaultQueue))]
struct TestCronWorker {}
impl TestCronWorker {
async fn process(&self, _: &Context<WorkerContext>) -> Result<(), WorkerError> {
Ok(())
}
}
assert_eq!(TestCronWorker {}.unique_id(), None);
assert_eq!(TestCronWorker::cron_schedule().unwrap(), "*/1 * * * * *");
assert_eq!(
TestCronWorker::cron_queue_config().unwrap(),
DefaultQueue::to_config(),
);
assert!(TestCronWorker::should_resurrect());
}
#[tokio::test]
async fn test_define_worker_with_resurrect_false() {
use crate as oxanus;
use std::io::Error as WorkerError;
#[derive(Serialize, Deserialize, oxanus::Worker)]
#[oxanus(resurrect = false)]
struct NoResurrectWorker {}
impl NoResurrectWorker {
async fn process(&self, _: &Context<WorkerContext>) -> Result<(), WorkerError> {
Ok(())
}
}
assert!(!NoResurrectWorker::should_resurrect());
#[derive(Serialize, Deserialize, oxanus::Worker)]
struct DefaultResurrectWorker {}
impl DefaultResurrectWorker {
async fn process(&self, _: &Context<WorkerContext>) -> Result<(), WorkerError> {
Ok(())
}
}
assert!(DefaultResurrectWorker::should_resurrect());
}
}