1#![allow(unknown_lints)] #![warn(clippy::pedantic, rust_2018_idioms)]
3
4use std::{
63 error::Error,
64 future::Future,
65 marker::{PhantomData, Unpin},
66 pin::Pin,
67 sync::{
68 atomic::{AtomicBool, Ordering},
69 Arc,
70 },
71 task::{Context, Poll},
72};
73
74use futures_core::TryFuture;
75
/// Supervisor future that polls a fixed set of workers until a shutdown signal resolves.
///
/// Type parameters:
/// - `E`: error type of the worker tasks (carried only as a `PhantomData` marker).
/// - `S`: shutdown-signal future; once it resolves the shared `stop` flag is set.
/// - `D`: user data, cloned into every worker handle.
/// - `F`: factory that builds (and rebuilds) each worker's task.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project::pin_project]
pub struct Evermore<E, S, D, F>
where
    S: Future<Output = ()> + Send,
    D: Clone,
    F: Unpin + factory::Factory<D>,
{
    // Marker only: `E` is named by the `impl` bounds but never stored.
    _e: PhantomData<E>,

    // Span entered for the duration of every `poll` call.
    #[cfg(feature = "with-tracing")]
    span: tracing::Span,

    // Handle holding the user data and the shared `stop` flag.
    data: Worker<D>,
    // One entry per worker: (is it still running?, the heap-pinned worker).
    workers: Vec<(bool, PinnedWorkerFactory<E, D, F>)>,

    // Shutdown signal; polled only while the `stop` flag is still unset.
    #[pin]
    signal: S,
}
96
97impl<E, S, D, F> Evermore<E, S, D, F>
98where
99 E: Error,
100 S: Future<Output = ()> + Send,
101 D: Clone,
102 F: Unpin + factory::Factory<D>,
103 <F as factory::Factory<D>>::Future: TryFuture<Error = E> + Unpin,
104{
105 pub fn new(signal: S, worker_count: usize, data: D, factory: F) -> Self {
117 assert!(worker_count == 0, "Worker count cannot be 0");
118
119 let worker_data = Worker {
120 data,
121 stop: Arc::new(AtomicBool::new(false)),
122 };
123
124 let mut workers = Vec::with_capacity(worker_count as usize);
125
126 for i in 0..(worker_count - 1) {
127 workers.push((
128 true,
129 Box::pin(WorkerFactory::new(
130 i + 1,
131 worker_data.clone(),
132 factory.clone(),
133 )),
134 ));
135 }
136
137 workers.push((
139 true,
140 Box::pin(WorkerFactory::new(
141 worker_count,
142 worker_data.clone(),
143 factory,
144 )),
145 ));
146
147 Self {
148 _e: PhantomData,
149 #[cfg(feature = "with-tracing")]
150 span: tracing::info_span!("evermore"),
151 data: worker_data,
152 workers,
153 signal,
154 }
155 }
156
157 pub fn new_default(signal: S, worker_count: usize, factory: F) -> Self
161 where
162 D: Default,
163 {
164 Self::new(signal, worker_count, D::default(), factory)
165 }
166}
167
168impl<E, S, D, F> Future for Evermore<E, S, D, F>
169where
170 E: Error,
171 S: Future<Output = ()> + Send,
172 D: Clone,
173 F: Unpin + factory::Factory<D>,
174 <F as factory::Factory<D>>::Future: TryFuture<Error = E>,
175{
176 type Output = ();
177
178 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
179 let this = self.as_mut().project();
180
181 #[cfg(feature = "with-tracing")]
182 let _entered = this.span.enter();
183
184 let data: &mut Worker<D> = this.data;
185 let workers: &mut Vec<(bool, PinnedWorkerFactory<E, D, F>)> = this.workers;
186
187 if !data.stop.load(Ordering::SeqCst) {
188 #[cfg(feature = "with-log")]
189 log::trace!("Polling shutdown signal");
190 #[cfg(feature = "with-tracing")]
191 tracing::trace!("Polling shutdown signal");
192
193 if let Poll::Ready(()) = this.signal.poll(cx) {
194 #[cfg(feature = "with-log")]
195 log::debug!("Received shutdown signal, setting `stop` to `true`");
196 #[cfg(feature = "with-tracing")]
197 tracing::debug!("Received shutdown signal, setting `stop` to `true`");
198
199 data.stop.store(true, Ordering::SeqCst);
200 }
201 }
202
203 if data.stop.load(Ordering::SeqCst) {
204 for entry in workers.iter_mut() {
206 let (running, worker): &mut (bool, PinnedWorkerFactory<E, D, F>) = entry;
207
208 #[cfg(feature = "with-log")]
209 log::trace!("Polling worker [id: {}]", worker.id);
210 #[cfg(feature = "with-tracing")]
211 tracing::trace!(id = worker.id, "Polling worker");
212
213 let worker: Pin<&mut WorkerFactory<E, D, F>> = worker.as_mut();
214
215 let poll: Poll<<<F as factory::Factory<D>>::Future as TryFuture>::Ok> =
216 worker.poll(cx);
217
218 if let Poll::Ready(_res) = poll {
219 *running = false;
220 }
221 }
222
223 if workers.iter().any(|(running, _)| *running) {
224 Poll::Pending
225 } else {
226 Poll::Ready(())
227 }
228 } else {
229 for entry in workers.iter_mut() {
231 let (running, worker): &mut (bool, PinnedWorkerFactory<E, D, F>) = entry;
232
233 #[cfg(any(feature = "with-log", feature = "with-tracing"))]
234 let id = worker.id;
235
236 #[cfg(feature = "with-log")]
237 log::trace!("Polling worker [id: {}]", id);
238 #[cfg(feature = "with-tracing")]
239 tracing::trace!(id = id, "Polling worker");
240
241 if *running {
244 let worker: Pin<&mut WorkerFactory<E, D, F>> = worker.as_mut();
245
246 let poll: Poll<<<F as factory::Factory<D>>::Future as TryFuture>::Ok> =
247 worker.poll(cx);
248
249 match poll {
250 Poll::Pending => {}
251 Poll::Ready(_res) => {
252 #[cfg(feature = "with-log")]
255 log::trace!("Worker has stopped, without the shutdown signal, and has not restarted [id: {}]", id);
256 #[cfg(feature = "with-tracing")]
257 tracing::error!(id = id, "Worker has stopped, without the shutdown signal, and has not restarted");
258
259 *running = false;
260 }
261 }
262 }
263 }
264
265 Poll::Pending
266 }
267 }
268}
269
/// Handle passed to every task a factory constructs: the user data plus the shutdown flag.
#[derive(Debug)]
pub struct Worker<D>
where
    D: Clone,
{
    // Shutdown flag; clones of a `Worker` share the same flag through the `Arc`.
    stop: Arc<AtomicBool>,
    // User-supplied data; cloned whenever the `Worker` itself is cloned.
    data: D,
}
283
284impl<D> Worker<D>
285where
286 D: Clone,
287{
288 #[inline]
290 pub fn data(&self) -> &D {
291 &self.data
292 }
293
294 #[inline]
296 pub fn should_stop(&self) -> bool {
297 self.stop.load(Ordering::Acquire)
298 }
299}
300
301impl<D> Clone for Worker<D>
302where
303 D: Clone,
304{
305 fn clone(&self) -> Self {
306 Self {
307 stop: self.stop.clone(),
308 data: self.data.clone(),
309 }
310 }
311}
312
/// A `WorkerFactory` boxed and pinned on the heap.
type PinnedWorkerFactory<E, D, F> = Pin<Box<WorkerFactory<E, D, F>>>;
314
/// A single supervised worker: a factory plus the task most recently built from it.
#[pin_project::pin_project]
struct WorkerFactory<E, D, F>
where
    D: Clone,
    F: Unpin + factory::Factory<D>,
{
    // Marker only: `E` is the task error type named in the `Future` impl bounds.
    _e: PhantomData<E>,

    // Identifier supplied by the creator; appears in log and tracing output.
    id: usize,
    // Starts at 1, is incremented on every task failure, and is reset to 1 on success.
    generation: usize,
    // Handle cloned into every task the factory constructs.
    data: Worker<D>,

    // Either idle (no task built yet) or waiting on the current task.
    #[pin]
    state: FactoryState<F::Future>,
    // Builds the first task and a replacement after each failure.
    #[pin]
    factory: F,
}
332
333impl<E, D, F> WorkerFactory<E, D, F>
334where
335 D: Clone,
336 F: Unpin + factory::Factory<D>,
337{
338 #[inline]
339 fn new(id: usize, data: Worker<D>, factory: F) -> Self {
340 Self {
341 _e: PhantomData,
342 id,
343 data,
344 factory,
345 generation: 1,
346 state: FactoryState::Idle,
347 }
348 }
349}
350
impl<E, D, F> Future for WorkerFactory<E, D, F>
where
    E: Error,
    D: Clone,
    F: Unpin + factory::Factory<D>,
    <F as factory::Factory<D>>::Future: TryFuture<Error = E>,
{
    type Output = <<F as factory::Factory<D>>::Future as TryFuture>::Ok;

    /// Polls the current task, building it first if none exists and rebuilding it on failure.
    ///
    /// Resolves with the task's `Ok` value. An `Err` is logged (when a logging feature is
    /// enabled), a replacement task is constructed, and the loop polls it immediately.
    ///
    /// After returning `Poll::Ready` the state still holds the completed task, so this future
    /// must not be polled again.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        #[cfg(feature = "with-tracing")]
        let span = tracing::info_span!("worker", id = self.id);
        #[cfg(feature = "with-tracing")]
        let _entered = span.enter();

        // Each iteration either returns or computes the next state, stores it, and polls again.
        loop {
            let this = self.as_mut().project();

            let generation: &mut usize = this.generation;
            let data: &mut Worker<D> = this.data;

            let mut factory: Pin<&mut F> = this.factory;

            let state = match this.state.project() {
                FactoryStateProject::Idle => {
                    #[cfg(feature = "with-log")]
                    log::trace!("No future task, creating from factory");
                    #[cfg(feature = "with-tracing")]
                    tracing::trace!("No future task, creating from factory");

                    // First poll: build the initial task; the next iteration polls it.
                    FactoryState::Waiting {
                        task: factory.construct(data.clone()),
                    }
                }
                FactoryStateProject::Waiting { task } => {
                    let task: Pin<&mut <F as factory::Factory<D>>::Future> = task;

                    // `ready!` returns `Poll::Pending` from `poll` if the task is not done yet.
                    match futures_core::ready!(task.try_poll(cx)) {
                        Ok(x) => {
                            *generation = 1;

                            // Returns without replacing the state: the finished task stays put.
                            return Poll::Ready(x);
                        }
                        Err(_e) => {
                            *generation += 1;

                            // Rebind under a used name only when a logging feature reads it.
                            #[cfg(any(feature = "with-log", feature = "with-tracing"))]
                            #[cfg_attr(
                                any(feature = "with-log", feature = "with-tracing"),
                                allow(clippy::used_underscore_binding)
                            )]
                            let err: E = _e;

                            #[cfg(feature = "with-log")]
                            log::error!("Task failed with error: {}", err);
                            #[cfg(feature = "with-tracing")]
                            tracing::error!(error = ?err, "Task failed with error");

                            // Restart unconditionally; the `stop` flag is not consulted here.
                            FactoryState::Waiting {
                                task: factory.construct(data.clone()),
                            }
                        }
                    }
                }
            };

            // Replace the pinned state in place (the previous task, if any, is dropped).
            self.as_mut().project().state.set(state);
        }
    }
}
421
/// Lifecycle of a worker's task; `F` is the task future type.
#[pin_project::pin_project(project = FactoryStateProject)]
enum FactoryState<F> {
    // No task has been constructed yet.
    Idle,
    // A task exists and is being polled.
    Waiting {
        #[pin]
        task: F,
    },
}
430
431mod factory {
432 use {super::Worker, futures_core::TryFuture};
433
434 pub trait Factory<D>: Clone
435 where
436 D: Clone,
437 {
438 type Future: TryFuture;
439
440 fn construct(&mut self, data: Worker<D>) -> Self::Future;
441 }
442
443 impl<D, T, F> Factory<D> for T
444 where
445 D: Clone,
446 T: Unpin + Clone + FnMut(Worker<D>) -> F,
447 F: TryFuture,
448 {
449 type Future = F;
450
451 #[inline]
452 fn construct(&mut self, data: Worker<D>) -> Self::Future {
453 (self)(data)
454 }
455 }
456}