Skip to main content

richat_client/
stream.rs

1use {
2    crate::error::ReceiveError,
3    futures::stream::{BoxStream, Stream},
4    pin_project_lite::pin_project,
5    prost::Message,
6    richat_proto::geyser::SubscribeUpdate,
7    std::{
8        fmt,
9        pin::Pin,
10        task::{Context, Poll, ready},
11    },
12};
13
14type InputStream = BoxStream<'static, Result<Vec<u8>, ReceiveError>>;
15
16pin_project! {
17    pub struct SubscribeStream {
18        stream: InputStream,
19    }
20}
21
22impl SubscribeStream {
23    pub(crate) fn new(stream: InputStream) -> Self {
24        Self { stream }
25    }
26}
27
28impl fmt::Debug for SubscribeStream {
29    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30        f.debug_struct("SubscribeStream").finish()
31    }
32}
33
34impl Stream for SubscribeStream {
35    type Item = Result<SubscribeUpdate, ReceiveError>;
36
37    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
38        let mut me = self.project();
39        Poll::Ready(match ready!(Pin::new(&mut me.stream).poll_next(cx)) {
40            Some(Ok(slice)) => Some(SubscribeUpdate::decode(slice.as_slice()).map_err(Into::into)),
41            Some(Err(error)) => Some(Err(error)),
42            None => None,
43        })
44    }
45}