round_based/state_machine/
delivery.rs1use core::task::{Poll, ready};
2
3pub struct Delivery<M> {
5 shared_state: super::shared_state::SharedStateRef<M>,
6}
7
8impl<M> Delivery<M> {
9 pub(super) fn new(shared_state: super::shared_state::SharedStateRef<M>) -> Self {
10 Self { shared_state }
11 }
12}
13
14impl<M> futures_util::Stream for Delivery<M> {
15 type Item = Result<crate::Incoming<M>, DeliveryErr>;
16
17 fn poll_next(
18 self: core::pin::Pin<&mut Self>,
19 _cx: &mut core::task::Context<'_>,
20 ) -> Poll<Option<Self::Item>> {
21 let scheduler = ready!(self.shared_state.can_schedule());
22
23 scheduler
24 .protocol_needs_one_more_msg()
25 .map(|msg| Some(Ok(msg)))
26 }
27}
28
29impl<M> futures_util::Sink<crate::Outgoing<M>> for Delivery<M> {
30 type Error = DeliveryErr;
31
32 fn poll_ready(
33 self: core::pin::Pin<&mut Self>,
34 _cx: &mut core::task::Context<'_>,
35 ) -> Poll<Result<(), Self::Error>> {
36 let scheduler = ready!(self.shared_state.can_schedule());
37 scheduler.protocol_flushes_outgoing_msg().map(Ok)
38 }
39
40 fn start_send(
41 self: core::pin::Pin<&mut Self>,
42 msg: crate::Outgoing<M>,
43 ) -> Result<(), Self::Error> {
44 self.shared_state
45 .protocol_saves_msg_to_be_sent(msg)
46 .map_err(|_| DeliveryErr(Reason::NotReady))
47 }
48
49 fn poll_flush(
50 self: core::pin::Pin<&mut Self>,
51 _cx: &mut core::task::Context<'_>,
52 ) -> Poll<Result<(), Self::Error>> {
53 let scheduler = ready!(self.shared_state.can_schedule());
54 scheduler.protocol_flushes_outgoing_msg().map(Ok)
55 }
56
57 fn poll_close(
58 self: core::pin::Pin<&mut Self>,
59 _cx: &mut core::task::Context<'_>,
60 ) -> Poll<Result<(), Self::Error>> {
61 Poll::Ready(Ok(()))
62 }
63}
64
65#[derive(Debug, thiserror::Error)]
67#[error(transparent)]
68pub struct DeliveryErr(Reason);
69
70#[derive(Debug, thiserror::Error)]
71enum Reason {
72 #[error("sink is not ready")]
73 NotReady,
74}