1use crate::events::{LOG_CALL_TRACE_TOPIC, LOG_INPUT_TOPIC};
2use crate::payload::pack;
3use crate::{EResult, Error};
4use busrt::client::AsyncClient;
5use busrt::QoS;
6use lazy_static::lazy_static;
7use log::{Level, LevelFilter, Log};
8use once_cell::sync::OnceCell;
9use serde::Serialize;
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::Duration;
13use std::time::Instant;
14use uuid::Uuid;
15
16const MSG_MAX_REPEAT_DELAY: Duration = Duration::from_millis(100);
17
18tokio::task_local! {
19 pub static CALL_TRACE_ID: Option<Uuid>;
20}
21
22#[derive(Serialize)]
23pub struct TraceMessage {
24 l: u8,
25 msg: Arc<String>,
26}
27
28lazy_static! {
29 static ref LOG_TOPICS: HashMap<Level, String> = {
30 let mut topics = HashMap::new();
31 topics.insert(Level::Trace, format!("{}{}", LOG_INPUT_TOPIC, "trace"));
32 topics.insert(Level::Debug, format!("{}{}", LOG_INPUT_TOPIC, "debug"));
33 topics.insert(Level::Info, format!("{}{}", LOG_INPUT_TOPIC, "info"));
34 topics.insert(Level::Warn, format!("{}{}", LOG_INPUT_TOPIC, "warn"));
35 topics.insert(Level::Error, format!("{}{}", LOG_INPUT_TOPIC, "error"));
36 topics
37 };
38 static ref LOG_TX: OnceCell<async_channel::Sender<(log::Level, Arc<String>)>> = <_>::default();
39 static ref TRACE_TX: OnceCell<async_channel::Sender<(TraceMessage, Uuid)>> = <_>::default();
40}
41
42static BUS_LOGGER: BusLogger = BusLogger {
43 log_filter: OnceCell::new(),
44 prev_message: parking_lot::Mutex::new(None),
45};
46
47struct LogMessage {
48 level: log::Level,
49 message: Arc<String>,
50 t: Instant,
51}
52
53struct BusLogger {
54 log_filter: OnceCell<LevelFilter>,
55 prev_message: parking_lot::Mutex<Option<LogMessage>>,
56}
57
58impl Log for BusLogger {
59 #[inline]
60 fn enabled(&self, metadata: &log::Metadata) -> bool {
61 !metadata.target().starts_with("busrt::") && !metadata.target().starts_with("mio::")
62 }
63 #[inline]
64 fn log(&self, record: &log::Record) {
65 if self.enabled(record.metadata()) {
66 let mut message: Option<Arc<String>> = None;
67 macro_rules! format_msg {
68 () => {{
69 if message.is_none() {
70 message.replace(Arc::new(record.args().to_string()));
71 }
72 message.as_ref().unwrap().clone()
73 }};
74 }
75 let trid: Option<Uuid> = CALL_TRACE_ID.try_with(Clone::clone).unwrap_or_default();
76 if let Some(trace_id) = trid {
77 if let Some(tx) = TRACE_TX.get() {
78 let _r = tx.try_send((
79 TraceMessage {
80 l: crate::log_level_code(record.level()),
81 msg: format_msg!(),
82 },
83 trace_id,
84 ));
85 }
86 }
87 if let Some(tx) = LOG_TX.get() {
88 let level = record.level();
89 if level <= *self.log_filter.get().unwrap() {
90 let msg: Arc<String> = format_msg!();
91 {
92 let mut prev = self.prev_message.lock();
93 if let Some(p) = prev.as_mut() {
95 if p.level == level
96 && p.message == msg
97 && p.t.elapsed() < MSG_MAX_REPEAT_DELAY
98 {
99 return;
100 }
101 }
102 prev.replace(LogMessage {
103 level,
104 message: msg.clone(),
105 t: Instant::now(),
106 });
107 }
108 let _r = tx.try_send((level, msg));
109 }
110 }
111 }
112 }
113 #[inline]
114 fn flush(&self) {}
115}
116
117async fn handle_logs<C>(
118 client: Arc<tokio::sync::Mutex<C>>,
119 rx: async_channel::Receiver<(Level, Arc<String>)>,
120) where
121 C: ?Sized + AsyncClient,
122{
123 while let Ok((level, message)) = rx.recv().await {
124 if let Err(e) = client
125 .lock()
126 .await
127 .publish(
128 LOG_TOPICS.get(&level).unwrap(),
129 message.as_bytes().into(),
130 QoS::No,
131 )
132 .await
133 {
134 eprintln!("{}", e);
135 }
136 }
137}
138
139async fn handle_traces<C>(
140 client: Arc<tokio::sync::Mutex<C>>,
141 rx: async_channel::Receiver<(TraceMessage, Uuid)>,
142) where
143 C: ?Sized + AsyncClient,
144{
145 while let Ok((trace_message, trace_id)) = rx.recv().await {
146 let trace_topic = format!("{}{}", LOG_CALL_TRACE_TOPIC, trace_id);
147 match pack(&trace_message) {
148 Ok(payload) => {
149 if let Err(e) = client
150 .lock()
151 .await
152 .publish(&trace_topic, payload.into(), QoS::No)
153 .await
154 {
155 eprintln!("{}", e);
156 }
157 }
158 Err(e) => eprintln!("{}", e),
159 }
160 }
161}
162
163pub fn init_bus<C>(
166 client: Arc<tokio::sync::Mutex<C>>,
167 queue_size: usize,
168 filter: LevelFilter,
169 call_tracing: bool,
170) -> EResult<()>
171where
172 C: ?Sized + AsyncClient + 'static,
173{
174 let (tx, rx) = async_channel::bounded(queue_size);
175 LOG_TX
176 .set(tx)
177 .map_err(|_| Error::failed("Unable to set LOG_TX"))?;
178 let cl = client.clone();
179 tokio::spawn(async move {
180 handle_logs(cl, rx).await;
181 });
182 if call_tracing {
183 let (tx, rx) = async_channel::bounded(queue_size);
184 TRACE_TX
185 .set(tx)
186 .map_err(|_| Error::failed("Unable to set TRACE_TX"))?;
187 tokio::spawn(async move {
188 handle_traces(client, rx).await;
189 });
190 }
191 BUS_LOGGER
192 .log_filter
193 .set(filter)
194 .map_err(|_| Error::failed("Unable to set BUS_LOGGER filter"))?;
195 log::set_logger(&BUS_LOGGER)
196 .map(|()| {
197 log::set_max_level(if call_tracing {
198 LevelFilter::Trace
199 } else {
200 filter
201 });
202 })
203 .map_err(Error::failed)?;
204 Ok(())
205}