Documentation
use crate::registry;
use crate::tools::format_path;
use crate::EResult;
use elbus::broker::{Broker, Client};
use log::debug;
use serde::Deserialize;
use std::time::Duration;

#[derive(Deserialize)]
struct ElbusConfig {
    buf_size: usize,
    #[serde(deserialize_with = "crate::tools::de_float_as_duration_us")]
    buf_ttl: Duration,
    queue_size: usize,
    sockets: Vec<String>,
}

macro_rules! is_unix_socket {
    ($socket: expr) => {
        $socket.ends_with(".sock") || $socket.ends_with(".$socket") || $socket.ends_with(".ipc")
    };
}

pub struct EvaBroker {
    broker: Broker,
    config: ElbusConfig,
}

impl EvaBroker {
    pub fn new_from_db(db: &mut yedb::Database, dir_eva: &str) -> EResult<EvaBroker> {
        let mut config: ElbusConfig =
            serde_json::from_value(db.key_get(&registry::format_config_key("elbus"))?)?;
        debug!("elbus.buf_size = {}", config.buf_size);
        debug!("elbus.buf_ttl = {:?}", config.buf_ttl);
        debug!("elbus.queue_size = {}", config.queue_size);
        for i in 0..config.sockets.len() {
            #[allow(clippy::case_sensitive_file_extension_comparisons)]
            if is_unix_socket!(config.sockets[i]) {
                config.sockets[i] = format_path(dir_eva, Some(&config.sockets[i]), None);
            }
        }
        debug!("elbus.sockets = {}", config.sockets.join(","));
        let mut broker = Broker::new();
        broker.set_queue_size(config.queue_size);
        Ok(EvaBroker { broker, config })
    }
    pub async fn init(&mut self, core: &mut crate::core::Core) -> EResult<()> {
        self.broker.init_default_core_rpc().await?;
        for socket in &self.config.sockets {
            #[allow(clippy::case_sensitive_file_extension_comparisons)]
            if is_unix_socket!(socket) {
                core.add_file_to_remove(socket);
                self.broker
                    .spawn_unix_server(
                        socket,
                        self.config.buf_size,
                        self.config.buf_ttl,
                        core.timeout(),
                    )
                    .await?;
            } else {
                self.broker
                    .spawn_tcp_server(
                        socket,
                        self.config.buf_size,
                        self.config.buf_ttl,
                        core.timeout(),
                    )
                    .await?;
            }
        }
        Ok(())
    }
    #[inline]
    pub async fn register_client(&self, name: &str) -> EResult<Client> {
        self.broker.register_client(name).await.map_err(Into::into)
    }
}