Skip to main content

nautilus_common/logging/
logger.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    cell::RefCell,
18    fmt::{Display, Write as _},
19    sync::{
20        Mutex, OnceLock,
21        atomic::{AtomicBool, Ordering},
22        mpsc::SendError,
23    },
24};
25
26use ahash::AHashMap;
27use indexmap::IndexMap;
28use log::{
29    Level, LevelFilter, Log, STATIC_MAX_LEVEL,
30    kv::{ToValue, Value},
31    set_boxed_logger, set_max_level,
32};
33use nautilus_core::{
34    UUID4, UnixNanos,
35    datetime::unix_nanos_to_iso8601,
36    time::{get_atomic_clock_realtime, get_atomic_clock_static},
37};
38use nautilus_model::identifiers::TraderId;
39use serde::{Deserialize, Serialize, Serializer, ser::SerializeMap};
40use smallvec::SmallVec;
41use ustr::Ustr;
42
43pub use super::config::LoggerConfig;
44use super::{LOGGING_BYPASSED, LOGGING_GUARDS_ACTIVE, LOGGING_INITIALIZED, LOGGING_REALTIME};
45#[cfg(not(all(feature = "simulation", madsim)))]
46use crate::logging::writer::{FileWriter, LogWriter, StderrWriter, StdoutWriter};
47use crate::{
48    enums::{LogColor, LogLevel},
49    logging::writer::FileWriterConfig,
50};
51
52#[cfg(not(all(feature = "simulation", madsim)))]
53const LOGGING: &str = "logging";
54const KV_COLOR: &str = "color";
55const KV_COMPONENT: &str = "component";
56const LOG_FIELDS_INLINE_CAP: usize = 0;
57const MAX_LEVEL_DISPLAY_LEN: usize = "ERROR".len();
58const ANSI_BOLD_LEN: usize = "\x1b[1m".len();
59const ANSI_RESET_LEN: usize = "\x1b[0m".len();
60const PLAIN_FORMAT_OVERHEAD: usize = " [".len() + "] ".len() + ".".len() + ": ".len() + "\n".len();
61const COLORED_FORMAT_OVERHEAD: usize = ANSI_BOLD_LEN
62    + ANSI_RESET_LEN
63    + " ".len()
64    + "[".len()
65    + "] ".len()
66    + ".".len()
67    + ": ".len()
68    + ANSI_RESET_LEN
69    + "\n".len();
70const REPEATED_USTR_CACHE_CAP: usize = 8;
71
72thread_local! {
73    static REPEATED_USTR_CACHE: RefCell<RepeatedUstrCache> =
74        const { RefCell::new(RepeatedUstrCache::new()) };
75}
76
77#[derive(Clone, Copy)]
78struct RepeatedUstrCacheEntry {
79    ptr: usize,
80    len: usize,
81    value: Ustr,
82}
83
84#[derive(Clone, Copy)]
85struct RepeatedUstrCache {
86    entries: [Option<RepeatedUstrCacheEntry>; REPEATED_USTR_CACHE_CAP],
87    next: usize,
88}
89
90impl RepeatedUstrCache {
91    const fn new() -> Self {
92        Self {
93            entries: [None; REPEATED_USTR_CACHE_CAP],
94            next: 0,
95        }
96    }
97}
98
99/// Storage for structured log fields.
100/// Inline capacity is intentionally zero to keep the producer-side `LogLine` payload small.
101pub type LogFields = SmallVec<[(Ustr, String); LOG_FIELDS_INLINE_CAP]>;
102
103/// Global log sender which allows multiple log guards per process.
104static LOGGER_TX: OnceLock<std::sync::mpsc::Sender<LogEvent>> = OnceLock::new();
105
106/// Global handle to the logging thread - only one thread exists per process.
107static LOGGER_HANDLE: Mutex<Option<std::thread::JoinHandle<()>>> = Mutex::new(None);
108
109static SHUTDOWN_ON_ERROR: OnceLock<ShutdownOnError> = OnceLock::new();
110
111/// The first error log captured after shutdown-on-error is armed.
112#[derive(Debug, Clone, PartialEq, Eq)]
113pub struct ShutdownOnErrorTrigger {
114    /// The UNIX timestamp (ns) of the error log.
115    pub timestamp: UnixNanos,
116    /// The log component that emitted the error.
117    pub component: Ustr,
118    /// The formatted error log message.
119    pub message: String,
120}
121
122#[derive(Debug, Default)]
123struct ShutdownOnError {
124    armed: AtomicBool,
125    triggered: AtomicBool,
126    pending: Mutex<Option<ShutdownOnErrorTrigger>>,
127}
128
129impl ShutdownOnError {
130    fn is_armed(&self) -> bool {
131        self.armed.load(Ordering::Acquire)
132    }
133
134    fn arm(&self, enabled: bool) {
135        if let Ok(mut pending) = self.pending.lock() {
136            pending.take();
137        }
138        self.triggered.store(false, Ordering::Release);
139        self.armed.store(enabled, Ordering::Release);
140    }
141
142    fn disarm(&self) {
143        self.armed.store(false, Ordering::Release);
144        self.triggered.store(false, Ordering::Release);
145
146        if let Ok(mut pending) = self.pending.lock() {
147            pending.take();
148        }
149    }
150
151    fn maybe_record_trigger<F>(
152        &self,
153        level: Level,
154        timestamp: UnixNanos,
155        component: Ustr,
156        message: F,
157    ) where
158        F: FnOnce() -> String,
159    {
160        if !self.armed.load(Ordering::Acquire) || level != Level::Error {
161            return;
162        }
163
164        let Ok(mut pending) = self.pending.lock() else {
165            return;
166        };
167
168        if self
169            .triggered
170            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
171            .is_err()
172        {
173            return;
174        }
175
176        *pending = Some(ShutdownOnErrorTrigger {
177            timestamp,
178            component,
179            message: message(),
180        });
181    }
182
183    fn take_trigger(&self) -> Option<ShutdownOnErrorTrigger> {
184        if !self.triggered.load(Ordering::Acquire) {
185            return None;
186        }
187
188        self.pending
189            .lock()
190            .ok()
191            .and_then(|mut pending| pending.take())
192    }
193
194    fn try_drain_trigger<F>(&self, drain: F) -> bool
195    where
196        F: FnOnce(&ShutdownOnErrorTrigger) -> bool,
197    {
198        if !self.triggered.load(Ordering::Acquire) {
199            return false;
200        }
201
202        let Ok(mut pending) = self.pending.lock() else {
203            return false;
204        };
205
206        let Some(trigger) = pending.as_ref() else {
207            return false;
208        };
209
210        if !drain(trigger) {
211            return false;
212        }
213
214        pending.take();
215        true
216    }
217}
218
219/// Arms shutdown-on-error handling for the current run.
220pub fn arm_shutdown_on_error(enabled: bool) {
221    shutdown_on_error().arm(enabled);
222}
223
224/// Disarms shutdown-on-error handling and clears any pending trigger.
225pub fn disarm_shutdown_on_error() {
226    shutdown_on_error().disarm();
227}
228
229/// Returns and clears the pending shutdown-on-error trigger, if one was recorded.
230pub fn take_shutdown_on_error_trigger() -> Option<ShutdownOnErrorTrigger> {
231    shutdown_on_error().take_trigger()
232}
233
234/// Conditionally drains the pending shutdown-on-error trigger.
235pub fn try_drain_shutdown_on_error_trigger<F>(drain: F) -> bool
236where
237    F: FnOnce(&ShutdownOnErrorTrigger) -> bool,
238{
239    shutdown_on_error().try_drain_trigger(drain)
240}
241
242fn shutdown_on_error() -> &'static ShutdownOnError {
243    SHUTDOWN_ON_ERROR.get_or_init(ShutdownOnError::default)
244}
245
246/// Producer-side filtering policy derived from [`LoggerConfig`].
247#[derive(Debug, Clone)]
248struct FilterPolicy {
249    /// Module filters pre-sorted by descending path length for longest-prefix lookup.
250    modules_by_longest_prefix: Vec<(Ustr, LevelFilter)>,
251    /// Per-component log level overrides.
252    components: AHashMap<Ustr, LevelFilter>,
253    /// Whether logs without an explicit component/module filter should be skipped.
254    components_only: bool,
255}
256
257impl FilterPolicy {
258    fn from_config(config: &LoggerConfig) -> Option<Self> {
259        let modules_by_longest_prefix = sorted_module_filters_from_map(&config.module_level);
260        if !config.log_components_only
261            && modules_by_longest_prefix.is_empty()
262            && config.component_level.is_empty()
263        {
264            return None;
265        }
266
267        Some(Self {
268            modules_by_longest_prefix,
269            components: config.component_level.clone(),
270            components_only: config.log_components_only,
271        })
272    }
273
274    fn should_skip(&self, component: &Ustr, level: Level) -> bool {
275        should_filter_log_inner(
276            component,
277            level,
278            &self.modules_by_longest_prefix,
279            &self.components,
280            self.components_only,
281        )
282    }
283}
284
285/// A high-performance logger utilizing a MPSC channel under the hood.
286///
287/// A logger is initialized with a [`LoggerConfig`] to set up different logging levels for
288/// stdout, file, and components. The logger spawns a thread that listens for [`LogEvent`]s
289/// sent via an MPSC channel.
290#[derive(Debug)]
291pub struct Logger {
292    /// Initialization snapshot for logging levels and behavior.
293    ///
294    /// Producer filters are derived into `filter_policy` at initialization; mutating this field
295    /// after registration does not reload component/module filters.
296    pub config: LoggerConfig,
297    /// Producer-side component/module filtering policy.
298    filter_policy: Option<FilterPolicy>,
299    /// Transmitter for sending log events to the 'logging' thread.
300    tx: std::sync::mpsc::Sender<LogEvent>,
301}
302
303/// Represents a type of log event.
304#[derive(Debug)]
305pub enum LogEvent {
306    /// A log line event.
307    Log(LogLine),
308    /// A command to flush all logger buffers.
309    Flush,
310    /// A command to flush and sync file logs to disk, then acknowledge completion.
311    Sync(std::sync::mpsc::Sender<anyhow::Result<()>>),
312    /// A command to close the logger.
313    Close,
314}
315
316/// Represents a log event which includes a message.
317#[derive(Clone, Debug, Serialize, Deserialize)]
318pub struct LogLine {
319    /// The timestamp for the event.
320    pub timestamp: UnixNanos,
321    /// The log level for the event.
322    pub level: Level,
323    /// The color for the log message content.
324    pub color: LogColor,
325    /// The Nautilus system component the log event originated from.
326    pub component: Ustr,
327    /// The log message content.
328    pub message: String,
329    /// Arbitrary structured key-value fields attached to this log event.
330    #[serde(default, skip_serializing_if = "SmallVec::is_empty")]
331    pub fields: LogFields,
332}
333
334impl Display for LogLine {
335    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
336        write!(f, "[{}] {}: {}", self.level, self.component, self.message)?;
337        for (k, v) in &self.fields {
338            write!(f, " {k}={v}")?;
339        }
340        Ok(())
341    }
342}
343
344/// A wrapper around a log line that provides formatted and cached representations.
345///
346/// This struct contains a log line and provides various formatted versions
347/// of it, such as plain string, colored string, and JSON. It also caches the
348/// results for repeated calls, optimizing performance when the same message
349/// needs to be logged multiple times in different formats.
350#[derive(Clone, Debug)]
351pub struct LogLineWrapper {
352    /// The underlying log line that contains the log data.
353    line: LogLine,
354    /// Cached plain string representation of the log line.
355    cache: Option<String>,
356    /// Cached colored string representation of the log line.
357    colored: Option<String>,
358    /// The ID of the trader associated with this log event.
359    trader_id: Ustr,
360}
361
362impl LogLineWrapper {
363    /// Creates a new [`LogLineWrapper`] instance.
364    #[must_use]
365    pub const fn new(line: LogLine, trader_id: Ustr) -> Self {
366        Self {
367            line,
368            cache: None,
369            colored: None,
370            trader_id,
371        }
372    }
373
374    /// Returns the plain log message string, caching the result.
375    ///
376    /// This method constructs the log line format and caches it for repeated calls. Useful when the
377    /// same log message needs to be printed multiple times.
378    pub fn get_string(&mut self) -> &str {
379        self.cache.get_or_insert_with(|| {
380            let timestamp = unix_nanos_to_iso8601(self.line.timestamp);
381            let mut s = String::with_capacity(plain_log_line_capacity(
382                &timestamp,
383                self.trader_id,
384                &self.line,
385            ));
386
387            write!(
388                s,
389                "{} [{}] {}.{}: {}",
390                timestamp, self.line.level, self.trader_id, self.line.component, self.line.message,
391            )
392            .expect("writing to String should not fail");
393
394            for (k, v) in &self.line.fields {
395                s.push(' ');
396                s.push_str(k);
397                s.push('=');
398                s.push_str(v);
399            }
400            s.push('\n');
401            s
402        })
403    }
404
405    /// Returns the colored log message string, caching the result.
406    ///
407    /// This method constructs the colored log line format and caches the result
408    /// for repeated calls, providing the message with ANSI color codes if the
409    /// logger is configured to use colors.
410    pub fn get_colored(&mut self) -> &str {
411        self.colored.get_or_insert_with(|| {
412            let timestamp = unix_nanos_to_iso8601(self.line.timestamp);
413            let color_ansi = self.line.color.as_ansi();
414            let mut s = String::with_capacity(colored_log_line_capacity(
415                &timestamp,
416                color_ansi,
417                self.trader_id,
418                &self.line,
419            ));
420
421            write!(
422                s,
423                "\x1b[1m{}\x1b[0m {}[{}] {}.{}: {}",
424                timestamp,
425                color_ansi,
426                self.line.level,
427                self.trader_id,
428                self.line.component,
429                self.line.message,
430            )
431            .expect("writing to String should not fail");
432
433            for (k, v) in &self.line.fields {
434                s.push(' ');
435                s.push_str(k);
436                s.push('=');
437                s.push_str(v);
438            }
439            s.push_str("\x1b[0m\n");
440            s
441        })
442    }
443
444    /// Returns the log message as a JSON string.
445    ///
446    /// This method serializes the log line and its associated metadata
447    /// (timestamp, trader ID, etc.) into a JSON string format. This is useful
448    /// for structured logging or when logs need to be stored in a JSON format.
449    /// # Panics
450    ///
451    /// Panics if serialization of the log event to JSON fails.
452    #[must_use]
453    pub fn get_json(&self) -> String {
454        let mut json_string =
455            serde_json::to_string(&self).expect("Error serializing log event to string");
456        json_string.push('\n');
457        json_string
458    }
459}
460
461fn formatted_fields_len(fields: &LogFields) -> usize {
462    fields.iter().map(|(k, v)| 2 + k.len() + v.len()).sum()
463}
464
465fn log_line_capacity(
466    timestamp: &str,
467    trader_id: Ustr,
468    line: &LogLine,
469    overhead: usize,
470    ansi_extra_len: usize,
471) -> usize {
472    timestamp.len()
473        + overhead
474        + ansi_extra_len
475        + MAX_LEVEL_DISPLAY_LEN
476        + trader_id.len()
477        + line.component.len()
478        + line.message.len()
479        + formatted_fields_len(&line.fields)
480}
481
482fn plain_log_line_capacity(timestamp: &str, trader_id: Ustr, line: &LogLine) -> usize {
483    log_line_capacity(timestamp, trader_id, line, PLAIN_FORMAT_OVERHEAD, 0)
484}
485
486fn colored_log_line_capacity(
487    timestamp: &str,
488    color_ansi: &str,
489    trader_id: Ustr,
490    line: &LogLine,
491) -> usize {
492    log_line_capacity(
493        timestamp,
494        trader_id,
495        line,
496        COLORED_FORMAT_OVERHEAD,
497        color_ansi.len(),
498    )
499}
500
501impl Serialize for LogLineWrapper {
502    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
503    where
504        S: Serializer,
505    {
506        if has_duplicate_json_field(&self.line.fields) {
507            return serialize_log_line_with_indexmap(self, serializer);
508        }
509
510        let timestamp = unix_nanos_to_iso8601(self.line.timestamp);
511        let mut map = serializer.serialize_map(None)?;
512
513        map.serialize_entry("timestamp", &timestamp)?;
514        map.serialize_entry("trader_id", self.trader_id.as_str())?;
515        map.serialize_entry("level", &DisplayAsString(&self.line.level))?;
516        map.serialize_entry("color", &DisplayAsString(&self.line.color))?;
517        map.serialize_entry("component", self.line.component.as_str())?;
518        map.serialize_entry("message", &self.line.message)?;
519
520        for (k, v) in &self.line.fields {
521            let key = k.as_str();
522            if !is_reserved_json_key(key) {
523                map.serialize_entry(key, v)?;
524            }
525        }
526
527        map.end()
528    }
529}
530
531struct DisplayAsString<'a, T: ?Sized>(&'a T);
532
533impl<T> Serialize for DisplayAsString<'_, T>
534where
535    T: Display + ?Sized,
536{
537    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
538    where
539        S: Serializer,
540    {
541        serializer.collect_str(self.0)
542    }
543}
544
545fn is_reserved_json_key(key: &str) -> bool {
546    matches!(
547        key,
548        "timestamp" | "trader_id" | "level" | "color" | "component" | "message"
549    )
550}
551
552fn has_duplicate_json_field(fields: &LogFields) -> bool {
553    if fields.is_empty() {
554        return false;
555    }
556
557    for (idx, (key, _)) in fields.iter().enumerate() {
558        let key = key.as_str();
559        if is_reserved_json_key(key) {
560            continue;
561        }
562
563        if fields
564            .iter()
565            .take(idx)
566            .any(|(prev, _)| !is_reserved_json_key(prev.as_str()) && prev.as_str() == key)
567        {
568            return true;
569        }
570    }
571
572    false
573}
574
575fn serialize_log_line_with_indexmap<S>(
576    wrapper: &LogLineWrapper,
577    serializer: S,
578) -> Result<S::Ok, S::Error>
579where
580    S: Serializer,
581{
582    let mut json_obj = IndexMap::new();
583    let timestamp = unix_nanos_to_iso8601(wrapper.line.timestamp);
584    json_obj.insert("timestamp".to_string(), timestamp);
585    json_obj.insert("trader_id".to_string(), wrapper.trader_id.to_string());
586    json_obj.insert("level".to_string(), wrapper.line.level.to_string());
587    json_obj.insert("color".to_string(), wrapper.line.color.to_string());
588    json_obj.insert("component".to_string(), wrapper.line.component.to_string());
589    json_obj.insert("message".to_string(), wrapper.line.message.clone());
590    for (k, v) in &wrapper.line.fields {
591        let key = k.as_str();
592        if !is_reserved_json_key(key) {
593            json_obj.insert(k.to_string(), v.clone());
594        }
595    }
596
597    json_obj.serialize(serializer)
598}
599
600fn sorted_module_filters_from_map(
601    module_level: &AHashMap<Ustr, LevelFilter>,
602) -> Vec<(Ustr, LevelFilter)> {
603    let mut filters: Vec<_> = module_level
604        .iter()
605        .map(|(path, level)| (*path, *level))
606        .collect();
607    filters.sort_by_key(|(path, _)| std::cmp::Reverse(path.len()));
608    filters
609}
610
611fn current_log_timestamp() -> UnixNanos {
612    if LOGGING_REALTIME.load(Ordering::Relaxed) {
613        get_atomic_clock_realtime().get_time_ns()
614    } else {
615        get_atomic_clock_static().get_time_ns()
616    }
617}
618
619fn intern_repeated(value: &str) -> Ustr {
620    REPEATED_USTR_CACHE.with(|cache| {
621        let mut cache_state = cache.borrow_mut();
622        let ptr = value.as_ptr() as usize;
623        let len = value.len();
624
625        // Targets and components are usually repeated static strings; the content check keeps
626        // dynamic RecordBuilder targets correct if an allocator reuses the same pointer.
627        for entry in cache_state.entries.iter().flatten() {
628            if entry.ptr == ptr && entry.len == len && entry.value.as_str() == value {
629                return entry.value;
630            }
631        }
632
633        let interned = Ustr::from(value);
634        let insert_idx = cache_state.next;
635        cache_state.entries[insert_idx] = Some(RepeatedUstrCacheEntry {
636            ptr,
637            len,
638            value: interned,
639        });
640        cache_state.next = (insert_idx + 1) % REPEATED_USTR_CACHE_CAP;
641        interned
642    })
643}
644
645fn intern_component_value(value: &log::kv::Value<'_>) -> Ustr {
646    match value.to_borrowed_str() {
647        Some(component) => intern_repeated(component),
648        None => Ustr::from(&value.to_string()),
649    }
650}
651
652struct ComponentProbe {
653    component: Option<Ustr>,
654}
655
656impl ComponentProbe {
657    const fn new() -> Self {
658        Self { component: None }
659    }
660}
661
662impl<'kvs> log::kv::VisitSource<'kvs> for ComponentProbe {
663    fn visit_pair(
664        &mut self,
665        key: log::kv::Key<'kvs>,
666        value: log::kv::Value<'kvs>,
667    ) -> Result<(), log::kv::Error> {
668        if key.as_str() == KV_COMPONENT {
669            self.component = Some(intern_component_value(&value));
670        }
671        Ok(())
672    }
673}
674
675struct PayloadCollector {
676    color: Option<LogColor>,
677    fields: LogFields,
678}
679
680impl PayloadCollector {
681    fn new() -> Self {
682        Self {
683            color: None,
684            fields: SmallVec::new(),
685        }
686    }
687}
688
689impl<'kvs> log::kv::VisitSource<'kvs> for PayloadCollector {
690    fn visit_pair(
691        &mut self,
692        key: log::kv::Key<'kvs>,
693        value: log::kv::Value<'kvs>,
694    ) -> Result<(), log::kv::Error> {
695        match key.as_str() {
696            KV_COLOR => {
697                self.color = value.to_u64().map(|v| (v as u8).into());
698            }
699            KV_COMPONENT => {}
700            _ => {
701                self.fields
702                    .push((Ustr::from(key.as_str()), value.to_string()));
703            }
704        }
705        Ok(())
706    }
707}
708
709struct FieldCollector {
710    color: Option<LogColor>,
711    component: Option<Ustr>,
712    fields: LogFields,
713}
714
715impl FieldCollector {
716    fn new() -> Self {
717        Self {
718            color: None,
719            component: None,
720            fields: SmallVec::new(),
721        }
722    }
723}
724
725impl<'kvs> log::kv::VisitSource<'kvs> for FieldCollector {
726    fn visit_pair(
727        &mut self,
728        key: log::kv::Key<'kvs>,
729        value: log::kv::Value<'kvs>,
730    ) -> Result<(), log::kv::Error> {
731        match key.as_str() {
732            KV_COLOR => {
733                self.color = value.to_u64().map(|v| (v as u8).into());
734            }
735            KV_COMPONENT => {
736                self.component = Some(intern_component_value(&value));
737            }
738            _ => {
739                self.fields
740                    .push((Ustr::from(key.as_str()), value.to_string()));
741            }
742        }
743        Ok(())
744    }
745}
746
747impl Log for Logger {
748    fn enabled(&self, metadata: &log::Metadata) -> bool {
749        if LOGGING_BYPASSED.load(Ordering::Relaxed) {
750            return metadata.level() == Level::Error && shutdown_on_error().is_armed();
751        }
752
753        metadata.level() == Level::Error
754            || metadata.level() <= self.config.stdout_level
755            || metadata.level() <= self.config.fileout_level
756    }
757
758    fn log(&self, record: &log::Record) {
759        let level = record.level();
760
761        if LOGGING_BYPASSED.load(Ordering::Relaxed) {
762            if level == Level::Error {
763                record_shutdown_on_error(record, level);
764            }
765            return;
766        }
767
768        if self.enabled(record.metadata()) {
769            if let Some(filter_policy) = &self.filter_policy {
770                // Probe only the component before filtering. Filtered error logs still need
771                // enough payload to trigger shutdown-on-error.
772                let mut probe = ComponentProbe::new();
773                let _ = record.key_values().visit(&mut probe);
774                let component = probe
775                    .component
776                    .unwrap_or_else(|| intern_repeated(record.metadata().target()));
777
778                if filter_policy.should_skip(&component, level) {
779                    if level == Level::Error {
780                        shutdown_on_error().maybe_record_trigger(
781                            level,
782                            current_log_timestamp(),
783                            component,
784                            || format!("{}", record.args()),
785                        );
786                    }
787                    return;
788                }
789
790                let timestamp = current_log_timestamp();
791                let mut collector = PayloadCollector::new();
792                let _ = record.key_values().visit(&mut collector);
793                let color = collector.color.unwrap_or_else(|| level.into());
794
795                let line = LogLine {
796                    timestamp,
797                    level,
798                    color,
799                    component,
800                    message: format!("{}", record.args()),
801                    fields: collector.fields,
802                };
803
804                shutdown_on_error().maybe_record_trigger(
805                    line.level,
806                    line.timestamp,
807                    line.component,
808                    || line.message.clone(),
809                );
810                self.send_log_line(line);
811                return;
812            }
813
814            // With no component/module filters configured, keep the producer path to one KV visit.
815            let timestamp = current_log_timestamp();
816            let mut collector = FieldCollector::new();
817            let _ = record.key_values().visit(&mut collector);
818            let color = collector.color.unwrap_or_else(|| level.into());
819            let component = collector
820                .component
821                .unwrap_or_else(|| intern_repeated(record.metadata().target()));
822
823            let line = LogLine {
824                timestamp,
825                level,
826                color,
827                component,
828                message: format!("{}", record.args()),
829                fields: collector.fields,
830            };
831
832            shutdown_on_error().maybe_record_trigger(
833                line.level,
834                line.timestamp,
835                line.component,
836                || line.message.clone(),
837            );
838            self.send_log_line(line);
839        }
840    }
841
842    fn flush(&self) {
843        // Don't attempt to flush if we're already bypassed/shutdown
844        if LOGGING_BYPASSED.load(Ordering::Relaxed) {
845            return;
846        }
847
848        if let Err(e) = self.tx.send(LogEvent::Flush) {
849            eprintln!("Error sending flush log event: {e}");
850        }
851    }
852}
853
854fn record_shutdown_on_error(record: &log::Record, level: Level) {
855    let mut probe = ComponentProbe::new();
856    let _ = record.key_values().visit(&mut probe);
857    let component = probe
858        .component
859        .unwrap_or_else(|| intern_repeated(record.metadata().target()));
860
861    shutdown_on_error().maybe_record_trigger(level, current_log_timestamp(), component, || {
862        format!("{}", record.args())
863    });
864}
865
866impl Logger {
867    /// Creates a logger instance for direct benchmark harnesses.
868    ///
869    /// This bypasses the global `log::set_logger` singleton so benchmark code can compare
870    /// multiple logger configurations in one process. It does not spawn a writer thread.
871    #[doc(hidden)]
872    #[must_use]
873    pub fn new_for_benchmark(config: LoggerConfig, tx: std::sync::mpsc::Sender<LogEvent>) -> Self {
874        let filter_policy = FilterPolicy::from_config(&config);
875
876        Self {
877            config,
878            filter_policy,
879            tx,
880        }
881    }
882
883    fn send_log_line(&self, line: LogLine) {
884        if let Err(SendError(LogEvent::Log(line))) = self.tx.send(LogEvent::Log(line)) {
885            eprintln!("Error sending log event (receiver closed): {line}");
886        }
887    }
888
889    /// Initializes the logger based on the `NAUTILUS_LOG` environment variable.
890    ///
891    /// # Errors
892    ///
893    /// Returns an error if reading the environment variable or parsing the configuration fails.
894    pub fn init_with_env(
895        trader_id: TraderId,
896        instance_id: UUID4,
897        file_config: FileWriterConfig,
898    ) -> anyhow::Result<LogGuard> {
899        let config = LoggerConfig::from_env()?;
900        Self::init_with_config(trader_id, instance_id, config, file_config)
901    }
902
903    /// Initializes the logger with the given configuration.
904    ///
905    /// # Errors
906    ///
907    /// Returns an error if the logger fails to register or initialize the background thread.
908    pub fn init_with_config(
909        trader_id: TraderId,
910        instance_id: UUID4,
911        config: LoggerConfig,
912        file_config: FileWriterConfig,
913    ) -> anyhow::Result<LogGuard> {
914        // Fast path: already initialized
915        if super::LOGGING_INITIALIZED.load(Ordering::SeqCst) {
916            return LogGuard::new().ok_or_else(|| {
917                anyhow::anyhow!("Logging already initialized but new guard could not be created")
918            });
919        }
920
921        let (tx, rx) = std::sync::mpsc::channel::<LogEvent>();
922        let filter_policy = FilterPolicy::from_config(&config);
923
924        let logger_tx = tx.clone();
925        let logger = Self {
926            config: config.clone(),
927            filter_policy,
928            tx: logger_tx,
929        };
930
931        set_boxed_logger(Box::new(logger))?;
932
933        // Store the sender globally so additional guards can be created
934        if LOGGER_TX.set(tx).is_err() {
935            debug_assert!(
936                false,
937                "LOGGER_TX already set - re-initialization not supported"
938            );
939        }
940
941        if config.bypass_logging {
942            super::logging_set_bypass();
943        }
944
945        let is_colored = config.is_colored;
946
947        let print_config = config.print_config;
948        if print_config {
949            println!("STATIC_MAX_LEVEL={STATIC_MAX_LEVEL}");
950            println!("Logger initialized with {config:?} {file_config:?}");
951        }
952
953        #[cfg(not(all(feature = "simulation", madsim)))]
954        {
955            let handle = std::thread::Builder::new()
956                .name(LOGGING.to_string())
957                .spawn(move || {
958                    Self::handle_messages(
959                        trader_id.to_string(),
960                        instance_id.to_string(),
961                        config,
962                        file_config,
963                        rx,
964                    );
965                })?;
966
967            // Store the handle globally
968            if let Ok(mut handle_guard) = LOGGER_HANDLE.lock() {
969                debug_assert!(
970                    handle_guard.is_none(),
971                    "LOGGER_HANDLE already set - re-initialization not supported"
972                );
973                *handle_guard = Some(handle);
974            }
975        }
976
977        #[cfg(all(feature = "simulation", madsim))]
978        {
979            // Under simulation, the background writer thread would escape the
980            // madsim scheduler. Drop the receiver so the channel closes cleanly
981            // and force the bypass flag so subsequent log calls no-op without
982            // SendError noise.
983            let _ = (trader_id, instance_id, config, file_config, rx);
984            super::logging_set_bypass();
985        }
986
987        let max_level = log::LevelFilter::Trace;
988        set_max_level(max_level);
989
990        if print_config {
991            println!("Logger set as `log` implementation with max level {max_level}");
992        }
993
994        super::LOGGING_INITIALIZED.store(true, Ordering::SeqCst);
995        super::LOGGING_COLORED.store(is_colored, Ordering::SeqCst);
996
997        LogGuard::new()
998            .ok_or_else(|| anyhow::anyhow!("Failed to create LogGuard from global sender"))
999    }
1000
1001    #[cfg(not(all(feature = "simulation", madsim)))]
1002    #[expect(clippy::needless_pass_by_value)]
1003    fn handle_messages(
1004        trader_id: String,
1005        instance_id: String,
1006        config: LoggerConfig,
1007        file_config: FileWriterConfig,
1008        rx: std::sync::mpsc::Receiver<LogEvent>,
1009    ) {
1010        let LoggerConfig {
1011            stdout_level,
1012            fileout_level,
1013            component_level: _,
1014            module_level: _,
1015            log_components_only: _,
1016            is_colored,
1017            print_config: _,
1018            use_tracing: _,
1019            bypass_logging: _,
1020            file_config: _,
1021            clear_log_file,
1022            fileout_sync_on_flush,
1023            buffered_stdout,
1024        } = config;
1025
1026        let trader_id_cache = Ustr::from(&trader_id);
1027
1028        // Set up std I/O buffers
1029        let mut stdout_writer = StdoutWriter::new(stdout_level, is_colored, buffered_stdout);
1030        let mut stderr_writer = StderrWriter::new(is_colored);
1031
1032        // Conditionally create file writer based on fileout_level
1033        let mut file_writer_opt = if fileout_level == LevelFilter::Off {
1034            None
1035        } else {
1036            FileWriter::new(
1037                trader_id,
1038                instance_id,
1039                file_config,
1040                fileout_level,
1041                clear_log_file,
1042                fileout_sync_on_flush,
1043            )
1044        };
1045
1046        let process_event = |event: LogEvent,
1047                             stdout_writer: &mut StdoutWriter,
1048                             stderr_writer: &mut StderrWriter,
1049                             file_writer_opt: &mut Option<FileWriter>| {
1050            match event {
1051                LogEvent::Log(line) => {
1052                    let mut wrapper = LogLineWrapper::new(line, trader_id_cache);
1053
1054                    if stderr_writer.enabled(&wrapper.line) {
1055                        if is_colored {
1056                            stderr_writer.write(wrapper.get_colored());
1057                        } else {
1058                            stderr_writer.write(wrapper.get_string());
1059                        }
1060                    }
1061
1062                    if stdout_writer.enabled(&wrapper.line) {
1063                        if is_colored {
1064                            stdout_writer.write(wrapper.get_colored());
1065                        } else {
1066                            stdout_writer.write(wrapper.get_string());
1067                        }
1068                    }
1069
1070                    if let Some(file_writer) = file_writer_opt
1071                        && file_writer.enabled(&wrapper.line)
1072                    {
1073                        if file_writer.json_format {
1074                            file_writer.write(&wrapper.get_json());
1075                        } else {
1076                            file_writer.write(wrapper.get_string());
1077                        }
1078                    }
1079                }
1080                LogEvent::Flush => {
1081                    stdout_writer.flush();
1082                    stderr_writer.flush();
1083
1084                    if let Some(file_writer) = file_writer_opt {
1085                        file_writer.flush();
1086                    }
1087                }
1088                LogEvent::Sync(done) => {
1089                    let result = if let Some(file_writer) = file_writer_opt {
1090                        file_writer.flush_and_sync().map_err(anyhow::Error::from)
1091                    } else {
1092                        Ok(())
1093                    };
1094
1095                    let _ = done.send(result);
1096                }
1097                LogEvent::Close => {
1098                    // Close handled in the main loop; ignore here.
1099                }
1100            }
1101        };
1102
1103        // Continue to receive and handle log events until channel is hung up
1104        while let Ok(event) = rx.recv() {
1105            match event {
1106                LogEvent::Log(_) | LogEvent::Flush | LogEvent::Sync(_) => process_event(
1107                    event,
1108                    &mut stdout_writer,
1109                    &mut stderr_writer,
1110                    &mut file_writer_opt,
1111                ),
1112                LogEvent::Close => {
1113                    // First flush what's been written so far
1114                    stdout_writer.flush();
1115                    stderr_writer.flush();
1116
1117                    if let Some(ref mut file_writer) = file_writer_opt {
1118                        file_writer.flush();
1119                    }
1120
1121                    // Drain any remaining events that may have raced with shutdown
1122                    // This ensures logs enqueued just before/around shutdown aren't lost.
1123                    while let Ok(evt) = rx.try_recv() {
1124                        match evt {
1125                            LogEvent::Close => (), // ignore extra Close events
1126                            _ => process_event(
1127                                evt,
1128                                &mut stdout_writer,
1129                                &mut stderr_writer,
1130                                &mut file_writer_opt,
1131                            ),
1132                        }
1133                    }
1134
1135                    // Final flush after draining
1136                    stdout_writer.flush();
1137                    stderr_writer.flush();
1138
1139                    if let Some(ref mut file_writer) = file_writer_opt {
1140                        file_writer.flush_and_sync_logged();
1141                    }
1142
1143                    break;
1144                }
1145            }
1146        }
1147    }
1148}
1149
1150/// Determines if a log line should be filtered out based on module and component filters.
1151///
1152/// Returns `true` if the line should be skipped (filtered out), `false` if it should be logged.
1153///
1154/// The `module_filters_sorted` slice must be pre-sorted by descending path length so the
1155/// first `starts_with` match is the longest prefix.
1156#[must_use]
1157pub fn should_filter_log(
1158    component: &Ustr,
1159    line_level: log::Level,
1160    module_filters_sorted: &[(Ustr, LevelFilter)],
1161    component_level: &AHashMap<Ustr, LevelFilter>,
1162    log_components_only: bool,
1163) -> bool {
1164    should_filter_log_inner(
1165        component,
1166        line_level,
1167        module_filters_sorted,
1168        component_level,
1169        log_components_only,
1170    )
1171}
1172
1173fn should_filter_log_inner(
1174    component: &Ustr,
1175    line_level: log::Level,
1176    module_filters_sorted: &[(Ustr, LevelFilter)],
1177    component_level: &AHashMap<Ustr, LevelFilter>,
1178    log_components_only: bool,
1179) -> bool {
1180    if module_filters_sorted.is_empty() && component_level.is_empty() {
1181        return log_components_only;
1182    }
1183
1184    // Module filter: first match in sorted list is longest prefix
1185    let module_filter = module_filters_sorted
1186        .iter()
1187        .find(|(path, _)| component.starts_with(path.as_str()))
1188        .map(|(_, level)| *level);
1189
1190    let component_filter = component_level.get(component).copied();
1191
1192    if log_components_only && module_filter.is_none() && component_filter.is_none() {
1193        return true;
1194    }
1195
1196    // Module filter takes precedence over component filter
1197    if let Some(filter_level) = module_filter.or(component_filter)
1198        && line_level > filter_level
1199    {
1200        return true;
1201    }
1202
1203    false
1204}
1205
1206/// Gracefully shuts down the logging subsystem.
1207///
1208/// Performs the same shutdown sequence as dropping the last `LogGuard`, but can be called
1209/// explicitly for deterministic shutdown timing (e.g., testing or Windows Python applications).
1210///
1211/// # Safety
1212///
1213/// Safe to call multiple times. Thread join is skipped if called from the logging thread.
1214pub(crate) fn shutdown_graceful() {
1215    // Prevent further logging
1216    LOGGING_BYPASSED.store(true, Ordering::SeqCst);
1217    log::set_max_level(log::LevelFilter::Off);
1218
1219    // Signal Close if the sender exists
1220    if let Some(tx) = LOGGER_TX.get() {
1221        let _ = tx.send(LogEvent::Close);
1222    }
1223
1224    if let Ok(mut handle_guard) = LOGGER_HANDLE.lock()
1225        && let Some(handle) = handle_guard.take()
1226        && handle.thread().id() != std::thread::current().id()
1227    {
1228        let _ = handle.join();
1229    }
1230
1231    LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
1232}
1233
1234/// Flushes and syncs file logs to disk through the logging thread.
1235///
1236/// This is a no-op when logging is not initialized or file logging is disabled.
1237///
1238/// # Errors
1239///
1240/// Returns an error if the sync request cannot be delivered or acknowledged.
1241pub fn sync_to_disk() -> anyhow::Result<()> {
1242    if !LOGGING_INITIALIZED.load(Ordering::SeqCst) {
1243        return Ok(());
1244    }
1245
1246    let Some(tx) = LOGGER_TX.get() else {
1247        return Ok(());
1248    };
1249
1250    let (done_tx, done_rx) = std::sync::mpsc::channel();
1251    tx.send(LogEvent::Sync(done_tx))
1252        .map_err(|e| anyhow::anyhow!("failed to request logging sync: {e}"))?;
1253
1254    done_rx
1255        .recv()
1256        .map_err(|e| anyhow::anyhow!("failed to receive logging sync acknowledgement: {e}"))?
1257}
1258
1259pub fn log<T: AsRef<str>>(level: LogLevel, color: LogColor, component: Ustr, message: T) {
1260    let color = Value::from(color as u8);
1261
1262    match level {
1263        LogLevel::Off => {}
1264        LogLevel::Trace => {
1265            log::trace!(component = component.to_value(), color = color; "{}", message.as_ref());
1266        }
1267        LogLevel::Debug => {
1268            log::debug!(component = component.to_value(), color = color; "{}", message.as_ref());
1269        }
1270        LogLevel::Info => {
1271            log::info!(component = component.to_value(), color = color; "{}", message.as_ref());
1272        }
1273        LogLevel::Warning => {
1274            log::warn!(component = component.to_value(), color = color; "{}", message.as_ref());
1275        }
1276        LogLevel::Error => {
1277            log::error!(component = component.to_value(), color = color; "{}", message.as_ref());
1278        }
1279    }
1280}
1281
1282/// A guard that manages the lifecycle of the logging subsystem.
1283///
1284/// `LogGuard` ensures the logging thread remains active while instances exist and properly
1285/// terminates when all guards are dropped. The system uses reference counting to track active
1286/// guards - when the last `LogGuard` is dropped, the logging thread is joined to ensure all
1287/// pending log messages are written before the process terminates.
1288///
1289/// # Reference Counting
1290///
1291/// The logging system maintains a global atomic counter of active `LogGuard` instances. This
1292/// ensures that:
1293/// - The logging thread remains active as long as at least one `LogGuard` exists.
1294/// - All log messages are properly flushed when intermediate guards are dropped.
1295/// - The logging thread is cleanly terminated and joined when the last guard is dropped.
1296///
1297/// # Shutdown Behavior
1298///
1299/// When the last guard is dropped, the logging thread is signaled to close, drains pending
1300/// messages, and is joined to ensure all logs are written before process termination.
1301///
1302/// **Python on Windows:** Non-deterministic GC order during interpreter shutdown can
1303/// occasionally prevent proper thread join, resulting in truncated logs.
1304///
1305/// # Limits
1306///
1307/// The system supports a maximum of 255 concurrent `LogGuard` instances.
1308#[cfg_attr(
1309    feature = "python",
1310    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
1311)]
1312#[cfg_attr(
1313    feature = "python",
1314    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
1315)]
1316#[derive(Debug)]
1317pub struct LogGuard {
1318    tx: std::sync::mpsc::Sender<LogEvent>,
1319}
1320
1321impl LogGuard {
1322    /// Creates a new [`LogGuard`] instance from the global logger.
1323    ///
1324    /// Returns `None` if logging has not been initialized or the active `LogGuard`
1325    /// count would exceed 255.
1326    #[must_use]
1327    pub fn new() -> Option<Self> {
1328        let tx = LOGGER_TX.get()?;
1329        LOGGING_GUARDS_ACTIVE
1330            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
1331                if count == u8::MAX {
1332                    None
1333                } else {
1334                    Some(count + 1)
1335                }
1336            })
1337            .ok()?;
1338
1339        Some(Self { tx: tx.clone() })
1340    }
1341}
1342
1343impl Drop for LogGuard {
1344    /// Handles cleanup when a `LogGuard` is dropped.
1345    ///
1346    /// Sends `Flush` if other guards remain active, otherwise sends `Close`, joins the
1347    /// logging thread, and resets the subsystem state.
1348    fn drop(&mut self) {
1349        let previous_count = LOGGING_GUARDS_ACTIVE
1350            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
1351                assert!(count != 0, "LogGuard reference count underflow");
1352                Some(count - 1)
1353            })
1354            .expect("Failed to decrement LogGuard count");
1355
1356        // Check if this was the last LogGuard - re-check after decrement to avoid race
1357        if previous_count == 1 && LOGGING_GUARDS_ACTIVE.load(Ordering::SeqCst) == 0 {
1358            // This is truly the last LogGuard, so we should close the logger and join the thread
1359            // to ensure all log messages are written before the process terminates.
1360            // Prevent any new log events from being accepted while shutting down.
1361            LOGGING_BYPASSED.store(true, Ordering::SeqCst);
1362
1363            // Disable all log levels to reduce overhead on late calls
1364            log::set_max_level(log::LevelFilter::Off);
1365
1366            // Ensure Close is delivered before joining (critical for shutdown)
1367            let _ = self.tx.send(LogEvent::Close);
1368
1369            // Join the logging thread to ensure all pending logs are written
1370            if let Ok(mut handle_guard) = LOGGER_HANDLE.lock()
1371                && let Some(handle) = handle_guard.take()
1372            {
1373                // Avoid self-join deadlock
1374                if handle.thread().id() != std::thread::current().id() {
1375                    let _ = handle.join();
1376                }
1377            }
1378
1379            // Reset LOGGING_INITIALIZED since the logging thread has terminated
1380            LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
1381        } else {
1382            // Other LogGuards are still active, just flush our logs
1383            let _ = self.tx.send(LogEvent::Flush);
1384        }
1385    }
1386}
1387
1388#[cfg(test)]
1389mod tests {
1390    use ahash::AHashMap;
1391    use log::LevelFilter;
1392    use nautilus_core::UUID4;
1393    use nautilus_model::identifiers::TraderId;
1394    use rstest::*;
1395    use serde_json::Value;
1396    use tempfile::tempdir;
1397    use ustr::Ustr;
1398
1399    use super::*;
1400    use crate::enums::LogColor;
1401
1402    #[rstest]
1403    fn log_message_serialization() {
1404        let log_message = LogLine {
1405            timestamp: UnixNanos::default(),
1406            level: log::Level::Info,
1407            color: LogColor::Normal,
1408            component: Ustr::from("Portfolio"),
1409            message: "This is a log message".to_string(),
1410            fields: SmallVec::new(),
1411        };
1412
1413        let serialized_json = serde_json::to_string(&log_message).unwrap();
1414        let deserialized_value: Value = serde_json::from_str(&serialized_json).unwrap();
1415
1416        assert_eq!(deserialized_value["level"], "INFO");
1417        assert_eq!(deserialized_value["component"], "Portfolio");
1418        assert_eq!(deserialized_value["message"], "This is a log message");
1419    }
1420
1421    #[rstest]
1422    fn log_config_parsing() {
1423        let config =
1424            LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Error")
1425                .unwrap();
1426        assert_eq!(
1427            config,
1428            LoggerConfig {
1429                stdout_level: LevelFilter::Info,
1430                fileout_level: LevelFilter::Debug,
1431                component_level: AHashMap::from_iter(vec![(
1432                    Ustr::from("RiskEngine"),
1433                    LevelFilter::Error
1434                )]),
1435                module_level: AHashMap::new(),
1436                log_components_only: false,
1437                is_colored: true,
1438                print_config: false,
1439                use_tracing: false,
1440                ..Default::default()
1441            }
1442        );
1443    }
1444
1445    #[rstest]
1446    fn log_config_parsing2() {
1447        let config = LoggerConfig::from_spec("stdout=Warn;print_config;fileout=Error;").unwrap();
1448        assert_eq!(
1449            config,
1450            LoggerConfig {
1451                stdout_level: LevelFilter::Warn,
1452                fileout_level: LevelFilter::Error,
1453                component_level: AHashMap::new(),
1454                module_level: AHashMap::new(),
1455                log_components_only: false,
1456                is_colored: true,
1457                print_config: true,
1458                use_tracing: false,
1459                ..Default::default()
1460            }
1461        );
1462    }
1463
1464    #[rstest]
1465    fn log_config_parsing_with_log_components_only() {
1466        let config =
1467            LoggerConfig::from_spec("stdout=Info;log_components_only;RiskEngine=Debug").unwrap();
1468        assert_eq!(
1469            config,
1470            LoggerConfig {
1471                stdout_level: LevelFilter::Info,
1472                fileout_level: LevelFilter::Off,
1473                component_level: AHashMap::from_iter(vec![(
1474                    Ustr::from("RiskEngine"),
1475                    LevelFilter::Debug
1476                )]),
1477                module_level: AHashMap::new(),
1478                log_components_only: true,
1479                is_colored: true,
1480                print_config: false,
1481                use_tracing: false,
1482                ..Default::default()
1483            }
1484        );
1485    }
1486
1487    #[rstest]
1488    fn test_log_line_wrapper_plain_string() {
1489        let line = LogLine {
1490            timestamp: 1_650_000_000_000_000_000.into(),
1491            level: log::Level::Info,
1492            color: LogColor::Normal,
1493            component: Ustr::from("TestComponent"),
1494            message: "Test message".to_string(),
1495            fields: SmallVec::new(),
1496        };
1497
1498        let mut wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-001"));
1499        let result = wrapper.get_string();
1500
1501        assert!(result.contains("TRADER-001"));
1502        assert!(result.contains("TestComponent"));
1503        assert!(result.contains("Test message"));
1504        assert!(result.contains("[INFO]"));
1505        assert!(result.ends_with('\n'));
1506        // Should NOT contain ANSI codes
1507        assert!(!result.contains("\x1b["));
1508    }
1509
1510    #[rstest]
1511    fn test_log_line_wrapper_colored_string() {
1512        let line = LogLine {
1513            timestamp: 1_650_000_000_000_000_000.into(),
1514            level: log::Level::Info,
1515            color: LogColor::Green,
1516            component: Ustr::from("TestComponent"),
1517            message: "Test message".to_string(),
1518            fields: SmallVec::new(),
1519        };
1520
1521        let mut wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-001"));
1522        let result = wrapper.get_colored();
1523
1524        assert!(result.contains("TRADER-001"));
1525        assert!(result.contains("TestComponent"));
1526        assert!(result.contains("Test message"));
1527        // Should contain ANSI codes
1528        assert!(result.contains("\x1b["));
1529        assert!(result.ends_with('\n'));
1530    }
1531
1532    #[rstest]
1533    fn test_log_line_wrapper_json_output() {
1534        let line = LogLine {
1535            timestamp: 1_650_000_000_000_000_000.into(),
1536            level: log::Level::Warn,
1537            color: LogColor::Yellow,
1538            component: Ustr::from("RiskEngine"),
1539            message: "Warning message".to_string(),
1540            fields: SmallVec::new(),
1541        };
1542
1543        let wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-002"));
1544        let json = wrapper.get_json();
1545
1546        let parsed: Value = serde_json::from_str(json.trim()).unwrap();
1547        assert_eq!(parsed["trader_id"], "TRADER-002");
1548        assert_eq!(parsed["component"], "RiskEngine");
1549        assert_eq!(parsed["message"], "Warning message");
1550        assert_eq!(parsed["level"], "WARN");
1551        assert_eq!(parsed["color"], "YELLOW");
1552    }
1553
1554    #[rstest]
1555    fn test_log_line_wrapper_caches_string() {
1556        let line = LogLine {
1557            timestamp: 1_650_000_000_000_000_000.into(),
1558            level: log::Level::Info,
1559            color: LogColor::Normal,
1560            component: Ustr::from("Test"),
1561            message: "Cached".to_string(),
1562            fields: SmallVec::new(),
1563        };
1564
1565        let mut wrapper = LogLineWrapper::new(line, Ustr::from("TRADER"));
1566        let first = wrapper.get_string().to_string();
1567        let second = wrapper.get_string().to_string();
1568
1569        assert_eq!(first, second);
1570    }
1571
1572    #[rstest]
1573    fn test_log_line_display() {
1574        let line = LogLine {
1575            timestamp: 0.into(),
1576            level: log::Level::Error,
1577            color: LogColor::Red,
1578            component: Ustr::from("Component"),
1579            message: "Error occurred".to_string(),
1580            fields: SmallVec::new(),
1581        };
1582
1583        let display = format!("{line}");
1584        assert_eq!(display, "[ERROR] Component: Error occurred");
1585    }
1586
1587    #[rstest]
1588    fn test_log_line_display_with_fields() {
1589        let line = LogLine {
1590            timestamp: 0.into(),
1591            level: log::Level::Info,
1592            color: LogColor::Normal,
1593            component: Ustr::from("RiskEngine"),
1594            message: "Order filled".to_string(),
1595            fields: smallvec::smallvec![
1596                (Ustr::from("venue"), "BINANCE".to_string()),
1597                (Ustr::from("order_id"), "O-001".to_string()),
1598            ],
1599        };
1600
1601        let display = format!("{line}");
1602        assert_eq!(
1603            display,
1604            "[INFO] RiskEngine: Order filled venue=BINANCE order_id=O-001"
1605        );
1606    }
1607
1608    #[rstest]
1609    fn test_log_line_wrapper_plain_string_with_fields() {
1610        let line = LogLine {
1611            timestamp: 1_650_000_000_000_000_000.into(),
1612            level: log::Level::Info,
1613            color: LogColor::Normal,
1614            component: Ustr::from("DataEngine"),
1615            message: "Connected".to_string(),
1616            fields: smallvec::smallvec![
1617                (Ustr::from("venue"), "BINANCE".to_string()),
1618                (Ustr::from("product_type"), "SPOT".to_string()),
1619            ],
1620        };
1621
1622        let mut wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-001"));
1623        let result = wrapper.get_string();
1624
1625        assert!(result.contains("Connected"));
1626        assert!(result.contains("venue=BINANCE"));
1627        assert!(result.contains("product_type=SPOT"));
1628        assert!(result.ends_with('\n'));
1629        assert!(!result.contains("\x1b["));
1630    }
1631
1632    #[rstest]
1633    fn test_log_line_wrapper_json_with_fields() {
1634        let line = LogLine {
1635            timestamp: 1_650_000_000_000_000_000.into(),
1636            level: log::Level::Info,
1637            color: LogColor::Normal,
1638            component: Ustr::from("RiskEngine"),
1639            message: "Order filled".to_string(),
1640            fields: smallvec::smallvec![
1641                (Ustr::from("strategy_id"), "S-001".to_string()),
1642                (Ustr::from("venue"), "BINANCE".to_string()),
1643            ],
1644        };
1645
1646        let wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-001"));
1647        let json = wrapper.get_json();
1648
1649        let parsed: Value = serde_json::from_str(json.trim()).unwrap();
1650        assert_eq!(parsed["component"], "RiskEngine");
1651        assert_eq!(parsed["message"], "Order filled");
1652        assert_eq!(parsed["strategy_id"], "S-001");
1653        assert_eq!(parsed["venue"], "BINANCE");
1654    }
1655
1656    #[rstest]
1657    fn test_log_line_wrapper_json_no_fields_has_no_extra_keys() {
1658        let line = LogLine {
1659            timestamp: 1_650_000_000_000_000_000.into(),
1660            level: log::Level::Info,
1661            color: LogColor::Normal,
1662            component: Ustr::from("Test"),
1663            message: "Simple".to_string(),
1664            fields: SmallVec::new(),
1665        };
1666
1667        let wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-001"));
1668        let json = wrapper.get_json();
1669
1670        let parsed: Value = serde_json::from_str(json.trim()).unwrap();
1671        let obj = parsed.as_object().unwrap();
1672        assert_eq!(obj.len(), 6); // timestamp, trader_id, level, color, component, message
1673    }
1674
1675    #[rstest]
1676    fn test_log_line_wrapper_json_reserved_keys_not_overwritten() {
1677        let line = LogLine {
1678            timestamp: 1_650_000_000_000_000_000.into(),
1679            level: log::Level::Warn,
1680            color: LogColor::Normal,
1681            component: Ustr::from("Test"),
1682            message: "Real message".to_string(),
1683            fields: smallvec::smallvec![
1684                (Ustr::from("level"), "FAKE".to_string()),
1685                (Ustr::from("message"), "injected".to_string()),
1686                (Ustr::from("timestamp"), "bogus".to_string()),
1687                (Ustr::from("venue"), "BINANCE".to_string()),
1688            ],
1689        };
1690
1691        let wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-001"));
1692        let json = wrapper.get_json();
1693        let parsed: Value = serde_json::from_str(json.trim()).unwrap();
1694
1695        assert_eq!(parsed["level"], "WARN");
1696        assert_eq!(parsed["message"], "Real message");
1697        assert_ne!(parsed["timestamp"], "bogus");
1698        assert_eq!(parsed["venue"], "BINANCE");
1699    }
1700
1701    #[rstest]
1702    fn test_log_line_wrapper_json_duplicate_extra_fields_last_value_wins() {
1703        let line = LogLine {
1704            timestamp: 1_650_000_000_000_000_000.into(),
1705            level: log::Level::Info,
1706            color: LogColor::Normal,
1707            component: Ustr::from("Test"),
1708            message: "Duplicate field".to_string(),
1709            fields: smallvec::smallvec![
1710                (Ustr::from("venue"), "BINANCE".to_string()),
1711                (Ustr::from("venue"), "OKX".to_string()),
1712            ],
1713        };
1714
1715        let wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-001"));
1716        let json = wrapper.get_json();
1717        let parsed: Value = serde_json::from_str(json.trim()).unwrap();
1718
1719        assert_eq!(json.matches("\"venue\"").count(), 1);
1720        assert_eq!(parsed["venue"], "OKX");
1721    }
1722
1723    /// Helper to convert module level map to sorted vec (descending by path length)
1724    fn sorted_module_filters(map: AHashMap<Ustr, LevelFilter>) -> Vec<(Ustr, LevelFilter)> {
1725        let mut v: Vec<_> = map.into_iter().collect();
1726        v.sort_by_key(|b| std::cmp::Reverse(b.0.len()));
1727        v
1728    }
1729
1730    #[rstest]
1731    fn test_filter_no_filters_passes_all() {
1732        let module_filters = vec![];
1733        let component_level = AHashMap::new();
1734
1735        assert!(!should_filter_log(
1736            &Ustr::from("anything"),
1737            Level::Trace,
1738            &module_filters,
1739            &component_level,
1740            false
1741        ));
1742    }
1743
1744    #[rstest]
1745    fn test_filter_component_exact_match() {
1746        let module_filters = vec![];
1747        let component_level = AHashMap::from_iter([(Ustr::from("RiskEngine"), LevelFilter::Error)]);
1748
1749        assert!(should_filter_log(
1750            &Ustr::from("RiskEngine"),
1751            Level::Info,
1752            &module_filters,
1753            &component_level,
1754            false
1755        ));
1756        assert!(!should_filter_log(
1757            &Ustr::from("RiskEngine"),
1758            Level::Error,
1759            &module_filters,
1760            &component_level,
1761            false
1762        ));
1763        assert!(!should_filter_log(
1764            &Ustr::from("Portfolio"),
1765            Level::Info,
1766            &module_filters,
1767            &component_level,
1768            false
1769        ));
1770    }
1771
1772    #[rstest]
1773    fn test_filter_module_prefix_match() {
1774        let module_filters = vec![(Ustr::from("nautilus_okx::websocket"), LevelFilter::Debug)];
1775        let component_level = AHashMap::new();
1776
1777        assert!(!should_filter_log(
1778            &Ustr::from("nautilus_okx::websocket"),
1779            Level::Debug,
1780            &module_filters,
1781            &component_level,
1782            false
1783        ));
1784        assert!(!should_filter_log(
1785            &Ustr::from("nautilus_okx::websocket::handler"),
1786            Level::Debug,
1787            &module_filters,
1788            &component_level,
1789            false
1790        ));
1791        assert!(should_filter_log(
1792            &Ustr::from("nautilus_okx::websocket::handler"),
1793            Level::Trace,
1794            &module_filters,
1795            &component_level,
1796            false
1797        ));
1798        assert!(!should_filter_log(
1799            &Ustr::from("nautilus_binance::data"),
1800            Level::Trace,
1801            &module_filters,
1802            &component_level,
1803            false
1804        ));
1805    }
1806
1807    #[rstest]
1808    fn test_filter_longest_prefix_wins() {
1809        let module_filters = sorted_module_filters(AHashMap::from_iter([
1810            (Ustr::from("nautilus_okx"), LevelFilter::Error),
1811            (Ustr::from("nautilus_okx::websocket"), LevelFilter::Debug),
1812        ]));
1813        let component_level = AHashMap::new();
1814
1815        assert!(!should_filter_log(
1816            &Ustr::from("nautilus_okx::websocket::handler"),
1817            Level::Debug,
1818            &module_filters,
1819            &component_level,
1820            false
1821        ));
1822        assert!(should_filter_log(
1823            &Ustr::from("nautilus_okx::data"),
1824            Level::Debug,
1825            &module_filters,
1826            &component_level,
1827            false
1828        ));
1829        assert!(!should_filter_log(
1830            &Ustr::from("nautilus_okx::data"),
1831            Level::Error,
1832            &module_filters,
1833            &component_level,
1834            false
1835        ));
1836    }
1837
1838    #[rstest]
1839    fn test_filter_module_precedence_over_component() {
1840        let module_filters = vec![(Ustr::from("nautilus_okx::websocket"), LevelFilter::Debug)];
1841        let component_level =
1842            AHashMap::from_iter([(Ustr::from("nautilus_okx::websocket"), LevelFilter::Error)]);
1843
1844        assert!(!should_filter_log(
1845            &Ustr::from("nautilus_okx::websocket"),
1846            Level::Debug,
1847            &module_filters,
1848            &component_level,
1849            false
1850        ));
1851    }
1852
1853    #[rstest]
1854    fn test_filter_log_components_only_blocks_unknown() {
1855        let module_filters = vec![];
1856        let component_level = AHashMap::from_iter([(Ustr::from("RiskEngine"), LevelFilter::Debug)]);
1857
1858        assert!(should_filter_log(
1859            &Ustr::from("Portfolio"),
1860            Level::Info,
1861            &module_filters,
1862            &component_level,
1863            true
1864        ));
1865        assert!(!should_filter_log(
1866            &Ustr::from("RiskEngine"),
1867            Level::Info,
1868            &module_filters,
1869            &component_level,
1870            true
1871        ));
1872    }
1873
1874    #[rstest]
1875    fn test_filter_log_components_only_with_module() {
1876        let module_filters = vec![(Ustr::from("nautilus_okx"), LevelFilter::Debug)];
1877        let component_level = AHashMap::new();
1878
1879        assert!(!should_filter_log(
1880            &Ustr::from("nautilus_okx::websocket"),
1881            Level::Debug,
1882            &module_filters,
1883            &component_level,
1884            true
1885        ));
1886        assert!(should_filter_log(
1887            &Ustr::from("nautilus_binance::data"),
1888            Level::Debug,
1889            &module_filters,
1890            &component_level,
1891            true
1892        ));
1893    }
1894
1895    #[rstest]
1896    fn test_filter_level_comparison() {
1897        let module_filters = vec![];
1898        let component_level = AHashMap::from_iter([(Ustr::from("Test"), LevelFilter::Warn)]);
1899
1900        assert!(!should_filter_log(
1901            &Ustr::from("Test"),
1902            Level::Error,
1903            &module_filters,
1904            &component_level,
1905            false
1906        ));
1907        assert!(!should_filter_log(
1908            &Ustr::from("Test"),
1909            Level::Warn,
1910            &module_filters,
1911            &component_level,
1912            false
1913        ));
1914        assert!(should_filter_log(
1915            &Ustr::from("Test"),
1916            Level::Info,
1917            &module_filters,
1918            &component_level,
1919            false
1920        ));
1921        assert!(should_filter_log(
1922            &Ustr::from("Test"),
1923            Level::Debug,
1924            &module_filters,
1925            &component_level,
1926            false
1927        ));
1928        assert!(should_filter_log(
1929            &Ustr::from("Test"),
1930            Level::Trace,
1931            &module_filters,
1932            &component_level,
1933            false
1934        ));
1935    }
1936
1937    // These tests use global logging state (one logger per process).
1938    // They run correctly with cargo-nextest which isolates each test in its own process.
1939    //
1940    // Gated out under `cfg(madsim)`: every test here drives the file-logging writer
1941    // thread, which is itself gated out under simulation (see `Logger::init_with_config`),
1942    // so log events are dropped and these tests would either hang on `wait_until` or
1943    // assert against an empty log file. Logging is outside the determinism contract.
1944    #[cfg(not(all(feature = "simulation", madsim)))]
1945    mod serial_tests {
1946        use std::{sync::atomic::Ordering, time::Duration};
1947
1948        use super::*;
1949        use crate::{
1950            logging::{
1951                LOGGING_BYPASSED, logging_clock_set_static_mode, logging_clock_set_static_time,
1952                logging_is_initialized, logging_set_bypass, logging_sync_to_disk,
1953            },
1954            testing::wait_until,
1955        };
1956
1957        #[rstest]
1958        fn test_shutdown_on_error_records_once_then_rearms() {
1959            disarm_shutdown_on_error();
1960
1961            let (tx, _rx) = std::sync::mpsc::channel();
1962            let logger = Logger::new_for_benchmark(LoggerConfig::default(), tx);
1963
1964            arm_shutdown_on_error(false);
1965            let args = format_args!("Disabled error");
1966            let record = log::Record::builder()
1967                .args(args)
1968                .level(Level::Error)
1969                .target("RunComponent")
1970                .build();
1971            log::Log::log(&logger, &record);
1972            assert_eq!(take_shutdown_on_error_trigger(), None);
1973
1974            arm_shutdown_on_error(true);
1975            let args = format_args!("First error");
1976            let record = log::Record::builder()
1977                .args(args)
1978                .level(Level::Error)
1979                .target("RunComponent")
1980                .build();
1981            log::Log::log(&logger, &record);
1982
1983            let args = format_args!("Second error");
1984            let record = log::Record::builder()
1985                .args(args)
1986                .level(Level::Error)
1987                .target("RunComponent")
1988                .build();
1989            log::Log::log(&logger, &record);
1990
1991            let first = take_shutdown_on_error_trigger().unwrap();
1992            assert_eq!(first.component, Ustr::from("RunComponent"));
1993            assert_eq!(first.message, "First error");
1994            assert_eq!(take_shutdown_on_error_trigger(), None);
1995
1996            arm_shutdown_on_error(true);
1997            let args = format_args!("Third error");
1998            let record = log::Record::builder()
1999                .args(args)
2000                .level(Level::Error)
2001                .target("RunComponent")
2002                .build();
2003            log::Log::log(&logger, &record);
2004
2005            let third = take_shutdown_on_error_trigger().unwrap();
2006            assert_eq!(third.component, Ustr::from("RunComponent"));
2007            assert_eq!(third.message, "Third error");
2008
2009            let (tx, rx) = std::sync::mpsc::channel();
2010            let logger = Logger::new_for_benchmark(
2011                LoggerConfig {
2012                    log_components_only: true,
2013                    ..Default::default()
2014                },
2015                tx,
2016            );
2017
2018            arm_shutdown_on_error(true);
2019            let args = format_args!("Filtered error");
2020            let record = log::Record::builder()
2021                .args(args)
2022                .level(Level::Error)
2023                .target("FilteredComponent")
2024                .build();
2025            log::Log::log(&logger, &record);
2026
2027            let trigger = take_shutdown_on_error_trigger().unwrap();
2028            assert_eq!(trigger.component, Ustr::from("FilteredComponent"));
2029            assert_eq!(trigger.message, "Filtered error");
2030            assert!(matches!(
2031                rx.try_recv(),
2032                Err(std::sync::mpsc::TryRecvError::Empty)
2033            ));
2034            disarm_shutdown_on_error();
2035        }
2036
2037        #[rstest]
2038        fn test_shutdown_on_error_records_bypassed_error() {
2039            LOGGING_BYPASSED.store(false, Ordering::Relaxed);
2040            disarm_shutdown_on_error();
2041
2042            let (tx, rx) = std::sync::mpsc::channel();
2043            let logger = Logger::new_for_benchmark(LoggerConfig::default(), tx);
2044            let metadata = log::Metadata::builder()
2045                .level(Level::Error)
2046                .target("BypassedComponent")
2047                .build();
2048
2049            logging_set_bypass();
2050            arm_shutdown_on_error(false);
2051            assert!(!log::Log::enabled(&logger, &metadata));
2052
2053            arm_shutdown_on_error(true);
2054            assert!(log::Log::enabled(&logger, &metadata));
2055
2056            let args = format_args!("Bypassed error");
2057            let record = log::Record::builder()
2058                .args(args)
2059                .level(Level::Error)
2060                .target("BypassedComponent")
2061                .build();
2062            log::Log::log(&logger, &record);
2063
2064            let trigger = take_shutdown_on_error_trigger().unwrap();
2065            assert_eq!(trigger.component, Ustr::from("BypassedComponent"));
2066            assert_eq!(trigger.message, "Bypassed error");
2067            assert!(matches!(
2068                rx.try_recv(),
2069                Err(std::sync::mpsc::TryRecvError::Empty)
2070            ));
2071
2072            LOGGING_BYPASSED.store(false, Ordering::Relaxed);
2073            disarm_shutdown_on_error();
2074        }
2075
2076        #[rstest]
2077        fn test_shutdown_on_error_failed_drain_keeps_trigger_pending() {
2078            LOGGING_BYPASSED.store(false, Ordering::Relaxed);
2079            disarm_shutdown_on_error();
2080
2081            let (tx, _rx) = std::sync::mpsc::channel();
2082            let logger = Logger::new_for_benchmark(LoggerConfig::default(), tx);
2083
2084            arm_shutdown_on_error(true);
2085            let args = format_args!("Pending error");
2086            let record = log::Record::builder()
2087                .args(args)
2088                .level(Level::Error)
2089                .target("PendingComponent")
2090                .build();
2091            log::Log::log(&logger, &record);
2092
2093            let drained = try_drain_shutdown_on_error_trigger(|trigger| {
2094                assert_eq!(trigger.component, Ustr::from("PendingComponent"));
2095                assert_eq!(trigger.message, "Pending error");
2096                false
2097            });
2098            assert!(!drained);
2099
2100            let trigger = take_shutdown_on_error_trigger().unwrap();
2101            assert_eq!(trigger.component, Ustr::from("PendingComponent"));
2102            assert_eq!(trigger.message, "Pending error");
2103            disarm_shutdown_on_error();
2104        }
2105
2106        #[rstest]
2107        fn test_logging_to_file() {
2108            let config = LoggerConfig {
2109                fileout_level: LevelFilter::Debug,
2110                ..Default::default()
2111            };
2112
2113            let temp_dir = tempdir().expect("Failed to create temporary directory");
2114            let file_config = FileWriterConfig {
2115                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
2116                ..Default::default()
2117            };
2118
2119            let log_guard = Logger::init_with_config(
2120                TraderId::from("TRADER-001"),
2121                UUID4::new(),
2122                config,
2123                file_config,
2124            );
2125
2126            logging_clock_set_static_mode();
2127            logging_clock_set_static_time(1_650_000_000_000_000);
2128
2129            log::info!(
2130                component = "RiskEngine";
2131                "This is a test"
2132            );
2133
2134            let mut log_contents = String::new();
2135
2136            wait_until(
2137                || {
2138                    std::fs::read_dir(&temp_dir)
2139                        .expect("Failed to read directory")
2140                        .filter_map(Result::ok)
2141                        .any(|entry| entry.path().is_file())
2142                },
2143                Duration::from_secs(3),
2144            );
2145
2146            drop(log_guard); // Ensure log buffers are flushed
2147
2148            wait_until(
2149                || {
2150                    let log_file_path = std::fs::read_dir(&temp_dir)
2151                        .expect("Failed to read directory")
2152                        .filter_map(Result::ok)
2153                        .find(|entry| entry.path().is_file())
2154                        .expect("No files found in directory")
2155                        .path();
2156                    log_contents = std::fs::read_to_string(log_file_path)
2157                        .expect("Error while reading log file");
2158                    !log_contents.is_empty()
2159                },
2160                Duration::from_secs(3),
2161            );
2162
2163            assert_eq!(
2164                log_contents,
2165                "1970-01-20T02:20:00.000000000Z [INFO] TRADER-001.RiskEngine: This is a test\n"
2166            );
2167        }
2168
2169        #[rstest]
2170        fn test_logging_sync_to_disk_flushes_fast_flush_policy() {
2171            let config = LoggerConfig {
2172                fileout_level: LevelFilter::Debug,
2173                fileout_sync_on_flush: false,
2174                ..Default::default()
2175            };
2176
2177            let temp_dir = tempdir().expect("Failed to create temporary directory");
2178            let file_config = FileWriterConfig {
2179                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
2180                ..Default::default()
2181            };
2182
2183            let log_guard = Logger::init_with_config(
2184                TraderId::from("TRADER-SYNC"),
2185                UUID4::new(),
2186                config,
2187                file_config,
2188            )
2189            .expect("Failed to initialize logger");
2190
2191            logging_clock_set_static_mode();
2192            logging_clock_set_static_time(1_650_000_000_000_000);
2193
2194            log::info!(
2195                component = "RiskEngine";
2196                "sync me"
2197            );
2198
2199            logging_sync_to_disk().expect("sync-to-disk should succeed");
2200
2201            let log_file_path = std::fs::read_dir(&temp_dir)
2202                .expect("Failed to read directory")
2203                .filter_map(Result::ok)
2204                .find(|entry| entry.path().is_file())
2205                .expect("No files found in directory")
2206                .path();
2207            let log_contents =
2208                std::fs::read_to_string(log_file_path).expect("Error while reading log file");
2209
2210            assert!(log_contents.contains("sync me"));
2211
2212            drop(log_guard);
2213        }
2214
2215        #[rstest]
2216        fn test_shutdown_drains_backlog_tail() {
2217            const N: usize = 1000;
2218
2219            // Configure file logging at Info level
2220            let config = LoggerConfig {
2221                stdout_level: LevelFilter::Off,
2222                fileout_level: LevelFilter::Info,
2223                ..Default::default()
2224            };
2225
2226            let temp_dir = tempdir().expect("Failed to create temporary directory");
2227            let file_config = FileWriterConfig {
2228                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
2229                ..Default::default()
2230            };
2231
2232            let log_guard = Logger::init_with_config(
2233                TraderId::from("TRADER-TAIL"),
2234                UUID4::new(),
2235                config,
2236                file_config,
2237            )
2238            .expect("Failed to initialize logger");
2239
2240            // Use static time for reproducibility
2241            logging_clock_set_static_mode();
2242            logging_clock_set_static_time(1_700_000_000_000_000);
2243
2244            // Enqueue a known number of messages synchronously
2245            for i in 0..N {
2246                log::info!(component = "TailDrain"; "BacklogTest {i}");
2247            }
2248
2249            // Drop guard to trigger shutdown (bypass + close + drain)
2250            drop(log_guard);
2251
2252            // Wait until the file exists and contains at least N lines with our marker
2253            let mut count = 0usize;
2254            wait_until(
2255                || {
2256                    if let Some(log_file) = std::fs::read_dir(&temp_dir)
2257                        .expect("Failed to read directory")
2258                        .filter_map(Result::ok)
2259                        .find(|entry| entry.path().is_file())
2260                    {
2261                        let log_file_path = log_file.path();
2262                        if let Ok(contents) = std::fs::read_to_string(log_file_path) {
2263                            count = contents
2264                                .lines()
2265                                .filter(|l| l.contains("BacklogTest "))
2266                                .count();
2267                            count >= N
2268                        } else {
2269                            false
2270                        }
2271                    } else {
2272                        false
2273                    }
2274                },
2275                Duration::from_secs(5),
2276            );
2277
2278            assert_eq!(count, N, "Expected all pre-shutdown messages to be written");
2279        }
2280
2281        #[rstest]
2282        fn test_log_component_level_filtering() {
2283            let config =
2284                LoggerConfig::from_spec("stdout=Info;fileout=Debug;RiskEngine=Error").unwrap();
2285
2286            let temp_dir = tempdir().expect("Failed to create temporary directory");
2287            let file_config = FileWriterConfig {
2288                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
2289                ..Default::default()
2290            };
2291
2292            let log_guard = Logger::init_with_config(
2293                TraderId::from("TRADER-001"),
2294                UUID4::new(),
2295                config,
2296                file_config,
2297            );
2298
2299            logging_clock_set_static_mode();
2300            logging_clock_set_static_time(1_650_000_000_000_000);
2301
2302            log::info!(
2303                component = "RiskEngine";
2304                "This is a test"
2305            );
2306
2307            drop(log_guard); // Ensure log buffers are flushed
2308
2309            wait_until(
2310                || {
2311                    if let Some(log_file) = std::fs::read_dir(&temp_dir)
2312                        .expect("Failed to read directory")
2313                        .filter_map(Result::ok)
2314                        .find(|entry| entry.path().is_file())
2315                    {
2316                        let log_file_path = log_file.path();
2317                        let log_contents = std::fs::read_to_string(log_file_path)
2318                            .expect("Error while reading log file");
2319                        !log_contents.contains("RiskEngine")
2320                    } else {
2321                        false
2322                    }
2323                },
2324                Duration::from_secs(3),
2325            );
2326
2327            assert!(
2328                std::fs::read_dir(&temp_dir)
2329                    .expect("Failed to read directory")
2330                    .filter_map(Result::ok)
2331                    .any(|entry| entry.path().is_file()),
2332                "Log file exists"
2333            );
2334        }
2335
2336        #[rstest]
2337        fn test_logging_to_file_in_json_format() {
2338            let config =
2339                LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Info")
2340                    .unwrap();
2341
2342            let temp_dir = tempdir().expect("Failed to create temporary directory");
2343            let file_config = FileWriterConfig {
2344                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
2345                file_format: Some("json".to_string()),
2346                ..Default::default()
2347            };
2348
2349            let log_guard = Logger::init_with_config(
2350                TraderId::from("TRADER-001"),
2351                UUID4::new(),
2352                config,
2353                file_config,
2354            );
2355
2356            logging_clock_set_static_mode();
2357            logging_clock_set_static_time(1_650_000_000_000_000);
2358
2359            log::info!(
2360                component = "RiskEngine";
2361                "This is a test"
2362            );
2363
2364            let mut log_contents = String::new();
2365
2366            drop(log_guard); // Ensure log buffers are flushed
2367
2368            wait_until(
2369                || {
2370                    if let Some(log_file) = std::fs::read_dir(&temp_dir)
2371                        .expect("Failed to read directory")
2372                        .filter_map(Result::ok)
2373                        .find(|entry| entry.path().is_file())
2374                    {
2375                        let log_file_path = log_file.path();
2376                        log_contents = std::fs::read_to_string(log_file_path)
2377                            .expect("Error while reading log file");
2378                        !log_contents.is_empty()
2379                    } else {
2380                        false
2381                    }
2382                },
2383                Duration::from_secs(3),
2384            );
2385
2386            assert_eq!(
2387                log_contents,
2388                "{\"timestamp\":\"1970-01-20T02:20:00.000000000Z\",\"trader_id\":\"TRADER-001\",\"level\":\"INFO\",\"color\":\"NORMAL\",\"component\":\"RiskEngine\",\"message\":\"This is a test\"}\n"
2389            );
2390        }
2391
2392        #[rstest]
2393        fn test_init_sets_logging_is_initialized_flag() {
2394            let config = LoggerConfig::default();
2395            let file_config = FileWriterConfig::default();
2396
2397            let guard = Logger::init_with_config(
2398                TraderId::from("TRADER-001"),
2399                UUID4::new(),
2400                config,
2401                file_config,
2402            );
2403            assert!(guard.is_ok());
2404            assert!(logging_is_initialized());
2405
2406            drop(guard);
2407            assert!(!logging_is_initialized());
2408        }
2409
2410        #[rstest]
2411        fn test_init_returns_error_when_log_guard_limit_reached() {
2412            let guard = Logger::init_with_config(
2413                TraderId::from("TRADER-001"),
2414                UUID4::new(),
2415                LoggerConfig::default(),
2416                FileWriterConfig::default(),
2417            )
2418            .expect("Failed to initialize logger");
2419
2420            LOGGING_GUARDS_ACTIVE.store(u8::MAX, Ordering::SeqCst);
2421            let result = Logger::init_with_config(
2422                TraderId::from("TRADER-001"),
2423                UUID4::new(),
2424                LoggerConfig::default(),
2425                FileWriterConfig::default(),
2426            );
2427            LOGGING_GUARDS_ACTIVE.store(1, Ordering::SeqCst);
2428            drop(guard);
2429
2430            assert_eq!(
2431                result.unwrap_err().to_string(),
2432                "Logging already initialized but new guard could not be created"
2433            );
2434        }
2435
2436        #[rstest]
2437        fn test_reinit_after_guard_drop_fails() {
2438            let config = LoggerConfig::default();
2439            let file_config = FileWriterConfig::default();
2440
2441            let guard1 = Logger::init_with_config(
2442                TraderId::from("TRADER-001"),
2443                UUID4::new(),
2444                config.clone(),
2445                file_config.clone(),
2446            );
2447            assert!(guard1.is_ok());
2448            drop(guard1);
2449
2450            // Re-init fails because log crate's set_boxed_logger only works once per process
2451            let guard2 = Logger::init_with_config(
2452                TraderId::from("TRADER-002"),
2453                UUID4::new(),
2454                config,
2455                file_config,
2456            );
2457            assert!(guard2.is_err());
2458        }
2459
2460        #[rstest]
2461        fn test_bypass_before_init_prevents_logging() {
2462            logging_set_bypass();
2463            assert!(LOGGING_BYPASSED.load(Ordering::Relaxed));
2464
2465            let temp_dir = tempdir().expect("Failed to create temporary directory");
2466            let config = LoggerConfig {
2467                fileout_level: LevelFilter::Debug,
2468                ..Default::default()
2469            };
2470            let file_config = FileWriterConfig {
2471                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
2472                ..Default::default()
2473            };
2474
2475            let guard = Logger::init_with_config(
2476                TraderId::from("TRADER-001"),
2477                UUID4::new(),
2478                config,
2479                file_config,
2480            );
2481            assert!(guard.is_ok());
2482
2483            log::info!(
2484                component = "TestComponent";
2485                "This should be bypassed"
2486            );
2487            std::thread::sleep(Duration::from_millis(100));
2488            drop(guard);
2489
2490            // Bypass flag remains permanently set (no reset mechanism)
2491            assert!(LOGGING_BYPASSED.load(Ordering::Relaxed));
2492        }
2493
2494        #[rstest]
2495        fn test_module_level_filtering() {
2496            // Configure module-level filters (note: requires :: to be a module filter):
2497            // - nautilus::adapters=Warn (general adapter logs at Warn+)
2498            // - nautilus::adapters::okx=Debug (OKX adapter logs at Debug+)
2499            let config = LoggerConfig::from_spec(
2500                "stdout=Off;fileout=Trace;nautilus::adapters=Warn;nautilus::adapters::okx=Debug",
2501            )
2502            .unwrap();
2503
2504            let temp_dir = tempdir().expect("Failed to create temporary directory");
2505            let file_config = FileWriterConfig {
2506                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
2507                ..Default::default()
2508            };
2509
2510            let log_guard = Logger::init_with_config(
2511                TraderId::from("TRADER-MOD"),
2512                UUID4::new(),
2513                config,
2514                file_config,
2515            )
2516            .expect("Failed to initialize logger");
2517
2518            logging_clock_set_static_mode();
2519            logging_clock_set_static_time(1_650_000_000_000_000);
2520
2521            // Log from nautilus::adapters::okx::websocket - should pass (Debug allowed)
2522            log::debug!(
2523                component = "nautilus::adapters::okx::websocket";
2524                "OKX debug message"
2525            );
2526
2527            // Log from nautilus::adapters::okx - should pass (Debug allowed)
2528            log::info!(
2529                component = "nautilus::adapters::okx";
2530                "OKX info message"
2531            );
2532
2533            // Log from nautilus::adapters::binance - should be filtered (only Warn+ allowed)
2534            log::info!(
2535                component = "nautilus::adapters::binance";
2536                "Binance info message SHOULD NOT APPEAR"
2537            );
2538
2539            // Log from nautilus::adapters::binance at Warn - should pass
2540            log::warn!(
2541                component = "nautilus::adapters::binance";
2542                "Binance warn message"
2543            );
2544
2545            // Log from unrelated component - should pass (no filter)
2546            log::trace!(
2547                component = "Portfolio";
2548                "Portfolio trace message"
2549            );
2550
2551            drop(log_guard);
2552
2553            wait_until(
2554                || {
2555                    std::fs::read_dir(&temp_dir)
2556                        .expect("Failed to read directory")
2557                        .filter_map(Result::ok)
2558                        .any(|entry| entry.path().is_file())
2559                },
2560                Duration::from_secs(3),
2561            );
2562
2563            let log_file_path = std::fs::read_dir(&temp_dir)
2564                .expect("Failed to read directory")
2565                .filter_map(Result::ok)
2566                .find(|entry| entry.path().is_file())
2567                .expect("No log file found")
2568                .path();
2569
2570            let log_contents =
2571                std::fs::read_to_string(log_file_path).expect("Error reading log file");
2572
2573            assert!(
2574                log_contents.contains("OKX debug message"),
2575                "OKX debug should pass (longer prefix wins)"
2576            );
2577            assert!(
2578                log_contents.contains("OKX info message"),
2579                "OKX info should pass"
2580            );
2581            assert!(
2582                log_contents.contains("Binance warn message"),
2583                "Binance warn should pass"
2584            );
2585            assert!(
2586                log_contents.contains("Portfolio trace message"),
2587                "Unfiltered component should pass"
2588            );
2589            assert!(
2590                !log_contents.contains("SHOULD NOT APPEAR"),
2591                "Binance info should be filtered (adapters=Warn)"
2592            );
2593        }
2594
2595        #[rstest]
2596        fn test_logging_to_file_with_kv_fields() {
2597            let config = LoggerConfig {
2598                fileout_level: LevelFilter::Debug,
2599                ..Default::default()
2600            };
2601
2602            let temp_dir = tempdir().expect("Failed to create temporary directory");
2603            let file_config = FileWriterConfig {
2604                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
2605                ..Default::default()
2606            };
2607
2608            let log_guard = Logger::init_with_config(
2609                TraderId::from("TRADER-001"),
2610                UUID4::new(),
2611                config,
2612                file_config,
2613            );
2614
2615            logging_clock_set_static_mode();
2616            logging_clock_set_static_time(1_650_000_000_000_000);
2617
2618            log::info!(
2619                component = "DataEngine",
2620                venue = "BINANCE",
2621                product_type = "SPOT";
2622                "WebSocket connected"
2623            );
2624
2625            let mut log_contents = String::new();
2626
2627            drop(log_guard);
2628
2629            wait_until(
2630                || {
2631                    if let Some(log_file) = std::fs::read_dir(&temp_dir)
2632                        .expect("Failed to read directory")
2633                        .filter_map(Result::ok)
2634                        .find(|entry| entry.path().is_file())
2635                    {
2636                        log_contents = std::fs::read_to_string(log_file.path())
2637                            .expect("Error while reading log file");
2638                        !log_contents.is_empty()
2639                    } else {
2640                        false
2641                    }
2642                },
2643                Duration::from_secs(3),
2644            );
2645
2646            assert!(
2647                log_contents.contains("WebSocket connected"),
2648                "Message should be present"
2649            );
2650            assert!(
2651                log_contents.contains("venue=BINANCE"),
2652                "venue field should appear in output, was:\n{log_contents}"
2653            );
2654            assert!(
2655                log_contents.contains("product_type=SPOT"),
2656                "product_type field should appear in output, was:\n{log_contents}"
2657            );
2658        }
2659
2660        #[rstest]
2661        fn test_logging_to_file_json_with_kv_fields() {
2662            let config =
2663                LoggerConfig::from_spec("stdout=Off;fileout=Debug;DataEngine=Debug").unwrap();
2664
2665            let temp_dir = tempdir().expect("Failed to create temporary directory");
2666            let file_config = FileWriterConfig {
2667                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
2668                file_format: Some("json".to_string()),
2669                ..Default::default()
2670            };
2671
2672            let log_guard = Logger::init_with_config(
2673                TraderId::from("TRADER-001"),
2674                UUID4::new(),
2675                config,
2676                file_config,
2677            );
2678
2679            logging_clock_set_static_mode();
2680            logging_clock_set_static_time(1_650_000_000_000_000);
2681
2682            log::info!(
2683                component = "DataEngine",
2684                venue = "BINANCE",
2685                order_id = "O-12345";
2686                "Order filled"
2687            );
2688
2689            let mut log_contents = String::new();
2690
2691            drop(log_guard);
2692
2693            wait_until(
2694                || {
2695                    if let Some(log_file) = std::fs::read_dir(&temp_dir)
2696                        .expect("Failed to read directory")
2697                        .filter_map(Result::ok)
2698                        .find(|entry| entry.path().is_file())
2699                    {
2700                        log_contents = std::fs::read_to_string(log_file.path())
2701                            .expect("Error while reading log file");
2702                        !log_contents.is_empty()
2703                    } else {
2704                        false
2705                    }
2706                },
2707                Duration::from_secs(3),
2708            );
2709
2710            let parsed: serde_json::Value =
2711                serde_json::from_str(log_contents.trim()).expect("Should be valid JSON");
2712            assert_eq!(parsed["component"], "DataEngine");
2713            assert_eq!(parsed["message"], "Order filled");
2714            assert_eq!(parsed["venue"], "BINANCE");
2715            assert_eq!(parsed["order_id"], "O-12345");
2716        }
2717    }
2718
2719    #[cfg(all(feature = "simulation", madsim))]
2720    mod sim_tests {
2721        use std::sync::atomic::Ordering;
2722
2723        use super::*;
2724        use crate::logging::LOGGING_BYPASSED;
2725
2726        #[rstest]
2727        fn test_init_under_madsim_skips_writer_thread_and_forces_bypass() {
2728            let config = LoggerConfig {
2729                bypass_logging: false,
2730                ..Default::default()
2731            };
2732            let temp_dir = tempdir().expect("Failed to create temporary directory");
2733            let file_config = FileWriterConfig {
2734                directory: Some(temp_dir.path().to_str().unwrap().to_string()),
2735                ..Default::default()
2736            };
2737
2738            let _guard = Logger::init_with_config(
2739                TraderId::from("TRADER-SIM"),
2740                UUID4::new(),
2741                config,
2742                file_config,
2743            )
2744            .expect("init should succeed under simulation");
2745
2746            assert!(LOGGING_INITIALIZED.load(Ordering::SeqCst));
2747            assert!(
2748                LOGGING_BYPASSED.load(Ordering::SeqCst),
2749                "bypass must be forced under cfg(madsim) even when config disables it"
2750            );
2751            assert!(
2752                LOGGER_HANDLE
2753                    .lock()
2754                    .expect("LOGGER_HANDLE mutex should not be poisoned")
2755                    .is_none(),
2756                "writer thread must not be spawned under cfg(madsim)"
2757            );
2758        }
2759    }
2760}