use crate::registry;
use crate::tools::format_path;
use crate::EResult;
use elbus::broker::{Broker, Client};
use log::debug;
use serde::Deserialize;
use std::time::Duration;
#[derive(Deserialize)]
struct ElbusConfig {
buf_size: usize,
#[serde(deserialize_with = "crate::tools::de_float_as_duration_us")]
buf_ttl: Duration,
queue_size: usize,
sockets: Vec<String>,
}
macro_rules! is_unix_socket {
($socket: expr) => {
$socket.ends_with(".sock") || $socket.ends_with(".$socket") || $socket.ends_with(".ipc")
};
}
pub struct EvaBroker {
broker: Broker,
config: ElbusConfig,
}
impl EvaBroker {
pub fn new_from_db(db: &mut yedb::Database, dir_eva: &str) -> EResult<EvaBroker> {
let mut config: ElbusConfig =
serde_json::from_value(db.key_get(®istry::format_config_key("elbus"))?)?;
debug!("elbus.buf_size = {}", config.buf_size);
debug!("elbus.buf_ttl = {:?}", config.buf_ttl);
debug!("elbus.queue_size = {}", config.queue_size);
for i in 0..config.sockets.len() {
#[allow(clippy::case_sensitive_file_extension_comparisons)]
if is_unix_socket!(config.sockets[i]) {
config.sockets[i] = format_path(dir_eva, Some(&config.sockets[i]), None);
}
}
debug!("elbus.sockets = {}", config.sockets.join(","));
let mut broker = Broker::new();
broker.set_queue_size(config.queue_size);
Ok(EvaBroker { broker, config })
}
pub async fn init(&mut self, core: &mut crate::core::Core) -> EResult<()> {
self.broker.init_default_core_rpc().await?;
for socket in &self.config.sockets {
#[allow(clippy::case_sensitive_file_extension_comparisons)]
if is_unix_socket!(socket) {
core.add_file_to_remove(socket);
self.broker
.spawn_unix_server(
socket,
self.config.buf_size,
self.config.buf_ttl,
core.timeout(),
)
.await?;
} else {
self.broker
.spawn_tcp_server(
socket,
self.config.buf_size,
self.config.buf_ttl,
core.timeout(),
)
.await?;
}
}
Ok(())
}
#[inline]
pub async fn register_client(&self, name: &str) -> EResult<Client> {
self.broker.register_client(name).await.map_err(Into::into)
}
}