use crate::{
    types::ConnectionEvent,
    prelude::ProcessorRemoteStreamType,
    socket_connection_handler::{self, Peer},
    ReactiveMessagingDeserializer,
    ResponsiveMessages, ReactiveMessagingSerializer,
};
use std::{
    fmt::Debug,
    future::Future,
    sync::{
        Arc, 
        atomic::AtomicBool,
        atomic::Ordering::Relaxed,
    },
};
use futures::Stream;
use tokio::sync::oneshot::Sender;
use log::warn;
#[derive(Debug)]
pub struct SocketClient {
    connected: Arc<AtomicBool>,
    ip:        String,
    port:      u16,
    processor_shutdown_signaler: Sender<u32>,
}
impl SocketClient {
    #[must_use = "the client won't do a thing if its value isn't hold until the disconnection time"]
    pub async fn spawn_responsive_processor<ServerMessages:                 ReactiveMessagingDeserializer<ServerMessages> + Send + Sync + PartialEq + Debug + 'static,
                                            ClientMessages:                 ReactiveMessagingSerializer<ClientMessages>   +
                                                                            ResponsiveMessages<ClientMessages>            + Send + Sync + PartialEq + Debug + 'static,
                                            ConnectionEventsCallbackFuture: Future<Output=()>                             + Send,
                                            ClientStreamType:               Stream<Item=ClientMessages>                   + Send + 'static,
                                            IntoString:                     Into<String>>
                                           (ip:                         IntoString,
                                            port:                       u16,
                                            connection_events_callback: impl Fn(ConnectionEvent<ClientMessages>) -> ConnectionEventsCallbackFuture + Send + Sync + 'static,
                                            processor_stream_builder:   impl Fn(String, u16, Arc<Peer<ClientMessages>>, ProcessorRemoteStreamType<ServerMessages>) -> ClientStreamType + Send + Sync + 'static)
                                           -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        let ip = ip.into();
        let (processor_shutdown_sender, processor_shutdown_receiver) = tokio::sync::oneshot::channel::<u32>();
        let connected_state = Arc::new(AtomicBool::new(false));
        let connection_events_callback = upgrade_to_connected_state_tracking(&connected_state, connection_events_callback);
        socket_connection_handler::client_for_responsive_text_protocol(ip.clone(), port, processor_shutdown_receiver, connection_events_callback, processor_stream_builder).await?;
        let socket_client = Self { connected: connected_state, ip, port, processor_shutdown_signaler: processor_shutdown_sender };
        Ok(socket_client)
    }
    #[must_use = "the client won't do a thing if its value isn't hold until the disconnection time"]
    pub async fn spawn_unresponsive_processor<ServerMessages:                 ReactiveMessagingDeserializer<ServerMessages> + Send + Sync + PartialEq + Debug + 'static,
                                              ClientMessages:                 ReactiveMessagingSerializer<ClientMessages>   + Send + Sync + PartialEq + Debug + 'static,
                                              OutputStreamItemsType:                                                          Send + Sync             + Debug + 'static,
                                              OutputStreamType:               Stream<Item=OutputStreamItemsType>            + Send                            + 'static,
                                              ConnectionEventsCallbackFuture: Future<Output=()>                             + Send,
                                              IntoString:                     Into<String>>
                                             (ip:                         IntoString,
                                              port:                       u16,
                                              connection_events_callback: impl Fn(ConnectionEvent<ClientMessages>) -> ConnectionEventsCallbackFuture + Send + Sync + 'static,
                                              processor_stream_builder:   impl Fn(String, u16, Arc<Peer<ClientMessages>>, ProcessorRemoteStreamType<ServerMessages>) -> OutputStreamType + Send + Sync + 'static)
                                             -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        let ip = ip.into();
        let (processor_shutdown_sender, processor_shutdown_receiver) = tokio::sync::oneshot::channel::<u32>();
        let connected_state = Arc::new(AtomicBool::new(false));
        let connection_events_callback = upgrade_to_connected_state_tracking(&connected_state, connection_events_callback);
        socket_connection_handler::client_for_unresponsive_text_protocol(ip.clone(), port, processor_shutdown_receiver, connection_events_callback, processor_stream_builder).await?;
        let socket_client = Self { connected: connected_state, ip, port, processor_shutdown_signaler: processor_shutdown_sender };
        Ok(socket_client)
    }
    pub fn is_connected(&self) -> bool {
        self.connected.load(Relaxed)
    }
    pub fn shutdown(self, timeout_ms: u32) -> Result<(), Box<dyn std::error::Error>> {
        warn!("Socket Client: Shutdown asked & initiated for client connected @ {}:{} -- timeout: {timeout_ms}ms", self.ip, self.port);
        if let Err(_err) = self.processor_shutdown_signaler.send(timeout_ms) {
            Err(Box::from("Socket Client BUG: couldn't send shutdown signal to the network loop. Program is, likely, hanged. Please, investigate and fix!"))
        } else {
            Ok(())
        }
    }
}
fn upgrade_to_connected_state_tracking<ClientMessages:                 ReactiveMessagingSerializer<ClientMessages>   + Send + Sync + PartialEq + Debug + 'static,
                                       ConnectionEventsCallbackFuture: Future<Output=()>                        + Send>
                                      (connected_state:                          &Arc<AtomicBool>,
                                       user_provided_connection_events_callback: impl Fn(ConnectionEvent<ClientMessages>) -> ConnectionEventsCallbackFuture + Send + Sync + 'static)
                                      -> impl Fn(ConnectionEvent<ClientMessages>) -> ConnectionEventsCallbackFuture + Send + Sync + 'static {
    let connected_state = Arc::clone(connected_state);
    move |connection_event | {
        if let ConnectionEvent::PeerConnected { .. } = connection_event {
            connected_state.store(true, Relaxed);
        } else if let ConnectionEvent::PeerDisconnected {..} = connection_event {
            connected_state.store(false, Relaxed);
        }
        user_provided_connection_events_callback(connection_event)
    }
}