aldrin_core/transport/
buffered.rs

1use super::AsyncTransport;
2use crate::message::Message;
3use pin_project_lite::pin_project;
4use std::collections::VecDeque;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8pin_project! {
9    #[derive(Debug)]
10    pub struct Buffered<T> {
11        #[pin]
12        inner: T,
13
14        buffer: VecDeque<Message>,
15    }
16}
17
18impl<T> Buffered<T> {
19    pub fn new(inner: T) -> Self {
20        Self {
21            inner,
22            buffer: VecDeque::new(),
23        }
24    }
25}
26
27impl<T: AsyncTransport> AsyncTransport for Buffered<T> {
28    type Error = T::Error;
29
30    fn receive_poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<Message, Self::Error>> {
31        self.project().inner.receive_poll(cx)
32    }
33
34    fn send_poll_ready(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
35        Poll::Ready(Ok(()))
36    }
37
38    fn send_start(self: Pin<&mut Self>, msg: Message) -> Result<(), Self::Error> {
39        self.project().buffer.push_back(msg);
40        Ok(())
41    }
42
43    fn send_poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
44        let mut this = self.project();
45
46        while !this.buffer.is_empty() {
47            match this.inner.as_mut().send_poll_ready(cx) {
48                Poll::Ready(Ok(())) => {
49                    let msg = this.buffer.pop_front().unwrap();
50                    this.inner.as_mut().send_start(msg)?;
51                }
52
53                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
54                Poll::Pending => return Poll::Pending,
55            }
56        }
57
58        this.inner.as_mut().send_poll_flush(cx)
59    }
60}