1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
use crate::events::LOG_INPUT_TOPIC;
use crate::{EResult, Error};
use elbus::client::AsyncClient;
use elbus::QoS;
use lazy_static::lazy_static;
use log::{Level, LevelFilter, Log};
use once_cell::sync::OnceCell;
use std::collections::HashMap;
use std::sync::Arc;

lazy_static! {
    /// Publish topic for every log level: `LOG_INPUT_TOPIC` followed by the
    /// lowercase level name.
    static ref LOG_TOPICS: HashMap<Level, String> = [
        (Level::Trace, "trace"),
        (Level::Debug, "debug"),
        (Level::Info, "info"),
        (Level::Warn, "warn"),
        (Level::Error, "error"),
    ]
    .iter()
    .map(|&(level, suffix)| (level, format!("{}{}", LOG_INPUT_TOPIC, suffix)))
    .collect();
    /// Sending half of the log queue; stays empty until a sender is stored in it.
    static ref LOG_TX: OnceCell<async_channel::Sender<(log::Level, String)>> = OnceCell::new();
}

/// Stateless logger type; carries no data of its own.
struct BusLogger;

/// The single logger instance, kept in a `static` so it can be borrowed for `'static`.
static BUS_LOGGER: BusLogger = BusLogger;

impl Log for BusLogger {
    /// Accepts every record except those whose target starts with `elbus::`
    /// or `mio::`.
    // NOTE(review): presumably these targets are excluded so that publishing a
    // log record cannot itself generate further records — confirm.
    #[inline]
    fn enabled(&self, metadata: &log::Metadata) -> bool {
        let target = metadata.target();
        !(target.starts_with("elbus::") || target.starts_with("mio::"))
    }
    /// Formats the record and queues `(level, message)` on `LOG_TX`.
    ///
    /// Nothing happens when the record is filtered out or no sender has been
    /// stored yet. The send is non-blocking and its result is discarded, so a
    /// record that cannot be queued is dropped silently.
    #[inline]
    fn log(&self, record: &log::Record) {
        if !self.enabled(record.metadata()) {
            return;
        }
        let sender = match LOG_TX.get() {
            Some(sender) => sender,
            None => return,
        };
        let entry = (record.level(), record.args().to_string());
        let _ = sender.try_send(entry);
    }
    /// No-op: this logger keeps no buffer of its own to flush.
    #[inline]
    fn flush(&self) {}
}

/// Drains the log queue and publishes each entry through the bus client.
///
/// Every `(level, message)` pair received from `rx` is published to the topic
/// registered for its level in `LOG_TOPICS`, with `QoS::No`. The loop ends as
/// soon as `rx.recv()` returns an error. A failed publish is printed to
/// stderr and the loop carries on with the next entry.
async fn handle_logs<C>(
    client: Arc<tokio::sync::Mutex<C>>,
    rx: async_channel::Receiver<(Level, String)>,
) where
    C: ?Sized + AsyncClient,
{
    loop {
        let (level, message) = match rx.recv().await {
            Ok(entry) => entry,
            Err(_) => break,
        };
        // Every `Level` variant has an entry in LOG_TOPICS, so the lookup cannot fail.
        let topic = LOG_TOPICS.get(&level).unwrap();
        // The guard stays alive until the end of the iteration.
        let mut bus = client.lock().await;
        let outcome = bus.publish(topic, message.as_bytes().into(), QoS::No).await;
        if let Err(e) = outcome {
            eprintln!("{}", e);
        }
    }
}

/// Must not be called twice
///
pub fn init_bus<C>(
    client: Arc<tokio::sync::Mutex<C>>,
    queue_size: usize,
    filter: LevelFilter,
) -> EResult<()>
where
    C: ?Sized + AsyncClient + 'static,
{
    let (tx, rx) = async_channel::bounded(queue_size);
    LOG_TX
        .set(tx)
        .map_err(|_| Error::failed("Unable to set LOG_TX"))?;
    tokio::spawn(async move {
        handle_logs(client, rx).await;
    });
    log::set_logger(&BUS_LOGGER)
        .map(|()| log::set_max_level(filter))
        .map_err(Error::failed)?;
    Ok(())
}