krossbar_log_lib/
logger.rs

1use std::{
2    io::{self, Write},
3    path::PathBuf,
4    sync::{
5        atomic::{AtomicUsize, Ordering},
6        Arc,
7    },
8    time::{Duration, SystemTime},
9};
10
11use chrono::Local;
12use colored::Colorize;
13use futures::{select, FutureExt};
14use log::{warn, Level, LevelFilter, Log, Record};
15use tokio::{
16    net::UnixStream,
17    runtime::Handle,
18    sync::mpsc::{channel, Receiver, Sender},
19};
20
21use krossbar_log_common::{log_message::LogMessage, logger_interface::REGISTER_METHOD_NAME};
22use krossbar_rpc::{Error, Result, RpcData};
23
24use crate::rpc::Rpc;
25
/// Minimal delay between two consecutive attempts to reconnect to the logger
const RECONNECT_PERIOD: Duration = Duration::from_secs(1);
/// How many messages the channel between the log handle and the logger can hold
const LOG_BUFFER_SIZE: usize = 100;
30
/// Logger handle to use for running the logger.
///
/// Created by [`Logger::new`] and consumed by [`Logger::run`], which forwards
/// queued log messages to the logger service.
pub struct Logger {
    /// Client service name. Sent to the logger on every (re)connect
    service_name: String,
    /// Logger RPC handle. `None` if no logger socket path was provided
    rpc: Option<Rpc>,
    /// Time of creation or of the last reconnect attempt. Used to throttle
    /// reconnects. Note: this is a [`SystemTime`], despite the `_ms` suffix
    last_connect_ts_ms: SystemTime,
    /// Logger socket path. `None` if messages are not sent to the logger
    logger_socket_path: Option<PathBuf>,
    /// Receiving part of the log messages channel
    log_receiver: Receiver<LogMessage>,
    /// Logging level. Shares the allocation with the global log handle
    _level: Arc<AtomicUsize>,
}
46
/// Global [Log] handle. Installed as the process-wide logger by [`Logger::new`]
struct LogHandle {
    /// If set, every enabled record is printed to stdout
    log_to_stdout: bool,
    /// If set, every enabled record is queued for sending to the logger
    log_to_rpc: bool,
    /// Logging level, stored as the numeric value of a [LevelFilter]
    level: Arc<AtomicUsize>,
    /// Sending part of the log messages channel
    log_sender: Sender<LogMessage>,
}
58
59impl Logger {
60    /// Create Logging handle, which can be used to run logger message sending.
61    /// **service_name** is a client service name. It must be uniques across the system.
62    /// **log_to_stdout** sets if logger should log to stdout. If set, library
63    /// logs to stdout even if it then sends messages to the logger.
64    /// **logger_socket_path** sets logger path. If is some, logging lib tries to connect
65    /// to the logger at the provided path.
66    pub async fn new(
67        service_name: &str,
68        level: LevelFilter,
69        log_to_stdout: bool,
70        logger_socket_path: Option<PathBuf>,
71    ) -> Result<Logger> {
72        let log_to_rpc = logger_socket_path.is_some();
73
74        let rpc = if logger_socket_path.is_none() {
75            None
76        } else {
77            Some(Self::connect(&service_name, logger_socket_path.clone().unwrap()).await?)
78        };
79
80        let (log_sender, log_receiver) = channel(LOG_BUFFER_SIZE);
81        let arc_level = Arc::new(AtomicUsize::new(level as usize));
82
83        let this = Self {
84            service_name: service_name.into(),
85            _level: arc_level.clone(),
86            rpc,
87            last_connect_ts_ms: SystemTime::now(),
88            logger_socket_path: logger_socket_path,
89            log_receiver,
90        };
91
92        let log_handle = Box::new(LogHandle::new(
93            log_to_stdout,
94            log_to_rpc,
95            arc_level,
96            log_sender,
97        ));
98
99        log::set_boxed_logger(log_handle)
100            .map(|()| log::set_max_level(level))
101            .unwrap();
102
103        Ok(this)
104    }
105
106    async fn connect(service_name: &str, socket_path: PathBuf) -> Result<Rpc> {
107        let socket = UnixStream::connect(socket_path)
108            .await
109            .map_err(|_| Error::PeerDisconnected)?;
110
111        let mut rpc = Rpc::new(socket);
112        let call = rpc
113            .call(REGISTER_METHOD_NAME, &service_name.to_owned())
114            .await?;
115
116        match call.data {
117            RpcData::Response(res) => {
118                res?;
119            }
120            m => {
121                return Err(Error::InternalError(format!(
122                    "Invalid response on connect from logger: {m:?}"
123                )));
124            }
125        }
126
127        Ok(rpc)
128    }
129
130    /// Run logger message sending. Can be ommited if set to log only to stdout.
131    pub async fn run(mut self) {
132        loop {
133            select! {
134                message = self.log_receiver.recv().fuse() => {
135                    if let Some(message) = message {
136                        self.send_rpc_message(&message).await
137                    } else {
138                        eprintln!("Log handle closed");
139                        break;
140                    }
141                }
142                incoming = self.rpc.as_mut().unwrap().read_message().fuse() => {
143                    eprintln!("Incoming command: {incoming:?}");
144
145                    if let Err(e) = incoming {
146                        warn!("No logger connection logger: {e:?}");
147
148                        tokio::time::sleep(RECONNECT_PERIOD).await;
149                    }
150                }
151            };
152        }
153    }
154
155    fn log_to_stdout(message: &LogMessage) {
156        let colored_level = match message.level {
157            Level::Error => "ERROR".bright_red(),
158            Level::Warn => "WARNING".bright_yellow(),
159            Level::Info => "INFO".bright_green(),
160            Level::Debug => "DEBUG".bright_blue(),
161            Level::Trace => "TRACE".bright_white(),
162        };
163
164        println!(
165            "{}: {} > {}",
166            colored_level,
167            message.target.bright_white(),
168            message.message
169        );
170    }
171
172    async fn send_rpc_message(&mut self, log_message: &LogMessage) {
173        let internal_log_message = |message: String| -> LogMessage {
174            LogMessage {
175                timestamp: Local::now(),
176                level: Level::Info,
177                target: "logger".to_owned(),
178                message: message,
179            }
180        };
181
182        let rpc = self.rpc.as_mut().unwrap();
183
184        // Failed to send message to logger. Check if we already want to reconnect
185        if rpc.send_log(&log_message).await.is_err() {
186            // We want to reconnect
187            if (SystemTime::now() - RECONNECT_PERIOD) > self.last_connect_ts_ms {
188                Self::log_to_stdout(&internal_log_message(
189                    "Logger is down. Trying to reconnect".into(),
190                ));
191
192                // Update last reconnect time, so we don't retry too often
193                self.last_connect_ts_ms = SystemTime::now();
194
195                // Succesfully reconnected
196                if let Ok(new_rpc) =
197                    Self::connect(&self.service_name, self.logger_socket_path.clone().unwrap())
198                        .await
199                {
200                    Self::log_to_stdout(&internal_log_message(
201                        "Succesfully reconnected to a loger. Sending source message".into(),
202                    ));
203
204                    rpc.replace_stream(new_rpc);
205
206                    let _ = rpc.send_log(&log_message).await;
207                // Failed to reconnect
208                } else {
209                    Self::log_to_stdout(&internal_log_message(
210                        "Failed to reconnect to a logger".into(),
211                    ));
212
213                    Self::log_to_stdout(&log_message)
214                }
215            // It's not time to reconnect. Log into stdout
216            } else {
217                Self::log_to_stdout(&log_message)
218            }
219        }
220    }
221}
222
223impl LogHandle {
224    pub fn new(
225        log_to_stdout: bool,
226        log_to_rpc: bool,
227        level: Arc<AtomicUsize>,
228        log_sender: Sender<LogMessage>,
229    ) -> Self {
230        Self {
231            log_to_stdout,
232            level,
233            log_sender,
234            log_to_rpc,
235        }
236    }
237}
238
239impl Log for LogHandle {
240    fn enabled(&self, metadata: &log::Metadata) -> bool {
241        metadata.level() as usize <= self.level.load(Ordering::Relaxed)
242    }
243
244    fn log(&self, record: &Record) {
245        if self.enabled(record.metadata()) {
246            let log_message = LogMessage {
247                timestamp: Local::now(),
248                level: record.level(),
249                target: record.metadata().target().to_owned(),
250                message: format!("{}", record.args()),
251            };
252
253            if self.log_to_stdout {
254                Logger::log_to_stdout(&log_message)
255            }
256
257            if self.log_to_rpc {
258                // If we're inside Tokio runtime, we spawn a task. Otherwise we'll block to send
259                if let Ok(handle) = Handle::try_current() {
260                    let sender = self.log_sender.clone();
261
262                    handle.spawn(async move {
263                        if sender.send(log_message).await.is_err() {
264                            eprintln!("Failed to send log message into channel");
265                        }
266                    });
267                } else {
268                    if self.log_sender.blocking_send(log_message).is_err() {
269                        eprintln!("Failed to send log message into channel");
270                    }
271                }
272            }
273        }
274    }
275
276    fn flush(&self) {
277        if self.log_to_stdout {
278            let _ = io::stdout().flush();
279        }
280    }
281}