use crate::{
config::{
ConstConfig,
RetryingStrategies,
},
ReactiveMessagingSerializer
};
use reactive_mutiny::prelude::{GenericUni, MutinyStream,FullDuplexUniChannel};
use std::{
fmt::Debug,
future::{self},
sync::Arc,
time::{Duration, SystemTime},
};
/// Wraps the given `Uni` (shared through an [Arc]) into a [ReactiveMessagingUniSender]
/// parameterized by the same `CONFIG`, so that messages sent through the returned
/// handle go through the retrying logic implemented in [ReactiveMessagingUniSender::send()].
pub fn upgrade_processor_uni_retrying_logic<const CONFIG: u64,
                                            ItemType:        Send + Sync + Debug + 'static,
                                            DerivedItemType: Send + Sync + Debug + 'static,
                                            OriginalUni:     GenericUni<ItemType=ItemType, DerivedItemType=DerivedItemType> + Send + Sync>
                                           (running_uni: Arc<OriginalUni>)
                                           -> ReactiveMessagingUniSender<CONFIG, ItemType, DerivedItemType, OriginalUni> {
    // all generic parameters are pinned by the declared return type -- no turbofish is needed
    ReactiveMessagingUniSender::new(running_uni)
}
/// Sending handle over a shared `Uni`: `RemoteMessages` is the `Uni`'s `ItemType` (what gets sent)
/// and `ConsumedRemoteMessages` is its `DerivedItemType`.\
/// `CONFIG` is the numeric form of the configuration the implementation decodes into a `ConstConfig`.
pub struct ReactiveMessagingUniSender<const CONFIG: u64,
                                      RemoteMessages,
                                      ConsumedRemoteMessages,
                                      OriginalUni>
where
    RemoteMessages:         Send + Sync + Debug + 'static,
    ConsumedRemoteMessages: Send + Sync + Debug + 'static,
    OriginalUni:            GenericUni<ItemType=RemoteMessages, DerivedItemType=ConsumedRemoteMessages> + Send + Sync,
{
    /// The wrapped `Uni`, to which every operation of this handle is delegated
    uni: Arc<OriginalUni>,
}
impl<const CONFIG: u64,
     RemoteMessages:         Send + Sync + Debug + 'static,
     ConsumedRemoteMessages: Send + Sync + Debug + 'static,
     OriginalUni:            GenericUni<ItemType=RemoteMessages, DerivedItemType=ConsumedRemoteMessages> + Send + Sync>
ReactiveMessagingUniSender<CONFIG, RemoteMessages, ConsumedRemoteMessages, OriginalUni> {

    /// The configuration decoded from the `CONFIG` const generic parameter
    const CONST_CONFIG: ConstConfig = ConstConfig::from(CONFIG);

    /// Wraps `running_uni`, taking a share of its ownership
    pub fn new(running_uni: Arc<OriginalUni>) -> Self {
        Self {
            uni: running_uni,
        }
    }

    /// Builds the `(new_input, new_error)` pair used by the error mappers of [Self::send()]:
    /// the input is dropped (replaced by `()`) and the error becomes `(abort, error_msg)`.
    fn retry_error_mapper(abort: bool, error_msg: String) -> ((), (bool, String) ) {
        ( (), (abort, error_msg) )
    }

    /// Mapper given as the first argument of `map_input_and_errors()` for the first send attempt.\
    /// Panics unconditionally: as the panic message states, the `Uni` channel is not expected
    /// to reach this mapper -- getting here is considered a bug.
    fn first_attempt_error_mapper<T>(_: T, _: ()) -> ((), (bool, String) ) {
        panic!("reactive-messaging: send_to_local_processor(): BUG! `Uni` channel is expected never to fail fatably. Please, fix!")
    }

    /// Sends `message` to the wrapped `Uni`, honoring the retrying strategy set in `CONFIG`.
    ///
    /// Returns `Ok(())` if the message was accepted or `Err((abort, error_message))` otherwise,
    /// where `abort` tells whether the connection should be aborted:
    ///   - `DoNotRetry`: no retrying is done; on failure, `abort` is `false` (the error is to be ignored);
    ///   - `EndCommunications`: no retrying is done; on failure, `abort` is `true`;
    ///   - `RetrySleepingArithmetically(steps)`: retries sleeping 10ms, 20ms, ..., `10*steps` ms between
    ///     attempts; if all retries are exhausted, `abort` is `true`;
    ///   - `RetryYieldingForUpToMillis(millis)`: retries, yielding between attempts, for up to `millis`
    ///     milliseconds; if the time is exhausted, `abort` is `true`.
    ///
    /// # Panics
    /// If the configured strategy is `RetrySpinningForUpToMillis` (that branch is `unreachable!()`)
    /// or if `first_attempt_error_mapper()` happens to be invoked.
    #[inline(always)]
    pub async fn send(&self,
                      message: RemoteMessages)
                     -> Result<(), (bool, String)> {
        let retryable = self.uni.send(message);
        match Self::CONST_CONFIG.retrying_strategy {
            RetryingStrategies::DoNotRetry => {
                retryable
                    .map_input_and_errors(
                        Self::first_attempt_error_mapper,
                        |message, _err|
                            Self::retry_error_mapper(false, format!("Relaying received message '{:?}' to the internal processor failed. Won't retry (ignoring the error) due to retrying config {:?}",
                                                                    message, Self::CONST_CONFIG.retrying_strategy)) )
                    .into_result()
            },
            RetryingStrategies::EndCommunications => {
                retryable
                    .map_input_and_errors(
                        Self::first_attempt_error_mapper,
                        |message, _err|
                            // `abort` must be `true` here: the message announces the connection will be aborted
                            // and this is what distinguishes this strategy from `DoNotRetry`
                            Self::retry_error_mapper(true, format!("Relaying received message '{:?}' to the internal processor failed. Connection will be aborted (without retrying) due to retrying config {:?}",
                                                                   message, Self::CONST_CONFIG.retrying_strategy)) )
                    .into_result()
            },
            RetryingStrategies::RetrySleepingArithmetically(steps) => {
                retryable
                    // the start time travels along with the message, so the elapsed time may be reported on failure
                    .map_input(|message| ( message, SystemTime::now()) )
                    .retry_with_async(|(message, retry_start)| future::ready(
                        self.uni.send(message)
                            .map_input(|message| (message, retry_start) )
                    ))
                    // arithmetic progression of delays: 10ms, 20ms, ..., `10*steps` ms
                    .with_delays((10..=(10*steps as u64)).step_by(10).map(Duration::from_millis))
                    .await
                    .map_input_and_errors(
                        |(message, retry_start), _fatal_err|
                            Self::retry_error_mapper(true, format!("Relaying received message '{:?}' to the internal processor failed. Connection will be aborted (after exhausting all retries in {:?}) due to retrying config {:?}",
                                                                   message, retry_start.elapsed(), Self::CONST_CONFIG.retrying_strategy)),
                        // NOTE(review): mapper for the other error case of the resolved result -- confirm
                        //               against the retry API when it is invoked
                        |_| (false, String::with_capacity(0)) )
                    .into()
            },
            RetryingStrategies::RetryYieldingForUpToMillis(millis) => {
                retryable
                    // the start time travels along with the message, so the elapsed time may be reported on failure
                    .map_input(|message| ( message, SystemTime::now()) )
                    .retry_with_async(|(message, retry_start)| future::ready(
                        self.uni.send(message)
                            .map_input(|message| (message, retry_start) )
                    ))
                    .yielding_until_timeout(Duration::from_millis(millis as u64), || ())
                    .await
                    .map_input_and_errors(
                        |(message, retry_start), _fatal_err|
                            Self::retry_error_mapper(true, format!("Relaying received message '{:?}' to the internal processor failed. Connection will be aborted (after exhausting all retries in {:?}) due to retrying config {:?}",
                                                                   message, retry_start.elapsed(), Self::CONST_CONFIG.retrying_strategy)),
                        // NOTE(review): mapper for the other error case of the resolved result -- confirm
                        //               against the retry API when it is invoked
                        |_| (false, String::with_capacity(0)) )
                    .into()
            },
            RetryingStrategies::RetrySpinningForUpToMillis(_millis) => {
                // NOTE(review): presumably this strategy is never configured for this sender -- confirm
                unreachable!()
            },
        }
    }

    /// Delegates to the wrapped `Uni`'s `pending_items_count()`
    #[inline(always)]
    pub fn pending_items_count(&self) -> u32 {
        self.uni.pending_items_count()
    }

    /// Delegates to the wrapped `Uni`'s `buffer_size()`
    #[inline(always)]
    pub fn buffer_size(&self) -> u32 {
        self.uni.buffer_size()
    }

    /// Delegates to the wrapped `Uni`'s `close()`, passing `timeout` along and returning its answer
    /// (presumably `true` if the closing completed within `timeout` -- confirm against the `Uni` docs)
    pub async fn close(&self, timeout: Duration) -> bool {
        self.uni.close(timeout).await
    }
}
/// Sending handle over a shared full-duplex channel carrying `LocalMessages`
/// (the channel's `ItemType` and `DerivedItemType` are both `LocalMessages`).\
/// `CONFIG` is the numeric form of the configuration the implementation decodes into a `ConstConfig`.
pub struct ReactiveMessagingSender<const CONFIG: u64,
                                   LocalMessages,
                                   OriginalChannel>
where
    LocalMessages:   ReactiveMessagingSerializer<LocalMessages> + Send + Sync + PartialEq + Debug + 'static,
    OriginalChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync,
{
    /// The wrapped channel, to which every operation of this handle is delegated
    channel: Arc<OriginalChannel>,
}
impl<const CONFIG: u64,
     LocalMessages:   ReactiveMessagingSerializer<LocalMessages> + Send + Sync + PartialEq + Debug,
     OriginalChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync>
ReactiveMessagingSender<CONFIG, LocalMessages, OriginalChannel> {

    /// The configuration decoded from the `CONFIG` const generic parameter
    pub const CONST_CONFIG: ConstConfig = ConstConfig::from(CONFIG);

    /// Pairs the dropped input (`()`) with the `(abort, error_msg)` error --
    /// the shape expected from the error mappers used by the send methods.
    fn retry_error_mapper(abort: bool, error_msg: String) -> ((), (bool, String) ) {
        let mapped_error = (abort, error_msg);
        ((), mapped_error)
    }

    /// First argument of `map_input_and_errors()` for the first send attempt.\
    /// Always panics: according to the panic message, reaching this mapper denotes a bug.
    fn first_attempt_error_mapper<T>(_: T, _: ()) -> ((), (bool, String) ) {
        panic!("reactive-messaging: send_to_remote_peer(): BUG! `Uni` channel is expected never to fail fatably. Please, fix!")
    }

    /// Creates the underlying channel, naming it after `channel_name`
    pub fn new<IntoString: Into<String>>(channel_name: IntoString) -> Self {
        let name: String = channel_name.into();
        Self {
            channel: OriginalChannel::new(name),
        }
    }

    /// Delegates to the channel's `create_stream()`, returning the stream together with the `u32`
    /// the channel hands back (presumably the stream's id -- confirm against the channel docs)
    pub fn create_stream(&self) -> (MutinyStream<'static, LocalMessages, OriginalChannel, LocalMessages>, u32) {
        self.channel.create_stream()
    }

    /// Delegates to the channel's `pending_items_count()`
    #[inline(always)]
    pub fn pending_items_count(&self) -> u32 {
        self.channel.pending_items_count()
    }

    /// Delegates to the channel's `buffer_size()`
    #[inline(always)]
    pub fn buffer_size(&self) -> u32 {
        self.channel.buffer_size()
    }

    /// Awaits for the channel's `gracefully_end_all_streams()`, passing `timeout` along
    /// and returning the `u32` it answers with
    pub async fn flush_and_close(&self, timeout: Duration) -> u32 {
        self.channel.gracefully_end_all_streams(timeout).await
    }

    /// Calls the channel's `cancel_all_streams()`, discarding whatever it returns
    pub fn cancel_and_close(&self) {
        self.channel.cancel_all_streams();
    }

    /// Synchronously sends `message` through the channel, honoring the retrying strategy set in `CONFIG`.
    ///
    /// Answers `Ok(())` on success or `Err((abort, error_message))` on failure:
    ///   - `DoNotRetry`: single attempt; `abort` is `false`;
    ///   - `EndCommunications`: single attempt; `abort` is `true`;
    ///   - `RetrySleepingArithmetically(steps)`: retries with delays of 10ms, 20ms, ..., `10*steps` ms;
    ///     `abort` is `true` once the retries are exhausted;
    ///   - `RetryYieldingForUpToMillis(millis)`: retries for up to `millis` milliseconds -- this sync
    ///     version calls `spinning_until_timeout()`, whereas [Self::send_async_trait()] calls
    ///     `yielding_until_timeout()`; `abort` is `true` once the time is exhausted.
    ///
    /// # Panics
    /// If the strategy is `RetrySpinningForUpToMillis` (an `unreachable!()` branch)
    /// or if `first_attempt_error_mapper()` gets invoked.
    #[inline(always)]
    pub fn send(&self,
                message: LocalMessages)
               -> Result<(), (bool, String)> {
        let first_attempt = self.channel.send(message);
        match Self::CONST_CONFIG.retrying_strategy {

            RetryingStrategies::DoNotRetry => {
                first_attempt
                    .map_input_and_errors(
                        Self::first_attempt_error_mapper,
                        |rejected, _err| {
                            let error_msg = format!("sync-Sending '{:?}' failed. Won't retry (ignoring the error) due to retrying config {:?}",
                                                    rejected, Self::CONST_CONFIG.retrying_strategy);
                            Self::retry_error_mapper(false, error_msg)
                        })
                    .into_result()
            },

            RetryingStrategies::EndCommunications => {
                first_attempt
                    .map_input_and_errors(
                        Self::first_attempt_error_mapper,
                        |rejected, _err| {
                            let error_msg = format!("sync-Sending '{:?}' failed. Connection will be aborted (without retrying) due to retrying config {:?}",
                                                    rejected, Self::CONST_CONFIG.retrying_strategy);
                            Self::retry_error_mapper(true, error_msg)
                        })
                    .into_result()
            },

            RetryingStrategies::RetrySleepingArithmetically(steps) => {
                first_attempt
                    // carries the start time along with the payload, for the elapsed time report
                    .map_input(|payload| (payload, SystemTime::now()))
                    .retry_with(|(payload, started_at)|
                        self.channel.send(payload)
                            .map_input(|payload| (payload, started_at))
                    )
                    // 10ms, 20ms, ..., `10*steps` ms
                    .with_delays((10..=(10*steps as u64)).step_by(10).map(Duration::from_millis))
                    .map_input_and_errors(
                        |(payload, started_at), _fatal_err| {
                            let error_msg = format!("sync-Sending '{:?}' failed. Connection will be aborted (after exhausting all retries in {:?}) due to retrying config {:?}",
                                                    payload, started_at.elapsed(), Self::CONST_CONFIG.retrying_strategy);
                            Self::retry_error_mapper(true, error_msg)
                        },
                        |_| (false, String::new()) )
                    .into()
            },

            RetryingStrategies::RetryYieldingForUpToMillis(millis) => {
                first_attempt
                    // carries the start time along with the payload, for the elapsed time report
                    .map_input(|payload| (payload, SystemTime::now()))
                    .retry_with(|(payload, started_at)|
                        self.channel.send(payload)
                            .map_input(|payload| (payload, started_at))
                    )
                    .spinning_until_timeout(Duration::from_millis(millis as u64), ())
                    .map_input_and_errors(
                        |(payload, started_at), _fatal_err| {
                            let error_msg = format!("sync-Sending '{:?}' failed. Connection will be aborted (after exhausting all retries in {:?}) due to retrying config {:?}",
                                                    payload, started_at.elapsed(), Self::CONST_CONFIG.retrying_strategy);
                            Self::retry_error_mapper(true, error_msg)
                        },
                        |_| (false, String::new()) )
                    .into()
            },

            RetryingStrategies::RetrySpinningForUpToMillis(_millis) => {
                unreachable!()
            },
        }
    }

    /// Asynchronous counterpart of [Self::send()]: same contract and same `(abort, error_message)`
    /// outcomes per strategy, but awaiting between retries -- sleeping through the delays for
    /// `RetrySleepingArithmetically` and using `yielding_until_timeout()` for `RetryYieldingForUpToMillis`.
    ///
    /// # Panics
    /// If the strategy is `RetrySpinningForUpToMillis` (an `unreachable!()` branch)
    /// or if `first_attempt_error_mapper()` gets invoked.
    #[inline(always)]
    pub async fn send_async_trait(&self,
                                  message: LocalMessages)
                                 -> Result<(), (bool, String)> {
        let first_attempt = self.channel.send(message);
        match Self::CONST_CONFIG.retrying_strategy {

            RetryingStrategies::DoNotRetry => {
                first_attempt
                    .map_input_and_errors(
                        Self::first_attempt_error_mapper,
                        |rejected, _err| {
                            let error_msg = format!("async-Sending '{:?}' failed. Won't retry (ignoring the error) due to retrying config {:?}",
                                                    rejected, Self::CONST_CONFIG.retrying_strategy);
                            Self::retry_error_mapper(false, error_msg)
                        })
                    .into_result()
            },

            RetryingStrategies::EndCommunications => {
                first_attempt
                    .map_input_and_errors(
                        Self::first_attempt_error_mapper,
                        |rejected, _err| {
                            let error_msg = format!("async-Sending '{:?}' failed. Connection will be aborted (without retrying) due to retrying config {:?}",
                                                    rejected, Self::CONST_CONFIG.retrying_strategy);
                            Self::retry_error_mapper(true, error_msg)
                        })
                    .into_result()
            },

            RetryingStrategies::RetrySleepingArithmetically(steps) => {
                first_attempt
                    // carries the start time along with the payload, for the elapsed time report
                    .map_input(|payload| (payload, SystemTime::now()))
                    .retry_with_async(|(payload, started_at)| future::ready(
                        self.channel.send(payload)
                            .map_input(|payload| (payload, started_at))
                    ))
                    // 10ms, 20ms, ..., `10*steps` ms
                    .with_delays((10..=(10*steps as u64)).step_by(10).map(Duration::from_millis))
                    .await
                    .map_input_and_errors(
                        |(payload, started_at), _fatal_err| {
                            let error_msg = format!("async-Sending '{:?}' failed. Connection will be aborted (after exhausting all retries in {:?}) due to retrying config {:?}",
                                                    payload, started_at.elapsed(), Self::CONST_CONFIG.retrying_strategy);
                            Self::retry_error_mapper(true, error_msg)
                        },
                        |_| (false, String::new()) )
                    .into()
            },

            RetryingStrategies::RetryYieldingForUpToMillis(millis) => {
                first_attempt
                    // carries the start time along with the payload, for the elapsed time report
                    .map_input(|payload| (payload, SystemTime::now()))
                    .retry_with_async(|(payload, started_at)| future::ready(
                        self.channel.send(payload)
                            .map_input(|payload| (payload, started_at))
                    ))
                    .yielding_until_timeout(Duration::from_millis(millis as u64), || ())
                    .await
                    .map_input_and_errors(
                        |(payload, started_at), _fatal_err| {
                            let error_msg = format!("async-Sending '{:?}' failed. Connection will be aborted (after exhausting all retries in {:?}) due to retrying config {:?}",
                                                    payload, started_at.elapsed(), Self::CONST_CONFIG.retrying_strategy);
                            Self::retry_error_mapper(true, error_msg)
                        },
                        |_| (false, String::new()) )
                    .into()
            },

            RetryingStrategies::RetrySpinningForUpToMillis(_millis) => {
                unreachable!()
            },
        }
    }
}
/// Trait implementations allowing plain `String`s to be used as messages in tests & doc examples
#[cfg(any(test,doc))]
mod tests {
    use crate::{ReactiveMessagingDeserializer, ReactiveMessagingSerializer};
    use crate::types::ResponsiveMessages;

    /// Serializes a `String` as its raw UTF-8 bytes
    impl ReactiveMessagingSerializer<String> for String {

        /// Replaces the contents of `buffer` with the bytes of `message`
        #[inline(always)]
        fn serialize(message: &String, buffer: &mut Vec<u8>) {
            let payload = message.as_bytes();
            buffer.clear();
            buffer.extend_from_slice(payload);
        }

        /// Never returns: panics reporting `err`
        #[inline(always)]
        fn processor_error_message(err: String) -> String {
            let bug_report = format!("ServerBug! Please, fix! Error: {}", err);
            panic!("SocketServerSerializer<String>::processor_error_message(): {}", bug_report);
        }
    }

    /// Conventions for `String` answers: the empty string is the "disconnect" message
    /// and `"."` is the "no answer" message
    impl ResponsiveMessages<String> for String {

        /// `true` if `processor_answer` is the empty string
        #[inline(always)]
        fn is_disconnect_message(processor_answer: &String) -> bool {
            processor_answer.is_empty()
        }

        /// `true` if `processor_answer` is exactly `"."`
        #[inline(always)]
        fn is_no_answer_message(processor_answer: &String) -> bool {
            processor_answer.as_str() == "."
        }
    }

    /// Deserializes a `String` out of raw bytes
    impl ReactiveMessagingDeserializer<String> for String {

        /// Always succeeds: `message` goes through a lossy UTF-8 conversion
        #[inline(always)]
        fn deserialize(message: &[u8]) -> Result<String, Box<dyn std::error::Error + Sync + Send + 'static>> {
            let text = String::from_utf8_lossy(message);
            Ok(text.into_owned())
        }
    }
}