evermore/
lib.rs

1#![allow(unknown_lints)] // because of pin-project
2#![warn(clippy::pedantic, rust_2018_idioms)]
3
4//! Evermore is a library allows you to run an fixed number of asynchronous
5//! task repeatedly until a shutdown signal is sent out.
6//!
7//! # Examples
8//!
9//! The example below shows the normal usage of Evermore (with dummy
10//! tasks and data), with the tokio [`broadcast channel`] being used as a
11//! shutdown signal sent using [`ctrlc`].
12//!
13//! ```rust,ignore
14//! use evermore::{Evermore, Worker};
15//!
16//! #[derive(Clone, Debug, Default)]
17//! struct Data {}
18//!
19//! #[tokio::main]
20//! async fn main() {
21//!     tracing_subscriber::fmt()
22//!         .with_max_level(tracing::Level::TRACE)
23//!         .with_target(true)
24//!         .init();
25//!
26//!     let (tx, mut rx) = tokio::sync::broadcast::channel(1);
27//!
28//!     ctrlc::set_handler(move || {
29//!         if tx.send(()).is_err() {
30//!             tracing::error!("Unable to send shutdown signal");
31//!         }
32//!     })
33//!     .expect("Unable to set CTRL-C handler");
34//!
35//!     let signal = async move { rx.recv().await.expect("Failed to listen for event") };
36//!
37//!     Evermore::new_default(signal, 4, |data: Worker<Data>| {
38//!         Box::pin(task(data))
39//!     })
40//!     .await;
41//! }
42//!
43//! #[tracing::instrument(skip(data))]
44//! async fn task(worker: Worker<Data>) -> Result<(), std::io::Error> {
45//!     loop {
46//!         tokio::time::delay_for(tokio::time::Duration::from_millis(1)).await;
47//!
48//!         if worker.should_stop() {
49//!             tracing::info!("Received shutdown signal, shutting down");
50//!
51//!             break;
52//!         }
53//!     }
54//!
55//!     Ok(())
56//! }
57//! ```
58//!
59//! [`broadcast channel`]: https://docs.rs/tokio/0.2.22/tokio/sync/broadcast/fn.channel.html
60//! [`ctrlc`]: https://crates.io/crates/ctrlc
61
62use std::{
63    error::Error,
64    future::Future,
65    marker::{PhantomData, Unpin},
66    pin::Pin,
67    sync::{
68        atomic::{AtomicBool, Ordering},
69        Arc,
70    },
71    task::{Context, Poll},
72};
73
74use futures_core::TryFuture;
75
76/// An graceful shutdown enabled repeating asynchronous task runner.
77#[must_use = "futures do nothing unless you `.await` or poll them"]
78#[pin_project::pin_project]
79pub struct Evermore<E, S, D, F>
80where
81    S: Future<Output = ()> + Send,
82    D: Clone,
83    F: Unpin + factory::Factory<D>,
84{
85    _e: PhantomData<E>,
86
87    #[cfg(feature = "with-tracing")]
88    span: tracing::Span,
89
90    data: Worker<D>,
91    workers: Vec<(bool, PinnedWorkerFactory<E, D, F>)>,
92
93    #[pin]
94    signal: S,
95}
96
97impl<E, S, D, F> Evermore<E, S, D, F>
98where
99    E: Error,
100    S: Future<Output = ()> + Send,
101    D: Clone,
102    F: Unpin + factory::Factory<D>,
103    <F as factory::Factory<D>>::Future: TryFuture<Error = E> + Unpin,
104{
105    /// Creates a new task service.
106    ///
107    /// `signal` can be any future that continuously returns [`Poll::Pending`] until the tasks need to shut down,
108    /// in most cases this is a oneshot channel receiver or something similar like an interrupt handler.
109    ///
110    /// `data` should be a cheep to clone type (like [`Arc`]) as it will be shared to every task.
111    ///
112    /// # Panics
113    ///
114    /// This function panics if the `working_count` is `0` .
115    // TODO: remove the need for a factory
116    pub fn new(signal: S, worker_count: usize, data: D, factory: F) -> Self {
117        assert!(worker_count == 0, "Worker count cannot be 0");
118
119        let worker_data = Worker {
120            data,
121            stop: Arc::new(AtomicBool::new(false)),
122        };
123
124        let mut workers = Vec::with_capacity(worker_count as usize);
125
126        for i in 0..(worker_count - 1) {
127            workers.push((
128                true,
129                Box::pin(WorkerFactory::new(
130                    i + 1,
131                    worker_data.clone(),
132                    factory.clone(),
133                )),
134            ));
135        }
136
137        // Push the skipped worker, consuming the factory parameter
138        workers.push((
139            true,
140            Box::pin(WorkerFactory::new(
141                worker_count,
142                worker_data.clone(),
143                factory,
144            )),
145        ));
146
147        Self {
148            _e: PhantomData,
149            #[cfg(feature = "with-tracing")]
150            span: tracing::info_span!("evermore"),
151            data: worker_data,
152            workers,
153            signal,
154        }
155    }
156
157    /// Create a new task service using [`Default::default`] for the data.
158    ///
159    /// See [`Evermore::new`] for more info.
160    pub fn new_default(signal: S, worker_count: usize, factory: F) -> Self
161    where
162        D: Default,
163    {
164        Self::new(signal, worker_count, D::default(), factory)
165    }
166}
167
168impl<E, S, D, F> Future for Evermore<E, S, D, F>
169where
170    E: Error,
171    S: Future<Output = ()> + Send,
172    D: Clone,
173    F: Unpin + factory::Factory<D>,
174    <F as factory::Factory<D>>::Future: TryFuture<Error = E>,
175{
176    type Output = ();
177
178    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
179        let this = self.as_mut().project();
180
181        #[cfg(feature = "with-tracing")]
182        let _entered = this.span.enter();
183
184        let data: &mut Worker<D> = this.data;
185        let workers: &mut Vec<(bool, PinnedWorkerFactory<E, D, F>)> = this.workers;
186
187        if !data.stop.load(Ordering::SeqCst) {
188            #[cfg(feature = "with-log")]
189            log::trace!("Polling shutdown signal");
190            #[cfg(feature = "with-tracing")]
191            tracing::trace!("Polling shutdown signal");
192
193            if let Poll::Ready(()) = this.signal.poll(cx) {
194                #[cfg(feature = "with-log")]
195                log::debug!("Received shutdown signal, setting `stop` to `true`");
196                #[cfg(feature = "with-tracing")]
197                tracing::debug!("Received shutdown signal, setting `stop` to `true`");
198
199                data.stop.store(true, Ordering::SeqCst);
200            }
201        }
202
203        if data.stop.load(Ordering::SeqCst) {
204            // Only runs once the shutdown signal has been sent
205            for entry in workers.iter_mut() {
206                let (running, worker): &mut (bool, PinnedWorkerFactory<E, D, F>) = entry;
207
208                #[cfg(feature = "with-log")]
209                log::trace!("Polling worker [id: {}]", worker.id);
210                #[cfg(feature = "with-tracing")]
211                tracing::trace!(id = worker.id, "Polling worker");
212
213                let worker: Pin<&mut WorkerFactory<E, D, F>> = worker.as_mut();
214
215                let poll: Poll<<<F as factory::Factory<D>>::Future as TryFuture>::Ok> =
216                    worker.poll(cx);
217
218                if let Poll::Ready(_res) = poll {
219                    *running = false;
220                }
221            }
222
223            if workers.iter().any(|(running, _)| *running) {
224                Poll::Pending
225            } else {
226                Poll::Ready(())
227            }
228        } else {
229            // Poll over every worker until the shutdown signal is sent
230            for entry in workers.iter_mut() {
231                let (running, worker): &mut (bool, PinnedWorkerFactory<E, D, F>) = entry;
232
233                #[cfg(any(feature = "with-log", feature = "with-tracing"))]
234                let id = worker.id;
235
236                #[cfg(feature = "with-log")]
237                log::trace!("Polling worker [id: {}]", id);
238                #[cfg(feature = "with-tracing")]
239                tracing::trace!(id = id, "Polling worker");
240
241                // Only poll the worker if its still running
242                // This is incase of the event of a worker returning early
243                if *running {
244                    let worker: Pin<&mut WorkerFactory<E, D, F>> = worker.as_mut();
245
246                    let poll: Poll<<<F as factory::Factory<D>>::Future as TryFuture>::Ok> =
247                        worker.poll(cx);
248
249                    match poll {
250                        Poll::Pending => {}
251                        Poll::Ready(_res) => {
252                            // TODO: handle value of returned future
253                            // Maybe return the error and add it to a error chain
254                            #[cfg(feature = "with-log")]
255                            log::trace!("Worker has stopped, without the shutdown signal, and has not restarted [id: {}]", id);
256                            #[cfg(feature = "with-tracing")]
257                            tracing::error!(id = id, "Worker has stopped, without the shutdown signal, and has not restarted");
258
259                            *running = false;
260                        }
261                    }
262                }
263            }
264
265            Poll::Pending
266        }
267    }
268}
269
270/// The task worker running this task, stores the users shared data.
271///
272/// This does not allow you to send a shutdown signal or interact
273/// with the worker in anyway, it is only used to store user data
274/// and the shared stop signal.
275#[derive(Debug)]
276pub struct Worker<D>
277where
278    D: Clone,
279{
280    stop: Arc<AtomicBool>,
281    data: D,
282}
283
284impl<D> Worker<D>
285where
286    D: Clone,
287{
288    /// Get the shared task data.
289    #[inline]
290    pub fn data(&self) -> &D {
291        &self.data
292    }
293
294    /// Returns `true` if the running task should cleanup and shutdown.
295    #[inline]
296    pub fn should_stop(&self) -> bool {
297        self.stop.load(Ordering::Acquire)
298    }
299}
300
301impl<D> Clone for Worker<D>
302where
303    D: Clone,
304{
305    fn clone(&self) -> Self {
306        Self {
307            stop: self.stop.clone(),
308            data: self.data.clone(),
309        }
310    }
311}
312
313type PinnedWorkerFactory<E, D, F> = Pin<Box<WorkerFactory<E, D, F>>>;
314
315#[pin_project::pin_project]
316struct WorkerFactory<E, D, F>
317where
318    D: Clone,
319    F: Unpin + factory::Factory<D>,
320{
321    _e: PhantomData<E>,
322
323    id: usize,
324    generation: usize,
325    data: Worker<D>,
326
327    #[pin]
328    state: FactoryState<F::Future>,
329    #[pin]
330    factory: F,
331}
332
333impl<E, D, F> WorkerFactory<E, D, F>
334where
335    D: Clone,
336    F: Unpin + factory::Factory<D>,
337{
338    #[inline]
339    fn new(id: usize, data: Worker<D>, factory: F) -> Self {
340        Self {
341            _e: PhantomData,
342            id,
343            data,
344            factory,
345            generation: 1,
346            state: FactoryState::Idle,
347        }
348    }
349}
350
351impl<E, D, F> Future for WorkerFactory<E, D, F>
352where
353    E: Error,
354    D: Clone,
355    F: Unpin + factory::Factory<D>,
356    <F as factory::Factory<D>>::Future: TryFuture<Error = E>,
357{
358    type Output = <<F as factory::Factory<D>>::Future as TryFuture>::Ok;
359
360    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
361        #[cfg(feature = "with-tracing")]
362        let span = tracing::info_span!("worker", id = self.id);
363        #[cfg(feature = "with-tracing")]
364        let _entered = span.enter();
365
366        loop {
367            let this = self.as_mut().project();
368
369            let generation: &mut usize = this.generation;
370            let data: &mut Worker<D> = this.data;
371
372            let mut factory: Pin<&mut F> = this.factory;
373
374            let state = match this.state.project() {
375                FactoryStateProject::Idle => {
376                    #[cfg(feature = "with-log")]
377                    log::trace!("No future task, creating from factory");
378                    #[cfg(feature = "with-tracing")]
379                    tracing::trace!("No future task, creating from factory");
380
381                    FactoryState::Waiting {
382                        task: factory.construct(data.clone()),
383                    }
384                }
385                FactoryStateProject::Waiting { task } => {
386                    let task: Pin<&mut <F as factory::Factory<D>>::Future> = task;
387
388                    match futures_core::ready!(task.try_poll(cx)) {
389                        Ok(x) => {
390                            *generation = 1;
391
392                            return Poll::Ready(x);
393                        }
394                        Err(_e) => {
395                            *generation += 1;
396
397                            #[cfg(any(feature = "with-log", feature = "with-tracing"))]
398                            #[cfg_attr(
399                                any(feature = "with-log", feature = "with-tracing"),
400                                allow(clippy::used_underscore_binding)
401                            )]
402                            let err: E = _e;
403
404                            #[cfg(feature = "with-log")]
405                            log::error!("Task failed with error: {}", err);
406                            #[cfg(feature = "with-tracing")]
407                            tracing::error!(error = ?err, "Task failed with error");
408
409                            FactoryState::Waiting {
410                                task: factory.construct(data.clone()),
411                            }
412                        }
413                    }
414                }
415            };
416
417            self.as_mut().project().state.set(state);
418        }
419    }
420}
421
422#[pin_project::pin_project(project = FactoryStateProject)]
423enum FactoryState<F> {
424    Idle,
425    Waiting {
426        #[pin]
427        task: F,
428    },
429}
430
431mod factory {
432    use {super::Worker, futures_core::TryFuture};
433
434    pub trait Factory<D>: Clone
435    where
436        D: Clone,
437    {
438        type Future: TryFuture;
439
440        fn construct(&mut self, data: Worker<D>) -> Self::Future;
441    }
442
443    impl<D, T, F> Factory<D> for T
444    where
445        D: Clone,
446        T: Unpin + Clone + FnMut(Worker<D>) -> F,
447        F: TryFuture,
448    {
449        type Future = F;
450
451        #[inline]
452        fn construct(&mut self, data: Worker<D>) -> Self::Future {
453            (self)(data)
454        }
455    }
456}