1use crate::events::{LOG_CALL_TRACE_TOPIC, LOG_INPUT_TOPIC};
2use crate::payload::pack;
3use crate::{EResult, Error};
4use busrt::QoS;
5use busrt::client::AsyncClient;
6use log::{Level, LevelFilter, Log};
7use serde::Serialize;
8use std::collections::HashMap;
9use std::sync::{Arc, LazyLock, OnceLock};
10use std::time::Duration;
11use std::time::Instant;
12use uuid::Uuid;
13
/// Repeat-suppression window for the bus log queue.
///
/// A record whose level and text match the previously queued record is dropped when it
/// arrives less than this long after that record was queued.
const MSG_MAX_REPEAT_DELAY: Duration = Duration::from_millis(100);
15
16tokio::task_local! {
17 pub static CALL_TRACE_ID: Option<Uuid>;
18}
19
20#[derive(Serialize)]
21pub struct TraceMessage {
22 l: u8,
23 msg: Arc<String>,
24}
25
26static LOG_TOPICS: LazyLock<HashMap<Level, String>> = LazyLock::new(|| {
27 let mut topics = HashMap::new();
28 topics.insert(Level::Trace, format!("{}{}", LOG_INPUT_TOPIC, "trace"));
29 topics.insert(Level::Debug, format!("{}{}", LOG_INPUT_TOPIC, "debug"));
30 topics.insert(Level::Info, format!("{}{}", LOG_INPUT_TOPIC, "info"));
31 topics.insert(Level::Warn, format!("{}{}", LOG_INPUT_TOPIC, "warn"));
32 topics.insert(Level::Error, format!("{}{}", LOG_INPUT_TOPIC, "error"));
33 topics
34});
35static LOG_TX: OnceLock<async_channel::Sender<(log::Level, Arc<String>)>> = OnceLock::new();
36static TRACE_TX: OnceLock<async_channel::Sender<(TraceMessage, Uuid)>> = OnceLock::new();
37
38static BUS_LOGGER: BusLogger = BusLogger {
39 log_filter: OnceLock::new(),
40 prev_message: parking_lot::Mutex::new(None),
41};
42
43struct LogMessage {
44 level: log::Level,
45 message: Arc<String>,
46 t: Instant,
47}
48
49struct BusLogger {
50 log_filter: OnceLock<LevelFilter>,
51 prev_message: parking_lot::Mutex<Option<LogMessage>>,
52}
53
54impl Log for BusLogger {
55 #[inline]
56 fn enabled(&self, metadata: &log::Metadata) -> bool {
57 !metadata.target().starts_with("busrt::") && !metadata.target().starts_with("mio::")
58 }
59 #[inline]
60 fn log(&self, record: &log::Record) {
61 if self.enabled(record.metadata()) {
62 let mut message: Option<Arc<String>> = None;
63 macro_rules! format_msg {
64 () => {{
65 if message.is_none() {
66 message.replace(Arc::new(record.args().to_string()));
67 }
68 message.as_ref().unwrap().clone()
69 }};
70 }
71 let trid: Option<Uuid> = CALL_TRACE_ID.try_with(Clone::clone).unwrap_or_default();
72 if let Some(trace_id) = trid
73 && let Some(tx) = TRACE_TX.get()
74 {
75 let _r = tx.try_send((
76 TraceMessage {
77 l: crate::log_level_code(record.level()),
78 msg: format_msg!(),
79 },
80 trace_id,
81 ));
82 }
83 if let Some(tx) = LOG_TX.get() {
84 let level = record.level();
85 if level <= *self.log_filter.get().unwrap() {
86 let msg: Arc<String> = format_msg!();
87 {
88 let mut prev = self.prev_message.lock();
89 if let Some(p) = prev.as_mut()
91 && p.level == level
92 && p.message == msg
93 && p.t.elapsed() < MSG_MAX_REPEAT_DELAY
94 {
95 return;
96 }
97 prev.replace(LogMessage {
98 level,
99 message: msg.clone(),
100 t: Instant::now(),
101 });
102 }
103 let _r = tx.try_send((level, msg));
104 }
105 }
106 }
107 }
108 #[inline]
109 fn flush(&self) {}
110}
111
112async fn handle_logs<C>(
113 client: Arc<tokio::sync::Mutex<C>>,
114 rx: async_channel::Receiver<(Level, Arc<String>)>,
115) where
116 C: ?Sized + AsyncClient,
117{
118 while let Ok((level, message)) = rx.recv().await {
119 if let Err(e) = client
120 .lock()
121 .await
122 .publish(
123 LOG_TOPICS.get(&level).unwrap(),
124 message.as_bytes().into(),
125 QoS::No,
126 )
127 .await
128 {
129 eprintln!("{}", e);
130 }
131 }
132}
133
134async fn handle_traces<C>(
135 client: Arc<tokio::sync::Mutex<C>>,
136 rx: async_channel::Receiver<(TraceMessage, Uuid)>,
137) where
138 C: ?Sized + AsyncClient,
139{
140 while let Ok((trace_message, trace_id)) = rx.recv().await {
141 let trace_topic = format!("{}{}", LOG_CALL_TRACE_TOPIC, trace_id);
142 match pack(&trace_message) {
143 Ok(payload) => {
144 if let Err(e) = client
145 .lock()
146 .await
147 .publish(&trace_topic, payload.into(), QoS::No)
148 .await
149 {
150 eprintln!("{}", e);
151 }
152 }
153 Err(e) => eprintln!("{}", e),
154 }
155 }
156}
157
158pub fn init_bus<C>(
161 client: Arc<tokio::sync::Mutex<C>>,
162 queue_size: usize,
163 filter: LevelFilter,
164 call_tracing: bool,
165) -> EResult<()>
166where
167 C: ?Sized + AsyncClient + 'static,
168{
169 let (tx, rx) = async_channel::bounded(queue_size);
170 LOG_TX
171 .set(tx)
172 .map_err(|_| Error::failed("Unable to set LOG_TX"))?;
173 let cl = client.clone();
174 tokio::spawn(async move {
175 handle_logs(cl, rx).await;
176 });
177 if call_tracing {
178 let (tx, rx) = async_channel::bounded(queue_size);
179 TRACE_TX
180 .set(tx)
181 .map_err(|_| Error::failed("Unable to set TRACE_TX"))?;
182 tokio::spawn(async move {
183 handle_traces(client, rx).await;
184 });
185 }
186 BUS_LOGGER
187 .log_filter
188 .set(filter)
189 .map_err(|_| Error::failed("Unable to set BUS_LOGGER filter"))?;
190 log::set_logger(&BUS_LOGGER)
191 .map(|()| {
192 log::set_max_level(if call_tracing {
193 LevelFilter::Trace
194 } else {
195 filter
196 });
197 })
198 .map_err(Error::failed)?;
199 Ok(())
200}