signalrs_client/
stream_ext.rs

1use crate::{messages::{SerializationError, MessageEncoding, ClientMessage}, protocol::Completion};
2
3use futures::{Stream, StreamExt};
4use std::{
5    pin::Pin,
6    task::{Context, Poll},
7};
8use tracing::*;
9
10pub(crate) trait SignalRStreamExt: Sized {
11    fn append_completion(
12        self,
13        stream_id: String,
14        encoding: MessageEncoding,
15    ) -> AppendCompletion<Self>;
16}
17
18impl<T> SignalRStreamExt for T
19where
20    T: Stream,
21{
22    fn append_completion(
23        self,
24        stream_id: String,
25        encoding: MessageEncoding,
26    ) -> AppendCompletion<Self> {
27        AppendCompletion {
28            stream_id,
29            encoding,
30            inner: self,
31            finished: false,
32        }
33    }
34}
35
36pub(crate) struct AppendCompletion<S> {
37    stream_id: String,
38    encoding: MessageEncoding,
39    inner: S,
40    finished: bool,
41}
42
43impl<S> AppendCompletion<S> {
44    fn get_ok_completion(&self) -> ClientMessage {
45        let completion = Completion::<()>::ok(self.stream_id.clone());
46        let serialized = self.encoding.serialize(completion).unwrap_or_else(|error| {
47            event!(Level::ERROR, error = error.to_string(), "serialization error");
48            self.get_infallible_completion()
49        });
50
51        serialized
52    }
53
54    fn get_error_completion(&self, error: String) -> ClientMessage {
55        let completion = Completion::<()>::error(self.stream_id.clone(), error);
56        let serialized = self.encoding.serialize(completion).unwrap_or_else(|error| {
57            event!(Level::ERROR, error = error.to_string(), "serialization error");
58            self.get_infallible_completion()
59        });
60
61        serialized
62    }
63
64    fn get_infallible_completion(&self) -> ClientMessage {
65        let completion = Completion::<()>::error(self.stream_id.clone(), "error in a stream");
66        self.encoding.serialize(completion).unwrap()
67    }
68}
69
70impl<S> Stream for AppendCompletion<S>
71where
72    S: Stream<Item = Result<ClientMessage, SerializationError>> + Unpin,
73{
74    type Item = ClientMessage;
75
76    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
77        if self.finished {
78            return Poll::Ready(None);
79        }
80
81        match self.inner.poll_next_unpin(cx) {
82            Poll::Ready(Some(Ok(message))) => Poll::Ready(Some(message)),
83            Poll::Ready(Some(Err(error))) => {
84                let error = error.to_string();
85                event!(Level::ERROR, error, "error in stream");
86                let serialized = self.get_error_completion(error);
87                self.finished = true;
88                Poll::Ready(Some(serialized))
89            }
90            Poll::Ready(None) => {
91                let completion = self.get_ok_completion();
92                self.finished = true;
93                Poll::Ready(Some(completion))
94            }
95            Poll::Pending => Poll::Pending,
96        }
97    }
98}