1use std::{
17 cell::RefCell,
18 fmt::{Display, Write as _},
19 sync::{
20 Mutex, OnceLock,
21 atomic::{AtomicBool, Ordering},
22 mpsc::SendError,
23 },
24};
25
26use ahash::AHashMap;
27use indexmap::IndexMap;
28use log::{
29 Level, LevelFilter, Log, STATIC_MAX_LEVEL,
30 kv::{ToValue, Value},
31 set_boxed_logger, set_max_level,
32};
33use nautilus_core::{
34 UUID4, UnixNanos,
35 datetime::unix_nanos_to_iso8601,
36 time::{get_atomic_clock_realtime, get_atomic_clock_static},
37};
38use nautilus_model::identifiers::TraderId;
39use serde::{Deserialize, Serialize, Serializer, ser::SerializeMap};
40use smallvec::SmallVec;
41use ustr::Ustr;
42
43pub use super::config::LoggerConfig;
44use super::{LOGGING_BYPASSED, LOGGING_GUARDS_ACTIVE, LOGGING_INITIALIZED, LOGGING_REALTIME};
45#[cfg(not(all(feature = "simulation", madsim)))]
46use crate::logging::writer::{FileWriter, LogWriter, StderrWriter, StdoutWriter};
47use crate::{
48 enums::{LogColor, LogLevel},
49 logging::writer::FileWriterConfig,
50};
51
52#[cfg(not(all(feature = "simulation", madsim)))]
53const LOGGING: &str = "logging";
54const KV_COLOR: &str = "color";
55const KV_COMPONENT: &str = "component";
56const LOG_FIELDS_INLINE_CAP: usize = 0;
57const MAX_LEVEL_DISPLAY_LEN: usize = "ERROR".len();
58const ANSI_BOLD_LEN: usize = "\x1b[1m".len();
59const ANSI_RESET_LEN: usize = "\x1b[0m".len();
60const PLAIN_FORMAT_OVERHEAD: usize = " [".len() + "] ".len() + ".".len() + ": ".len() + "\n".len();
61const COLORED_FORMAT_OVERHEAD: usize = ANSI_BOLD_LEN
62 + ANSI_RESET_LEN
63 + " ".len()
64 + "[".len()
65 + "] ".len()
66 + ".".len()
67 + ": ".len()
68 + ANSI_RESET_LEN
69 + "\n".len();
70const REPEATED_USTR_CACHE_CAP: usize = 8;
71
72thread_local! {
73 static REPEATED_USTR_CACHE: RefCell<RepeatedUstrCache> =
74 const { RefCell::new(RepeatedUstrCache::new()) };
75}
76
77#[derive(Clone, Copy)]
78struct RepeatedUstrCacheEntry {
79 ptr: usize,
80 len: usize,
81 value: Ustr,
82}
83
84#[derive(Clone, Copy)]
85struct RepeatedUstrCache {
86 entries: [Option<RepeatedUstrCacheEntry>; REPEATED_USTR_CACHE_CAP],
87 next: usize,
88}
89
90impl RepeatedUstrCache {
91 const fn new() -> Self {
92 Self {
93 entries: [None; REPEATED_USTR_CACHE_CAP],
94 next: 0,
95 }
96 }
97}
98
99pub type LogFields = SmallVec<[(Ustr, String); LOG_FIELDS_INLINE_CAP]>;
102
103static LOGGER_TX: OnceLock<std::sync::mpsc::Sender<LogEvent>> = OnceLock::new();
105
106static LOGGER_HANDLE: Mutex<Option<std::thread::JoinHandle<()>>> = Mutex::new(None);
108
109static SHUTDOWN_ON_ERROR: OnceLock<ShutdownOnError> = OnceLock::new();
110
111#[derive(Debug, Clone, PartialEq, Eq)]
113pub struct ShutdownOnErrorTrigger {
114 pub timestamp: UnixNanos,
116 pub component: Ustr,
118 pub message: String,
120}
121
122#[derive(Debug, Default)]
123struct ShutdownOnError {
124 armed: AtomicBool,
125 triggered: AtomicBool,
126 pending: Mutex<Option<ShutdownOnErrorTrigger>>,
127}
128
129impl ShutdownOnError {
130 fn is_armed(&self) -> bool {
131 self.armed.load(Ordering::Acquire)
132 }
133
134 fn arm(&self, enabled: bool) {
135 if let Ok(mut pending) = self.pending.lock() {
136 pending.take();
137 }
138 self.triggered.store(false, Ordering::Release);
139 self.armed.store(enabled, Ordering::Release);
140 }
141
142 fn disarm(&self) {
143 self.armed.store(false, Ordering::Release);
144 self.triggered.store(false, Ordering::Release);
145
146 if let Ok(mut pending) = self.pending.lock() {
147 pending.take();
148 }
149 }
150
151 fn maybe_record_trigger<F>(
152 &self,
153 level: Level,
154 timestamp: UnixNanos,
155 component: Ustr,
156 message: F,
157 ) where
158 F: FnOnce() -> String,
159 {
160 if !self.armed.load(Ordering::Acquire) || level != Level::Error {
161 return;
162 }
163
164 let Ok(mut pending) = self.pending.lock() else {
165 return;
166 };
167
168 if self
169 .triggered
170 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
171 .is_err()
172 {
173 return;
174 }
175
176 *pending = Some(ShutdownOnErrorTrigger {
177 timestamp,
178 component,
179 message: message(),
180 });
181 }
182
183 fn take_trigger(&self) -> Option<ShutdownOnErrorTrigger> {
184 if !self.triggered.load(Ordering::Acquire) {
185 return None;
186 }
187
188 self.pending
189 .lock()
190 .ok()
191 .and_then(|mut pending| pending.take())
192 }
193
194 fn try_drain_trigger<F>(&self, drain: F) -> bool
195 where
196 F: FnOnce(&ShutdownOnErrorTrigger) -> bool,
197 {
198 if !self.triggered.load(Ordering::Acquire) {
199 return false;
200 }
201
202 let Ok(mut pending) = self.pending.lock() else {
203 return false;
204 };
205
206 let Some(trigger) = pending.as_ref() else {
207 return false;
208 };
209
210 if !drain(trigger) {
211 return false;
212 }
213
214 pending.take();
215 true
216 }
217}
218
219pub fn arm_shutdown_on_error(enabled: bool) {
221 shutdown_on_error().arm(enabled);
222}
223
224pub fn disarm_shutdown_on_error() {
226 shutdown_on_error().disarm();
227}
228
229pub fn take_shutdown_on_error_trigger() -> Option<ShutdownOnErrorTrigger> {
231 shutdown_on_error().take_trigger()
232}
233
234pub fn try_drain_shutdown_on_error_trigger<F>(drain: F) -> bool
236where
237 F: FnOnce(&ShutdownOnErrorTrigger) -> bool,
238{
239 shutdown_on_error().try_drain_trigger(drain)
240}
241
242fn shutdown_on_error() -> &'static ShutdownOnError {
243 SHUTDOWN_ON_ERROR.get_or_init(ShutdownOnError::default)
244}
245
246#[derive(Debug, Clone)]
248struct FilterPolicy {
249 modules_by_longest_prefix: Vec<(Ustr, LevelFilter)>,
251 components: AHashMap<Ustr, LevelFilter>,
253 components_only: bool,
255}
256
257impl FilterPolicy {
258 fn from_config(config: &LoggerConfig) -> Option<Self> {
259 let modules_by_longest_prefix = sorted_module_filters_from_map(&config.module_level);
260 if !config.log_components_only
261 && modules_by_longest_prefix.is_empty()
262 && config.component_level.is_empty()
263 {
264 return None;
265 }
266
267 Some(Self {
268 modules_by_longest_prefix,
269 components: config.component_level.clone(),
270 components_only: config.log_components_only,
271 })
272 }
273
274 fn should_skip(&self, component: &Ustr, level: Level) -> bool {
275 should_filter_log_inner(
276 component,
277 level,
278 &self.modules_by_longest_prefix,
279 &self.components,
280 self.components_only,
281 )
282 }
283}
284
285#[derive(Debug)]
291pub struct Logger {
292 pub config: LoggerConfig,
297 filter_policy: Option<FilterPolicy>,
299 tx: std::sync::mpsc::Sender<LogEvent>,
301}
302
303#[derive(Debug)]
305pub enum LogEvent {
306 Log(LogLine),
308 Flush,
310 Sync(std::sync::mpsc::Sender<anyhow::Result<()>>),
312 Close,
314}
315
316#[derive(Clone, Debug, Serialize, Deserialize)]
318pub struct LogLine {
319 pub timestamp: UnixNanos,
321 pub level: Level,
323 pub color: LogColor,
325 pub component: Ustr,
327 pub message: String,
329 #[serde(default, skip_serializing_if = "SmallVec::is_empty")]
331 pub fields: LogFields,
332}
333
334impl Display for LogLine {
335 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
336 write!(f, "[{}] {}: {}", self.level, self.component, self.message)?;
337 for (k, v) in &self.fields {
338 write!(f, " {k}={v}")?;
339 }
340 Ok(())
341 }
342}
343
344#[derive(Clone, Debug)]
351pub struct LogLineWrapper {
352 line: LogLine,
354 cache: Option<String>,
356 colored: Option<String>,
358 trader_id: Ustr,
360}
361
362impl LogLineWrapper {
363 #[must_use]
365 pub const fn new(line: LogLine, trader_id: Ustr) -> Self {
366 Self {
367 line,
368 cache: None,
369 colored: None,
370 trader_id,
371 }
372 }
373
374 pub fn get_string(&mut self) -> &str {
379 self.cache.get_or_insert_with(|| {
380 let timestamp = unix_nanos_to_iso8601(self.line.timestamp);
381 let mut s = String::with_capacity(plain_log_line_capacity(
382 ×tamp,
383 self.trader_id,
384 &self.line,
385 ));
386
387 write!(
388 s,
389 "{} [{}] {}.{}: {}",
390 timestamp, self.line.level, self.trader_id, self.line.component, self.line.message,
391 )
392 .expect("writing to String should not fail");
393
394 for (k, v) in &self.line.fields {
395 s.push(' ');
396 s.push_str(k);
397 s.push('=');
398 s.push_str(v);
399 }
400 s.push('\n');
401 s
402 })
403 }
404
405 pub fn get_colored(&mut self) -> &str {
411 self.colored.get_or_insert_with(|| {
412 let timestamp = unix_nanos_to_iso8601(self.line.timestamp);
413 let color_ansi = self.line.color.as_ansi();
414 let mut s = String::with_capacity(colored_log_line_capacity(
415 ×tamp,
416 color_ansi,
417 self.trader_id,
418 &self.line,
419 ));
420
421 write!(
422 s,
423 "\x1b[1m{}\x1b[0m {}[{}] {}.{}: {}",
424 timestamp,
425 color_ansi,
426 self.line.level,
427 self.trader_id,
428 self.line.component,
429 self.line.message,
430 )
431 .expect("writing to String should not fail");
432
433 for (k, v) in &self.line.fields {
434 s.push(' ');
435 s.push_str(k);
436 s.push('=');
437 s.push_str(v);
438 }
439 s.push_str("\x1b[0m\n");
440 s
441 })
442 }
443
444 #[must_use]
453 pub fn get_json(&self) -> String {
454 let mut json_string =
455 serde_json::to_string(&self).expect("Error serializing log event to string");
456 json_string.push('\n');
457 json_string
458 }
459}
460
461fn formatted_fields_len(fields: &LogFields) -> usize {
462 fields.iter().map(|(k, v)| 2 + k.len() + v.len()).sum()
463}
464
465fn log_line_capacity(
466 timestamp: &str,
467 trader_id: Ustr,
468 line: &LogLine,
469 overhead: usize,
470 ansi_extra_len: usize,
471) -> usize {
472 timestamp.len()
473 + overhead
474 + ansi_extra_len
475 + MAX_LEVEL_DISPLAY_LEN
476 + trader_id.len()
477 + line.component.len()
478 + line.message.len()
479 + formatted_fields_len(&line.fields)
480}
481
482fn plain_log_line_capacity(timestamp: &str, trader_id: Ustr, line: &LogLine) -> usize {
483 log_line_capacity(timestamp, trader_id, line, PLAIN_FORMAT_OVERHEAD, 0)
484}
485
486fn colored_log_line_capacity(
487 timestamp: &str,
488 color_ansi: &str,
489 trader_id: Ustr,
490 line: &LogLine,
491) -> usize {
492 log_line_capacity(
493 timestamp,
494 trader_id,
495 line,
496 COLORED_FORMAT_OVERHEAD,
497 color_ansi.len(),
498 )
499}
500
501impl Serialize for LogLineWrapper {
502 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
503 where
504 S: Serializer,
505 {
506 if has_duplicate_json_field(&self.line.fields) {
507 return serialize_log_line_with_indexmap(self, serializer);
508 }
509
510 let timestamp = unix_nanos_to_iso8601(self.line.timestamp);
511 let mut map = serializer.serialize_map(None)?;
512
513 map.serialize_entry("timestamp", ×tamp)?;
514 map.serialize_entry("trader_id", self.trader_id.as_str())?;
515 map.serialize_entry("level", &DisplayAsString(&self.line.level))?;
516 map.serialize_entry("color", &DisplayAsString(&self.line.color))?;
517 map.serialize_entry("component", self.line.component.as_str())?;
518 map.serialize_entry("message", &self.line.message)?;
519
520 for (k, v) in &self.line.fields {
521 let key = k.as_str();
522 if !is_reserved_json_key(key) {
523 map.serialize_entry(key, v)?;
524 }
525 }
526
527 map.end()
528 }
529}
530
531struct DisplayAsString<'a, T: ?Sized>(&'a T);
532
533impl<T> Serialize for DisplayAsString<'_, T>
534where
535 T: Display + ?Sized,
536{
537 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
538 where
539 S: Serializer,
540 {
541 serializer.collect_str(self.0)
542 }
543}
544
545fn is_reserved_json_key(key: &str) -> bool {
546 matches!(
547 key,
548 "timestamp" | "trader_id" | "level" | "color" | "component" | "message"
549 )
550}
551
552fn has_duplicate_json_field(fields: &LogFields) -> bool {
553 if fields.is_empty() {
554 return false;
555 }
556
557 for (idx, (key, _)) in fields.iter().enumerate() {
558 let key = key.as_str();
559 if is_reserved_json_key(key) {
560 continue;
561 }
562
563 if fields
564 .iter()
565 .take(idx)
566 .any(|(prev, _)| !is_reserved_json_key(prev.as_str()) && prev.as_str() == key)
567 {
568 return true;
569 }
570 }
571
572 false
573}
574
575fn serialize_log_line_with_indexmap<S>(
576 wrapper: &LogLineWrapper,
577 serializer: S,
578) -> Result<S::Ok, S::Error>
579where
580 S: Serializer,
581{
582 let mut json_obj = IndexMap::new();
583 let timestamp = unix_nanos_to_iso8601(wrapper.line.timestamp);
584 json_obj.insert("timestamp".to_string(), timestamp);
585 json_obj.insert("trader_id".to_string(), wrapper.trader_id.to_string());
586 json_obj.insert("level".to_string(), wrapper.line.level.to_string());
587 json_obj.insert("color".to_string(), wrapper.line.color.to_string());
588 json_obj.insert("component".to_string(), wrapper.line.component.to_string());
589 json_obj.insert("message".to_string(), wrapper.line.message.clone());
590 for (k, v) in &wrapper.line.fields {
591 let key = k.as_str();
592 if !is_reserved_json_key(key) {
593 json_obj.insert(k.to_string(), v.clone());
594 }
595 }
596
597 json_obj.serialize(serializer)
598}
599
600fn sorted_module_filters_from_map(
601 module_level: &AHashMap<Ustr, LevelFilter>,
602) -> Vec<(Ustr, LevelFilter)> {
603 let mut filters: Vec<_> = module_level
604 .iter()
605 .map(|(path, level)| (*path, *level))
606 .collect();
607 filters.sort_by_key(|(path, _)| std::cmp::Reverse(path.len()));
608 filters
609}
610
611fn current_log_timestamp() -> UnixNanos {
612 if LOGGING_REALTIME.load(Ordering::Relaxed) {
613 get_atomic_clock_realtime().get_time_ns()
614 } else {
615 get_atomic_clock_static().get_time_ns()
616 }
617}
618
619fn intern_repeated(value: &str) -> Ustr {
620 REPEATED_USTR_CACHE.with(|cache| {
621 let mut cache_state = cache.borrow_mut();
622 let ptr = value.as_ptr() as usize;
623 let len = value.len();
624
625 for entry in cache_state.entries.iter().flatten() {
628 if entry.ptr == ptr && entry.len == len && entry.value.as_str() == value {
629 return entry.value;
630 }
631 }
632
633 let interned = Ustr::from(value);
634 let insert_idx = cache_state.next;
635 cache_state.entries[insert_idx] = Some(RepeatedUstrCacheEntry {
636 ptr,
637 len,
638 value: interned,
639 });
640 cache_state.next = (insert_idx + 1) % REPEATED_USTR_CACHE_CAP;
641 interned
642 })
643}
644
645fn intern_component_value(value: &log::kv::Value<'_>) -> Ustr {
646 match value.to_borrowed_str() {
647 Some(component) => intern_repeated(component),
648 None => Ustr::from(&value.to_string()),
649 }
650}
651
652struct ComponentProbe {
653 component: Option<Ustr>,
654}
655
656impl ComponentProbe {
657 const fn new() -> Self {
658 Self { component: None }
659 }
660}
661
662impl<'kvs> log::kv::VisitSource<'kvs> for ComponentProbe {
663 fn visit_pair(
664 &mut self,
665 key: log::kv::Key<'kvs>,
666 value: log::kv::Value<'kvs>,
667 ) -> Result<(), log::kv::Error> {
668 if key.as_str() == KV_COMPONENT {
669 self.component = Some(intern_component_value(&value));
670 }
671 Ok(())
672 }
673}
674
675struct PayloadCollector {
676 color: Option<LogColor>,
677 fields: LogFields,
678}
679
680impl PayloadCollector {
681 fn new() -> Self {
682 Self {
683 color: None,
684 fields: SmallVec::new(),
685 }
686 }
687}
688
689impl<'kvs> log::kv::VisitSource<'kvs> for PayloadCollector {
690 fn visit_pair(
691 &mut self,
692 key: log::kv::Key<'kvs>,
693 value: log::kv::Value<'kvs>,
694 ) -> Result<(), log::kv::Error> {
695 match key.as_str() {
696 KV_COLOR => {
697 self.color = value.to_u64().map(|v| (v as u8).into());
698 }
699 KV_COMPONENT => {}
700 _ => {
701 self.fields
702 .push((Ustr::from(key.as_str()), value.to_string()));
703 }
704 }
705 Ok(())
706 }
707}
708
709struct FieldCollector {
710 color: Option<LogColor>,
711 component: Option<Ustr>,
712 fields: LogFields,
713}
714
715impl FieldCollector {
716 fn new() -> Self {
717 Self {
718 color: None,
719 component: None,
720 fields: SmallVec::new(),
721 }
722 }
723}
724
725impl<'kvs> log::kv::VisitSource<'kvs> for FieldCollector {
726 fn visit_pair(
727 &mut self,
728 key: log::kv::Key<'kvs>,
729 value: log::kv::Value<'kvs>,
730 ) -> Result<(), log::kv::Error> {
731 match key.as_str() {
732 KV_COLOR => {
733 self.color = value.to_u64().map(|v| (v as u8).into());
734 }
735 KV_COMPONENT => {
736 self.component = Some(intern_component_value(&value));
737 }
738 _ => {
739 self.fields
740 .push((Ustr::from(key.as_str()), value.to_string()));
741 }
742 }
743 Ok(())
744 }
745}
746
747impl Log for Logger {
748 fn enabled(&self, metadata: &log::Metadata) -> bool {
749 if LOGGING_BYPASSED.load(Ordering::Relaxed) {
750 return metadata.level() == Level::Error && shutdown_on_error().is_armed();
751 }
752
753 metadata.level() == Level::Error
754 || metadata.level() <= self.config.stdout_level
755 || metadata.level() <= self.config.fileout_level
756 }
757
758 fn log(&self, record: &log::Record) {
759 let level = record.level();
760
761 if LOGGING_BYPASSED.load(Ordering::Relaxed) {
762 if level == Level::Error {
763 record_shutdown_on_error(record, level);
764 }
765 return;
766 }
767
768 if self.enabled(record.metadata()) {
769 if let Some(filter_policy) = &self.filter_policy {
770 let mut probe = ComponentProbe::new();
773 let _ = record.key_values().visit(&mut probe);
774 let component = probe
775 .component
776 .unwrap_or_else(|| intern_repeated(record.metadata().target()));
777
778 if filter_policy.should_skip(&component, level) {
779 if level == Level::Error {
780 shutdown_on_error().maybe_record_trigger(
781 level,
782 current_log_timestamp(),
783 component,
784 || format!("{}", record.args()),
785 );
786 }
787 return;
788 }
789
790 let timestamp = current_log_timestamp();
791 let mut collector = PayloadCollector::new();
792 let _ = record.key_values().visit(&mut collector);
793 let color = collector.color.unwrap_or_else(|| level.into());
794
795 let line = LogLine {
796 timestamp,
797 level,
798 color,
799 component,
800 message: format!("{}", record.args()),
801 fields: collector.fields,
802 };
803
804 shutdown_on_error().maybe_record_trigger(
805 line.level,
806 line.timestamp,
807 line.component,
808 || line.message.clone(),
809 );
810 self.send_log_line(line);
811 return;
812 }
813
814 let timestamp = current_log_timestamp();
816 let mut collector = FieldCollector::new();
817 let _ = record.key_values().visit(&mut collector);
818 let color = collector.color.unwrap_or_else(|| level.into());
819 let component = collector
820 .component
821 .unwrap_or_else(|| intern_repeated(record.metadata().target()));
822
823 let line = LogLine {
824 timestamp,
825 level,
826 color,
827 component,
828 message: format!("{}", record.args()),
829 fields: collector.fields,
830 };
831
832 shutdown_on_error().maybe_record_trigger(
833 line.level,
834 line.timestamp,
835 line.component,
836 || line.message.clone(),
837 );
838 self.send_log_line(line);
839 }
840 }
841
842 fn flush(&self) {
843 if LOGGING_BYPASSED.load(Ordering::Relaxed) {
845 return;
846 }
847
848 if let Err(e) = self.tx.send(LogEvent::Flush) {
849 eprintln!("Error sending flush log event: {e}");
850 }
851 }
852}
853
854fn record_shutdown_on_error(record: &log::Record, level: Level) {
855 let mut probe = ComponentProbe::new();
856 let _ = record.key_values().visit(&mut probe);
857 let component = probe
858 .component
859 .unwrap_or_else(|| intern_repeated(record.metadata().target()));
860
861 shutdown_on_error().maybe_record_trigger(level, current_log_timestamp(), component, || {
862 format!("{}", record.args())
863 });
864}
865
866impl Logger {
867 #[doc(hidden)]
872 #[must_use]
873 pub fn new_for_benchmark(config: LoggerConfig, tx: std::sync::mpsc::Sender<LogEvent>) -> Self {
874 let filter_policy = FilterPolicy::from_config(&config);
875
876 Self {
877 config,
878 filter_policy,
879 tx,
880 }
881 }
882
883 fn send_log_line(&self, line: LogLine) {
884 if let Err(SendError(LogEvent::Log(line))) = self.tx.send(LogEvent::Log(line)) {
885 eprintln!("Error sending log event (receiver closed): {line}");
886 }
887 }
888
889 pub fn init_with_env(
895 trader_id: TraderId,
896 instance_id: UUID4,
897 file_config: FileWriterConfig,
898 ) -> anyhow::Result<LogGuard> {
899 let config = LoggerConfig::from_env()?;
900 Self::init_with_config(trader_id, instance_id, config, file_config)
901 }
902
903 pub fn init_with_config(
909 trader_id: TraderId,
910 instance_id: UUID4,
911 config: LoggerConfig,
912 file_config: FileWriterConfig,
913 ) -> anyhow::Result<LogGuard> {
914 if super::LOGGING_INITIALIZED.load(Ordering::SeqCst) {
916 return LogGuard::new().ok_or_else(|| {
917 anyhow::anyhow!("Logging already initialized but new guard could not be created")
918 });
919 }
920
921 let (tx, rx) = std::sync::mpsc::channel::<LogEvent>();
922 let filter_policy = FilterPolicy::from_config(&config);
923
924 let logger_tx = tx.clone();
925 let logger = Self {
926 config: config.clone(),
927 filter_policy,
928 tx: logger_tx,
929 };
930
931 set_boxed_logger(Box::new(logger))?;
932
933 if LOGGER_TX.set(tx).is_err() {
935 debug_assert!(
936 false,
937 "LOGGER_TX already set - re-initialization not supported"
938 );
939 }
940
941 if config.bypass_logging {
942 super::logging_set_bypass();
943 }
944
945 let is_colored = config.is_colored;
946
947 let print_config = config.print_config;
948 if print_config {
949 println!("STATIC_MAX_LEVEL={STATIC_MAX_LEVEL}");
950 println!("Logger initialized with {config:?} {file_config:?}");
951 }
952
953 #[cfg(not(all(feature = "simulation", madsim)))]
954 {
955 let handle = std::thread::Builder::new()
956 .name(LOGGING.to_string())
957 .spawn(move || {
958 Self::handle_messages(
959 trader_id.to_string(),
960 instance_id.to_string(),
961 config,
962 file_config,
963 rx,
964 );
965 })?;
966
967 if let Ok(mut handle_guard) = LOGGER_HANDLE.lock() {
969 debug_assert!(
970 handle_guard.is_none(),
971 "LOGGER_HANDLE already set - re-initialization not supported"
972 );
973 *handle_guard = Some(handle);
974 }
975 }
976
977 #[cfg(all(feature = "simulation", madsim))]
978 {
979 let _ = (trader_id, instance_id, config, file_config, rx);
984 super::logging_set_bypass();
985 }
986
987 let max_level = log::LevelFilter::Trace;
988 set_max_level(max_level);
989
990 if print_config {
991 println!("Logger set as `log` implementation with max level {max_level}");
992 }
993
994 super::LOGGING_INITIALIZED.store(true, Ordering::SeqCst);
995 super::LOGGING_COLORED.store(is_colored, Ordering::SeqCst);
996
997 LogGuard::new()
998 .ok_or_else(|| anyhow::anyhow!("Failed to create LogGuard from global sender"))
999 }
1000
1001 #[cfg(not(all(feature = "simulation", madsim)))]
1002 #[expect(clippy::needless_pass_by_value)]
1003 fn handle_messages(
1004 trader_id: String,
1005 instance_id: String,
1006 config: LoggerConfig,
1007 file_config: FileWriterConfig,
1008 rx: std::sync::mpsc::Receiver<LogEvent>,
1009 ) {
1010 let LoggerConfig {
1011 stdout_level,
1012 fileout_level,
1013 component_level: _,
1014 module_level: _,
1015 log_components_only: _,
1016 is_colored,
1017 print_config: _,
1018 use_tracing: _,
1019 bypass_logging: _,
1020 file_config: _,
1021 clear_log_file,
1022 fileout_sync_on_flush,
1023 buffered_stdout,
1024 } = config;
1025
1026 let trader_id_cache = Ustr::from(&trader_id);
1027
1028 let mut stdout_writer = StdoutWriter::new(stdout_level, is_colored, buffered_stdout);
1030 let mut stderr_writer = StderrWriter::new(is_colored);
1031
1032 let mut file_writer_opt = if fileout_level == LevelFilter::Off {
1034 None
1035 } else {
1036 FileWriter::new(
1037 trader_id,
1038 instance_id,
1039 file_config,
1040 fileout_level,
1041 clear_log_file,
1042 fileout_sync_on_flush,
1043 )
1044 };
1045
1046 let process_event = |event: LogEvent,
1047 stdout_writer: &mut StdoutWriter,
1048 stderr_writer: &mut StderrWriter,
1049 file_writer_opt: &mut Option<FileWriter>| {
1050 match event {
1051 LogEvent::Log(line) => {
1052 let mut wrapper = LogLineWrapper::new(line, trader_id_cache);
1053
1054 if stderr_writer.enabled(&wrapper.line) {
1055 if is_colored {
1056 stderr_writer.write(wrapper.get_colored());
1057 } else {
1058 stderr_writer.write(wrapper.get_string());
1059 }
1060 }
1061
1062 if stdout_writer.enabled(&wrapper.line) {
1063 if is_colored {
1064 stdout_writer.write(wrapper.get_colored());
1065 } else {
1066 stdout_writer.write(wrapper.get_string());
1067 }
1068 }
1069
1070 if let Some(file_writer) = file_writer_opt
1071 && file_writer.enabled(&wrapper.line)
1072 {
1073 if file_writer.json_format {
1074 file_writer.write(&wrapper.get_json());
1075 } else {
1076 file_writer.write(wrapper.get_string());
1077 }
1078 }
1079 }
1080 LogEvent::Flush => {
1081 stdout_writer.flush();
1082 stderr_writer.flush();
1083
1084 if let Some(file_writer) = file_writer_opt {
1085 file_writer.flush();
1086 }
1087 }
1088 LogEvent::Sync(done) => {
1089 let result = if let Some(file_writer) = file_writer_opt {
1090 file_writer.flush_and_sync().map_err(anyhow::Error::from)
1091 } else {
1092 Ok(())
1093 };
1094
1095 let _ = done.send(result);
1096 }
1097 LogEvent::Close => {
1098 }
1100 }
1101 };
1102
1103 while let Ok(event) = rx.recv() {
1105 match event {
1106 LogEvent::Log(_) | LogEvent::Flush | LogEvent::Sync(_) => process_event(
1107 event,
1108 &mut stdout_writer,
1109 &mut stderr_writer,
1110 &mut file_writer_opt,
1111 ),
1112 LogEvent::Close => {
1113 stdout_writer.flush();
1115 stderr_writer.flush();
1116
1117 if let Some(ref mut file_writer) = file_writer_opt {
1118 file_writer.flush();
1119 }
1120
1121 while let Ok(evt) = rx.try_recv() {
1124 match evt {
1125 LogEvent::Close => (), _ => process_event(
1127 evt,
1128 &mut stdout_writer,
1129 &mut stderr_writer,
1130 &mut file_writer_opt,
1131 ),
1132 }
1133 }
1134
1135 stdout_writer.flush();
1137 stderr_writer.flush();
1138
1139 if let Some(ref mut file_writer) = file_writer_opt {
1140 file_writer.flush_and_sync_logged();
1141 }
1142
1143 break;
1144 }
1145 }
1146 }
1147 }
1148}
1149
1150#[must_use]
1157pub fn should_filter_log(
1158 component: &Ustr,
1159 line_level: log::Level,
1160 module_filters_sorted: &[(Ustr, LevelFilter)],
1161 component_level: &AHashMap<Ustr, LevelFilter>,
1162 log_components_only: bool,
1163) -> bool {
1164 should_filter_log_inner(
1165 component,
1166 line_level,
1167 module_filters_sorted,
1168 component_level,
1169 log_components_only,
1170 )
1171}
1172
1173fn should_filter_log_inner(
1174 component: &Ustr,
1175 line_level: log::Level,
1176 module_filters_sorted: &[(Ustr, LevelFilter)],
1177 component_level: &AHashMap<Ustr, LevelFilter>,
1178 log_components_only: bool,
1179) -> bool {
1180 if module_filters_sorted.is_empty() && component_level.is_empty() {
1181 return log_components_only;
1182 }
1183
1184 let module_filter = module_filters_sorted
1186 .iter()
1187 .find(|(path, _)| component.starts_with(path.as_str()))
1188 .map(|(_, level)| *level);
1189
1190 let component_filter = component_level.get(component).copied();
1191
1192 if log_components_only && module_filter.is_none() && component_filter.is_none() {
1193 return true;
1194 }
1195
1196 if let Some(filter_level) = module_filter.or(component_filter)
1198 && line_level > filter_level
1199 {
1200 return true;
1201 }
1202
1203 false
1204}
1205
1206pub(crate) fn shutdown_graceful() {
1215 LOGGING_BYPASSED.store(true, Ordering::SeqCst);
1217 log::set_max_level(log::LevelFilter::Off);
1218
1219 if let Some(tx) = LOGGER_TX.get() {
1221 let _ = tx.send(LogEvent::Close);
1222 }
1223
1224 if let Ok(mut handle_guard) = LOGGER_HANDLE.lock()
1225 && let Some(handle) = handle_guard.take()
1226 && handle.thread().id() != std::thread::current().id()
1227 {
1228 let _ = handle.join();
1229 }
1230
1231 LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
1232}
1233
1234pub fn sync_to_disk() -> anyhow::Result<()> {
1242 if !LOGGING_INITIALIZED.load(Ordering::SeqCst) {
1243 return Ok(());
1244 }
1245
1246 let Some(tx) = LOGGER_TX.get() else {
1247 return Ok(());
1248 };
1249
1250 let (done_tx, done_rx) = std::sync::mpsc::channel();
1251 tx.send(LogEvent::Sync(done_tx))
1252 .map_err(|e| anyhow::anyhow!("failed to request logging sync: {e}"))?;
1253
1254 done_rx
1255 .recv()
1256 .map_err(|e| anyhow::anyhow!("failed to receive logging sync acknowledgement: {e}"))?
1257}
1258
1259pub fn log<T: AsRef<str>>(level: LogLevel, color: LogColor, component: Ustr, message: T) {
1260 let color = Value::from(color as u8);
1261
1262 match level {
1263 LogLevel::Off => {}
1264 LogLevel::Trace => {
1265 log::trace!(component = component.to_value(), color = color; "{}", message.as_ref());
1266 }
1267 LogLevel::Debug => {
1268 log::debug!(component = component.to_value(), color = color; "{}", message.as_ref());
1269 }
1270 LogLevel::Info => {
1271 log::info!(component = component.to_value(), color = color; "{}", message.as_ref());
1272 }
1273 LogLevel::Warning => {
1274 log::warn!(component = component.to_value(), color = color; "{}", message.as_ref());
1275 }
1276 LogLevel::Error => {
1277 log::error!(component = component.to_value(), color = color; "{}", message.as_ref());
1278 }
1279 }
1280}
1281
1282#[cfg_attr(
1309 feature = "python",
1310 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
1311)]
1312#[cfg_attr(
1313 feature = "python",
1314 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
1315)]
1316#[derive(Debug)]
1317pub struct LogGuard {
1318 tx: std::sync::mpsc::Sender<LogEvent>,
1319}
1320
1321impl LogGuard {
1322 #[must_use]
1327 pub fn new() -> Option<Self> {
1328 let tx = LOGGER_TX.get()?;
1329 LOGGING_GUARDS_ACTIVE
1330 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
1331 if count == u8::MAX {
1332 None
1333 } else {
1334 Some(count + 1)
1335 }
1336 })
1337 .ok()?;
1338
1339 Some(Self { tx: tx.clone() })
1340 }
1341}
1342
1343impl Drop for LogGuard {
1344 fn drop(&mut self) {
1349 let previous_count = LOGGING_GUARDS_ACTIVE
1350 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
1351 assert!(count != 0, "LogGuard reference count underflow");
1352 Some(count - 1)
1353 })
1354 .expect("Failed to decrement LogGuard count");
1355
1356 if previous_count == 1 && LOGGING_GUARDS_ACTIVE.load(Ordering::SeqCst) == 0 {
1358 LOGGING_BYPASSED.store(true, Ordering::SeqCst);
1362
1363 log::set_max_level(log::LevelFilter::Off);
1365
1366 let _ = self.tx.send(LogEvent::Close);
1368
1369 if let Ok(mut handle_guard) = LOGGER_HANDLE.lock()
1371 && let Some(handle) = handle_guard.take()
1372 {
1373 if handle.thread().id() != std::thread::current().id() {
1375 let _ = handle.join();
1376 }
1377 }
1378
1379 LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
1381 } else {
1382 let _ = self.tx.send(LogEvent::Flush);
1384 }
1385 }
1386}
1387
1388#[cfg(test)]
1389mod tests {
1390 use ahash::AHashMap;
1391 use log::LevelFilter;
1392 use nautilus_core::UUID4;
1393 use nautilus_model::identifiers::TraderId;
1394 use rstest::*;
1395 use serde_json::Value;
1396 use tempfile::tempdir;
1397 use ustr::Ustr;
1398
1399 use super::*;
1400 use crate::enums::LogColor;
1401
1402 #[rstest]
1403 fn log_message_serialization() {
1404 let log_message = LogLine {
1405 timestamp: UnixNanos::default(),
1406 level: log::Level::Info,
1407 color: LogColor::Normal,
1408 component: Ustr::from("Portfolio"),
1409 message: "This is a log message".to_string(),
1410 fields: SmallVec::new(),
1411 };
1412
1413 let serialized_json = serde_json::to_string(&log_message).unwrap();
1414 let deserialized_value: Value = serde_json::from_str(&serialized_json).unwrap();
1415
1416 assert_eq!(deserialized_value["level"], "INFO");
1417 assert_eq!(deserialized_value["component"], "Portfolio");
1418 assert_eq!(deserialized_value["message"], "This is a log message");
1419 }
1420
1421 #[rstest]
1422 fn log_config_parsing() {
1423 let config =
1424 LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Error")
1425 .unwrap();
1426 assert_eq!(
1427 config,
1428 LoggerConfig {
1429 stdout_level: LevelFilter::Info,
1430 fileout_level: LevelFilter::Debug,
1431 component_level: AHashMap::from_iter(vec![(
1432 Ustr::from("RiskEngine"),
1433 LevelFilter::Error
1434 )]),
1435 module_level: AHashMap::new(),
1436 log_components_only: false,
1437 is_colored: true,
1438 print_config: false,
1439 use_tracing: false,
1440 ..Default::default()
1441 }
1442 );
1443 }
1444
1445 #[rstest]
1446 fn log_config_parsing2() {
1447 let config = LoggerConfig::from_spec("stdout=Warn;print_config;fileout=Error;").unwrap();
1448 assert_eq!(
1449 config,
1450 LoggerConfig {
1451 stdout_level: LevelFilter::Warn,
1452 fileout_level: LevelFilter::Error,
1453 component_level: AHashMap::new(),
1454 module_level: AHashMap::new(),
1455 log_components_only: false,
1456 is_colored: true,
1457 print_config: true,
1458 use_tracing: false,
1459 ..Default::default()
1460 }
1461 );
1462 }
1463
1464 #[rstest]
1465 fn log_config_parsing_with_log_components_only() {
1466 let config =
1467 LoggerConfig::from_spec("stdout=Info;log_components_only;RiskEngine=Debug").unwrap();
1468 assert_eq!(
1469 config,
1470 LoggerConfig {
1471 stdout_level: LevelFilter::Info,
1472 fileout_level: LevelFilter::Off,
1473 component_level: AHashMap::from_iter(vec![(
1474 Ustr::from("RiskEngine"),
1475 LevelFilter::Debug
1476 )]),
1477 module_level: AHashMap::new(),
1478 log_components_only: true,
1479 is_colored: true,
1480 print_config: false,
1481 use_tracing: false,
1482 ..Default::default()
1483 }
1484 );
1485 }
1486
1487 #[rstest]
1488 fn test_log_line_wrapper_plain_string() {
1489 let line = LogLine {
1490 timestamp: 1_650_000_000_000_000_000.into(),
1491 level: log::Level::Info,
1492 color: LogColor::Normal,
1493 component: Ustr::from("TestComponent"),
1494 message: "Test message".to_string(),
1495 fields: SmallVec::new(),
1496 };
1497
1498 let mut wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-001"));
1499 let result = wrapper.get_string();
1500
1501 assert!(result.contains("TRADER-001"));
1502 assert!(result.contains("TestComponent"));
1503 assert!(result.contains("Test message"));
1504 assert!(result.contains("[INFO]"));
1505 assert!(result.ends_with('\n'));
1506 assert!(!result.contains("\x1b["));
1508 }
1509
1510 #[rstest]
1511 fn test_log_line_wrapper_colored_string() {
1512 let line = LogLine {
1513 timestamp: 1_650_000_000_000_000_000.into(),
1514 level: log::Level::Info,
1515 color: LogColor::Green,
1516 component: Ustr::from("TestComponent"),
1517 message: "Test message".to_string(),
1518 fields: SmallVec::new(),
1519 };
1520
1521 let mut wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-001"));
1522 let result = wrapper.get_colored();
1523
1524 assert!(result.contains("TRADER-001"));
1525 assert!(result.contains("TestComponent"));
1526 assert!(result.contains("Test message"));
1527 assert!(result.contains("\x1b["));
1529 assert!(result.ends_with('\n'));
1530 }
1531
1532 #[rstest]
1533 fn test_log_line_wrapper_json_output() {
1534 let line = LogLine {
1535 timestamp: 1_650_000_000_000_000_000.into(),
1536 level: log::Level::Warn,
1537 color: LogColor::Yellow,
1538 component: Ustr::from("RiskEngine"),
1539 message: "Warning message".to_string(),
1540 fields: SmallVec::new(),
1541 };
1542
1543 let wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-002"));
1544 let json = wrapper.get_json();
1545
1546 let parsed: Value = serde_json::from_str(json.trim()).unwrap();
1547 assert_eq!(parsed["trader_id"], "TRADER-002");
1548 assert_eq!(parsed["component"], "RiskEngine");
1549 assert_eq!(parsed["message"], "Warning message");
1550 assert_eq!(parsed["level"], "WARN");
1551 assert_eq!(parsed["color"], "YELLOW");
1552 }
1553
1554 #[rstest]
1555 fn test_log_line_wrapper_caches_string() {
1556 let line = LogLine {
1557 timestamp: 1_650_000_000_000_000_000.into(),
1558 level: log::Level::Info,
1559 color: LogColor::Normal,
1560 component: Ustr::from("Test"),
1561 message: "Cached".to_string(),
1562 fields: SmallVec::new(),
1563 };
1564
1565 let mut wrapper = LogLineWrapper::new(line, Ustr::from("TRADER"));
1566 let first = wrapper.get_string().to_string();
1567 let second = wrapper.get_string().to_string();
1568
1569 assert_eq!(first, second);
1570 }
1571
1572 #[rstest]
1573 fn test_log_line_display() {
1574 let line = LogLine {
1575 timestamp: 0.into(),
1576 level: log::Level::Error,
1577 color: LogColor::Red,
1578 component: Ustr::from("Component"),
1579 message: "Error occurred".to_string(),
1580 fields: SmallVec::new(),
1581 };
1582
1583 let display = format!("{line}");
1584 assert_eq!(display, "[ERROR] Component: Error occurred");
1585 }
1586
1587 #[rstest]
1588 fn test_log_line_display_with_fields() {
1589 let line = LogLine {
1590 timestamp: 0.into(),
1591 level: log::Level::Info,
1592 color: LogColor::Normal,
1593 component: Ustr::from("RiskEngine"),
1594 message: "Order filled".to_string(),
1595 fields: smallvec::smallvec![
1596 (Ustr::from("venue"), "BINANCE".to_string()),
1597 (Ustr::from("order_id"), "O-001".to_string()),
1598 ],
1599 };
1600
1601 let display = format!("{line}");
1602 assert_eq!(
1603 display,
1604 "[INFO] RiskEngine: Order filled venue=BINANCE order_id=O-001"
1605 );
1606 }
1607
1608 #[rstest]
1609 fn test_log_line_wrapper_plain_string_with_fields() {
1610 let line = LogLine {
1611 timestamp: 1_650_000_000_000_000_000.into(),
1612 level: log::Level::Info,
1613 color: LogColor::Normal,
1614 component: Ustr::from("DataEngine"),
1615 message: "Connected".to_string(),
1616 fields: smallvec::smallvec![
1617 (Ustr::from("venue"), "BINANCE".to_string()),
1618 (Ustr::from("product_type"), "SPOT".to_string()),
1619 ],
1620 };
1621
1622 let mut wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-001"));
1623 let result = wrapper.get_string();
1624
1625 assert!(result.contains("Connected"));
1626 assert!(result.contains("venue=BINANCE"));
1627 assert!(result.contains("product_type=SPOT"));
1628 assert!(result.ends_with('\n'));
1629 assert!(!result.contains("\x1b["));
1630 }
1631
1632 #[rstest]
1633 fn test_log_line_wrapper_json_with_fields() {
1634 let line = LogLine {
1635 timestamp: 1_650_000_000_000_000_000.into(),
1636 level: log::Level::Info,
1637 color: LogColor::Normal,
1638 component: Ustr::from("RiskEngine"),
1639 message: "Order filled".to_string(),
1640 fields: smallvec::smallvec![
1641 (Ustr::from("strategy_id"), "S-001".to_string()),
1642 (Ustr::from("venue"), "BINANCE".to_string()),
1643 ],
1644 };
1645
1646 let wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-001"));
1647 let json = wrapper.get_json();
1648
1649 let parsed: Value = serde_json::from_str(json.trim()).unwrap();
1650 assert_eq!(parsed["component"], "RiskEngine");
1651 assert_eq!(parsed["message"], "Order filled");
1652 assert_eq!(parsed["strategy_id"], "S-001");
1653 assert_eq!(parsed["venue"], "BINANCE");
1654 }
1655
1656 #[rstest]
1657 fn test_log_line_wrapper_json_no_fields_has_no_extra_keys() {
1658 let line = LogLine {
1659 timestamp: 1_650_000_000_000_000_000.into(),
1660 level: log::Level::Info,
1661 color: LogColor::Normal,
1662 component: Ustr::from("Test"),
1663 message: "Simple".to_string(),
1664 fields: SmallVec::new(),
1665 };
1666
1667 let wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-001"));
1668 let json = wrapper.get_json();
1669
1670 let parsed: Value = serde_json::from_str(json.trim()).unwrap();
1671 let obj = parsed.as_object().unwrap();
1672 assert_eq!(obj.len(), 6); }
1674
1675 #[rstest]
1676 fn test_log_line_wrapper_json_reserved_keys_not_overwritten() {
1677 let line = LogLine {
1678 timestamp: 1_650_000_000_000_000_000.into(),
1679 level: log::Level::Warn,
1680 color: LogColor::Normal,
1681 component: Ustr::from("Test"),
1682 message: "Real message".to_string(),
1683 fields: smallvec::smallvec![
1684 (Ustr::from("level"), "FAKE".to_string()),
1685 (Ustr::from("message"), "injected".to_string()),
1686 (Ustr::from("timestamp"), "bogus".to_string()),
1687 (Ustr::from("venue"), "BINANCE".to_string()),
1688 ],
1689 };
1690
1691 let wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-001"));
1692 let json = wrapper.get_json();
1693 let parsed: Value = serde_json::from_str(json.trim()).unwrap();
1694
1695 assert_eq!(parsed["level"], "WARN");
1696 assert_eq!(parsed["message"], "Real message");
1697 assert_ne!(parsed["timestamp"], "bogus");
1698 assert_eq!(parsed["venue"], "BINANCE");
1699 }
1700
1701 #[rstest]
1702 fn test_log_line_wrapper_json_duplicate_extra_fields_last_value_wins() {
1703 let line = LogLine {
1704 timestamp: 1_650_000_000_000_000_000.into(),
1705 level: log::Level::Info,
1706 color: LogColor::Normal,
1707 component: Ustr::from("Test"),
1708 message: "Duplicate field".to_string(),
1709 fields: smallvec::smallvec![
1710 (Ustr::from("venue"), "BINANCE".to_string()),
1711 (Ustr::from("venue"), "OKX".to_string()),
1712 ],
1713 };
1714
1715 let wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-001"));
1716 let json = wrapper.get_json();
1717 let parsed: Value = serde_json::from_str(json.trim()).unwrap();
1718
1719 assert_eq!(json.matches("\"venue\"").count(), 1);
1720 assert_eq!(parsed["venue"], "OKX");
1721 }
1722
1723 fn sorted_module_filters(map: AHashMap<Ustr, LevelFilter>) -> Vec<(Ustr, LevelFilter)> {
1725 let mut v: Vec<_> = map.into_iter().collect();
1726 v.sort_by_key(|b| std::cmp::Reverse(b.0.len()));
1727 v
1728 }
1729
1730 #[rstest]
1731 fn test_filter_no_filters_passes_all() {
1732 let module_filters = vec![];
1733 let component_level = AHashMap::new();
1734
1735 assert!(!should_filter_log(
1736 &Ustr::from("anything"),
1737 Level::Trace,
1738 &module_filters,
1739 &component_level,
1740 false
1741 ));
1742 }
1743
1744 #[rstest]
1745 fn test_filter_component_exact_match() {
1746 let module_filters = vec![];
1747 let component_level = AHashMap::from_iter([(Ustr::from("RiskEngine"), LevelFilter::Error)]);
1748
1749 assert!(should_filter_log(
1750 &Ustr::from("RiskEngine"),
1751 Level::Info,
1752 &module_filters,
1753 &component_level,
1754 false
1755 ));
1756 assert!(!should_filter_log(
1757 &Ustr::from("RiskEngine"),
1758 Level::Error,
1759 &module_filters,
1760 &component_level,
1761 false
1762 ));
1763 assert!(!should_filter_log(
1764 &Ustr::from("Portfolio"),
1765 Level::Info,
1766 &module_filters,
1767 &component_level,
1768 false
1769 ));
1770 }
1771
1772 #[rstest]
1773 fn test_filter_module_prefix_match() {
1774 let module_filters = vec![(Ustr::from("nautilus_okx::websocket"), LevelFilter::Debug)];
1775 let component_level = AHashMap::new();
1776
1777 assert!(!should_filter_log(
1778 &Ustr::from("nautilus_okx::websocket"),
1779 Level::Debug,
1780 &module_filters,
1781 &component_level,
1782 false
1783 ));
1784 assert!(!should_filter_log(
1785 &Ustr::from("nautilus_okx::websocket::handler"),
1786 Level::Debug,
1787 &module_filters,
1788 &component_level,
1789 false
1790 ));
1791 assert!(should_filter_log(
1792 &Ustr::from("nautilus_okx::websocket::handler"),
1793 Level::Trace,
1794 &module_filters,
1795 &component_level,
1796 false
1797 ));
1798 assert!(!should_filter_log(
1799 &Ustr::from("nautilus_binance::data"),
1800 Level::Trace,
1801 &module_filters,
1802 &component_level,
1803 false
1804 ));
1805 }
1806
1807 #[rstest]
1808 fn test_filter_longest_prefix_wins() {
1809 let module_filters = sorted_module_filters(AHashMap::from_iter([
1810 (Ustr::from("nautilus_okx"), LevelFilter::Error),
1811 (Ustr::from("nautilus_okx::websocket"), LevelFilter::Debug),
1812 ]));
1813 let component_level = AHashMap::new();
1814
1815 assert!(!should_filter_log(
1816 &Ustr::from("nautilus_okx::websocket::handler"),
1817 Level::Debug,
1818 &module_filters,
1819 &component_level,
1820 false
1821 ));
1822 assert!(should_filter_log(
1823 &Ustr::from("nautilus_okx::data"),
1824 Level::Debug,
1825 &module_filters,
1826 &component_level,
1827 false
1828 ));
1829 assert!(!should_filter_log(
1830 &Ustr::from("nautilus_okx::data"),
1831 Level::Error,
1832 &module_filters,
1833 &component_level,
1834 false
1835 ));
1836 }
1837
1838 #[rstest]
1839 fn test_filter_module_precedence_over_component() {
1840 let module_filters = vec![(Ustr::from("nautilus_okx::websocket"), LevelFilter::Debug)];
1841 let component_level =
1842 AHashMap::from_iter([(Ustr::from("nautilus_okx::websocket"), LevelFilter::Error)]);
1843
1844 assert!(!should_filter_log(
1845 &Ustr::from("nautilus_okx::websocket"),
1846 Level::Debug,
1847 &module_filters,
1848 &component_level,
1849 false
1850 ));
1851 }
1852
1853 #[rstest]
1854 fn test_filter_log_components_only_blocks_unknown() {
1855 let module_filters = vec![];
1856 let component_level = AHashMap::from_iter([(Ustr::from("RiskEngine"), LevelFilter::Debug)]);
1857
1858 assert!(should_filter_log(
1859 &Ustr::from("Portfolio"),
1860 Level::Info,
1861 &module_filters,
1862 &component_level,
1863 true
1864 ));
1865 assert!(!should_filter_log(
1866 &Ustr::from("RiskEngine"),
1867 Level::Info,
1868 &module_filters,
1869 &component_level,
1870 true
1871 ));
1872 }
1873
1874 #[rstest]
1875 fn test_filter_log_components_only_with_module() {
1876 let module_filters = vec![(Ustr::from("nautilus_okx"), LevelFilter::Debug)];
1877 let component_level = AHashMap::new();
1878
1879 assert!(!should_filter_log(
1880 &Ustr::from("nautilus_okx::websocket"),
1881 Level::Debug,
1882 &module_filters,
1883 &component_level,
1884 true
1885 ));
1886 assert!(should_filter_log(
1887 &Ustr::from("nautilus_binance::data"),
1888 Level::Debug,
1889 &module_filters,
1890 &component_level,
1891 true
1892 ));
1893 }
1894
1895 #[rstest]
1896 fn test_filter_level_comparison() {
1897 let module_filters = vec![];
1898 let component_level = AHashMap::from_iter([(Ustr::from("Test"), LevelFilter::Warn)]);
1899
1900 assert!(!should_filter_log(
1901 &Ustr::from("Test"),
1902 Level::Error,
1903 &module_filters,
1904 &component_level,
1905 false
1906 ));
1907 assert!(!should_filter_log(
1908 &Ustr::from("Test"),
1909 Level::Warn,
1910 &module_filters,
1911 &component_level,
1912 false
1913 ));
1914 assert!(should_filter_log(
1915 &Ustr::from("Test"),
1916 Level::Info,
1917 &module_filters,
1918 &component_level,
1919 false
1920 ));
1921 assert!(should_filter_log(
1922 &Ustr::from("Test"),
1923 Level::Debug,
1924 &module_filters,
1925 &component_level,
1926 false
1927 ));
1928 assert!(should_filter_log(
1929 &Ustr::from("Test"),
1930 Level::Trace,
1931 &module_filters,
1932 &component_level,
1933 false
1934 ));
1935 }
1936
1937 #[cfg(not(all(feature = "simulation", madsim)))]
1945 mod serial_tests {
1946 use std::{sync::atomic::Ordering, time::Duration};
1947
1948 use super::*;
1949 use crate::{
1950 logging::{
1951 LOGGING_BYPASSED, logging_clock_set_static_mode, logging_clock_set_static_time,
1952 logging_is_initialized, logging_set_bypass, logging_sync_to_disk,
1953 },
1954 testing::wait_until,
1955 };
1956
1957 #[rstest]
1958 fn test_shutdown_on_error_records_once_then_rearms() {
1959 disarm_shutdown_on_error();
1960
1961 let (tx, _rx) = std::sync::mpsc::channel();
1962 let logger = Logger::new_for_benchmark(LoggerConfig::default(), tx);
1963
1964 arm_shutdown_on_error(false);
1965 let args = format_args!("Disabled error");
1966 let record = log::Record::builder()
1967 .args(args)
1968 .level(Level::Error)
1969 .target("RunComponent")
1970 .build();
1971 log::Log::log(&logger, &record);
1972 assert_eq!(take_shutdown_on_error_trigger(), None);
1973
1974 arm_shutdown_on_error(true);
1975 let args = format_args!("First error");
1976 let record = log::Record::builder()
1977 .args(args)
1978 .level(Level::Error)
1979 .target("RunComponent")
1980 .build();
1981 log::Log::log(&logger, &record);
1982
1983 let args = format_args!("Second error");
1984 let record = log::Record::builder()
1985 .args(args)
1986 .level(Level::Error)
1987 .target("RunComponent")
1988 .build();
1989 log::Log::log(&logger, &record);
1990
1991 let first = take_shutdown_on_error_trigger().unwrap();
1992 assert_eq!(first.component, Ustr::from("RunComponent"));
1993 assert_eq!(first.message, "First error");
1994 assert_eq!(take_shutdown_on_error_trigger(), None);
1995
1996 arm_shutdown_on_error(true);
1997 let args = format_args!("Third error");
1998 let record = log::Record::builder()
1999 .args(args)
2000 .level(Level::Error)
2001 .target("RunComponent")
2002 .build();
2003 log::Log::log(&logger, &record);
2004
2005 let third = take_shutdown_on_error_trigger().unwrap();
2006 assert_eq!(third.component, Ustr::from("RunComponent"));
2007 assert_eq!(third.message, "Third error");
2008
2009 let (tx, rx) = std::sync::mpsc::channel();
2010 let logger = Logger::new_for_benchmark(
2011 LoggerConfig {
2012 log_components_only: true,
2013 ..Default::default()
2014 },
2015 tx,
2016 );
2017
2018 arm_shutdown_on_error(true);
2019 let args = format_args!("Filtered error");
2020 let record = log::Record::builder()
2021 .args(args)
2022 .level(Level::Error)
2023 .target("FilteredComponent")
2024 .build();
2025 log::Log::log(&logger, &record);
2026
2027 let trigger = take_shutdown_on_error_trigger().unwrap();
2028 assert_eq!(trigger.component, Ustr::from("FilteredComponent"));
2029 assert_eq!(trigger.message, "Filtered error");
2030 assert!(matches!(
2031 rx.try_recv(),
2032 Err(std::sync::mpsc::TryRecvError::Empty)
2033 ));
2034 disarm_shutdown_on_error();
2035 }
2036
2037 #[rstest]
2038 fn test_shutdown_on_error_records_bypassed_error() {
2039 LOGGING_BYPASSED.store(false, Ordering::Relaxed);
2040 disarm_shutdown_on_error();
2041
2042 let (tx, rx) = std::sync::mpsc::channel();
2043 let logger = Logger::new_for_benchmark(LoggerConfig::default(), tx);
2044 let metadata = log::Metadata::builder()
2045 .level(Level::Error)
2046 .target("BypassedComponent")
2047 .build();
2048
2049 logging_set_bypass();
2050 arm_shutdown_on_error(false);
2051 assert!(!log::Log::enabled(&logger, &metadata));
2052
2053 arm_shutdown_on_error(true);
2054 assert!(log::Log::enabled(&logger, &metadata));
2055
2056 let args = format_args!("Bypassed error");
2057 let record = log::Record::builder()
2058 .args(args)
2059 .level(Level::Error)
2060 .target("BypassedComponent")
2061 .build();
2062 log::Log::log(&logger, &record);
2063
2064 let trigger = take_shutdown_on_error_trigger().unwrap();
2065 assert_eq!(trigger.component, Ustr::from("BypassedComponent"));
2066 assert_eq!(trigger.message, "Bypassed error");
2067 assert!(matches!(
2068 rx.try_recv(),
2069 Err(std::sync::mpsc::TryRecvError::Empty)
2070 ));
2071
2072 LOGGING_BYPASSED.store(false, Ordering::Relaxed);
2073 disarm_shutdown_on_error();
2074 }
2075
2076 #[rstest]
2077 fn test_shutdown_on_error_failed_drain_keeps_trigger_pending() {
2078 LOGGING_BYPASSED.store(false, Ordering::Relaxed);
2079 disarm_shutdown_on_error();
2080
2081 let (tx, _rx) = std::sync::mpsc::channel();
2082 let logger = Logger::new_for_benchmark(LoggerConfig::default(), tx);
2083
2084 arm_shutdown_on_error(true);
2085 let args = format_args!("Pending error");
2086 let record = log::Record::builder()
2087 .args(args)
2088 .level(Level::Error)
2089 .target("PendingComponent")
2090 .build();
2091 log::Log::log(&logger, &record);
2092
2093 let drained = try_drain_shutdown_on_error_trigger(|trigger| {
2094 assert_eq!(trigger.component, Ustr::from("PendingComponent"));
2095 assert_eq!(trigger.message, "Pending error");
2096 false
2097 });
2098 assert!(!drained);
2099
2100 let trigger = take_shutdown_on_error_trigger().unwrap();
2101 assert_eq!(trigger.component, Ustr::from("PendingComponent"));
2102 assert_eq!(trigger.message, "Pending error");
2103 disarm_shutdown_on_error();
2104 }
2105
2106 #[rstest]
2107 fn test_logging_to_file() {
2108 let config = LoggerConfig {
2109 fileout_level: LevelFilter::Debug,
2110 ..Default::default()
2111 };
2112
2113 let temp_dir = tempdir().expect("Failed to create temporary directory");
2114 let file_config = FileWriterConfig {
2115 directory: Some(temp_dir.path().to_str().unwrap().to_string()),
2116 ..Default::default()
2117 };
2118
2119 let log_guard = Logger::init_with_config(
2120 TraderId::from("TRADER-001"),
2121 UUID4::new(),
2122 config,
2123 file_config,
2124 );
2125
2126 logging_clock_set_static_mode();
2127 logging_clock_set_static_time(1_650_000_000_000_000);
2128
2129 log::info!(
2130 component = "RiskEngine";
2131 "This is a test"
2132 );
2133
2134 let mut log_contents = String::new();
2135
2136 wait_until(
2137 || {
2138 std::fs::read_dir(&temp_dir)
2139 .expect("Failed to read directory")
2140 .filter_map(Result::ok)
2141 .any(|entry| entry.path().is_file())
2142 },
2143 Duration::from_secs(3),
2144 );
2145
2146 drop(log_guard); wait_until(
2149 || {
2150 let log_file_path = std::fs::read_dir(&temp_dir)
2151 .expect("Failed to read directory")
2152 .filter_map(Result::ok)
2153 .find(|entry| entry.path().is_file())
2154 .expect("No files found in directory")
2155 .path();
2156 log_contents = std::fs::read_to_string(log_file_path)
2157 .expect("Error while reading log file");
2158 !log_contents.is_empty()
2159 },
2160 Duration::from_secs(3),
2161 );
2162
2163 assert_eq!(
2164 log_contents,
2165 "1970-01-20T02:20:00.000000000Z [INFO] TRADER-001.RiskEngine: This is a test\n"
2166 );
2167 }
2168
2169 #[rstest]
2170 fn test_logging_sync_to_disk_flushes_fast_flush_policy() {
2171 let config = LoggerConfig {
2172 fileout_level: LevelFilter::Debug,
2173 fileout_sync_on_flush: false,
2174 ..Default::default()
2175 };
2176
2177 let temp_dir = tempdir().expect("Failed to create temporary directory");
2178 let file_config = FileWriterConfig {
2179 directory: Some(temp_dir.path().to_str().unwrap().to_string()),
2180 ..Default::default()
2181 };
2182
2183 let log_guard = Logger::init_with_config(
2184 TraderId::from("TRADER-SYNC"),
2185 UUID4::new(),
2186 config,
2187 file_config,
2188 )
2189 .expect("Failed to initialize logger");
2190
2191 logging_clock_set_static_mode();
2192 logging_clock_set_static_time(1_650_000_000_000_000);
2193
2194 log::info!(
2195 component = "RiskEngine";
2196 "sync me"
2197 );
2198
2199 logging_sync_to_disk().expect("sync-to-disk should succeed");
2200
2201 let log_file_path = std::fs::read_dir(&temp_dir)
2202 .expect("Failed to read directory")
2203 .filter_map(Result::ok)
2204 .find(|entry| entry.path().is_file())
2205 .expect("No files found in directory")
2206 .path();
2207 let log_contents =
2208 std::fs::read_to_string(log_file_path).expect("Error while reading log file");
2209
2210 assert!(log_contents.contains("sync me"));
2211
2212 drop(log_guard);
2213 }
2214
2215 #[rstest]
2216 fn test_shutdown_drains_backlog_tail() {
2217 const N: usize = 1000;
2218
2219 let config = LoggerConfig {
2221 stdout_level: LevelFilter::Off,
2222 fileout_level: LevelFilter::Info,
2223 ..Default::default()
2224 };
2225
2226 let temp_dir = tempdir().expect("Failed to create temporary directory");
2227 let file_config = FileWriterConfig {
2228 directory: Some(temp_dir.path().to_str().unwrap().to_string()),
2229 ..Default::default()
2230 };
2231
2232 let log_guard = Logger::init_with_config(
2233 TraderId::from("TRADER-TAIL"),
2234 UUID4::new(),
2235 config,
2236 file_config,
2237 )
2238 .expect("Failed to initialize logger");
2239
2240 logging_clock_set_static_mode();
2242 logging_clock_set_static_time(1_700_000_000_000_000);
2243
2244 for i in 0..N {
2246 log::info!(component = "TailDrain"; "BacklogTest {i}");
2247 }
2248
2249 drop(log_guard);
2251
2252 let mut count = 0usize;
2254 wait_until(
2255 || {
2256 if let Some(log_file) = std::fs::read_dir(&temp_dir)
2257 .expect("Failed to read directory")
2258 .filter_map(Result::ok)
2259 .find(|entry| entry.path().is_file())
2260 {
2261 let log_file_path = log_file.path();
2262 if let Ok(contents) = std::fs::read_to_string(log_file_path) {
2263 count = contents
2264 .lines()
2265 .filter(|l| l.contains("BacklogTest "))
2266 .count();
2267 count >= N
2268 } else {
2269 false
2270 }
2271 } else {
2272 false
2273 }
2274 },
2275 Duration::from_secs(5),
2276 );
2277
2278 assert_eq!(count, N, "Expected all pre-shutdown messages to be written");
2279 }
2280
2281 #[rstest]
2282 fn test_log_component_level_filtering() {
2283 let config =
2284 LoggerConfig::from_spec("stdout=Info;fileout=Debug;RiskEngine=Error").unwrap();
2285
2286 let temp_dir = tempdir().expect("Failed to create temporary directory");
2287 let file_config = FileWriterConfig {
2288 directory: Some(temp_dir.path().to_str().unwrap().to_string()),
2289 ..Default::default()
2290 };
2291
2292 let log_guard = Logger::init_with_config(
2293 TraderId::from("TRADER-001"),
2294 UUID4::new(),
2295 config,
2296 file_config,
2297 );
2298
2299 logging_clock_set_static_mode();
2300 logging_clock_set_static_time(1_650_000_000_000_000);
2301
2302 log::info!(
2303 component = "RiskEngine";
2304 "This is a test"
2305 );
2306
2307 drop(log_guard); wait_until(
2310 || {
2311 if let Some(log_file) = std::fs::read_dir(&temp_dir)
2312 .expect("Failed to read directory")
2313 .filter_map(Result::ok)
2314 .find(|entry| entry.path().is_file())
2315 {
2316 let log_file_path = log_file.path();
2317 let log_contents = std::fs::read_to_string(log_file_path)
2318 .expect("Error while reading log file");
2319 !log_contents.contains("RiskEngine")
2320 } else {
2321 false
2322 }
2323 },
2324 Duration::from_secs(3),
2325 );
2326
2327 assert!(
2328 std::fs::read_dir(&temp_dir)
2329 .expect("Failed to read directory")
2330 .filter_map(Result::ok)
2331 .any(|entry| entry.path().is_file()),
2332 "Log file exists"
2333 );
2334 }
2335
2336 #[rstest]
2337 fn test_logging_to_file_in_json_format() {
2338 let config =
2339 LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Info")
2340 .unwrap();
2341
2342 let temp_dir = tempdir().expect("Failed to create temporary directory");
2343 let file_config = FileWriterConfig {
2344 directory: Some(temp_dir.path().to_str().unwrap().to_string()),
2345 file_format: Some("json".to_string()),
2346 ..Default::default()
2347 };
2348
2349 let log_guard = Logger::init_with_config(
2350 TraderId::from("TRADER-001"),
2351 UUID4::new(),
2352 config,
2353 file_config,
2354 );
2355
2356 logging_clock_set_static_mode();
2357 logging_clock_set_static_time(1_650_000_000_000_000);
2358
2359 log::info!(
2360 component = "RiskEngine";
2361 "This is a test"
2362 );
2363
2364 let mut log_contents = String::new();
2365
2366 drop(log_guard); wait_until(
2369 || {
2370 if let Some(log_file) = std::fs::read_dir(&temp_dir)
2371 .expect("Failed to read directory")
2372 .filter_map(Result::ok)
2373 .find(|entry| entry.path().is_file())
2374 {
2375 let log_file_path = log_file.path();
2376 log_contents = std::fs::read_to_string(log_file_path)
2377 .expect("Error while reading log file");
2378 !log_contents.is_empty()
2379 } else {
2380 false
2381 }
2382 },
2383 Duration::from_secs(3),
2384 );
2385
2386 assert_eq!(
2387 log_contents,
2388 "{\"timestamp\":\"1970-01-20T02:20:00.000000000Z\",\"trader_id\":\"TRADER-001\",\"level\":\"INFO\",\"color\":\"NORMAL\",\"component\":\"RiskEngine\",\"message\":\"This is a test\"}\n"
2389 );
2390 }
2391
2392 #[rstest]
2393 fn test_init_sets_logging_is_initialized_flag() {
2394 let config = LoggerConfig::default();
2395 let file_config = FileWriterConfig::default();
2396
2397 let guard = Logger::init_with_config(
2398 TraderId::from("TRADER-001"),
2399 UUID4::new(),
2400 config,
2401 file_config,
2402 );
2403 assert!(guard.is_ok());
2404 assert!(logging_is_initialized());
2405
2406 drop(guard);
2407 assert!(!logging_is_initialized());
2408 }
2409
2410 #[rstest]
2411 fn test_init_returns_error_when_log_guard_limit_reached() {
2412 let guard = Logger::init_with_config(
2413 TraderId::from("TRADER-001"),
2414 UUID4::new(),
2415 LoggerConfig::default(),
2416 FileWriterConfig::default(),
2417 )
2418 .expect("Failed to initialize logger");
2419
2420 LOGGING_GUARDS_ACTIVE.store(u8::MAX, Ordering::SeqCst);
2421 let result = Logger::init_with_config(
2422 TraderId::from("TRADER-001"),
2423 UUID4::new(),
2424 LoggerConfig::default(),
2425 FileWriterConfig::default(),
2426 );
2427 LOGGING_GUARDS_ACTIVE.store(1, Ordering::SeqCst);
2428 drop(guard);
2429
2430 assert_eq!(
2431 result.unwrap_err().to_string(),
2432 "Logging already initialized but new guard could not be created"
2433 );
2434 }
2435
2436 #[rstest]
2437 fn test_reinit_after_guard_drop_fails() {
2438 let config = LoggerConfig::default();
2439 let file_config = FileWriterConfig::default();
2440
2441 let guard1 = Logger::init_with_config(
2442 TraderId::from("TRADER-001"),
2443 UUID4::new(),
2444 config.clone(),
2445 file_config.clone(),
2446 );
2447 assert!(guard1.is_ok());
2448 drop(guard1);
2449
2450 let guard2 = Logger::init_with_config(
2452 TraderId::from("TRADER-002"),
2453 UUID4::new(),
2454 config,
2455 file_config,
2456 );
2457 assert!(guard2.is_err());
2458 }
2459
2460 #[rstest]
2461 fn test_bypass_before_init_prevents_logging() {
2462 logging_set_bypass();
2463 assert!(LOGGING_BYPASSED.load(Ordering::Relaxed));
2464
2465 let temp_dir = tempdir().expect("Failed to create temporary directory");
2466 let config = LoggerConfig {
2467 fileout_level: LevelFilter::Debug,
2468 ..Default::default()
2469 };
2470 let file_config = FileWriterConfig {
2471 directory: Some(temp_dir.path().to_str().unwrap().to_string()),
2472 ..Default::default()
2473 };
2474
2475 let guard = Logger::init_with_config(
2476 TraderId::from("TRADER-001"),
2477 UUID4::new(),
2478 config,
2479 file_config,
2480 );
2481 assert!(guard.is_ok());
2482
2483 log::info!(
2484 component = "TestComponent";
2485 "This should be bypassed"
2486 );
2487 std::thread::sleep(Duration::from_millis(100));
2488 drop(guard);
2489
2490 assert!(LOGGING_BYPASSED.load(Ordering::Relaxed));
2492 }
2493
2494 #[rstest]
2495 fn test_module_level_filtering() {
2496 let config = LoggerConfig::from_spec(
2500 "stdout=Off;fileout=Trace;nautilus::adapters=Warn;nautilus::adapters::okx=Debug",
2501 )
2502 .unwrap();
2503
2504 let temp_dir = tempdir().expect("Failed to create temporary directory");
2505 let file_config = FileWriterConfig {
2506 directory: Some(temp_dir.path().to_str().unwrap().to_string()),
2507 ..Default::default()
2508 };
2509
2510 let log_guard = Logger::init_with_config(
2511 TraderId::from("TRADER-MOD"),
2512 UUID4::new(),
2513 config,
2514 file_config,
2515 )
2516 .expect("Failed to initialize logger");
2517
2518 logging_clock_set_static_mode();
2519 logging_clock_set_static_time(1_650_000_000_000_000);
2520
2521 log::debug!(
2523 component = "nautilus::adapters::okx::websocket";
2524 "OKX debug message"
2525 );
2526
2527 log::info!(
2529 component = "nautilus::adapters::okx";
2530 "OKX info message"
2531 );
2532
2533 log::info!(
2535 component = "nautilus::adapters::binance";
2536 "Binance info message SHOULD NOT APPEAR"
2537 );
2538
2539 log::warn!(
2541 component = "nautilus::adapters::binance";
2542 "Binance warn message"
2543 );
2544
2545 log::trace!(
2547 component = "Portfolio";
2548 "Portfolio trace message"
2549 );
2550
2551 drop(log_guard);
2552
2553 wait_until(
2554 || {
2555 std::fs::read_dir(&temp_dir)
2556 .expect("Failed to read directory")
2557 .filter_map(Result::ok)
2558 .any(|entry| entry.path().is_file())
2559 },
2560 Duration::from_secs(3),
2561 );
2562
2563 let log_file_path = std::fs::read_dir(&temp_dir)
2564 .expect("Failed to read directory")
2565 .filter_map(Result::ok)
2566 .find(|entry| entry.path().is_file())
2567 .expect("No log file found")
2568 .path();
2569
2570 let log_contents =
2571 std::fs::read_to_string(log_file_path).expect("Error reading log file");
2572
2573 assert!(
2574 log_contents.contains("OKX debug message"),
2575 "OKX debug should pass (longer prefix wins)"
2576 );
2577 assert!(
2578 log_contents.contains("OKX info message"),
2579 "OKX info should pass"
2580 );
2581 assert!(
2582 log_contents.contains("Binance warn message"),
2583 "Binance warn should pass"
2584 );
2585 assert!(
2586 log_contents.contains("Portfolio trace message"),
2587 "Unfiltered component should pass"
2588 );
2589 assert!(
2590 !log_contents.contains("SHOULD NOT APPEAR"),
2591 "Binance info should be filtered (adapters=Warn)"
2592 );
2593 }
2594
2595 #[rstest]
2596 fn test_logging_to_file_with_kv_fields() {
2597 let config = LoggerConfig {
2598 fileout_level: LevelFilter::Debug,
2599 ..Default::default()
2600 };
2601
2602 let temp_dir = tempdir().expect("Failed to create temporary directory");
2603 let file_config = FileWriterConfig {
2604 directory: Some(temp_dir.path().to_str().unwrap().to_string()),
2605 ..Default::default()
2606 };
2607
2608 let log_guard = Logger::init_with_config(
2609 TraderId::from("TRADER-001"),
2610 UUID4::new(),
2611 config,
2612 file_config,
2613 );
2614
2615 logging_clock_set_static_mode();
2616 logging_clock_set_static_time(1_650_000_000_000_000);
2617
2618 log::info!(
2619 component = "DataEngine",
2620 venue = "BINANCE",
2621 product_type = "SPOT";
2622 "WebSocket connected"
2623 );
2624
2625 let mut log_contents = String::new();
2626
2627 drop(log_guard);
2628
2629 wait_until(
2630 || {
2631 if let Some(log_file) = std::fs::read_dir(&temp_dir)
2632 .expect("Failed to read directory")
2633 .filter_map(Result::ok)
2634 .find(|entry| entry.path().is_file())
2635 {
2636 log_contents = std::fs::read_to_string(log_file.path())
2637 .expect("Error while reading log file");
2638 !log_contents.is_empty()
2639 } else {
2640 false
2641 }
2642 },
2643 Duration::from_secs(3),
2644 );
2645
2646 assert!(
2647 log_contents.contains("WebSocket connected"),
2648 "Message should be present"
2649 );
2650 assert!(
2651 log_contents.contains("venue=BINANCE"),
2652 "venue field should appear in output, was:\n{log_contents}"
2653 );
2654 assert!(
2655 log_contents.contains("product_type=SPOT"),
2656 "product_type field should appear in output, was:\n{log_contents}"
2657 );
2658 }
2659
2660 #[rstest]
2661 fn test_logging_to_file_json_with_kv_fields() {
2662 let config =
2663 LoggerConfig::from_spec("stdout=Off;fileout=Debug;DataEngine=Debug").unwrap();
2664
2665 let temp_dir = tempdir().expect("Failed to create temporary directory");
2666 let file_config = FileWriterConfig {
2667 directory: Some(temp_dir.path().to_str().unwrap().to_string()),
2668 file_format: Some("json".to_string()),
2669 ..Default::default()
2670 };
2671
2672 let log_guard = Logger::init_with_config(
2673 TraderId::from("TRADER-001"),
2674 UUID4::new(),
2675 config,
2676 file_config,
2677 );
2678
2679 logging_clock_set_static_mode();
2680 logging_clock_set_static_time(1_650_000_000_000_000);
2681
2682 log::info!(
2683 component = "DataEngine",
2684 venue = "BINANCE",
2685 order_id = "O-12345";
2686 "Order filled"
2687 );
2688
2689 let mut log_contents = String::new();
2690
2691 drop(log_guard);
2692
2693 wait_until(
2694 || {
2695 if let Some(log_file) = std::fs::read_dir(&temp_dir)
2696 .expect("Failed to read directory")
2697 .filter_map(Result::ok)
2698 .find(|entry| entry.path().is_file())
2699 {
2700 log_contents = std::fs::read_to_string(log_file.path())
2701 .expect("Error while reading log file");
2702 !log_contents.is_empty()
2703 } else {
2704 false
2705 }
2706 },
2707 Duration::from_secs(3),
2708 );
2709
2710 let parsed: serde_json::Value =
2711 serde_json::from_str(log_contents.trim()).expect("Should be valid JSON");
2712 assert_eq!(parsed["component"], "DataEngine");
2713 assert_eq!(parsed["message"], "Order filled");
2714 assert_eq!(parsed["venue"], "BINANCE");
2715 assert_eq!(parsed["order_id"], "O-12345");
2716 }
2717 }
2718
2719 #[cfg(all(feature = "simulation", madsim))]
2720 mod sim_tests {
2721 use std::sync::atomic::Ordering;
2722
2723 use super::*;
2724 use crate::logging::LOGGING_BYPASSED;
2725
2726 #[rstest]
2727 fn test_init_under_madsim_skips_writer_thread_and_forces_bypass() {
2728 let config = LoggerConfig {
2729 bypass_logging: false,
2730 ..Default::default()
2731 };
2732 let temp_dir = tempdir().expect("Failed to create temporary directory");
2733 let file_config = FileWriterConfig {
2734 directory: Some(temp_dir.path().to_str().unwrap().to_string()),
2735 ..Default::default()
2736 };
2737
2738 let _guard = Logger::init_with_config(
2739 TraderId::from("TRADER-SIM"),
2740 UUID4::new(),
2741 config,
2742 file_config,
2743 )
2744 .expect("init should succeed under simulation");
2745
2746 assert!(LOGGING_INITIALIZED.load(Ordering::SeqCst));
2747 assert!(
2748 LOGGING_BYPASSED.load(Ordering::SeqCst),
2749 "bypass must be forced under cfg(madsim) even when config disables it"
2750 );
2751 assert!(
2752 LOGGER_HANDLE
2753 .lock()
2754 .expect("LOGGER_HANDLE mutex should not be poisoned")
2755 .is_none(),
2756 "writer thread must not be spawned under cfg(madsim)"
2757 );
2758 }
2759 }
2760}