#![allow(clippy::unknown_clippy_lints)] #![warn(clippy::pedantic, rust_2018_idioms)]
use {
futures_core::TryFuture,
std::{
error::Error,
future::Future,
marker::{PhantomData, Unpin},
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
task::{Context, Poll},
},
};
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project::pin_project]
pub struct Evermore<E, S, D, F>
where
S: Future<Output = ()> + Send,
D: Clone,
F: Unpin + factory::Factory<D>,
{
_e: PhantomData<E>,
#[cfg(feature = "with-tracing")]
span: tracing::Span,
data: Worker<D>,
workers: Vec<(bool, PinnedWorkerFactory<E, D, F>)>,
#[pin]
signal: S,
}
impl<E, S, D, F> Evermore<E, S, D, F>
where
E: Error,
S: Future<Output = ()> + Send,
D: Clone,
F: Unpin + factory::Factory<D>,
<F as factory::Factory<D>>::Future: TryFuture<Error = E> + Unpin,
{
pub fn new(signal: S, worker_count: usize, data: D, factory: F) -> Self {
debug_assert!(worker_count == 0, "Worker count cannot be 0");
let worker_data = Worker {
data,
stop: Arc::new(AtomicBool::new(false)),
};
let mut workers = Vec::with_capacity(worker_count as usize);
for i in 0..(worker_count - 1) {
workers.push((
true,
Box::pin(WorkerFactory::new(
i + 1,
worker_data.clone(),
factory.clone(),
)),
));
}
workers.push((
true,
Box::pin(WorkerFactory::new(
worker_count,
worker_data.clone(),
factory,
)),
));
Self {
_e: PhantomData,
#[cfg(feature = "with-tracing")]
span: tracing::info_span!("evermore"),
data: worker_data,
workers,
signal,
}
}
}
impl<E, S, D, F> Future for Evermore<E, S, D, F>
where
E: Error,
S: Future<Output = ()> + Send,
D: Clone,
F: Unpin + factory::Factory<D>,
<F as factory::Factory<D>>::Future: TryFuture<Error = E>,
{
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().project();
#[cfg(feature = "with-tracing")]
let _entered = this.span.enter();
let data: &mut Worker<D> = this.data;
let workers: &mut Vec<(bool, PinnedWorkerFactory<E, D, F>)> = this.workers;
if !data.stop.load(Ordering::SeqCst) {
#[cfg(feature = "with-log")]
log::trace!("Polling shutdown signal");
#[cfg(feature = "with-tracing")]
tracing::trace!("Polling shutdown signal");
if let Poll::Ready(()) = this.signal.poll(cx) {
#[cfg(feature = "with-log")]
log::debug!("Received shutdown signal, setting `stop` to `true`");
#[cfg(feature = "with-tracing")]
tracing::debug!("Received shutdown signal, setting `stop` to `true`");
data.stop.store(true, Ordering::SeqCst);
}
}
if data.stop.load(Ordering::SeqCst) {
for entry in workers.iter_mut() {
let (running, worker): &mut (bool, PinnedWorkerFactory<E, D, F>) = entry;
#[cfg(feature = "with-log")]
log::trace!("Polling worker [id: {}]", worker.id);
#[cfg(feature = "with-tracing")]
tracing::trace!(id = worker.id, "Polling worker");
let worker: Pin<&mut WorkerFactory<E, D, F>> = worker.as_mut();
let poll: Poll<<<F as factory::Factory<D>>::Future as TryFuture>::Ok> =
worker.poll(cx);
if let Poll::Ready(_res) = poll {
*running = false;
}
}
if workers.iter().any(|(running, _)| *running) {
Poll::Pending
} else {
Poll::Ready(())
}
} else {
for entry in workers.iter_mut() {
let (running, worker): &mut (bool, PinnedWorkerFactory<E, D, F>) = entry;
#[cfg(any(feature = "with-log", feature = "with-tracing"))]
let id = worker.id;
#[cfg(feature = "with-log")]
log::trace!("Polling worker [id: {}]", id);
#[cfg(feature = "with-tracing")]
tracing::trace!(id = id, "Polling worker");
if *running {
let worker: Pin<&mut WorkerFactory<E, D, F>> = worker.as_mut();
let poll: Poll<<<F as factory::Factory<D>>::Future as TryFuture>::Ok> =
worker.poll(cx);
match poll {
Poll::Pending => {}
Poll::Ready(_res) => {
#[cfg(feature = "with-log")]
log::trace!("Worker has stopped, without the shutdown signal, and has not restarted [id: {}]", id);
#[cfg(feature = "with-tracing")]
tracing::error!(id = id, "Worker has stopped, without the shutdown signal, and has not restarted");
*running = false;
}
}
}
}
Poll::Pending
}
}
}
#[derive(Debug)]
pub struct Worker<D>
where
D: Clone,
{
stop: Arc<AtomicBool>,
pub data: D,
}
impl<D> Worker<D>
where
D: Clone,
{
#[inline]
pub fn should_stop(&self) -> bool {
self.stop.load(Ordering::Acquire)
}
}
impl<D> Clone for Worker<D>
where
D: Clone,
{
fn clone(&self) -> Self {
Self {
stop: self.stop.clone(),
data: self.data.clone(),
}
}
}
type PinnedWorkerFactory<E, D, F> = Pin<Box<WorkerFactory<E, D, F>>>;
#[pin_project::pin_project]
struct WorkerFactory<E, D, F>
where
D: Clone,
F: Unpin + factory::Factory<D>,
{
_e: PhantomData<E>,
id: usize,
generation: usize,
data: Worker<D>,
#[pin]
state: FactoryState<F::Future>,
#[pin]
factory: F,
}
impl<E, D, F> WorkerFactory<E, D, F>
where
D: Clone,
F: Unpin + factory::Factory<D>,
{
#[inline]
fn new(id: usize, data: Worker<D>, factory: F) -> Self {
Self {
_e: PhantomData,
id,
data,
factory,
generation: 1,
state: FactoryState::Idle,
}
}
}
impl<E, D, F> Future for WorkerFactory<E, D, F>
where
E: Error,
D: Clone,
F: Unpin + factory::Factory<D>,
<F as factory::Factory<D>>::Future: TryFuture<Error = E>,
{
type Output = <<F as factory::Factory<D>>::Future as TryFuture>::Ok;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
#[cfg(feature = "with-tracing")]
let span = tracing::info_span!("worker", id = self.id);
#[cfg(feature = "with-tracing")]
let _entered = span.enter();
loop {
let this = self.as_mut().project();
let generation: &mut usize = this.generation;
let data: &mut Worker<D> = this.data;
let mut factory: Pin<&mut F> = this.factory;
let state = match this.state.project() {
FactoryStateProject::Idle => {
#[cfg(feature = "with-log")]
log::trace!("No future task, creating from factory");
#[cfg(feature = "with-tracing")]
tracing::trace!("No future task, creating from factory");
FactoryState::Waiting {
task: factory.new(data.clone()),
}
}
FactoryStateProject::Waiting { task } => {
let task: Pin<&mut <F as factory::Factory<D>>::Future> = task;
match futures_core::ready!(task.try_poll(cx)) {
Ok(x) => {
*generation = 1;
return Poll::Ready(x);
}
Err(_e) => {
*generation += 1;
#[cfg(any(feature = "with-log", feature = "with-tracing"))]
#[cfg_attr(any(feature = "with-log", feature = "with-tracing"), allow(clippy::used_underscore_binding))]
let err: E = _e;
#[cfg(feature = "with-log")]
log::error!("Task failed with error: {}", err);
#[cfg(feature = "with-tracing")]
tracing::error!(error = ?err, "Task failed with error");
FactoryState::Waiting {
task: factory.new(data.clone()),
}
}
}
}
};
self.as_mut().project().state.set(state);
}
}
}
#[pin_project::pin_project(project = FactoryStateProject)]
enum FactoryState<F> {
Idle,
Waiting {
#[pin]
task: F,
},
}
mod factory {
use {super::Worker, futures_core::TryFuture};
pub trait Factory<D>: Clone
where
D: Clone,
{
type Future: TryFuture;
fn new(&mut self, data: Worker<D>) -> Self::Future;
}
impl<D, T, F> Factory<D> for T
where
D: Clone,
T: Unpin + Clone + FnMut(Worker<D>) -> F,
F: TryFuture,
{
type Future = F;
#[inline]
fn new(&mut self, data: Worker<D>) -> Self::Future {
(self)(data)
}
}
}