1use crate::{
5 config::{
6 ConstConfig,
7 RetryingStrategies,
8 },
9};
10use reactive_mutiny::prelude::{GenericUni, MutinyStream,FullDuplexUniChannel};
11use std::{
12 fmt::Debug,
13 future::{self},
14 sync::Arc,
15 time::{Duration, SystemTime},
16};
17use futures::{Stream, StreamExt};
18use keen_retry::ExponentialJitter;
19use log::{trace, warn};
20use crate::prelude::Peer;
21use crate::serde::ReactiveMessagingSerializer;
22use crate::types::ResponsiveStream;
23
24pub fn upgrade_processor_uni_retrying_logic<const CONFIG: u64,
26 ItemType: Send + Sync + Debug + 'static,
27 DerivedItemType: Send + Sync + Debug + 'static,
28 OriginalUni: GenericUni<ItemType=ItemType, DerivedItemType=DerivedItemType> + Send + Sync>
29 (running_uni: Arc<OriginalUni>)
30 -> ReactiveMessagingUniSender<CONFIG, ItemType, DerivedItemType, OriginalUni> {
31 ReactiveMessagingUniSender::<CONFIG,
32 ItemType,
33 DerivedItemType,
34 OriginalUni>::new(running_uni)
35}
36
37pub struct ReactiveMessagingUniSender<const CONFIG: u64,
41 RemoteMessages: Send + Sync + Debug + 'static,
42 ConsumedRemoteMessages: Send + Sync + Debug + 'static,
43 OriginalUni: GenericUni<ItemType=RemoteMessages, DerivedItemType=ConsumedRemoteMessages> + Send + Sync> {
44 uni: Arc<OriginalUni>,
45}
46impl<const CONFIG: u64,
47 RemoteMessages: Send + Sync + Debug + 'static,
48 ConsumedRemoteMessages: Send + Sync + Debug + 'static,
49 OriginalUni: GenericUni<ItemType=RemoteMessages, DerivedItemType=ConsumedRemoteMessages> + Send + Sync>
50ReactiveMessagingUniSender<CONFIG, RemoteMessages, ConsumedRemoteMessages, OriginalUni> {
51
52 const CONST_CONFIG: ConstConfig = ConstConfig::from(CONFIG);
53
54 pub fn new(running_uni: Arc<OriginalUni>) -> Self {
57 Self {
58 uni: running_uni,
59 }
60 }
61
62 fn retry_error_mapper(abort: bool, error_msg: String) -> ((), (bool, String) ) {
64 ( (), (abort, error_msg) )
65 }
66 fn first_attempt_error_mapper<T>(_: T, _: ()) -> ((), (bool, String) ) {
68 panic!("reactive-messaging: send_to_local_processor(): BUG! `Uni` channel is expected never to fail fatably. Please, fix!")
69 }
70
71 #[inline(always)]
75 pub async fn send(&self,
76 message: RemoteMessages)
77 -> Result<(), (bool, String)> {
78
79 let retryable = self.uni.send(message);
80 match Self::CONST_CONFIG.retrying_strategy {
81 RetryingStrategies::DoNotRetry => {
82 retryable
83 .map_input_and_errors(
84 Self::first_attempt_error_mapper,
85 |message, _err|
86 Self::retry_error_mapper(false, format!("Relaying received message '{:?}' to the internal processor failed. Won't retry (ignoring the error) due to retrying config {:?}",
87 message, Self::CONST_CONFIG.retrying_strategy)) )
88 .into_result()
89 },
90 RetryingStrategies::EndCommunications => {
91 retryable
92 .map_input_and_errors(
93 Self::first_attempt_error_mapper,
94 |message, _err|
95 Self::retry_error_mapper(false, format!("Relaying received message '{:?}' to the internal processor failed. Connection will be aborted (without retrying) due to retrying config {:?}",
96 message, Self::CONST_CONFIG.retrying_strategy)) )
97 .into_result()
98 },
99 RetryingStrategies::RetryWithBackoffUpTo(attempts) => {
100 retryable
101 .map_input(|message| ( message, SystemTime::now()) )
102 .retry_with_async(|(message, retry_start)| future::ready(
103 self.uni.send(message)
104 .map_input(|message| (message, retry_start) )
105 ))
106 .with_exponential_jitter(|| ExponentialJitter::FromBackoffRange {
107 backoff_range_millis: 1..=(2.526_f32.powi(attempts as i32) as u32),
108 re_attempts: attempts,
109 jitter_ratio: 0.2,
110 })
111 .await
112 .map_input_and_errors(
113 |(message, retry_start), _fatal_err|
114 Self::retry_error_mapper(true, format!("Relaying received message '{:?}' to the internal processor failed. Connection will be aborted (after exhausting all retries in {:?}) due to retrying config {:?}",
115 message, retry_start.elapsed(), Self::CONST_CONFIG.retrying_strategy)),
116 |_| (false, String::with_capacity(0)) )
117 .into()
118 },
119 RetryingStrategies::RetryYieldingForUpToMillis(millis) => {
120 retryable
121 .map_input(|message| ( message, SystemTime::now()) )
122 .retry_with_async(|(message, retry_start)| future::ready(
123 self.uni.send(message)
124 .map_input(|message| (message, retry_start) )
125 ))
126 .yielding_until_timeout(Duration::from_millis(millis as u64), || ())
127 .await
128 .map_input_and_errors(
129 |(message, retry_start), _fatal_err|
130 Self::retry_error_mapper(true, format!("Relaying received message '{:?}' to the internal processor failed. Connection will be aborted (after exhausting all retries in {:?}) due to retrying config {:?}",
131 message, retry_start.elapsed(), Self::CONST_CONFIG.retrying_strategy)),
132 |_| (false, String::with_capacity(0)) )
133 .into()
134 },
135 RetryingStrategies::RetrySpinningForUpToMillis(_millis) => {
136 unreachable!()
138 },
139 }
140 }
141
142 #[inline(always)]
144 pub fn pending_items_count(&self) -> u32 {
145 self.uni.pending_items_count()
146 }
147
148 #[inline(always)]
150 pub fn buffer_size(&self) -> u32 {
151 self.uni.buffer_size()
152 }
153
154 pub async fn close(&self, timeout: Duration) -> bool {
156 self.uni.close(timeout).await
157 }
158}
159
160pub struct ReactiveMessagingSender<const CONFIG: u64,
164 LocalMessages: ReactiveMessagingSerializer<LocalMessages> + Send + Sync + PartialEq + Debug + 'static,
165 OriginalChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync> {
166 channel: Arc<OriginalChannel>,
167}
168impl<const CONFIG: u64,
169 LocalMessages: ReactiveMessagingSerializer<LocalMessages> + Send + Sync + PartialEq + Debug,
170 OriginalChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync>
171ReactiveMessagingSender<CONFIG, LocalMessages, OriginalChannel> {
172
173 pub const CONST_CONFIG: ConstConfig = ConstConfig::from(CONFIG);
175
176 pub fn new<IntoString: Into<String>>(channel_name: IntoString) -> Self {
179 Self {
180 channel: OriginalChannel::new(channel_name.into()),
181 }
182 }
183
184 pub fn create_stream(&self) -> (MutinyStream<'static, LocalMessages, OriginalChannel, LocalMessages>, u32) {
185 self.channel.create_stream()
186 }
187
188 #[inline(always)]
189 pub fn pending_items_count(&self) -> u32 {
190 self.channel.pending_items_count()
191 }
192
193 #[inline(always)]
194 pub fn buffer_size(&self) -> u32 {
195 self.channel.buffer_size()
196 }
197
198 pub async fn flush_and_close(&self, timeout: Duration) -> u32 {
199 self.channel.gracefully_end_all_streams(timeout).await
200 }
201
202 pub fn cancel_and_close(&self) {
203 self.channel.cancel_all_streams();
204 }
205
206 #[inline(always)]
213 pub fn send(&self,
214 message: LocalMessages)
215 -> Result<(), (bool, String)> {
216
217 let retryable = self.channel.send(message);
218 match Self::CONST_CONFIG.retrying_strategy {
219 RetryingStrategies::DoNotRetry => {
220 retryable
221 .map_input_and_errors(
222 Self::first_attempt_error_mapper,
223 |message, _err|
224 Self::retry_error_mapper(false, format!("sync-Sending '{:?}' failed. Won't retry (ignoring the error) due to retrying config {:?}",
225 message, Self::CONST_CONFIG.retrying_strategy)) )
226 .into_result()
227 },
228 RetryingStrategies::EndCommunications => {
229 retryable
230 .map_input_and_errors(
231 Self::first_attempt_error_mapper,
232 |message, _err|
233 Self::retry_error_mapper(true, format!("sync-Sending '{:?}' failed. Connection will be aborted (without retrying) due to retrying config {:?}",
234 message, Self::CONST_CONFIG.retrying_strategy)) )
235 .into_result()
236 },
237 RetryingStrategies::RetryWithBackoffUpTo(attempts) => {
238 retryable
239 .map_input(|message| ( message, SystemTime::now()) )
240 .retry_with(|(message, retry_start)|
241 self.channel.send(message)
242 .map_input(|message| (message, retry_start) )
243 )
244 .with_exponential_jitter(|| ExponentialJitter::FromBackoffRange {
245 backoff_range_millis: 1..=(2.526_f32.powi(attempts as i32) as u32),
246 re_attempts: attempts,
247 jitter_ratio: 0.2,
248 })
249 .map_input_and_errors(
250 |(message, retry_start), _fatal_err|
251 Self::retry_error_mapper(true, format!("sync-Sending '{:?}' failed. Connection will be aborted (after exhausting all retries in {:?}) due to retrying config {:?}",
252 message, retry_start.elapsed(), Self::CONST_CONFIG.retrying_strategy)),
253 |_| (false, String::with_capacity(0)) )
254 .into()
255 },
256 RetryingStrategies::RetryYieldingForUpToMillis(millis) => {
257 retryable
258 .map_input(|message| ( message, SystemTime::now()) )
259 .retry_with(|(message, retry_start)|
260 self.channel.send(message)
261 .map_input(|message| (message, retry_start) )
262 )
263 .spinning_until_timeout(Duration::from_millis(millis as u64), ())
264 .map_input_and_errors(
265 |(message, retry_start), _fatal_err|
266 Self::retry_error_mapper(true, format!("sync-Sending '{:?}' failed. Connection will be aborted (after exhausting all retries in {:?}) due to retrying config {:?}",
267 message, retry_start.elapsed(), Self::CONST_CONFIG.retrying_strategy)),
268 |_| (false, String::with_capacity(0)) )
269 .into()
270 },
271 RetryingStrategies::RetrySpinningForUpToMillis(_millis) => {
272 unreachable!()
274 },
275 }
276 }
277
278 #[inline(always)]
284 pub async fn send_async_trait(&self,
285 message: LocalMessages)
286 -> Result<(), (bool, String)> {
287
288 let retryable = self.channel.send(message);
289 match Self::CONST_CONFIG.retrying_strategy {
290 RetryingStrategies::DoNotRetry => {
291 retryable
292 .map_input_and_errors(
293 Self::first_attempt_error_mapper,
294 |message, _err|
295 Self::retry_error_mapper(false, format!("async-Sending '{:?}' failed. Won't retry (ignoring the error) due to retrying config {:?}",
296 message, Self::CONST_CONFIG.retrying_strategy)) )
297 .into_result()
298 },
299 RetryingStrategies::EndCommunications => {
300 retryable
301 .map_input_and_errors(
302 Self::first_attempt_error_mapper,
303 |message, _err|
304 Self::retry_error_mapper(true, format!("async-Sending '{:?}' failed. Connection will be aborted (without retrying) due to retrying config {:?}",
305 message, Self::CONST_CONFIG.retrying_strategy)) )
306 .into_result()
307 },
308 RetryingStrategies::RetryWithBackoffUpTo(attempts) => {
309 retryable
310 .map_input(|message| ( message, SystemTime::now()) )
311 .retry_with_async(|(message, retry_start)| future::ready(
312 self.channel.send(message)
313 .map_input(|message| (message, retry_start) )
314 ))
315 .with_exponential_jitter(|| ExponentialJitter::FromBackoffRange {
316 backoff_range_millis: 1..=(2.526_f32.powi(attempts as i32) as u32),
317 re_attempts: attempts,
318 jitter_ratio: 0.2,
319 })
320 .await
321 .map_input_and_errors(
322 |(message, retry_start), _fatal_err|
323 Self::retry_error_mapper(true, format!("async-Sending '{:?}' failed. Connection will be aborted (after exhausting all retries in {:?}) due to retrying config {:?}",
324 message, retry_start.elapsed(), Self::CONST_CONFIG.retrying_strategy)),
325 |_| (false, String::with_capacity(0)) )
326 .into()
327 },
328 RetryingStrategies::RetryYieldingForUpToMillis(millis) => {
329 retryable
330 .map_input(|message| ( message, SystemTime::now()) )
331 .retry_with_async(|(message, retry_start)| future::ready(
332 self.channel.send(message)
333 .map_input(|message| (message, retry_start) )
334 ))
335 .yielding_until_timeout(Duration::from_millis(millis as u64), || ())
336 .await
337 .map_input_and_errors(
338 |(message, retry_start), _fatal_err|
339 Self::retry_error_mapper(true, format!("async-Sending '{:?}' failed. Connection will be aborted (after exhausting all retries in {:?}) due to retrying config {:?}",
340 message, retry_start.elapsed(), Self::CONST_CONFIG.retrying_strategy)),
341 |_| (false, String::with_capacity(0)) )
342 .into()
343 },
344 RetryingStrategies::RetrySpinningForUpToMillis(_millis) => {
345 unreachable!()
347 },
348 }
349 }
350
351 fn retry_error_mapper(abort: bool, error_msg: String) -> ((), (bool, String) ) {
353 ( (), (abort, error_msg) )
354 }
355 fn first_attempt_error_mapper<T>(_: T, _: ()) -> ((), (bool, String) ) {
357 panic!("reactive-messaging: send_to_remote_peer(): BUG! `Uni` channel is expected never to fail fatably. Please, fix!")
358 }
359
360}
361
362impl<const CONFIG: u64,
363 T: ?Sized,
364 LocalMessagesType: ReactiveMessagingSerializer<LocalMessagesType> + Send + Sync + PartialEq + Debug,
365 SenderChannel: FullDuplexUniChannel<ItemType=LocalMessagesType, DerivedItemType=LocalMessagesType> + Send + Sync,
366 StateType: Send + Sync + Clone + Debug>
367ResponsiveStream<CONFIG, LocalMessagesType, SenderChannel, StateType>
368for T where T: Stream<Item=LocalMessagesType> {
369
370 #[inline(always)]
371 fn to_responsive_stream<YieldedItemType>
372
373 (self,
374 peer: Arc<Peer<CONFIG, LocalMessagesType, SenderChannel, StateType>>,
375 mut item_mapper: impl FnMut(&LocalMessagesType, &Arc<Peer<CONFIG, LocalMessagesType, SenderChannel, StateType>>) -> YieldedItemType)
376
377 -> impl Stream<Item = YieldedItemType>
378
379 where Self: Sized + Stream<Item = LocalMessagesType> {
380
381 let flush_timeout_millis = peer.config().flush_timeout_millis;
382
383 self.map(move |outgoing| {
385 trace!("`to_responsive_stream()`: Sending Answer `{:?}` to {:?} (peer id {})", outgoing, peer.peer_address, peer.peer_id);
386 let remapped_item = item_mapper(&outgoing, &peer);
387 if let Err((abort, error_msg)) = peer.send(outgoing) {
388 warn!("`to_responsive_stream()`: Slow reader detected while sending to {peer:?}: {error_msg}");
390 if abort {
391 std::thread::sleep(Duration::from_millis(flush_timeout_millis as u64));
392 peer.cancel_and_close();
393 }
394 }
395 remapped_item
396 })
397
398 }
399}
400
401#[cfg(any(test,doc))]
403mod tests {
404 use crate::serde::{ReactiveMessagingDeserializer, ReactiveMessagingSerializer};
405
406 impl ReactiveMessagingSerializer<String> for String {
408 #[inline(always)]
409 fn serialize(message: &String, buffer: &mut Vec<u8>) {
410 buffer.clear();
411 buffer.extend_from_slice(message.as_bytes());
412 }
413 #[inline(always)]
414 fn processor_error_message(err: String) -> String {
415 let msg = format!("ServerBug! Please, fix! Error: {}", err);
416 panic!("SocketServerSerializer<String>::processor_error_message(): {}", msg);
417 }
419 }
420
421 impl ReactiveMessagingDeserializer<String> for String {
423 #[inline(always)]
424 fn deserialize(message: &[u8]) -> Result<String, Box<dyn std::error::Error + Sync + Send + 'static>> {
425 Ok(String::from_utf8_lossy(message).to_string())
426 }
427 }
428
429}