stry_evermore/
lib.rs

1#![allow(clippy::unknown_clippy_lints)] // because of pin-project
2#![warn(clippy::pedantic, rust_2018_idioms)]
3
//! Evermore is a library that allows you to run a fixed number of asynchronous
//! tasks repeatedly until a shutdown signal is sent out.
6//!
7//! # Examples
8//!
9//! The example below shows the normal usage of Evermore (with dummy
10//! tasks and data), with the tokio [`broadcast channel`] being used as a
11//! shutdown signal sent using [`ctrlc`].
12//!
13//! ```rust,no_run
14//! use stry_evermore::{Evermore, Worker};
15//!
16//! #[derive(Clone, Debug, Default)]
17//! struct Data {}
18//!
19//! #[tokio::main]
20//! async fn main() {
21//!     tracing_subscriber::fmt()
22//!         .with_max_level(tracing::Level::TRACE)
23//!         .with_target(true)
24//!         .init();
25//!
26//!     let (tx, mut rx) = tokio::sync::broadcast::channel(1);
27//!
28//!     ctrlc::set_handler(move || {
29//!         if tx.send(()).is_err() {
30//!             tracing::error!("Unable to send shutdown signal");
31//!         }
32//!     })
33//!     .expect("Unable to set CTRL-C handler");
34//!
35//!     let signal = async move { rx.recv().await.expect("Failed to listen for event") };
36//!
37//!     Evermore::new(signal, 4, Data::default(), |data: Worker<Data>| {
38//!         Box::pin(task(data))
39//!     })
40//!     .await;
41//! }
42//!
//! #[tracing::instrument(skip(worker))]
44//! async fn task(worker: Worker<Data>) -> Result<(), std::io::Error> {
45//!     loop {
46//!         tokio::time::delay_for(tokio::time::Duration::from_millis(1)).await;
47//!
48//!         if worker.should_stop() {
49//!             tracing::info!("Received shutdown signal, shutting down");
50//!
51//!             break;
52//!         }
53//!     }
54//!
55//!     Ok(())
56//! }
57//! ```
58//!
59//! [`broadcast channel`]: https://docs.rs/tokio/0.2.22/tokio/sync/broadcast/fn.channel.html
60//! [`ctrlc`]: https://crates.io/crates/ctrlc
61
62use {
63    futures_core::TryFuture,
64    std::{
65        error::Error,
66        future::Future,
67        marker::{PhantomData, Unpin},
68        pin::Pin,
69        sync::{
70            atomic::{AtomicBool, Ordering},
71            Arc,
72        },
73        task::{Context, Poll},
74    },
75};
76
/// A graceful shutdown enabled repeating asynchronous task runner.
///
/// `Evermore` is itself a future: each poll drives the shutdown signal and the
/// workers, and it only resolves once the shutdown signal has completed and
/// every worker has finished.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project::pin_project]
pub struct Evermore<E, S, D, F>
where
    S: Future<Output = ()> + Send,
    D: Clone,
    F: Unpin + factory::Factory<D>,
{
    // Marker for the task error type `E`; no value of `E` is stored.
    _e: PhantomData<E>,

    // Span entered on every poll when the `with-tracing` feature is enabled.
    #[cfg(feature = "with-tracing")]
    span: tracing::Span,

    // The shared stop flag and user data; a clone is handed to every worker.
    data: Worker<D>,
    // Each worker paired with a flag that is `true` until the worker resolves.
    workers: Vec<(bool, PinnedWorkerFactory<E, D, F>)>,

    // The shutdown signal, polled until it completes.
    #[pin]
    signal: S,
}
97
98impl<E, S, D, F> Evermore<E, S, D, F>
99where
100    E: Error,
101    S: Future<Output = ()> + Send,
102    D: Clone,
103    F: Unpin + factory::Factory<D>,
104    <F as factory::Factory<D>>::Future: TryFuture<Error = E> + Unpin,
105{
106    pub fn new(signal: S, worker_count: usize, data: D, factory: F) -> Self {
107        debug_assert!(worker_count == 0, "Worker count cannot be 0");
108
109        let worker_data = Worker {
110            data,
111            stop: Arc::new(AtomicBool::new(false)),
112        };
113
114        let mut workers = Vec::with_capacity(worker_count as usize);
115
116        for i in 0..(worker_count - 1) {
117            workers.push((
118                true,
119                Box::pin(WorkerFactory::new(
120                    i + 1,
121                    worker_data.clone(),
122                    factory.clone(),
123                )),
124            ));
125        }
126
127        // Push the skipped worker, consuming the factory parameter
128        workers.push((
129            true,
130            Box::pin(WorkerFactory::new(
131                worker_count,
132                worker_data.clone(),
133                factory,
134            )),
135        ));
136
137        Self {
138            _e: PhantomData,
139            #[cfg(feature = "with-tracing")]
140            span: tracing::info_span!("evermore"),
141            data: worker_data,
142            workers,
143            signal,
144        }
145    }
146}
147
148impl<E, S, D, F> Future for Evermore<E, S, D, F>
149where
150    E: Error,
151    S: Future<Output = ()> + Send,
152    D: Clone,
153    F: Unpin + factory::Factory<D>,
154    <F as factory::Factory<D>>::Future: TryFuture<Error = E>,
155{
156    type Output = ();
157
158    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
159        let this = self.as_mut().project();
160
161        #[cfg(feature = "with-tracing")]
162        let _entered = this.span.enter();
163
164        let data: &mut Worker<D> = this.data;
165        let workers: &mut Vec<(bool, PinnedWorkerFactory<E, D, F>)> = this.workers;
166
167        if !data.stop.load(Ordering::SeqCst) {
168            #[cfg(feature = "with-log")]
169            log::trace!("Polling shutdown signal");
170            #[cfg(feature = "with-tracing")]
171            tracing::trace!("Polling shutdown signal");
172
173            if let Poll::Ready(()) = this.signal.poll(cx) {
174                #[cfg(feature = "with-log")]
175                log::debug!("Received shutdown signal, setting `stop` to `true`");
176                #[cfg(feature = "with-tracing")]
177                tracing::debug!("Received shutdown signal, setting `stop` to `true`");
178
179                data.stop.store(true, Ordering::SeqCst);
180            }
181        }
182
183        if data.stop.load(Ordering::SeqCst) {
184            // Only runs once the shutdown signal has been sent
185            for entry in workers.iter_mut() {
186                let (running, worker): &mut (bool, PinnedWorkerFactory<E, D, F>) = entry;
187
188                #[cfg(feature = "with-log")]
189                log::trace!("Polling worker [id: {}]", worker.id);
190                #[cfg(feature = "with-tracing")]
191                tracing::trace!(id = worker.id, "Polling worker");
192
193                let worker: Pin<&mut WorkerFactory<E, D, F>> = worker.as_mut();
194
195                let poll: Poll<<<F as factory::Factory<D>>::Future as TryFuture>::Ok> =
196                    worker.poll(cx);
197
198                if let Poll::Ready(_res) = poll {
199                    *running = false;
200                }
201            }
202
203            if workers.iter().any(|(running, _)| *running) {
204                Poll::Pending
205            } else {
206                Poll::Ready(())
207            }
208        } else {
209            // Poll over every worker until the shutdown signal is sent
210            for entry in workers.iter_mut() {
211                let (running, worker): &mut (bool, PinnedWorkerFactory<E, D, F>) = entry;
212
213                #[cfg(any(feature = "with-log", feature = "with-tracing"))]
214                let id = worker.id;
215
216                #[cfg(feature = "with-log")]
217                log::trace!("Polling worker [id: {}]", id);
218                #[cfg(feature = "with-tracing")]
219                tracing::trace!(id = id, "Polling worker");
220
221                // Only poll the worker if its still running
222                // This is incase of the event of a worker returning early
223                if *running {
224                    let worker: Pin<&mut WorkerFactory<E, D, F>> = worker.as_mut();
225
226                    let poll: Poll<<<F as factory::Factory<D>>::Future as TryFuture>::Ok> =
227                        worker.poll(cx);
228
229                    match poll {
230                        Poll::Pending => {}
231                        Poll::Ready(_res) => {
232                            // TODO: handle value of returned future
233                            // Maybe return the error and add it to a error chain
234                            #[cfg(feature = "with-log")]
235                            log::trace!("Worker has stopped, without the shutdown signal, and has not restarted [id: {}]", id);
236                            #[cfg(feature = "with-tracing")]
237                            tracing::error!(id = id, "Worker has stopped, without the shutdown signal, and has not restarted");
238
239                            *running = false;
240                        }
241                    }
242                }
243            }
244
245            Poll::Pending
246        }
247    }
248}
249
/// Handle given to every running task, carrying the user's shared data.
///
/// A task can only read the shared stop flag through this handle; it cannot
/// raise the shutdown signal or otherwise control the worker running it.
#[derive(Debug)]
pub struct Worker<D>
where
    D: Clone,
{
    stop: Arc<AtomicBool>,

    /// The users shared data.
    pub data: D,
}

impl<D> Worker<D>
where
    D: Clone,
{
    /// Returns `true` once the task should clean up and shut down.
    #[inline]
    pub fn should_stop(&self) -> bool {
        let flag: &AtomicBool = &self.stop;

        flag.load(Ordering::Acquire)
    }
}

impl<D> Clone for Worker<D>
where
    D: Clone,
{
    /// Clones the user data; the stop flag is shared with the original, not copied.
    fn clone(&self) -> Self {
        let Self { stop, data } = self;

        Self {
            stop: Arc::clone(stop),
            data: data.clone(),
        }
    }
}
288
// A heap-allocated, pinned `WorkerFactory`; the form in which `Evermore` stores its workers.
type PinnedWorkerFactory<E, D, F> = Pin<Box<WorkerFactory<E, D, F>>>;
290
/// A single worker: wraps a task factory and re-creates the task whenever it fails.
///
/// Polling drives the current task. An `Err` result causes a fresh task to be
/// created from `factory`, while an `Ok` result is returned to the caller.
#[pin_project::pin_project]
struct WorkerFactory<E, D, F>
where
    D: Clone,
    F: Unpin + factory::Factory<D>,
{
    // Marker for the task error type `E`; no value of `E` is stored.
    _e: PhantomData<E>,

    // Identifier of this worker, used in log messages and the tracing span.
    id: usize,
    // Starts at 1, is incremented each time the task fails and reset to 1 when it succeeds.
    generation: usize,
    // Shared stop flag and user data; cloned into every task that is created.
    data: Worker<D>,

    // Whether a task exists yet and, if so, the task currently being polled.
    #[pin]
    state: FactoryState<F::Future>,
    // Creates the tasks run by this worker.
    #[pin]
    factory: F,
}
308
309impl<E, D, F> WorkerFactory<E, D, F>
310where
311    D: Clone,
312    F: Unpin + factory::Factory<D>,
313{
314    #[inline]
315    fn new(id: usize, data: Worker<D>, factory: F) -> Self {
316        Self {
317            _e: PhantomData,
318            id,
319            data,
320            factory,
321            generation: 1,
322            state: FactoryState::Idle,
323        }
324    }
325}
326
327impl<E, D, F> Future for WorkerFactory<E, D, F>
328where
329    E: Error,
330    D: Clone,
331    F: Unpin + factory::Factory<D>,
332    <F as factory::Factory<D>>::Future: TryFuture<Error = E>,
333{
334    type Output = <<F as factory::Factory<D>>::Future as TryFuture>::Ok;
335
336    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
337        #[cfg(feature = "with-tracing")]
338        let span = tracing::info_span!("worker", id = self.id);
339        #[cfg(feature = "with-tracing")]
340        let _entered = span.enter();
341
342        loop {
343            let this = self.as_mut().project();
344
345            let generation: &mut usize = this.generation;
346            let data: &mut Worker<D> = this.data;
347
348            let mut factory: Pin<&mut F> = this.factory;
349
350            let state = match this.state.project() {
351                FactoryStateProject::Idle => {
352                    #[cfg(feature = "with-log")]
353                    log::trace!("No future task, creating from factory");
354                    #[cfg(feature = "with-tracing")]
355                    tracing::trace!("No future task, creating from factory");
356
357                    FactoryState::Waiting {
358                        task: factory.new(data.clone()),
359                    }
360                }
361                FactoryStateProject::Waiting { task } => {
362                    let task: Pin<&mut <F as factory::Factory<D>>::Future> = task;
363
364                    match futures_core::ready!(task.try_poll(cx)) {
365                        Ok(x) => {
366                            *generation = 1;
367
368                            return Poll::Ready(x);
369                        }
370                        Err(_e) => {
371                            *generation += 1;
372
373                            #[cfg(any(feature = "with-log", feature = "with-tracing"))]
374                            #[cfg_attr(any(feature = "with-log", feature = "with-tracing"), allow(clippy::used_underscore_binding))]
375                            let err: E = _e;
376
377                            #[cfg(feature = "with-log")]
378                            log::error!("Task failed with error: {}", err);
379                            #[cfg(feature = "with-tracing")]
380                            tracing::error!(error = ?err, "Task failed with error");
381
382                            FactoryState::Waiting {
383                                task: factory.new(data.clone()),
384                            }
385                        }
386                    }
387                }
388            };
389
390            self.as_mut().project().state.set(state);
391        }
392    }
393}
394
/// The state of a `WorkerFactory`'s task.
#[pin_project::pin_project(project = FactoryStateProject)]
enum FactoryState<F> {
    // No task has been created yet.
    Idle,
    // A task has been created and is being polled.
    Waiting {
        #[pin]
        task: F,
    },
}
403
404mod factory {
405    use {super::Worker, futures_core::TryFuture};
406
407    pub trait Factory<D>: Clone
408    where
409        D: Clone,
410    {
411        type Future: TryFuture;
412
413        fn new(&mut self, data: Worker<D>) -> Self::Future;
414    }
415
416    impl<D, T, F> Factory<D> for T
417    where
418        D: Clone,
419        T: Unpin + Clone + FnMut(Worker<D>) -> F,
420        F: TryFuture,
421    {
422        type Future = F;
423
424        #[inline]
425        fn new(&mut self, data: Worker<D>) -> Self::Future {
426            (self)(data)
427        }
428    }
429}