1use crate::message::Message;
55use crate::transport::AsyncTransport;
56use futures_channel::mpsc;
57use futures_core::stream::Stream;
58use std::pin::Pin;
59use std::task::{Context, Poll};
60use thiserror::Error;
61
62pub fn bounded(fifo_size: usize) -> (Bounded, Bounded) {
67 let (sender1, receiver1) = mpsc::channel(fifo_size);
68 let (sender2, receiver2) = mpsc::channel(fifo_size);
69
70 (
71 Bounded::new(receiver1, sender2),
72 Bounded::new(receiver2, sender1),
73 )
74}
75
76#[derive(Debug)]
81pub struct Bounded {
82 receiver: mpsc::Receiver<Message>,
83 sender: mpsc::Sender<Message>,
84}
85
86impl Bounded {
87 fn new(receiver: mpsc::Receiver<Message>, sender: mpsc::Sender<Message>) -> Self {
88 Self { receiver, sender }
89 }
90}
91
92#[derive(Error, Debug, Copy, Clone, PartialEq, Eq)]
94#[error("disconnected")]
95pub struct Disconnected;
96
97impl AsyncTransport for Bounded {
98 type Error = Disconnected;
99
100 fn receive_poll(
101 mut self: Pin<&mut Self>,
102 cx: &mut Context,
103 ) -> Poll<Result<Message, Disconnected>> {
104 match Pin::new(&mut self.receiver).poll_next(cx) {
105 Poll::Ready(Some(msg)) => Poll::Ready(Ok(msg)),
106 Poll::Ready(None) => Poll::Ready(Err(Disconnected)),
107 Poll::Pending => Poll::Pending,
108 }
109 }
110
111 fn send_poll_ready(
112 mut self: Pin<&mut Self>,
113 cx: &mut Context,
114 ) -> Poll<Result<(), Disconnected>> {
115 match self.sender.poll_ready(cx) {
116 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
117 Poll::Ready(Err(_)) => Poll::Ready(Err(Disconnected)),
118 Poll::Pending => Poll::Pending,
119 }
120 }
121
122 fn send_start(mut self: Pin<&mut Self>, msg: Message) -> Result<(), Disconnected> {
123 self.sender.start_send(msg).map_err(|_| Disconnected)
124 }
125
126 fn send_poll_flush(
127 mut self: Pin<&mut Self>,
128 cx: &mut Context,
129 ) -> Poll<Result<(), Disconnected>> {
130 match self.sender.poll_ready(cx) {
131 Poll::Ready(_) => Poll::Ready(Ok(())),
132 Poll::Pending => Poll::Pending,
133 }
134 }
135}
136
137pub fn unbounded() -> (Unbounded, Unbounded) {
139 let (sender1, receiver1) = mpsc::unbounded();
140 let (sender2, receiver2) = mpsc::unbounded();
141
142 (
143 Unbounded::new(receiver1, sender2),
144 Unbounded::new(receiver2, sender1),
145 )
146}
147
148#[derive(Debug)]
150pub struct Unbounded {
151 receiver: mpsc::UnboundedReceiver<Message>,
152 sender: mpsc::UnboundedSender<Message>,
153}
154
155impl Unbounded {
156 fn new(
157 receiver: mpsc::UnboundedReceiver<Message>,
158 sender: mpsc::UnboundedSender<Message>,
159 ) -> Self {
160 Self { receiver, sender }
161 }
162}
163
164impl AsyncTransport for Unbounded {
165 type Error = Disconnected;
166
167 fn receive_poll(
168 mut self: Pin<&mut Self>,
169 cx: &mut Context,
170 ) -> Poll<Result<Message, Disconnected>> {
171 match Pin::new(&mut self.receiver).poll_next(cx) {
172 Poll::Ready(Some(msg)) => Poll::Ready(Ok(msg)),
173 Poll::Ready(None) => Poll::Ready(Err(Disconnected)),
174 Poll::Pending => Poll::Pending,
175 }
176 }
177
178 fn send_poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Disconnected>> {
179 match self.sender.poll_ready(cx) {
180 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
181 Poll::Ready(Err(_)) => Poll::Ready(Err(Disconnected)),
182 Poll::Pending => Poll::Pending,
183 }
184 }
185
186 fn send_start(mut self: Pin<&mut Self>, msg: Message) -> Result<(), Disconnected> {
187 self.sender.start_send(msg).map_err(|_| Disconnected)
188 }
189
190 fn send_poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Disconnected>> {
191 match self.sender.poll_ready(cx) {
192 Poll::Ready(_) => Poll::Ready(Ok(())),
193 Poll::Pending => Poll::Pending,
194 }
195 }
196}