round_based/state_machine/
delivery.rs

1use core::task::{ready, Poll};
2
3/// Stream of incoming messages
4pub struct Incomings<M> {
5    shared_state: super::shared_state::SharedStateRef<M>,
6}
7
8impl<M> Incomings<M> {
9    pub(super) fn new(shared_state: super::shared_state::SharedStateRef<M>) -> Self {
10        Self { shared_state }
11    }
12}
13
14impl<M> crate::Stream for Incomings<M> {
15    type Item = Result<crate::Incoming<M>, core::convert::Infallible>;
16
17    fn poll_next(
18        self: core::pin::Pin<&mut Self>,
19        _cx: &mut core::task::Context<'_>,
20    ) -> Poll<Option<Self::Item>> {
21        let scheduler = ready!(self.shared_state.can_schedule());
22
23        scheduler
24            .protocol_needs_one_more_msg()
25            .map(|msg| Some(Ok(msg)))
26    }
27}
28
29/// Sink for outgoing messages
30pub struct Outgoings<M> {
31    shared_state: super::shared_state::SharedStateRef<M>,
32}
33
34impl<M> Outgoings<M> {
35    pub(super) fn new(shared_state: super::shared_state::SharedStateRef<M>) -> Self {
36        Self { shared_state }
37    }
38}
39
40impl<M> crate::Sink<crate::Outgoing<M>> for Outgoings<M> {
41    type Error = SendErr;
42
43    fn poll_ready(
44        self: core::pin::Pin<&mut Self>,
45        _cx: &mut core::task::Context<'_>,
46    ) -> Poll<Result<(), Self::Error>> {
47        let scheduler = ready!(self.shared_state.can_schedule());
48        scheduler.protocol_flushes_outgoing_msg().map(Ok)
49    }
50
51    fn start_send(
52        self: core::pin::Pin<&mut Self>,
53        msg: crate::Outgoing<M>,
54    ) -> Result<(), Self::Error> {
55        self.shared_state
56            .protocol_saves_msg_to_be_sent(msg)
57            .map_err(|_| SendErr(SendErrReason::NotReady))
58    }
59
60    fn poll_flush(
61        self: core::pin::Pin<&mut Self>,
62        _cx: &mut core::task::Context<'_>,
63    ) -> Poll<Result<(), Self::Error>> {
64        let scheduler = ready!(self.shared_state.can_schedule());
65        scheduler.protocol_flushes_outgoing_msg().map(Ok)
66    }
67
68    fn poll_close(
69        self: core::pin::Pin<&mut Self>,
70        _cx: &mut core::task::Context<'_>,
71    ) -> Poll<Result<(), Self::Error>> {
72        Poll::Ready(Ok(()))
73    }
74}
75
76/// Error returned by [`Outgoings`] sink
77#[derive(Debug, thiserror::Error)]
78#[error(transparent)]
79pub struct SendErr(SendErrReason);
80
81#[derive(Debug, thiserror::Error)]
82enum SendErrReason {
83    #[error("sink is not ready")]
84    NotReady,
85}