round_based/state_machine/
delivery.rs1use core::task::{ready, Poll};
2
3pub struct Incomings<M> {
5 shared_state: super::shared_state::SharedStateRef<M>,
6}
7
8impl<M> Incomings<M> {
9 pub(super) fn new(shared_state: super::shared_state::SharedStateRef<M>) -> Self {
10 Self { shared_state }
11 }
12}
13
14impl<M> crate::Stream for Incomings<M> {
15 type Item = Result<crate::Incoming<M>, core::convert::Infallible>;
16
17 fn poll_next(
18 self: core::pin::Pin<&mut Self>,
19 _cx: &mut core::task::Context<'_>,
20 ) -> Poll<Option<Self::Item>> {
21 let scheduler = ready!(self.shared_state.can_schedule());
22
23 scheduler
24 .protocol_needs_one_more_msg()
25 .map(|msg| Some(Ok(msg)))
26 }
27}
28
29pub struct Outgoings<M> {
31 shared_state: super::shared_state::SharedStateRef<M>,
32}
33
34impl<M> Outgoings<M> {
35 pub(super) fn new(shared_state: super::shared_state::SharedStateRef<M>) -> Self {
36 Self { shared_state }
37 }
38}
39
40impl<M> crate::Sink<crate::Outgoing<M>> for Outgoings<M> {
41 type Error = SendErr;
42
43 fn poll_ready(
44 self: core::pin::Pin<&mut Self>,
45 _cx: &mut core::task::Context<'_>,
46 ) -> Poll<Result<(), Self::Error>> {
47 let scheduler = ready!(self.shared_state.can_schedule());
48 scheduler.protocol_flushes_outgoing_msg().map(Ok)
49 }
50
51 fn start_send(
52 self: core::pin::Pin<&mut Self>,
53 msg: crate::Outgoing<M>,
54 ) -> Result<(), Self::Error> {
55 self.shared_state
56 .protocol_saves_msg_to_be_sent(msg)
57 .map_err(|_| SendErr(SendErrReason::NotReady))
58 }
59
60 fn poll_flush(
61 self: core::pin::Pin<&mut Self>,
62 _cx: &mut core::task::Context<'_>,
63 ) -> Poll<Result<(), Self::Error>> {
64 let scheduler = ready!(self.shared_state.can_schedule());
65 scheduler.protocol_flushes_outgoing_msg().map(Ok)
66 }
67
68 fn poll_close(
69 self: core::pin::Pin<&mut Self>,
70 _cx: &mut core::task::Context<'_>,
71 ) -> Poll<Result<(), Self::Error>> {
72 Poll::Ready(Ok(()))
73 }
74}
75
76#[derive(Debug, thiserror::Error)]
78#[error(transparent)]
79pub struct SendErr(SendErrReason);
80
81#[derive(Debug, thiserror::Error)]
82enum SendErrReason {
83 #[error("sink is not ready")]
84 NotReady,
85}