use crate::{
    config::{
        ConstConfig,
        RetryingStrategies,
    },
};
use reactive_mutiny::prelude::{GenericUni, MutinyStream,FullDuplexUniChannel};
use std::{
    fmt::Debug,
    future::{self},
    sync::Arc,
    time::{Duration, SystemTime},
};
use keen_retry::ExponentialJitter;
use crate::serde::ReactiveMessagingSerializer;
/// Wraps `running_uni` in a [ReactiveMessagingUniSender] parameterized by the same `CONFIG`,
/// so that messages sent through the returned value go through that type's `send()` logic.
pub fn upgrade_processor_uni_retrying_logic<const CONFIG: u64, ItemType, DerivedItemType, OriginalUni>
                                           (running_uni: Arc<OriginalUni>)
                                           -> ReactiveMessagingUniSender<CONFIG, ItemType, DerivedItemType, OriginalUni>
where
    ItemType:        Send + Sync + Debug + 'static,
    DerivedItemType: Send + Sync + Debug + 'static,
    OriginalUni:     GenericUni<ItemType=ItemType, DerivedItemType=DerivedItemType> + Send + Sync,
{
    // all generic parameters are inferred from the declared return type
    ReactiveMessagingUniSender::new(running_uni)
}
/// Sender of `RemoteMessages` into a shared `Uni` (`OriginalUni`).\
/// `CONFIG` is the numeric form of a [ConstConfig], decoded by the `impl` of this type.
pub struct ReactiveMessagingUniSender<const CONFIG: u64, RemoteMessages, ConsumedRemoteMessages, OriginalUni>
where
    RemoteMessages:         Send + Sync + Debug + 'static,
    ConsumedRemoteMessages: Send + Sync + Debug + 'static,
    OriginalUni:            GenericUni<ItemType=RemoteMessages, DerivedItemType=ConsumedRemoteMessages> + Send + Sync,
{
    /// The shared `Uni` every operation of this type is delegated to
    uni: Arc<OriginalUni>,
}
impl<const CONFIG: u64,
     RemoteMessages:         Send + Sync + Debug + 'static,
     ConsumedRemoteMessages: Send + Sync + Debug + 'static,
     OriginalUni:            GenericUni<ItemType=RemoteMessages, DerivedItemType=ConsumedRemoteMessages> + Send + Sync>
ReactiveMessagingUniSender<CONFIG, RemoteMessages, ConsumedRemoteMessages, OriginalUni> {

    /// The [ConstConfig] decoded from the `CONFIG` const generic parameter --
    /// its `retrying_strategy` selects the branch taken by [Self::send()].
    const CONST_CONFIG: ConstConfig = ConstConfig::from(CONFIG);

    /// Creates a sender that relays messages into `running_uni`.
    pub fn new(running_uni: Arc<OriginalUni>) -> Self {
        Self {
            uni: running_uni,
        }
    }

    /// Builds the `(input, error)` pair given back to the retrying API when a send fails:
    /// the input is dropped (replaced by `()`) and the error becomes `(abort, error_msg)`.
    fn retry_error_mapper(abort: bool, error_msg: String) -> ((), (bool, String) ) {
        ( (), (abort, error_msg) )
    }

    /// Mapper for the failure kind that is expected never to be produced by a `Uni` channel on
    /// the first attempt -- always panics, as reaching it denotes a bug.
    fn first_attempt_error_mapper<T>(_: T, _: ()) -> ((), (bool, String) ) {
        panic!("reactive-messaging: send_to_local_processor(): BUG! `Uni` channel is expected never to fail fatably. Please, fix!")
    }

    /// Sends `message` to the wrapped `Uni`, honoring the retrying strategy of [Self::CONST_CONFIG].
    ///
    /// Returns `Ok(())` if the message was accepted or, otherwise, `Err((abort, error_message))`, where:
    ///   - `abort` is `false` only for `DoNotRetry` (the error is to be ignored) and `true` whenever
    ///     `error_message` states that the connection will be aborted;
    ///   - `error_message` describes the message that could not be sent and the strategy in use.
    ///
    /// # Panics
    /// Panics if the configured strategy is `RetrySpinningForUpToMillis` or if the `Uni` reports the
    /// failure kind handled by `first_attempt_error_mapper()`.
    #[inline(always)]
    pub async fn send(&self,
                      message: RemoteMessages)
                     -> Result<(), (bool, String)> {
        let retryable = self.uni.send(message);
        match Self::CONST_CONFIG.retrying_strategy {
            RetryingStrategies::DoNotRetry => {
                retryable
                    .map_input_and_errors(
                        Self::first_attempt_error_mapper,
                        |message, _err|
                            Self::retry_error_mapper(false, format!("Relaying received message '{:?}' to the internal processor failed. Won't retry (ignoring the error) due to retrying config {:?}",
                                                                                    message, Self::CONST_CONFIG.retrying_strategy)) )
                    .into_result()
            },
            RetryingStrategies::EndCommunications => {
                retryable
                    .map_input_and_errors(
                        Self::first_attempt_error_mapper,
                        |message, _err|
                            // `abort` must be `true` here: the message promises the connection will be aborted
                            // (this also matches what `ReactiveMessagingSender` does for this same strategy)
                            Self::retry_error_mapper(true, format!("Relaying received message '{:?}' to the internal processor failed. Connection will be aborted (without retrying) due to retrying config {:?}",
                                                                                   message, Self::CONST_CONFIG.retrying_strategy)) )
                    .into_result()
            },
            RetryingStrategies::RetryWithBackoffUpTo(attempts) => {
                retryable
                    // carries the retry start time along with the message, so the elapsed time may be reported on failure
                    .map_input(|message| ( message, SystemTime::now()) )
                    .retry_with_async(|(message, retry_start)| future::ready(
                        self.uni.send(message)
                            .map_input(|message| (message, retry_start) )
                    ))
                    // the upper bound of the backoff range grows exponentially with the number of attempts: 2.526^attempts milliseconds
                    .with_exponential_jitter(|| ExponentialJitter::FromBackoffRange {
                        backoff_range_millis: 1..=(2.526_f32.powi(attempts as i32) as u32),
                        re_attempts: attempts,
                        jitter_ratio: 0.2,
                    })
                    .await
                    .map_input_and_errors(
                        |(message, retry_start), _fatal_err|
                            // `SystemTime::elapsed()` yields a `Result`: unwrap it so the message shows a plain duration
                            // (zero if the system clock was set backwards) instead of `Ok(..)` / `Err(..)`
                            Self::retry_error_mapper(true, format!("Relaying received message '{:?}' to the internal processor failed. Connection will be aborted (after exhausting all retries in {:?}) due to retrying config {:?}",
                                                                                   message, retry_start.elapsed().unwrap_or_default(), Self::CONST_CONFIG.retrying_strategy)),
                        // NOTE(review): presumably maps the errors collected during the retries -- confirm against the `keen_retry` docs
                        |_| (false, String::with_capacity(0)) )
                    .into()
            },
            RetryingStrategies::RetryYieldingForUpToMillis(millis) => {
                retryable
                    // carries the retry start time along with the message, so the elapsed time may be reported on failure
                    .map_input(|message| ( message, SystemTime::now()) )
                    .retry_with_async(|(message, retry_start)| future::ready(
                        self.uni.send(message)
                            .map_input(|message| (message, retry_start) )
                    ))
                    .yielding_until_timeout(Duration::from_millis(millis as u64), || ())
                    .await
                    .map_input_and_errors(
                        |(message, retry_start), _fatal_err|
                            // see the note on `SystemTime::elapsed()` in the `RetryWithBackoffUpTo` branch
                            Self::retry_error_mapper(true, format!("Relaying received message '{:?}' to the internal processor failed. Connection will be aborted (after exhausting all retries in {:?}) due to retrying config {:?}",
                                                                                   message, retry_start.elapsed().unwrap_or_default(), Self::CONST_CONFIG.retrying_strategy)),
                        // NOTE(review): presumably maps the errors collected during the retries -- confirm against the `keen_retry` docs
                        |_| (false, String::with_capacity(0)) )
                    .into()
            },
            RetryingStrategies::RetrySpinningForUpToMillis(_millis) => {
                // NOTE(review): panics if this strategy is ever configured -- presumably it is rejected
                // (or unsupported) elsewhere; confirm
                unreachable!()
            },
        }
    }

    /// Tells how many items are pending, as reported by the wrapped `Uni`.
    #[inline(always)]
    pub fn pending_items_count(&self) -> u32 {
        self.uni.pending_items_count()
    }

    /// Tells the buffer size, as reported by the wrapped `Uni`.
    #[inline(always)]
    pub fn buffer_size(&self) -> u32 {
        self.uni.buffer_size()
    }

    /// Closes the wrapped `Uni`, passing `timeout` along and returning whatever its `close()` answers.
    pub async fn close(&self, timeout: Duration) -> bool {
        self.uni.close(timeout).await
    }
}
/// Sender of `LocalMessages` through a full-duplex channel (`OriginalChannel`).\
/// `CONFIG` is the numeric form of a [ConstConfig], decoded by the `impl` of this type.
pub struct ReactiveMessagingSender<const CONFIG: u64, LocalMessages, OriginalChannel>
where
    LocalMessages:   ReactiveMessagingSerializer<LocalMessages>                                  + Send + Sync + PartialEq + Debug + 'static,
    OriginalChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync,
{
    /// The channel every operation of this type is delegated to
    channel: Arc<OriginalChannel>,
}
impl<const CONFIG: u64,
     LocalMessages:   ReactiveMessagingSerializer<LocalMessages>                                  + Send + Sync + PartialEq + Debug,
     OriginalChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync>
ReactiveMessagingSender<CONFIG, LocalMessages, OriginalChannel> {

    /// The [ConstConfig] decoded from the `CONFIG` const generic parameter --
    /// its `retrying_strategy` selects the branch taken by the sending methods.
    pub const CONST_CONFIG: ConstConfig = ConstConfig::from(CONFIG);

    /// Creates a sender backed by a new `OriginalChannel` named after `channel_name`.
    pub fn new<IntoString: Into<String>>(channel_name: IntoString) -> Self {
        Self {
            channel: OriginalChannel::new(channel_name.into()),
        }
    }

    /// Creates a stream from the underlying channel, returning exactly what the channel's
    /// `create_stream()` gives: the stream and a `u32` (presumably the stream's id -- confirm).
    pub fn create_stream(&self) -> (MutinyStream<'static, LocalMessages, OriginalChannel, LocalMessages>, u32) {
        self.channel.create_stream()
    }

    /// Tells how many items are pending, as reported by the underlying channel.
    #[inline(always)]
    pub fn pending_items_count(&self) -> u32 {
        self.channel.pending_items_count()
    }

    /// Tells the buffer size, as reported by the underlying channel.
    #[inline(always)]
    pub fn buffer_size(&self) -> u32 {
        self.channel.buffer_size()
    }

    /// Asks the underlying channel to gracefully end all of its streams, waiting up to `timeout`.\
    /// Returns whatever the channel's `gracefully_end_all_streams()` reports.
    pub async fn flush_and_close(&self, timeout: Duration) -> u32 {
        self.channel.gracefully_end_all_streams(timeout).await
    }

    /// Asks the underlying channel to cancel all of its streams, without waiting.
    pub fn cancel_and_close(&self) {
        self.channel.cancel_all_streams();
    }

    /// Synchronously sends `message` through the underlying channel, honoring the retrying
    /// strategy of [Self::CONST_CONFIG].
    ///
    /// Returns `Ok(())` if the message was accepted or, otherwise, `Err((abort, error_message))`, where
    /// `abort` is `false` only for `DoNotRetry` (the error is to be ignored) and `true` whenever
    /// `error_message` states that the connection will be aborted.
    ///
    /// # Panics
    /// Panics if the configured strategy is `RetrySpinningForUpToMillis` or if the channel reports the
    /// failure kind handled by `first_attempt_error_mapper()`.
    #[inline(always)]
    pub fn send(&self,
                message: LocalMessages)
               -> Result<(), (bool, String)> {
        let retryable = self.channel.send(message);
        match Self::CONST_CONFIG.retrying_strategy {
            RetryingStrategies::DoNotRetry => {
                retryable
                    .map_input_and_errors(
                        Self::first_attempt_error_mapper,
                        |message, _err|
                            Self::retry_error_mapper(false, format!("sync-Sending '{:?}' failed. Won't retry (ignoring the error) due to retrying config {:?}",
                                                                                    message, Self::CONST_CONFIG.retrying_strategy)) )
                    .into_result()
            },
            RetryingStrategies::EndCommunications => {
                retryable
                    .map_input_and_errors(
                        Self::first_attempt_error_mapper,
                        |message, _err|
                            Self::retry_error_mapper(true, format!("sync-Sending '{:?}' failed. Connection will be aborted (without retrying) due to retrying config {:?}",
                                                                                   message, Self::CONST_CONFIG.retrying_strategy)) )
                    .into_result()
            },
            RetryingStrategies::RetryWithBackoffUpTo(attempts) => {
                retryable
                    // carries the retry start time along with the message, so the elapsed time may be reported on failure
                    .map_input(|message| ( message, SystemTime::now()) )
                    .retry_with(|(message, retry_start)|
                        self.channel.send(message)
                            .map_input(|message| (message, retry_start) )
                    )
                    // the upper bound of the backoff range grows exponentially with the number of attempts: 2.526^attempts milliseconds
                    .with_exponential_jitter(|| ExponentialJitter::FromBackoffRange {
                        backoff_range_millis: 1..=(2.526_f32.powi(attempts as i32) as u32),
                        re_attempts: attempts,
                        jitter_ratio: 0.2,
                    })
                    .map_input_and_errors(
                        |(message, retry_start), _fatal_err|
                            // `SystemTime::elapsed()` yields a `Result`: unwrap it so the message shows a plain duration
                            // (zero if the system clock was set backwards) instead of `Ok(..)` / `Err(..)`
                            Self::retry_error_mapper(true, format!("sync-Sending '{:?}' failed. Connection will be aborted (after exhausting all retries in {:?}) due to retrying config {:?}",
                                                                                   message, retry_start.elapsed().unwrap_or_default(), Self::CONST_CONFIG.retrying_strategy)),
                        // NOTE(review): presumably maps the errors collected during the retries -- confirm against the `keen_retry` docs
                        |_| (false, String::with_capacity(0)) )
                    .into()
            },
            RetryingStrategies::RetryYieldingForUpToMillis(millis) => {
                retryable
                    // carries the retry start time along with the message, so the elapsed time may be reported on failure
                    .map_input(|message| ( message, SystemTime::now()) )
                    .retry_with(|(message, retry_start)|
                        self.channel.send(message)
                            .map_input(|message| (message, retry_start) )
                    )
                    // NOTE(review): the "yielding" strategy spins here -- presumably because a synchronous
                    // function has no async runtime to yield to; confirm
                    .spinning_until_timeout(Duration::from_millis(millis as u64), ())
                    .map_input_and_errors(
                        |(message, retry_start), _fatal_err|
                            // see the note on `SystemTime::elapsed()` in the `RetryWithBackoffUpTo` branch
                            Self::retry_error_mapper(true, format!("sync-Sending '{:?}' failed. Connection will be aborted (after exhausting all retries in {:?}) due to retrying config {:?}",
                                                                                   message, retry_start.elapsed().unwrap_or_default(), Self::CONST_CONFIG.retrying_strategy)),
                        // NOTE(review): presumably maps the errors collected during the retries -- confirm against the `keen_retry` docs
                        |_| (false, String::with_capacity(0)) )
                    .into()
            },
            RetryingStrategies::RetrySpinningForUpToMillis(_millis) => {
                // NOTE(review): panics if this strategy is ever configured -- presumably it is rejected
                // (or unsupported) elsewhere; confirm
                unreachable!()
            },
        }
    }

    /// Async counterpart of [Self::send()]: same contract and error semantics, but retries are
    /// awaited (backoff sleeps / yields) instead of blocking the calling thread.
    ///
    /// # Panics
    /// Panics if the configured strategy is `RetrySpinningForUpToMillis` or if the channel reports the
    /// failure kind handled by `first_attempt_error_mapper()`.
    #[inline(always)]
    pub async fn send_async_trait(&self,
                                  message: LocalMessages)
                                 -> Result<(), (bool, String)> {
        let retryable = self.channel.send(message);
        match Self::CONST_CONFIG.retrying_strategy {
            RetryingStrategies::DoNotRetry => {
                retryable
                    .map_input_and_errors(
                        Self::first_attempt_error_mapper,
                        |message, _err|
                            Self::retry_error_mapper(false, format!("async-Sending '{:?}' failed. Won't retry (ignoring the error) due to retrying config {:?}",
                                                                                    message, Self::CONST_CONFIG.retrying_strategy)) )
                    .into_result()
            },
            RetryingStrategies::EndCommunications => {
                retryable
                    .map_input_and_errors(
                        Self::first_attempt_error_mapper,
                        |message, _err|
                            Self::retry_error_mapper(true, format!("async-Sending '{:?}' failed. Connection will be aborted (without retrying) due to retrying config {:?}",
                                                                                   message, Self::CONST_CONFIG.retrying_strategy)) )
                    .into_result()
            },
            RetryingStrategies::RetryWithBackoffUpTo(attempts) => {
                retryable
                    // carries the retry start time along with the message, so the elapsed time may be reported on failure
                    .map_input(|message| ( message, SystemTime::now()) )
                    .retry_with_async(|(message, retry_start)| future::ready(
                        self.channel.send(message)
                            .map_input(|message| (message, retry_start) )
                    ))
                    // the upper bound of the backoff range grows exponentially with the number of attempts: 2.526^attempts milliseconds
                    .with_exponential_jitter(|| ExponentialJitter::FromBackoffRange {
                        backoff_range_millis: 1..=(2.526_f32.powi(attempts as i32) as u32),
                        re_attempts: attempts,
                        jitter_ratio: 0.2,
                    })
                    .await
                    .map_input_and_errors(
                        |(message, retry_start), _fatal_err|
                            // `SystemTime::elapsed()` yields a `Result`: unwrap it so the message shows a plain duration
                            // (zero if the system clock was set backwards) instead of `Ok(..)` / `Err(..)`
                            Self::retry_error_mapper(true, format!("async-Sending '{:?}' failed. Connection will be aborted (after exhausting all retries in {:?}) due to retrying config {:?}",
                                                                                   message, retry_start.elapsed().unwrap_or_default(), Self::CONST_CONFIG.retrying_strategy)),
                        // NOTE(review): presumably maps the errors collected during the retries -- confirm against the `keen_retry` docs
                        |_| (false, String::with_capacity(0)) )
                    .into()
            },
            RetryingStrategies::RetryYieldingForUpToMillis(millis) => {
                retryable
                    // carries the retry start time along with the message, so the elapsed time may be reported on failure
                    .map_input(|message| ( message, SystemTime::now()) )
                    .retry_with_async(|(message, retry_start)| future::ready(
                        self.channel.send(message)
                            .map_input(|message| (message, retry_start) )
                    ))
                    .yielding_until_timeout(Duration::from_millis(millis as u64), || ())
                    .await
                    .map_input_and_errors(
                        |(message, retry_start), _fatal_err|
                            // see the note on `SystemTime::elapsed()` in the `RetryWithBackoffUpTo` branch
                            Self::retry_error_mapper(true, format!("async-Sending '{:?}' failed. Connection will be aborted (after exhausting all retries in {:?}) due to retrying config {:?}",
                                                                                   message, retry_start.elapsed().unwrap_or_default(), Self::CONST_CONFIG.retrying_strategy)),
                        // NOTE(review): presumably maps the errors collected during the retries -- confirm against the `keen_retry` docs
                        |_| (false, String::with_capacity(0)) )
                    .into()
            },
            RetryingStrategies::RetrySpinningForUpToMillis(_millis) => {
                // NOTE(review): panics if this strategy is ever configured -- presumably it is rejected
                // (or unsupported) elsewhere; confirm
                unreachable!()
            },
        }
    }

    /// Builds the `(input, error)` pair given back to the retrying API when a send fails:
    /// the input is dropped (replaced by `()`) and the error becomes `(abort, error_msg)`.
    fn retry_error_mapper(abort: bool, error_msg: String) -> ((), (bool, String) ) {
        ( (), (abort, error_msg) )
    }

    /// Mapper for the failure kind that is expected never to be produced by the channel on
    /// the first attempt -- always panics, as reaching it denotes a bug.
    fn first_attempt_error_mapper<T>(_: T, _: ()) -> ((), (bool, String) ) {
        panic!("reactive-messaging: send_to_remote_peer(): BUG! `Uni` channel is expected never to fail fatably. Please, fix!")
    }
}
/// Implementations of the messaging traits for plain `String`s -- compiled only for tests and docs.
#[cfg(any(test,doc))]
mod tests {
    use crate::serde::{ReactiveMessagingDeserializer, ReactiveMessagingSerializer};
    use crate::types::ResponsiveMessages;

    /// `String` messages are serialized as their raw UTF-8 bytes
    impl ReactiveMessagingSerializer<String> for String {
        /// Replaces the contents of `buffer` with the bytes of `message`
        #[inline(always)]
        fn serialize(message: &String, buffer: &mut Vec<u8>) {
            let payload = message.as_bytes();
            buffer.clear();
            buffer.extend_from_slice(payload);
        }
        /// Never returns: processor errors are treated as bugs and cause a panic carrying `err`
        #[inline(always)]
        fn processor_error_message(err: String) -> String {
            let description = format!("ServerBug! Please, fix! Error: {}", err);
            panic!("SocketServerSerializer<String>::processor_error_message(): {}", description);
        }
    }

    /// Answer conventions for `String` messages
    impl ResponsiveMessages<String> for String {
        /// An empty answer is the disconnect message
        #[inline(always)]
        fn is_disconnect_message(processor_answer: &String) -> bool {
            processor_answer.as_str().is_empty()
        }
        /// A single dot is the "no answer" message
        #[inline(always)]
        fn is_no_answer_message(processor_answer: &String) -> bool {
            processor_answer.as_str() == "."
        }
    }

    /// `String` messages are deserialized leniently: invalid UTF-8 sequences are replaced rather than rejected
    impl ReactiveMessagingDeserializer<String> for String {
        /// Always succeeds, converting `message` with `String::from_utf8_lossy()`
        #[inline(always)]
        fn deserialize(message: &[u8]) -> Result<String, Box<dyn std::error::Error + Sync + Send + 'static>> {
            let text = String::from_utf8_lossy(message);
            Ok(text.into_owned())
        }
    }
}