oxanus/
config.rs

1use std::collections::HashSet;
2use std::pin::Pin;
3use tokio_util::sync::CancellationToken;
4
5use crate::Storage;
6use crate::queue::{Queue, QueueConfig};
7use crate::worker::Worker;
8use crate::worker_registry::WorkerRegistry;
9
10pub struct Config<DT, ET> {
11    pub(crate) registry: WorkerRegistry<DT, ET>,
12    pub(crate) queues: HashSet<QueueConfig>,
13    pub(crate) exit_when_processed: Option<u64>,
14    pub(crate) shutdown_signal:
15        Pin<Box<dyn Future<Output = Result<(), std::io::Error>> + Send + Sync>>,
16    pub(crate) shutdown_timeout: std::time::Duration,
17    pub(crate) cancel_token: CancellationToken,
18    pub storage: Storage,
19}
20
21impl<DT, ET> Config<DT, ET> {
22    pub fn new(storage: &Storage) -> Self {
23        Self {
24            registry: WorkerRegistry::new(),
25            queues: HashSet::new(),
26            exit_when_processed: None,
27            shutdown_signal: Box::pin(default_shutdown_signal()),
28            shutdown_timeout: std::time::Duration::from_secs(180),
29            cancel_token: CancellationToken::new(),
30            storage: storage.clone(),
31        }
32    }
33
34    pub fn register_queue<Q>(mut self) -> Self
35    where
36        Q: Queue,
37    {
38        self.queues.insert(Q::to_config());
39        self
40    }
41
42    pub fn register_queue_with_concurrency<Q>(mut self, concurrency: usize) -> Self
43    where
44        Q: Queue,
45    {
46        let mut config = Q::to_config();
47        config.concurrency = concurrency;
48        self.queues.insert(config);
49        self
50    }
51
52    pub fn register_worker<T>(mut self) -> Self
53    where
54        T: Worker<Context = DT, Error = ET> + serde::de::DeserializeOwned + 'static,
55    {
56        self.registry.register::<T>();
57        self
58    }
59
60    pub fn register_cron_worker<T>(mut self, schedule: &str, queue: impl Queue) -> Self
61    where
62        T: Worker<Context = DT, Error = ET> + serde::de::DeserializeOwned + 'static,
63    {
64        self.queues.insert(queue.config());
65        self.registry.register_cron::<T>(schedule, queue.key());
66        self
67    }
68
69    pub fn exit_when_processed(mut self, processed: u64) -> Self {
70        self.exit_when_processed = Some(processed);
71        self
72    }
73
74    pub fn with_graceful_shutdown(
75        mut self,
76        fut: impl Future<Output = Result<(), std::io::Error>> + Send + Sync + 'static,
77    ) -> Self {
78        self.shutdown_signal = Box::pin(fut);
79        self
80    }
81
82    pub fn consume_shutdown_signal(
83        &mut self,
84    ) -> Pin<Box<dyn Future<Output = Result<(), std::io::Error>> + Send + Sync + 'static>> {
85        let mut shutdown_signal = no_signal();
86        std::mem::swap(&mut self.shutdown_signal, &mut shutdown_signal);
87        shutdown_signal
88    }
89}
90
91#[cfg(any(target_os = "linux", target_os = "macos"))]
92async fn default_shutdown_signal() -> Result<(), std::io::Error> {
93    let ctrl_c = tokio::signal::ctrl_c();
94    let mut terminate = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
95
96    tokio::select! {
97        _ = ctrl_c => Ok(()),
98        _ = terminate.recv() => Ok(()),
99    }
100}
101
102#[cfg(target_os = "windows")]
103async fn default_shutdown_signal() -> Result<(), std::io::Error> {
104    tokio::signal::ctrl_c().await
105}
106
/// Builds a placeholder shutdown signal: a boxed future that completes
/// with `Ok(())` the first time it is polled.
fn no_signal() -> Pin<Box<dyn Future<Output = Result<(), std::io::Error>> + Send + Sync + 'static>>
{
    let already_done = async { Ok::<(), std::io::Error>(()) };
    Box::pin(already_done)
}