apalis_core/monitor/mod.rs

use std::{
    fmt::{self, Debug, Formatter},
    sync::Arc,
};

use futures::{future::BoxFuture, Future, FutureExt};
use tower::{Layer, Service};

/// Shutdown utilities
pub mod shutdown;

use crate::{
    backend::Backend,
    error::BoxDynError,
    request::Request,
    worker::{Context, Event, EventHandler, Ready, Worker, WorkerId},
};

use self::shutdown::Shutdown;

/// A monitor for coordinating and managing a collection of workers.
pub struct Monitor {
    futures: Vec<BoxFuture<'static, ()>>,
    workers: Vec<Worker<Context>>,
    terminator: Option<BoxFuture<'static, ()>>,
    shutdown: Shutdown,
    event_handler: EventHandler,
}

impl Debug for Monitor {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("Monitor")
            .field("shutdown", &"[Graceful shutdown listener]")
            .field("workers", &self.futures.len())
            .finish()
    }
}

impl Monitor {
    /// Registers a single instance of a [Worker].
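    ///
    /// # Example
    ///
    /// A minimal sketch, assuming an in-memory backend and a simple `tower`
    /// service (the worker name is arbitrary):
    ///
    /// ```rust,ignore
    /// use apalis_core::builder::{WorkerBuilder, WorkerFactory};
    /// use apalis_core::memory::MemoryStorage;
    /// use apalis_core::monitor::Monitor;
    /// use apalis_core::request::Request;
    ///
    /// let service = tower::service_fn(|req: Request<u32, ()>| async move {
    ///     Ok::<_, std::io::Error>(req)
    /// });
    /// let worker = WorkerBuilder::new("worker-1")
    ///     .backend(MemoryStorage::new())
    ///     .build(service);
    /// let monitor = Monitor::new().register(worker);
    /// ```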
    pub fn register<Req, S, P, Ctx>(mut self, mut worker: Worker<Ready<S, P>>) -> Self
    where
        S: Service<Request<Req, Ctx>> + Send + 'static,
        S::Future: Send,
        S::Error: Send + Sync + 'static + Into<BoxDynError>,
        P: Backend<Request<Req, Ctx>> + Send + 'static,
        P::Stream: Unpin + Send + 'static,
        P::Layer: Layer<S> + Send,
        <P::Layer as Layer<S>>::Service: Service<Request<Req, Ctx>> + Send,
        <<P::Layer as Layer<S>>::Service as Service<Request<Req, Ctx>>>::Future: Send,
        <<P::Layer as Layer<S>>::Service as Service<Request<Req, Ctx>>>::Error:
            Send + Sync + Into<BoxDynError>,
        Req: Send + 'static,
        Ctx: Send + 'static,
    {
        worker.state.shutdown = Some(self.shutdown.clone());
        worker.state.event_handler = self.event_handler.clone();
        let runnable = worker.run();
        let handle = runnable.get_handle();
        self.workers.push(handle);
        self.futures.push(runnable.boxed());
        self
    }

    /// Registers multiple workers with the monitor.
    ///
    /// # Arguments
    ///
    /// * `count` - The number of workers to register.
    /// * `worker` - A worker that is ready to run.
    ///
    /// # Returns
    ///
    /// The monitor instance, with all workers added to the collection.
    #[deprecated(
        since = "0.6.0",
        note = "Consider using `.register`, as workers now offer concurrency by default"
    )]
    pub fn register_with_count<Req, S, P, Ctx>(
        mut self,
        count: usize,
        worker: Worker<Ready<S, P>>,
    ) -> Self
    where
        S: Service<Request<Req, Ctx>> + Send + 'static + Clone,
        S::Future: Send,
        S::Error: Send + Sync + 'static + Into<BoxDynError>,
        P: Backend<Request<Req, Ctx>> + Send + 'static + Clone,
        P::Stream: Unpin + Send + 'static,
        P::Layer: Layer<S> + Send,
        <P::Layer as Layer<S>>::Service: Service<Request<Req, Ctx>> + Send,
        <<P::Layer as Layer<S>>::Service as Service<Request<Req, Ctx>>>::Future: Send,
        <<P::Layer as Layer<S>>::Service as Service<Request<Req, Ctx>>>::Error:
            Send + Sync + Into<BoxDynError>,
        Req: Send + 'static,
        Ctx: Send + 'static,
    {
        for index in 0..count {
            let mut worker = worker.clone();
            let name = format!("{}-{index}", worker.id());
            worker.id = WorkerId::new(name);
            self = self.register(worker);
        }
        self
    }

    /// Runs the monitor and all its registered workers until they have all completed or a shutdown signal is received.
    ///
    /// # Arguments
    ///
    /// * `signal` - A `Future` that resolves when a shutdown signal is received.
    ///
    /// # Errors
    ///
    /// If the monitor fails to shut down gracefully, an `std::io::Error` will be returned.
    ///
    /// # Remarks
    ///
    /// If a timeout has been set using the `Monitor::shutdown_timeout` method, the monitor
    /// will wait for all workers to complete up to the timeout duration before exiting.
    /// If the timeout is reached and workers have not completed, the monitor will exit forcefully.
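    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a tokio runtime and a `worker` built elsewhere;
    /// `tokio::signal::ctrl_c()` already yields the `std::io::Result<()>` this
    /// method expects:
    ///
    /// ```rust,ignore
    /// let monitor = Monitor::new().register(worker);
    /// monitor.run_with_signal(tokio::signal::ctrl_c()).await?;
    /// ```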
    pub async fn run_with_signal<S>(self, signal: S) -> std::io::Result<()>
    where
        S: Send + Future<Output = std::io::Result<()>>,
    {
        let shutdown = self.shutdown.clone();
        let shutdown_after = self.shutdown.shutdown_after(signal);
        if let Some(terminator) = self.terminator {
            let _res = futures::future::select(
                futures::future::join_all(self.futures)
                    .map(|_| shutdown.start_shutdown())
                    .boxed(),
                async {
                    let _res = shutdown_after.await;
                    terminator.await;
                }
                .boxed(),
            )
            .await;
        } else {
            let runner = self.run();
            let _res = futures::join!(shutdown_after, runner); // If no terminator is provided, we wait for both the shutdown call and all workers to complete
        }
        Ok(())
    }

    /// Runs the monitor and all its registered workers until they have all completed.
    ///
    /// # Errors
    ///
    /// If the monitor fails to run gracefully, an `std::io::Error` will be returned.
    ///
    /// # Remarks
    ///
    /// If all workers have completed execution, the monitor will start a shutdown by default.
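    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a `worker` built elsewhere; without an external
    /// signal, `run` resolves once all registered workers have completed and
    /// shutdown has started:
    ///
    /// ```rust,ignore
    /// Monitor::new().register(worker).run().await?;
    /// ```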
    pub async fn run(self) -> std::io::Result<()> {
        let shutdown = self.shutdown.clone();
        let shutdown_future = self.shutdown.boxed().map(|_| ());
        futures::join!(
            futures::future::join_all(self.futures).map(|_| shutdown.start_shutdown()),
            shutdown_future,
        );

        Ok(())
    }

    /// Handles events emitted by the registered workers.
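    ///
    /// # Example
    ///
    /// A minimal sketch; the closure receives each `Worker<Event>` as it is emitted:
    ///
    /// ```rust,ignore
    /// let monitor = Monitor::new().on_event(|event| println!("{event:?}"));
    /// ```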
    pub fn on_event<F: Fn(Worker<Event>) + Send + Sync + 'static>(self, f: F) -> Self {
        let _ = self.event_handler.write().map(|mut res| {
            let _ = res.insert(Box::new(f));
        });
        self
    }
}

impl Default for Monitor {
    fn default() -> Self {
        Self {
            shutdown: Shutdown::new(),
            futures: Vec::new(),
            terminator: None,
            event_handler: Arc::default(),
            workers: Vec::new(),
        }
    }
}

impl Monitor {
    /// Creates a new monitor instance.
    ///
    /// # Returns
    ///
    /// A new monitor instance, with an empty collection of workers.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets a timeout duration for the monitor's shutdown process.
    ///
    /// # Arguments
    ///
    /// * `duration` - The timeout duration.
    ///
    /// # Returns
    ///
    /// The monitor instance, with the shutdown timeout duration set.
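    ///
    /// # Example
    ///
    /// A minimal sketch, assuming the `sleep` feature is enabled and a `worker`
    /// built elsewhere; workers get up to 5 seconds to finish once shutdown starts:
    ///
    /// ```rust,ignore
    /// let monitor = Monitor::new()
    ///     .register(worker)
    ///     .shutdown_timeout(std::time::Duration::from_secs(5));
    /// ```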
    #[cfg(feature = "sleep")]
    pub fn shutdown_timeout(self, duration: std::time::Duration) -> Self {
        self.with_terminator(crate::sleep(duration))
    }

    /// Sets a future that will start being polled when the monitor's shutdown process starts.
    ///
    /// Once shutdown has been initiated, the `terminator` future is run. If it completes
    /// before all tasks have finished, the shutdown completes anyway, even with outstanding
    /// tasks. This is useful for forcing a full shutdown via a timeout or signal (or a
    /// combination of both) when one or more tasks take longer than expected to finish.
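    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a tokio runtime and a `worker` built elsewhere;
    /// this mirrors what `shutdown_timeout` does with the crate's own sleep:
    ///
    /// ```rust,ignore
    /// let monitor = Monitor::new()
    ///     .register(worker)
    ///     .with_terminator(async {
    ///         tokio::time::sleep(std::time::Duration::from_secs(5)).await;
    ///     });
    /// ```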
    pub fn with_terminator(mut self, fut: impl Future<Output = ()> + Send + 'static) -> Self {
        self.terminator = Some(fut.boxed());
        self
    }
}

#[cfg(test)]
mod tests {
    use crate::test_utils::apalis_test_service_fn;
    use std::{io, time::Duration};

    use tokio::time::sleep;

    use crate::{
        builder::{WorkerBuilder, WorkerFactory},
        memory::MemoryStorage,
        monitor::Monitor,
        mq::MessageQueue,
        request::Request,
        test_message_queue,
        test_utils::TestWrapper,
    };

    test_message_queue!(MemoryStorage::new());

    #[tokio::test]
    async fn it_works_with_workers() {
        let backend = MemoryStorage::new();
        let mut handle = backend.clone();

        tokio::spawn(async move {
            for i in 0..10 {
                handle.enqueue(i).await.unwrap();
            }
        });
        let service = tower::service_fn(|request: Request<u32, ()>| async {
            tokio::time::sleep(Duration::from_secs(1)).await;
            Ok::<_, io::Error>(request)
        });
        let worker = WorkerBuilder::new("rango-tango")
            .backend(backend)
            .build(service);
        let monitor: Monitor = Monitor::new();
        let monitor = monitor.register(worker);
        let shutdown = monitor.shutdown.clone();
        tokio::spawn(async move {
            sleep(Duration::from_millis(1500)).await;
            shutdown.start_shutdown();
        });
        monitor.run().await.unwrap();
    }

    #[tokio::test]
    async fn test_monitor_run() {
        let backend = MemoryStorage::new();
        let mut handle = backend.clone();

        tokio::spawn(async move {
            for i in 0..10 {
                handle.enqueue(i).await.unwrap();
            }
        });
        let service = tower::service_fn(|request: Request<u32, _>| async {
            tokio::time::sleep(Duration::from_secs(1)).await;
            Ok::<_, io::Error>(request)
        });
        let worker = WorkerBuilder::new("rango-tango")
            .backend(backend)
            .build(service);
        let monitor: Monitor = Monitor::new();
        let monitor = monitor.on_event(|e| {
            println!("{e:?}");
        });
        let monitor = monitor.register(worker);
        assert_eq!(monitor.futures.len(), 1);
        let shutdown = monitor.shutdown.clone();
        tokio::spawn(async move {
            sleep(Duration::from_millis(1000)).await;
            shutdown.start_shutdown();
        });

        let result = monitor.run().await;
        sleep(Duration::from_millis(1000)).await;
        assert!(result.is_ok());
    }
}