1#![allow(clippy::unknown_clippy_lints)] #![warn(clippy::pedantic, rust_2018_idioms)]
3
4use {
63 futures_core::TryFuture,
64 std::{
65 error::Error,
66 future::Future,
67 marker::{PhantomData, Unpin},
68 pin::Pin,
69 sync::{
70 atomic::{AtomicBool, Ordering},
71 Arc,
72 },
73 task::{Context, Poll},
74 },
75};
76
77#[must_use = "futures do nothing unless you `.await` or poll them"]
79#[pin_project::pin_project]
80pub struct Evermore<E, S, D, F>
81where
82 S: Future<Output = ()> + Send,
83 D: Clone,
84 F: Unpin + factory::Factory<D>,
85{
86 _e: PhantomData<E>,
87
88 #[cfg(feature = "with-tracing")]
89 span: tracing::Span,
90
91 data: Worker<D>,
92 workers: Vec<(bool, PinnedWorkerFactory<E, D, F>)>,
93
94 #[pin]
95 signal: S,
96}
97
98impl<E, S, D, F> Evermore<E, S, D, F>
99where
100 E: Error,
101 S: Future<Output = ()> + Send,
102 D: Clone,
103 F: Unpin + factory::Factory<D>,
104 <F as factory::Factory<D>>::Future: TryFuture<Error = E> + Unpin,
105{
106 pub fn new(signal: S, worker_count: usize, data: D, factory: F) -> Self {
107 debug_assert!(worker_count == 0, "Worker count cannot be 0");
108
109 let worker_data = Worker {
110 data,
111 stop: Arc::new(AtomicBool::new(false)),
112 };
113
114 let mut workers = Vec::with_capacity(worker_count as usize);
115
116 for i in 0..(worker_count - 1) {
117 workers.push((
118 true,
119 Box::pin(WorkerFactory::new(
120 i + 1,
121 worker_data.clone(),
122 factory.clone(),
123 )),
124 ));
125 }
126
127 workers.push((
129 true,
130 Box::pin(WorkerFactory::new(
131 worker_count,
132 worker_data.clone(),
133 factory,
134 )),
135 ));
136
137 Self {
138 _e: PhantomData,
139 #[cfg(feature = "with-tracing")]
140 span: tracing::info_span!("evermore"),
141 data: worker_data,
142 workers,
143 signal,
144 }
145 }
146}
147
148impl<E, S, D, F> Future for Evermore<E, S, D, F>
149where
150 E: Error,
151 S: Future<Output = ()> + Send,
152 D: Clone,
153 F: Unpin + factory::Factory<D>,
154 <F as factory::Factory<D>>::Future: TryFuture<Error = E>,
155{
156 type Output = ();
157
158 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
159 let this = self.as_mut().project();
160
161 #[cfg(feature = "with-tracing")]
162 let _entered = this.span.enter();
163
164 let data: &mut Worker<D> = this.data;
165 let workers: &mut Vec<(bool, PinnedWorkerFactory<E, D, F>)> = this.workers;
166
167 if !data.stop.load(Ordering::SeqCst) {
168 #[cfg(feature = "with-log")]
169 log::trace!("Polling shutdown signal");
170 #[cfg(feature = "with-tracing")]
171 tracing::trace!("Polling shutdown signal");
172
173 if let Poll::Ready(()) = this.signal.poll(cx) {
174 #[cfg(feature = "with-log")]
175 log::debug!("Received shutdown signal, setting `stop` to `true`");
176 #[cfg(feature = "with-tracing")]
177 tracing::debug!("Received shutdown signal, setting `stop` to `true`");
178
179 data.stop.store(true, Ordering::SeqCst);
180 }
181 }
182
183 if data.stop.load(Ordering::SeqCst) {
184 for entry in workers.iter_mut() {
186 let (running, worker): &mut (bool, PinnedWorkerFactory<E, D, F>) = entry;
187
188 #[cfg(feature = "with-log")]
189 log::trace!("Polling worker [id: {}]", worker.id);
190 #[cfg(feature = "with-tracing")]
191 tracing::trace!(id = worker.id, "Polling worker");
192
193 let worker: Pin<&mut WorkerFactory<E, D, F>> = worker.as_mut();
194
195 let poll: Poll<<<F as factory::Factory<D>>::Future as TryFuture>::Ok> =
196 worker.poll(cx);
197
198 if let Poll::Ready(_res) = poll {
199 *running = false;
200 }
201 }
202
203 if workers.iter().any(|(running, _)| *running) {
204 Poll::Pending
205 } else {
206 Poll::Ready(())
207 }
208 } else {
209 for entry in workers.iter_mut() {
211 let (running, worker): &mut (bool, PinnedWorkerFactory<E, D, F>) = entry;
212
213 #[cfg(any(feature = "with-log", feature = "with-tracing"))]
214 let id = worker.id;
215
216 #[cfg(feature = "with-log")]
217 log::trace!("Polling worker [id: {}]", id);
218 #[cfg(feature = "with-tracing")]
219 tracing::trace!(id = id, "Polling worker");
220
221 if *running {
224 let worker: Pin<&mut WorkerFactory<E, D, F>> = worker.as_mut();
225
226 let poll: Poll<<<F as factory::Factory<D>>::Future as TryFuture>::Ok> =
227 worker.poll(cx);
228
229 match poll {
230 Poll::Pending => {}
231 Poll::Ready(_res) => {
232 #[cfg(feature = "with-log")]
235 log::trace!("Worker has stopped, without the shutdown signal, and has not restarted [id: {}]", id);
236 #[cfg(feature = "with-tracing")]
237 tracing::error!(id = id, "Worker has stopped, without the shutdown signal, and has not restarted");
238
239 *running = false;
240 }
241 }
242 }
243 }
244
245 Poll::Pending
246 }
247 }
248}
249
/// Handle given to a worker task: user data plus a shared stop flag.
///
/// Clones share the same stop flag (it lives behind an `Arc`), while `data`
/// is cloned per handle.
#[derive(Debug)]
pub struct Worker<D>
where
    D: Clone,
{
    /// Shutdown flag shared by every clone of this handle.
    stop: Arc<AtomicBool>,

    /// User-supplied data for the worker.
    pub data: D,
}

impl<D> Worker<D>
where
    D: Clone,
{
    /// Returns the current value of the shared stop flag.
    #[inline]
    pub fn should_stop(&self) -> bool {
        let flag: &AtomicBool = &self.stop;
        flag.load(Ordering::Acquire)
    }
}

impl<D> Clone for Worker<D>
where
    D: Clone,
{
    /// Clones the user data and takes another reference to the same stop flag.
    fn clone(&self) -> Self {
        let stop = Arc::clone(&self.stop);
        let data = self.data.clone();

        Self { stop, data }
    }
}
288
289type PinnedWorkerFactory<E, D, F> = Pin<Box<WorkerFactory<E, D, F>>>;
290
291#[pin_project::pin_project]
292struct WorkerFactory<E, D, F>
293where
294 D: Clone,
295 F: Unpin + factory::Factory<D>,
296{
297 _e: PhantomData<E>,
298
299 id: usize,
300 generation: usize,
301 data: Worker<D>,
302
303 #[pin]
304 state: FactoryState<F::Future>,
305 #[pin]
306 factory: F,
307}
308
309impl<E, D, F> WorkerFactory<E, D, F>
310where
311 D: Clone,
312 F: Unpin + factory::Factory<D>,
313{
314 #[inline]
315 fn new(id: usize, data: Worker<D>, factory: F) -> Self {
316 Self {
317 _e: PhantomData,
318 id,
319 data,
320 factory,
321 generation: 1,
322 state: FactoryState::Idle,
323 }
324 }
325}
326
327impl<E, D, F> Future for WorkerFactory<E, D, F>
328where
329 E: Error,
330 D: Clone,
331 F: Unpin + factory::Factory<D>,
332 <F as factory::Factory<D>>::Future: TryFuture<Error = E>,
333{
334 type Output = <<F as factory::Factory<D>>::Future as TryFuture>::Ok;
335
336 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
337 #[cfg(feature = "with-tracing")]
338 let span = tracing::info_span!("worker", id = self.id);
339 #[cfg(feature = "with-tracing")]
340 let _entered = span.enter();
341
342 loop {
343 let this = self.as_mut().project();
344
345 let generation: &mut usize = this.generation;
346 let data: &mut Worker<D> = this.data;
347
348 let mut factory: Pin<&mut F> = this.factory;
349
350 let state = match this.state.project() {
351 FactoryStateProject::Idle => {
352 #[cfg(feature = "with-log")]
353 log::trace!("No future task, creating from factory");
354 #[cfg(feature = "with-tracing")]
355 tracing::trace!("No future task, creating from factory");
356
357 FactoryState::Waiting {
358 task: factory.new(data.clone()),
359 }
360 }
361 FactoryStateProject::Waiting { task } => {
362 let task: Pin<&mut <F as factory::Factory<D>>::Future> = task;
363
364 match futures_core::ready!(task.try_poll(cx)) {
365 Ok(x) => {
366 *generation = 1;
367
368 return Poll::Ready(x);
369 }
370 Err(_e) => {
371 *generation += 1;
372
373 #[cfg(any(feature = "with-log", feature = "with-tracing"))]
374 #[cfg_attr(any(feature = "with-log", feature = "with-tracing"), allow(clippy::used_underscore_binding))]
375 let err: E = _e;
376
377 #[cfg(feature = "with-log")]
378 log::error!("Task failed with error: {}", err);
379 #[cfg(feature = "with-tracing")]
380 tracing::error!(error = ?err, "Task failed with error");
381
382 FactoryState::Waiting {
383 task: factory.new(data.clone()),
384 }
385 }
386 }
387 }
388 };
389
390 self.as_mut().project().state.set(state);
391 }
392 }
393}
394
395#[pin_project::pin_project(project = FactoryStateProject)]
396enum FactoryState<F> {
397 Idle,
398 Waiting {
399 #[pin]
400 task: F,
401 },
402}
403
404mod factory {
405 use {super::Worker, futures_core::TryFuture};
406
407 pub trait Factory<D>: Clone
408 where
409 D: Clone,
410 {
411 type Future: TryFuture;
412
413 fn new(&mut self, data: Worker<D>) -> Self::Future;
414 }
415
416 impl<D, T, F> Factory<D> for T
417 where
418 D: Clone,
419 T: Unpin + Clone + FnMut(Worker<D>) -> F,
420 F: TryFuture,
421 {
422 type Future = F;
423
424 #[inline]
425 fn new(&mut self, data: Worker<D>) -> Self::Future {
426 (self)(data)
427 }
428 }
429}