modio-mqttbridge 0.6.1

Modio on-device MQTT bridge
// Author: D.S. Ljungmark <spider@skuggor.se>, Modio AB
// SPDX-License-Identifier: AGPL-3.0-or-later
use std::error::Error;
use std::result::Result;

use futures_util::stream::StreamExt;
use log::{debug, error, info, trace};
use tokio::sync::mpsc;

use crate::Transaction;

use fsipc::legacy::fsipcProxy;

use crate::matches::Matches;
use crate::metadata::Metadata;
use crate::senml_limited::SenML;

/// Open a D-Bus connection for the bridge.
///
/// When `session` is `true` the session bus is used, otherwise the system bus.
/// In both cases the connection is built with `internal_executor(false)`, so
/// something else (see `executor_loop`) has to tick the connection's executor.
///
/// # Errors
///
/// Returns an error if the builder for the chosen bus cannot be created or if
/// building the connection fails.
pub async fn make_connection(session: bool) -> Result<zbus::Connection, Box<dyn Error>> {
    // Pick the bus first; the remaining builder steps are identical for both.
    let builder = if session {
        debug!("Connecting to session bus");
        zbus::ConnectionBuilder::session()?
    } else {
        debug!("Connecting to system bus");
        zbus::ConnectionBuilder::system()?
    };
    let connection = builder.internal_executor(false).build().await?;
    Ok(connection)
}

/// Drive the executor of `conn` forever.
///
/// Each iteration awaits one `tick()` of the connection's executor. The loop
/// has no exit condition, so this function never returns a value; the
/// `Result` return type is only there for the caller's convenience.
pub async fn executor_loop(conn: zbus::Connection) -> Result<(), Box<dyn Error>> {
    info!(target: "executor_loop", "Ticking the internal dbus executor");
    let executor = conn.executor();
    loop {
        trace!(target: "executor_loop", "polling dbus executor");
        executor.tick().await;
    }
}

mod meta {
    use crate::metadata::Metadata;
    use fsipc::logger1::Logger1Proxy;
    use log::{debug, error, info};
    use std::collections::HashMap;
    use std::error::Error;
    use tokio::sync::mpsc;

    pub(crate) struct Meta<'a> {
        cache: HashMap<String, Metadata>,
        ipc: Logger1Proxy<'a>,
        tx: mpsc::Sender<(String, String)>,
        prefix: String,
    }

    impl Meta<'_> {
        pub async fn new<'a>(
            connection: zbus::Connection,
            tx: mpsc::Sender<(String, String)>,
            prefix: String,
        ) -> Result<Meta<'a>, Box<dyn Error>> {
            let cache = HashMap::<String, Metadata>::new();
            let ipc = Logger1Proxy::builder(&connection)
                .destination("se.modio.logger")?
                .path("/se/modio/logger")?
                .build()
                .await?;
            Ok(Meta {
                cache,
                ipc,
                tx,
                prefix,
            })
        }

        /// Broadcast an item to the MQTT bus.
        fn broadcast_item(&self, key: &str, data: &Metadata) -> Result<(), Box<dyn Error>> {
            let name = key.replace('.', "/");
            let topic = format!("{}/metadata/{}", &self.prefix, name);
            let to_xmit = serde_json::to_string(&data)?;
            self.tx.try_send((topic, to_xmit))?;
            Ok(())
        }

        async fn bus_query(&self, key: &str) -> Result<Metadata, MetaError> {
            debug!(target: "metadata", "Querying modio-logger for metadata on key={}", key);
            match self.ipc.get_metadata(key).await {
                Ok(data) => {
                    let data = Metadata::from(data);
                    Ok(data)
                }
                Err(e) => Err(MetaError::from(e)),
            }
        }
        pub async fn query(&mut self, key: &str) -> Result<&Metadata, MetaError> {
            if !self.cache.contains_key(key) {
                match self.bus_query(key).await {
                    Ok(data) => {
                        info!(target: "metadata", "Got metadata for key={} metadata={:?}", key, data);
                        if let Err(e) = self.broadcast_item(key, &data) {
                            error!(target: "metadata", "Failed to publish to mqtt: {}", e);
                            return Err(MetaError::Mqtt);
                        }
                        self.cache.insert(key.to_string(), data)
                    }
                    Err(MetaError::NotFound) => {
                        info!(target: "metadata", "No metadata for key={}", key);
                        let data = Metadata {
                            n: key.to_string(),
                            u: None,
                            name: None,
                            description: None,
                            value_map: None,
                            mode: None,
                        };
                        self.cache.insert(key.to_string(), data)
                    }
                    Err(other) => {
                        error!(target: "metadata", "Error fetching metadata for key={} err={}", key, other);
                        return Err(other);
                    }
                };
            }
            // since we just inserted metadata into this cache, we really should have it here.
            Ok(self.cache.get(key).expect("Metadata not in cache."))
        }
    }

    #[derive(Debug, thiserror::Error)]
    /// Since the dbus layer passes the error type through internal data, this is a bit reduced.
    pub enum MetaError {
        #[error("Not Found")]
        NotFound,
        #[error("MQTT metadata relate")]
        Mqtt,
        #[error("Other Error")]
        Other(zbus::Error),
    }

    impl From<zbus::Error> for MetaError {
        fn from(v: zbus::Error) -> MetaError {
            match v {
                zbus::Error::MethodError(ref errname, _, _) => match errname.as_str() {
                    "se.modio.logger.fsipc.NotFound" => MetaError::NotFound,
                    _ => MetaError::Other(v),
                },
                _ => MetaError::Other(v),
            }
        }
    }
}

/// Build a single `SenML` record from a store signal.
///
/// `bn` is passed through as the base name, key and value come from `args`,
/// and the unit is taken from `metadata.u` when present.
fn to_senml(bn: &str, args: &fsipc::legacy::StoreSignalArgs, metadata: &Metadata) -> SenML {
    let unit = metadata.u.as_deref();
    // The precision loss is ok here as we are working with 32bit timestamps up-scaled to 64bit
    #[allow(clippy::cast_precision_loss)]
    let timestamp = args.when as f64;
    SenML::make_one(bn, args.key, args.value, timestamp, unit)
}

/// Forward store signals from D-Bus to the MQTT emitter channel.
///
/// Setup: builds an `fsipcProxy`, reads the box id (used to form the SenML
/// base name `urn:dev:mac:<boxid>:`), subscribes to the store signal, creates
/// the metadata helper and stores `"online"` under `modio.mqtt.bridge.store`.
///
/// Per signal: keys rejected by `matches` are skipped. For accepted keys the
/// metadata is looked up, the value is converted to SenML and the
/// `(topic, payload)` pair is handed to `store_tx` with a non-blocking
/// `try_send`; a failed send is logged and the loop carries on.
///
/// # Errors
///
/// Returns an error if any setup step fails, if a signal's arguments cannot be
/// decoded, or if the metadata lookup fails.
///
/// # Panics
///
/// Panics if the signal stream ends.
pub async fn signal_loop(
    connection: zbus::Connection,
    store_tx: mpsc::Sender<(String, String)>,
    matches: Matches,
    prefix: String,
) -> Result<(), Box<dyn Error>> {
    let ipc = fsipcProxy::builder(&connection).build().await?;
    let boxid: String = ipc.get_boxid().await?;
    let bn: String = format!("urn:dev:mac:{}:", boxid);
    info!(target: "signal_loop", "Got boxid: {}", boxid);
    let mut stream = ipc.receive_store_signal().await?;

    let mut meta = meta::Meta::new(connection, store_tx.clone(), prefix.clone()).await?;

    ipc.store("modio.mqtt.bridge.store", "online").await?;
    info!(target: "signal_loop", "Proxy is ready, awaiting signals, key filters: {:?}", &matches);

    while let Some(signal) = stream.next().await {
        let args = signal.args()?;
        debug!(target: "signal_loop", "StoreSignal from dbus, {}, {}, {}", args.key, args.value, args.when);

        // Filtered-out keys are dropped before any metadata lookup happens.
        if !matches.key_matches(args.key) {
            debug!(target: "signal_loop", "Ignored due to prefix filter: {}", args.key);
            continue;
        }

        let metadata = meta.query(args.key).await?;
        let record = to_senml(&bn, &args, metadata);
        let topic = record.make_topic(&prefix);
        let payload = record.to_string();
        if let Err(msg) = store_tx.try_send((topic, payload)) {
            error!(target: "signal_loop", "Failed to pass to emitter: {}", msg);
        }
    }
    panic!("No more signals for us?")
}

/// Deliver transactions received on `rx_trans` to the logger over D-Bus.
///
/// Setup: builds an `fsipcProxy` and stores `"online"` under
/// `modio.mqtt.bridge.transaction`. After that, every `Transaction` taken from
/// the channel is passed to `transaction_add`.
///
/// # Errors
///
/// Returns an error if the proxy cannot be built, if the initial store call
/// fails, or if a `transaction_add` call fails.
///
/// # Panics
///
/// Panics when the channel yields `None`, i.e. it is closed.
pub async fn transaction_loop(
    connection: zbus::Connection,
    mut rx_trans: mpsc::Receiver<Transaction>,
) -> Result<(), Box<dyn Error>> {
    let ipc = fsipcProxy::builder(&connection).build().await?;
    ipc.store("modio.mqtt.bridge.transaction", "online").await?;
    info!(target: "transaction_loop", "Proxy is ready, awaiting new transactions");
    loop {
        let data = match rx_trans.recv().await {
            Some(transaction) => transaction,
            None => panic!("TX end of the channel hung up"),
        };
        info!(target: "transaction_loop", "Pushing Transaction to the logger: {:?}", &data);
        ipc.transaction_add(&data.n, &data.expected(), &data.value(), &data.token)
            .await?;
        debug!(target: "transaction_loop", "Change request delivered");
    }
}