use std::{cell::RefCell, net::SocketAddr, rc::Rc};
use mio::{net::TcpStream, Token};
use rusty_ulid::Ulid;
use crate::{
    backends::Backend,
    logs::{Endpoint, LogContext, RequestRecord},
    pool::Checkout,
    protocol::SessionState,
    socket::{stats::socket_rtt, SocketHandler, SocketResult, TransportProtocol},
    sozu_command::ready::Ready,
    timer::TimeoutContainer,
    L7Proxy, ListenerHandler, Protocol, Readiness, SessionMetrics, SessionResult, StateResult,
};
#[derive(PartialEq, Eq)]
pub enum SessionStatus {
    Normal,
    DefaultAnswer,
}
#[derive(Copy, Clone, Debug)]
enum ConnectionStatus {
    Normal,
    ReadOpen,
    WriteOpen,
    Closed,
}
pub struct Pipe<Front: SocketHandler, L: ListenerHandler> {
    backend_buffer: Checkout,
    backend_id: Option<String>,
    pub backend_readiness: Readiness,
    backend_socket: Option<TcpStream>,
    backend_status: ConnectionStatus,
    backend_token: Option<Token>,
    pub backend: Option<Rc<RefCell<Backend>>>,
    cluster_id: Option<String>,
    pub container_backend_timeout: Option<TimeoutContainer>,
    pub container_frontend_timeout: Option<TimeoutContainer>,
    frontend_buffer: Checkout,
    pub frontend_readiness: Readiness,
    frontend_status: ConnectionStatus,
    frontend_token: Token,
    frontend: Front,
    listener: Rc<RefCell<L>>,
    protocol: Protocol,
    request_id: Ulid,
    session_address: Option<SocketAddr>,
    websocket_context: Option<String>,
}
impl<Front: SocketHandler, L: ListenerHandler> Pipe<Front, L> {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        backend_buffer: Checkout,
        backend_id: Option<String>,
        backend_socket: Option<TcpStream>,
        backend: Option<Rc<RefCell<Backend>>>,
        container_backend_timeout: Option<TimeoutContainer>,
        container_frontend_timeout: Option<TimeoutContainer>,
        cluster_id: Option<String>,
        frontend_buffer: Checkout,
        frontend_token: Token,
        frontend: Front,
        listener: Rc<RefCell<L>>,
        protocol: Protocol,
        request_id: Ulid,
        session_address: Option<SocketAddr>,
        websocket_context: Option<String>,
    ) -> Pipe<Front, L> {
        let frontend_status = ConnectionStatus::Normal;
        let backend_status = if backend_socket.is_none() {
            ConnectionStatus::Closed
        } else {
            ConnectionStatus::Normal
        };
        let session = Pipe {
            backend_buffer,
            backend_id,
            backend_readiness: Readiness {
                interest: Ready::READABLE | Ready::WRITABLE | Ready::HUP | Ready::ERROR,
                event: Ready::EMPTY,
            },
            backend_socket,
            backend_status,
            backend_token: None,
            backend,
            cluster_id,
            container_backend_timeout,
            container_frontend_timeout,
            frontend_buffer,
            frontend_readiness: Readiness {
                interest: Ready::READABLE | Ready::WRITABLE | Ready::HUP | Ready::ERROR,
                event: Ready::EMPTY,
            },
            frontend_status,
            frontend_token,
            frontend,
            listener,
            protocol,
            request_id,
            session_address,
            websocket_context,
        };
        trace!("created pipe");
        session
    }
    pub fn front_socket(&self) -> &TcpStream {
        self.frontend.socket_ref()
    }
    pub fn front_socket_mut(&mut self) -> &mut TcpStream {
        self.frontend.socket_mut()
    }
    pub fn back_socket(&self) -> Option<&TcpStream> {
        self.backend_socket.as_ref()
    }
    pub fn back_socket_mut(&mut self) -> Option<&mut TcpStream> {
        self.backend_socket.as_mut()
    }
    pub fn set_back_socket(&mut self, socket: TcpStream) {
        self.backend_socket = Some(socket);
        self.backend_status = ConnectionStatus::Normal;
    }
    pub fn back_token(&self) -> Vec<Token> {
        self.backend_token.iter().cloned().collect()
    }
    fn reset_timeouts(&mut self) {
        if let Some(t) = self.container_frontend_timeout.as_mut() {
            if !t.reset() {
                error!("{} could not reset front timeout (pipe)", self.request_id);
            }
        }
        if let Some(t) = self.container_backend_timeout.as_mut() {
            if !t.reset() {
                error!("{} could not reset back timeout (pipe)", self.request_id);
            }
        }
    }
    pub fn set_cluster_id(&mut self, cluster_id: Option<String>) {
        self.cluster_id = cluster_id;
    }
    pub fn set_backend_id(&mut self, backend_id: Option<String>) {
        self.backend_id = backend_id;
    }
    pub fn set_back_token(&mut self, token: Token) {
        self.backend_token = Some(token);
    }
    pub fn get_session_address(&self) -> Option<SocketAddr> {
        self.session_address
            .or_else(|| self.frontend.socket_ref().peer_addr().ok())
    }
    pub fn get_backend_address(&self) -> Option<SocketAddr> {
        self.backend_socket
            .as_ref()
            .and_then(|backend| backend.peer_addr().ok())
    }
    fn protocol_string(&self) -> &'static str {
        match self.protocol {
            Protocol::TCP => "TCP",
            Protocol::HTTP => "WS",
            Protocol::HTTPS => match self.frontend.protocol() {
                TransportProtocol::Ssl2 => "WSS-SSL2",
                TransportProtocol::Ssl3 => "WSS-SSL3",
                TransportProtocol::Tls1_0 => "WSS-TLS1.0",
                TransportProtocol::Tls1_1 => "WSS-TLS1.1",
                TransportProtocol::Tls1_2 => "WSS-TLS1.2",
                TransportProtocol::Tls1_3 => "WSS-TLS1.3",
                _ => unreachable!(),
            },
            _ => unreachable!(),
        }
    }
    pub fn log_request(&self, metrics: &SessionMetrics, message: Option<&str>) {
        let listener = self.listener.borrow();
        RequestRecord {
            error: message,
            context: self.log_context(),
            session_address: self.get_session_address(),
            backend_address: self.get_backend_address(),
            protocol: self.protocol_string(),
            endpoint: Endpoint::Tcp {
                context: self.websocket_context.as_deref(),
            },
            tags: listener.get_concatenated_tags(&listener.get_addr().to_string()),
            client_rtt: socket_rtt(self.front_socket()),
            server_rtt: self.backend_socket.as_ref().and_then(socket_rtt),
            metrics,
            user_agent: None,
        }
        .log();
    }
    pub fn log_request_success(&self, metrics: &SessionMetrics) {
        self.log_request(metrics, None);
    }
    pub fn log_request_error(&self, metrics: &SessionMetrics, message: &str) {
        incr!("pipe.errors");
        self.log_request(metrics, Some(message));
    }
    pub fn check_connections(&self) -> bool {
        match (self.frontend_status, self.backend_status) {
            (ConnectionStatus::Normal, ConnectionStatus::WriteOpen) => {
                self.frontend_readiness.event.is_readable()
                    || self.frontend_buffer.available_data() > 0
                    || self.backend_buffer.available_data() > 0
            }
            (ConnectionStatus::Normal, ConnectionStatus::Closed) => {
                self.backend_buffer.available_data() > 0
            }
            (ConnectionStatus::WriteOpen, ConnectionStatus::Normal) => {
                self.backend_readiness.event.is_readable()
                    || self.backend_buffer.available_data() > 0
                    || self.frontend_buffer.available_data() > 0
            }
            (ConnectionStatus::WriteOpen, ConnectionStatus::WriteOpen) => {
                self.frontend_buffer.available_data() > 0
                    || self.backend_buffer.available_data() > 0
            }
            (ConnectionStatus::WriteOpen, ConnectionStatus::Closed) => {
                self.backend_buffer.available_data() > 0
            }
            (ConnectionStatus::ReadOpen, ConnectionStatus::ReadOpen) => false,
            (ConnectionStatus::ReadOpen, ConnectionStatus::Closed) => false,
            (ConnectionStatus::Closed, ConnectionStatus::Normal) => {
                self.frontend_buffer.available_data() > 0
            }
            (ConnectionStatus::Closed, ConnectionStatus::ReadOpen) => false,
            (ConnectionStatus::Closed, ConnectionStatus::WriteOpen) => {
                self.frontend_buffer.available_data() > 0
            }
            (ConnectionStatus::Closed, ConnectionStatus::Closed) => false,
            _ => true,
        }
    }
    pub fn frontend_hup(&mut self, metrics: &mut SessionMetrics) -> StateResult {
        self.log_request_success(metrics);
        self.frontend_status = ConnectionStatus::Closed;
        StateResult::CloseSession
    }
    pub fn backend_hup(&mut self, metrics: &mut SessionMetrics) -> StateResult {
        self.backend_status = ConnectionStatus::Closed;
        if self.backend_buffer.available_data() == 0 {
            if self.backend_readiness.event.is_readable() {
                self.backend_readiness.interest.insert(Ready::READABLE);
                error!("Pipe::back_hup: backend connection closed but the kernel still holds some data. readiness: {:?} -> {:?}", self.frontend_readiness, self.backend_readiness);
                StateResult::Continue
            } else {
                self.log_request_success(metrics);
                StateResult::CloseSession
            }
        } else {
            self.frontend_readiness.interest.insert(Ready::WRITABLE);
            if self.backend_readiness.event.is_readable() {
                self.backend_readiness.interest.insert(Ready::READABLE);
            }
            StateResult::Continue
        }
    }
    pub fn readable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
        self.reset_timeouts();
        trace!("pipe readable");
        if self.frontend_buffer.available_space() == 0 {
            self.frontend_readiness.interest.remove(Ready::READABLE);
            self.backend_readiness.interest.insert(Ready::WRITABLE);
            return StateResult::Continue;
        }
        let (sz, res) = self.frontend.socket_read(self.frontend_buffer.space());
        debug!(
            "{}\tFRONT [{:?}]: read {} bytes",
            self.log_context(),
            self.frontend_token,
            sz
        );
        if sz > 0 {
            self.frontend_buffer.fill(sz);
            count!("bytes_in", sz as i64);
            metrics.bin += sz;
            if self.frontend_buffer.available_space() == 0 {
                self.frontend_readiness.interest.remove(Ready::READABLE);
            }
            self.backend_readiness.interest.insert(Ready::WRITABLE);
        } else {
            self.frontend_readiness.event.remove(Ready::READABLE);
            if res == SocketResult::Continue {
                self.frontend_status = match self.frontend_status {
                    ConnectionStatus::Normal => ConnectionStatus::WriteOpen,
                    ConnectionStatus::ReadOpen => ConnectionStatus::Closed,
                    s => s,
                };
            }
        }
        if !self.check_connections() {
            self.frontend_readiness.reset();
            self.backend_readiness.reset();
            self.log_request_success(metrics);
            return StateResult::CloseSession;
        }
        match res {
            SocketResult::Error => {
                self.frontend_readiness.reset();
                self.backend_readiness.reset();
                self.log_request_error(metrics, "front socket read error");
                return StateResult::CloseSession;
            }
            SocketResult::Closed => {
                self.frontend_readiness.reset();
                self.backend_readiness.reset();
                self.log_request_success(metrics);
                return StateResult::CloseSession;
            }
            SocketResult::WouldBlock => {
                self.frontend_readiness.event.remove(Ready::READABLE);
            }
            SocketResult::Continue => {}
        };
        self.backend_readiness.interest.insert(Ready::WRITABLE);
        StateResult::Continue
    }
    pub fn writable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
        trace!("pipe writable");
        if self.backend_buffer.available_data() == 0 {
            self.backend_readiness.interest.insert(Ready::READABLE);
            self.frontend_readiness.interest.remove(Ready::WRITABLE);
            return StateResult::Continue;
        }
        let mut sz = 0usize;
        let mut res = SocketResult::Continue;
        while res == SocketResult::Continue {
            if self.backend_buffer.available_data() == 0 {
                count!("bytes_out", sz as i64);
                metrics.bout += sz;
                self.backend_readiness.interest.insert(Ready::READABLE);
                self.frontend_readiness.interest.remove(Ready::WRITABLE);
                return StateResult::Continue;
            }
            let (current_sz, current_res) = self.frontend.socket_write(self.backend_buffer.data());
            res = current_res;
            self.backend_buffer.consume(current_sz);
            sz += current_sz;
            if current_sz == 0 && res == SocketResult::Continue {
                self.frontend_status = match self.frontend_status {
                    ConnectionStatus::Normal => ConnectionStatus::ReadOpen,
                    ConnectionStatus::WriteOpen => ConnectionStatus::Closed,
                    s => s,
                };
            }
            if !self.check_connections() {
                metrics.bout += sz;
                count!("bytes_out", sz as i64);
                self.frontend_readiness.reset();
                self.backend_readiness.reset();
                self.log_request_success(metrics);
                return StateResult::CloseSession;
            }
        }
        if sz > 0 {
            count!("bytes_out", sz as i64);
            self.backend_readiness.interest.insert(Ready::READABLE);
            metrics.bout += sz;
        }
        debug!(
            "{}\tFRONT [{}<-{:?}]: wrote {} bytes of {}",
            self.log_context(),
            self.frontend_token.0,
            self.backend_token,
            sz,
            self.backend_buffer.available_data()
        );
        match res {
            SocketResult::Error => {
                self.frontend_readiness.reset();
                self.backend_readiness.reset();
                self.log_request_error(metrics, "front socket write error");
                return StateResult::CloseSession;
            }
            SocketResult::Closed => {
                self.frontend_readiness.reset();
                self.backend_readiness.reset();
                self.log_request_success(metrics);
                return StateResult::CloseSession;
            }
            SocketResult::WouldBlock => {
                self.frontend_readiness.event.remove(Ready::WRITABLE);
            }
            SocketResult::Continue => {}
        }
        StateResult::Continue
    }
    pub fn backend_writable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
        trace!("pipe back_writable");
        if self.frontend_buffer.available_data() == 0 {
            self.frontend_readiness.interest.insert(Ready::READABLE);
            self.backend_readiness.interest.remove(Ready::WRITABLE);
            return StateResult::Continue;
        }
        let output_size = self.frontend_buffer.available_data();
        let mut sz = 0usize;
        let mut socket_res = SocketResult::Continue;
        if let Some(ref mut backend) = self.backend_socket {
            while socket_res == SocketResult::Continue {
                if self.frontend_buffer.available_data() == 0 {
                    self.frontend_readiness.interest.insert(Ready::READABLE);
                    self.backend_readiness.interest.remove(Ready::WRITABLE);
                    return StateResult::Continue;
                }
                let (current_sz, current_res) = backend.socket_write(self.frontend_buffer.data());
                socket_res = current_res;
                self.frontend_buffer.consume(current_sz);
                sz += current_sz;
                if current_sz == 0 && current_res == SocketResult::Continue {
                    self.backend_status = match self.backend_status {
                        ConnectionStatus::Normal => ConnectionStatus::ReadOpen,
                        ConnectionStatus::WriteOpen => ConnectionStatus::Closed,
                        s => s,
                    };
                }
            }
        }
        metrics.backend_bout += sz;
        if !self.check_connections() {
            self.frontend_readiness.reset();
            self.backend_readiness.reset();
            self.log_request_success(metrics);
            return StateResult::CloseSession;
        }
        debug!(
            "{}\tBACK [{:?}->{}]: wrote {} bytes of {}",
            self.log_context(),
            self.backend_token,
            self.frontend_token.0,
            sz,
            output_size
        );
        match socket_res {
            SocketResult::Error => {
                self.frontend_readiness.reset();
                self.backend_readiness.reset();
                self.log_request_error(metrics, "back socket write error");
                return StateResult::CloseSession;
            }
            SocketResult::Closed => {
                self.frontend_readiness.reset();
                self.backend_readiness.reset();
                self.log_request_success(metrics);
                return StateResult::CloseSession;
            }
            SocketResult::WouldBlock => {
                self.backend_readiness.event.remove(Ready::WRITABLE);
            }
            SocketResult::Continue => {}
        }
        StateResult::Continue
    }
    pub fn backend_readable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
        self.reset_timeouts();
        trace!("pipe back_readable");
        if self.backend_buffer.available_space() == 0 {
            self.backend_readiness.interest.remove(Ready::READABLE);
            return StateResult::Continue;
        }
        if let Some(ref mut backend) = self.backend_socket {
            let (size, remaining) = backend.socket_read(self.backend_buffer.space());
            self.backend_buffer.fill(size);
            debug!(
                "{}\tBACK  [{}<-{:?}]: read {} bytes",
                self.log_context(),
                self.frontend_token.0,
                self.backend_token,
                size
            );
            if remaining != SocketResult::Continue || size == 0 {
                self.backend_readiness.event.remove(Ready::READABLE);
            }
            if size > 0 {
                self.frontend_readiness.interest.insert(Ready::WRITABLE);
                metrics.backend_bin += size;
            }
            if size == 0 && remaining == SocketResult::Closed {
                self.backend_status = match self.backend_status {
                    ConnectionStatus::Normal => ConnectionStatus::WriteOpen,
                    ConnectionStatus::ReadOpen => ConnectionStatus::Closed,
                    s => s,
                };
                if !self.check_connections() {
                    self.frontend_readiness.reset();
                    self.backend_readiness.reset();
                    self.log_request_success(metrics);
                    return StateResult::CloseSession;
                }
            }
            match remaining {
                SocketResult::Error => {
                    self.frontend_readiness.reset();
                    self.backend_readiness.reset();
                    self.log_request_error(metrics, "back socket read error");
                    return StateResult::CloseSession;
                }
                SocketResult::Closed => {
                    if !self.check_connections() {
                        self.frontend_readiness.reset();
                        self.backend_readiness.reset();
                        self.log_request_success(metrics);
                        return StateResult::CloseSession;
                    }
                }
                SocketResult::WouldBlock => {
                    self.backend_readiness.event.remove(Ready::READABLE);
                }
                SocketResult::Continue => {}
            }
        }
        StateResult::Continue
    }
    pub fn log_context(&self) -> LogContext {
        LogContext {
            request_id: self.request_id,
            cluster_id: self.cluster_id.as_deref(),
            backend_id: self.backend_id.as_deref(),
        }
    }
}
impl<Front: SocketHandler, L: ListenerHandler> SessionState for Pipe<Front, L> {
    fn ready(
        &mut self,
        _session: Rc<RefCell<dyn crate::ProxySession>>,
        _proxy: Rc<RefCell<dyn crate::L7Proxy>>,
        metrics: &mut SessionMetrics,
    ) -> SessionResult {
        let mut counter = 0;
        let max_loop_iterations = 100000;
        if self.frontend_readiness.event.is_hup() {
            return SessionResult::Close;
        }
        let token = self.frontend_token;
        while counter < max_loop_iterations {
            let frontend_interest = self.frontend_readiness.filter_interest();
            let backend_interest = self.backend_readiness.filter_interest();
            trace!(
                "PROXY\t{} {:?} {:?} -> {:?}",
                self.log_context(),
                token,
                self.frontend_readiness,
                self.backend_readiness
            );
            if frontend_interest.is_empty() && backend_interest.is_empty() {
                break;
            }
            if self.backend_readiness.event.is_hup()
                && self.frontend_readiness.interest.is_writable()
                && !self.frontend_readiness.event.is_writable()
            {
                break;
            }
            if frontend_interest.is_readable()
                && self.readable(metrics) == StateResult::CloseSession
            {
                return SessionResult::Close;
            }
            if backend_interest.is_writable()
                && self.backend_writable(metrics) == StateResult::CloseSession
            {
                return SessionResult::Close;
            }
            if backend_interest.is_readable()
                && self.backend_readable(metrics) == StateResult::CloseSession
            {
                return SessionResult::Close;
            }
            if frontend_interest.is_writable()
                && self.writable(metrics) == StateResult::CloseSession
            {
                return SessionResult::Close;
            }
            if backend_interest.is_hup() && self.backend_hup(metrics) == StateResult::CloseSession {
                return SessionResult::Close;
            }
            if frontend_interest.is_error() {
                error!(
                    "PROXY session {:?} front error, disconnecting",
                    self.frontend_token
                );
                self.frontend_readiness.interest = Ready::EMPTY;
                self.backend_readiness.interest = Ready::EMPTY;
                return SessionResult::Close;
            }
            if backend_interest.is_error() && self.backend_hup(metrics) == StateResult::CloseSession
            {
                self.frontend_readiness.interest = Ready::EMPTY;
                self.backend_readiness.interest = Ready::EMPTY;
                error!(
                    "PROXY session {:?} back error, disconnecting",
                    self.frontend_token
                );
                return SessionResult::Close;
            }
            counter += 1;
        }
        if counter == max_loop_iterations {
            error!(
                "PROXY\thandling session {:?} went through {} iterations, there's a probable infinite loop bug, closing the connection",
                self.frontend_token, max_loop_iterations
            );
            incr!("http.infinite_loop.error");
            self.print_state(&format!("{:?}", self.protocol));
            return SessionResult::Close;
        }
        SessionResult::Continue
    }
    fn update_readiness(&mut self, token: Token, events: Ready) {
        if self.frontend_token == token {
            self.frontend_readiness.event |= events;
        } else if self.backend_token == Some(token) {
            self.backend_readiness.event |= events;
        }
    }
    fn timeout(&mut self, token: Token, metrics: &mut SessionMetrics) -> StateResult {
        if self.frontend_token == token {
            self.log_request_error(metrics, "front socket timeout");
            if let Some(timeout) = self.container_frontend_timeout.as_mut() {
                timeout.triggered()
            }
            return StateResult::CloseSession;
        }
        if self.backend_token == Some(token) {
            if let Some(timeout) = self.container_backend_timeout.as_mut() {
                timeout.triggered()
            }
            self.log_request_error(metrics, "back socket timeout");
            return StateResult::CloseSession;
        }
        error!("got timeout for an invalid token");
        self.log_request_error(metrics, "invalid token timeout");
        StateResult::CloseSession
    }
    fn cancel_timeouts(&mut self) {
        self.container_frontend_timeout.as_mut().map(|t| t.cancel());
        self.container_backend_timeout.as_mut().map(|t| t.cancel());
    }
    fn close(&mut self, _proxy: Rc<RefCell<dyn L7Proxy>>, _metrics: &mut SessionMetrics) {
        if let Some(backend) = self.backend.as_mut() {
            let mut backend = backend.borrow_mut();
            backend.active_requests = backend.active_requests.saturating_sub(1);
        }
    }
    fn print_state(&self, context: &str) {
        error!(
            "\
{} Session(Pipe)
\tFrontend:
\t\ttoken: {:?}\treadiness: {:?}
\tBackend:
\t\ttoken: {:?}\treadiness: {:?}",
            context,
            self.frontend_token,
            self.frontend_readiness,
            self.backend_token,
            self.backend_readiness
        );
    }
}