proc_heim/process/
message_stream.rs

1use super::{Message, ReceiveDeserializedMessageError, ReceiveMessageError};
2use std::{fmt::Debug, string::FromUtf8Error};
3
4#[cfg(any(feature = "json", feature = "message-pack"))]
5use super::{MessageFormat, SerdeError};
6
7use futures::{future, Stream, StreamExt};
8
9impl<S: ?Sized> MessageStreamExt for S where S: Stream<Item = Message> {}
10
11/// Adapters specific to `Message`-returning streams.
12pub trait MessageStreamExt: Stream<Item = Message> {
13    /// Convert stream's messages into bytes.
14    fn into_bytes_stream(self) -> impl Stream<Item = Vec<u8>>
15    where
16        Self: Sized,
17    {
18        self.map(Message::into)
19    }
20
21    /// Convert stream's messages into strings.
22    /// If the message cannot be deserialized from bytes, the [`FromUtf8Error`] error will be returned.
23    fn into_string_stream(self) -> impl Stream<Item = Result<String, FromUtf8Error>>
24    where
25        Self: Sized,
26    {
27        self.map(Message::try_into_string)
28    }
29
30    /// Convert stream's messages into `Rust` types implementing `Deserialize` trait.
31    /// If the message cannot be deserialized from bytes, the [`SerdeError::DeserializationFailure`] error will be returned.
32    #[cfg(any(feature = "json", feature = "message-pack"))]
33    fn into_deserialized_stream<T>(
34        self,
35        format: &MessageFormat,
36    ) -> impl Stream<Item = Result<T, SerdeError>>
37    where
38        T: for<'de> serde::Deserialize<'de>,
39        Self: Sized,
40    {
41        self.map(|message| message.deserialize(format))
42    }
43}
44
45impl<S: ?Sized> TryMessageStreamExt for S where
46    S: Stream<Item = Result<Message, ReceiveMessageError>>
47{
48}
49
50/// Adapters for streams yielding `Result<Message, ReceiveMessageError>`.
51pub trait TryMessageStreamExt: Stream<Item = Result<Message, ReceiveMessageError>> {
52    /// Convert stream's messages into bytes.
53    fn into_bytes_stream(self) -> impl Stream<Item = Result<Vec<u8>, ReceiveMessageError>>
54    where
55        Self: Sized,
56    {
57        self.map(|result| result.map(Message::into_bytes))
58    }
59
60    /// Convert stream's messages into strings.
61    /// If the message cannot be deserialized from bytes, the [`ReceiveDeserializedMessageError::CannotDeserializeMessage`] error will be returned.
62    fn into_string_stream(
63        self,
64    ) -> impl Stream<Item = Result<String, ReceiveDeserializedMessageError>>
65    where
66        Self: Sized,
67    {
68        self.map(to_string)
69    }
70
71    /// Convert stream's messages into `Rust` types implementing `Deserialize` trait.
72    /// If the message cannot be deserialized from bytes, the [`ReceiveDeserializedMessageError::CannotDeserializeMessage`] error will be returned.
73    #[cfg(any(feature = "json", feature = "message-pack"))]
74    fn into_deserialized_stream<T>(
75        self,
76        format: &MessageFormat,
77    ) -> impl Stream<Item = Result<T, ReceiveDeserializedMessageError>>
78    where
79        T: for<'de> serde::Deserialize<'de>,
80        Self: Sized,
81    {
82        self.map(|result| deserialize_message(result, format))
83    }
84
85    /// Ignore lost messages (see [`ProcessManagerHandle::subscribe_message_stream`](crate::manager::ProcessManagerHandle::subscribe_message_stream)), returning a stream yielding `Message` types.
86    /// The returned stream can be transformed further using [`MessageStreamExt`] trait.
87    fn ignore_lost_messages(self) -> impl Stream<Item = Message>
88    where
89        Self: Sized,
90    {
91        self.filter(|result| future::ready(result.is_ok()))
92            .map(Result::unwrap)
93    }
94}
95
96#[allow(unused)]
97fn to_string(
98    result: Result<Message, ReceiveMessageError>,
99) -> Result<String, ReceiveDeserializedMessageError> {
100    result?.try_into_string().map_err(|err| {
101        ReceiveDeserializedMessageError::CannotDeserializeMessage(format!(
102            "Cannot deserialize data from raw bytes to string: {err}"
103        ))
104    })
105}
106
/// Deserializes a received message into `T` using the given `format`.
///
/// A receive error is propagated (converted by `?` into
/// `ReceiveDeserializedMessageError`). A deserialization failure is reported as
/// `ReceiveDeserializedMessageError::CannotDeserializeMessage` carrying the
/// textual form of the underlying error.
#[cfg(any(feature = "json", feature = "message-pack"))]
fn deserialize_message<T>(
    result: Result<Message, ReceiveMessageError>,
    format: &MessageFormat,
) -> Result<T, ReceiveDeserializedMessageError>
where
    T: for<'de> serde::Deserialize<'de>,
{
    let message = result?;
    match message.deserialize(format) {
        Ok(value) => Ok(value),
        Err(err) => Err(ReceiveDeserializedMessageError::CannotDeserializeMessage(
            err.to_string(),
        )),
    }
}
116
117impl<S: ?Sized, T, E: Debug> ResultStreamExt<T, E> for S where S: Stream<Item = Result<T, E>> {}
118
119/// Adapters specific to `Result`-returning streams.
120pub trait ResultStreamExt<T, E: Debug>: Stream<Item = Result<T, E>> {
121    /// Returns a stream with filtered out `Err(T)` items.
122    fn filter_ok(self) -> impl Stream<Item = T>
123    where
124        Self: Sized,
125    {
126        self.filter(|result| future::ready(result.is_ok()))
127            .map(Result::unwrap)
128    }
129}