krossbar_log_lib/
logger.rs1use std::{
2 io::{self, Write},
3 path::PathBuf,
4 sync::{
5 atomic::{AtomicUsize, Ordering},
6 Arc,
7 },
8 time::{Duration, SystemTime},
9};
10
11use chrono::Local;
12use colored::Colorize;
13use futures::{select, FutureExt};
14use log::{warn, Level, LevelFilter, Log, Record};
15use tokio::{
16 net::UnixStream,
17 runtime::Handle,
18 sync::mpsc::{channel, Receiver, Sender},
19};
20
21use krossbar_log_common::{log_message::LogMessage, logger_interface::REGISTER_METHOD_NAME};
22use krossbar_rpc::{Error, Result, RpcData};
23
24use crate::rpc::Rpc;
25
/// Minimum time between two reconnection attempts to the logger service; also the pause
/// taken by `Logger::run` after a failed read from the connection.
const RECONNECT_PERIOD: Duration = Duration::from_secs(1);
/// Capacity of the channel that carries log records from the `log` front-end to `Logger`.
const LOG_BUFFER_SIZE: usize = 100;
30
31pub struct Logger {
33 service_name: String,
35 rpc: Option<Rpc>,
37 last_connect_ts_ms: SystemTime,
39 logger_socket_path: Option<PathBuf>,
41 log_receiver: Receiver<LogMessage>,
43 _level: Arc<AtomicUsize>,
45}
46
47struct LogHandle {
49 log_to_stdout: bool,
51 log_to_rpc: bool,
53 level: Arc<AtomicUsize>,
55 log_sender: Sender<LogMessage>,
57}
58
59impl Logger {
60 pub async fn new(
67 service_name: &str,
68 level: LevelFilter,
69 log_to_stdout: bool,
70 logger_socket_path: Option<PathBuf>,
71 ) -> Result<Logger> {
72 let log_to_rpc = logger_socket_path.is_some();
73
74 let rpc = if logger_socket_path.is_none() {
75 None
76 } else {
77 Some(Self::connect(&service_name, logger_socket_path.clone().unwrap()).await?)
78 };
79
80 let (log_sender, log_receiver) = channel(LOG_BUFFER_SIZE);
81 let arc_level = Arc::new(AtomicUsize::new(level as usize));
82
83 let this = Self {
84 service_name: service_name.into(),
85 _level: arc_level.clone(),
86 rpc,
87 last_connect_ts_ms: SystemTime::now(),
88 logger_socket_path: logger_socket_path,
89 log_receiver,
90 };
91
92 let log_handle = Box::new(LogHandle::new(
93 log_to_stdout,
94 log_to_rpc,
95 arc_level,
96 log_sender,
97 ));
98
99 log::set_boxed_logger(log_handle)
100 .map(|()| log::set_max_level(level))
101 .unwrap();
102
103 Ok(this)
104 }
105
106 async fn connect(service_name: &str, socket_path: PathBuf) -> Result<Rpc> {
107 let socket = UnixStream::connect(socket_path)
108 .await
109 .map_err(|_| Error::PeerDisconnected)?;
110
111 let mut rpc = Rpc::new(socket);
112 let call = rpc
113 .call(REGISTER_METHOD_NAME, &service_name.to_owned())
114 .await?;
115
116 match call.data {
117 RpcData::Response(res) => {
118 res?;
119 }
120 m => {
121 return Err(Error::InternalError(format!(
122 "Invalid response on connect from logger: {m:?}"
123 )));
124 }
125 }
126
127 Ok(rpc)
128 }
129
130 pub async fn run(mut self) {
132 loop {
133 select! {
134 message = self.log_receiver.recv().fuse() => {
135 if let Some(message) = message {
136 self.send_rpc_message(&message).await
137 } else {
138 eprintln!("Log handle closed");
139 break;
140 }
141 }
142 incoming = self.rpc.as_mut().unwrap().read_message().fuse() => {
143 eprintln!("Incoming command: {incoming:?}");
144
145 if let Err(e) = incoming {
146 warn!("No logger connection logger: {e:?}");
147
148 tokio::time::sleep(RECONNECT_PERIOD).await;
149 }
150 }
151 };
152 }
153 }
154
155 fn log_to_stdout(message: &LogMessage) {
156 let colored_level = match message.level {
157 Level::Error => "ERROR".bright_red(),
158 Level::Warn => "WARNING".bright_yellow(),
159 Level::Info => "INFO".bright_green(),
160 Level::Debug => "DEBUG".bright_blue(),
161 Level::Trace => "TRACE".bright_white(),
162 };
163
164 println!(
165 "{}: {} > {}",
166 colored_level,
167 message.target.bright_white(),
168 message.message
169 );
170 }
171
172 async fn send_rpc_message(&mut self, log_message: &LogMessage) {
173 let internal_log_message = |message: String| -> LogMessage {
174 LogMessage {
175 timestamp: Local::now(),
176 level: Level::Info,
177 target: "logger".to_owned(),
178 message: message,
179 }
180 };
181
182 let rpc = self.rpc.as_mut().unwrap();
183
184 if rpc.send_log(&log_message).await.is_err() {
186 if (SystemTime::now() - RECONNECT_PERIOD) > self.last_connect_ts_ms {
188 Self::log_to_stdout(&internal_log_message(
189 "Logger is down. Trying to reconnect".into(),
190 ));
191
192 self.last_connect_ts_ms = SystemTime::now();
194
195 if let Ok(new_rpc) =
197 Self::connect(&self.service_name, self.logger_socket_path.clone().unwrap())
198 .await
199 {
200 Self::log_to_stdout(&internal_log_message(
201 "Succesfully reconnected to a loger. Sending source message".into(),
202 ));
203
204 rpc.replace_stream(new_rpc);
205
206 let _ = rpc.send_log(&log_message).await;
207 } else {
209 Self::log_to_stdout(&internal_log_message(
210 "Failed to reconnect to a logger".into(),
211 ));
212
213 Self::log_to_stdout(&log_message)
214 }
215 } else {
217 Self::log_to_stdout(&log_message)
218 }
219 }
220 }
221}
222
223impl LogHandle {
224 pub fn new(
225 log_to_stdout: bool,
226 log_to_rpc: bool,
227 level: Arc<AtomicUsize>,
228 log_sender: Sender<LogMessage>,
229 ) -> Self {
230 Self {
231 log_to_stdout,
232 level,
233 log_sender,
234 log_to_rpc,
235 }
236 }
237}
238
239impl Log for LogHandle {
240 fn enabled(&self, metadata: &log::Metadata) -> bool {
241 metadata.level() as usize <= self.level.load(Ordering::Relaxed)
242 }
243
244 fn log(&self, record: &Record) {
245 if self.enabled(record.metadata()) {
246 let log_message = LogMessage {
247 timestamp: Local::now(),
248 level: record.level(),
249 target: record.metadata().target().to_owned(),
250 message: format!("{}", record.args()),
251 };
252
253 if self.log_to_stdout {
254 Logger::log_to_stdout(&log_message)
255 }
256
257 if self.log_to_rpc {
258 if let Ok(handle) = Handle::try_current() {
260 let sender = self.log_sender.clone();
261
262 handle.spawn(async move {
263 if sender.send(log_message).await.is_err() {
264 eprintln!("Failed to send log message into channel");
265 }
266 });
267 } else {
268 if self.log_sender.blocking_send(log_message).is_err() {
269 eprintln!("Failed to send log message into channel");
270 }
271 }
272 }
273 }
274 }
275
276 fn flush(&self) {
277 if self.log_to_stdout {
278 let _ = io::stdout().flush();
279 }
280 }
281}