1use crate::sample::SampleRate;
8use crate::telemetry_log_writer::{TelemetryLog, TelemetryLogWriter};
9use crate::{
10 counters::{
11 PROCESSED_STRUCT_LOG_COUNT, SENT_STRUCT_LOG_BYTES, SENT_STRUCT_LOG_COUNT,
12 STRUCT_LOG_PARSE_ERROR_COUNT, STRUCT_LOG_QUEUE_ERROR_COUNT, STRUCT_LOG_SEND_ERROR_COUNT,
13 },
14 logger::Logger,
15 sample,
16 struct_log::TcpWriter,
17 Event, Filter, Key, Level, LevelFilter, Metadata,
18};
19use aptos_infallible::RwLock;
20use backtrace::Backtrace;
21use chrono::{SecondsFormat, Utc};
22use futures::channel;
23use once_cell::sync::Lazy;
24use serde::ser::SerializeStruct;
25use serde::{Serialize, Serializer};
26use std::fmt::Debug;
27use std::io::Stdout;
28use std::time::Duration;
29use std::{
30 collections::BTreeMap,
31 env, fmt,
32 io::Write,
33 str::FromStr,
34 sync::{self, Arc},
35 thread,
36};
37use strum_macros::EnumString;
38use tokio::time;
39
40const RUST_LOG: &str = "RUST_LOG";
41const RUST_LOG_REMOTE: &str = "RUST_LOG_REMOTE";
42pub const RUST_LOG_TELEMETRY: &str = "RUST_LOG_TELEMETRY";
43const RUST_LOG_FORMAT: &str = "RUST_LOG_FORMAT";
44pub const CHANNEL_SIZE: usize = 10000;
46const NUM_SEND_RETRIES: u8 = 1;
47const FLUSH_TIMEOUT: Duration = Duration::from_secs(5);
48const FILTER_REFRESH_INTERVAL: Duration =
49 Duration::from_secs(5 * 60 );
50
51#[derive(EnumString)]
52#[strum(serialize_all = "lowercase")]
53enum LogFormat {
54 Json,
55 Text,
56}
57
58#[derive(Debug)]
60pub struct LogEntry {
61 metadata: Metadata,
62 thread_name: Option<String>,
63 backtrace: Option<String>,
66 hostname: Option<&'static str>,
67 namespace: Option<&'static str>,
68 timestamp: String,
69 data: BTreeMap<Key, serde_json::Value>,
70 message: Option<String>,
71}
72
73impl Serialize for LogEntry {
76 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
77 where
78 S: Serializer,
79 {
80 let mut state = serializer.serialize_struct("LogEntry", 9)?;
81 state.serialize_field("level", &self.metadata.level())?;
82 state.serialize_field("source", &self.metadata)?;
83 if let Some(thread_name) = &self.thread_name {
84 state.serialize_field("thread_name", thread_name)?;
85 }
86 if let Some(hostname) = &self.hostname {
87 state.serialize_field("hostname", hostname)?;
88 }
89 if let Some(namespace) = &self.namespace {
90 state.serialize_field("namespace", namespace)?;
91 }
92 state.serialize_field("timestamp", &self.timestamp)?;
93 if let Some(message) = &self.message {
94 state.serialize_field("message", message)?;
95 }
96 if !&self.data.is_empty() {
97 state.serialize_field("data", &self.data)?;
98 }
99 if let Some(backtrace) = &self.backtrace {
100 state.serialize_field("backtrace", backtrace)?;
101 }
102 state.end()
103 }
104}
105
106impl LogEntry {
107 fn new(event: &Event, thread_name: Option<&str>, enable_backtrace: bool) -> Self {
108 use crate::{Value, Visitor};
109
110 struct JsonVisitor<'a>(&'a mut BTreeMap<Key, serde_json::Value>);
111
112 impl<'a> Visitor for JsonVisitor<'a> {
113 fn visit_pair(&mut self, key: Key, value: Value<'_>) {
114 let v = match value {
115 Value::Debug(d) => serde_json::Value::String(format!("{:?}", d)),
116 Value::Display(d) => serde_json::Value::String(d.to_string()),
117 Value::Serde(s) => match serde_json::to_value(s) {
118 Ok(value) => value,
119 Err(e) => {
120 eprintln!("error serializing structured log: {} for key {:?}", e, key);
122 return;
123 }
124 },
125 };
126
127 self.0.insert(key, v);
128 }
129 }
130
131 let metadata = *event.metadata();
132 let thread_name = thread_name.map(ToOwned::to_owned);
133 let message = event.message().map(fmt::format);
134
135 static HOSTNAME: Lazy<Option<String>> = Lazy::new(|| {
136 hostname::get()
137 .ok()
138 .and_then(|name| name.into_string().ok())
139 });
140
141 static NAMESPACE: Lazy<Option<String>> =
142 Lazy::new(|| env::var("KUBERNETES_NAMESPACE").ok());
143
144 let hostname = HOSTNAME.as_deref();
145 let namespace = NAMESPACE.as_deref();
146
147 let backtrace = if enable_backtrace && matches!(metadata.level(), Level::Error) {
148 let mut backtrace = Backtrace::new();
149 let mut frames = backtrace.frames().to_vec();
150 if frames.len() > 3 {
151 frames.drain(0..3); }
153 backtrace = frames.into();
154 Some(format!("{:?}", backtrace))
155 } else {
156 None
157 };
158
159 let mut data = BTreeMap::new();
160 for schema in event.keys_and_values() {
161 schema.visit(&mut JsonVisitor(&mut data));
162 }
163
164 Self {
165 metadata,
166 thread_name,
167 backtrace,
168 hostname,
169 namespace,
170 timestamp: Utc::now().to_rfc3339_opts(SecondsFormat::Micros, true),
171 data,
172 message,
173 }
174 }
175
176 pub fn metadata(&self) -> &Metadata {
177 &self.metadata
178 }
179
180 pub fn thread_name(&self) -> Option<&str> {
181 self.thread_name.as_deref()
182 }
183
184 pub fn backtrace(&self) -> Option<&str> {
185 self.backtrace.as_deref()
186 }
187
188 pub fn hostname(&self) -> Option<&str> {
189 self.hostname
190 }
191
192 pub fn namespace(&self) -> Option<&str> {
193 self.namespace
194 }
195
196 pub fn timestamp(&self) -> &str {
197 self.timestamp.as_str()
198 }
199
200 pub fn data(&self) -> &BTreeMap<Key, serde_json::Value> {
201 &self.data
202 }
203
204 pub fn message(&self) -> Option<&str> {
205 self.message.as_deref()
206 }
207}
208
209pub struct AptosDataBuilder {
211 channel_size: usize,
212 console_port: Option<u16>,
213 enable_backtrace: bool,
214 level: Level,
215 remote_level: Level,
216 telemetry_level: Level,
217 address: Option<String>,
218 printer: Option<Box<dyn Writer>>,
219 remote_log_tx: Option<channel::mpsc::Sender<TelemetryLog>>,
220 is_async: bool,
221 enable_telemetry_flush: bool,
222 custom_format: Option<fn(&LogEntry) -> Result<String, fmt::Error>>,
223}
224
225impl AptosDataBuilder {
226 #[allow(clippy::new_without_default)]
227 pub fn new() -> Self {
228 Self {
229 channel_size: CHANNEL_SIZE,
230 console_port: Some(6669),
231 enable_backtrace: false,
232 level: Level::Info,
233 remote_level: Level::Info,
234 telemetry_level: Level::Warn,
235 address: None,
236 printer: Some(Box::new(StdoutWriter::new())),
237 remote_log_tx: None,
238 is_async: false,
239 enable_telemetry_flush: true,
240 custom_format: None,
241 }
242 }
243
244 pub fn address(&mut self, address: String) -> &mut Self {
245 self.address = Some(address);
246 self
247 }
248
249 pub fn enable_backtrace(&mut self) -> &mut Self {
250 self.enable_backtrace = true;
251 self
252 }
253
254 pub fn read_env(&mut self) -> &mut Self {
255 if let Ok(address) = env::var("STRUCT_LOG_TCP_ADDR") {
256 self.address(address);
257 }
258 self
259 }
260
261 pub fn level(&mut self, level: Level) -> &mut Self {
262 self.level = level;
263 self
264 }
265
266 pub fn remote_level(&mut self, level: Level) -> &mut Self {
267 self.remote_level = level;
268 self
269 }
270
271 pub fn telemetry_level(&mut self, level: Level) -> &mut Self {
272 self.telemetry_level = level;
273 self
274 }
275
276 pub fn channel_size(&mut self, channel_size: usize) -> &mut Self {
277 self.channel_size = channel_size;
278 self
279 }
280
281 pub fn printer(&mut self, printer: Box<dyn Writer + Send + Sync + 'static>) -> &mut Self {
282 self.printer = Some(printer);
283 self
284 }
285
286 pub fn console_port(&mut self, console_port: Option<u16>) -> &mut Self {
287 self.console_port = console_port;
288 self
289 }
290
291 pub fn remote_log_tx(
292 &mut self,
293 remote_log_tx: channel::mpsc::Sender<TelemetryLog>,
294 ) -> &mut Self {
295 self.remote_log_tx = Some(remote_log_tx);
296 self
297 }
298
299 pub fn is_async(&mut self, is_async: bool) -> &mut Self {
300 self.is_async = is_async;
301 self
302 }
303
304 pub fn enable_telemetry_flush(&mut self, enable_telemetry_flush: bool) -> &mut Self {
305 self.enable_telemetry_flush = enable_telemetry_flush;
306 self
307 }
308
309 pub fn custom_format(
310 &mut self,
311 format: fn(&LogEntry) -> Result<String, fmt::Error>,
312 ) -> &mut Self {
313 self.custom_format = Some(format);
314 self
315 }
316
317 pub fn init(&mut self) {
318 self.build();
319 }
320
321 fn build_filter(&self) -> FilterTuple {
322 let local_filter = {
323 let mut filter_builder = Filter::builder();
324
325 if env::var(RUST_LOG).is_ok() {
326 filter_builder.with_env(RUST_LOG);
327 } else {
328 filter_builder.filter_level(self.level.into());
329 }
330
331 filter_builder.build()
332 };
333 let remote_filter = {
334 let mut filter_builder = Filter::builder();
335
336 if self.is_async && self.address.is_some() {
337 if env::var(RUST_LOG_REMOTE).is_ok() {
338 filter_builder.with_env(RUST_LOG_REMOTE);
339 } else if env::var(RUST_LOG).is_ok() {
340 filter_builder.with_env(RUST_LOG);
341 } else {
342 filter_builder.filter_level(self.remote_level.into());
343 }
344 } else {
345 filter_builder.filter_level(LevelFilter::Off);
346 }
347
348 filter_builder.build()
349 };
350 let telemetry_filter = {
351 let mut filter_builder = Filter::builder();
352
353 if self.is_async && self.remote_log_tx.is_some() {
354 if env::var(RUST_LOG_TELEMETRY).is_ok() {
355 filter_builder.with_env(RUST_LOG_TELEMETRY);
356 } else {
357 filter_builder.filter_level(self.telemetry_level.into());
358 }
359 } else {
360 filter_builder.filter_level(LevelFilter::Off);
361 }
362
363 filter_builder.build()
364 };
365
366 FilterTuple {
367 local_filter,
368 remote_filter,
369 telemetry_filter,
370 }
371 }
372
373 fn build_logger(&mut self) -> Arc<AptosData> {
374 let filter = self.build_filter();
375
376 if let Ok(log_format) = env::var(RUST_LOG_FORMAT) {
377 let log_format = LogFormat::from_str(&log_format).unwrap();
378 self.custom_format = match log_format {
379 LogFormat::Json => Some(json_format),
380 LogFormat::Text => Some(text_format),
381 }
382 }
383
384 if self.is_async {
385 let (sender, receiver) = sync::mpsc::sync_channel(self.channel_size);
386 let mut remote_tx = None;
387 if let Some(tx) = &self.remote_log_tx {
388 remote_tx = Some(tx.clone());
389 }
390
391 let logger = Arc::new(AptosData {
392 enable_backtrace: self.enable_backtrace,
393 sender: Some(sender),
394 printer: None,
395 filter: RwLock::new(filter),
396 enable_telemetry_flush: self.enable_telemetry_flush,
397 formatter: self.custom_format.take().unwrap_or(text_format),
398 });
399 let service = LoggerService {
400 receiver,
401 address: self.address.clone(),
402 printer: self.printer.take(),
403 facade: logger.clone(),
404 remote_tx,
405 };
406
407 thread::spawn(move || service.run());
408 logger
409 } else {
410 Arc::new(AptosData {
411 enable_backtrace: self.enable_backtrace,
412 sender: None,
413 printer: self.printer.take(),
414 filter: RwLock::new(filter),
415 enable_telemetry_flush: self.enable_telemetry_flush,
416 formatter: self.custom_format.take().unwrap_or(text_format),
417 })
418 }
419 }
420
421 pub fn build(&mut self) -> Arc<AptosData> {
422 let logger = self.build_logger();
423
424 let console_port = if cfg!(feature = "aptos-console") {
425 self.console_port
426 } else {
427 None
428 };
429
430 crate::logger::set_global_logger(logger.clone(), console_port);
431 logger
432 }
433}
434
435pub struct FilterTuple {
437 local_filter: Filter,
439 remote_filter: Filter,
441 telemetry_filter: Filter,
443}
444
445impl FilterTuple {
446 fn enabled(&self, metadata: &Metadata) -> bool {
447 self.local_filter.enabled(metadata)
448 || self.remote_filter.enabled(metadata)
449 || self.telemetry_filter.enabled(metadata)
450 }
451}
452
453pub struct AptosData {
454 enable_backtrace: bool,
455 sender: Option<sync::mpsc::SyncSender<LoggerServiceEvent>>,
456 printer: Option<Box<dyn Writer>>,
457 filter: RwLock<FilterTuple>,
458 enable_telemetry_flush: bool,
459 pub(crate) formatter: fn(&LogEntry) -> Result<String, fmt::Error>,
460}
461
462impl AptosData {
463 pub fn builder() -> AptosDataBuilder {
464 AptosDataBuilder::new()
465 }
466
467 #[allow(clippy::new_ret_no_self)]
468 pub fn new() -> AptosDataBuilder {
469 Self::builder()
470 }
471
472 pub fn init_for_testing() {
473 if env::var(RUST_LOG).is_err() {
474 return;
475 }
476
477 Self::builder()
478 .is_async(false)
479 .enable_backtrace()
480 .printer(Box::new(StdoutWriter::new()))
481 .build();
482 }
483
484 pub fn set_filter(&self, filter_tuple: FilterTuple) {
485 *self.filter.write() = filter_tuple;
486 }
487
488 pub fn set_local_filter(&self, filter: Filter) {
489 self.filter.write().local_filter = filter;
490 }
491
492 pub fn set_remote_filter(&self, filter: Filter) {
493 self.filter.write().remote_filter = filter;
494 }
495
496 pub fn set_telemetry_filter(&self, filter: Filter) {
497 self.filter.write().telemetry_filter = filter;
498 }
499
500 fn send_entry(&self, entry: LogEntry) {
501 if let Some(printer) = &self.printer {
502 let s = (self.formatter)(&entry).expect("Unable to format");
503 printer.write(s);
504 }
505
506 if let Some(sender) = &self.sender {
507 if sender
508 .try_send(LoggerServiceEvent::LogEntry(entry))
509 .is_err()
510 {
511 STRUCT_LOG_QUEUE_ERROR_COUNT.inc();
512 }
513 }
514 }
515}
516
517impl Logger for AptosData {
518 fn enabled(&self, metadata: &Metadata) -> bool {
519 self.filter.read().enabled(metadata)
520 }
521
522 fn record(&self, event: &Event) {
523 let entry = LogEntry::new(
524 event,
525 ::std::thread::current().name(),
526 self.enable_backtrace,
527 );
528
529 self.send_entry(entry)
530 }
531
532 fn flush(&self) {
533 if let Some(sender) = &self.sender {
534 let (oneshot_sender, oneshot_receiver) = sync::mpsc::sync_channel(1);
535 match sender.try_send(LoggerServiceEvent::Flush(oneshot_sender)) {
536 Ok(_) => {
537 if let Err(err) = oneshot_receiver.recv_timeout(FLUSH_TIMEOUT) {
538 eprintln!("[Logging] Unable to flush recv: {}", err);
539 }
540 }
541 Err(err) => {
542 eprintln!("[Logging] Unable to flush send: {}", err);
543 std::thread::sleep(FLUSH_TIMEOUT);
544 }
545 }
546 }
547 }
548}
549
550enum LoggerServiceEvent {
551 LogEntry(LogEntry),
552 Flush(sync::mpsc::SyncSender<()>),
553}
554
555struct LoggerService {
558 receiver: sync::mpsc::Receiver<LoggerServiceEvent>,
559 address: Option<String>,
560 printer: Option<Box<dyn Writer>>,
561 facade: Arc<AptosData>,
562 remote_tx: Option<channel::mpsc::Sender<TelemetryLog>>,
563}
564
565impl LoggerService {
566 pub fn run(mut self) {
567 let mut tcp_writer = self.address.take().map(TcpWriter::new);
568 let mut telemetry_writer = self.remote_tx.take().map(TelemetryLogWriter::new);
569
570 for event in &self.receiver {
571 match event {
572 LoggerServiceEvent::LogEntry(entry) => {
573 PROCESSED_STRUCT_LOG_COUNT.inc();
574
575 if let Some(printer) = &mut self.printer {
576 if self
577 .facade
578 .filter
579 .read()
580 .local_filter
581 .enabled(&entry.metadata)
582 {
583 let s = (self.facade.formatter)(&entry).expect("Unable to format");
584 printer.write_buferred(s);
585 }
586 }
587
588 if let Some(writer) = &mut tcp_writer {
589 if self
590 .facade
591 .filter
592 .read()
593 .remote_filter
594 .enabled(&entry.metadata)
595 {
596 Self::write_to_logstash(writer, &entry);
597 }
598 }
599
600 if let Some(writer) = &mut telemetry_writer {
601 if self
602 .facade
603 .filter
604 .read()
605 .telemetry_filter
606 .enabled(&entry.metadata)
607 {
608 let s = (self.facade.formatter)(&entry).expect("Unable to format");
609 let _ = writer.write(s);
610 }
611 }
612 }
613 LoggerServiceEvent::Flush(sender) => {
614 if let Some(writer) = &mut telemetry_writer {
616 if self.facade.enable_telemetry_flush {
617 match writer.flush() {
618 Ok(rx) => {
619 if let Err(err) = rx.recv_timeout(FLUSH_TIMEOUT) {
620 sample!(
621 SampleRate::Duration(Duration::from_secs(60)),
622 eprintln!("Timed out flushing telemetry: {}", err)
623 );
624 }
625 }
626 Err(err) => {
627 sample!(
628 SampleRate::Duration(Duration::from_secs(60)),
629 eprintln!("Failed to flush telemetry: {}", err)
630 );
631 }
632 }
633 }
634 }
635 let _ = sender.send(());
636 }
637 }
638 }
639 }
640
641 fn write_to_logstash(stream: &mut TcpWriter, entry: &LogEntry) {
643 let message = if let Ok(json) = json_format(entry) {
644 json
645 } else {
646 return;
647 };
648 let message = message + "\n";
649 let bytes = message.as_bytes();
650 let message_length = bytes.len();
651
652 let mut result = stream.write_all(bytes);
655 for _ in 0..NUM_SEND_RETRIES {
656 if result.is_ok() {
657 break;
658 } else {
659 result = stream.write_all(bytes);
660 }
661 }
662
663 if let Err(e) = result {
664 STRUCT_LOG_SEND_ERROR_COUNT.inc();
665 eprintln!(
666 "[Logging] Error while sending data to logstash({}): {}",
667 stream.endpoint(),
668 e
669 );
670 } else {
671 SENT_STRUCT_LOG_COUNT.inc();
672 SENT_STRUCT_LOG_BYTES.inc_by(message_length as u64);
673 }
674 }
675}
676
677pub trait Writer: Send + Sync {
679 fn write(&self, log: String);
681
682 fn write_buferred(&mut self, log: String);
684}
685
686struct StdoutWriter {
688 buffer: std::io::BufWriter<Stdout>,
689}
690
691impl StdoutWriter {
692 pub fn new() -> Self {
693 let buffer = std::io::BufWriter::new(std::io::stdout());
694 Self { buffer }
695 }
696}
697impl Writer for StdoutWriter {
698 fn write(&self, log: String) {
700 println!("{}", log);
701 }
702 fn write_buferred(&mut self, log: String) {
703 self.buffer
704 .write_fmt(format_args!("{}\n", log))
705 .unwrap_or_default();
706 }
707}
708
709pub struct FileWriter {
711 log_file: RwLock<std::fs::File>,
712}
713
714impl FileWriter {
715 pub fn new(log_file: std::path::PathBuf) -> Self {
716 let file = std::fs::OpenOptions::new()
717 .append(true)
718 .create(true)
719 .open(log_file)
720 .expect("Unable to open log file");
721 Self {
722 log_file: RwLock::new(file),
723 }
724 }
725}
726
727impl Writer for FileWriter {
728 fn write(&self, log: String) {
730 if let Err(err) = writeln!(self.log_file.write(), "{}", log) {
731 eprintln!("Unable to write to log file: {}", err);
732 }
733 }
734 fn write_buferred(&mut self, log: String) {
735 self.write(log);
736 }
737}
738
739fn text_format(entry: &LogEntry) -> Result<String, fmt::Error> {
744 use std::fmt::Write;
745
746 let mut w = String::new();
747 write!(w, "{}", entry.timestamp)?;
748
749 if let Some(thread_name) = &entry.thread_name {
750 write!(w, " [{}]", thread_name)?;
751 }
752
753 write!(
754 w,
755 " {} {}",
756 entry.metadata.level(),
757 entry.metadata.source_path()
758 )?;
759
760 if let Some(message) = &entry.message {
761 write!(w, " {}", message)?;
762 }
763
764 if !entry.data.is_empty() {
765 write!(w, " {}", serde_json::to_string(&entry.data).unwrap())?;
766 }
767
768 Ok(w)
769}
770
771fn json_format(entry: &LogEntry) -> Result<String, fmt::Error> {
773 match serde_json::to_string(&entry) {
774 Ok(s) => Ok(s),
775 Err(_) => {
776 STRUCT_LOG_PARSE_ERROR_COUNT.inc();
778 Err(fmt::Error)
779 }
780 }
781}
782
783pub struct LoggerFilterUpdater {
787 logger: Arc<AptosData>,
788 logger_builder: AptosDataBuilder,
789}
790
791impl LoggerFilterUpdater {
792 pub fn new(logger: Arc<AptosData>, logger_builder: AptosDataBuilder) -> Self {
793 Self {
794 logger,
795 logger_builder,
796 }
797 }
798
799 pub async fn run(self) {
800 let mut interval = time::interval(FILTER_REFRESH_INTERVAL);
801 loop {
802 interval.tick().await;
803
804 self.update_filter();
805 }
806 }
807
808 fn update_filter(&self) {
809 let filter = self.logger_builder.build_filter();
811 self.logger.set_filter(filter);
812 }
813}
814
815#[cfg(test)]
816mod tests {
817 use super::{AptosData, LogEntry};
818 use crate::{
819 aptos_logger::{json_format, RUST_LOG_TELEMETRY},
820 debug, error, info,
821 logger::Logger,
822 trace, warn, AptosDataBuilder, Event, Key, KeyValue, Level, LoggerFilterUpdater, Metadata,
823 Schema, Value, Visitor,
824 };
825 use chrono::{DateTime, Utc};
826 #[cfg(test)]
827 use pretty_assertions::assert_eq;
828 use serde_json::Value as JsonValue;
829 use std::{
830 sync::{
831 mpsc::{self, Receiver, SyncSender},
832 Arc,
833 },
834 thread,
835 };
836
837 #[derive(serde::Serialize)]
838 #[serde(rename_all = "snake_case")]
839 enum Enum {
840 FooBar,
841 }
842
843 struct TestSchema<'a> {
844 foo: usize,
845 bar: &'a Enum,
846 }
847
848 impl Schema for TestSchema<'_> {
849 fn visit(&self, visitor: &mut dyn Visitor) {
850 visitor.visit_pair(Key::new("foo"), Value::from_serde(&self.foo));
851 visitor.visit_pair(Key::new("bar"), Value::from_serde(&self.bar));
852 }
853 }
854
855 struct LogStream {
856 sender: SyncSender<LogEntry>,
857 enable_backtrace: bool,
858 }
859
860 impl LogStream {
861 fn new(enable_backtrace: bool) -> (Self, Receiver<LogEntry>) {
862 let (sender, receiver) = mpsc::sync_channel(1024);
863 let log_stream = Self {
864 sender,
865 enable_backtrace,
866 };
867 (log_stream, receiver)
868 }
869 }
870
871 impl Logger for LogStream {
872 fn enabled(&self, metadata: &Metadata) -> bool {
873 metadata.level() <= Level::Debug
874 }
875
876 fn record(&self, event: &Event) {
877 let entry = LogEntry::new(
878 event,
879 ::std::thread::current().name(),
880 self.enable_backtrace,
881 );
882 self.sender.send(entry).unwrap();
883 }
884
885 fn flush(&self) {}
886 }
887
888 fn set_test_logger() -> Receiver<LogEntry> {
889 let (logger, receiver) = LogStream::new(true);
890 let logger = Arc::new(logger);
891 crate::logger::set_global_logger(logger, None);
892 receiver
893 }
894
895 #[test]
897 fn basic() {
898 let receiver = set_test_logger();
899 let number = 12345;
900
901 let before = Utc::now();
903 let mut line_num = line!();
904 info!(
905 TestSchema {
906 foo: 5,
907 bar: &Enum::FooBar
908 },
909 test = true,
910 category = "name",
911 KeyValue::new("display", Value::from_display(&number)),
912 "This is a log"
913 );
914
915 let after = Utc::now();
916
917 let mut entry = receiver.recv().unwrap();
918
919 assert_eq!(entry.metadata.level(), Level::Info);
921 assert_eq!(
922 entry.metadata.target(),
923 module_path!().split("::").next().unwrap()
924 );
925 assert_eq!(entry.message.as_deref(), Some("This is a log"));
926 assert!(entry.backtrace.is_none());
927
928 let original_timestamp = entry.timestamp;
931 entry.timestamp = String::from("2022-07-24T23:42:29.540278Z");
932 entry.hostname = Some("test-host");
933 line_num += 1;
934 let thread_name = thread::current().name().map(|s| s.to_string()).unwrap();
935
936 let expected = format!("{{\"level\":\"INFO\",\"source\":{{\"package\":\"aptos_logger\",\"file\":\"crates/aptos-logger/src/aptos_logger.rs:{line_num}\"}},\"thread_name\":\"{thread_name}\",\"hostname\":\"test-host\",\"timestamp\":\"2022-07-24T23:42:29.540278Z\",\"message\":\"This is a log\",\"data\":{{\"bar\":\"foo_bar\",\"category\":\"name\",\"display\":\"12345\",\"foo\":5,\"test\":true}}}}");
937
938 assert_eq!(json_format(&entry).unwrap(), expected);
939
940 entry.timestamp = original_timestamp;
941
942 let timestamp = DateTime::parse_from_rfc3339(&entry.timestamp).unwrap();
944 let timestamp: DateTime<Utc> = DateTime::from(timestamp);
945 assert!(before <= timestamp && timestamp <= after);
946
947 assert_eq!(
949 entry.data.get(&Key::new("foo")).and_then(JsonValue::as_u64),
950 Some(5)
951 );
952 assert_eq!(
953 entry.data.get(&Key::new("bar")).and_then(JsonValue::as_str),
954 Some("foo_bar")
955 );
956 assert_eq!(
957 entry
958 .data
959 .get(&Key::new("display"))
960 .and_then(JsonValue::as_str),
961 Some(format!("{}", number)).as_deref(),
962 );
963 assert_eq!(
964 entry
965 .data
966 .get(&Key::new("test"))
967 .and_then(JsonValue::as_bool),
968 Some(true),
969 );
970 assert_eq!(
971 entry
972 .data
973 .get(&Key::new("category"))
974 .and_then(JsonValue::as_str),
975 Some("name"),
976 );
977
978 error!("This is an error log");
980 let entry = receiver.recv().unwrap();
981 assert!(entry.backtrace.is_some());
982
983 trace!("trace");
986 debug!("debug");
987 info!("info");
988 warn!("warn");
989 error!("error");
990
991 let levels = &[Level::Debug, Level::Info, Level::Warn, Level::Error];
992
993 for level in levels {
994 let entry = receiver.recv().unwrap();
995 assert_eq!(entry.metadata.level(), *level);
996 }
997
998 let handler = thread::Builder::new()
1000 .name("named thread".into())
1001 .spawn(|| info!("thread"))
1002 .unwrap();
1003
1004 handler.join().unwrap();
1005 let entry = receiver.recv().unwrap();
1006 assert_eq!(entry.thread_name.as_deref(), Some("named thread"));
1007
1008 let debug_struct = DebugStruct {};
1010 let display_struct = DisplayStruct {};
1011
1012 error!(identifier = ?debug_struct, "Debug test");
1013 error!(identifier = ?debug_struct, other = "value", "Debug2 test");
1014 error!(identifier = %display_struct, "Display test");
1015 error!(identifier = %display_struct, other = "value", "Display2 test");
1016 error!("Literal" = ?debug_struct, "Debug test");
1017 error!("Literal" = ?debug_struct, other = "value", "Debug test");
1018 error!("Literal" = %display_struct, "Display test");
1019 error!("Literal" = %display_struct, other = "value", "Display2 test");
1020 error!("Literal" = %display_struct, other = "value", identifier = ?debug_struct, "Mixed test");
1021 }
1022
1023 struct DebugStruct {}
1024
1025 impl std::fmt::Debug for DebugStruct {
1026 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1027 write!(f, "DebugStruct!")
1028 }
1029 }
1030
1031 struct DisplayStruct {}
1032
1033 impl std::fmt::Display for DisplayStruct {
1034 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1035 write!(f, "DisplayStruct!")
1036 }
1037 }
1038
1039 fn new_async_logger() -> (AptosDataBuilder, Arc<AptosData>) {
1040 let mut logger_builder = AptosDataBuilder::new();
1041 let (remote_log_tx, _) = futures::channel::mpsc::channel(10);
1042 let logger = logger_builder
1043 .remote_log_tx(remote_log_tx)
1044 .is_async(true)
1045 .build_logger();
1046 (logger_builder, logger)
1047 }
1048
1049 #[test]
1050 fn test_logger_filter_updater() {
1051 let (logger_builder, logger) = new_async_logger();
1052 let debug_metadata = &Metadata::new(Level::Debug, "target", "module_path", "source_path");
1053
1054 assert!(!logger
1055 .filter
1056 .read()
1057 .telemetry_filter
1058 .enabled(debug_metadata));
1059
1060 std::env::set_var(RUST_LOG_TELEMETRY, "debug");
1061
1062 let updater = LoggerFilterUpdater::new(logger.clone(), logger_builder);
1063 updater.update_filter();
1064
1065 assert!(logger
1066 .filter
1067 .read()
1068 .telemetry_filter
1069 .enabled(debug_metadata));
1070 }
1071
1072 #[test]
1073 fn test_logger_filter_updater_invalid_value() {
1074 let (logger_builder, logger) = new_async_logger();
1075
1076 let debug_metadata = &Metadata::new(Level::Debug, "target", "module_path", "source_path");
1077
1078 assert!(!logger
1079 .filter
1080 .read()
1081 .telemetry_filter
1082 .enabled(debug_metadata));
1083
1084 std::env::set_var(RUST_LOG_TELEMETRY, "debug;hyper=off"); let updater = LoggerFilterUpdater::new(logger.clone(), logger_builder);
1087 updater.update_filter();
1088
1089 assert!(!logger
1090 .filter
1091 .read()
1092 .telemetry_filter
1093 .enabled(debug_metadata));
1094
1095 std::env::set_var(RUST_LOG_TELEMETRY, "debug,hyper=off"); updater.update_filter();
1097
1098 assert!(logger
1099 .filter
1100 .read()
1101 .telemetry_filter
1102 .enabled(debug_metadata));
1103
1104 assert!(!logger
1105 .filter
1106 .read()
1107 .telemetry_filter
1108 .enabled(&Metadata::new(
1109 Level::Error,
1110 "target",
1111 "hyper",
1112 "source_path"
1113 )));
1114 }
1115}