1use {
2 crate::error::ReceiveError,
3 futures::stream::{BoxStream, Stream},
4 pin_project_lite::pin_project,
5 prost::Message,
6 richat_proto::geyser::SubscribeUpdate,
7 std::{
8 fmt,
9 pin::Pin,
10 task::{Context, Poll, ready},
11 },
12};
13
14type InputStream = BoxStream<'static, Result<Vec<u8>, ReceiveError>>;
15
16pin_project! {
17 pub struct SubscribeStream {
18 stream: InputStream,
19 }
20}
21
22impl SubscribeStream {
23 pub(crate) fn new(stream: InputStream) -> Self {
24 Self { stream }
25 }
26}
27
28impl fmt::Debug for SubscribeStream {
29 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30 f.debug_struct("SubscribeStream").finish()
31 }
32}
33
34impl Stream for SubscribeStream {
35 type Item = Result<SubscribeUpdate, ReceiveError>;
36
37 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
38 let mut me = self.project();
39 Poll::Ready(match ready!(Pin::new(&mut me.stream).poll_next(cx)) {
40 Some(Ok(slice)) => Some(SubscribeUpdate::decode(slice.as_slice()).map_err(Into::into)),
41 Some(Err(error)) => Some(Err(error)),
42 None => None,
43 })
44 }
45}