oxanus/
config.rs

1use std::collections::HashSet;
2use std::pin::Pin;
3use tokio_util::sync::CancellationToken;
4
5use crate::Storage;
6use crate::queue::{Queue, QueueConfig};
7use crate::worker::Worker;
8use crate::worker_registry::{WorkerConfig, WorkerRegistry};
9
10pub struct Config<DT, ET> {
11    pub(crate) registry: WorkerRegistry<DT, ET>,
12    pub(crate) queues: HashSet<QueueConfig>,
13    pub(crate) exit_when_processed: Option<u64>,
14    pub(crate) shutdown_signal:
15        Pin<Box<dyn Future<Output = Result<(), std::io::Error>> + Send + Sync>>,
16    pub(crate) shutdown_timeout: std::time::Duration,
17    pub(crate) cancel_token: CancellationToken,
18    pub storage: Storage,
19}
20
21impl<DT, ET> Config<DT, ET> {
22    pub fn new(storage: &Storage) -> Self {
23        Self {
24            registry: WorkerRegistry::new(),
25            queues: HashSet::new(),
26            exit_when_processed: None,
27            shutdown_signal: Box::pin(default_shutdown_signal()),
28            shutdown_timeout: std::time::Duration::from_secs(180),
29            cancel_token: CancellationToken::new(),
30            storage: storage.clone(),
31        }
32    }
33
34    pub fn register_queue<Q>(mut self) -> Self
35    where
36        Q: Queue,
37    {
38        self.register_queue_with(Q::to_config());
39        self
40    }
41
42    pub fn register_queue_with(&mut self, config: QueueConfig) {
43        self.queues.insert(config);
44    }
45
46    pub fn register_queue_with_concurrency<Q>(mut self, concurrency: usize) -> Self
47    where
48        Q: Queue,
49    {
50        let mut config = Q::to_config();
51        config.concurrency = concurrency;
52        self.register_queue_with(config);
53        self
54    }
55
56    pub fn register_worker<W>(mut self) -> Self
57    where
58        W: Worker<Context = DT, Error = ET> + serde::de::DeserializeOwned + 'static,
59    {
60        if let (Some(schedule), Some(queue_config)) = (W::cron_schedule(), W::cron_queue_config()) {
61            let key = queue_config
62                .static_key()
63                .expect("Statically defined cron workers can only use static queues");
64            self.register_queue_with(queue_config);
65            self.registry.register_cron::<W>(&schedule, key);
66        } else {
67            self.registry.register::<W>();
68        }
69        self
70    }
71
72    /// Register a cron worker with a dynamic queue
73    pub fn register_cron_worker<W>(mut self, queue: impl Queue) -> Self
74    where
75        W: Worker<Context = DT, Error = ET> + serde::de::DeserializeOwned + 'static,
76    {
77        self.register_queue_with(queue.config());
78        let schedule = W::cron_schedule().expect("Cron Worker must have cron_schedule defined");
79        self.registry.register_cron::<W>(&schedule, queue.key());
80        self
81    }
82
83    pub fn register_worker_with(&mut self, config: WorkerConfig<DT, ET>) {
84        self.registry.register_worker_with(config);
85    }
86
87    pub fn exit_when_processed(mut self, processed: u64) -> Self {
88        self.exit_when_processed = Some(processed);
89        self
90    }
91
92    pub fn with_graceful_shutdown(
93        mut self,
94        fut: impl Future<Output = Result<(), std::io::Error>> + Send + Sync + 'static,
95    ) -> Self {
96        self.shutdown_signal = Box::pin(fut);
97        self
98    }
99
100    pub fn consume_shutdown_signal(
101        &mut self,
102    ) -> Pin<Box<dyn Future<Output = Result<(), std::io::Error>> + Send + Sync + 'static>> {
103        let mut shutdown_signal = no_signal();
104        std::mem::swap(&mut self.shutdown_signal, &mut shutdown_signal);
105        shutdown_signal
106    }
107
108    pub fn has_registered_queue<Q: Queue>(&self) -> bool {
109        self.queues.contains(&Q::to_config())
110    }
111
112    pub fn has_registered_worker<W>(&self) -> bool
113    where
114        W: Worker<Context = DT, Error = ET>,
115    {
116        self.registry.has_registered::<W>()
117    }
118
119    pub fn has_registered_cron_worker<W>(&self) -> bool
120    where
121        W: Worker<Context = DT, Error = ET>,
122    {
123        self.registry.has_registered_cron::<W>()
124    }
125}
126
127#[cfg(any(target_os = "linux", target_os = "macos"))]
128async fn default_shutdown_signal() -> Result<(), std::io::Error> {
129    let ctrl_c = tokio::signal::ctrl_c();
130    let mut terminate = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
131
132    tokio::select! {
133        _ = ctrl_c => Ok(()),
134        _ = terminate.recv() => Ok(()),
135    }
136}
137
/// Default shutdown signal on Windows: completes when the process receives
/// Ctrl-C, passing through whatever result the tokio listener yields.
#[cfg(target_os = "windows")]
async fn default_shutdown_signal() -> Result<(), std::io::Error> {
    let interrupt = tokio::signal::ctrl_c();
    interrupt.await
}
142
/// Builds a placeholder shutdown signal: a boxed future that resolves to
/// `Ok(())` the first time it is polled.
fn no_signal() -> Pin<Box<dyn Future<Output = Result<(), std::io::Error>> + Send + Sync + 'static>>
{
    let already_done = async move { Ok::<(), std::io::Error>(()) };
    Box::pin(already_done)
}