reactive_messaging/socket_connection/
common.rs

1//! Contains some functions and other goodies used across this module
2
3
4use crate::{
5    config::{
6        ConstConfig,
7        RetryingStrategies,
8    },
9};
10use reactive_mutiny::prelude::{GenericUni, MutinyStream,FullDuplexUniChannel};
11use std::{
12    fmt::Debug,
13    future::{self},
14    sync::Arc,
15    time::{Duration, SystemTime},
16};
17use futures::{Stream, StreamExt};
18use keen_retry::ExponentialJitter;
19use log::{trace, warn};
20use crate::prelude::Peer;
21use crate::serde::ReactiveMessagingSerializer;
22use crate::types::ResponsiveStream;
23
24/// Upgrades a standard `GenericUni` to a version able to retry, as dictated by `CONFIG`
25pub fn upgrade_processor_uni_retrying_logic<const CONFIG: u64,
26                                            ItemType:        Send + Sync + Debug + 'static,
27                                            DerivedItemType: Send + Sync + Debug + 'static,
28                                            OriginalUni:     GenericUni<ItemType=ItemType, DerivedItemType=DerivedItemType> + Send + Sync>
29                                           (running_uni: Arc<OriginalUni>)
30                                           -> ReactiveMessagingUniSender<CONFIG, ItemType, DerivedItemType, OriginalUni> {
31    ReactiveMessagingUniSender::<CONFIG,
32                                 ItemType,
33                                 DerivedItemType,
34                                 OriginalUni>::new(running_uni)
35}
36
37/// Our special sender over a [Uni], adding
38/// retrying logic & connection control return values
39/// -- used to "send" messages from the remote peer to the local processor `Stream`
40pub struct ReactiveMessagingUniSender<const CONFIG: u64,
41                                      RemoteMessages:         Send + Sync + Debug + 'static,
42                                      ConsumedRemoteMessages: Send + Sync + Debug + 'static,
43                                      OriginalUni:            GenericUni<ItemType=RemoteMessages, DerivedItemType=ConsumedRemoteMessages> + Send + Sync> {
44    uni: Arc<OriginalUni>,
45}
46impl<const CONFIG: u64,
47     RemoteMessages:         Send + Sync + Debug + 'static,
48     ConsumedRemoteMessages: Send + Sync + Debug + 'static,
49     OriginalUni:            GenericUni<ItemType=RemoteMessages, DerivedItemType=ConsumedRemoteMessages> + Send + Sync>
50ReactiveMessagingUniSender<CONFIG, RemoteMessages, ConsumedRemoteMessages, OriginalUni> {
51
52    const CONST_CONFIG: ConstConfig = ConstConfig::from(CONFIG);
53
54    /// Takes in a [Uni] (already configured and under execution) and wrap it to allow
55    /// our special [Self::send()] to operate on it
56    pub fn new(running_uni: Arc<OriginalUni>) -> Self {
57        Self {
58            uni: running_uni,
59        }
60    }
61
62    /// mapper for eventual first-time-being retrying attempts -- or for fatal errors that might happen during retrying
63    fn retry_error_mapper(abort: bool, error_msg: String) -> ((), (bool, String) ) {
64        ( (), (abort, error_msg) )
65    }
66    /// mapper for any fatal errors that happens on the first attempt (which should not happen in the current `reactive-mutiny` Uni Channel API)
67    fn first_attempt_error_mapper<T>(_: T, _: ()) -> ((), (bool, String) ) {
68        panic!("reactive-messaging: send_to_local_processor(): BUG! `Uni` channel is expected never to fail fatably. Please, fix!")
69    }
70
71    /// Routes a received `message` (from a remote peer) to the local processor, honoring the configured retrying options.\
72    /// Returns `Ok` if sent, `Err(details)` if sending was not possible, where `details` contain:
73    ///   - `(abort?, error_message, unsent_message)`
74    #[inline(always)]
75    pub async fn send(&self,
76                      message: RemoteMessages)
77                     -> Result<(), (/*abort?*/bool, /*error_message: */String)> {
78
79        let retryable = self.uni.send(message);
80        match Self::CONST_CONFIG.retrying_strategy {
81            RetryingStrategies::DoNotRetry => {
82                retryable
83                    .map_input_and_errors(
84                        Self::first_attempt_error_mapper,
85                        |message, _err|
86                            Self::retry_error_mapper(false, format!("Relaying received message '{:?}' to the internal processor failed. Won't retry (ignoring the error) due to retrying config {:?}",
87                                                                                    message, Self::CONST_CONFIG.retrying_strategy)) )
88                    .into_result()
89            },
90            RetryingStrategies::EndCommunications => {
91                retryable
92                    .map_input_and_errors(
93                        Self::first_attempt_error_mapper,
94                        |message, _err|
95                            Self::retry_error_mapper(false, format!("Relaying received message '{:?}' to the internal processor failed. Connection will be aborted (without retrying) due to retrying config {:?}",
96                                                                                    message, Self::CONST_CONFIG.retrying_strategy)) )
97                    .into_result()
98            },
99            RetryingStrategies::RetryWithBackoffUpTo(attempts) => {
100                retryable
101                    .map_input(|message| ( message, SystemTime::now()) )
102                    .retry_with_async(|(message, retry_start)| future::ready(
103                        self.uni.send(message)
104                            .map_input(|message| (message, retry_start) )
105                    ))
106                    .with_exponential_jitter(|| ExponentialJitter::FromBackoffRange {
107                        backoff_range_millis: 1..=(2.526_f32.powi(attempts as i32) as u32),
108                        re_attempts: attempts,
109                        jitter_ratio: 0.2,
110                    })
111                    .await
112                    .map_input_and_errors(
113                        |(message, retry_start), _fatal_err|
114                            Self::retry_error_mapper(true, format!("Relaying received message '{:?}' to the internal processor failed. Connection will be aborted (after exhausting all retries in {:?}) due to retrying config {:?}",
115                                                                                   message, retry_start.elapsed(), Self::CONST_CONFIG.retrying_strategy)),
116                        |_| (false, String::with_capacity(0)) )
117                    .into()
118            },
119            RetryingStrategies::RetryYieldingForUpToMillis(millis) => {
120                retryable
121                    .map_input(|message| ( message, SystemTime::now()) )
122                    .retry_with_async(|(message, retry_start)| future::ready(
123                        self.uni.send(message)
124                            .map_input(|message| (message, retry_start) )
125                    ))
126                    .yielding_until_timeout(Duration::from_millis(millis as u64), || ())
127                    .await
128                    .map_input_and_errors(
129                        |(message, retry_start), _fatal_err|
130                            Self::retry_error_mapper(true, format!("Relaying received message '{:?}' to the internal processor failed. Connection will be aborted (after exhausting all retries in {:?}) due to retrying config {:?}",
131                                                                                   message, retry_start.elapsed(), Self::CONST_CONFIG.retrying_strategy)),
132                        |_| (false, String::with_capacity(0)) )
133                    .into()
134            },
135            RetryingStrategies::RetrySpinningForUpToMillis(_millis) => {
136                // this option is deprecated
137                unreachable!()
138            },
139        }
140    }
141
142    /// See [GenericUni::pending_items_count()]
143    #[inline(always)]
144    pub fn pending_items_count(&self) -> u32 {
145        self.uni.pending_items_count()
146    }
147
148    /// See [GenericUni::buffer_size()]
149    #[inline(always)]
150    pub fn buffer_size(&self) -> u32 {
151        self.uni.buffer_size()
152    }
153
154    /// See [GenericUni::close()]
155    pub async fn close(&self, timeout: Duration) -> bool {
156        self.uni.close(timeout).await
157    }
158}
159
160/// Our special "sender of messages to the remote peer" over a `reactive-mutiny`s [FullDuplexUniChannel], adding
161/// retrying logic & connection control return values
162/// -- used to send messages to the remote peer
163pub struct ReactiveMessagingSender<const CONFIG:    u64,
164                                   LocalMessages:   ReactiveMessagingSerializer<LocalMessages>                                  + Send + Sync + PartialEq + Debug + 'static,
165                                   OriginalChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync> {
166    channel: Arc<OriginalChannel>,
167}
168impl<const CONFIG: u64,
169     LocalMessages:   ReactiveMessagingSerializer<LocalMessages>                                  + Send + Sync + PartialEq + Debug,
170     OriginalChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync>
171ReactiveMessagingSender<CONFIG, LocalMessages, OriginalChannel> {
172
173    /// The instance config this generic implementation adheres to
174    pub const CONST_CONFIG: ConstConfig = ConstConfig::from(CONFIG);
175
176    /// Instantiates a new `channel` (from `reactive-mutiny`, with type `Self::SenderChannelType`) and wrap in a way to allow
177    /// our special [Self::send()] to operate on
178    pub fn new<IntoString: Into<String>>(channel_name: IntoString) -> Self {
179        Self {
180            channel: OriginalChannel::new(channel_name.into()),
181        }
182    }
183
184    pub fn create_stream(&self) -> (MutinyStream<'static, LocalMessages, OriginalChannel, LocalMessages>, u32) {
185        self.channel.create_stream()
186    }
187
188    #[inline(always)]
189    pub fn pending_items_count(&self) -> u32 {
190        self.channel.pending_items_count()
191    }
192
193    #[inline(always)]
194    pub fn buffer_size(&self) -> u32 {
195        self.channel.buffer_size()
196    }
197
198    pub async fn flush_and_close(&self, timeout: Duration) -> u32 {
199        self.channel.gracefully_end_all_streams(timeout).await
200    }
201
202    pub fn cancel_and_close(&self) {
203        self.channel.cancel_all_streams();
204    }
205
206    /// Routes `message` to the remote peer,
207    /// honoring the configured retrying options.
208    /// On error, returns whether the connection should be dropped or not.\
209    /// Returns `Ok` if sent successfully, `Err(details)` if sending was not possible, where `details` contain:
210    ///   - `(abort_the_connection?, error_message)`
211    /// See [Self::send_async_trait()] if your retrying strategy sleeps, and you are calling this from an async context.
212    #[inline(always)]
213    pub fn send(&self,
214                message: LocalMessages)
215               -> Result<(), (/*abort_the_connection?*/bool, /*error_message: */String)> {
216
217        let retryable = self.channel.send(message);
218        match Self::CONST_CONFIG.retrying_strategy {
219            RetryingStrategies::DoNotRetry => {
220                retryable
221                    .map_input_and_errors(
222                        Self::first_attempt_error_mapper,
223                        |message, _err|
224                            Self::retry_error_mapper(false, format!("sync-Sending '{:?}' failed. Won't retry (ignoring the error) due to retrying config {:?}",
225                                                                                    message, Self::CONST_CONFIG.retrying_strategy)) )
226                    .into_result()
227            },
228            RetryingStrategies::EndCommunications => {
229                retryable
230                    .map_input_and_errors(
231                        Self::first_attempt_error_mapper,
232                        |message, _err|
233                            Self::retry_error_mapper(true, format!("sync-Sending '{:?}' failed. Connection will be aborted (without retrying) due to retrying config {:?}",
234                                                                                   message, Self::CONST_CONFIG.retrying_strategy)) )
235                    .into_result()
236            },
237            RetryingStrategies::RetryWithBackoffUpTo(attempts) => {
238                retryable
239                    .map_input(|message| ( message, SystemTime::now()) )
240                    .retry_with(|(message, retry_start)|
241                        self.channel.send(message)
242                            .map_input(|message| (message, retry_start) )
243                    )
244                    .with_exponential_jitter(|| ExponentialJitter::FromBackoffRange {
245                        backoff_range_millis: 1..=(2.526_f32.powi(attempts as i32) as u32),
246                        re_attempts: attempts,
247                        jitter_ratio: 0.2,
248                    })
249                    .map_input_and_errors(
250                        |(message, retry_start), _fatal_err|
251                            Self::retry_error_mapper(true, format!("sync-Sending '{:?}' failed. Connection will be aborted (after exhausting all retries in {:?}) due to retrying config {:?}",
252                                                                                   message, retry_start.elapsed(), Self::CONST_CONFIG.retrying_strategy)),
253                        |_| (false, String::with_capacity(0)) )
254                    .into()
255            },
256            RetryingStrategies::RetryYieldingForUpToMillis(millis) => {
257                retryable
258                    .map_input(|message| ( message, SystemTime::now()) )
259                    .retry_with(|(message, retry_start)|
260                        self.channel.send(message)
261                            .map_input(|message| (message, retry_start) )
262                    )
263                    .spinning_until_timeout(Duration::from_millis(millis as u64), ())
264                    .map_input_and_errors(
265                        |(message, retry_start), _fatal_err|
266                            Self::retry_error_mapper(true, format!("sync-Sending '{:?}' failed. Connection will be aborted (after exhausting all retries in {:?}) due to retrying config {:?}",
267                                                                                   message, retry_start.elapsed(), Self::CONST_CONFIG.retrying_strategy)),
268                        |_| (false, String::with_capacity(0)) )
269                    .into()
270            },
271            RetryingStrategies::RetrySpinningForUpToMillis(_millis) => {
272                // this option is deprecated
273                unreachable!()
274            },
275        }
276    }
277
278    /// Similar to [Self::send()], but async.
279    /// The name contains `async_trait` to emphasize that there is a performance loss when calling this function through the trait:
280    /// A boxing + dynamic dispatch -- this cost is not charged when calling this function from the implementing type directly.
281    /// Depending on your retrying strategy, it might be preferred to use [Self::send()] instead -- knowing it will cause the whole thread to sleep,
282    /// when retrying, instead of causing only the task to sleep (as done here).
283    #[inline(always)]
284    pub async fn send_async_trait(&self,
285                                  message: LocalMessages)
286                                 -> Result<(), (/*abort_the_connection?*/bool, /*error_message: */String)> {
287
288        let retryable = self.channel.send(message);
289        match Self::CONST_CONFIG.retrying_strategy {
290            RetryingStrategies::DoNotRetry => {
291                retryable
292                    .map_input_and_errors(
293                        Self::first_attempt_error_mapper,
294                        |message, _err|
295                            Self::retry_error_mapper(false, format!("async-Sending '{:?}' failed. Won't retry (ignoring the error) due to retrying config {:?}",
296                                                                                    message, Self::CONST_CONFIG.retrying_strategy)) )
297                    .into_result()
298            },
299            RetryingStrategies::EndCommunications => {
300                retryable
301                    .map_input_and_errors(
302                        Self::first_attempt_error_mapper,
303                        |message, _err|
304                            Self::retry_error_mapper(true, format!("async-Sending '{:?}' failed. Connection will be aborted (without retrying) due to retrying config {:?}",
305                                                                                   message, Self::CONST_CONFIG.retrying_strategy)) )
306                    .into_result()
307            },
308            RetryingStrategies::RetryWithBackoffUpTo(attempts) => {
309                retryable
310                    .map_input(|message| ( message, SystemTime::now()) )
311                    .retry_with_async(|(message, retry_start)| future::ready(
312                        self.channel.send(message)
313                            .map_input(|message| (message, retry_start) )
314                    ))
315                    .with_exponential_jitter(|| ExponentialJitter::FromBackoffRange {
316                        backoff_range_millis: 1..=(2.526_f32.powi(attempts as i32) as u32),
317                        re_attempts: attempts,
318                        jitter_ratio: 0.2,
319                    })
320                    .await
321                    .map_input_and_errors(
322                        |(message, retry_start), _fatal_err|
323                            Self::retry_error_mapper(true, format!("async-Sending '{:?}' failed. Connection will be aborted (after exhausting all retries in {:?}) due to retrying config {:?}",
324                                                                                   message, retry_start.elapsed(), Self::CONST_CONFIG.retrying_strategy)),
325                        |_| (false, String::with_capacity(0)) )
326                    .into()
327            },
328            RetryingStrategies::RetryYieldingForUpToMillis(millis) => {
329                retryable
330                    .map_input(|message| ( message, SystemTime::now()) )
331                    .retry_with_async(|(message, retry_start)| future::ready(
332                        self.channel.send(message)
333                            .map_input(|message| (message, retry_start) )
334                    ))
335                    .yielding_until_timeout(Duration::from_millis(millis as u64), || ())
336                    .await
337                    .map_input_and_errors(
338                        |(message, retry_start), _fatal_err|
339                            Self::retry_error_mapper(true, format!("async-Sending '{:?}' failed. Connection will be aborted (after exhausting all retries in {:?}) due to retrying config {:?}",
340                                                                                   message, retry_start.elapsed(), Self::CONST_CONFIG.retrying_strategy)),
341                        |_| (false, String::with_capacity(0)) )
342                    .into()
343            },
344            RetryingStrategies::RetrySpinningForUpToMillis(_millis) => {
345                // this option is deprecated
346                unreachable!()
347            },
348        }
349    }
350
351    /// mapper for eventual first-time-being retrying attempts -- or for fatal errors that might happen during retrying
352    fn retry_error_mapper(abort: bool, error_msg: String) -> ((), (bool, String) ) {
353        ( (), (abort, error_msg) )
354    }
355    /// mapper for any fatal errors that happens on the first attempt (which should not happen in the current `reactive-mutiny` Uni Channel API)
356    fn first_attempt_error_mapper<T>(_: T, _: ()) -> ((), (bool, String) ) {
357        panic!("reactive-messaging: send_to_remote_peer(): BUG! `Uni` channel is expected never to fail fatably. Please, fix!")
358    }
359
360}
361
362impl<const CONFIG:        u64,
363     T:                   ?Sized,
364     LocalMessagesType:   ReactiveMessagingSerializer<LocalMessagesType>                                      + Send + Sync + PartialEq + Debug,
365     SenderChannel:       FullDuplexUniChannel<ItemType=LocalMessagesType, DerivedItemType=LocalMessagesType> + Send + Sync,
366     StateType:                                                                                                 Send + Sync + Clone     + Debug>
367ResponsiveStream<CONFIG, LocalMessagesType, SenderChannel, StateType>
368for T where T: Stream<Item=LocalMessagesType> {
369
370    #[inline(always)]
371    fn to_responsive_stream<YieldedItemType>
372
373                           (self,
374                            peer:            Arc<Peer<CONFIG, LocalMessagesType, SenderChannel, StateType>>,
375                            mut item_mapper: impl FnMut(&LocalMessagesType, &Arc<Peer<CONFIG, LocalMessagesType, SenderChannel, StateType>>) -> YieldedItemType)
376
377                           -> impl Stream<Item = YieldedItemType>
378
379                           where Self: Sized + Stream<Item = LocalMessagesType> {
380
381        let flush_timeout_millis = peer.config().flush_timeout_millis;
382
383        // send back each message
384        self.map(move |outgoing| {
385            trace!("`to_responsive_stream()`: Sending Answer `{:?}` to {:?} (peer id {})", outgoing, peer.peer_address, peer.peer_id);
386            let remapped_item = item_mapper(&outgoing, &peer);
387            if let Err((abort, error_msg)) = peer.send(outgoing) {
388                // peer is slow-reading -- and, possibly, fast sending
389                warn!("`to_responsive_stream()`: Slow reader detected while sending to {peer:?}: {error_msg}");
390                if abort {
391                    std::thread::sleep(Duration::from_millis(flush_timeout_millis as u64));
392                    peer.cancel_and_close();
393                }
394            }
395            remapped_item
396        })
397
398    }
399}
400
401/// Common test code for this module
402#[cfg(any(test,doc))]
403mod tests {
404    use crate::serde::{ReactiveMessagingDeserializer, ReactiveMessagingSerializer};
405
406    /// Test implementation for our text-only protocol as used across this module
407    impl ReactiveMessagingSerializer<String> for String {
408        #[inline(always)]
409        fn serialize(message: &String, buffer: &mut Vec<u8>) {
410            buffer.clear();
411            buffer.extend_from_slice(message.as_bytes());
412        }
413        #[inline(always)]
414        fn processor_error_message(err: String) -> String {
415            let msg = format!("ServerBug! Please, fix! Error: {}", err);
416            panic!("SocketServerSerializer<String>::processor_error_message(): {}", msg);
417            // msg
418        }
419    }
420
421    /// Testable implementation for our text-only protocol as used across this module
422    impl ReactiveMessagingDeserializer<String> for String {
423        #[inline(always)]
424        fn deserialize(message: &[u8]) -> Result<String, Box<dyn std::error::Error + Sync + Send + 'static>> {
425            Ok(String::from_utf8_lossy(message).to_string())
426        }
427    }
428
429}