eva_common/
logger.rs

1use crate::events::{LOG_CALL_TRACE_TOPIC, LOG_INPUT_TOPIC};
2use crate::payload::pack;
3use crate::{EResult, Error};
4use busrt::client::AsyncClient;
5use busrt::QoS;
6use lazy_static::lazy_static;
7use log::{Level, LevelFilter, Log};
8use once_cell::sync::OnceCell;
9use serde::Serialize;
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::Duration;
13use std::time::Instant;
14use uuid::Uuid;
15
/// Repeat-suppression window: a record with the same level and text as the
/// previously forwarded one is dropped by the logger when it arrives sooner
/// than this after it.
const MSG_MAX_REPEAT_DELAY: Duration = Duration::from_millis(100);
17
// Task-local call trace ID. When the logger runs inside a task scope where this
// is `Some`, each record is additionally sent to the call-trace channel together
// with that ID; outside such a scope (or when it is `None`) no trace is sent.
tokio::task_local! {
    pub static CALL_TRACE_ID: Option<Uuid>;
}
21
/// A log record queued for publishing to a call-trace topic.
///
/// The trace handler serializes it with `pack` before publishing.
#[derive(Serialize)]
pub struct TraceMessage {
    /// Level code of the record, as returned by `crate::log_level_code`.
    l: u8,
    /// Formatted text of the record.
    msg: Arc<String>,
}
27
28lazy_static! {
29    static ref LOG_TOPICS: HashMap<Level, String> = {
30        let mut topics = HashMap::new();
31        topics.insert(Level::Trace, format!("{}{}", LOG_INPUT_TOPIC, "trace"));
32        topics.insert(Level::Debug, format!("{}{}", LOG_INPUT_TOPIC, "debug"));
33        topics.insert(Level::Info, format!("{}{}", LOG_INPUT_TOPIC, "info"));
34        topics.insert(Level::Warn, format!("{}{}", LOG_INPUT_TOPIC, "warn"));
35        topics.insert(Level::Error, format!("{}{}", LOG_INPUT_TOPIC, "error"));
36        topics
37    };
38    static ref LOG_TX: OnceCell<async_channel::Sender<(log::Level, Arc<String>)>> = <_>::default();
39    static ref TRACE_TX: OnceCell<async_channel::Sender<(TraceMessage, Uuid)>> = <_>::default();
40}
41
/// The single logger instance handed to the `log` crate by `init_bus`.
///
/// It starts with no level filter set and no previous message remembered.
static BUS_LOGGER: BusLogger = BusLogger {
    log_filter: OnceCell::new(),
    prev_message: parking_lot::Mutex::new(None),
};
46
/// Snapshot of the most recently forwarded log record, kept so that an
/// identical record arriving too soon afterwards can be suppressed.
struct LogMessage {
    /// Level of the remembered record.
    level: log::Level,
    /// Formatted text of the remembered record.
    message: Arc<String>,
    /// Moment the record was remembered; compared against `MSG_MAX_REPEAT_DELAY`.
    t: Instant,
}
52
/// `log::Log` implementation which hands records over to the bus queues.
struct BusLogger {
    /// Maximum level forwarded to the regular log queue; set once by `init_bus`.
    log_filter: OnceCell<LevelFilter>,
    /// Last record forwarded to the regular log queue (used for repeat suppression).
    prev_message: parking_lot::Mutex<Option<LogMessage>>,
}
57
58impl Log for BusLogger {
59    #[inline]
60    fn enabled(&self, metadata: &log::Metadata) -> bool {
61        !metadata.target().starts_with("busrt::") && !metadata.target().starts_with("mio::")
62    }
63    #[inline]
64    fn log(&self, record: &log::Record) {
65        if self.enabled(record.metadata()) {
66            let mut message: Option<Arc<String>> = None;
67            macro_rules! format_msg {
68                () => {{
69                    if message.is_none() {
70                        message.replace(Arc::new(record.args().to_string()));
71                    }
72                    message.as_ref().unwrap().clone()
73                }};
74            }
75            let trid: Option<Uuid> = CALL_TRACE_ID.try_with(Clone::clone).unwrap_or_default();
76            if let Some(trace_id) = trid {
77                if let Some(tx) = TRACE_TX.get() {
78                    let _r = tx.try_send((
79                        TraceMessage {
80                            l: crate::log_level_code(record.level()),
81                            msg: format_msg!(),
82                        },
83                        trace_id,
84                    ));
85                }
86            }
87            if let Some(tx) = LOG_TX.get() {
88                let level = record.level();
89                if level <= *self.log_filter.get().unwrap() {
90                    let msg: Arc<String> = format_msg!();
91                    {
92                        let mut prev = self.prev_message.lock();
93                        // ignore messages wich repeat too fast
94                        if let Some(p) = prev.as_mut() {
95                            if p.level == level
96                                && p.message == msg
97                                && p.t.elapsed() < MSG_MAX_REPEAT_DELAY
98                            {
99                                return;
100                            }
101                        }
102                        prev.replace(LogMessage {
103                            level,
104                            message: msg.clone(),
105                            t: Instant::now(),
106                        });
107                    }
108                    let _r = tx.try_send((level, msg));
109                }
110            }
111        }
112    }
113    #[inline]
114    fn flush(&self) {}
115}
116
117async fn handle_logs<C>(
118    client: Arc<tokio::sync::Mutex<C>>,
119    rx: async_channel::Receiver<(Level, Arc<String>)>,
120) where
121    C: ?Sized + AsyncClient,
122{
123    while let Ok((level, message)) = rx.recv().await {
124        if let Err(e) = client
125            .lock()
126            .await
127            .publish(
128                LOG_TOPICS.get(&level).unwrap(),
129                message.as_bytes().into(),
130                QoS::No,
131            )
132            .await
133        {
134            eprintln!("{}", e);
135        }
136    }
137}
138
139async fn handle_traces<C>(
140    client: Arc<tokio::sync::Mutex<C>>,
141    rx: async_channel::Receiver<(TraceMessage, Uuid)>,
142) where
143    C: ?Sized + AsyncClient,
144{
145    while let Ok((trace_message, trace_id)) = rx.recv().await {
146        let trace_topic = format!("{}{}", LOG_CALL_TRACE_TOPIC, trace_id);
147        match pack(&trace_message) {
148            Ok(payload) => {
149                if let Err(e) = client
150                    .lock()
151                    .await
152                    .publish(&trace_topic, payload.into(), QoS::No)
153                    .await
154                {
155                    eprintln!("{}", e);
156                }
157            }
158            Err(e) => eprintln!("{}", e),
159        }
160    }
161}
162
163/// Must not be called twice
164///
165pub fn init_bus<C>(
166    client: Arc<tokio::sync::Mutex<C>>,
167    queue_size: usize,
168    filter: LevelFilter,
169    call_tracing: bool,
170) -> EResult<()>
171where
172    C: ?Sized + AsyncClient + 'static,
173{
174    let (tx, rx) = async_channel::bounded(queue_size);
175    LOG_TX
176        .set(tx)
177        .map_err(|_| Error::failed("Unable to set LOG_TX"))?;
178    let cl = client.clone();
179    tokio::spawn(async move {
180        handle_logs(cl, rx).await;
181    });
182    if call_tracing {
183        let (tx, rx) = async_channel::bounded(queue_size);
184        TRACE_TX
185            .set(tx)
186            .map_err(|_| Error::failed("Unable to set TRACE_TX"))?;
187        tokio::spawn(async move {
188            handle_traces(client, rx).await;
189        });
190    }
191    BUS_LOGGER
192        .log_filter
193        .set(filter)
194        .map_err(|_| Error::failed("Unable to set BUS_LOGGER filter"))?;
195    log::set_logger(&BUS_LOGGER)
196        .map(|()| {
197            log::set_max_level(if call_tracing {
198                LevelFilter::Trace
199            } else {
200                filter
201            });
202        })
203        .map_err(Error::failed)?;
204    Ok(())
205}