Skip to main content

jflow_core/
logging.rs

1//! High-performance multi-layer logging for the Janus trading system.
2//!
3//! ## Runtime Log Level Changes
4//!
5//! After calling [`init_logging`], use [`LoggingGuard::create_controller`] to
6//! obtain a [`Box<dyn LogLevelController>`](crate::state::LogLevelController)
7//! that can be installed into [`JanusState`](crate::JanusState) for the API
8//! module to expose as `POST /api/log-level`.
9//!
10//! This module implements the **Layered Registry Model** described in the
11//! Janus Supervisor Architecture Refactor document. It splits logging into
12//! two independent pipelines:
13//!
14//! - **Layer 1 — Operational Telemetry**: Human-readable logs to stdout,
15//!   filtered by `RUST_LOG` / [`EnvFilter`], with runtime-reloadable log
16//!   levels via a [`ReloadHandle`].
17//!
18//! - **Layer 2 — HFT Data Stream**: High-frequency market data events
19//!   written to a non-blocking rolling file appender. Events are buffered
20//!   in a ring buffer and flushed by a dedicated worker thread, fully
21//!   decoupling the latency-sensitive trading path from disk I/O.
22//!
23//! # Architecture
24//!
25//! ```text
26//! ┌───────────────────────────────────────────────────┐
27//! │                 tracing Registry                   │
28//! │                                                    │
29//! │  ┌──────────────────────┐  ┌────────────────────┐ │
30//! │  │  Layer 1: Ops/Stdout │  │ Layer 2: HFT File  │ │
31//! │  │  EnvFilter (reload)  │  │ Targets("janus::   │ │
32//! │  │  fmt::layer().pretty │  │   hft" = TRACE)    │ │
33//! │  │                      │  │ non_blocking writer │ │
34//! │  └──────────────────────┘  └────────────────────┘ │
35//! └───────────────────────────────────────────────────┘
36//! ```
37//!
38//! # Critical: WorkerGuard Lifetime
39//!
//! The [`LoggingGuard`] returned by [`init_logging`] **must** be held alive
//! in `main()` until the very end of the program. Dropping it flushes the
//! non-blocking HFT writer and stops its background worker thread, so any
//! market data audit trail entries emitted after a premature drop (for
//! example during shutdown) never reach disk.
44//!
45//! # Usage
46//!
47//! ```rust,ignore
48//! use janus_core::logging::{init_logging, LoggingConfig};
49//!
50//! #[tokio::main]
51//! async fn main() -> anyhow::Result<()> {
52//!     let config = LoggingConfig::default();
53//!     let guard = init_logging(config)?;
54//!
55//!     // ... application code ...
56//!
57//!     // `guard` drops here — HFT buffer is flushed
58//!     Ok(())
59//! }
60//! ```
61
62use std::path::{Path, PathBuf};
63use std::sync::Arc;
64
65use tracing::Level;
66use tracing_appender::non_blocking::WorkerGuard;
67use tracing_subscriber::{
68    Layer, Registry,
69    filter::{EnvFilter, Targets},
70    fmt,
71    layer::SubscriberExt,
72    reload,
73    util::SubscriberInitExt,
74};
75
76use crate::state::LogLevelController;
77
78// ---------------------------------------------------------------------------
79// Configuration
80// ---------------------------------------------------------------------------
81
82/// Configuration for the multi-layer logging system.
83#[derive(Debug, Clone)]
84pub struct LoggingConfig {
85    /// Default `RUST_LOG` filter string if `RUST_LOG` env var is not set.
86    /// Example: `"info,janus=debug,janus::hft=off"`
87    pub default_env_filter: String,
88
89    /// Whether to use the `.pretty()` formatter for stdout (multi-line,
90    /// coloured). Set to `false` for JSON output in production / CI.
91    pub pretty_stdout: bool,
92
93    /// Whether to enable the HFT file logging layer.
94    pub enable_hft_layer: bool,
95
96    /// Directory for HFT rolling log files.
97    /// Default: `"./logs/hft"`
98    pub hft_log_dir: PathBuf,
99
100    /// Filename prefix for HFT rolling logs.
101    /// Default: `"hft.log"`
102    pub hft_log_prefix: String,
103
104    /// The tracing target that the HFT layer listens to.
105    /// Default: `"janus::hft"`
106    ///
107    /// Events must be emitted with this target to appear in the HFT log:
108    /// ```rust,ignore
109    /// tracing::trace!(target: "janus::hft", symbol = "BTCUSD", price = 42000.0);
110    /// ```
111    pub hft_target: String,
112
113    /// Maximum tracing level recorded by the HFT layer.
114    /// Default: [`Level::TRACE`] (capture everything targeted at `janus::hft`).
115    pub hft_level: Level,
116}
117
118impl Default for LoggingConfig {
119    fn default() -> Self {
120        Self {
121            default_env_filter: "info,janus=debug".to_string(),
122            pretty_stdout: true,
123            enable_hft_layer: true,
124            hft_log_dir: PathBuf::from("./logs/hft"),
125            hft_log_prefix: "hft.log".to_string(),
126            hft_target: "janus::hft".to_string(),
127            hft_level: Level::TRACE,
128        }
129    }
130}
131
132impl LoggingConfig {
133    /// Create a minimal config for tests — stdout only, no HFT file layer.
134    pub fn for_tests() -> Self {
135        Self {
136            default_env_filter: "warn".to_string(),
137            pretty_stdout: false,
138            enable_hft_layer: false,
139            ..Default::default()
140        }
141    }
142
143    /// Builder: set the default env filter.
144    pub fn with_env_filter(mut self, filter: impl Into<String>) -> Self {
145        self.default_env_filter = filter.into();
146        self
147    }
148
149    /// Builder: set the HFT log directory.
150    pub fn with_hft_dir(mut self, dir: impl Into<PathBuf>) -> Self {
151        self.hft_log_dir = dir.into();
152        self
153    }
154
155    /// Builder: disable HFT file logging.
156    pub fn without_hft(mut self) -> Self {
157        self.enable_hft_layer = false;
158        self
159    }
160
161    /// Builder: use JSON output instead of pretty printing.
162    pub fn with_json_stdout(mut self) -> Self {
163        self.pretty_stdout = false;
164        self
165    }
166}
167
168// ---------------------------------------------------------------------------
169// LoggingGuard — must live until shutdown
170// ---------------------------------------------------------------------------
171
172/// Holds resources that must outlive the logging system.
173///
174/// **Critical**: This guard owns the [`WorkerGuard`] for the non-blocking
175/// HFT file appender. If dropped, the background writer thread stops and
176/// any buffered log entries are lost.
177///
178/// Always bind this to a named variable in `main()`:
179/// ```rust,ignore
180/// let _guard = init_logging(config)?;
181/// ```
182pub struct LoggingGuard {
183    /// The non-blocking writer's guard. Dropping this flushes and stops
184    /// the background writer thread.
185    _hft_guard: Option<WorkerGuard>,
186
187    /// Handle to dynamically reload the stdout layer's filter at runtime.
188    /// Exposed so the API server or operator tooling can change log levels
189    /// without restarting the process.
190    ops_reload_handle: Option<reload::Handle<EnvFilter, Registry>>,
191}
192
193impl LoggingGuard {
194    /// Get a reference to the reload handle for the operational (stdout)
195    /// layer's [`EnvFilter`].
196    ///
197    /// Returns `None` if the stdout layer was not installed with a reload
198    /// handle (shouldn't happen in normal operation).
199    ///
200    /// # Example
201    ///
202    /// ```rust,ignore
203    /// if let Some(handle) = guard.ops_reload_handle() {
204    ///     let new_filter = EnvFilter::new("debug,hyper=info");
205    ///     handle.reload(new_filter).expect("reload failed");
206    ///     tracing::info!("Log level changed to debug at runtime");
207    /// }
208    /// ```
209    pub fn ops_reload_handle(&self) -> Option<&reload::Handle<EnvFilter, Registry>> {
210        self.ops_reload_handle.as_ref()
211    }
212
213    /// Reload the operational log filter at runtime.
214    ///
215    /// Accepts any string that [`EnvFilter`] can parse, e.g.:
216    /// - `"debug"`
217    /// - `"info,janus=trace"`
218    /// - `"warn,janus::supervisor=debug"`
219    ///
220    /// Returns an error if the filter string is invalid or if the reload
221    /// handle is missing.
222    pub fn set_log_level(&self, filter_str: &str) -> anyhow::Result<()> {
223        let handle = self
224            .ops_reload_handle
225            .as_ref()
226            .ok_or_else(|| anyhow::anyhow!("no reload handle available"))?;
227
228        let new_filter = EnvFilter::try_new(filter_str)
229            .map_err(|e| anyhow::anyhow!("invalid filter '{}': {}", filter_str, e))?;
230
231        handle
232            .reload(new_filter)
233            .map_err(|e| anyhow::anyhow!("reload failed: {}", e))?;
234
235        tracing::info!(filter = filter_str, "log level reloaded at runtime");
236        Ok(())
237    }
238
239    /// Create a [`LogLevelController`] that can be installed into
240    /// [`JanusState`](crate::JanusState) for the API to change log
241    /// levels at runtime.
242    ///
243    /// The returned controller shares the same reload handle as this
244    /// guard — it does **not** take ownership of the guard, so the
245    /// guard must still be held alive in `main()`.
246    ///
247    /// Returns `None` if no reload handle is available (shouldn't
248    /// happen in normal operation).
249    ///
250    /// # Example
251    ///
252    /// ```rust,ignore
253    /// let guard = init_logging(config)?;
254    /// if let Some(ctrl) = guard.create_controller() {
255    ///     state.set_log_level_controller(ctrl).await;
256    /// }
257    /// ```
258    pub fn create_controller(&self) -> Option<Box<dyn LogLevelController>> {
259        self.ops_reload_handle
260            .as_ref()
261            .map(|handle| -> Box<dyn LogLevelController> {
262                Box::new(ReloadHandleController {
263                    handle: handle.clone(),
264                    current_filter: Arc::new(std::sync::RwLock::new(None)),
265                })
266            })
267    }
268}
269
270// ---------------------------------------------------------------------------
271// ReloadHandleController — concrete LogLevelController implementation
272// ---------------------------------------------------------------------------
273
274/// Wraps a [`reload::Handle`] as a [`LogLevelController`] so it can be
275/// stored in [`JanusState`](crate::JanusState) without exposing
276/// `tracing_subscriber` internals to the rest of the codebase.
277struct ReloadHandleController {
278    handle: reload::Handle<EnvFilter, Registry>,
279    /// Tracks the last successfully applied filter string.
280    current_filter: Arc<std::sync::RwLock<Option<String>>>,
281}
282
283impl LogLevelController for ReloadHandleController {
284    fn set_log_level(&self, filter_str: &str) -> Result<(), String> {
285        let new_filter = EnvFilter::try_new(filter_str)
286            .map_err(|e| format!("invalid filter '{}': {}", filter_str, e))?;
287
288        self.handle
289            .reload(new_filter)
290            .map_err(|e| format!("reload failed: {}", e))?;
291
292        // Track the successfully applied filter
293        if let Ok(mut current) = self.current_filter.write() {
294            *current = Some(filter_str.to_string());
295        }
296
297        tracing::info!(filter = filter_str, "log level changed via API");
298        Ok(())
299    }
300
301    fn current_filter(&self) -> Option<String> {
302        self.current_filter
303            .read()
304            .ok()
305            .and_then(|guard| guard.clone())
306    }
307}
308
309impl std::fmt::Debug for LoggingGuard {
310    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
311        f.debug_struct("LoggingGuard")
312            .field("has_hft_guard", &self._hft_guard.is_some())
313            .field("has_reload_handle", &self.ops_reload_handle.is_some())
314            .finish()
315    }
316}
317
318// ---------------------------------------------------------------------------
319// Initialization
320// ---------------------------------------------------------------------------
321
322/// Initialize the multi-layer tracing subscriber.
323///
324/// Sets up:
325/// 1. **Operational layer** — stdout with `EnvFilter`, runtime-reloadable
326/// 2. **HFT layer** (optional) — non-blocking rolling file appender filtered
327///    to `janus::hft` target only
328///
329/// Returns a [`LoggingGuard`] that **must** be held alive until shutdown.
330///
331/// # Errors
332///
333/// Returns an error if the HFT log directory cannot be created or if the
334/// tracing subscriber fails to initialize.
335pub fn init_logging(config: LoggingConfig) -> anyhow::Result<LoggingGuard> {
336    // ── Layer 1: Operational Telemetry (stdout) ───────────────────────
337
338    // Parse the env filter from RUST_LOG or fall back to the config default.
339    let env_filter = EnvFilter::try_from_default_env()
340        .unwrap_or_else(|_| EnvFilter::new(&config.default_env_filter));
341
342    // Wrap the filter in a reload layer so we can change levels at runtime.
343    let (env_filter_layer, reload_handle) = reload::Layer::new(env_filter);
344
345    // Build the stdout formatting layer with the env filter applied as a
346    // **per-layer** filter (not a global registry filter). This ensures
347    // the EnvFilter only gates stdout output — the HFT layer's own
348    // Targets filter operates independently and can see TRACE-level
349    // events even when the ops filter is set to "info,janus=debug".
350    let stdout_layer = if config.pretty_stdout {
351        fmt::layer()
352            .pretty()
353            .with_thread_ids(true)
354            .with_thread_names(true)
355            .with_target(true)
356            .with_filter(env_filter_layer)
357            .boxed()
358    } else {
359        // JSON output for structured logging in CI / production
360        fmt::layer()
361            .json()
362            .with_thread_ids(true)
363            .with_target(true)
364            .with_current_span(true)
365            .with_filter(env_filter_layer)
366            .boxed()
367    };
368
369    // ── Layer 2: HFT Data Stream (non-blocking file) ─────────────────
370
371    let (hft_guard, hft_layer) = if config.enable_hft_layer {
372        ensure_dir_exists(&config.hft_log_dir)?;
373
374        // Create a rolling daily file appender
375        let file_appender =
376            tracing_appender::rolling::daily(&config.hft_log_dir, &config.hft_log_prefix);
377
378        // Wrap in a non-blocking writer — spawns a background thread that
379        // drains the ring buffer to disk. The WorkerGuard ensures the
380        // buffer is flushed on drop.
381        let (non_blocking_writer, guard) = tracing_appender::non_blocking(file_appender);
382
383        // Build the HFT layer with a target-based filter so ONLY events
384        // with `target: "janus::hft"` are recorded here.
385        let hft_filter = Targets::new().with_target(&config.hft_target, config.hft_level);
386
387        let layer = fmt::layer()
388            .with_writer(non_blocking_writer)
389            .with_ansi(false) // No ANSI colours in log files
390            .with_target(true)
391            .with_thread_ids(false)
392            .json() // Structured JSON for machine parsing of HFT data
393            .with_filter(hft_filter)
394            .boxed();
395
396        (Some(guard), Some(layer))
397    } else {
398        (None, None)
399    };
400
401    // ── Assemble the subscriber ──────────────────────────────────────
402
403    let registry = Registry::default().with(stdout_layer).with(hft_layer);
404
405    registry
406        .try_init()
407        .map_err(|e| anyhow::anyhow!("failed to initialize tracing subscriber: {}", e))?;
408
409    tracing::info!(
410        pretty = config.pretty_stdout,
411        hft_enabled = config.enable_hft_layer,
412        hft_dir = %config.hft_log_dir.display(),
413        "logging initialized"
414    );
415
416    Ok(LoggingGuard {
417        _hft_guard: hft_guard,
418        ops_reload_handle: Some(reload_handle),
419    })
420}
421
422/// Ensure a directory exists, creating it (and parents) if necessary.
423fn ensure_dir_exists(path: &Path) -> anyhow::Result<()> {
424    if !path.exists() {
425        std::fs::create_dir_all(path)
426            .map_err(|e| anyhow::anyhow!("failed to create log directory {:?}: {}", path, e))?;
427    }
428    Ok(())
429}
430
431// ---------------------------------------------------------------------------
432// Tests
433// ---------------------------------------------------------------------------
434
#[cfg(test)]
mod tests {
    use super::*;

    /// `Default` yields the documented values for every field.
    #[test]
    fn test_default_config() {
        let LoggingConfig {
            default_env_filter,
            pretty_stdout,
            enable_hft_layer,
            hft_log_dir,
            hft_log_prefix,
            hft_target,
            hft_level,
        } = LoggingConfig::default();

        assert_eq!(default_env_filter, "info,janus=debug");
        assert!(pretty_stdout);
        assert!(enable_hft_layer);
        assert_eq!(hft_log_dir.as_path(), Path::new("./logs/hft"));
        assert_eq!(hft_log_prefix, "hft.log");
        assert_eq!(hft_target, "janus::hft");
        assert_eq!(hft_level, Level::TRACE);
    }

    /// The test preset is quiet, JSON-formatted and has no HFT layer.
    #[test]
    fn test_config_for_tests() {
        let preset = LoggingConfig::for_tests();

        assert_eq!(preset.default_env_filter, "warn");
        assert!(!preset.pretty_stdout);
        assert!(!preset.enable_hft_layer);
    }

    /// Builders change only the field they target.
    #[test]
    fn test_config_builder() {
        let built = LoggingConfig::default()
            .with_env_filter("debug,hyper=warn")
            .with_hft_dir("/tmp/hft-logs")
            .with_json_stdout();

        assert_eq!(built.default_env_filter, "debug,hyper=warn");
        assert_eq!(built.hft_log_dir.as_path(), Path::new("/tmp/hft-logs"));
        assert!(!built.pretty_stdout);
        // Untouched by the builders above, so still the default.
        assert!(built.enable_hft_layer);
    }

    /// `without_hft` switches the HFT layer off.
    #[test]
    fn test_config_without_hft() {
        let no_hft = LoggingConfig::default().without_hft();
        assert!(!no_hft.enable_hft_layer);
    }

    // `init_logging` is deliberately not exercised here:
    // `tracing_subscriber::try_init()` can only succeed once per process.
    // Integration tests should use `LoggingConfig::for_tests()` and call
    // `init_logging` from a dedicated test binary, or scope a subscriber
    // with `tracing::subscriber::with_default`.
}