1use std::{
2 fmt::{self, Debug, Formatter},
3 sync::Arc,
4};
5
6use futures::{future::BoxFuture, Future, FutureExt};
7use tower::{Layer, Service};
8
9pub mod shutdown;
11
12use crate::{
13 backend::Backend,
14 error::BoxDynError,
15 request::Request,
16 worker::{Context, Event, EventHandler, Ready, Worker, WorkerId},
17};
18
19use self::shutdown::Shutdown;
20
21pub struct Monitor {
23 futures: Vec<BoxFuture<'static, ()>>,
24 workers: Vec<Worker<Context>>,
25 terminator: Option<BoxFuture<'static, ()>>,
26 shutdown: Shutdown,
27 event_handler: EventHandler,
28}
29
30impl Debug for Monitor {
31 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
32 f.debug_struct("Monitor")
33 .field("shutdown", &"[Graceful shutdown listener]")
34 .field("workers", &self.futures.len())
35 .finish()
36 }
37}
38
39impl Monitor {
40 pub fn register<Req, S, P, Ctx>(mut self, mut worker: Worker<Ready<S, P>>) -> Self
42 where
43 S: Service<Request<Req, Ctx>> + Send + 'static,
44 S::Future: Send,
45 S::Error: Send + Sync + 'static + Into<BoxDynError>,
46 P: Backend<Request<Req, Ctx>> + Send + 'static,
47 P::Stream: Unpin + Send + 'static,
48 P::Layer: Layer<S> + Send,
49 <P::Layer as Layer<S>>::Service: Service<Request<Req, Ctx>> + Send,
50 <<P::Layer as Layer<S>>::Service as Service<Request<Req, Ctx>>>::Future: Send,
51 <<P::Layer as Layer<S>>::Service as Service<Request<Req, Ctx>>>::Error:
52 Send + Sync + Into<BoxDynError>,
53 Req: Send + 'static,
54 Ctx: Send + 'static,
55 {
56 worker.state.shutdown = Some(self.shutdown.clone());
57 worker.state.event_handler = self.event_handler.clone();
58 let runnable = worker.run();
59 let handle = runnable.get_handle();
60 self.workers.push(handle);
61 self.futures.push(runnable.boxed());
62 self
63 }
64
65 #[deprecated(
76 since = "0.6.0",
77 note = "Consider using the `.register` as workers now offer concurrency by default"
78 )]
79 pub fn register_with_count<Req, S, P, Ctx>(
80 mut self,
81 count: usize,
82 worker: Worker<Ready<S, P>>,
83 ) -> Self
84 where
85 S: Service<Request<Req, Ctx>> + Send + 'static + Clone,
86 S::Future: Send,
87 S::Error: Send + Sync + 'static + Into<BoxDynError>,
88 P: Backend<Request<Req, Ctx>> + Send + 'static + Clone,
89 P::Stream: Unpin + Send + 'static,
90 P::Layer: Layer<S> + Send,
91 <P::Layer as Layer<S>>::Service: Service<Request<Req, Ctx>> + Send,
92 <<P::Layer as Layer<S>>::Service as Service<Request<Req, Ctx>>>::Future: Send,
93 <<P::Layer as Layer<S>>::Service as Service<Request<Req, Ctx>>>::Error:
94 Send + Sync + Into<BoxDynError>,
95 Req: Send + 'static,
96 Ctx: Send + 'static,
97 {
98 for index in 0..count {
99 let mut worker = worker.clone();
100 let name = format!("{}-{index}", worker.id());
101 worker.id = WorkerId::new(name);
102 self = self.register(worker);
103 }
104 self
105 }
106 pub async fn run_with_signal<S>(self, signal: S) -> std::io::Result<()>
122 where
123 S: Send + Future<Output = std::io::Result<()>>,
124 {
125 let shutdown = self.shutdown.clone();
126 let shutdown_after = self.shutdown.shutdown_after(signal);
127 if let Some(terminator) = self.terminator {
128 let _res = futures::future::select(
129 futures::future::join_all(self.futures)
130 .map(|_| shutdown.start_shutdown())
131 .boxed(),
132 async {
133 let _res = shutdown_after.await;
134 terminator.await;
135 }
136 .boxed(),
137 )
138 .await;
139 } else {
140 let runner = self.run();
141 let _res = futures::join!(shutdown_after, runner); }
143 Ok(())
144 }
145
146 pub async fn run(self) -> std::io::Result<()> {
156 let shutdown = self.shutdown.clone();
157 let shutdown_future = self.shutdown.boxed().map(|_| ());
158 futures::join!(
159 futures::future::join_all(self.futures).map(|_| shutdown.start_shutdown()),
160 shutdown_future,
161 );
162
163 Ok(())
164 }
165
166 pub fn on_event<F: Fn(Worker<Event>) + Send + Sync + 'static>(self, f: F) -> Self {
168 let _ = self.event_handler.write().map(|mut res| {
169 let _ = res.insert(Box::new(f));
170 });
171 self
172 }
173}
174
175impl Default for Monitor {
176 fn default() -> Self {
177 Self {
178 shutdown: Shutdown::new(),
179 futures: Vec::new(),
180 terminator: None,
181 event_handler: Arc::default(),
182 workers: Vec::new(),
183 }
184 }
185}
186
187impl Monitor {
188 pub fn new() -> Self {
194 Self::default()
195 }
196
197 #[cfg(feature = "sleep")]
207 pub fn shutdown_timeout(self, duration: std::time::Duration) -> Self {
208 self.with_terminator(crate::sleep(duration))
209 }
210
211 pub fn with_terminator(mut self, fut: impl Future<Output = ()> + Send + 'static) -> Self {
219 self.terminator = Some(fut.boxed());
220 self
221 }
222}
223
#[cfg(test)]
mod tests {
    use std::{io, time::Duration};

    use tokio::time::sleep;

    use crate::{
        builder::{WorkerBuilder, WorkerFactory},
        memory::MemoryStorage,
        monitor::Monitor,
        mq::MessageQueue,
        request::Request,
        test_message_queue,
        test_utils::{apalis_test_service_fn, TestWrapper},
    };

    // Invokes the crate's message-queue test macro with an in-memory backend
    // (presumably it expands to the shared backend test-suite — confirm in the macro).
    test_message_queue!(MemoryStorage::new());

    /// A monitor with one registered worker returns `Ok` from `run` after shutdown
    /// is requested 1.5 s in, while jobs taking 1 s each are still queued.
    #[tokio::test]
    async fn it_works_with_workers() {
        let backend = MemoryStorage::new();
        let mut producer = backend.clone();

        // Feed ten jobs into the queue from a separate task.
        tokio::spawn(async move {
            for job in 0..10 {
                producer.enqueue(job).await.unwrap();
            }
        });

        let service = tower::service_fn(|request: Request<u32, ()>| async {
            sleep(Duration::from_secs(1)).await;
            Ok::<_, io::Error>(request)
        });
        let worker = WorkerBuilder::new("rango-tango")
            .backend(backend)
            .build(service);
        let monitor = Monitor::new().register(worker);

        // Ask for shutdown from the outside while the worker is busy.
        let stopper = monitor.shutdown.clone();
        tokio::spawn(async move {
            sleep(Duration::from_millis(1500)).await;
            stopper.start_shutdown();
        });

        monitor.run().await.unwrap();
    }

    /// Registering a worker stores exactly one future, and `run` returns `Ok` when
    /// shutdown is requested after one second with an event handler installed.
    #[tokio::test]
    async fn test_monitor_run() {
        let backend = MemoryStorage::new();
        let mut producer = backend.clone();

        // Feed ten jobs into the queue from a separate task.
        tokio::spawn(async move {
            for job in 0..10 {
                producer.enqueue(job).await.unwrap();
            }
        });

        let service = tower::service_fn(|request: Request<u32, _>| async {
            sleep(Duration::from_secs(1)).await;
            Ok::<_, io::Error>(request)
        });
        let worker = WorkerBuilder::new("rango-tango")
            .backend(backend)
            .build(service);
        let monitor = Monitor::new()
            .on_event(|e| {
                println!("{e:?}");
            })
            .register(worker);
        assert_eq!(monitor.futures.len(), 1);

        let stopper = monitor.shutdown.clone();
        tokio::spawn(async move {
            sleep(Duration::from_millis(1000)).await;
            stopper.start_shutdown();
        });

        let outcome = monitor.run().await;
        sleep(Duration::from_millis(1000)).await;
        assert!(outcome.is_ok());
    }
}