aptos_logger_link/
aptos_logger.rs

1// Copyright (c) Aptos
2// SPDX-License-Identifier: Apache-2.0
3
4//! Implementation of writing logs to both local printers (e.g. stdout) and remote loggers
5//! (e.g. Logstash)
6
7use crate::sample::SampleRate;
8use crate::telemetry_log_writer::{TelemetryLog, TelemetryLogWriter};
9use crate::{
10    counters::{
11        PROCESSED_STRUCT_LOG_COUNT, SENT_STRUCT_LOG_BYTES, SENT_STRUCT_LOG_COUNT,
12        STRUCT_LOG_PARSE_ERROR_COUNT, STRUCT_LOG_QUEUE_ERROR_COUNT, STRUCT_LOG_SEND_ERROR_COUNT,
13    },
14    logger::Logger,
15    sample,
16    struct_log::TcpWriter,
17    Event, Filter, Key, Level, LevelFilter, Metadata,
18};
19use aptos_infallible::RwLock;
20use backtrace::Backtrace;
21use chrono::{SecondsFormat, Utc};
22use futures::channel;
23use once_cell::sync::Lazy;
24use serde::ser::SerializeStruct;
25use serde::{Serialize, Serializer};
26use std::fmt::Debug;
27use std::io::Stdout;
28use std::time::Duration;
29use std::{
30    collections::BTreeMap,
31    env, fmt,
32    io::Write,
33    str::FromStr,
34    sync::{self, Arc},
35    thread,
36};
37use strum_macros::EnumString;
38use tokio::time;
39
/// Env var controlling the local printer filter (standard `RUST_LOG` spec)
const RUST_LOG: &str = "RUST_LOG";
/// Env var controlling the remote (logstash) filter; falls back to `RUST_LOG`
const RUST_LOG_REMOTE: &str = "RUST_LOG_REMOTE";
/// Env var controlling the telemetry-service filter
pub const RUST_LOG_TELEMETRY: &str = "RUST_LOG_TELEMETRY";
/// Env var selecting the output format ("json" or "text"); overrides any custom formatter
const RUST_LOG_FORMAT: &str = "RUST_LOG_FORMAT";
/// Default size of log write channel, if the channel is full, logs will be dropped
pub const CHANNEL_SIZE: usize = 10000;
/// Number of *extra* attempts made when a logstash send fails (total tries = retries + 1)
const NUM_SEND_RETRIES: u8 = 1;
/// Maximum time `flush` waits for an acknowledgement from the logger service
const FLUSH_TIMEOUT: Duration = Duration::from_secs(5);
/// How often `LoggerFilterUpdater` rebuilds the filters from the environment
const FILTER_REFRESH_INTERVAL: Duration =
    Duration::from_secs(5 /* minutes */ * 60 /* seconds */);
50
/// Output formats selectable via the `RUST_LOG_FORMAT` environment variable
/// (parsed case-insensitively to lowercase by strum).
#[derive(EnumString)]
#[strum(serialize_all = "lowercase")]
enum LogFormat {
    /// Structured JSON, one object per entry (see `json_format`)
    Json,
    /// Human-readable single-line text (see `text_format`)
    Text,
}
57
/// A single log entry emitted by a logging macro with associated metadata
#[derive(Debug)]
pub struct LogEntry {
    /// Source metadata of the event (level, target, source path, ...)
    metadata: Metadata,
    /// Name of the thread that emitted the event, if it had one
    thread_name: Option<String>,
    /// The program backtrace taken when the event occurred. Backtraces
    /// are only supported for errors and must be configured.
    backtrace: Option<String>,
    /// Machine hostname, resolved once per process and cached
    hostname: Option<&'static str>,
    /// Value of `KUBERNETES_NAMESPACE`, read once per process and cached
    namespace: Option<&'static str>,
    /// RFC 3339 timestamp (microsecond precision) taken at entry creation
    timestamp: String,
    /// Structured key/value data collected from the event's schemas
    data: BTreeMap<Key, serde_json::Value>,
    /// The formatted log message, if any
    message: Option<String>,
}
72
73// implement custom serializer for LogEntry since we want to promote the `metadata.level` field into a top-level `level` field
74// and prefix the remaining metadata attributes as `source.<metadata_field>` which can't be expressed with serde macros alone.
75impl Serialize for LogEntry {
76    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
77    where
78        S: Serializer,
79    {
80        let mut state = serializer.serialize_struct("LogEntry", 9)?;
81        state.serialize_field("level", &self.metadata.level())?;
82        state.serialize_field("source", &self.metadata)?;
83        if let Some(thread_name) = &self.thread_name {
84            state.serialize_field("thread_name", thread_name)?;
85        }
86        if let Some(hostname) = &self.hostname {
87            state.serialize_field("hostname", hostname)?;
88        }
89        if let Some(namespace) = &self.namespace {
90            state.serialize_field("namespace", namespace)?;
91        }
92        state.serialize_field("timestamp", &self.timestamp)?;
93        if let Some(message) = &self.message {
94            state.serialize_field("message", message)?;
95        }
96        if !&self.data.is_empty() {
97            state.serialize_field("data", &self.data)?;
98        }
99        if let Some(backtrace) = &self.backtrace {
100            state.serialize_field("backtrace", backtrace)?;
101        }
102        state.end()
103    }
104}
105
impl LogEntry {
    /// Builds a `LogEntry` from an `Event`: captures the current timestamp,
    /// cached hostname/namespace, the event's structured data, and — for
    /// error-level events with backtraces enabled — a trimmed backtrace.
    fn new(event: &Event, thread_name: Option<&str>, enable_backtrace: bool) -> Self {
        use crate::{Value, Visitor};

        // Collects the event's key/value pairs into a JSON map.
        struct JsonVisitor<'a>(&'a mut BTreeMap<Key, serde_json::Value>);

        impl<'a> Visitor for JsonVisitor<'a> {
            fn visit_pair(&mut self, key: Key, value: Value<'_>) {
                let v = match value {
                    Value::Debug(d) => serde_json::Value::String(format!("{:?}", d)),
                    Value::Display(d) => serde_json::Value::String(d.to_string()),
                    Value::Serde(s) => match serde_json::to_value(s) {
                        Ok(value) => value,
                        Err(e) => {
                            // Log and skip the value that can't be serialized
                            eprintln!("error serializing structured log: {} for key {:?}", e, key);
                            return;
                        }
                    },
                };

                self.0.insert(key, v);
            }
        }

        let metadata = *event.metadata();
        let thread_name = thread_name.map(ToOwned::to_owned);
        let message = event.message().map(fmt::format);

        // Hostname and namespace are constant for the process lifetime, so
        // they are resolved once and cached in statics.
        static HOSTNAME: Lazy<Option<String>> = Lazy::new(|| {
            hostname::get()
                .ok()
                .and_then(|name| name.into_string().ok())
        });

        static NAMESPACE: Lazy<Option<String>> =
            Lazy::new(|| env::var("KUBERNETES_NAMESPACE").ok());

        let hostname = HOSTNAME.as_deref();
        let namespace = NAMESPACE.as_deref();

        // Backtraces are only captured for error-level events, and only when
        // explicitly enabled by the logger configuration.
        let backtrace = if enable_backtrace && matches!(metadata.level(), Level::Error) {
            let mut backtrace = Backtrace::new();
            let mut frames = backtrace.frames().to_vec();
            if frames.len() > 3 {
                frames.drain(0..3); // Remove the first 3 unnecessary frames to simplify backtrace
            }
            backtrace = frames.into();
            Some(format!("{:?}", backtrace))
        } else {
            None
        };

        let mut data = BTreeMap::new();
        for schema in event.keys_and_values() {
            schema.visit(&mut JsonVisitor(&mut data));
        }

        Self {
            metadata,
            thread_name,
            backtrace,
            hostname,
            namespace,
            timestamp: Utc::now().to_rfc3339_opts(SecondsFormat::Micros, true),
            data,
            message,
        }
    }

    /// The event's metadata (level, target, source location)
    pub fn metadata(&self) -> &Metadata {
        &self.metadata
    }

    /// Name of the emitting thread, if it had one
    pub fn thread_name(&self) -> Option<&str> {
        self.thread_name.as_deref()
    }

    /// Formatted backtrace; present only for errors with backtraces enabled
    pub fn backtrace(&self) -> Option<&str> {
        self.backtrace.as_deref()
    }

    /// Machine hostname, if it could be resolved
    pub fn hostname(&self) -> Option<&str> {
        self.hostname
    }

    /// Kubernetes namespace, if `KUBERNETES_NAMESPACE` was set
    pub fn namespace(&self) -> Option<&str> {
        self.namespace
    }

    /// RFC 3339 timestamp of when the entry was created
    pub fn timestamp(&self) -> &str {
        self.timestamp.as_str()
    }

    /// Structured key/value data attached to the event
    pub fn data(&self) -> &BTreeMap<Key, serde_json::Value> {
        &self.data
    }

    /// The formatted log message, if any
    pub fn message(&self) -> Option<&str> {
        self.message.as_deref()
    }
}
208
/// A builder for a `AptosData`, configures what, where, and how to write logs.
pub struct AptosDataBuilder {
    /// Capacity of the async log channel; entries are dropped when it is full
    channel_size: usize,
    /// Port for the console logger (only honored with the `aptos-console` feature)
    console_port: Option<u16>,
    /// Whether to capture backtraces for error-level entries
    enable_backtrace: bool,
    /// Level for the local printer filter
    level: Level,
    /// Level for the remote (logstash) filter
    remote_level: Level,
    /// Level for the telemetry filter
    telemetry_level: Level,
    /// TCP address of the remote logstash endpoint
    address: Option<String>,
    /// Local destination (e.g. stdout) for formatted log lines
    printer: Option<Box<dyn Writer>>,
    /// Channel to the telemetry service, if telemetry logging is enabled
    remote_log_tx: Option<channel::mpsc::Sender<TelemetryLog>>,
    /// Whether logs are drained by a background thread instead of inline
    is_async: bool,
    /// Whether `flush` waits on the telemetry writer
    enable_telemetry_flush: bool,
    /// Formatter for the local printer; `text_format` is used when `None`
    custom_format: Option<fn(&LogEntry) -> Result<String, fmt::Error>>,
}
224
impl AptosDataBuilder {
    /// Creates a builder with defaults: synchronous stdout logging, `Info`
    /// local/remote levels, `Warn` telemetry level, default channel size,
    /// and console port 6669.
    #[allow(clippy::new_without_default)]
    pub fn new() -> Self {
        Self {
            channel_size: CHANNEL_SIZE,
            console_port: Some(6669),
            enable_backtrace: false,
            level: Level::Info,
            remote_level: Level::Info,
            telemetry_level: Level::Warn,
            address: None,
            printer: Some(Box::new(StdoutWriter::new())),
            remote_log_tx: None,
            is_async: false,
            enable_telemetry_flush: true,
            custom_format: None,
        }
    }

    /// Sets the remote logstash TCP address
    pub fn address(&mut self, address: String) -> &mut Self {
        self.address = Some(address);
        self
    }

    /// Enables backtrace capture for error-level entries
    pub fn enable_backtrace(&mut self) -> &mut Self {
        self.enable_backtrace = true;
        self
    }

    /// Reads builder settings from the environment
    /// (currently only `STRUCT_LOG_TCP_ADDR` for the remote address)
    pub fn read_env(&mut self) -> &mut Self {
        if let Ok(address) = env::var("STRUCT_LOG_TCP_ADDR") {
            self.address(address);
        }
        self
    }

    /// Sets the level for the local printer filter
    pub fn level(&mut self, level: Level) -> &mut Self {
        self.level = level;
        self
    }

    /// Sets the level for the remote (logstash) filter
    pub fn remote_level(&mut self, level: Level) -> &mut Self {
        self.remote_level = level;
        self
    }

    /// Sets the level for the telemetry filter
    pub fn telemetry_level(&mut self, level: Level) -> &mut Self {
        self.telemetry_level = level;
        self
    }

    /// Sets the async channel capacity; entries are dropped when it is full
    pub fn channel_size(&mut self, channel_size: usize) -> &mut Self {
        self.channel_size = channel_size;
        self
    }

    /// Replaces the local printer
    pub fn printer(&mut self, printer: Box<dyn Writer + Send + Sync + 'static>) -> &mut Self {
        self.printer = Some(printer);
        self
    }

    /// Sets the console logger port (only honored with the `aptos-console` feature)
    pub fn console_port(&mut self, console_port: Option<u16>) -> &mut Self {
        self.console_port = console_port;
        self
    }

    /// Sets the channel used to ship logs to the telemetry service
    pub fn remote_log_tx(
        &mut self,
        remote_log_tx: channel::mpsc::Sender<TelemetryLog>,
    ) -> &mut Self {
        self.remote_log_tx = Some(remote_log_tx);
        self
    }

    /// Chooses between async (background-thread) and synchronous logging
    pub fn is_async(&mut self, is_async: bool) -> &mut Self {
        self.is_async = is_async;
        self
    }

    /// Controls whether `flush` waits on the telemetry writer
    pub fn enable_telemetry_flush(&mut self, enable_telemetry_flush: bool) -> &mut Self {
        self.enable_telemetry_flush = enable_telemetry_flush;
        self
    }

    /// Sets a custom formatter for the local printer
    /// (overridden by `RUST_LOG_FORMAT` if that env var is set)
    pub fn custom_format(
        &mut self,
        format: fn(&LogEntry) -> Result<String, fmt::Error>,
    ) -> &mut Self {
        self.custom_format = Some(format);
        self
    }

    /// Builds the logger and installs it globally, discarding the handle
    pub fn init(&mut self) {
        self.build();
    }

    /// Builds the (local, remote, telemetry) filter tuple.
    ///
    /// Environment variables (`RUST_LOG`, `RUST_LOG_REMOTE`,
    /// `RUST_LOG_TELEMETRY`) take precedence over the builder's configured
    /// levels. The remote and telemetry filters are forced to `Off` unless
    /// the logger is async and the corresponding destination is configured.
    fn build_filter(&self) -> FilterTuple {
        let local_filter = {
            let mut filter_builder = Filter::builder();

            if env::var(RUST_LOG).is_ok() {
                filter_builder.with_env(RUST_LOG);
            } else {
                filter_builder.filter_level(self.level.into());
            }

            filter_builder.build()
        };
        let remote_filter = {
            let mut filter_builder = Filter::builder();

            if self.is_async && self.address.is_some() {
                if env::var(RUST_LOG_REMOTE).is_ok() {
                    filter_builder.with_env(RUST_LOG_REMOTE);
                } else if env::var(RUST_LOG).is_ok() {
                    // No remote-specific override: reuse the local RUST_LOG spec
                    filter_builder.with_env(RUST_LOG);
                } else {
                    filter_builder.filter_level(self.remote_level.into());
                }
            } else {
                filter_builder.filter_level(LevelFilter::Off);
            }

            filter_builder.build()
        };
        let telemetry_filter = {
            let mut filter_builder = Filter::builder();

            if self.is_async && self.remote_log_tx.is_some() {
                if env::var(RUST_LOG_TELEMETRY).is_ok() {
                    filter_builder.with_env(RUST_LOG_TELEMETRY);
                } else {
                    filter_builder.filter_level(self.telemetry_level.into());
                }
            } else {
                filter_builder.filter_level(LevelFilter::Off);
            }

            filter_builder.build()
        };

        FilterTuple {
            local_filter,
            remote_filter,
            telemetry_filter,
        }
    }

    /// Builds the `AptosData` logger.
    ///
    /// In async mode the printer is handed to a background `LoggerService`
    /// thread that drains the log channel; in sync mode the logger keeps the
    /// printer and writes inline from `send_entry`.
    fn build_logger(&mut self) -> Arc<AptosData> {
        let filter = self.build_filter();

        // RUST_LOG_FORMAT overrides any custom formatter set on the builder.
        if let Ok(log_format) = env::var(RUST_LOG_FORMAT) {
            // NOTE(review): panics if the env var is anything other than
            // "json" or "text".
            let log_format = LogFormat::from_str(&log_format).unwrap();
            self.custom_format = match log_format {
                LogFormat::Json => Some(json_format),
                LogFormat::Text => Some(text_format),
            }
        }

        if self.is_async {
            let (sender, receiver) = sync::mpsc::sync_channel(self.channel_size);
            let mut remote_tx = None;
            if let Some(tx) = &self.remote_log_tx {
                remote_tx = Some(tx.clone());
            }

            // The async logger holds only the channel sender; the service
            // thread owns the printer and remote writers.
            let logger = Arc::new(AptosData {
                enable_backtrace: self.enable_backtrace,
                sender: Some(sender),
                printer: None,
                filter: RwLock::new(filter),
                enable_telemetry_flush: self.enable_telemetry_flush,
                formatter: self.custom_format.take().unwrap_or(text_format),
            });
            let service = LoggerService {
                receiver,
                address: self.address.clone(),
                printer: self.printer.take(),
                facade: logger.clone(),
                remote_tx,
            };

            thread::spawn(move || service.run());
            logger
        } else {
            Arc::new(AptosData {
                enable_backtrace: self.enable_backtrace,
                sender: None,
                printer: self.printer.take(),
                filter: RwLock::new(filter),
                enable_telemetry_flush: self.enable_telemetry_flush,
                formatter: self.custom_format.take().unwrap_or(text_format),
            })
        }
    }

    /// Builds the logger and installs it as the global logger, returning the
    /// handle (e.g. for later filter updates)
    pub fn build(&mut self) -> Arc<AptosData> {
        let logger = self.build_logger();

        // The console port is only honored when the aptos-console feature is
        // compiled in.
        let console_port = if cfg!(feature = "aptos-console") {
            self.console_port
        } else {
            None
        };

        crate::logger::set_global_logger(logger.clone(), console_port);
        logger
    }
}
434
/// A combination of `Filter`s to control where logs are written:
/// one filter per destination (local printer, remote logstash, telemetry).
pub struct FilterTuple {
    /// The local printer `Filter` to control what is logged in text output
    local_filter: Filter,
    /// The remote logging `Filter` to control what is sent to external logging
    remote_filter: Filter,
    /// The logging `Filter` to control what is sent to telemetry service
    telemetry_filter: Filter,
}
444
445impl FilterTuple {
446    fn enabled(&self, metadata: &Metadata) -> bool {
447        self.local_filter.enabled(metadata)
448            || self.remote_filter.enabled(metadata)
449            || self.telemetry_filter.enabled(metadata)
450    }
451}
452
/// The logger implementation: filters events and routes them to a local
/// printer and/or the async `LoggerService` channel.
pub struct AptosData {
    /// Whether to capture backtraces for error-level entries
    enable_backtrace: bool,
    /// Channel to the background `LoggerService`; `None` in synchronous mode
    sender: Option<sync::mpsc::SyncSender<LoggerServiceEvent>>,
    /// Synchronous printer; `None` in async mode (the service owns it instead)
    printer: Option<Box<dyn Writer>>,
    /// Current destination filters; swappable at runtime via the setters
    filter: RwLock<FilterTuple>,
    /// Whether `flush` waits on the telemetry writer
    enable_telemetry_flush: bool,
    /// Formatter used for printer and telemetry output
    pub(crate) formatter: fn(&LogEntry) -> Result<String, fmt::Error>,
}
461
impl AptosData {
    /// Returns a builder with default settings
    pub fn builder() -> AptosDataBuilder {
        AptosDataBuilder::new()
    }

    /// Alias for `builder` kept for call-site convenience
    #[allow(clippy::new_ret_no_self)]
    pub fn new() -> AptosDataBuilder {
        Self::builder()
    }

    /// Installs a synchronous stdout logger with backtraces, for tests.
    /// Does nothing unless `RUST_LOG` is set.
    pub fn init_for_testing() {
        if env::var(RUST_LOG).is_err() {
            return;
        }

        Self::builder()
            .is_async(false)
            .enable_backtrace()
            .printer(Box::new(StdoutWriter::new()))
            .build();
    }

    /// Replaces all three destination filters at once
    pub fn set_filter(&self, filter_tuple: FilterTuple) {
        *self.filter.write() = filter_tuple;
    }

    /// Replaces only the local printer filter
    pub fn set_local_filter(&self, filter: Filter) {
        self.filter.write().local_filter = filter;
    }

    /// Replaces only the remote (logstash) filter
    pub fn set_remote_filter(&self, filter: Filter) {
        self.filter.write().remote_filter = filter;
    }

    /// Replaces only the telemetry filter
    pub fn set_telemetry_filter(&self, filter: Filter) {
        self.filter.write().telemetry_filter = filter;
    }

    /// Routes an entry to the synchronous printer (if any) and the async
    /// service channel (if any). A full or disconnected channel increments
    /// the queue-error counter and drops the entry rather than blocking.
    fn send_entry(&self, entry: LogEntry) {
        if let Some(printer) = &self.printer {
            let s = (self.formatter)(&entry).expect("Unable to format");
            printer.write(s);
        }

        if let Some(sender) = &self.sender {
            if sender
                .try_send(LoggerServiceEvent::LogEntry(entry))
                .is_err()
            {
                STRUCT_LOG_QUEUE_ERROR_COUNT.inc();
            }
        }
    }
}
516
impl Logger for AptosData {
    /// An event is enabled if any of the three destination filters accepts it
    fn enabled(&self, metadata: &Metadata) -> bool {
        self.filter.read().enabled(metadata)
    }

    /// Converts the event into a `LogEntry` (tagged with the current thread's
    /// name) and dispatches it
    fn record(&self, event: &Event) {
        let entry = LogEntry::new(
            event,
            ::std::thread::current().name(),
            self.enable_backtrace,
        );

        self.send_entry(entry)
    }

    /// Asks the async service to flush and waits up to `FLUSH_TIMEOUT` for
    /// the acknowledgement. No-op in synchronous mode.
    fn flush(&self) {
        if let Some(sender) = &self.sender {
            // One-slot channel used as a flush acknowledgement oneshot.
            let (oneshot_sender, oneshot_receiver) = sync::mpsc::sync_channel(1);
            match sender.try_send(LoggerServiceEvent::Flush(oneshot_sender)) {
                Ok(_) => {
                    if let Err(err) = oneshot_receiver.recv_timeout(FLUSH_TIMEOUT) {
                        eprintln!("[Logging] Unable to flush recv: {}", err);
                    }
                }
                Err(err) => {
                    // Couldn't even queue the flush request; sleep for the
                    // timeout instead — presumably to give the service time
                    // to drain on its own (TODO confirm intent).
                    eprintln!("[Logging] Unable to flush send: {}", err);
                    std::thread::sleep(FLUSH_TIMEOUT);
                }
            }
        }
    }
}
549
/// Messages consumed by the background `LoggerService`
enum LoggerServiceEvent {
    /// A log entry to write to the configured destinations
    LogEntry(LogEntry),
    /// A flush request; the enclosed sender is signaled when the flush completes
    Flush(sync::mpsc::SyncSender<()>),
}
554
/// A service for running a log listener, that will continually export logs through a local printer
/// or to a `AptosData` for external logging.
struct LoggerService {
    /// Receives entries and flush requests from `AptosData`
    receiver: sync::mpsc::Receiver<LoggerServiceEvent>,
    /// Remote logstash address; turned into a `TcpWriter` when `run` starts
    address: Option<String>,
    /// Local printer handed over from the builder in async mode
    printer: Option<Box<dyn Writer>>,
    /// Shared logger, consulted for its filters and formatter
    facade: Arc<AptosData>,
    /// Channel to the telemetry service; wrapped in a `TelemetryLogWriter`
    remote_tx: Option<channel::mpsc::Sender<TelemetryLog>>,
}
564
impl LoggerService {
    /// Drains the event channel until every sender is dropped, routing each
    /// entry to whichever destinations (printer, logstash, telemetry) are
    /// configured and whose filter accepts the entry's metadata.
    pub fn run(mut self) {
        let mut tcp_writer = self.address.take().map(TcpWriter::new);
        let mut telemetry_writer = self.remote_tx.take().map(TelemetryLogWriter::new);

        for event in &self.receiver {
            match event {
                LoggerServiceEvent::LogEntry(entry) => {
                    PROCESSED_STRUCT_LOG_COUNT.inc();

                    // An entry is queued when *any* filter accepts it, so
                    // each destination re-checks its own filter here.
                    if let Some(printer) = &mut self.printer {
                        if self
                            .facade
                            .filter
                            .read()
                            .local_filter
                            .enabled(&entry.metadata)
                        {
                            let s = (self.facade.formatter)(&entry).expect("Unable to format");
                            printer.write_buferred(s);
                        }
                    }

                    if let Some(writer) = &mut tcp_writer {
                        if self
                            .facade
                            .filter
                            .read()
                            .remote_filter
                            .enabled(&entry.metadata)
                        {
                            Self::write_to_logstash(writer, &entry);
                        }
                    }

                    if let Some(writer) = &mut telemetry_writer {
                        if self
                            .facade
                            .filter
                            .read()
                            .telemetry_filter
                            .enabled(&entry.metadata)
                        {
                            let s = (self.facade.formatter)(&entry).expect("Unable to format");
                            // Telemetry write failures are ignored here.
                            let _ = writer.write(s);
                        }
                    }
                }
                LoggerServiceEvent::Flush(sender) => {
                    // Flush is only done on TelemetryLogWriter
                    if let Some(writer) = &mut telemetry_writer {
                        if self.facade.enable_telemetry_flush {
                            match writer.flush() {
                                Ok(rx) => {
                                    if let Err(err) = rx.recv_timeout(FLUSH_TIMEOUT) {
                                        // Sampled to avoid flooding stderr when
                                        // the telemetry pipeline stays stuck.
                                        sample!(
                                            SampleRate::Duration(Duration::from_secs(60)),
                                            eprintln!("Timed out flushing telemetry: {}", err)
                                        );
                                    }
                                }
                                Err(err) => {
                                    sample!(
                                        SampleRate::Duration(Duration::from_secs(60)),
                                        eprintln!("Failed to flush telemetry: {}", err)
                                    );
                                }
                            }
                        }
                    }
                    // Always acknowledge — even if nothing was flushed — so
                    // the caller's recv_timeout doesn't have to expire.
                    let _ = sender.send(());
                }
            }
        }
    }

    /// Writes a log line into json_lines logstash format, which has a newline at the end
    fn write_to_logstash(stream: &mut TcpWriter, entry: &LogEntry) {
        // Entries that fail to serialize are dropped; json_format already
        // bumps its error counter in that case.
        let message = if let Ok(json) = json_format(entry) {
            json
        } else {
            return;
        };
        let message = message + "\n";
        let bytes = message.as_bytes();
        let message_length = bytes.len();

        // Attempt to write the log up to NUM_SEND_RETRIES + 1, and then drop it
        // Each `write_all` call will attempt to open a connection if one isn't open
        let mut result = stream.write_all(bytes);
        for _ in 0..NUM_SEND_RETRIES {
            if result.is_ok() {
                break;
            } else {
                result = stream.write_all(bytes);
            }
        }

        if let Err(e) = result {
            STRUCT_LOG_SEND_ERROR_COUNT.inc();
            eprintln!(
                "[Logging] Error while sending data to logstash({}): {}",
                stream.endpoint(),
                e
            );
        } else {
            SENT_STRUCT_LOG_COUNT.inc();
            SENT_STRUCT_LOG_BYTES.inc_by(message_length as u64);
        }
    }
}
676
/// A trait encapsulating the operations required for writing logs.
pub trait Writer: Send + Sync {
    /// Write the log.
    fn write(&self, log: String);

    /// Write the log in an async task.
    // NOTE(review): "buferred" is a typo for "buffered", but renaming it
    // would break every implementor and call site, so it is kept as-is.
    fn write_buferred(&mut self, log: String);
}
685
/// A struct for writing logs to stdout
struct StdoutWriter {
    /// Buffered stdout handle used by `write_buferred`
    buffer: std::io::BufWriter<Stdout>,
}
690
691impl StdoutWriter {
692    pub fn new() -> Self {
693        let buffer = std::io::BufWriter::new(std::io::stdout());
694        Self { buffer }
695    }
696}
impl Writer for StdoutWriter {
    /// Write log to stdout
    fn write(&self, log: String) {
        println!("{}", log);
    }
    /// Buffered write used by the async logger service; write errors are
    /// deliberately ignored (`unwrap_or_default`) so logging cannot panic.
    fn write_buferred(&mut self, log: String) {
        self.buffer
            .write_fmt(format_args!("{}\n", log))
            .unwrap_or_default();
    }
}
708
/// A struct for writing logs to a file
pub struct FileWriter {
    /// The open log file, behind a lock so `write(&self)` can mutate it
    log_file: RwLock<std::fs::File>,
}
713
714impl FileWriter {
715    pub fn new(log_file: std::path::PathBuf) -> Self {
716        let file = std::fs::OpenOptions::new()
717            .append(true)
718            .create(true)
719            .open(log_file)
720            .expect("Unable to open log file");
721        Self {
722            log_file: RwLock::new(file),
723        }
724    }
725}
726
727impl Writer for FileWriter {
728    /// Write to file
729    fn write(&self, log: String) {
730        if let Err(err) = writeln!(self.log_file.write(), "{}", log) {
731            eprintln!("Unable to write to log file: {}", err);
732        }
733    }
734    fn write_buferred(&mut self, log: String) {
735        self.write(log);
736    }
737}
738
/// Converts a record into a string representation:
/// TIMESTAMP [thread_name] LOG_LEVEL FILE:LINE MESSAGE JSON_DATA
/// (the previous doc listed the level before the thread name, which did not
/// match the actual output order below)
/// Example:
/// 2020-03-07T05:03:03.123456Z [thread_name] INFO common/aptos-logger/src/lib.rs:261 Hello { "world": true }
fn text_format(entry: &LogEntry) -> Result<String, fmt::Error> {
    use std::fmt::Write;

    let mut w = String::new();
    write!(w, "{}", entry.timestamp)?;

    // Thread name comes right after the timestamp, before the level.
    if let Some(thread_name) = &entry.thread_name {
        write!(w, " [{}]", thread_name)?;
    }

    write!(
        w,
        " {} {}",
        entry.metadata.level(),
        entry.metadata.source_path()
    )?;

    if let Some(message) = &entry.message {
        write!(w, " {}", message)?;
    }

    // Trailing JSON blob carrying any structured key/value data.
    if !entry.data.is_empty() {
        write!(w, " {}", serde_json::to_string(&entry.data).unwrap())?;
    }

    Ok(w)
}
770
771// converts a record into json format
772fn json_format(entry: &LogEntry) -> Result<String, fmt::Error> {
773    match serde_json::to_string(&entry) {
774        Ok(s) => Ok(s),
775        Err(_) => {
776            // TODO: Improve the error handling here. Currently we're just increasing some misleadingly-named metric and dropping any context on why this could not be deserialized.
777            STRUCT_LOG_PARSE_ERROR_COUNT.inc();
778            Err(fmt::Error)
779        }
780    }
781}
782
/// Periodically rebuilds the filter and replaces the current logger filter.
/// This is useful for dynamically changing log levels at runtime via existing
/// environment variables such as `RUST_LOG_TELEMETRY`.
pub struct LoggerFilterUpdater {
    /// The live logger whose filters get swapped
    logger: Arc<AptosData>,
    /// Builder retaining the configuration used to rebuild the filters
    logger_builder: AptosDataBuilder,
}
790
impl LoggerFilterUpdater {
    /// Creates an updater for `logger`, reusing `logger_builder`'s settings
    /// when rebuilding filters
    pub fn new(logger: Arc<AptosData>, logger_builder: AptosDataBuilder) -> Self {
        Self {
            logger,
            logger_builder,
        }
    }

    /// Runs forever, rebuilding the filters every `FILTER_REFRESH_INTERVAL`
    pub async fn run(self) {
        let mut interval = time::interval(FILTER_REFRESH_INTERVAL);
        loop {
            interval.tick().await;

            self.update_filter();
        }
    }

    /// Rebuilds the filter tuple from the builder (re-reading the env vars)
    /// and swaps it into the logger
    fn update_filter(&self) {
        // TODO: check for change to env var before rebuilding filter.
        let filter = self.logger_builder.build_filter();
        self.logger.set_filter(filter);
    }
}
814
815#[cfg(test)]
816mod tests {
817    use super::{AptosData, LogEntry};
818    use crate::{
819        aptos_logger::{json_format, RUST_LOG_TELEMETRY},
820        debug, error, info,
821        logger::Logger,
822        trace, warn, AptosDataBuilder, Event, Key, KeyValue, Level, LoggerFilterUpdater, Metadata,
823        Schema, Value, Visitor,
824    };
825    use chrono::{DateTime, Utc};
826    #[cfg(test)]
827    use pretty_assertions::assert_eq;
828    use serde_json::Value as JsonValue;
829    use std::{
830        sync::{
831            mpsc::{self, Receiver, SyncSender},
832            Arc,
833        },
834        thread,
835    };
836
    /// Test enum exercising serde serialization with snake_case renaming
    /// (`FooBar` serializes as `"foo_bar"`)
    #[derive(serde::Serialize)]
    #[serde(rename_all = "snake_case")]
    enum Enum {
        FooBar,
    }
842
    /// Test schema emitting one numeric and one enum value via serde
    struct TestSchema<'a> {
        foo: usize,
        bar: &'a Enum,
    }
847
    impl Schema for TestSchema<'_> {
        /// Emits both fields as serde-serialized values
        fn visit(&self, visitor: &mut dyn Visitor) {
            visitor.visit_pair(Key::new("foo"), Value::from_serde(&self.foo));
            visitor.visit_pair(Key::new("bar"), Value::from_serde(&self.bar));
        }
    }
854
    /// Test logger that forwards every entry over a channel for inspection
    struct LogStream {
        sender: SyncSender<LogEntry>,
        enable_backtrace: bool,
    }
859
860    impl LogStream {
861        fn new(enable_backtrace: bool) -> (Self, Receiver<LogEntry>) {
862            let (sender, receiver) = mpsc::sync_channel(1024);
863            let log_stream = Self {
864                sender,
865                enable_backtrace,
866            };
867            (log_stream, receiver)
868        }
869    }
870
    impl Logger for LogStream {
        /// Accepts `Debug` and above, so `trace!` events are dropped
        fn enabled(&self, metadata: &Metadata) -> bool {
            metadata.level() <= Level::Debug
        }

        /// Converts the event to a `LogEntry` and pushes it onto the channel
        fn record(&self, event: &Event) {
            let entry = LogEntry::new(
                event,
                ::std::thread::current().name(),
                self.enable_backtrace,
            );
            self.sender.send(entry).unwrap();
        }

        /// Nothing to flush; entries are delivered synchronously
        fn flush(&self) {}
    }
887
    /// Installs a backtrace-enabled `LogStream` as the global logger and
    /// returns the receiving end so tests can assert on captured entries
    fn set_test_logger() -> Receiver<LogEntry> {
        let (logger, receiver) = LogStream::new(true);
        let logger = Arc::new(logger);
        crate::logger::set_global_logger(logger, None);
        receiver
    }
894
    // TODO: Find a better mechanism for testing that allows setting the logger not globally
    // End-to-end smoke test of the logging macros against the global test logger:
    // verifies metadata, message, structured data, JSON formatting, level filtering,
    // thread-name capture, and the Debug (`?`) / Display (`%`) capture sigils.
    //
    // NOTE(review): the expected JSON below embeds this file's repo path and the line
    // number of the `info!` call; moving code between the `line!()` call and the
    // `info!` invocation (or relocating this file) will break that assertion.
    #[test]
    fn basic() {
        let receiver = set_test_logger();
        let number = 12345;

        // Send an info log
        let before = Utc::now();
        // Taken on the line immediately before `info!`; the `+= 1` below adjusts it
        // to the macro's own line for the JSON assertion. Do not insert lines here.
        let mut line_num = line!();
        info!(
            TestSchema {
                foo: 5,
                bar: &Enum::FooBar
            },
            test = true,
            category = "name",
            KeyValue::new("display", Value::from_display(&number)),
            "This is a log"
        );

        let after = Utc::now();

        let mut entry = receiver.recv().unwrap();

        // Ensure standard fields are filled
        assert_eq!(entry.metadata.level(), Level::Info);
        // The target defaults to the crate name (first segment of module_path!()).
        assert_eq!(
            entry.metadata.target(),
            module_path!().split("::").next().unwrap()
        );
        assert_eq!(entry.message.as_deref(), Some("This is a log"));
        // Info-level logs should not capture a backtrace (errors do; see below).
        assert!(entry.backtrace.is_none());

        // Ensure json formatter works
        // hardcoding a timestamp and hostname to make the tests deterministic and not depend on environment
        let original_timestamp = entry.timestamp;
        entry.timestamp = String::from("2022-07-24T23:42:29.540278Z");
        entry.hostname = Some("test-host");
        line_num += 1; // the `info!` invocation sits one line below the `line!()` call
        let thread_name = thread::current().name().map(|s| s.to_string()).unwrap();

        let expected = format!("{{\"level\":\"INFO\",\"source\":{{\"package\":\"aptos_logger\",\"file\":\"crates/aptos-logger/src/aptos_logger.rs:{line_num}\"}},\"thread_name\":\"{thread_name}\",\"hostname\":\"test-host\",\"timestamp\":\"2022-07-24T23:42:29.540278Z\",\"message\":\"This is a log\",\"data\":{{\"bar\":\"foo_bar\",\"category\":\"name\",\"display\":\"12345\",\"foo\":5,\"test\":true}}}}");

        assert_eq!(json_format(&entry).unwrap(), expected);

        entry.timestamp = original_timestamp;

        // Log time should be the time the structured log entry was created
        let timestamp = DateTime::parse_from_rfc3339(&entry.timestamp).unwrap();
        let timestamp: DateTime<Utc> = DateTime::from(timestamp);
        assert!(before <= timestamp && timestamp <= after);

        // Ensure data stored is the right type
        assert_eq!(
            entry.data.get(&Key::new("foo")).and_then(JsonValue::as_u64),
            Some(5)
        );
        assert_eq!(
            entry.data.get(&Key::new("bar")).and_then(JsonValue::as_str),
            Some("foo_bar")
        );
        // Values captured via Display render as strings, not numbers.
        assert_eq!(
            entry
                .data
                .get(&Key::new("display"))
                .and_then(JsonValue::as_str),
            Some(format!("{}", number)).as_deref(),
        );
        assert_eq!(
            entry
                .data
                .get(&Key::new("test"))
                .and_then(JsonValue::as_bool),
            Some(true),
        );
        assert_eq!(
            entry
                .data
                .get(&Key::new("category"))
                .and_then(JsonValue::as_str),
            Some("name"),
        );

        // Test error logs contain backtraces
        error!("This is an error log");
        let entry = receiver.recv().unwrap();
        assert!(entry.backtrace.is_some());

        // Test all log levels work properly
        // Tracing should be skipped because the Logger was setup to skip Tracing events
        trace!("trace");
        debug!("debug");
        info!("info");
        warn!("warn");
        error!("error");

        // Trace is intentionally absent: the test logger filters it out.
        let levels = &[Level::Debug, Level::Info, Level::Warn, Level::Error];

        for level in levels {
            let entry = receiver.recv().unwrap();
            assert_eq!(entry.metadata.level(), *level);
        }

        // Verify that the thread name is properly included
        let handler = thread::Builder::new()
            .name("named thread".into())
            .spawn(|| info!("thread"))
            .unwrap();

        handler.join().unwrap();
        let entry = receiver.recv().unwrap();
        assert_eq!(entry.thread_name.as_deref(), Some("named thread"));

        // Test Debug and Display inputs
        // `?` captures via Debug, `%` via Display; keys may be identifiers or string literals.
        // These only assert that each macro form compiles and logs without panicking.
        let debug_struct = DebugStruct {};
        let display_struct = DisplayStruct {};

        error!(identifier = ?debug_struct, "Debug test");
        error!(identifier = ?debug_struct, other = "value", "Debug2 test");
        error!(identifier = %display_struct, "Display test");
        error!(identifier = %display_struct, other = "value", "Display2 test");
        error!("Literal" = ?debug_struct, "Debug test");
        error!("Literal" = ?debug_struct, other = "value", "Debug test");
        error!("Literal" = %display_struct, "Display test");
        error!("Literal" = %display_struct, other = "value", "Display2 test");
        error!("Literal" = %display_struct, other = "value", identifier = ?debug_struct, "Mixed test");
    }
1022
1023    struct DebugStruct {}
1024
1025    impl std::fmt::Debug for DebugStruct {
1026        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1027            write!(f, "DebugStruct!")
1028        }
1029    }
1030
1031    struct DisplayStruct {}
1032
1033    impl std::fmt::Display for DisplayStruct {
1034        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1035            write!(f, "DisplayStruct!")
1036        }
1037    }
1038
1039    fn new_async_logger() -> (AptosDataBuilder, Arc<AptosData>) {
1040        let mut logger_builder = AptosDataBuilder::new();
1041        let (remote_log_tx, _) = futures::channel::mpsc::channel(10);
1042        let logger = logger_builder
1043            .remote_log_tx(remote_log_tx)
1044            .is_async(true)
1045            .build_logger();
1046        (logger_builder, logger)
1047    }
1048
1049    #[test]
1050    fn test_logger_filter_updater() {
1051        let (logger_builder, logger) = new_async_logger();
1052        let debug_metadata = &Metadata::new(Level::Debug, "target", "module_path", "source_path");
1053
1054        assert!(!logger
1055            .filter
1056            .read()
1057            .telemetry_filter
1058            .enabled(debug_metadata));
1059
1060        std::env::set_var(RUST_LOG_TELEMETRY, "debug");
1061
1062        let updater = LoggerFilterUpdater::new(logger.clone(), logger_builder);
1063        updater.update_filter();
1064
1065        assert!(logger
1066            .filter
1067            .read()
1068            .telemetry_filter
1069            .enabled(debug_metadata));
1070    }
1071
    // The filter updater must reject a malformed RUST_LOG_TELEMETRY value (leaving the
    // existing telemetry filter untouched) and then apply a subsequent valid value,
    // including per-module directives such as `hyper=off`.
    //
    // NOTE(review): this test and `test_logger_filter_updater` both mutate the
    // process-wide RUST_LOG_TELEMETRY env var; they can race when the test harness
    // runs them in parallel.
    #[test]
    fn test_logger_filter_updater_invalid_value() {
        let (logger_builder, logger) = new_async_logger();

        let debug_metadata = &Metadata::new(Level::Debug, "target", "module_path", "source_path");

        // The telemetry filter starts out rejecting Debug-level events.
        assert!(!logger
            .filter
            .read()
            .telemetry_filter
            .enabled(debug_metadata));

        std::env::set_var(RUST_LOG_TELEMETRY, "debug;hyper=off"); // invalid: directives must be comma-separated, not semicolon-separated

        let updater = LoggerFilterUpdater::new(logger.clone(), logger_builder);
        updater.update_filter();

        // The malformed value is rejected, so the filter is unchanged.
        assert!(!logger
            .filter
            .read()
            .telemetry_filter
            .enabled(debug_metadata));

        std::env::set_var(RUST_LOG_TELEMETRY, "debug,hyper=off"); // valid: comma-separated directives
        updater.update_filter();

        // Debug-level events are now enabled globally...
        assert!(logger
            .filter
            .read()
            .telemetry_filter
            .enabled(debug_metadata));

        // ...but the `hyper=off` directive suppresses even Error-level events from `hyper`.
        assert!(!logger
            .filter
            .read()
            .telemetry_filter
            .enabled(&Metadata::new(
                Level::Error,
                "target",
                "hyper",
                "source_path"
            )));
    }
1115}