eva_common/
logger.rs

1use crate::events::{LOG_CALL_TRACE_TOPIC, LOG_INPUT_TOPIC};
2use crate::payload::pack;
3use crate::{EResult, Error};
4use busrt::QoS;
5use busrt::client::AsyncClient;
6use log::{Level, LevelFilter, Log};
7use serde::Serialize;
8use std::collections::HashMap;
9use std::sync::{Arc, LazyLock, OnceLock};
10use std::time::Duration;
11use std::time::Instant;
12use uuid::Uuid;
13
14const MSG_MAX_REPEAT_DELAY: Duration = Duration::from_millis(100);
15
16tokio::task_local! {
17    pub static CALL_TRACE_ID: Option<Uuid>;
18}
19
20#[derive(Serialize)]
21pub struct TraceMessage {
22    l: u8,
23    msg: Arc<String>,
24}
25
26static LOG_TOPICS: LazyLock<HashMap<Level, String>> = LazyLock::new(|| {
27    let mut topics = HashMap::new();
28    topics.insert(Level::Trace, format!("{}{}", LOG_INPUT_TOPIC, "trace"));
29    topics.insert(Level::Debug, format!("{}{}", LOG_INPUT_TOPIC, "debug"));
30    topics.insert(Level::Info, format!("{}{}", LOG_INPUT_TOPIC, "info"));
31    topics.insert(Level::Warn, format!("{}{}", LOG_INPUT_TOPIC, "warn"));
32    topics.insert(Level::Error, format!("{}{}", LOG_INPUT_TOPIC, "error"));
33    topics
34});
35static LOG_TX: OnceLock<async_channel::Sender<(log::Level, Arc<String>)>> = OnceLock::new();
36static TRACE_TX: OnceLock<async_channel::Sender<(TraceMessage, Uuid)>> = OnceLock::new();
37
38static BUS_LOGGER: BusLogger = BusLogger {
39    log_filter: OnceLock::new(),
40    prev_message: parking_lot::Mutex::new(None),
41};
42
43struct LogMessage {
44    level: log::Level,
45    message: Arc<String>,
46    t: Instant,
47}
48
49struct BusLogger {
50    log_filter: OnceLock<LevelFilter>,
51    prev_message: parking_lot::Mutex<Option<LogMessage>>,
52}
53
54impl Log for BusLogger {
55    #[inline]
56    fn enabled(&self, metadata: &log::Metadata) -> bool {
57        !metadata.target().starts_with("busrt::") && !metadata.target().starts_with("mio::")
58    }
59    #[inline]
60    fn log(&self, record: &log::Record) {
61        if self.enabled(record.metadata()) {
62            let mut message: Option<Arc<String>> = None;
63            macro_rules! format_msg {
64                () => {{
65                    if message.is_none() {
66                        message.replace(Arc::new(record.args().to_string()));
67                    }
68                    message.as_ref().unwrap().clone()
69                }};
70            }
71            let trid: Option<Uuid> = CALL_TRACE_ID.try_with(Clone::clone).unwrap_or_default();
72            if let Some(trace_id) = trid
73                && let Some(tx) = TRACE_TX.get()
74            {
75                let _r = tx.try_send((
76                    TraceMessage {
77                        l: crate::log_level_code(record.level()),
78                        msg: format_msg!(),
79                    },
80                    trace_id,
81                ));
82            }
83            if let Some(tx) = LOG_TX.get() {
84                let level = record.level();
85                if level <= *self.log_filter.get().unwrap() {
86                    let msg: Arc<String> = format_msg!();
87                    {
88                        let mut prev = self.prev_message.lock();
89                        // ignore messages wich repeat too fast
90                        if let Some(p) = prev.as_mut()
91                            && p.level == level
92                            && p.message == msg
93                            && p.t.elapsed() < MSG_MAX_REPEAT_DELAY
94                        {
95                            return;
96                        }
97                        prev.replace(LogMessage {
98                            level,
99                            message: msg.clone(),
100                            t: Instant::now(),
101                        });
102                    }
103                    let _r = tx.try_send((level, msg));
104                }
105            }
106        }
107    }
108    #[inline]
109    fn flush(&self) {}
110}
111
112async fn handle_logs<C>(
113    client: Arc<tokio::sync::Mutex<C>>,
114    rx: async_channel::Receiver<(Level, Arc<String>)>,
115) where
116    C: ?Sized + AsyncClient,
117{
118    while let Ok((level, message)) = rx.recv().await {
119        if let Err(e) = client
120            .lock()
121            .await
122            .publish(
123                LOG_TOPICS.get(&level).unwrap(),
124                message.as_bytes().into(),
125                QoS::No,
126            )
127            .await
128        {
129            eprintln!("{}", e);
130        }
131    }
132}
133
134async fn handle_traces<C>(
135    client: Arc<tokio::sync::Mutex<C>>,
136    rx: async_channel::Receiver<(TraceMessage, Uuid)>,
137) where
138    C: ?Sized + AsyncClient,
139{
140    while let Ok((trace_message, trace_id)) = rx.recv().await {
141        let trace_topic = format!("{}{}", LOG_CALL_TRACE_TOPIC, trace_id);
142        match pack(&trace_message) {
143            Ok(payload) => {
144                if let Err(e) = client
145                    .lock()
146                    .await
147                    .publish(&trace_topic, payload.into(), QoS::No)
148                    .await
149                {
150                    eprintln!("{}", e);
151                }
152            }
153            Err(e) => eprintln!("{}", e),
154        }
155    }
156}
157
158/// Must not be called twice
159///
160pub fn init_bus<C>(
161    client: Arc<tokio::sync::Mutex<C>>,
162    queue_size: usize,
163    filter: LevelFilter,
164    call_tracing: bool,
165) -> EResult<()>
166where
167    C: ?Sized + AsyncClient + 'static,
168{
169    let (tx, rx) = async_channel::bounded(queue_size);
170    LOG_TX
171        .set(tx)
172        .map_err(|_| Error::failed("Unable to set LOG_TX"))?;
173    let cl = client.clone();
174    tokio::spawn(async move {
175        handle_logs(cl, rx).await;
176    });
177    if call_tracing {
178        let (tx, rx) = async_channel::bounded(queue_size);
179        TRACE_TX
180            .set(tx)
181            .map_err(|_| Error::failed("Unable to set TRACE_TX"))?;
182        tokio::spawn(async move {
183            handle_traces(client, rx).await;
184        });
185    }
186    BUS_LOGGER
187        .log_filter
188        .set(filter)
189        .map_err(|_| Error::failed("Unable to set BUS_LOGGER filter"))?;
190    log::set_logger(&BUS_LOGGER)
191        .map(|()| {
192            log::set_max_level(if call_tracing {
193                LevelFilter::Trace
194            } else {
195                filter
196            });
197        })
198        .map_err(Error::failed)?;
199    Ok(())
200}