use std::error::Error;
use std::result::Result;
use futures_util::stream::StreamExt;
use log::{debug, error, info, trace};
use tokio::sync::mpsc;
use crate::Transaction;
use fsipc::legacy::fsipcProxy;
use crate::matches::Matches;
use crate::metadata::Metadata;
use crate::senml_limited::SenML;
pub async fn make_connection(session: bool) -> Result<zbus::Connection, Box<dyn Error>> {
let connection = if session {
debug!("Connecting to session bus");
zbus::ConnectionBuilder::session()?
.internal_executor(false)
.build()
.await?
} else {
debug!("Connecting to system bus");
zbus::ConnectionBuilder::system()?
.internal_executor(false)
.build()
.await?
};
Ok(connection)
}
pub async fn executor_loop(conn: zbus::Connection) -> Result<(), Box<dyn Error>> {
info!(target: "executor_loop", "Ticking the internal dbus executor");
loop {
trace!(target: "executor_loop", "polling dbus executor");
conn.executor().tick().await;
}
}
mod meta {
use crate::metadata::Metadata;
use fsipc::logger1::Logger1Proxy;
use log::{debug, error, info};
use std::collections::HashMap;
use std::error::Error;
use tokio::sync::mpsc;
pub(crate) struct Meta<'a> {
cache: HashMap<String, Metadata>,
ipc: Logger1Proxy<'a>,
tx: mpsc::Sender<(String, String)>,
prefix: String,
}
impl Meta<'_> {
pub async fn new<'a>(
connection: zbus::Connection,
tx: mpsc::Sender<(String, String)>,
prefix: String,
) -> Result<Meta<'a>, Box<dyn Error>> {
let cache = HashMap::<String, Metadata>::new();
let ipc = Logger1Proxy::builder(&connection)
.destination("se.modio.logger")?
.path("/se/modio/logger")?
.build()
.await?;
Ok(Meta {
cache,
ipc,
tx,
prefix,
})
}
fn broadcast_item(&self, key: &str, data: &Metadata) -> Result<(), Box<dyn Error>> {
let name = key.replace('.', "/");
let topic = format!("{}/metadata/{}", &self.prefix, name);
let to_xmit = serde_json::to_string(&data)?;
self.tx.try_send((topic, to_xmit))?;
Ok(())
}
async fn bus_query(&self, key: &str) -> Result<Metadata, MetaError> {
debug!(target: "metadata", "Querying modio-logger for metadata on key={}", key);
match self.ipc.get_metadata(key).await {
Ok(data) => {
let data = Metadata::from(data);
Ok(data)
}
Err(e) => Err(MetaError::from(e)),
}
}
pub async fn query(&mut self, key: &str) -> Result<&Metadata, MetaError> {
if !self.cache.contains_key(key) {
match self.bus_query(key).await {
Ok(data) => {
info!(target: "metadata", "Got metadata for key={} metadata={:?}", key, data);
if let Err(e) = self.broadcast_item(key, &data) {
error!(target: "metadata", "Failed to publish to mqtt: {}", e);
return Err(MetaError::Mqtt);
}
self.cache.insert(key.to_string(), data)
}
Err(MetaError::NotFound) => {
info!(target: "metadata", "No metadata for key={}", key);
let data = Metadata {
n: key.to_string(),
u: None,
name: None,
description: None,
value_map: None,
mode: None,
};
self.cache.insert(key.to_string(), data)
}
Err(other) => {
error!(target: "metadata", "Error fetching metadata for key={} err={}", key, other);
return Err(other);
}
};
}
Ok(self.cache.get(key).expect("Metadata not in cache."))
}
}
#[derive(Debug, thiserror::Error)]
pub enum MetaError {
#[error("Not Found")]
NotFound,
#[error("MQTT metadata relate")]
Mqtt,
#[error("Other Error")]
Other(zbus::Error),
}
impl From<zbus::Error> for MetaError {
fn from(v: zbus::Error) -> MetaError {
match v {
zbus::Error::MethodError(ref errname, _, _) => match errname.as_str() {
"se.modio.logger.fsipc.NotFound" => MetaError::NotFound,
_ => MetaError::Other(v),
},
_ => MetaError::Other(v),
}
}
}
}
fn to_senml(bn: &str, args: &fsipc::legacy::StoreSignalArgs, metadata: &Metadata) -> SenML {
#[allow(clippy::cast_precision_loss)]
let when = args.when as f64;
let datum = SenML::make_one(bn, args.key, args.value, when, metadata.u.as_deref());
datum
}
pub async fn signal_loop(
connection: zbus::Connection,
store_tx: mpsc::Sender<(String, String)>,
matches: Matches,
prefix: String,
) -> Result<(), Box<dyn Error>> {
let ipc = fsipcProxy::builder(&connection).build().await?;
let boxid: String = ipc.get_boxid().await?;
let bn: String = format!("urn:dev:mac:{}:", boxid);
info!(target: "signal_loop", "Got boxid: {}", boxid);
let mut stream = ipc.receive_store_signal().await?;
let mut meta = meta::Meta::new(connection, store_tx.clone(), prefix.clone()).await?;
ipc.store("modio.mqtt.bridge.store", "online").await?;
info!(target: "signal_loop", "Proxy is ready, awaiting signals, key filters: {:?}", &matches);
while let Some(v) = stream.next().await {
let args = v.args()?;
debug!(target: "signal_loop", "StoreSignal from dbus, {}, {}, {}", args.key, args.value, args.when);
if matches.key_matches(args.key) {
let metadata = meta.query(args.key).await?;
let push = to_senml(&bn, &args, metadata);
let topic = push.make_topic(&prefix);
if let Err(msg) = store_tx.try_send((topic, push.to_string())) {
error!(target: "signal_loop", "Failed to pass to emitter: {}", msg);
};
} else {
debug!(target: "signal_loop", "Ignored due to prefix filter: {}", args.key);
};
}
panic!("No more signals for us?")
}
pub async fn transaction_loop(
connection: zbus::Connection,
mut rx_trans: mpsc::Receiver<Transaction>,
) -> Result<(), Box<dyn Error>> {
let ipc = fsipcProxy::builder(&connection).build().await?;
ipc.store("modio.mqtt.bridge.transaction", "online").await?;
info!(target: "transaction_loop", "Proxy is ready, awaiting new transactions");
while let Some(data) = rx_trans.recv().await {
info!(target: "transaction_loop", "Pushing Transaction to the logger: {:?}", &data);
ipc.transaction_add(&data.n, &data.expected(), &data.value(), &data.token)
.await?;
debug!(target: "transaction_loop", "Change request delivered");
}
panic!("TX end of the channel hung up")
}