nautilus-common 0.56.0

Common functionality and machinery for the Nautilus trading engine
Documentation
// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

use ahash::AHashMap;
use log::LevelFilter;
use nautilus_core::{UUID4, python::to_pyvalue_err};
use nautilus_model::identifiers::TraderId;
use pyo3::prelude::*;
use ustr::Ustr;

use crate::{
    enums::{LogColor, LogLevel},
    logging::{
        self, headers,
        logger::{self, LogGuard, LoggerConfig},
        logging_clock_set_realtime_mode, logging_clock_set_static_mode,
        logging_clock_set_static_time, logging_set_bypass, map_log_level_to_filter,
        parse_level_filter_str,
        writer::FileWriterConfig,
    },
};

#[pymethods]
#[pyo3_stub_gen::derive::gen_stub_pymethods]
impl LoggerConfig {
    /// Parses a configuration from a spec string.
    ///
    /// # Format
    ///
    /// Semicolon-separated key-value pairs or bare flags:
    /// ```text
    /// stdout=Info;fileout=Debug;RiskEngine=Error;my_crate::module=Debug;is_colored
    /// ```
    ///
    /// # Errors
    ///
    /// Returns an error if the spec string contains invalid syntax or log levels.
    #[staticmethod]
    #[pyo3(name = "from_spec")]
    pub fn py_from_spec(spec: &str) -> PyResult<Self> {
        Self::from_spec(spec).map_err(to_pyvalue_err)
    }
}

#[pymethods]
#[pyo3_stub_gen::derive::gen_stub_pymethods]
impl FileWriterConfig {
    /// Creates a new `FileWriterConfig` instance.
    #[new]
    #[pyo3(signature = (directory=None, file_name=None, file_format=None, file_rotate=None))]
    #[must_use]
    pub fn py_new(
        directory: Option<String>,
        file_name: Option<String>,
        file_format: Option<String>,
        file_rotate: Option<(u64, u32)>,
    ) -> Self {
        Self::new(directory, file_name, file_format, file_rotate)
    }
}

/// Initialize logging.
///
/// Logging should be used for Python and sync Rust logic which is most of
/// the components in the [nautilus_trader](https://pypi.org/project/nautilus_trader) package.
/// Logging can be configured to filter components and write up to a specific level only
/// by passing a configuration using the `NAUTILUS_LOG` environment variable.
///
/// Should only be called once during an applications run, ideally at the
/// beginning of the run.
#[pyfunction]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.common")]
#[pyo3(name = "init_logging")]
#[expect(clippy::too_many_arguments)]
#[pyo3(signature = (trader_id, instance_id, level_stdout, level_file=None, component_levels=None, directory=None, file_name=None, file_format=None, file_rotate=None, is_colored=None, is_bypassed=None, print_config=None, log_components_only=None))]
pub fn py_init_logging(
    trader_id: TraderId,
    instance_id: UUID4,
    level_stdout: LogLevel,
    level_file: Option<LogLevel>,
    component_levels: Option<std::collections::HashMap<String, String>>,
    directory: Option<String>,
    file_name: Option<String>,
    file_format: Option<String>,
    file_rotate: Option<(u64, u32)>,
    is_colored: Option<bool>,
    is_bypassed: Option<bool>,
    print_config: Option<bool>,
    log_components_only: Option<bool>,
) -> PyResult<LogGuard> {
    let level_file = level_file.map_or(LevelFilter::Off, map_log_level_to_filter);

    let component_levels = parse_component_levels(component_levels).map_err(to_pyvalue_err)?;

    let file_config = FileWriterConfig::new(directory, file_name, file_format, file_rotate);

    let config = LoggerConfig::new(
        map_log_level_to_filter(level_stdout),
        level_file,
        component_levels,
        AHashMap::new(), // module_level - not exposed to Python
        log_components_only.unwrap_or(false),
        is_colored.unwrap_or(true),
        print_config.unwrap_or(false),
        false,                        // use_tracing - Python handles this separately in kernel
        is_bypassed.unwrap_or(false), // bypass_logging
        None,                         // file_config - passed separately to init_logging
        false,                        // clear_log_file
    );

    if config.bypass_logging {
        logging_set_bypass();
    }

    logging::init_logging(trader_id, instance_id, config, file_config).map_err(to_pyvalue_err)
}

#[pyfunction()]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.common")]
#[pyo3(name = "logger_flush")]
pub fn py_logger_flush() {
    log::logger().flush();
}

fn parse_component_levels(
    original_map: Option<std::collections::HashMap<String, String>>,
) -> anyhow::Result<AHashMap<Ustr, LevelFilter>> {
    match original_map {
        Some(map) => {
            let mut new_map = AHashMap::new();

            for (key, value) in map {
                let ustr_key = Ustr::from(&key);
                let level = parse_level_filter_str(&value)?;
                new_map.insert(ustr_key, level);
            }
            Ok(new_map)
        }
        None => Ok(AHashMap::new()),
    }
}

/// Create a new log event.
#[pyfunction]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.common")]
#[pyo3(name = "logger_log")]
pub fn py_logger_log(level: LogLevel, color: LogColor, component: &str, message: &str) {
    logger::log(level, color, Ustr::from(component), message);
}

/// Logs the standard Nautilus system header.
#[pyfunction]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.common")]
#[pyo3(name = "log_header")]
pub fn py_log_header(trader_id: TraderId, machine_id: &str, instance_id: UUID4, component: &str) {
    headers::log_header(trader_id, machine_id, instance_id, Ustr::from(component));
}

/// Logs system information.
#[pyfunction]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.common")]
#[pyo3(name = "log_sysinfo")]
pub fn py_log_sysinfo(component: &str) {
    headers::log_sysinfo(Ustr::from(component));
}

/// Sets the global logging clock to static mode.
#[pyfunction]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.common")]
#[pyo3(name = "logging_clock_set_static_mode")]
pub fn py_logging_clock_set_static_mode() {
    logging_clock_set_static_mode();
}

/// Sets the global logging clock to real-time mode.
#[pyfunction]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.common")]
#[pyo3(name = "logging_clock_set_realtime_mode")]
pub fn py_logging_clock_set_realtime_mode() {
    logging_clock_set_realtime_mode();
}

/// Sets the global logging clock static time with the given UNIX timestamp (nanoseconds).
#[pyfunction]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.common")]
#[pyo3(name = "logging_clock_set_static_time")]
pub fn py_logging_clock_set_static_time(time_ns: u64) {
    logging_clock_set_static_time(time_ns);
}

/// Returns whether the tracing subscriber has been initialized.
#[cfg(feature = "tracing-bridge")]
#[pyfunction]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.common")]
#[pyo3(name = "tracing_is_initialized")]
#[must_use]
pub fn py_tracing_is_initialized() -> bool {
    crate::logging::bridge::tracing_is_initialized()
}

/// Initializes a tracing subscriber for external Rust crate logging.
///
/// This sets up a standard tracing subscriber that outputs to stdout with
/// the format controlled by `RUST_LOG` environment variable. The output
/// format uses nanosecond timestamps to align with Nautilus logging.
///
/// # Environment Variables
///
/// - `RUST_LOG`: Controls which modules emit tracing events and at what level.
///   - Example: `RUST_LOG=hyper=debug,tokio=warn`.
///   - Default: `warn` (if not set).
///
/// # Errors
///
/// Returns an error if the tracing subscriber has already been initialized.
#[cfg(feature = "tracing-bridge")]
#[pyfunction]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.common")]
#[pyo3(name = "init_tracing")]
pub fn py_init_tracing() -> PyResult<()> {
    crate::logging::bridge::init_tracing().map_err(to_pyvalue_err)
}

/// A thin wrapper around the global Rust logger which exposes ergonomic
/// logging helpers for Python code.
///
/// It mirrors the familiar Python `logging` interface while forwarding
/// all records through the Nautilus logging infrastructure so that log levels
/// and formatting remain consistent across Rust and Python.
#[pyclass(
    module = "nautilus_trader.core.nautilus_pyo3.common",
    name = "Logger",
    unsendable,
    from_py_object
)]
#[pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")]
#[derive(Debug, Clone)]
pub struct PyLogger {
    name: Ustr,
}

impl PyLogger {
    pub fn new(name: &str) -> Self {
        Self {
            name: Ustr::from(name),
        }
    }
}

#[pymethods]
#[pyo3_stub_gen::derive::gen_stub_pymethods]
impl PyLogger {
    /// Create a new `Logger` instance.
    #[new]
    #[pyo3(signature = (name="Python"))]
    fn py_new(name: &str) -> Self {
        Self::new(name)
    }

    /// The component identifier carried by this logger.
    #[getter]
    fn name(&self) -> &str {
        &self.name
    }

    /// Emit a TRACE level record.
    #[pyo3(name = "trace")]
    fn py_trace(&self, message: &str, color: Option<LogColor>) {
        self._log(LogLevel::Trace, color, message);
    }

    /// Emit a DEBUG level record.
    #[pyo3(name = "debug")]
    fn py_debug(&self, message: &str, color: Option<LogColor>) {
        self._log(LogLevel::Debug, color, message);
    }

    /// Emit an INFO level record.
    #[pyo3(name = "info")]
    fn py_info(&self, message: &str, color: Option<LogColor>) {
        self._log(LogLevel::Info, color, message);
    }

    /// Emit a WARNING level record.
    #[pyo3(name = "warning")]
    fn py_warning(&self, message: &str, color: Option<LogColor>) {
        self._log(LogLevel::Warning, color, message);
    }

    /// Emit an ERROR level record.
    #[pyo3(name = "error")]
    fn py_error(&self, message: &str, color: Option<LogColor>) {
        self._log(LogLevel::Error, color, message);
    }

    /// Emit an ERROR level record with the active Python exception info.
    #[pyo3(name = "exception")]
    #[pyo3(signature = (message="", color=None))]
    fn py_exception(&self, py: Python, message: &str, color: Option<LogColor>) {
        let mut full_msg = message.to_owned();

        if pyo3::PyErr::occurred(py) {
            let err = PyErr::fetch(py);
            let err_str = err.to_string();

            if full_msg.is_empty() {
                full_msg = err_str;
            } else {
                full_msg = format!("{full_msg}: {err_str}");
            }
        }

        self._log(LogLevel::Error, color, &full_msg);
    }

    /// Flush buffered log records.
    #[pyo3(name = "flush")]
    fn py_flush(&self) {
        log::logger().flush();
    }

    fn _log(&self, level: LogLevel, color: Option<LogColor>, message: &str) {
        let color = color.unwrap_or(LogColor::Normal);
        logger::log(level, color, self.name, message);
    }
}