eva_common/
workers.rs

1use crate::EResult;
2use crate::{Error, ErrorKind};
3use std::sync::{Arc, LazyLock};
4use std::time::Duration;
5use tokio::sync::{Mutex, Notify};
6
7#[macro_export]
8macro_rules! periodic_worker {
9    ($name: expr, $target: expr, $interval: expr) => {
10        let (trigger, _fut) = bmart::worker!($target);
11        $crate::workers::recreate_scheduler($name, trigger, $interval, false).await?;
12    };
13    ($name: expr, $target: expr, $interval: expr, $($arg:tt)+) => {
14        let (trigger, _fut) = bmart::worker!($target, $($arg)+);
15        $crate::workers::recreate_scheduler($name, trigger, $interval, false).await?;
16    };
17}
18
19#[macro_export]
20macro_rules! cleaner {
21    ($name: expr, $target: expr, $interval: expr) => {
22        $crate::periodic_worker!(&format!("cleaner::{}", $name), $target, $interval);
23    };
24    ($name: expr, $target: expr, $interval: expr, $($arg:tt)+) => {
25        $crate::periodic_worker!(&format!("cleaner::{}", $name), $target, $interval, $($arg)+);
26    };
27}
28
29impl From<bmart::Error> for Error {
30    fn from(error: bmart::Error) -> Self {
31        match error.kind {
32            bmart::ErrorKind::Duplicate => {
33                Error::newc(ErrorKind::ResourceAlreadyExists, error.message)
34            }
35            bmart::ErrorKind::NotFound => Error::newc(ErrorKind::ResourceNotFound, error.message),
36            bmart::ErrorKind::Internal => Error::newc(ErrorKind::CoreError, error.message),
37            bmart::ErrorKind::InvalidData => Error::newc(ErrorKind::InvalidData, error.message),
38            bmart::ErrorKind::Timeout => Error::newc(ErrorKind::Timeout, error.message),
39        }
40    }
41}
42
43static WORKERS: LazyLock<Mutex<bmart::workers::WorkerFactory>> =
44    LazyLock::new(|| Mutex::new(bmart::workers::WorkerFactory::new()));
45
46/// # Errors
47///
48/// Will return `Err` if failed to recreate the worker
49pub async fn recreate_scheduler(
50    worker_id: &str,
51    trigger: Arc<Notify>,
52    interval: Duration,
53    instant: bool,
54) -> EResult<()> {
55    WORKERS
56        .lock()
57        .await
58        .recreate_scheduler(worker_id, trigger, interval, instant)
59        .map_err(Into::into)
60}
61
62/// # Errors
63///
64/// Will return `Err` if the worker is not found
65pub async fn destroy_scheduler(worker_id: &str) -> EResult<()> {
66    WORKERS
67        .lock()
68        .await
69        .destroy_scheduler(worker_id)
70        .map_err(Into::into)
71}