round_based/state_machine/
delivery.rs

1use core::task::{Poll, ready};
2
/// Provides a stream of incoming and sink for outgoing messages
///
/// This is a thin handle around the shared state of the state machine: it
/// holds no buffers of its own, and its `Stream`/`Sink` implementations
/// forward every operation to `shared_state`.
pub struct Delivery<M> {
    /// Handle to the state shared within the `state_machine` module, through
    /// which incoming messages are received and outgoing messages are handed over
    shared_state: super::shared_state::SharedStateRef<M>,
}
7
8impl<M> Delivery<M> {
9    pub(super) fn new(shared_state: super::shared_state::SharedStateRef<M>) -> Self {
10        Self { shared_state }
11    }
12}
13
14impl<M> futures_util::Stream for Delivery<M> {
15    type Item = Result<crate::Incoming<M>, DeliveryErr>;
16
17    fn poll_next(
18        self: core::pin::Pin<&mut Self>,
19        _cx: &mut core::task::Context<'_>,
20    ) -> Poll<Option<Self::Item>> {
21        let scheduler = ready!(self.shared_state.can_schedule());
22
23        scheduler
24            .protocol_needs_one_more_msg()
25            .map(|msg| Some(Ok(msg)))
26    }
27}
28
29impl<M> futures_util::Sink<crate::Outgoing<M>> for Delivery<M> {
30    type Error = DeliveryErr;
31
32    fn poll_ready(
33        self: core::pin::Pin<&mut Self>,
34        _cx: &mut core::task::Context<'_>,
35    ) -> Poll<Result<(), Self::Error>> {
36        let scheduler = ready!(self.shared_state.can_schedule());
37        scheduler.protocol_flushes_outgoing_msg().map(Ok)
38    }
39
40    fn start_send(
41        self: core::pin::Pin<&mut Self>,
42        msg: crate::Outgoing<M>,
43    ) -> Result<(), Self::Error> {
44        self.shared_state
45            .protocol_saves_msg_to_be_sent(msg)
46            .map_err(|_| DeliveryErr(Reason::NotReady))
47    }
48
49    fn poll_flush(
50        self: core::pin::Pin<&mut Self>,
51        _cx: &mut core::task::Context<'_>,
52    ) -> Poll<Result<(), Self::Error>> {
53        let scheduler = ready!(self.shared_state.can_schedule());
54        scheduler.protocol_flushes_outgoing_msg().map(Ok)
55    }
56
57    fn poll_close(
58        self: core::pin::Pin<&mut Self>,
59        _cx: &mut core::task::Context<'_>,
60    ) -> Poll<Result<(), Self::Error>> {
61        Poll::Ready(Ok(()))
62    }
63}
64
/// Error returned by [`Delivery`]
///
/// Opaque wrapper around a private reason. Because of `#[error(transparent)]`,
/// displaying this error shows the message of the wrapped reason.
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
pub struct DeliveryErr(Reason);
69
/// Private list of reasons why a [`Delivery`] operation failed
#[derive(Debug, thiserror::Error)]
enum Reason {
    /// The sink's `start_send` could not hand the outgoing message over to
    /// the shared state
    #[error("sink is not ready")]
    NotReady,
}