use std::{
    cell::RefCell,
    collections::{HashSet, VecDeque},
    io::Error as IoError,
    os::unix::io::{AsRawFd, FromRawFd},
    rc::Rc,
    time::{Duration, Instant},
};
use mio::{
    net::{TcpListener as MioTcpListener, TcpStream},
    Events, Interest, Poll, Token,
};
use slab::Slab;
use sozu_command::{
    channel::Channel,
    logging,
    proto::command::{
        request::RequestType, response_content::ContentType, ActivateListener, AddBackend,
        CertificatesWithFingerprints, Cluster, ClusterHashes, ClusterInformations,
        DeactivateListener, Event, HttpListenerConfig, HttpsListenerConfig, InitialState,
        ListenerType, LoadBalancingAlgorithms, LoadMetric, MetricsConfiguration, RemoveBackend,
        Request, ResponseStatus, ServerConfig, TcpListenerConfig as CommandTcpListener,
        WorkerRequest, WorkerResponse,
    },
    ready::Ready,
    scm_socket::{Listeners, ScmSocket, ScmSocketError},
    state::ConfigState,
};
use crate::{
    backends::{Backend, BackendMap},
    features::FEATURES,
    http, https,
    metrics::METRICS,
    pool::Pool,
    tcp,
    timer::Timer,
    AcceptError, Protocol, ProxyConfiguration, ProxySession, SessionIsToBeClosed,
};
pub const CONN_RETRIES: u8 = 3;
pub type ProxyChannel = Channel<WorkerResponse, WorkerRequest>;
thread_local! {
  pub static QUEUE: RefCell<VecDeque<WorkerResponse>> = const { RefCell::new(VecDeque::new()) };
}
thread_local! {
  pub static TIMER: RefCell<Timer<Token>> = RefCell::new(Timer::default());
}
pub fn push_queue(message: WorkerResponse) {
    QUEUE.with(|queue| {
        (*queue.borrow_mut()).push_back(message);
    });
}
pub fn push_event(event: Event) {
    QUEUE.with(|queue| {
        (*queue.borrow_mut()).push_back(WorkerResponse {
            id: "EVENT".to_string(),
            message: String::new(),
            status: ResponseStatus::Processing.into(),
            content: Some(ContentType::Event(event).into()),
        });
    });
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ListenToken(pub usize);
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SessionToken(pub usize);
impl From<usize> for ListenToken {
    fn from(val: usize) -> ListenToken {
        ListenToken(val)
    }
}
impl From<ListenToken> for usize {
    fn from(val: ListenToken) -> usize {
        val.0
    }
}
impl From<usize> for SessionToken {
    fn from(val: usize) -> SessionToken {
        SessionToken(val)
    }
}
impl From<SessionToken> for usize {
    fn from(val: SessionToken) -> usize {
        val.0
    }
}
pub struct SessionManager {
    pub max_connections: usize,
    pub nb_connections: usize,
    pub can_accept: bool,
    pub slab: Slab<Rc<RefCell<dyn ProxySession>>>,
}
impl SessionManager {
    pub fn new(
        slab: Slab<Rc<RefCell<dyn ProxySession>>>,
        max_connections: usize,
    ) -> Rc<RefCell<Self>> {
        Rc::new(RefCell::new(SessionManager {
            max_connections,
            nb_connections: 0,
            can_accept: true,
            slab,
        }))
    }
    pub fn at_capacity(&self) -> bool {
        self.slab.len() >= 10 + 2 * self.max_connections
    }
    pub fn check_limits(&mut self) -> bool {
        if self.nb_connections >= self.max_connections {
            error!("max number of session connection reached, flushing the accept queue");
            gauge!("accept_queue.backpressure", 1);
            self.can_accept = false;
            return false;
        }
        if self.at_capacity() {
            error!("not enough memory to accept another session, flushing the accept queue");
            error!(
                "nb_connections: {}, max_connections: {}",
                self.nb_connections, self.max_connections
            );
            gauge!("accept_queue.backpressure", 1);
            self.can_accept = false;
            return false;
        }
        true
    }
    pub fn to_session(token: Token) -> SessionToken {
        SessionToken(token.0)
    }
    pub fn incr(&mut self) {
        self.nb_connections += 1;
        assert!(self.nb_connections <= self.max_connections);
        gauge!("client.connections", self.nb_connections);
    }
    pub fn decr(&mut self) {
        assert!(self.nb_connections != 0);
        self.nb_connections -= 1;
        gauge!("client.connections", self.nb_connections);
        if !self.can_accept && self.nb_connections < self.max_connections * 90 / 100 {
            debug!(
                "nb_connections = {}, max_connections = {}, starting to accept again",
                self.nb_connections, self.max_connections
            );
            gauge!("accept_queue.backpressure", 0);
            self.can_accept = true;
        }
    }
}
#[derive(thiserror::Error, Debug)]
pub enum ServerError {
    #[error("could not create event loop with MIO poll: {0}")]
    CreatePoll(IoError),
    #[error("could not clone the MIO registry: {0}")]
    CloneRegistry(IoError),
    #[error("could not register the channel: {0}")]
    RegisterChannel(IoError),
    #[error("{msg}:{scm_err}")]
    ScmSocket {
        msg: String,
        scm_err: ScmSocketError,
    },
}
pub struct Server {
    accept_queue_timeout: Duration,
    accept_queue: VecDeque<(TcpStream, ListenToken, Protocol, Instant)>,
    accept_ready: HashSet<ListenToken>,
    backends: Rc<RefCell<BackendMap>>,
    base_sessions_count: usize,
    channel: ProxyChannel,
    config_state: ConfigState,
    current_poll_errors: i32,
    http: Rc<RefCell<http::HttpProxy>>,
    https: Rc<RefCell<https::HttpsProxy>>,
    last_sessions_len: usize,
    last_shutting_down_message: Option<Instant>,
    last_zombie_check: Instant,
    loop_start: Instant,
    max_poll_errors: i32, pub poll: Poll,
    poll_timeout: Option<Duration>, scm_listeners: Option<Listeners>,
    scm: ScmSocket,
    sessions: Rc<RefCell<SessionManager>>,
    should_poll_at: Option<Instant>,
    shutting_down: Option<String>,
    tcp: Rc<RefCell<tcp::TcpProxy>>,
    zombie_check_interval: Duration,
}
impl Server {
    pub fn try_new_from_config(
        worker_to_main_channel: ProxyChannel,
        worker_to_main_scm: ScmSocket,
        config: ServerConfig,
        initial_state: InitialState,
        expects_initial_status: bool,
    ) -> Result<Self, ServerError> {
        let event_loop = Poll::new().map_err(ServerError::CreatePoll)?;
        let pool = Rc::new(RefCell::new(Pool::with_capacity(
            config.min_buffers as usize,
            config.max_buffers as usize,
            config.buffer_size as usize,
        )));
        let backends = Rc::new(RefCell::new(BackendMap::new()));
        let sessions: Rc<RefCell<SessionManager>> = SessionManager::new(
            Slab::with_capacity(config.slab_capacity() as usize),
            config.max_connections as usize,
        );
        {
            let mut s = sessions.borrow_mut();
            let entry = s.slab.vacant_entry();
            trace!("taking token {:?} for channel", SessionToken(entry.key()));
            entry.insert(Rc::new(RefCell::new(ListenSession {
                protocol: Protocol::Channel,
            })));
        }
        {
            let mut s = sessions.borrow_mut();
            let entry = s.slab.vacant_entry();
            trace!("taking token {:?} for metrics", SessionToken(entry.key()));
            entry.insert(Rc::new(RefCell::new(ListenSession {
                protocol: Protocol::Timer,
            })));
        }
        {
            let mut s = sessions.borrow_mut();
            let entry = s.slab.vacant_entry();
            trace!("taking token {:?} for metrics", SessionToken(entry.key()));
            entry.insert(Rc::new(RefCell::new(ListenSession {
                protocol: Protocol::Metrics,
            })));
        }
        Server::new(
            event_loop,
            worker_to_main_channel,
            worker_to_main_scm,
            sessions,
            pool,
            backends,
            None,
            None,
            None,
            config,
            Some(initial_state),
            expects_initial_status,
        )
    }
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        poll: Poll,
        mut channel: ProxyChannel,
        scm: ScmSocket,
        sessions: Rc<RefCell<SessionManager>>,
        pool: Rc<RefCell<Pool>>,
        backends: Rc<RefCell<BackendMap>>,
        http: Option<http::HttpProxy>,
        https: Option<https::HttpsProxy>,
        tcp: Option<tcp::TcpProxy>,
        server_config: ServerConfig,
        initial_state: Option<InitialState>,
        expects_initial_status: bool,
    ) -> Result<Self, ServerError> {
        FEATURES.with(|_features| {
            });
        poll.registry()
            .register(
                &mut channel,
                Token(0),
                Interest::READABLE | Interest::WRITABLE,
            )
            .map_err(ServerError::RegisterChannel)?;
        METRICS.with(|metrics| {
            if let Some(sock) = (*metrics.borrow_mut()).socket_mut() {
                poll.registry()
                    .register(sock, Token(2), Interest::WRITABLE)
                    .expect("should register the metrics socket");
            }
        });
        let base_sessions_count = sessions.borrow().slab.len();
        let http = Rc::new(RefCell::new(match http {
            Some(http) => http,
            None => {
                let registry = poll
                    .registry()
                    .try_clone()
                    .map_err(ServerError::CloneRegistry)?;
                http::HttpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone())
            }
        }));
        let https = Rc::new(RefCell::new(match https {
            Some(https) => https,
            None => {
                let registry = poll
                    .registry()
                    .try_clone()
                    .map_err(ServerError::CloneRegistry)?;
                https::HttpsProxy::new(registry, sessions.clone(), pool.clone(), backends.clone())
            }
        }));
        let tcp = Rc::new(RefCell::new(match tcp {
            Some(tcp) => tcp,
            None => {
                let registry = poll
                    .registry()
                    .try_clone()
                    .map_err(ServerError::CloneRegistry)?;
                tcp::TcpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone())
            }
        }));
        let mut server = Server {
            accept_queue_timeout: Duration::from_secs(u64::from(
                server_config.accept_queue_timeout,
            )),
            accept_queue: VecDeque::new(),
            accept_ready: HashSet::new(),
            backends,
            base_sessions_count,
            channel,
            config_state: ConfigState::new(),
            current_poll_errors: 0,
            http,
            https,
            last_sessions_len: 0, last_shutting_down_message: None,
            last_zombie_check: Instant::now(), loop_start: Instant::now(),        max_poll_errors: 10000,            poll_timeout: Some(Duration::from_millis(1000)), poll,
            scm_listeners: None,
            scm,
            sessions,
            should_poll_at: None,
            shutting_down: None,
            tcp,
            zombie_check_interval: Duration::from_secs(u64::from(
                server_config.zombie_check_interval,
            )),
        };
        if let Some(state) = initial_state {
            for request in state.requests {
                trace!("generating initial config request: {:#?}", request);
                server.notify_proxys(request);
            }
            QUEUE.with(|queue| {
                (*queue.borrow_mut()).clear();
            });
        }
        if expects_initial_status {
            server.block_channel();
            let msg = server.channel.read_message();
            debug!("got message: {:?}", msg);
            if let Ok(WorkerRequest {
                id,
                content:
                    Request {
                        request_type: Some(RequestType::Status(_)),
                    },
            }) = msg
            {
                if let Err(e) = server.channel.write_message(&WorkerResponse::ok(id)) {
                    error!("Could not send an ok to the main process: {}", e);
                }
            } else {
                panic!("plz give me a status request first when I start, you sent me this instead: {:?}", msg);
            }
            server.unblock_channel();
        }
        info!("will try to receive listeners");
        server
            .scm
            .set_blocking(true)
            .map_err(|scm_err| ServerError::ScmSocket {
                msg: "Could not set the scm socket to blocking".to_string(),
                scm_err,
            })?;
        let listeners =
            server
                .scm
                .receive_listeners()
                .map_err(|scm_err| ServerError::ScmSocket {
                    msg: "could not receive listeners from the scm socket".to_string(),
                    scm_err,
                })?;
        server
            .scm
            .set_blocking(false)
            .map_err(|scm_err| ServerError::ScmSocket {
                msg: "Could not set the scm socket to unblocking".to_string(),
                scm_err,
            })?;
        info!("received listeners: {:?}", listeners);
        server.scm_listeners = Some(listeners);
        Ok(server)
    }
    pub fn run(&mut self) {
        let mut events = Events::with_capacity(1024); self.last_sessions_len = self.sessions.borrow().slab.len();
        self.last_zombie_check = Instant::now();
        self.loop_start = Instant::now();
        loop {
            self.check_for_poll_errors();
            let timeout = self.reset_loop_time_and_get_timeout();
            match self.poll.poll(&mut events, timeout) {
                Ok(_) => self.current_poll_errors = 0,
                Err(error) => {
                    error!("Error while polling events: {:?}", error);
                    self.current_poll_errors += 1;
                    continue;
                }
            }
            let after_epoll = Instant::now();
            time!("epoll_time", (after_epoll - self.loop_start).as_millis());
            self.loop_start = after_epoll;
            self.send_queue();
            for event in events.iter() {
                match event.token() {
                    Token(0) => {
                        if event.is_error() {
                            error!("error reading from command channel");
                            continue;
                        }
                        if event.is_read_closed() || event.is_write_closed() {
                            error!("command channel was closed");
                            return;
                        }
                        let ready = Ready::from(event);
                        self.channel.handle_events(ready);
                        loop {
                            QUEUE.with(|queue| {
                                if !(*queue.borrow()).is_empty() {
                                    self.channel.interest.insert(Ready::WRITABLE);
                                }
                            });
                            if self.channel.readiness() == Ready::EMPTY {
                                break;
                            }
                            if self.read_channel_messages_and_notify() {
                                return;
                            }
                            QUEUE.with(|queue| {
                                if !(*queue.borrow()).is_empty() {
                                    self.channel.interest.insert(Ready::WRITABLE);
                                }
                            });
                            self.send_queue();
                        }
                    }
                    Token(1) => {
                        while let Some(t) = TIMER.with(|timer| timer.borrow_mut().poll()) {
                            self.timeout(t);
                        }
                    }
                    Token(2) => METRICS.with(|metrics| {
                        (*metrics.borrow_mut()).writable();
                    }),
                    token => self.ready(token, Ready::from(event)),
                }
            }
            if let Some(t) = self.should_poll_at.as_ref() {
                if *t <= Instant::now() {
                    while let Some(t) = TIMER.with(|timer| timer.borrow_mut().poll()) {
                        self.timeout(t);
                    }
                }
            }
            self.handle_remaining_readiness();
            self.create_sessions();
            self.should_poll_at = TIMER.with(|timer| timer.borrow().next_poll_date());
            self.zombie_check();
            let now = time::OffsetDateTime::now_utc();
            if now.minute() == 00 && now.second() == 0 {
                METRICS.with(|metrics| {
                    (*metrics.borrow_mut()).clear_local();
                });
            }
            gauge!("client.connections", self.sessions.borrow().nb_connections);
            gauge!("slab.entries", self.sessions.borrow().slab.len());
            METRICS.with(|metrics| {
                (*metrics.borrow_mut()).send_data();
            });
            if self.shutting_down.is_some() && self.shut_down_sessions() {
                return;
            }
        }
    }
    fn check_for_poll_errors(&mut self) {
        if self.current_poll_errors >= self.max_poll_errors {
            error!(
                "Something is going very wrong. Last {} poll() calls failed, crashing..",
                self.current_poll_errors
            );
            panic!(
                "poll() calls failed {} times in a row",
                self.current_poll_errors
            );
        }
    }
    fn reset_loop_time_and_get_timeout(&mut self) -> Option<Duration> {
        let now = Instant::now();
        time!("event_loop_time", (now - self.loop_start).as_millis());
        let timeout = match self.should_poll_at.as_ref() {
            None => self.poll_timeout,
            Some(i) => {
                if *i <= now {
                    self.poll_timeout
                } else {
                    let dur = *i - now;
                    match self.poll_timeout {
                        None => Some(dur),
                        Some(t) => {
                            if t < dur {
                                Some(t)
                            } else {
                                Some(dur)
                            }
                        }
                    }
                }
            }
        };
        self.loop_start = now;
        timeout
    }
    fn read_channel_messages_and_notify(&mut self) -> bool {
        if !self.channel.readiness().is_readable() {
            return false;
        }
        if let Err(e) = self.channel.readable() {
            error!("error reading from channel: {:?}", e);
        }
        loop {
            let request = self.channel.read_message();
            debug!("Received request {:?}", request);
            match request {
                Ok(request) => match request.content.request_type {
                    Some(RequestType::HardStop(_)) => {
                        let req_id = request.id.clone();
                        self.notify(request);
                        if let Err(e) = self.channel.write_message(&WorkerResponse::ok(req_id)) {
                            error!("Could not send ok response to the main process: {}", e);
                        }
                        if let Err(e) = self.channel.run() {
                            error!("Error while running the server channel: {}", e);
                        }
                        return true;
                    }
                    Some(RequestType::SoftStop(_)) => {
                        self.shutting_down = Some(request.id.clone());
                        self.last_sessions_len = self.sessions.borrow().slab.len();
                        self.notify(request);
                    }
                    Some(RequestType::ReturnListenSockets(_)) => {
                        info!("received ReturnListenSockets order");
                        match self.return_listen_sockets() {
                            Ok(_) => push_queue(WorkerResponse::ok(request.id)),
                            Err(error) => push_queue(worker_response_error(
                                request.id,
                                format!("Could not send listeners on scm socket: {error:?}"),
                            )),
                        }
                    }
                    _ => self.notify(request),
                },
                Err(_) => {
                    if (self.channel.interest & self.channel.readiness).is_readable() {
                        if let Err(e) = self.channel.readable() {
                            error!("error reading from channel: {:?}", e);
                        }
                        continue;
                    }
                    break;
                }
            }
        }
        false
    }
    fn zombie_check(&mut self) {
        let now = Instant::now();
        if now - self.last_zombie_check < self.zombie_check_interval {
            return;
        }
        info!("zombie check");
        self.last_zombie_check = now;
        let mut zombie_tokens = HashSet::new();
        for (_index, session) in self
            .sessions
            .borrow_mut()
            .slab
            .iter_mut()
            .filter(|(_, c)| now - c.borrow().last_event() > self.zombie_check_interval)
        {
            let session_token = session.borrow().frontend_token();
            if !zombie_tokens.contains(&session_token) {
                session.borrow().print_session();
                zombie_tokens.insert(session_token);
            }
        }
        let zombie_count = zombie_tokens.len() as i64;
        count!("zombies", zombie_count);
        let remaining_count = self.shut_down_sessions_by_frontend_tokens(zombie_tokens);
        info!(
            "removing {} zombies ({} remaining entries after close)",
            zombie_count, remaining_count
        );
    }
    fn shut_down_sessions_by_frontend_tokens(&self, tokens: HashSet<Token>) -> usize {
        if tokens.is_empty() {
            return 0;
        }
        for token in &tokens {
            if self.sessions.borrow().slab.contains(token.0) {
                let session = { self.sessions.borrow_mut().slab.remove(token.0) };
                session.borrow_mut().close();
                self.sessions.borrow_mut().decr();
            }
        }
        let mut dangling_entries = HashSet::new();
        for (entry_key, session) in &self.sessions.borrow().slab {
            if tokens.contains(&session.borrow().frontend_token()) {
                dangling_entries.insert(entry_key);
            }
        }
        let mut dangling_entries_count = 0;
        for entry_key in dangling_entries {
            let mut sessions = self.sessions.borrow_mut();
            if sessions.slab.contains(entry_key) {
                sessions.slab.remove(entry_key);
                dangling_entries_count += 1;
            }
        }
        dangling_entries_count
    }
    fn shut_down_sessions(&mut self) -> bool {
        let sessions_count = self.sessions.borrow().slab.len();
        let mut sessions_to_shut_down = HashSet::new();
        for (_key, session) in &self.sessions.borrow().slab {
            if session.borrow_mut().shutting_down() {
                sessions_to_shut_down.insert(Token(session.borrow().frontend_token().0));
            }
        }
        let _ = self.shut_down_sessions_by_frontend_tokens(sessions_to_shut_down);
        let new_sessions_count = self.sessions.borrow().slab.len();
        if new_sessions_count < sessions_count {
            let now = Instant::now();
            if let Some(last) = self.last_shutting_down_message {
                if (now - last) > Duration::from_secs(5) {
                    info!(
                        "closed {} sessions, {} sessions left, base_sessions_count = {}",
                        sessions_count - new_sessions_count,
                        new_sessions_count,
                        self.base_sessions_count
                    );
                }
            }
            self.last_shutting_down_message = Some(now);
        }
        if new_sessions_count <= self.base_sessions_count {
            info!("last session stopped, shutting down!");
            if let Err(e) = self.channel.run() {
                error!("Error while running the server channel: {}", e);
            }
            let id = self
                .shutting_down
                .take()
                .expect("should have shut down correctly"); debug!("Responding OK to main process for request {}", id);
            let proxy_response = WorkerResponse::ok(id);
            if let Err(e) = self.channel.write_message(&proxy_response) {
                error!("Could not write response to the main process: {}", e);
            }
            if let Err(e) = self.channel.run() {
                error!("Error while running the server channel: {}", e);
            }
            return true;
        }
        if new_sessions_count < self.last_sessions_len {
            info!(
                "shutting down, {} slab elements remaining (base: {})",
                new_sessions_count - self.base_sessions_count,
                self.base_sessions_count
            );
            self.last_sessions_len = new_sessions_count;
        }
        false
    }
    fn kill_session(&self, session: Rc<RefCell<dyn ProxySession>>) {
        let token = session.borrow().frontend_token();
        let _ = self.shut_down_sessions_by_frontend_tokens(HashSet::from([token]));
    }
    fn send_queue(&mut self) {
        if self.channel.readiness.is_writable() {
            QUEUE.with(|q| {
                let mut queue = q.borrow_mut();
                loop {
                    if let Some(resp) = queue.pop_front() {
                        debug!("Sending response {:?}", resp);
                        if let Err(e) = self.channel.write_message(&resp) {
                            error!("Could not write message {} on the channel: {}", resp, e);
                            queue.push_front(resp);
                        }
                    }
                    if self.channel.back_buf.available_data() > 0 {
                        if let Err(e) = self.channel.writable() {
                            error!("error writing to channel: {:?}", e);
                        }
                    }
                    if !self.channel.readiness.is_writable() {
                        break;
                    }
                    if self.channel.back_buf.available_data() == 0 && queue.len() == 0 {
                        break;
                    }
                }
            });
        }
    }
    fn notify(&mut self, message: WorkerRequest) {
        match &message.content.request_type {
            Some(RequestType::ConfigureMetrics(configuration)) => {
                match MetricsConfiguration::try_from(*configuration) {
                    Ok(metrics_config) => {
                        METRICS.with(|metrics| {
                            (*metrics.borrow_mut()).configure(&metrics_config);
                            push_queue(WorkerResponse::ok(message.id));
                        });
                    }
                    Err(e) => {
                        error!("Error configuring metrics: {}", e);
                        push_queue(WorkerResponse::error(message.id, e));
                    }
                }
                return;
            }
            Some(RequestType::QueryMetrics(query_metrics_options)) => {
                METRICS.with(|metrics| {
                    match (*metrics.borrow_mut()).query(query_metrics_options) {
                        Ok(c) => push_queue(WorkerResponse::ok_with_content(message.id, c)),
                        Err(e) => {
                            error!("Error querying metrics: {}", e);
                            push_queue(WorkerResponse::error(message.id, e))
                        }
                    }
                });
                return;
            }
            Some(RequestType::Logging(logging_filter)) => {
                info!(
                    "{} changing logging filter to {}",
                    message.id, logging_filter
                );
                let (directives, _errors) = logging::parse_logging_spec(logging_filter);
                logging::LOGGER.with(|logger| {
                    logger.borrow_mut().set_directives(directives);
                });
                push_queue(WorkerResponse::ok(message.id));
                return;
            }
            Some(RequestType::QueryClustersHashes(_)) => {
                push_queue(WorkerResponse::ok_with_content(
                    message.id.clone(),
                    ContentType::ClusterHashes(ClusterHashes {
                        map: self.config_state.hash_state(),
                    })
                    .into(),
                ));
                return;
            }
            Some(RequestType::QueryClusterById(cluster_id)) => {
                push_queue(WorkerResponse::ok_with_content(
                    message.id.clone(),
                    ContentType::Clusters(ClusterInformations {
                        vec: self
                            .config_state
                            .cluster_state(cluster_id)
                            .map_or(vec![], |ci| vec![ci]),
                    })
                    .into(),
                ));
            }
            Some(RequestType::QueryClustersByDomain(domain)) => {
                let cluster_ids = self
                    .config_state
                    .get_cluster_ids_by_domain(domain.hostname.clone(), domain.path.clone());
                let vec = cluster_ids
                    .iter()
                    .filter_map(|cluster_id| self.config_state.cluster_state(cluster_id))
                    .collect();
                push_queue(WorkerResponse::ok_with_content(
                    message.id.clone(),
                    ContentType::Clusters(ClusterInformations { vec }).into(),
                ));
                return;
            }
            Some(RequestType::QueryCertificatesFromWorkers(filters)) => {
                if filters.fingerprint.is_some() {
                    let certs = self.config_state.get_certificates(filters.clone());
                    let response = if !certs.is_empty() {
                        WorkerResponse::ok_with_content(
                            message.id.clone(),
                            ContentType::CertificatesWithFingerprints(
                                CertificatesWithFingerprints { certs },
                            )
                            .into(),
                        )
                    } else {
                        worker_response_error(
                            message.id.clone(),
                            "Could not find certificate for this fingerprint",
                        )
                    };
                    push_queue(response);
                    return;
                }
                }
            _other_request => {}
        }
        self.notify_proxys(message);
    }
    pub fn notify_proxys(&mut self, request: WorkerRequest) {
        if let Err(e) = self.config_state.dispatch(&request.content) {
            error!("Could not execute order on config state: {}", e);
        }
        let req_id = request.id.clone();
        match request.content.request_type {
            Some(RequestType::AddCluster(ref cluster)) => {
                self.add_cluster(cluster);
                }
            Some(RequestType::AddBackend(ref backend)) => {
                push_queue(self.add_backend(&req_id, backend));
                return;
            }
            Some(RequestType::RemoveBackend(ref remove_backend)) => {
                push_queue(self.remove_backend(&req_id, remove_backend));
                return;
            }
            _ => {}
        };
        let proxy_destinations = request.content.get_destinations();
        let mut notify_response = None;
        if proxy_destinations.to_http_proxy {
            notify_response = Some(self.http.borrow_mut().notify(request.clone()));
        }
        if proxy_destinations.to_https_proxy {
            let http_proxy_response = self.https.borrow_mut().notify(request.clone());
            if http_proxy_response.is_failure() || notify_response.is_none() {
                notify_response = Some(http_proxy_response);
            }
        }
        if proxy_destinations.to_tcp_proxy {
            let tcp_proxy_response = self.tcp.borrow_mut().notify(request.clone());
            if tcp_proxy_response.is_failure() || notify_response.is_none() {
                notify_response = Some(tcp_proxy_response);
            }
        }
        if let Some(response) = notify_response {
            push_queue(response);
        }
        match request.content.request_type {
            Some(RequestType::AddHttpListener(listener)) => {
                push_queue(self.notify_add_http_listener(&req_id, listener));
            }
            Some(RequestType::AddHttpsListener(listener)) => {
                push_queue(self.notify_add_https_listener(&req_id, listener));
            }
            Some(RequestType::AddTcpListener(listener)) => {
                push_queue(self.notify_add_tcp_listener(&req_id, listener));
            }
            Some(RequestType::RemoveListener(ref remove)) => {
                debug!("{} remove {:?} listener {:?}", req_id, remove.proxy, remove);
                self.base_sessions_count -= 1;
                let response = match ListenerType::try_from(remove.proxy) {
                    Ok(ListenerType::Http) => self.http.borrow_mut().notify(request),
                    Ok(ListenerType::Https) => self.https.borrow_mut().notify(request),
                    Ok(ListenerType::Tcp) => self.tcp.borrow_mut().notify(request),
                    Err(_) => WorkerResponse::error(req_id, "Wrong variant ListenerType"),
                };
                push_queue(response);
            }
            Some(RequestType::ActivateListener(ref activate)) => {
                push_queue(self.notify_activate_listener(&req_id, activate));
            }
            Some(RequestType::DeactivateListener(ref deactivate)) => {
                push_queue(self.notify_deactivate_listener(&req_id, deactivate));
            }
            _other_request => {}
        };
    }
    fn add_cluster(&mut self, cluster: &Cluster) {
        self.backends
            .borrow_mut()
            .set_load_balancing_policy_for_cluster(
                &cluster.cluster_id,
                LoadBalancingAlgorithms::try_from(cluster.load_balancing).unwrap_or_default(),
                cluster
                    .load_metric
                    .and_then(|n| LoadMetric::try_from(n).ok()),
            );
    }
    fn add_backend(&mut self, req_id: &str, add_backend: &AddBackend) -> WorkerResponse {
        let new_backend = Backend::new(
            &add_backend.backend_id,
            add_backend.address.clone().into(),
            add_backend.sticky_id.clone(),
            add_backend.load_balancing_parameters.clone(),
            add_backend.backup,
        );
        self.backends
            .borrow_mut()
            .add_backend(&add_backend.cluster_id, new_backend);
        WorkerResponse::ok(req_id)
    }
    fn remove_backend(&mut self, req_id: &str, backend: &RemoveBackend) -> WorkerResponse {
        let address = backend.address.clone().into();
        self.backends
            .borrow_mut()
            .remove_backend(&backend.cluster_id, &address);
        WorkerResponse::ok(req_id)
    }
    fn notify_add_http_listener(
        &mut self,
        req_id: &str,
        listener: HttpListenerConfig,
    ) -> WorkerResponse {
        debug!("{} add http listener {:?}", req_id, listener);
        if self.sessions.borrow().at_capacity() {
            return worker_response_error(req_id, "session list is full, cannot add a listener");
        }
        let mut session_manager = self.sessions.borrow_mut();
        let entry = session_manager.slab.vacant_entry();
        let token = Token(entry.key());
        match self.http.borrow_mut().add_listener(listener, token) {
            Ok(_token) => {
                entry.insert(Rc::new(RefCell::new(ListenSession {
                    protocol: Protocol::HTTPListen,
                })));
                self.base_sessions_count += 1;
                WorkerResponse::ok(req_id)
            }
            Err(e) => worker_response_error(req_id, format!("Could not add HTTP listener: {e}")),
        }
    }
    fn notify_add_https_listener(
        &mut self,
        req_id: &str,
        listener: HttpsListenerConfig,
    ) -> WorkerResponse {
        debug!("{} add https listener {:?}", req_id, listener);
        if self.sessions.borrow().at_capacity() {
            return worker_response_error(req_id, "session list is full, cannot add a listener");
        }
        let mut session_manager = self.sessions.borrow_mut();
        let entry = session_manager.slab.vacant_entry();
        let token = Token(entry.key());
        match self
            .https
            .borrow_mut()
            .add_listener(listener.clone(), token)
        {
            Ok(_token) => {
                entry.insert(Rc::new(RefCell::new(ListenSession {
                    protocol: Protocol::HTTPSListen,
                })));
                self.base_sessions_count += 1;
                WorkerResponse::ok(req_id)
            }
            Err(e) => worker_response_error(req_id, format!("Could not add HTTPS listener: {e}")),
        }
    }
    fn notify_add_tcp_listener(
        &mut self,
        req_id: &str,
        listener: CommandTcpListener,
    ) -> WorkerResponse {
        debug!("{} add tcp listener {:?}", req_id, listener);
        if self.sessions.borrow().at_capacity() {
            return worker_response_error(req_id, "session list is full, cannot add a listener");
        }
        let mut session_manager = self.sessions.borrow_mut();
        let entry = session_manager.slab.vacant_entry();
        let token = Token(entry.key());
        match self.tcp.borrow_mut().add_listener(listener, token) {
            Ok(_token) => {
                entry.insert(Rc::new(RefCell::new(ListenSession {
                    protocol: Protocol::TCPListen,
                })));
                self.base_sessions_count += 1;
                WorkerResponse::ok(req_id)
            }
            Err(e) => worker_response_error(req_id, format!("Could not add TCP listener: {e}")),
        }
    }
    fn notify_activate_listener(
        &mut self,
        req_id: &str,
        activate: &ActivateListener,
    ) -> WorkerResponse {
        debug!(
            "{} activate {:?} listener {:?}",
            req_id, activate.proxy, activate
        );
        let address: std::net::SocketAddr = activate.address.clone().into();
        match ListenerType::try_from(activate.proxy) {
            Ok(ListenerType::Http) => {
                let listener = self
                    .scm_listeners
                    .as_mut()
                    .and_then(|s| s.get_http(&address))
                    .map(|fd| unsafe { MioTcpListener::from_raw_fd(fd) });
                let activated_token = self.http.borrow_mut().activate_listener(&address, listener);
                match activated_token {
                    Ok(token) => {
                        self.accept(ListenToken(token.0), Protocol::HTTPListen);
                        WorkerResponse::ok(req_id)
                    }
                    Err(activate_error) => worker_response_error(
                        req_id,
                        format!("Could not activate HTTP listener: {}", activate_error),
                    ),
                }
            }
            Ok(ListenerType::Https) => {
                let listener = self
                    .scm_listeners
                    .as_mut()
                    .and_then(|s| s.get_https(&address))
                    .map(|fd| unsafe { MioTcpListener::from_raw_fd(fd) });
                let activated_token = self
                    .https
                    .borrow_mut()
                    .activate_listener(&address, listener);
                match activated_token {
                    Ok(token) => {
                        self.accept(ListenToken(token.0), Protocol::HTTPSListen);
                        WorkerResponse::ok(req_id)
                    }
                    Err(activate_error) => worker_response_error(
                        req_id,
                        format!("Could not activate HTTPS listener: {}", activate_error),
                    ),
                }
            }
            Ok(ListenerType::Tcp) => {
                let listener = self
                    .scm_listeners
                    .as_mut()
                    .and_then(|s| s.get_tcp(&address))
                    .map(|fd| unsafe { MioTcpListener::from_raw_fd(fd) });
                let listener_token = self.tcp.borrow_mut().activate_listener(&address, listener);
                match listener_token {
                    Ok(token) => {
                        self.accept(ListenToken(token.0), Protocol::TCPListen);
                        WorkerResponse::ok(req_id)
                    }
                    Err(activate_error) => worker_response_error(
                        req_id,
                        format!("Could not activate TCP listener: {}", activate_error),
                    ),
                }
            }
            Err(_) => worker_response_error(req_id, "Wrong variant for ListenerType on request"),
        }
    }
    fn notify_deactivate_listener(
        &mut self,
        req_id: &str,
        deactivate: &DeactivateListener,
    ) -> WorkerResponse {
        debug!(
            "{} deactivate {:?} listener {:?}",
            req_id, deactivate.proxy, deactivate
        );
        let address: std::net::SocketAddr = deactivate.address.clone().into();
        match ListenerType::try_from(deactivate.proxy) {
            Ok(ListenerType::Http) => {
                let (token, mut listener) = match self.http.borrow_mut().give_back_listener(address)
                {
                    Ok((token, listener)) => (token, listener),
                    Err(e) => {
                        return worker_response_error(
                            req_id,
                            format!(
                                "Couldn't deactivate HTTP listener at address {address:?}: {e}"
                            ),
                        )
                    }
                };
                if let Err(e) = self.poll.registry().deregister(&mut listener) {
                    error!(
                        "error deregistering HTTP listen socket({:?}): {:?}",
                        deactivate, e
                    );
                }
                {
                    let mut sessions = self.sessions.borrow_mut();
                    if sessions.slab.contains(token.0) {
                        sessions.slab.remove(token.0);
                        info!("removed listen token {:?}", token);
                    }
                }
                if deactivate.to_scm {
                    self.unblock_scm_socket();
                    let listeners = Listeners {
                        http: vec![(address, listener.as_raw_fd())],
                        tls: vec![],
                        tcp: vec![],
                    };
                    info!("sending HTTP listener: {:?}", listeners);
                    let res = self.scm.send_listeners(&listeners);
                    self.block_scm_socket();
                    info!("sent HTTP listener: {:?}", res);
                }
                WorkerResponse::ok(req_id)
            }
            Ok(ListenerType::Https) => {
                let (token, mut listener) =
                    match self.https.borrow_mut().give_back_listener(address) {
                        Ok((token, listener)) => (token, listener),
                        Err(e) => {
                            return worker_response_error(
                                req_id,
                                format!(
                                "Couldn't deactivate HTTPS listener at address {address:?}: {e}",
                            ),
                            )
                        }
                    };
                if let Err(e) = self.poll.registry().deregister(&mut listener) {
                    error!(
                        "error deregistering HTTPS listen socket({:?}): {:?}",
                        deactivate, e
                    );
                }
                if self.sessions.borrow().slab.contains(token.0) {
                    self.sessions.borrow_mut().slab.remove(token.0);
                    info!("removed listen token {:?}", token);
                }
                if deactivate.to_scm {
                    self.unblock_scm_socket();
                    let listeners = Listeners {
                        http: vec![],
                        tls: vec![(address, listener.as_raw_fd())],
                        tcp: vec![],
                    };
                    info!("sending HTTPS listener: {:?}", listeners);
                    let res = self.scm.send_listeners(&listeners);
                    self.block_scm_socket();
                    info!("sent HTTPS listener: {:?}", res);
                }
                WorkerResponse::ok(req_id)
            }
            Ok(ListenerType::Tcp) => {
                let (token, mut listener) = match self.tcp.borrow_mut().give_back_listener(address)
                {
                    Ok((token, listener)) => (token, listener),
                    Err(e) => {
                        return worker_response_error(
                            req_id,
                            format!(
                                "Could not deactivate TCP listener at address {:?}: {}",
                                address, e
                            ),
                        )
                    }
                };
                if let Err(e) = self.poll.registry().deregister(&mut listener) {
                    error!(
                        "error deregistering TCP listen socket({:?}): {:?}",
                        deactivate, e
                    );
                }
                if self.sessions.borrow().slab.contains(token.0) {
                    self.sessions.borrow_mut().slab.remove(token.0);
                    info!("removed listen token {:?}", token);
                }
                if deactivate.to_scm {
                    self.unblock_scm_socket();
                    let listeners = Listeners {
                        http: vec![],
                        tls: vec![],
                        tcp: vec![(address, listener.as_raw_fd())],
                    };
                    info!("sending TCP listener: {:?}", listeners);
                    let res = self.scm.send_listeners(&listeners);
                    self.block_scm_socket();
                    info!("sent TCP listener: {:?}", res);
                }
                WorkerResponse::ok(req_id)
            }
            Err(_) => worker_response_error(req_id, "Wrong variant for ListenerType on request"),
        }
    }
    pub fn return_listen_sockets(&mut self) -> Result<(), ScmSocketError> {
        self.unblock_scm_socket();
        let mut http_listeners = self.http.borrow_mut().give_back_listeners();
        for &mut (_, ref mut sock) in http_listeners.iter_mut() {
            if let Err(e) = self.poll.registry().deregister(sock) {
                error!(
                    "error deregistering HTTP listen socket({:?}): {:?}",
                    sock, e
                );
            }
        }
        let mut https_listeners = self.https.borrow_mut().give_back_listeners();
        for &mut (_, ref mut sock) in https_listeners.iter_mut() {
            if let Err(e) = self.poll.registry().deregister(sock) {
                error!(
                    "error deregistering HTTPS listen socket({:?}): {:?}",
                    sock, e
                );
            }
        }
        let mut tcp_listeners = self.tcp.borrow_mut().give_back_listeners();
        for &mut (_, ref mut sock) in tcp_listeners.iter_mut() {
            if let Err(e) = self.poll.registry().deregister(sock) {
                error!("error deregistering TCP listen socket({:?}): {:?}", sock, e);
            }
        }
        let listeners = Listeners {
            http: http_listeners
                .iter()
                .map(|(addr, listener)| (*addr, listener.as_raw_fd()))
                .collect(),
            tls: https_listeners
                .iter()
                .map(|(addr, listener)| (*addr, listener.as_raw_fd()))
                .collect(),
            tcp: tcp_listeners
                .iter()
                .map(|(addr, listener)| (*addr, listener.as_raw_fd()))
                .collect(),
        };
        info!("sending default listeners: {:?}", listeners);
        let res = self.scm.send_listeners(&listeners);
        self.block_scm_socket();
        info!("sent default listeners: {:?}", res);
        res
    }
    fn block_scm_socket(&mut self) {
        if let Err(e) = self.scm.set_blocking(true) {
            error!("Could not block scm socket: {}", e);
        }
    }
    fn unblock_scm_socket(&mut self) {
        if let Err(e) = self.scm.set_blocking(false) {
            error!("Could not unblock scm socket: {}", e);
        }
    }
    pub fn to_session(&self, token: Token) -> SessionToken {
        SessionToken(token.0)
    }
    pub fn from_session(&self, token: SessionToken) -> Token {
        Token(token.0)
    }
    pub fn accept(&mut self, token: ListenToken, protocol: Protocol) {
        match protocol {
            Protocol::TCPListen => loop {
                match self.tcp.borrow_mut().accept(token) {
                    Ok(sock) => self.accept_queue.push_back((
                        sock,
                        token,
                        Protocol::TCPListen,
                        Instant::now(),
                    )),
                    Err(AcceptError::WouldBlock) => {
                        self.accept_ready.remove(&token);
                        break;
                    }
                    Err(other) => {
                        error!("error accepting TCP sockets: {:?}", other);
                        self.accept_ready.remove(&token);
                        break;
                    }
                }
            },
            Protocol::HTTPListen => loop {
                match self.http.borrow_mut().accept(token) {
                    Ok(sock) => self.accept_queue.push_back((
                        sock,
                        token,
                        Protocol::HTTPListen,
                        Instant::now(),
                    )),
                    Err(AcceptError::WouldBlock) => {
                        self.accept_ready.remove(&token);
                        break;
                    }
                    Err(other) => {
                        error!("error accepting HTTP sockets: {:?}", other);
                        self.accept_ready.remove(&token);
                        break;
                    }
                }
            },
            Protocol::HTTPSListen => loop {
                match self.https.borrow_mut().accept(token) {
                    Ok(sock) => self.accept_queue.push_back((
                        sock,
                        token,
                        Protocol::HTTPSListen,
                        Instant::now(),
                    )),
                    Err(AcceptError::WouldBlock) => {
                        self.accept_ready.remove(&token);
                        break;
                    }
                    Err(other) => {
                        error!("error accepting HTTPS sockets: {:?}", other);
                        self.accept_ready.remove(&token);
                        break;
                    }
                }
            },
            _ => panic!("should not call accept() on a HTTP, HTTPS or TCP session"),
        }
        gauge!("accept_queue.connections", self.accept_queue.len());
    }
    pub fn create_sessions(&mut self) {
        while let Some((sock, token, protocol, timestamp)) = self.accept_queue.pop_back() {
            let wait_time = Instant::now() - timestamp;
            time!("accept_queue.wait_time", wait_time.as_millis());
            if wait_time > self.accept_queue_timeout {
                incr!("accept_queue.timeout");
                continue;
            }
            if !self.sessions.borrow_mut().check_limits() {
                break;
            }
            match protocol {
                Protocol::TCPListen => {
                    let proxy = self.tcp.clone();
                    if self
                        .tcp
                        .borrow_mut()
                        .create_session(sock, token, wait_time, proxy)
                        .is_err()
                    {
                        break;
                    }
                }
                Protocol::HTTPListen => {
                    let proxy = self.http.clone();
                    if self
                        .http
                        .borrow_mut()
                        .create_session(sock, token, wait_time, proxy)
                        .is_err()
                    {
                        break;
                    }
                }
                Protocol::HTTPSListen => {
                    if self
                        .https
                        .borrow_mut()
                        .create_session(sock, token, wait_time, self.https.clone())
                        .is_err()
                    {
                        break;
                    }
                }
                _ => panic!("should not call accept() on a HTTP, HTTPS or TCP session"),
            };
            self.sessions.borrow_mut().incr();
        }
        gauge!("accept_queue.connections", self.accept_queue.len());
    }
    pub fn ready(&mut self, token: Token, events: Ready) {
        trace!("PROXY\t{:?} got events: {:?}", token, events);
        let session_token = token.0;
        if self.sessions.borrow().slab.contains(session_token) {
            let protocol = self.sessions.borrow().slab[session_token]
                .borrow()
                .protocol();
            match protocol {
                Protocol::HTTPListen | Protocol::HTTPSListen | Protocol::TCPListen => {
                    if events.is_readable() {
                        self.accept_ready.insert(ListenToken(token.0));
                        if self.sessions.borrow().can_accept {
                            self.accept(ListenToken(token.0), protocol);
                        }
                        return;
                    }
                    if events.is_writable() {
                        error!(
                            "received writable for listener {:?}, this should not happen",
                            token
                        );
                        return;
                    }
                    if events.is_hup() {
                        error!("should not happen: server {:?} closed", token);
                        return;
                    }
                    unreachable!();
                }
                _ => {}
            }
            let session = self.sessions.borrow_mut().slab[session_token].clone();
            session.borrow_mut().update_readiness(token, events);
            if session.borrow_mut().ready(session.clone()) {
                self.kill_session(session);
            }
        }
    }
    pub fn timeout(&mut self, token: Token) {
        trace!("PROXY\t{:?} got timeout", token);
        let session_token = token.0;
        if self.sessions.borrow().slab.contains(session_token) {
            let session = self.sessions.borrow_mut().slab[session_token].clone();
            if session.borrow_mut().timeout(token) {
                self.kill_session(session);
            }
        }
    }
    pub fn handle_remaining_readiness(&mut self) {
        if self.sessions.borrow().can_accept && !self.accept_ready.is_empty() {
            while let Some(token) = self
                .accept_ready
                .iter()
                .next()
                .map(|token| ListenToken(token.0))
            {
                let protocol = self.sessions.borrow().slab[token.0].borrow().protocol();
                self.accept(token, protocol);
                if !self.sessions.borrow().can_accept || self.accept_ready.is_empty() {
                    break;
                }
            }
        }
    }
    fn block_channel(&mut self) {
        if let Err(e) = self.channel.blocking() {
            error!("Could not block channel: {}", e);
        }
    }
    fn unblock_channel(&mut self) {
        if let Err(e) = self.channel.nonblocking() {
            error!("Could not block channel: {}", e);
        }
    }
}
fn worker_response_error<S: ToString, T: ToString>(request_id: S, error: T) -> WorkerResponse {
    error!(
        "error on request {}, {}",
        request_id.to_string(),
        error.to_string()
    );
    WorkerResponse::error(request_id, error)
}
pub struct ListenSession {
    pub protocol: Protocol,
}
impl ProxySession for ListenSession {
    fn last_event(&self) -> Instant {
        Instant::now()
    }
    fn print_session(&self) {}
    fn frontend_token(&self) -> Token {
        Token(0)
    }
    fn protocol(&self) -> Protocol {
        self.protocol
    }
    fn ready(&mut self, _session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed {
        false
    }
    fn shutting_down(&mut self) -> SessionIsToBeClosed {
        false
    }
    fn update_readiness(&mut self, _token: Token, _events: Ready) {}
    fn close(&mut self) {}
    fn timeout(&mut self, _token: Token) -> SessionIsToBeClosed {
        error!(
            "called ProxySession::timeout(token={:?}, time) on ListenSession {{ protocol: {:?} }}",
            _token, self.protocol
        );
        false
    }
}