use std::{
cell::RefCell,
fmt::{Display, Write as _},
sync::{
Mutex, OnceLock,
atomic::{AtomicBool, Ordering},
mpsc::SendError,
},
};
use ahash::AHashMap;
use indexmap::IndexMap;
use log::{
Level, LevelFilter, Log, STATIC_MAX_LEVEL,
kv::{ToValue, Value},
set_boxed_logger, set_max_level,
};
use nautilus_core::{
UUID4, UnixNanos,
datetime::unix_nanos_to_iso8601,
time::{get_atomic_clock_realtime, get_atomic_clock_static},
};
use nautilus_model::identifiers::TraderId;
use serde::{Deserialize, Serialize, Serializer, ser::SerializeMap};
use smallvec::SmallVec;
use ustr::Ustr;
pub use super::config::LoggerConfig;
use super::{LOGGING_BYPASSED, LOGGING_GUARDS_ACTIVE, LOGGING_INITIALIZED, LOGGING_REALTIME};
#[cfg(not(all(feature = "simulation", madsim)))]
use crate::logging::writer::{FileWriter, LogWriter, StderrWriter, StdoutWriter};
use crate::{
enums::{LogColor, LogLevel},
logging::writer::FileWriterConfig,
};
/// Name given to the background thread that consumes log events.
#[cfg(not(all(feature = "simulation", madsim)))]
const LOGGING: &str = "logging";
/// Key-value key whose value overrides the color of a log line.
const KV_COLOR: &str = "color";
/// Key-value key whose value names the component emitting a log line.
const KV_COMPONENT: &str = "component";
/// Inline capacity (number of pairs) of the [`LogFields`] small vector.
const LOG_FIELDS_INLINE_CAP: usize = 0;
/// Byte budget reserved for the level text when estimating line capacity.
const MAX_LEVEL_DISPLAY_LEN: usize = "ERROR".len();
/// Length of the ANSI "bold" escape sequence.
const ANSI_BOLD_LEN: usize = "\x1b[1m".len();
/// Length of the ANSI "reset" escape sequence.
const ANSI_RESET_LEN: usize = "\x1b[0m".len();
/// Fixed punctuation bytes of a plain formatted line (separators plus trailing newline).
const PLAIN_FORMAT_OVERHEAD: usize = " [".len() + "] ".len() + ".".len() + ": ".len() + "\n".len();
/// Fixed punctuation bytes of a colored formatted line: the plain separators plus the
/// bold/reset pair around the timestamp and the final reset before the newline.
/// The color escape itself is accounted for separately by the caller.
const COLORED_FORMAT_OVERHEAD: usize = ANSI_BOLD_LEN
+ ANSI_RESET_LEN
+ " ".len()
+ "[".len()
+ "] ".len()
+ ".".len()
+ ": ".len()
+ ANSI_RESET_LEN
+ "\n".len();
/// Number of slots in the per-thread repeated-string intern cache.
const REPEATED_USTR_CACHE_CAP: usize = 8;
// Per-thread cache used by `intern_repeated` to skip re-interning strings that are
// seen again at the same address (for example static log targets).
thread_local! {
static REPEATED_USTR_CACHE: RefCell<RepeatedUstrCache> =
const { RefCell::new(RepeatedUstrCache::new()) };
}
/// One slot of the repeated-string cache: the identity of the source `&str`
/// together with the interned value it resolved to.
#[derive(Clone, Copy)]
struct RepeatedUstrCacheEntry {
// Address of the source string's first byte.
ptr: usize,
// Byte length of the source string.
len: usize,
// Interned value produced for that string.
value: Ustr,
}
/// Fixed-size cache of recently interned strings with a rotating insertion cursor.
#[derive(Clone, Copy)]
struct RepeatedUstrCache {
    /// Cached entries; `None` marks a slot that has never been filled.
    entries: [Option<RepeatedUstrCacheEntry>; REPEATED_USTR_CACHE_CAP],
    /// Index of the slot that the next insertion will overwrite.
    next: usize,
}

impl RepeatedUstrCache {
    /// Creates an empty cache whose insertion cursor points at the first slot.
    const fn new() -> Self {
        const EMPTY_SLOT: Option<RepeatedUstrCacheEntry> = None;

        Self {
            entries: [EMPTY_SLOT; REPEATED_USTR_CACHE_CAP],
            next: 0,
        }
    }
}
/// Ordered list of extra key-value pairs attached to a log line.
pub type LogFields = SmallVec<[(Ustr, String); LOG_FIELDS_INLINE_CAP]>;
/// Process-wide sender for the logging channel; set once during initialization.
static LOGGER_TX: OnceLock<std::sync::mpsc::Sender<LogEvent>> = OnceLock::new();
/// Join handle of the logging thread; taken (and joined) on shutdown.
static LOGGER_HANDLE: Mutex<Option<std::thread::JoinHandle<()>>> = Mutex::new(None);
/// Lazily created shutdown-on-error state shared by all threads.
static SHUTDOWN_ON_ERROR: OnceLock<ShutdownOnError> = OnceLock::new();
/// Details of the error-level log record that tripped the shutdown-on-error latch.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ShutdownOnErrorTrigger {
/// Timestamp captured when the error was recorded.
pub timestamp: UnixNanos,
/// Component (or log target) that emitted the error.
pub component: Ustr,
/// Formatted error message.
pub message: String,
}
#[derive(Debug, Default)]
struct ShutdownOnError {
armed: AtomicBool,
triggered: AtomicBool,
pending: Mutex<Option<ShutdownOnErrorTrigger>>,
}
impl ShutdownOnError {
fn is_armed(&self) -> bool {
self.armed.load(Ordering::Acquire)
}
fn arm(&self, enabled: bool) {
if let Ok(mut pending) = self.pending.lock() {
pending.take();
}
self.triggered.store(false, Ordering::Release);
self.armed.store(enabled, Ordering::Release);
}
fn disarm(&self) {
self.armed.store(false, Ordering::Release);
self.triggered.store(false, Ordering::Release);
if let Ok(mut pending) = self.pending.lock() {
pending.take();
}
}
fn maybe_record_trigger<F>(
&self,
level: Level,
timestamp: UnixNanos,
component: Ustr,
message: F,
) where
F: FnOnce() -> String,
{
if !self.armed.load(Ordering::Acquire) || level != Level::Error {
return;
}
let Ok(mut pending) = self.pending.lock() else {
return;
};
if self
.triggered
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
return;
}
*pending = Some(ShutdownOnErrorTrigger {
timestamp,
component,
message: message(),
});
}
fn take_trigger(&self) -> Option<ShutdownOnErrorTrigger> {
if !self.triggered.load(Ordering::Acquire) {
return None;
}
self.pending
.lock()
.ok()
.and_then(|mut pending| pending.take())
}
fn try_drain_trigger<F>(&self, drain: F) -> bool
where
F: FnOnce(&ShutdownOnErrorTrigger) -> bool,
{
if !self.triggered.load(Ordering::Acquire) {
return false;
}
let Ok(mut pending) = self.pending.lock() else {
return false;
};
let Some(trigger) = pending.as_ref() else {
return false;
};
if !drain(trigger) {
return false;
}
pending.take();
true
}
}
/// Resets the global shutdown-on-error latch and sets its armed state to `enabled`.
pub fn arm_shutdown_on_error(enabled: bool) {
    let state = shutdown_on_error();
    state.arm(enabled);
}
/// Disarms the global shutdown-on-error latch and discards any pending trigger.
pub fn disarm_shutdown_on_error() {
    let state = shutdown_on_error();
    state.disarm();
}
/// Removes and returns the pending shutdown-on-error trigger, if any.
pub fn take_shutdown_on_error_trigger() -> Option<ShutdownOnErrorTrigger> {
    let state = shutdown_on_error();
    state.take_trigger()
}
/// Offers the pending shutdown-on-error trigger to `drain`, removing it only
/// when `drain` returns `true`.
///
/// Returns `true` when a trigger was present and drained.
pub fn try_drain_shutdown_on_error_trigger<F>(drain: F) -> bool
where
    F: FnOnce(&ShutdownOnErrorTrigger) -> bool,
{
    let state = shutdown_on_error();
    state.try_drain_trigger(drain)
}
/// Returns the process-wide shutdown-on-error state, creating it on first use.
fn shutdown_on_error() -> &'static ShutdownOnError {
    SHUTDOWN_ON_ERROR.get_or_init(|| ShutdownOnError::default())
}
/// Pre-computed filtering rules derived from a [`LoggerConfig`].
#[derive(Debug, Clone)]
struct FilterPolicy {
    /// Module-path filters ordered from longest to shortest prefix.
    modules_by_longest_prefix: Vec<(Ustr, LevelFilter)>,
    /// Exact-match component filters.
    components: AHashMap<Ustr, LevelFilter>,
    /// When set, records matching no filter are skipped.
    components_only: bool,
}

impl FilterPolicy {
    /// Builds a policy from `config`, or returns `None` when the config asks for
    /// no filtering at all (no module filters, no component filters, and
    /// `log_components_only` unset).
    fn from_config(config: &LoggerConfig) -> Option<Self> {
        let module_filters = sorted_module_filters_from_map(&config.module_level);

        let filtering_requested = config.log_components_only
            || !module_filters.is_empty()
            || !config.component_level.is_empty();

        filtering_requested.then(|| Self {
            modules_by_longest_prefix: module_filters,
            components: config.component_level.clone(),
            components_only: config.log_components_only,
        })
    }

    /// Returns `true` when a record from `component` at `level` must be dropped.
    fn should_skip(&self, component: &Ustr, level: Level) -> bool {
        let Self {
            modules_by_longest_prefix,
            components,
            components_only,
        } = self;

        should_filter_log_inner(
            component,
            level,
            modules_by_longest_prefix,
            components,
            *components_only,
        )
    }
}
/// `log` backend that forwards accepted records over a channel as [`LogEvent`]s.
#[derive(Debug)]
pub struct Logger {
/// Configuration the logger was created with.
pub config: LoggerConfig,
// Filtering rules; `None` when the config requests no filtering.
filter_policy: Option<FilterPolicy>,
// Sending half of the channel that carries log events.
tx: std::sync::mpsc::Sender<LogEvent>,
}
/// Messages sent over the logging channel to the logging thread.
#[derive(Debug)]
pub enum LogEvent {
/// A log line to write to the enabled writers.
Log(LogLine),
/// Request to flush all writers.
Flush,
/// Request to flush and sync the file writer; the outcome is sent back on the enclosed sender.
Sync(std::sync::mpsc::Sender<anyhow::Result<()>>),
/// Request to drain outstanding events, flush, and stop the logging thread.
Close,
}
/// A single captured log record.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LogLine {
/// Timestamp captured when the record was logged.
pub timestamp: UnixNanos,
/// Severity of the record.
pub level: Level,
/// Color used when rendering the line with ANSI colors.
pub color: LogColor,
/// Component (or log target) that produced the record.
pub component: Ustr,
/// Formatted message text.
pub message: String,
/// Extra key-value pairs; defaulted when absent and omitted from serialization when empty.
#[serde(default, skip_serializing_if = "SmallVec::is_empty")]
pub fields: LogFields,
}
impl Display for LogLine {
    /// Renders the line as `[LEVEL] component: message` followed by one
    /// ` key=value` segment per extra field (no timestamp, no newline).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            level,
            component,
            message,
            fields,
            ..
        } = self;

        write!(f, "[{}] {}: {}", level, component, message)?;
        fields
            .iter()
            .try_for_each(|(k, v)| write!(f, " {k}={v}"))
    }
}
/// A [`LogLine`] paired with the trader ID and lazily built formatted renderings.
#[derive(Clone, Debug)]
pub struct LogLineWrapper {
// The wrapped log line.
line: LogLine,
// Cached plain-text rendering, built on first use.
cache: Option<String>,
// Cached ANSI-colored rendering, built on first use.
colored: Option<String>,
// Trader ID included in every rendering.
trader_id: Ustr,
}
impl LogLineWrapper {
#[must_use]
pub const fn new(line: LogLine, trader_id: Ustr) -> Self {
Self {
line,
cache: None,
colored: None,
trader_id,
}
}
pub fn get_string(&mut self) -> &str {
self.cache.get_or_insert_with(|| {
let timestamp = unix_nanos_to_iso8601(self.line.timestamp);
let mut s = String::with_capacity(plain_log_line_capacity(
×tamp,
self.trader_id,
&self.line,
));
write!(
s,
"{} [{}] {}.{}: {}",
timestamp, self.line.level, self.trader_id, self.line.component, self.line.message,
)
.expect("writing to String should not fail");
for (k, v) in &self.line.fields {
s.push(' ');
s.push_str(k);
s.push('=');
s.push_str(v);
}
s.push('\n');
s
})
}
pub fn get_colored(&mut self) -> &str {
self.colored.get_or_insert_with(|| {
let timestamp = unix_nanos_to_iso8601(self.line.timestamp);
let color_ansi = self.line.color.as_ansi();
let mut s = String::with_capacity(colored_log_line_capacity(
×tamp,
color_ansi,
self.trader_id,
&self.line,
));
write!(
s,
"\x1b[1m{}\x1b[0m {}[{}] {}.{}: {}",
timestamp,
color_ansi,
self.line.level,
self.trader_id,
self.line.component,
self.line.message,
)
.expect("writing to String should not fail");
for (k, v) in &self.line.fields {
s.push(' ');
s.push_str(k);
s.push('=');
s.push_str(v);
}
s.push_str("\x1b[0m\n");
s
})
}
#[must_use]
pub fn get_json(&self) -> String {
let mut json_string =
serde_json::to_string(&self).expect("Error serializing log event to string");
json_string.push('\n');
json_string
}
}
/// Returns the number of bytes the extra fields occupy once rendered as
/// ` key=value` segments (two separator bytes per field).
fn formatted_fields_len(fields: &LogFields) -> usize {
    let mut total = 0;
    for (key, value) in fields {
        total += key.len() + value.len() + 2;
    }
    total
}
/// Estimates the byte capacity needed to render `line`.
///
/// `overhead` is the fixed punctuation of the chosen format and
/// `ansi_extra_len` is the length of any additional escape sequence. The level
/// text is budgeted at [`MAX_LEVEL_DISPLAY_LEN`] rather than measured.
fn log_line_capacity(
    timestamp: &str,
    trader_id: Ustr,
    line: &LogLine,
    overhead: usize,
    ansi_extra_len: usize,
) -> usize {
    let fixed_len = overhead + ansi_extra_len + MAX_LEVEL_DISPLAY_LEN;
    let header_len = timestamp.len() + trader_id.len() + line.component.len();
    let body_len = line.message.len() + formatted_fields_len(&line.fields);

    fixed_len + header_len + body_len
}
/// Estimates the byte capacity of the plain-text rendering of `line`.
fn plain_log_line_capacity(timestamp: &str, trader_id: Ustr, line: &LogLine) -> usize {
    // The plain format carries no escape sequences.
    let no_ansi = 0;
    log_line_capacity(timestamp, trader_id, line, PLAIN_FORMAT_OVERHEAD, no_ansi)
}
/// Estimates the byte capacity of the ANSI-colored rendering of `line`,
/// including the color escape sequence `color_ansi`.
fn colored_log_line_capacity(
    timestamp: &str,
    color_ansi: &str,
    trader_id: Ustr,
    line: &LogLine,
) -> usize {
    let color_len = color_ansi.len();
    log_line_capacity(timestamp, trader_id, line, COLORED_FORMAT_OVERHEAD, color_len)
}
impl Serialize for LogLineWrapper {
    /// Serializes the line as a flat map: the six core keys (`timestamp`,
    /// `trader_id`, `level`, `color`, `component`, `message`) followed by the
    /// extra fields.
    ///
    /// Extra fields whose key collides with a core key are dropped. When the
    /// extra fields contain a repeated key, serialization goes through an
    /// ordered map instead so each key is emitted once.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        if has_duplicate_json_field(&self.line.fields) {
            return serialize_log_line_with_indexmap(self, serializer);
        }

        let timestamp = unix_nanos_to_iso8601(self.line.timestamp);
        // The entry count depends on how many fields are reserved, so no length hint is given.
        let mut map = serializer.serialize_map(None)?;
        map.serialize_entry("timestamp", &timestamp)?;
        map.serialize_entry("trader_id", self.trader_id.as_str())?;
        map.serialize_entry("level", &DisplayAsString(&self.line.level))?;
        map.serialize_entry("color", &DisplayAsString(&self.line.color))?;
        map.serialize_entry("component", self.line.component.as_str())?;
        map.serialize_entry("message", &self.line.message)?;

        for (k, v) in &self.line.fields {
            let key = k.as_str();
            if !is_reserved_json_key(key) {
                map.serialize_entry(key, v)?;
            }
        }

        map.end()
    }
}
struct DisplayAsString<'a, T: ?Sized>(&'a T);
impl<T> Serialize for DisplayAsString<'_, T>
where
T: Display + ?Sized,
{
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.collect_str(self.0)
}
}
/// Returns `true` when `key` is one of the core JSON keys that extra fields
/// are not allowed to use.
fn is_reserved_json_key(key: &str) -> bool {
    const RESERVED_KEYS: [&str; 6] = [
        "timestamp",
        "trader_id",
        "level",
        "color",
        "component",
        "message",
    ];

    RESERVED_KEYS.contains(&key)
}
/// Returns `true` when two or more extra fields share the same non-reserved key.
///
/// Reserved keys are ignored: they never reach the JSON output, so their
/// repetition does not matter.
fn has_duplicate_json_field(fields: &LogFields) -> bool {
    fields.iter().enumerate().any(|(pos, (name, _))| {
        let name = name.as_str();
        // A match against a non-reserved name is necessarily non-reserved too.
        !is_reserved_json_key(name)
            && fields[..pos]
                .iter()
                .any(|(earlier, _)| earlier.as_str() == name)
    })
}
fn serialize_log_line_with_indexmap<S>(
wrapper: &LogLineWrapper,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut json_obj = IndexMap::new();
let timestamp = unix_nanos_to_iso8601(wrapper.line.timestamp);
json_obj.insert("timestamp".to_string(), timestamp);
json_obj.insert("trader_id".to_string(), wrapper.trader_id.to_string());
json_obj.insert("level".to_string(), wrapper.line.level.to_string());
json_obj.insert("color".to_string(), wrapper.line.color.to_string());
json_obj.insert("component".to_string(), wrapper.line.component.to_string());
json_obj.insert("message".to_string(), wrapper.line.message.clone());
for (k, v) in &wrapper.line.fields {
let key = k.as_str();
if !is_reserved_json_key(key) {
json_obj.insert(k.to_string(), v.clone());
}
}
json_obj.serialize(serializer)
}
fn sorted_module_filters_from_map(
module_level: &AHashMap<Ustr, LevelFilter>,
) -> Vec<(Ustr, LevelFilter)> {
let mut filters: Vec<_> = module_level
.iter()
.map(|(path, level)| (*path, *level))
.collect();
filters.sort_by_key(|(path, _)| std::cmp::Reverse(path.len()));
filters
}
/// Returns the timestamp to stamp on a log record: read from the realtime
/// atomic clock when `LOGGING_REALTIME` is set, otherwise from the static one.
fn current_log_timestamp() -> UnixNanos {
    if !LOGGING_REALTIME.load(Ordering::Relaxed) {
        return get_atomic_clock_static().get_time_ns();
    }

    get_atomic_clock_realtime().get_time_ns()
}
/// Interns `value`, consulting the per-thread cache first.
///
/// A cache hit requires the same address, the same length, and equal contents.
/// On a miss the string is interned and stored in the slot at the rotating
/// cursor, overwriting whatever was there.
fn intern_repeated(value: &str) -> Ustr {
    let addr = value.as_ptr() as usize;
    let byte_len = value.len();

    REPEATED_USTR_CACHE.with(|cell| {
        let mut cache = cell.borrow_mut();

        let hit = cache
            .entries
            .iter()
            .flatten()
            .find(|slot| slot.ptr == addr && slot.len == byte_len && slot.value.as_str() == value)
            .map(|slot| slot.value);

        if let Some(cached) = hit {
            return cached;
        }

        let interned = Ustr::from(value);
        let slot_idx = cache.next;
        cache.entries[slot_idx] = Some(RepeatedUstrCacheEntry {
            ptr: addr,
            len: byte_len,
            value: interned,
        });
        cache.next = (slot_idx + 1) % REPEATED_USTR_CACHE_CAP;

        interned
    })
}
/// Converts a key-value `component` value into an interned string.
///
/// Borrowed string values go through the repeated-string cache; any other
/// value is rendered to a string and interned directly.
fn intern_component_value(value: &log::kv::Value<'_>) -> Ustr {
    if let Some(component) = value.to_borrowed_str() {
        return intern_repeated(component);
    }

    Ustr::from(&value.to_string())
}
/// Key-value visitor that extracts only the `component` value from a record.
struct ComponentProbe {
    /// The component found so far; a later `component` pair overwrites an earlier one.
    component: Option<Ustr>,
}

impl ComponentProbe {
    /// Creates a probe with no component recorded.
    const fn new() -> Self {
        Self { component: None }
    }
}

impl<'kvs> log::kv::VisitSource<'kvs> for ComponentProbe {
    fn visit_pair(
        &mut self,
        key: log::kv::Key<'kvs>,
        value: log::kv::Value<'kvs>,
    ) -> Result<(), log::kv::Error> {
        // Every pair other than `component` is ignored.
        if key.as_str() != KV_COMPONENT {
            return Ok(());
        }

        self.component = Some(intern_component_value(&value));
        Ok(())
    }
}
struct PayloadCollector {
color: Option<LogColor>,
fields: LogFields,
}
impl PayloadCollector {
fn new() -> Self {
Self {
color: None,
fields: SmallVec::new(),
}
}
}
impl<'kvs> log::kv::VisitSource<'kvs> for PayloadCollector {
fn visit_pair(
&mut self,
key: log::kv::Key<'kvs>,
value: log::kv::Value<'kvs>,
) -> Result<(), log::kv::Error> {
match key.as_str() {
KV_COLOR => {
self.color = value.to_u64().map(|v| (v as u8).into());
}
KV_COMPONENT => {}
_ => {
self.fields
.push((Ustr::from(key.as_str()), value.to_string()));
}
}
Ok(())
}
}
struct FieldCollector {
color: Option<LogColor>,
component: Option<Ustr>,
fields: LogFields,
}
impl FieldCollector {
fn new() -> Self {
Self {
color: None,
component: None,
fields: SmallVec::new(),
}
}
}
impl<'kvs> log::kv::VisitSource<'kvs> for FieldCollector {
fn visit_pair(
&mut self,
key: log::kv::Key<'kvs>,
value: log::kv::Value<'kvs>,
) -> Result<(), log::kv::Error> {
match key.as_str() {
KV_COLOR => {
self.color = value.to_u64().map(|v| (v as u8).into());
}
KV_COMPONENT => {
self.component = Some(intern_component_value(&value));
}
_ => {
self.fields
.push((Ustr::from(key.as_str()), value.to_string()));
}
}
Ok(())
}
}
impl Log for Logger {
/// Returns whether a record with this metadata should be processed.
///
/// While logging is bypassed, only error records are accepted, and only when
/// the shutdown-on-error latch is armed. Otherwise a record is accepted when it
/// is an error or passes the stdout or file level threshold.
fn enabled(&self, metadata: &log::Metadata) -> bool {
if LOGGING_BYPASSED.load(Ordering::Relaxed) {
return metadata.level() == Level::Error && shutdown_on_error().is_armed();
}
metadata.level() == Level::Error
|| metadata.level() <= self.config.stdout_level
|| metadata.level() <= self.config.fileout_level
}
/// Converts `record` into a [`LogLine`] and sends it to the logging channel.
///
/// Error records are additionally offered to the shutdown-on-error latch,
/// including when logging is bypassed or the record is filtered out.
fn log(&self, record: &log::Record) {
let level = record.level();
// Bypassed: nothing is sent to the channel, but errors can still trip the latch.
if LOGGING_BYPASSED.load(Ordering::Relaxed) {
if level == Level::Error {
record_shutdown_on_error(record, level);
}
return;
}
if self.enabled(record.metadata()) {
// Filtered path: resolve only the component first so that a skipped
// record does not pay for collecting fields or formatting the message.
if let Some(filter_policy) = &self.filter_policy {
let mut probe = ComponentProbe::new();
let _ = record.key_values().visit(&mut probe);
// Fall back to the record's target when no component pair is present.
let component = probe
.component
.unwrap_or_else(|| intern_repeated(record.metadata().target()));
if filter_policy.should_skip(&component, level) {
// A filtered-out error is not written but can still trip the latch.
if level == Level::Error {
shutdown_on_error().maybe_record_trigger(
level,
current_log_timestamp(),
component,
|| format!("{}", record.args()),
);
}
return;
}
let timestamp = current_log_timestamp();
// Second pass over the key-values: color override and extra fields.
let mut collector = PayloadCollector::new();
let _ = record.key_values().visit(&mut collector);
// Without an explicit color, derive one from the level.
let color = collector.color.unwrap_or_else(|| level.into());
let line = LogLine {
timestamp,
level,
color,
component,
message: format!("{}", record.args()),
fields: collector.fields,
};
shutdown_on_error().maybe_record_trigger(
line.level,
line.timestamp,
line.component,
|| line.message.clone(),
);
self.send_log_line(line);
return;
}
// Unfiltered path: a single pass collects color, component, and fields.
let timestamp = current_log_timestamp();
let mut collector = FieldCollector::new();
let _ = record.key_values().visit(&mut collector);
let color = collector.color.unwrap_or_else(|| level.into());
let component = collector
.component
.unwrap_or_else(|| intern_repeated(record.metadata().target()));
let line = LogLine {
timestamp,
level,
color,
component,
message: format!("{}", record.args()),
fields: collector.fields,
};
shutdown_on_error().maybe_record_trigger(
line.level,
line.timestamp,
line.component,
|| line.message.clone(),
);
self.send_log_line(line);
}
}
/// Asks the logging thread to flush its writers; a no-op while logging is bypassed.
fn flush(&self) {
if LOGGING_BYPASSED.load(Ordering::Relaxed) {
return;
}
if let Err(e) = self.tx.send(LogEvent::Flush) {
eprintln!("Error sending flush log event: {e}");
}
}
}
/// Offers `record` to the shutdown-on-error latch without writing it anywhere.
///
/// The component comes from the record's `component` key-value pair, falling
/// back to the record's target. The message is only formatted if the latch
/// actually captures the record.
fn record_shutdown_on_error(record: &log::Record, level: Level) {
    let mut probe = ComponentProbe::new();
    let _ = record.key_values().visit(&mut probe);

    let component = match probe.component {
        Some(found) => found,
        None => intern_repeated(record.metadata().target()),
    };

    let latch = shutdown_on_error();
    let timestamp = current_log_timestamp();
    latch.maybe_record_trigger(level, timestamp, component, || {
        format!("{}", record.args())
    });
}
impl Logger {
/// Builds a logger around an externally supplied channel sender, without
/// registering it globally or spawning a logging thread.
#[doc(hidden)]
#[must_use]
pub fn new_for_benchmark(config: LoggerConfig, tx: std::sync::mpsc::Sender<LogEvent>) -> Self {
let filter_policy = FilterPolicy::from_config(&config);
Self {
config,
filter_policy,
tx,
}
}
/// Sends `line` to the logging channel, printing it to stderr if the
/// receiving side has been closed.
fn send_log_line(&self, line: LogLine) {
if let Err(SendError(LogEvent::Log(line))) = self.tx.send(LogEvent::Log(line)) {
eprintln!("Error sending log event (receiver closed): {line}");
}
}
/// Initializes logging with a configuration read from the environment.
///
/// # Errors
///
/// Returns an error if the configuration cannot be built from the
/// environment or if [`Self::init_with_config`] fails.
pub fn init_with_env(
trader_id: TraderId,
instance_id: UUID4,
file_config: FileWriterConfig,
) -> anyhow::Result<LogGuard> {
let config = LoggerConfig::from_env()?;
Self::init_with_config(trader_id, instance_id, config, file_config)
}
/// Initializes the global logger with `config` and returns a guard for it.
///
/// If logging is already initialized, no re-initialization happens and only a
/// new guard is returned.
///
/// # Errors
///
/// Returns an error if the logger cannot be registered with the `log` crate,
/// if the logging thread cannot be spawned, or if a guard cannot be created.
pub fn init_with_config(
trader_id: TraderId,
instance_id: UUID4,
config: LoggerConfig,
file_config: FileWriterConfig,
) -> anyhow::Result<LogGuard> {
// Already initialized: hand out another guard for the existing logger.
if super::LOGGING_INITIALIZED.load(Ordering::SeqCst) {
return LogGuard::new().ok_or_else(|| {
anyhow::anyhow!("Logging already initialized but new guard could not be created")
});
}
let (tx, rx) = std::sync::mpsc::channel::<LogEvent>();
let filter_policy = FilterPolicy::from_config(&config);
let logger_tx = tx.clone();
let logger = Self {
config: config.clone(),
filter_policy,
tx: logger_tx,
};
set_boxed_logger(Box::new(logger))?;
// Keep a global sender so guards and sync requests can reach the logging thread.
if LOGGER_TX.set(tx).is_err() {
debug_assert!(
false,
"LOGGER_TX already set - re-initialization not supported"
);
}
if config.bypass_logging {
super::logging_set_bypass();
}
// Copied out before `config` is moved into the logging thread.
let is_colored = config.is_colored;
let print_config = config.print_config;
if print_config {
println!("STATIC_MAX_LEVEL={STATIC_MAX_LEVEL}");
println!("Logger initialized with {config:?} {file_config:?}");
}
#[cfg(not(all(feature = "simulation", madsim)))]
{
let handle = std::thread::Builder::new()
.name(LOGGING.to_string())
.spawn(move || {
Self::handle_messages(
trader_id.to_string(),
instance_id.to_string(),
config,
file_config,
rx,
);
})?;
// Store the handle so shutdown can join the logging thread.
if let Ok(mut handle_guard) = LOGGER_HANDLE.lock() {
debug_assert!(
handle_guard.is_none(),
"LOGGER_HANDLE already set - re-initialization not supported"
);
*handle_guard = Some(handle);
}
}
#[cfg(all(feature = "simulation", madsim))]
{
// No logging thread in this configuration: discard the inputs
// (including the receiver) and bypass logging.
let _ = (trader_id, instance_id, config, file_config, rx);
super::logging_set_bypass();
}
// The `log` crate's global max level is opened fully; per-record
// level checks are done in `enabled`.
let max_level = log::LevelFilter::Trace;
set_max_level(max_level);
if print_config {
println!("Logger set as `log` implementation with max level {max_level}");
}
super::LOGGING_INITIALIZED.store(true, Ordering::SeqCst);
super::LOGGING_COLORED.store(is_colored, Ordering::SeqCst);
LogGuard::new()
.ok_or_else(|| anyhow::anyhow!("Failed to create LogGuard from global sender"))
}
/// Body of the logging thread: receives events from `rx` and writes them to
/// the stdout, stderr, and (optional) file writers until a `Close` event
/// arrives or the channel is disconnected.
#[cfg(not(all(feature = "simulation", madsim)))]
#[expect(clippy::needless_pass_by_value)]
fn handle_messages(
trader_id: String,
instance_id: String,
config: LoggerConfig,
file_config: FileWriterConfig,
rx: std::sync::mpsc::Receiver<LogEvent>,
) {
// Exhaustive destructuring: fields not needed by this thread are ignored explicitly.
let LoggerConfig {
stdout_level,
fileout_level,
component_level: _,
module_level: _,
log_components_only: _,
is_colored,
print_config: _,
use_tracing: _,
bypass_logging: _,
file_config: _,
clear_log_file,
fileout_sync_on_flush,
buffered_stdout,
} = config;
// Interned once and reused for every line.
let trader_id_cache = Ustr::from(&trader_id);
let mut stdout_writer = StdoutWriter::new(stdout_level, is_colored, buffered_stdout);
let mut stderr_writer = StderrWriter::new(is_colored);
// No file writer is created when file output is off.
let mut file_writer_opt = if fileout_level == LevelFilter::Off {
None
} else {
FileWriter::new(
trader_id,
instance_id,
file_config,
fileout_level,
clear_log_file,
fileout_sync_on_flush,
)
};
// Handles every event except `Close`, which the receive loop below deals with.
let process_event = |event: LogEvent,
stdout_writer: &mut StdoutWriter,
stderr_writer: &mut StderrWriter,
file_writer_opt: &mut Option<FileWriter>| {
match event {
LogEvent::Log(line) => {
// The wrapper caches each rendering, so a format shared by
// several writers is built only once.
let mut wrapper = LogLineWrapper::new(line, trader_id_cache);
if stderr_writer.enabled(&wrapper.line) {
if is_colored {
stderr_writer.write(wrapper.get_colored());
} else {
stderr_writer.write(wrapper.get_string());
}
}
if stdout_writer.enabled(&wrapper.line) {
if is_colored {
stdout_writer.write(wrapper.get_colored());
} else {
stdout_writer.write(wrapper.get_string());
}
}
// The file writer takes either JSON or the plain (uncolored) rendering.
if let Some(file_writer) = file_writer_opt
&& file_writer.enabled(&wrapper.line)
{
if file_writer.json_format {
file_writer.write(&wrapper.get_json());
} else {
file_writer.write(wrapper.get_string());
}
}
}
LogEvent::Flush => {
stdout_writer.flush();
stderr_writer.flush();
if let Some(file_writer) = file_writer_opt {
file_writer.flush();
}
}
LogEvent::Sync(done) => {
// Without a file writer there is nothing to sync; report success.
let result = if let Some(file_writer) = file_writer_opt {
file_writer.flush_and_sync().map_err(anyhow::Error::from)
} else {
Ok(())
};
// The requester may have gone away; a failed acknowledgement is ignored.
let _ = done.send(result);
}
LogEvent::Close => {
// Nothing to do here: `Close` is handled by the receive loop.
}
}
};
// The loop also ends when `recv` fails, i.e. when all senders are dropped.
while let Ok(event) = rx.recv() {
match event {
LogEvent::Log(_) | LogEvent::Flush | LogEvent::Sync(_) => process_event(
event,
&mut stdout_writer,
&mut stderr_writer,
&mut file_writer_opt,
),
LogEvent::Close => {
// Flush what has been written so far.
stdout_writer.flush();
stderr_writer.flush();
if let Some(ref mut file_writer) = file_writer_opt {
file_writer.flush();
}
// Drain events already queued behind the `Close`, ignoring further `Close`s.
while let Ok(evt) = rx.try_recv() {
match evt {
LogEvent::Close => (), _ => process_event(
evt,
&mut stdout_writer,
&mut stderr_writer,
&mut file_writer_opt,
),
}
}
// Final flush (and file sync) after draining, then stop the thread.
stdout_writer.flush();
stderr_writer.flush();
if let Some(ref mut file_writer) = file_writer_opt {
file_writer.flush_and_sync_logged();
}
break;
}
}
}
}
}
/// Returns `true` when a record from `component` at `line_level` should be
/// dropped according to the given filters.
///
/// `module_filters_sorted` is scanned in order and the first entry whose path
/// is a prefix of `component` is used, so callers should pass it sorted from
/// longest to shortest path.
#[must_use]
pub fn should_filter_log(
    component: &Ustr,
    line_level: log::Level,
    module_filters_sorted: &[(Ustr, LevelFilter)],
    component_level: &AHashMap<Ustr, LevelFilter>,
    log_components_only: bool,
) -> bool {
    let skip = should_filter_log_inner(
        component,
        line_level,
        module_filters_sorted,
        component_level,
        log_components_only,
    );
    skip
}
/// Decides whether a record from `component` at `line_level` is filtered out.
///
/// A matching module-prefix filter takes precedence over an exact component
/// filter. When a filter applies, the record is dropped if it is more verbose
/// than the filter allows. When no filter applies, the record is dropped only
/// if `log_components_only` is set.
fn should_filter_log_inner(
    component: &Ustr,
    line_level: log::Level,
    module_filters_sorted: &[(Ustr, LevelFilter)],
    component_level: &AHashMap<Ustr, LevelFilter>,
    log_components_only: bool,
) -> bool {
    let no_filters_configured = module_filters_sorted.is_empty() && component_level.is_empty();
    if no_filters_configured {
        return log_components_only;
    }

    let module_filter = module_filters_sorted
        .iter()
        .find_map(|(prefix, filter)| component.starts_with(prefix.as_str()).then_some(*filter));
    let component_filter = component_level.get(component).copied();

    match module_filter.or(component_filter) {
        Some(threshold) => line_level > threshold,
        None => log_components_only,
    }
}
/// Shuts the logging system down: bypasses logging, turns the `log` max level
/// off, asks the logging thread to close, joins it, and finally marks logging
/// as uninitialized.
///
/// The join is skipped when called from the logging thread itself.
pub(crate) fn shutdown_graceful() {
    LOGGING_BYPASSED.store(true, Ordering::SeqCst);
    log::set_max_level(log::LevelFilter::Off);

    if let Some(sender) = LOGGER_TX.get() {
        // The receiver may already be gone; that is fine during shutdown.
        let _ = sender.send(LogEvent::Close);
    }

    if let Ok(mut slot) = LOGGER_HANDLE.lock() {
        if let Some(worker) = slot.take() {
            let called_from_worker = worker.thread().id() == std::thread::current().id();
            if !called_from_worker {
                let _ = worker.join();
            }
        }
    }

    LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
}
/// Asks the logging thread to flush and sync file output, and waits for its answer.
///
/// Returns `Ok(())` immediately when logging is not initialized or no global
/// sender exists.
///
/// # Errors
///
/// Returns an error if the request cannot be sent, if no acknowledgement is
/// received, or if the logging thread reports a sync failure.
pub fn sync_to_disk() -> anyhow::Result<()> {
    let initialized = LOGGING_INITIALIZED.load(Ordering::SeqCst);
    let tx = match LOGGER_TX.get() {
        Some(tx) if initialized => tx,
        _ => return Ok(()),
    };

    let (done_tx, done_rx) = std::sync::mpsc::channel();

    if let Err(e) = tx.send(LogEvent::Sync(done_tx)) {
        return Err(anyhow::anyhow!("failed to request logging sync: {e}"));
    }

    match done_rx.recv() {
        Ok(outcome) => outcome,
        Err(e) => Err(anyhow::anyhow!(
            "failed to receive logging sync acknowledgement: {e}"
        )),
    }
}
pub fn log<T: AsRef<str>>(level: LogLevel, color: LogColor, component: Ustr, message: T) {
let color = Value::from(color as u8);
match level {
LogLevel::Off => {}
LogLevel::Trace => {
log::trace!(component = component.to_value(), color = color; "{}", message.as_ref());
}
LogLevel::Debug => {
log::debug!(component = component.to_value(), color = color; "{}", message.as_ref());
}
LogLevel::Info => {
log::info!(component = component.to_value(), color = color; "{}", message.as_ref());
}
LogLevel::Warning => {
log::warn!(component = component.to_value(), color = color; "{}", message.as_ref());
}
LogLevel::Error => {
log::error!(component = component.to_value(), color = color; "{}", message.as_ref());
}
}
}
/// Guard that keeps the logging system alive; dropping the last active guard
/// closes the logging thread.
#[cfg_attr(
feature = "python",
pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
#[cfg_attr(
feature = "python",
pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
)]
#[derive(Debug)]
pub struct LogGuard {
// Sender used to signal the logging thread when the guard is dropped.
tx: std::sync::mpsc::Sender<LogEvent>,
}
impl LogGuard {
    /// Creates a guard and increments the active-guard counter.
    ///
    /// Returns `None` when the global sender has not been set or when the
    /// counter is already at its maximum.
    #[must_use]
    pub fn new() -> Option<Self> {
        let sender = LOGGER_TX.get()?;

        // `checked_add` refuses to go past the counter's maximum value.
        let bumped = LOGGING_GUARDS_ACTIVE.fetch_update(
            Ordering::SeqCst,
            Ordering::SeqCst,
            |active| active.checked_add(1),
        );

        bumped.ok().map(|_| Self { tx: sender.clone() })
    }
}
impl Drop for LogGuard {
    /// Decrements the active-guard counter. The last guard shuts logging down
    /// (bypass, max level off, `Close`, join); any other guard requests a flush.
    ///
    /// # Panics
    ///
    /// Panics if the counter is already zero.
    fn drop(&mut self) {
        let previous_count = LOGGING_GUARDS_ACTIVE
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
                assert!(count != 0, "LogGuard reference count underflow");
                Some(count - 1)
            })
            .expect("Failed to decrement LogGuard count");

        // Re-check the counter in case another guard was created concurrently.
        let is_last_guard =
            previous_count == 1 && LOGGING_GUARDS_ACTIVE.load(Ordering::SeqCst) == 0;

        if !is_last_guard {
            let _ = self.tx.send(LogEvent::Flush);
            return;
        }

        LOGGING_BYPASSED.store(true, Ordering::SeqCst);
        log::set_max_level(log::LevelFilter::Off);

        let _ = self.tx.send(LogEvent::Close);

        // Join the logging thread unless this drop runs on that very thread.
        if let Ok(mut slot) = LOGGER_HANDLE.lock()
            && let Some(worker) = slot.take()
            && worker.thread().id() != std::thread::current().id()
        {
            let _ = worker.join();
        }

        LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
    }
}
#[cfg(test)]
mod tests {
use ahash::AHashMap;
use log::LevelFilter;
use nautilus_core::UUID4;
use nautilus_model::identifiers::TraderId;
use rstest::*;
use serde_json::Value;
use tempfile::tempdir;
use ustr::Ustr;
use super::*;
use crate::enums::LogColor;
#[rstest]
fn log_message_serialization() {
let log_message = LogLine {
timestamp: UnixNanos::default(),
level: log::Level::Info,
color: LogColor::Normal,
component: Ustr::from("Portfolio"),
message: "This is a log message".to_string(),
fields: SmallVec::new(),
};
let serialized_json = serde_json::to_string(&log_message).unwrap();
let deserialized_value: Value = serde_json::from_str(&serialized_json).unwrap();
assert_eq!(deserialized_value["level"], "INFO");
assert_eq!(deserialized_value["component"], "Portfolio");
assert_eq!(deserialized_value["message"], "This is a log message");
}
#[rstest]
fn log_config_parsing() {
let config =
LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Error")
.unwrap();
assert_eq!(
config,
LoggerConfig {
stdout_level: LevelFilter::Info,
fileout_level: LevelFilter::Debug,
component_level: AHashMap::from_iter(vec![(
Ustr::from("RiskEngine"),
LevelFilter::Error
)]),
module_level: AHashMap::new(),
log_components_only: false,
is_colored: true,
print_config: false,
use_tracing: false,
..Default::default()
}
);
}
#[rstest]
fn log_config_parsing2() {
let config = LoggerConfig::from_spec("stdout=Warn;print_config;fileout=Error;").unwrap();
assert_eq!(
config,
LoggerConfig {
stdout_level: LevelFilter::Warn,
fileout_level: LevelFilter::Error,
component_level: AHashMap::new(),
module_level: AHashMap::new(),
log_components_only: false,
is_colored: true,
print_config: true,
use_tracing: false,
..Default::default()
}
);
}
#[rstest]
fn log_config_parsing_with_log_components_only() {
let config =
LoggerConfig::from_spec("stdout=Info;log_components_only;RiskEngine=Debug").unwrap();
assert_eq!(
config,
LoggerConfig {
stdout_level: LevelFilter::Info,
fileout_level: LevelFilter::Off,
component_level: AHashMap::from_iter(vec![(
Ustr::from("RiskEngine"),
LevelFilter::Debug
)]),
module_level: AHashMap::new(),
log_components_only: true,
is_colored: true,
print_config: false,
use_tracing: false,
..Default::default()
}
);
}
#[rstest]
fn test_log_line_wrapper_plain_string() {
let line = LogLine {
timestamp: 1_650_000_000_000_000_000.into(),
level: log::Level::Info,
color: LogColor::Normal,
component: Ustr::from("TestComponent"),
message: "Test message".to_string(),
fields: SmallVec::new(),
};
let mut wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-001"));
let result = wrapper.get_string();
assert!(result.contains("TRADER-001"));
assert!(result.contains("TestComponent"));
assert!(result.contains("Test message"));
assert!(result.contains("[INFO]"));
assert!(result.ends_with('\n'));
assert!(!result.contains("\x1b["));
}
#[rstest]
fn test_log_line_wrapper_colored_string() {
let line = LogLine {
timestamp: 1_650_000_000_000_000_000.into(),
level: log::Level::Info,
color: LogColor::Green,
component: Ustr::from("TestComponent"),
message: "Test message".to_string(),
fields: SmallVec::new(),
};
let mut wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-001"));
let result = wrapper.get_colored();
assert!(result.contains("TRADER-001"));
assert!(result.contains("TestComponent"));
assert!(result.contains("Test message"));
assert!(result.contains("\x1b["));
assert!(result.ends_with('\n'));
}
#[rstest]
fn test_log_line_wrapper_json_output() {
let line = LogLine {
timestamp: 1_650_000_000_000_000_000.into(),
level: log::Level::Warn,
color: LogColor::Yellow,
component: Ustr::from("RiskEngine"),
message: "Warning message".to_string(),
fields: SmallVec::new(),
};
let wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-002"));
let json = wrapper.get_json();
let parsed: Value = serde_json::from_str(json.trim()).unwrap();
assert_eq!(parsed["trader_id"], "TRADER-002");
assert_eq!(parsed["component"], "RiskEngine");
assert_eq!(parsed["message"], "Warning message");
assert_eq!(parsed["level"], "WARN");
assert_eq!(parsed["color"], "YELLOW");
}
#[rstest]
fn test_log_line_wrapper_caches_string() {
let line = LogLine {
timestamp: 1_650_000_000_000_000_000.into(),
level: log::Level::Info,
color: LogColor::Normal,
component: Ustr::from("Test"),
message: "Cached".to_string(),
fields: SmallVec::new(),
};
let mut wrapper = LogLineWrapper::new(line, Ustr::from("TRADER"));
let first = wrapper.get_string().to_string();
let second = wrapper.get_string().to_string();
assert_eq!(first, second);
}
#[rstest]
fn test_log_line_display() {
let line = LogLine {
timestamp: 0.into(),
level: log::Level::Error,
color: LogColor::Red,
component: Ustr::from("Component"),
message: "Error occurred".to_string(),
fields: SmallVec::new(),
};
let display = format!("{line}");
assert_eq!(display, "[ERROR] Component: Error occurred");
}
#[rstest]
fn test_log_line_display_with_fields() {
let line = LogLine {
timestamp: 0.into(),
level: log::Level::Info,
color: LogColor::Normal,
component: Ustr::from("RiskEngine"),
message: "Order filled".to_string(),
fields: smallvec::smallvec![
(Ustr::from("venue"), "BINANCE".to_string()),
(Ustr::from("order_id"), "O-001".to_string()),
],
};
let display = format!("{line}");
assert_eq!(
display,
"[INFO] RiskEngine: Order filled venue=BINANCE order_id=O-001"
);
}
#[rstest]
fn test_log_line_wrapper_plain_string_with_fields() {
let line = LogLine {
timestamp: 1_650_000_000_000_000_000.into(),
level: log::Level::Info,
color: LogColor::Normal,
component: Ustr::from("DataEngine"),
message: "Connected".to_string(),
fields: smallvec::smallvec![
(Ustr::from("venue"), "BINANCE".to_string()),
(Ustr::from("product_type"), "SPOT".to_string()),
],
};
let mut wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-001"));
let result = wrapper.get_string();
assert!(result.contains("Connected"));
assert!(result.contains("venue=BINANCE"));
assert!(result.contains("product_type=SPOT"));
assert!(result.ends_with('\n'));
assert!(!result.contains("\x1b["));
}
/// Checks that extra fields show up as top-level keys in the JSON rendering, next to the
/// base `component` and `message` keys.
#[rstest]
fn test_log_line_wrapper_json_with_fields() {
    let log_line = LogLine {
        timestamp: 1_650_000_000_000_000_000.into(),
        level: log::Level::Info,
        color: LogColor::Normal,
        component: Ustr::from("RiskEngine"),
        message: String::from("Order filled"),
        fields: [("strategy_id", "S-001"), ("venue", "BINANCE")]
            .into_iter()
            .map(|(key, value)| (Ustr::from(key), String::from(value)))
            .collect(),
    };
    let wrapper = LogLineWrapper::new(log_line, Ustr::from("TRADER-001"));

    let json = wrapper.get_json();
    let parsed: Value = serde_json::from_str(json.trim()).unwrap();

    // (JSON key, expected string value)
    let expectations = [
        ("component", "RiskEngine"),
        ("message", "Order filled"),
        ("strategy_id", "S-001"),
        ("venue", "BINANCE"),
    ];
    for (key, expected) in expectations {
        assert_eq!(parsed[key], expected);
    }
}
/// Checks that a record without extra fields serializes to exactly six JSON keys.
#[rstest]
fn test_log_line_wrapper_json_no_fields_has_no_extra_keys() {
    let log_line = LogLine {
        timestamp: 1_650_000_000_000_000_000.into(),
        level: log::Level::Info,
        color: LogColor::Normal,
        component: Ustr::from("Test"),
        message: String::from("Simple"),
        fields: SmallVec::new(),
    };
    let wrapper = LogLineWrapper::new(log_line, Ustr::from("TRADER-001"));

    let json = wrapper.get_json();
    let parsed: Value = serde_json::from_str(json.trim()).unwrap();

    // Base keys: timestamp, trader_id, level, color, component, message
    let key_count = parsed.as_object().unwrap().len();
    assert_eq!(key_count, 6);
}
/// Checks that extra fields named like base keys (`level`, `message`, `timestamp`) do not
/// replace the real values in the JSON rendering, while ordinary fields still pass through.
#[rstest]
fn test_log_line_wrapper_json_reserved_keys_not_overwritten() {
    let colliding_fields = [
        ("level", "FAKE"),
        ("message", "injected"),
        ("timestamp", "bogus"),
        ("venue", "BINANCE"),
    ];
    let log_line = LogLine {
        timestamp: 1_650_000_000_000_000_000.into(),
        level: log::Level::Warn,
        color: LogColor::Normal,
        component: Ustr::from("Test"),
        message: String::from("Real message"),
        fields: colliding_fields
            .into_iter()
            .map(|(key, value)| (Ustr::from(key), String::from(value)))
            .collect(),
    };
    let wrapper = LogLineWrapper::new(log_line, Ustr::from("TRADER-001"));

    let json = wrapper.get_json();
    let parsed: Value = serde_json::from_str(json.trim()).unwrap();

    // Reserved keys keep the values taken from the record itself
    assert_eq!(parsed["level"], "WARN");
    assert_eq!(parsed["message"], "Real message");
    assert_ne!(parsed["timestamp"], "bogus");

    // A non-reserved field is still emitted
    assert_eq!(parsed["venue"], "BINANCE");
}
/// Checks that a repeated extra field key is emitted once in the JSON rendering and holds
/// the value supplied last.
#[rstest]
fn test_log_line_wrapper_json_duplicate_extra_fields_last_value_wins() {
    let log_line = LogLine {
        timestamp: 1_650_000_000_000_000_000.into(),
        level: log::Level::Info,
        color: LogColor::Normal,
        component: Ustr::from("Test"),
        message: String::from("Duplicate field"),
        fields: [("venue", "BINANCE"), ("venue", "OKX")]
            .into_iter()
            .map(|(key, value)| (Ustr::from(key), String::from(value)))
            .collect(),
    };
    let wrapper = LogLineWrapper::new(log_line, Ustr::from("TRADER-001"));

    let json = wrapper.get_json();
    let parsed: Value = serde_json::from_str(json.trim()).unwrap();

    // Count occurrences in the raw text: a parsed object would hide duplicate keys
    let venue_key_occurrences = json.matches("\"venue\"").count();
    assert_eq!(venue_key_occurrences, 1);
    assert_eq!(parsed["venue"], "OKX");
}
/// Flattens a module filter map into a list ordered by descending key length, so longer
/// prefixes come first.
fn sorted_module_filters(map: AHashMap<Ustr, LevelFilter>) -> Vec<(Ustr, LevelFilter)> {
    let mut filters = Vec::from_iter(map);
    // Stable sort, longest key first
    filters.sort_by(|lhs, rhs| rhs.0.len().cmp(&lhs.0.len()));
    filters
}
/// With no module or component filters configured, even a `Trace` record is not filtered.
#[rstest]
fn test_filter_no_filters_passes_all() {
    let module_filters = vec![];
    let component_level = AHashMap::new();

    let filtered = should_filter_log(
        &Ustr::from("anything"),
        Level::Trace,
        &module_filters,
        &component_level,
        false,
    );

    assert!(!filtered);
}
#[rstest]
fn test_filter_component_exact_match() {
let module_filters = vec![];
let component_level = AHashMap::from_iter([(Ustr::from("RiskEngine"), LevelFilter::Error)]);
assert!(should_filter_log(
&Ustr::from("RiskEngine"),
Level::Info,
&module_filters,
&component_level,
false
));
assert!(!should_filter_log(
&Ustr::from("RiskEngine"),
Level::Error,
&module_filters,
&component_level,
false
));
assert!(!should_filter_log(
&Ustr::from("Portfolio"),
Level::Info,
&module_filters,
&component_level,
false
));
}
/// A module filter applies to the module itself and to paths beneath it, and does not touch
/// unrelated modules.
#[rstest]
fn test_filter_module_prefix_match() {
    let module_filters = vec![(Ustr::from("nautilus_okx::websocket"), LevelFilter::Debug)];
    let component_level = AHashMap::new();

    // (component, record level, expected `should_filter_log` result)
    let cases = [
        ("nautilus_okx::websocket", Level::Debug, false),
        ("nautilus_okx::websocket::handler", Level::Debug, false),
        ("nautilus_okx::websocket::handler", Level::Trace, true),
        ("nautilus_binance::data", Level::Trace, false),
    ];

    for (component, level, expected) in cases {
        let filtered = should_filter_log(
            &Ustr::from(component),
            level,
            &module_filters,
            &component_level,
            false,
        );
        assert_eq!(filtered, expected, "component={component}, level={level}");
    }
}
#[rstest]
fn test_filter_longest_prefix_wins() {
let module_filters = sorted_module_filters(AHashMap::from_iter([
(Ustr::from("nautilus_okx"), LevelFilter::Error),
(Ustr::from("nautilus_okx::websocket"), LevelFilter::Debug),
]));
let component_level = AHashMap::new();
assert!(!should_filter_log(
&Ustr::from("nautilus_okx::websocket::handler"),
Level::Debug,
&module_filters,
&component_level,
false
));
assert!(should_filter_log(
&Ustr::from("nautilus_okx::data"),
Level::Debug,
&module_filters,
&component_level,
false
));
assert!(!should_filter_log(
&Ustr::from("nautilus_okx::data"),
Level::Error,
&module_filters,
&component_level,
false
));
}
#[rstest]
fn test_filter_module_precedence_over_component() {
let module_filters = vec![(Ustr::from("nautilus_okx::websocket"), LevelFilter::Debug)];
let component_level =
AHashMap::from_iter([(Ustr::from("nautilus_okx::websocket"), LevelFilter::Error)]);
assert!(!should_filter_log(
&Ustr::from("nautilus_okx::websocket"),
Level::Debug,
&module_filters,
&component_level,
false
));
}
/// With the components-only flag set, a component without a configured level is filtered,
/// while a configured one is still evaluated against its level.
#[rstest]
fn test_filter_log_components_only_blocks_unknown() {
    let module_filters = vec![];
    let component_level = AHashMap::from_iter([(Ustr::from("RiskEngine"), LevelFilter::Debug)]);

    // (component, expected `should_filter_log` result for an `Info` record)
    let cases = [("Portfolio", true), ("RiskEngine", false)];

    for (component, expected) in cases {
        let filtered = should_filter_log(
            &Ustr::from(component),
            Level::Info,
            &module_filters,
            &component_level,
            true,
        );
        assert_eq!(filtered, expected, "component={component}");
    }
}
/// With the components-only flag set, a matching module filter lets a record through,
/// while a module without any matching filter is blocked.
#[rstest]
fn test_filter_log_components_only_with_module() {
    let module_filters = vec![(Ustr::from("nautilus_okx"), LevelFilter::Debug)];
    let component_level = AHashMap::new();

    // (component, expected `should_filter_log` result for a `Debug` record)
    let cases = [
        ("nautilus_okx::websocket", false),
        ("nautilus_binance::data", true),
    ];

    for (component, expected) in cases {
        let filtered = should_filter_log(
            &Ustr::from(component),
            Level::Debug,
            &module_filters,
            &component_level,
            true,
        );
        assert_eq!(filtered, expected, "component={component}");
    }
}
#[rstest]
fn test_filter_level_comparison() {
let module_filters = vec![];
let component_level = AHashMap::from_iter([(Ustr::from("Test"), LevelFilter::Warn)]);
assert!(!should_filter_log(
&Ustr::from("Test"),
Level::Error,
&module_filters,
&component_level,
false
));
assert!(!should_filter_log(
&Ustr::from("Test"),
Level::Warn,
&module_filters,
&component_level,
false
));
assert!(should_filter_log(
&Ustr::from("Test"),
Level::Info,
&module_filters,
&component_level,
false
));
assert!(should_filter_log(
&Ustr::from("Test"),
Level::Debug,
&module_filters,
&component_level,
false
));
assert!(should_filter_log(
&Ustr::from("Test"),
Level::Trace,
&module_filters,
&component_level,
false
));
}
#[cfg(not(all(feature = "simulation", madsim)))]
mod serial_tests {
use std::{sync::atomic::Ordering, time::Duration};
use super::*;
use crate::{
logging::{
LOGGING_BYPASSED, logging_clock_set_static_mode, logging_clock_set_static_time,
logging_is_initialized, logging_set_bypass, logging_sync_to_disk,
},
testing::wait_until,
};
// Exercises the shutdown-on-error trigger through its states: disarmed (nothing recorded),
// armed (only the first error is recorded), re-armed (records again), and armed with a
// logger configured with `log_components_only`.
#[rstest]
fn test_shutdown_on_error_records_once_then_rearms() {
disarm_shutdown_on_error();
// `_rx` is kept alive for the whole test so the channel stays connected
let (tx, _rx) = std::sync::mpsc::channel();
let logger = Logger::new_for_benchmark(LoggerConfig::default(), tx);
// Phase 1: armed with `false`, an error record must not leave a trigger behind
arm_shutdown_on_error(false);
let args = format_args!("Disabled error");
let record = log::Record::builder()
.args(args)
.level(Level::Error)
.target("RunComponent")
.build();
log::Log::log(&logger, &record);
assert_eq!(take_shutdown_on_error_trigger(), None);
// Phase 2: armed with `true`, two errors are logged back to back
arm_shutdown_on_error(true);
let args = format_args!("First error");
let record = log::Record::builder()
.args(args)
.level(Level::Error)
.target("RunComponent")
.build();
log::Log::log(&logger, &record);
let args = format_args!("Second error");
let record = log::Record::builder()
.args(args)
.level(Level::Error)
.target("RunComponent")
.build();
log::Log::log(&logger, &record);
// Only the first error is captured, and taking it leaves nothing pending
let first = take_shutdown_on_error_trigger().unwrap();
assert_eq!(first.component, Ustr::from("RunComponent"));
assert_eq!(first.message, "First error");
assert_eq!(take_shutdown_on_error_trigger(), None);
// Phase 3: arming again allows the next error to be captured
arm_shutdown_on_error(true);
let args = format_args!("Third error");
let record = log::Record::builder()
.args(args)
.level(Level::Error)
.target("RunComponent")
.build();
log::Log::log(&logger, &record);
let third = take_shutdown_on_error_trigger().unwrap();
assert_eq!(third.component, Ustr::from("RunComponent"));
assert_eq!(third.message, "Third error");
// Phase 4: a logger with `log_components_only` set and no component levels configured
let (tx, rx) = std::sync::mpsc::channel();
let logger = Logger::new_for_benchmark(
LoggerConfig {
log_components_only: true,
..Default::default()
},
tx,
);
arm_shutdown_on_error(true);
let args = format_args!("Filtered error");
let record = log::Record::builder()
.args(args)
.level(Level::Error)
.target("FilteredComponent")
.build();
log::Log::log(&logger, &record);
// The trigger is still recorded, yet no message reached the channel
let trigger = take_shutdown_on_error_trigger().unwrap();
assert_eq!(trigger.component, Ustr::from("FilteredComponent"));
assert_eq!(trigger.message, "Filtered error");
assert!(matches!(
rx.try_recv(),
Err(std::sync::mpsc::TryRecvError::Empty)
));
// Leave the trigger disarmed on exit
disarm_shutdown_on_error();
}
// Checks that an armed shutdown-on-error trigger still captures an error record while
// logging is bypassed, without sending anything down the logger channel.
#[rstest]
fn test_shutdown_on_error_records_bypassed_error() {
// Start from a known state: bypass flag cleared and trigger disarmed
LOGGING_BYPASSED.store(false, Ordering::Relaxed);
disarm_shutdown_on_error();
let (tx, rx) = std::sync::mpsc::channel();
let logger = Logger::new_for_benchmark(LoggerConfig::default(), tx);
let metadata = log::Metadata::builder()
.level(Level::Error)
.target("BypassedComponent")
.build();
logging_set_bypass();
// Bypassed and armed with `false`: error-level metadata is reported as not enabled
arm_shutdown_on_error(false);
assert!(!log::Log::enabled(&logger, &metadata));
// Bypassed and armed with `true`: error-level metadata is enabled again
arm_shutdown_on_error(true);
assert!(log::Log::enabled(&logger, &metadata));
let args = format_args!("Bypassed error");
let record = log::Record::builder()
.args(args)
.level(Level::Error)
.target("BypassedComponent")
.build();
log::Log::log(&logger, &record);
// The trigger holds the error, but nothing was sent on the channel
let trigger = take_shutdown_on_error_trigger().unwrap();
assert_eq!(trigger.component, Ustr::from("BypassedComponent"));
assert_eq!(trigger.message, "Bypassed error");
assert!(matches!(
rx.try_recv(),
Err(std::sync::mpsc::TryRecvError::Empty)
));
// Clear the bypass flag and disarm the trigger on exit
LOGGING_BYPASSED.store(false, Ordering::Relaxed);
disarm_shutdown_on_error();
}
// Checks that when the drain callback returns `false`, the drain reports `false` and the
// trigger remains available to a later `take_shutdown_on_error_trigger` call.
#[rstest]
fn test_shutdown_on_error_failed_drain_keeps_trigger_pending() {
// Start from a known state: bypass flag cleared and trigger disarmed
LOGGING_BYPASSED.store(false, Ordering::Relaxed);
disarm_shutdown_on_error();
// `_rx` is kept alive so the channel stays connected while logging
let (tx, _rx) = std::sync::mpsc::channel();
let logger = Logger::new_for_benchmark(LoggerConfig::default(), tx);
arm_shutdown_on_error(true);
let args = format_args!("Pending error");
let record = log::Record::builder()
.args(args)
.level(Level::Error)
.target("PendingComponent")
.build();
log::Log::log(&logger, &record);
// The callback sees the pending trigger and returns `false`
let drained = try_drain_shutdown_on_error_trigger(|trigger| {
assert_eq!(trigger.component, Ustr::from("PendingComponent"));
assert_eq!(trigger.message, "Pending error");
false
});
assert!(!drained);
// The same trigger is still pending after the failed drain
let trigger = take_shutdown_on_error_trigger().unwrap();
assert_eq!(trigger.component, Ustr::from("PendingComponent"));
assert_eq!(trigger.message, "Pending error");
disarm_shutdown_on_error();
}
/// Checks that an `Info` record logged with a component reaches the log file in the plain
/// text format.
///
/// The logging clock is pinned to a static time so the rendered timestamp is deterministic.
#[rstest]
fn test_logging_to_file() {
    let config = LoggerConfig {
        fileout_level: LevelFilter::Debug,
        ..Default::default()
    };

    let temp_dir = tempdir().expect("Failed to create temporary directory");
    let file_config = FileWriterConfig {
        directory: Some(temp_dir.path().to_str().unwrap().to_string()),
        ..Default::default()
    };

    // Fail fast on init errors; otherwise they only surface as a timeout further down
    let log_guard = Logger::init_with_config(
        TraderId::from("TRADER-001"),
        UUID4::new(),
        config,
        file_config,
    )
    .expect("Failed to initialize logger");

    logging_clock_set_static_mode();
    logging_clock_set_static_time(1_650_000_000_000_000);

    log::info!(
        component = "RiskEngine";
        "This is a test"
    );

    let mut log_contents = String::new();

    // Wait for the log file to appear in the temporary directory
    wait_until(
        || {
            std::fs::read_dir(&temp_dir)
                .expect("Failed to read directory")
                .filter_map(Result::ok)
                .any(|entry| entry.path().is_file())
        },
        Duration::from_secs(3),
    );

    // Release the guard before reading the file (presumably this flushes pending output)
    drop(log_guard);

    // Poll until the file has content, keeping the last read for the assertion
    wait_until(
        || {
            let log_file_path = std::fs::read_dir(&temp_dir)
                .expect("Failed to read directory")
                .filter_map(Result::ok)
                .find(|entry| entry.path().is_file())
                .expect("No files found in directory")
                .path();
            log_contents =
                std::fs::read_to_string(log_file_path).expect("Error while reading log file");
            !log_contents.is_empty()
        },
        Duration::from_secs(3),
    );

    assert_eq!(
        log_contents,
        "1970-01-20T02:20:00.000000000Z [INFO] TRADER-001.RiskEngine: This is a test\n"
    );
}
/// Checks that `logging_sync_to_disk` makes a just-logged record readable from the log file
/// when `fileout_sync_on_flush` is disabled, while the guard is still alive.
#[rstest]
fn test_logging_sync_to_disk_flushes_fast_flush_policy() {
    let temp_dir = tempdir().expect("Failed to create temporary directory");
    let directory = temp_dir.path().to_str().unwrap().to_string();

    let logger_config = LoggerConfig {
        fileout_level: LevelFilter::Debug,
        fileout_sync_on_flush: false,
        ..Default::default()
    };
    let writer_config = FileWriterConfig {
        directory: Some(directory),
        ..Default::default()
    };

    let guard = Logger::init_with_config(
        TraderId::from("TRADER-SYNC"),
        UUID4::new(),
        logger_config,
        writer_config,
    )
    .expect("Failed to initialize logger");

    logging_clock_set_static_mode();
    logging_clock_set_static_time(1_650_000_000_000_000);

    log::info!(
        component = "RiskEngine";
        "sync me"
    );

    logging_sync_to_disk().expect("sync-to-disk should succeed");

    // No polling here: the record is expected on disk as soon as the sync call returns
    let log_file = std::fs::read_dir(&temp_dir)
        .expect("Failed to read directory")
        .filter_map(Result::ok)
        .find(|entry| entry.path().is_file())
        .expect("No files found in directory");
    let written =
        std::fs::read_to_string(log_file.path()).expect("Error while reading log file");

    assert!(written.contains("sync me"));

    drop(guard);
}
/// Logs a burst of records, drops the guard right away, and checks that every record still
/// ends up in the log file.
#[rstest]
fn test_shutdown_drains_backlog_tail() {
    const N: usize = 1000;

    let temp_dir = tempdir().expect("Failed to create temporary directory");
    let logger_config = LoggerConfig {
        stdout_level: LevelFilter::Off,
        fileout_level: LevelFilter::Info,
        ..Default::default()
    };
    let writer_config = FileWriterConfig {
        directory: Some(temp_dir.path().to_str().unwrap().to_string()),
        ..Default::default()
    };

    let guard = Logger::init_with_config(
        TraderId::from("TRADER-TAIL"),
        UUID4::new(),
        logger_config,
        writer_config,
    )
    .expect("Failed to initialize logger");

    logging_clock_set_static_mode();
    logging_clock_set_static_time(1_700_000_000_000_000);

    for i in 0..N {
        log::info!(component = "TailDrain"; "BacklogTest {i}");
    }

    // Drop immediately after the burst, with no wait in between
    drop(guard);

    let mut count = 0usize;
    wait_until(
        || {
            let Some(log_file) = std::fs::read_dir(&temp_dir)
                .expect("Failed to read directory")
                .filter_map(Result::ok)
                .find(|entry| entry.path().is_file())
            else {
                return false;
            };
            let Ok(contents) = std::fs::read_to_string(log_file.path()) else {
                return false;
            };

            count = contents
                .lines()
                .filter(|line| line.contains("BacklogTest "))
                .count();
            count >= N
        },
        Duration::from_secs(5),
    );

    assert_eq!(count, N, "Expected all pre-shutdown messages to be written");
}
/// Checks that a component-level filter (`RiskEngine=Error`) keeps an `Info` record from
/// that component out of the log file, while the file itself is still created.
#[rstest]
fn test_log_component_level_filtering() {
    let config =
        LoggerConfig::from_spec("stdout=Info;fileout=Debug;RiskEngine=Error").unwrap();

    let temp_dir = tempdir().expect("Failed to create temporary directory");
    let file_config = FileWriterConfig {
        directory: Some(temp_dir.path().to_str().unwrap().to_string()),
        ..Default::default()
    };

    // Fail fast on init errors; otherwise they only surface as a timeout further down
    let log_guard = Logger::init_with_config(
        TraderId::from("TRADER-001"),
        UUID4::new(),
        config,
        file_config,
    )
    .expect("Failed to initialize logger");

    logging_clock_set_static_mode();
    logging_clock_set_static_time(1_650_000_000_000_000);

    // `Info` is less severe than the `Error` level configured for this component
    log::info!(
        component = "RiskEngine";
        "This is a test"
    );

    // Release the guard before reading the file (presumably this flushes pending output)
    drop(log_guard);

    // Succeeds once a log file exists and does not mention the filtered component
    wait_until(
        || {
            if let Some(log_file) = std::fs::read_dir(&temp_dir)
                .expect("Failed to read directory")
                .filter_map(Result::ok)
                .find(|entry| entry.path().is_file())
            {
                let log_file_path = log_file.path();
                let log_contents = std::fs::read_to_string(log_file_path)
                    .expect("Error while reading log file");
                !log_contents.contains("RiskEngine")
            } else {
                false
            }
        },
        Duration::from_secs(3),
    );

    assert!(
        std::fs::read_dir(&temp_dir)
            .expect("Failed to read directory")
            .filter_map(Result::ok)
            .any(|entry| entry.path().is_file()),
        "Log file should exist"
    );
}
/// Checks the exact JSON line written to the log file when the file format is `json`.
///
/// The logging clock is pinned to a static time so the rendered timestamp is deterministic.
#[rstest]
fn test_logging_to_file_in_json_format() {
    let config =
        LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Info")
            .unwrap();

    let temp_dir = tempdir().expect("Failed to create temporary directory");
    let file_config = FileWriterConfig {
        directory: Some(temp_dir.path().to_str().unwrap().to_string()),
        file_format: Some("json".to_string()),
        ..Default::default()
    };

    // Fail fast on init errors; otherwise they only surface as a timeout further down
    let log_guard = Logger::init_with_config(
        TraderId::from("TRADER-001"),
        UUID4::new(),
        config,
        file_config,
    )
    .expect("Failed to initialize logger");

    logging_clock_set_static_mode();
    logging_clock_set_static_time(1_650_000_000_000_000);

    log::info!(
        component = "RiskEngine";
        "This is a test"
    );

    let mut log_contents = String::new();

    // Release the guard before reading the file (presumably this flushes pending output)
    drop(log_guard);

    // Poll until the file has content, keeping the last read for the assertion
    wait_until(
        || {
            if let Some(log_file) = std::fs::read_dir(&temp_dir)
                .expect("Failed to read directory")
                .filter_map(Result::ok)
                .find(|entry| entry.path().is_file())
            {
                let log_file_path = log_file.path();
                log_contents = std::fs::read_to_string(log_file_path)
                    .expect("Error while reading log file");
                !log_contents.is_empty()
            } else {
                false
            }
        },
        Duration::from_secs(3),
    );

    assert_eq!(
        log_contents,
        "{\"timestamp\":\"1970-01-20T02:20:00.000000000Z\",\"trader_id\":\"TRADER-001\",\"level\":\"INFO\",\"color\":\"NORMAL\",\"component\":\"RiskEngine\",\"message\":\"This is a test\"}\n"
    );
}
/// Checks that `logging_is_initialized` reports `true` while the init guard is alive and
/// `false` once it has been dropped.
#[rstest]
fn test_init_sets_logging_is_initialized_flag() {
    let init_result = Logger::init_with_config(
        TraderId::from("TRADER-001"),
        UUID4::new(),
        LoggerConfig::default(),
        FileWriterConfig::default(),
    );

    assert!(init_result.is_ok());
    assert!(logging_is_initialized());

    // Dropping the result drops the guard it holds
    drop(init_result);
    assert!(!logging_is_initialized());
}
// Checks that `Logger::init_with_config` returns an error with a specific message when
// the active-guard counter already sits at `u8::MAX`.
#[rstest]
fn test_init_returns_error_when_log_guard_limit_reached() {
// The first init must succeed and yields a live guard
let guard = Logger::init_with_config(
TraderId::from("TRADER-001"),
UUID4::new(),
LoggerConfig::default(),
FileWriterConfig::default(),
)
.expect("Failed to initialize logger");
// Force the counter to its maximum value before the second init attempt
LOGGING_GUARDS_ACTIVE.store(u8::MAX, Ordering::SeqCst);
let result = Logger::init_with_config(
TraderId::from("TRADER-001"),
UUID4::new(),
LoggerConfig::default(),
FileWriterConfig::default(),
);
// Put the counter back to 1 (one live guard) before that guard is dropped
LOGGING_GUARDS_ACTIVE.store(1, Ordering::SeqCst);
drop(guard);
// Assert only after the global state has been restored
assert_eq!(
result.unwrap_err().to_string(),
"Logging already initialized but new guard could not be created"
);
}
/// Checks that once the first guard has been dropped, a second initialization attempt
/// returns an error.
#[rstest]
fn test_reinit_after_guard_drop_fails() {
    let config = LoggerConfig::default();
    let file_config = FileWriterConfig::default();
    let init = |trader_id: &str| {
        Logger::init_with_config(
            TraderId::from(trader_id),
            UUID4::new(),
            config.clone(),
            file_config.clone(),
        )
    };

    let first_attempt = init("TRADER-001");
    assert!(first_attempt.is_ok());
    drop(first_attempt);

    let second_attempt = init("TRADER-002");
    assert!(second_attempt.is_err());
}
// Sets the bypass flag before initializing the logger and checks that init still succeeds
// and that the flag is still set after logging a record and dropping the guard.
// NOTE(review): the test does not inspect the log directory, so it does not directly
// assert that the record was suppressed.
#[rstest]
fn test_bypass_before_init_prevents_logging() {
// Enable bypass before any logger exists
logging_set_bypass();
assert!(LOGGING_BYPASSED.load(Ordering::Relaxed));
let temp_dir = tempdir().expect("Failed to create temporary directory");
let config = LoggerConfig {
fileout_level: LevelFilter::Debug,
..Default::default()
};
let file_config = FileWriterConfig {
directory: Some(temp_dir.path().to_str().unwrap().to_string()),
..Default::default()
};
let guard = Logger::init_with_config(
TraderId::from("TRADER-001"),
UUID4::new(),
config,
file_config,
);
// Initialization itself is expected to succeed while bypassed
assert!(guard.is_ok());
log::info!(
component = "TestComponent";
"This should be bypassed"
);
// Short pause before dropping the guard
std::thread::sleep(Duration::from_millis(100));
drop(guard);
// The bypass flag is still set after init, logging, and guard drop
assert!(LOGGING_BYPASSED.load(Ordering::Relaxed));
}
/// End-to-end check of module-path filters: the longest matching prefix decides the level,
/// and components without any matching filter fall back to the file output level.
#[rstest]
fn test_module_level_filtering() {
    let spec = "stdout=Off;fileout=Trace;nautilus::adapters=Warn;nautilus::adapters::okx=Debug";
    let logger_config = LoggerConfig::from_spec(spec).unwrap();

    let temp_dir = tempdir().expect("Failed to create temporary directory");
    let writer_config = FileWriterConfig {
        directory: Some(temp_dir.path().to_str().unwrap().to_string()),
        ..Default::default()
    };

    let guard = Logger::init_with_config(
        TraderId::from("TRADER-MOD"),
        UUID4::new(),
        logger_config,
        writer_config,
    )
    .expect("Failed to initialize logger");

    logging_clock_set_static_mode();
    logging_clock_set_static_time(1_650_000_000_000_000);

    log::debug!(
        component = "nautilus::adapters::okx::websocket";
        "OKX debug message"
    );
    log::info!(
        component = "nautilus::adapters::okx";
        "OKX info message"
    );
    log::info!(
        component = "nautilus::adapters::binance";
        "Binance info message SHOULD NOT APPEAR"
    );
    log::warn!(
        component = "nautilus::adapters::binance";
        "Binance warn message"
    );
    log::trace!(
        component = "Portfolio";
        "Portfolio trace message"
    );

    drop(guard);

    // Wait for the log file to appear in the temporary directory
    wait_until(
        || {
            std::fs::read_dir(&temp_dir)
                .expect("Failed to read directory")
                .filter_map(Result::ok)
                .any(|entry| entry.path().is_file())
        },
        Duration::from_secs(3),
    );

    let log_file = std::fs::read_dir(&temp_dir)
        .expect("Failed to read directory")
        .filter_map(Result::ok)
        .find(|entry| entry.path().is_file())
        .expect("No log file found");
    let written = std::fs::read_to_string(log_file.path()).expect("Error reading log file");

    // (text that must be present, failure message)
    let must_appear = [
        (
            "OKX debug message",
            "OKX debug should pass (longer prefix wins)",
        ),
        ("OKX info message", "OKX info should pass"),
        ("Binance warn message", "Binance warn should pass"),
        ("Portfolio trace message", "Unfiltered component should pass"),
    ];
    for (needle, reason) in must_appear {
        assert!(written.contains(needle), "{reason}");
    }

    assert!(
        !written.contains("SHOULD NOT APPEAR"),
        "Binance info should be filtered (adapters=Warn)"
    );
}
/// Checks that extra key-value pairs passed to the log macro show up as `key=value` in the
/// plain text log file, alongside the message.
#[rstest]
fn test_logging_to_file_with_kv_fields() {
    let config = LoggerConfig {
        fileout_level: LevelFilter::Debug,
        ..Default::default()
    };

    let temp_dir = tempdir().expect("Failed to create temporary directory");
    let file_config = FileWriterConfig {
        directory: Some(temp_dir.path().to_str().unwrap().to_string()),
        ..Default::default()
    };

    // Fail fast on init errors; otherwise they only surface as a timeout further down
    let log_guard = Logger::init_with_config(
        TraderId::from("TRADER-001"),
        UUID4::new(),
        config,
        file_config,
    )
    .expect("Failed to initialize logger");

    logging_clock_set_static_mode();
    logging_clock_set_static_time(1_650_000_000_000_000);

    log::info!(
        component = "DataEngine",
        venue = "BINANCE",
        product_type = "SPOT";
        "WebSocket connected"
    );

    let mut log_contents = String::new();

    // Release the guard before reading the file (presumably this flushes pending output)
    drop(log_guard);

    // Poll until the file has content, keeping the last read for the assertions
    wait_until(
        || {
            if let Some(log_file) = std::fs::read_dir(&temp_dir)
                .expect("Failed to read directory")
                .filter_map(Result::ok)
                .find(|entry| entry.path().is_file())
            {
                log_contents = std::fs::read_to_string(log_file.path())
                    .expect("Error while reading log file");
                !log_contents.is_empty()
            } else {
                false
            }
        },
        Duration::from_secs(3),
    );

    assert!(
        log_contents.contains("WebSocket connected"),
        "Message should be present"
    );
    assert!(
        log_contents.contains("venue=BINANCE"),
        "venue field should appear in output, was:\n{log_contents}"
    );
    assert!(
        log_contents.contains("product_type=SPOT"),
        "product_type field should appear in output, was:\n{log_contents}"
    );
}
/// Checks that extra key-value pairs passed to the log macro become top-level keys in the
/// JSON log file, next to `component` and `message`.
#[rstest]
fn test_logging_to_file_json_with_kv_fields() {
    let config =
        LoggerConfig::from_spec("stdout=Off;fileout=Debug;DataEngine=Debug").unwrap();

    let temp_dir = tempdir().expect("Failed to create temporary directory");
    let file_config = FileWriterConfig {
        directory: Some(temp_dir.path().to_str().unwrap().to_string()),
        file_format: Some("json".to_string()),
        ..Default::default()
    };

    // Fail fast on init errors; otherwise they only surface as a timeout further down
    let log_guard = Logger::init_with_config(
        TraderId::from("TRADER-001"),
        UUID4::new(),
        config,
        file_config,
    )
    .expect("Failed to initialize logger");

    logging_clock_set_static_mode();
    logging_clock_set_static_time(1_650_000_000_000_000);

    log::info!(
        component = "DataEngine",
        venue = "BINANCE",
        order_id = "O-12345";
        "Order filled"
    );

    let mut log_contents = String::new();

    // Release the guard before reading the file (presumably this flushes pending output)
    drop(log_guard);

    // Poll until the file has content, keeping the last read for the assertions
    wait_until(
        || {
            if let Some(log_file) = std::fs::read_dir(&temp_dir)
                .expect("Failed to read directory")
                .filter_map(Result::ok)
                .find(|entry| entry.path().is_file())
            {
                log_contents = std::fs::read_to_string(log_file.path())
                    .expect("Error while reading log file");
                !log_contents.is_empty()
            } else {
                false
            }
        },
        Duration::from_secs(3),
    );

    // The file is expected to hold a single JSON document
    let parsed: serde_json::Value =
        serde_json::from_str(log_contents.trim()).expect("Should be valid JSON");

    assert_eq!(parsed["component"], "DataEngine");
    assert_eq!(parsed["message"], "Order filled");
    assert_eq!(parsed["venue"], "BINANCE");
    assert_eq!(parsed["order_id"], "O-12345");
}
}
/// Tests compiled only when the `simulation` feature and the `madsim` cfg are both active.
#[cfg(all(feature = "simulation", madsim))]
mod sim_tests {
    use std::sync::atomic::Ordering;

    use super::*;
    use crate::logging::LOGGING_BYPASSED;

    /// Checks that initialization succeeds in this configuration, marks logging as
    /// initialized, forces the bypass flag on, and leaves `LOGGER_HANDLE` empty.
    #[rstest]
    fn test_init_under_madsim_skips_writer_thread_and_forces_bypass() {
        let temp_dir = tempdir().expect("Failed to create temporary directory");
        let directory = temp_dir.path().to_str().unwrap().to_string();

        // Bypass is explicitly disabled in the config to show that it gets overridden
        let logger_config = LoggerConfig {
            bypass_logging: false,
            ..Default::default()
        };
        let writer_config = FileWriterConfig {
            directory: Some(directory),
            ..Default::default()
        };

        // Keep the guard alive until the end of the test
        let _guard = Logger::init_with_config(
            TraderId::from("TRADER-SIM"),
            UUID4::new(),
            logger_config,
            writer_config,
        )
        .expect("init should succeed under simulation");

        let initialized = LOGGING_INITIALIZED.load(Ordering::SeqCst);
        assert!(initialized);

        let bypassed = LOGGING_BYPASSED.load(Ordering::SeqCst);
        assert!(
            bypassed,
            "bypass must be forced under cfg(madsim) even when config disables it"
        );

        let handle_is_empty = LOGGER_HANDLE
            .lock()
            .expect("LOGGER_HANDLE mutex should not be poisoned")
            .is_none();
        assert!(
            handle_is_empty,
            "writer thread must not be spawned under cfg(madsim)"
        );
    }
}
}