paxakos/node/
shutdown.rs

1use futures::future::BoxFuture;
2use futures::future::FutureExt;
3use futures::stream::StreamExt;
4
5use crate::event::ShutdownEvent;
6use crate::invocation::Invocation;
7
8use super::state_keeper::EventStream;
9
10/// A `Node` that is being [`shut_down`][crate::Node::shut_down].
11pub trait Shutdown {
12    /// Parametrization of the paxakos algorithm.
13    type Invocation: Invocation;
14
15    /// Polls the node's event stream, driving the shutdown to conclusion.
16    fn poll_shutdown(
17        &mut self,
18        cx: &mut std::task::Context<'_>,
19    ) -> std::task::Poll<ShutdownEvent<Self::Invocation>>;
20}
21
22/// The default `Shutdown` implementation.
23pub struct DefaultShutdown<I: Invocation> {
24    trigger: BoxFuture<'static, ()>,
25    events: EventStream<I>,
26}
27
28impl<I: Invocation> DefaultShutdown<I> {
29    pub(crate) fn new(trigger: BoxFuture<'static, ()>, events: EventStream<I>) -> Self {
30        Self { trigger, events }
31    }
32}
33
34impl<I: Invocation> super::Shutdown for DefaultShutdown<I> {
35    type Invocation = I;
36
37    fn poll_shutdown(
38        &mut self,
39        cx: &mut std::task::Context<'_>,
40    ) -> std::task::Poll<ShutdownEvent<Self::Invocation>> {
41        let _ = self.trigger.poll_unpin(cx);
42
43        self.events
44            .poll_next_unpin(cx)
45            .map(|e| e.expect("Event stream ended"))
46    }
47}