1use std::collections::HashSet;
2use std::pin::Pin;
3use tokio_util::sync::CancellationToken;
4
5use crate::Storage;
6use crate::queue::{Queue, QueueConfig};
7use crate::worker::Worker;
8use crate::worker_registry::WorkerRegistry;
9
10pub struct Config<DT, ET> {
11 pub(crate) registry: WorkerRegistry<DT, ET>,
12 pub(crate) queues: HashSet<QueueConfig>,
13 pub(crate) exit_when_processed: Option<u64>,
14 pub(crate) shutdown_signal:
15 Pin<Box<dyn Future<Output = Result<(), std::io::Error>> + Send + Sync>>,
16 pub(crate) shutdown_timeout: std::time::Duration,
17 pub(crate) cancel_token: CancellationToken,
18 pub storage: Storage,
19}
20
21impl<DT, ET> Config<DT, ET> {
22 pub fn new(storage: &Storage) -> Self {
23 Self {
24 registry: WorkerRegistry::new(),
25 queues: HashSet::new(),
26 exit_when_processed: None,
27 shutdown_signal: Box::pin(default_shutdown_signal()),
28 shutdown_timeout: std::time::Duration::from_secs(180),
29 cancel_token: CancellationToken::new(),
30 storage: storage.clone(),
31 }
32 }
33
34 pub fn register_queue<Q>(mut self) -> Self
35 where
36 Q: Queue,
37 {
38 self.queues.insert(Q::to_config());
39 self
40 }
41
42 pub fn register_queue_with_concurrency<Q>(mut self, concurrency: usize) -> Self
43 where
44 Q: Queue,
45 {
46 let mut config = Q::to_config();
47 config.concurrency = concurrency;
48 self.queues.insert(config);
49 self
50 }
51
52 pub fn register_worker<T>(mut self) -> Self
53 where
54 T: Worker<Context = DT, Error = ET> + serde::de::DeserializeOwned + 'static,
55 {
56 self.registry.register::<T>();
57 self
58 }
59
60 pub fn register_cron_worker<T>(mut self, schedule: &str, queue: impl Queue) -> Self
61 where
62 T: Worker<Context = DT, Error = ET> + serde::de::DeserializeOwned + 'static,
63 {
64 self.queues.insert(queue.config());
65 self.registry.register_cron::<T>(schedule, queue.key());
66 self
67 }
68
69 pub fn exit_when_processed(mut self, processed: u64) -> Self {
70 self.exit_when_processed = Some(processed);
71 self
72 }
73
74 pub fn with_graceful_shutdown(
75 mut self,
76 fut: impl Future<Output = Result<(), std::io::Error>> + Send + Sync + 'static,
77 ) -> Self {
78 self.shutdown_signal = Box::pin(fut);
79 self
80 }
81
82 pub fn consume_shutdown_signal(
83 &mut self,
84 ) -> Pin<Box<dyn Future<Output = Result<(), std::io::Error>> + Send + Sync + 'static>> {
85 let mut shutdown_signal = no_signal();
86 std::mem::swap(&mut self.shutdown_signal, &mut shutdown_signal);
87 shutdown_signal
88 }
89}
90
91#[cfg(any(target_os = "linux", target_os = "macos"))]
92async fn default_shutdown_signal() -> Result<(), std::io::Error> {
93 let ctrl_c = tokio::signal::ctrl_c();
94 let mut terminate = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
95
96 tokio::select! {
97 _ = ctrl_c => Ok(()),
98 _ = terminate.recv() => Ok(()),
99 }
100}
101
102#[cfg(target_os = "windows")]
103async fn default_shutdown_signal() -> Result<(), std::io::Error> {
104 tokio::signal::ctrl_c().await
105}
106
107fn no_signal() -> Pin<Box<dyn Future<Output = Result<(), std::io::Error>> + Send + Sync + 'static>>
108{
109 Box::pin(async move { Ok(()) })
110}