signalrs_client/
stream_ext.rs1use crate::{messages::{SerializationError, MessageEncoding, ClientMessage}, protocol::Completion};
2
3use futures::{Stream, StreamExt};
4use std::{
5 pin::Pin,
6 task::{Context, Poll},
7};
8use tracing::*;
9
10pub(crate) trait SignalRStreamExt: Sized {
11 fn append_completion(
12 self,
13 stream_id: String,
14 encoding: MessageEncoding,
15 ) -> AppendCompletion<Self>;
16}
17
18impl<T> SignalRStreamExt for T
19where
20 T: Stream,
21{
22 fn append_completion(
23 self,
24 stream_id: String,
25 encoding: MessageEncoding,
26 ) -> AppendCompletion<Self> {
27 AppendCompletion {
28 stream_id,
29 encoding,
30 inner: self,
31 finished: false,
32 }
33 }
34}
35
36pub(crate) struct AppendCompletion<S> {
37 stream_id: String,
38 encoding: MessageEncoding,
39 inner: S,
40 finished: bool,
41}
42
43impl<S> AppendCompletion<S> {
44 fn get_ok_completion(&self) -> ClientMessage {
45 let completion = Completion::<()>::ok(self.stream_id.clone());
46 let serialized = self.encoding.serialize(completion).unwrap_or_else(|error| {
47 event!(Level::ERROR, error = error.to_string(), "serialization error");
48 self.get_infallible_completion()
49 });
50
51 serialized
52 }
53
54 fn get_error_completion(&self, error: String) -> ClientMessage {
55 let completion = Completion::<()>::error(self.stream_id.clone(), error);
56 let serialized = self.encoding.serialize(completion).unwrap_or_else(|error| {
57 event!(Level::ERROR, error = error.to_string(), "serialization error");
58 self.get_infallible_completion()
59 });
60
61 serialized
62 }
63
64 fn get_infallible_completion(&self) -> ClientMessage {
65 let completion = Completion::<()>::error(self.stream_id.clone(), "error in a stream");
66 self.encoding.serialize(completion).unwrap()
67 }
68}
69
70impl<S> Stream for AppendCompletion<S>
71where
72 S: Stream<Item = Result<ClientMessage, SerializationError>> + Unpin,
73{
74 type Item = ClientMessage;
75
76 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
77 if self.finished {
78 return Poll::Ready(None);
79 }
80
81 match self.inner.poll_next_unpin(cx) {
82 Poll::Ready(Some(Ok(message))) => Poll::Ready(Some(message)),
83 Poll::Ready(Some(Err(error))) => {
84 let error = error.to_string();
85 event!(Level::ERROR, error, "error in stream");
86 let serialized = self.get_error_completion(error);
87 self.finished = true;
88 Poll::Ready(Some(serialized))
89 }
90 Poll::Ready(None) => {
91 let completion = self.get_ok_completion();
92 self.finished = true;
93 Poll::Ready(Some(completion))
94 }
95 Poll::Pending => Poll::Pending,
96 }
97 }
98}