proc_heim/process/
message_stream.rs1use super::{Message, ReceiveDeserializedMessageError, ReceiveMessageError};
2use std::{fmt::Debug, string::FromUtf8Error};
3
4#[cfg(any(feature = "json", feature = "message-pack"))]
5use super::{MessageFormat, SerdeError};
6
7use futures::{future, Stream, StreamExt};
8
9impl<S: ?Sized> MessageStreamExt for S where S: Stream<Item = Message> {}
10
11pub trait MessageStreamExt: Stream<Item = Message> {
13 fn into_bytes_stream(self) -> impl Stream<Item = Vec<u8>>
15 where
16 Self: Sized,
17 {
18 self.map(Message::into)
19 }
20
21 fn into_string_stream(self) -> impl Stream<Item = Result<String, FromUtf8Error>>
24 where
25 Self: Sized,
26 {
27 self.map(Message::try_into_string)
28 }
29
30 #[cfg(any(feature = "json", feature = "message-pack"))]
33 fn into_deserialized_stream<T>(
34 self,
35 format: &MessageFormat,
36 ) -> impl Stream<Item = Result<T, SerdeError>>
37 where
38 T: for<'de> serde::Deserialize<'de>,
39 Self: Sized,
40 {
41 self.map(|message| message.deserialize(format))
42 }
43}
44
45impl<S: ?Sized> TryMessageStreamExt for S where
46 S: Stream<Item = Result<Message, ReceiveMessageError>>
47{
48}
49
50pub trait TryMessageStreamExt: Stream<Item = Result<Message, ReceiveMessageError>> {
52 fn into_bytes_stream(self) -> impl Stream<Item = Result<Vec<u8>, ReceiveMessageError>>
54 where
55 Self: Sized,
56 {
57 self.map(|result| result.map(Message::into_bytes))
58 }
59
60 fn into_string_stream(
63 self,
64 ) -> impl Stream<Item = Result<String, ReceiveDeserializedMessageError>>
65 where
66 Self: Sized,
67 {
68 self.map(to_string)
69 }
70
71 #[cfg(any(feature = "json", feature = "message-pack"))]
74 fn into_deserialized_stream<T>(
75 self,
76 format: &MessageFormat,
77 ) -> impl Stream<Item = Result<T, ReceiveDeserializedMessageError>>
78 where
79 T: for<'de> serde::Deserialize<'de>,
80 Self: Sized,
81 {
82 self.map(|result| deserialize_message(result, format))
83 }
84
85 fn ignore_lost_messages(self) -> impl Stream<Item = Message>
88 where
89 Self: Sized,
90 {
91 self.filter(|result| future::ready(result.is_ok()))
92 .map(Result::unwrap)
93 }
94}
95
96#[allow(unused)]
97fn to_string(
98 result: Result<Message, ReceiveMessageError>,
99) -> Result<String, ReceiveDeserializedMessageError> {
100 result?.try_into_string().map_err(|err| {
101 ReceiveDeserializedMessageError::CannotDeserializeMessage(format!(
102 "Cannot deserialize data from raw bytes to string: {err}"
103 ))
104 })
105}
106
/// Deserializes a received message into `T` using `format`.
///
/// A receive error is propagated with `?` (converted into
/// `ReceiveDeserializedMessageError`); a deserialization failure is reported
/// as `ReceiveDeserializedMessageError::CannotDeserializeMessage` carrying the
/// error's `to_string()` output.
#[cfg(any(feature = "json", feature = "message-pack"))]
fn deserialize_message<T>(
    result: Result<Message, ReceiveMessageError>,
    format: &MessageFormat,
) -> Result<T, ReceiveDeserializedMessageError>
where
    T: for<'de> serde::Deserialize<'de>,
{
    let message = result?;
    match message.deserialize(format) {
        Ok(value) => Ok(value),
        Err(err) => {
            let reason = err.to_string();
            Err(ReceiveDeserializedMessageError::CannotDeserializeMessage(reason))
        }
    }
}
116
117impl<S: ?Sized, T, E: Debug> ResultStreamExt<T, E> for S where S: Stream<Item = Result<T, E>> {}
118
119pub trait ResultStreamExt<T, E: Debug>: Stream<Item = Result<T, E>> {
121 fn filter_ok(self) -> impl Stream<Item = T>
123 where
124 Self: Sized,
125 {
126 self.filter(|result| future::ready(result.is_ok()))
127 .map(Result::unwrap)
128 }
129}