1use std::collections::HashSet;
2use std::pin::Pin;
3use tokio_util::sync::CancellationToken;
4
5use crate::Storage;
6use crate::queue::{Queue, QueueConfig};
7use crate::worker::Worker;
8use crate::worker_registry::{WorkerConfig, WorkerRegistry};
9
10pub struct Config<DT, ET> {
11 pub(crate) registry: WorkerRegistry<DT, ET>,
12 pub(crate) queues: HashSet<QueueConfig>,
13 pub(crate) exit_when_processed: Option<u64>,
14 pub(crate) shutdown_signal:
15 Pin<Box<dyn Future<Output = Result<(), std::io::Error>> + Send + Sync>>,
16 pub(crate) shutdown_timeout: std::time::Duration,
17 pub(crate) cancel_token: CancellationToken,
18 pub storage: Storage,
19}
20
21impl<DT, ET> Config<DT, ET> {
22 pub fn new(storage: &Storage) -> Self {
23 Self {
24 registry: WorkerRegistry::new(),
25 queues: HashSet::new(),
26 exit_when_processed: None,
27 shutdown_signal: Box::pin(default_shutdown_signal()),
28 shutdown_timeout: std::time::Duration::from_secs(180),
29 cancel_token: CancellationToken::new(),
30 storage: storage.clone(),
31 }
32 }
33
34 pub fn register_queue<Q>(mut self) -> Self
35 where
36 Q: Queue,
37 {
38 self.register_queue_with(Q::to_config());
39 self
40 }
41
42 pub fn register_queue_with(&mut self, config: QueueConfig) {
43 self.queues.insert(config);
44 }
45
46 pub fn register_queue_with_concurrency<Q>(mut self, concurrency: usize) -> Self
47 where
48 Q: Queue,
49 {
50 let mut config = Q::to_config();
51 config.concurrency = concurrency;
52 self.register_queue_with(config);
53 self
54 }
55
56 pub fn register_worker<W>(mut self) -> Self
57 where
58 W: Worker<Context = DT, Error = ET> + serde::de::DeserializeOwned + 'static,
59 {
60 if let (Some(schedule), Some(queue_config)) = (W::cron_schedule(), W::cron_queue_config()) {
61 let key = queue_config
62 .static_key()
63 .expect("Statically defined cron workers can only use static queues");
64 self.register_queue_with(queue_config);
65 self.registry.register_cron::<W>(&schedule, key);
66 } else {
67 self.registry.register::<W>();
68 }
69 self
70 }
71
72 pub fn register_cron_worker<W>(mut self, queue: impl Queue) -> Self
74 where
75 W: Worker<Context = DT, Error = ET> + serde::de::DeserializeOwned + 'static,
76 {
77 self.register_queue_with(queue.config());
78 let schedule = W::cron_schedule().expect("Cron Worker must have cron_schedule defined");
79 self.registry.register_cron::<W>(&schedule, queue.key());
80 self
81 }
82
83 pub fn register_worker_with(&mut self, config: WorkerConfig<DT, ET>) {
84 self.registry.register_worker_with(config);
85 }
86
87 pub fn exit_when_processed(mut self, processed: u64) -> Self {
88 self.exit_when_processed = Some(processed);
89 self
90 }
91
92 pub fn with_graceful_shutdown(
93 mut self,
94 fut: impl Future<Output = Result<(), std::io::Error>> + Send + Sync + 'static,
95 ) -> Self {
96 self.shutdown_signal = Box::pin(fut);
97 self
98 }
99
100 pub fn consume_shutdown_signal(
101 &mut self,
102 ) -> Pin<Box<dyn Future<Output = Result<(), std::io::Error>> + Send + Sync + 'static>> {
103 let mut shutdown_signal = no_signal();
104 std::mem::swap(&mut self.shutdown_signal, &mut shutdown_signal);
105 shutdown_signal
106 }
107
108 pub fn has_registered_queue<Q: Queue>(&self) -> bool {
109 self.queues.contains(&Q::to_config())
110 }
111
112 pub fn has_registered_worker<W>(&self) -> bool
113 where
114 W: Worker<Context = DT, Error = ET>,
115 {
116 self.registry.has_registered::<W>()
117 }
118
119 pub fn has_registered_cron_worker<W>(&self) -> bool
120 where
121 W: Worker<Context = DT, Error = ET>,
122 {
123 self.registry.has_registered_cron::<W>()
124 }
125}
126
127#[cfg(any(target_os = "linux", target_os = "macos"))]
128async fn default_shutdown_signal() -> Result<(), std::io::Error> {
129 let ctrl_c = tokio::signal::ctrl_c();
130 let mut terminate = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
131
132 tokio::select! {
133 _ = ctrl_c => Ok(()),
134 _ = terminate.recv() => Ok(()),
135 }
136}
137
138#[cfg(target_os = "windows")]
139async fn default_shutdown_signal() -> Result<(), std::io::Error> {
140 tokio::signal::ctrl_c().await
141}
142
143fn no_signal() -> Pin<Box<dyn Future<Output = Result<(), std::io::Error>> + Send + Sync + 'static>>
144{
145 Box::pin(async move { Ok(()) })
146}