jflow_core/logging.rs
1//! High-performance multi-layer logging for the Janus trading system.
2//!
3//! ## Runtime Log Level Changes
4//!
5//! After calling [`init_logging`], use [`LoggingGuard::create_controller`] to
6//! obtain a [`Box<dyn LogLevelController>`](crate::state::LogLevelController)
7//! that can be installed into [`JanusState`](crate::JanusState) for the API
8//! module to expose as `POST /api/log-level`.
9//!
10//! This module implements the **Layered Registry Model** described in the
11//! Janus Supervisor Architecture Refactor document. It splits logging into
12//! two independent pipelines:
13//!
14//! - **Layer 1 — Operational Telemetry**: Human-readable logs to stdout,
15//! filtered by `RUST_LOG` / [`EnvFilter`], with runtime-reloadable log
16//! levels via a [`ReloadHandle`].
17//!
18//! - **Layer 2 — HFT Data Stream**: High-frequency market data events
19//! written to a non-blocking rolling file appender. Events are buffered
20//! in a ring buffer and flushed by a dedicated worker thread, fully
21//! decoupling the latency-sensitive trading path from disk I/O.
22//!
23//! # Architecture
24//!
25//! ```text
26//! ┌───────────────────────────────────────────────────┐
27//! │ tracing Registry │
28//! │ │
29//! │ ┌──────────────────────┐ ┌────────────────────┐ │
30//! │ │ Layer 1: Ops/Stdout │ │ Layer 2: HFT File │ │
31//! │ │ EnvFilter (reload) │ │ Targets("janus:: │ │
32//! │ │ fmt::layer().pretty │ │ hft" = TRACE) │ │
33//! │ │ │ │ non_blocking writer │ │
34//! │ └──────────────────────┘ └────────────────────┘ │
35//! └───────────────────────────────────────────────────┘
36//! ```
37//!
38//! # Critical: WorkerGuard Lifetime
39//!
//! The [`LoggingGuard`] returned by [`init_logging`] **must** be held alive
//! in `main()` until the very end of the program. Dropping it early shuts
//! down the non-blocking HFT writer's background thread, so market data
//! audit trail entries emitted afterwards (e.g. during shutdown) are lost.
44//!
45//! # Usage
46//!
47//! ```rust,ignore
48//! use janus_core::logging::{init_logging, LoggingConfig};
49//!
50//! #[tokio::main]
51//! async fn main() -> anyhow::Result<()> {
52//! let config = LoggingConfig::default();
53//! let guard = init_logging(config)?;
54//!
55//! // ... application code ...
56//!
57//! // `guard` drops here — HFT buffer is flushed
58//! Ok(())
59//! }
60//! ```
61
62use std::path::{Path, PathBuf};
63use std::sync::Arc;
64
65use tracing::Level;
66use tracing_appender::non_blocking::WorkerGuard;
67use tracing_subscriber::{
68 Layer, Registry,
69 filter::{EnvFilter, Targets},
70 fmt,
71 layer::SubscriberExt,
72 reload,
73 util::SubscriberInitExt,
74};
75
76use crate::state::LogLevelController;
77
78// ---------------------------------------------------------------------------
79// Configuration
80// ---------------------------------------------------------------------------
81
82/// Configuration for the multi-layer logging system.
83#[derive(Debug, Clone)]
84pub struct LoggingConfig {
85 /// Default `RUST_LOG` filter string if `RUST_LOG` env var is not set.
86 /// Example: `"info,janus=debug,janus::hft=off"`
87 pub default_env_filter: String,
88
89 /// Whether to use the `.pretty()` formatter for stdout (multi-line,
90 /// coloured). Set to `false` for JSON output in production / CI.
91 pub pretty_stdout: bool,
92
93 /// Whether to enable the HFT file logging layer.
94 pub enable_hft_layer: bool,
95
96 /// Directory for HFT rolling log files.
97 /// Default: `"./logs/hft"`
98 pub hft_log_dir: PathBuf,
99
100 /// Filename prefix for HFT rolling logs.
101 /// Default: `"hft.log"`
102 pub hft_log_prefix: String,
103
104 /// The tracing target that the HFT layer listens to.
105 /// Default: `"janus::hft"`
106 ///
107 /// Events must be emitted with this target to appear in the HFT log:
108 /// ```rust,ignore
109 /// tracing::trace!(target: "janus::hft", symbol = "BTCUSD", price = 42000.0);
110 /// ```
111 pub hft_target: String,
112
113 /// Maximum tracing level recorded by the HFT layer.
114 /// Default: [`Level::TRACE`] (capture everything targeted at `janus::hft`).
115 pub hft_level: Level,
116}
117
118impl Default for LoggingConfig {
119 fn default() -> Self {
120 Self {
121 default_env_filter: "info,janus=debug".to_string(),
122 pretty_stdout: true,
123 enable_hft_layer: true,
124 hft_log_dir: PathBuf::from("./logs/hft"),
125 hft_log_prefix: "hft.log".to_string(),
126 hft_target: "janus::hft".to_string(),
127 hft_level: Level::TRACE,
128 }
129 }
130}
131
132impl LoggingConfig {
133 /// Create a minimal config for tests — stdout only, no HFT file layer.
134 pub fn for_tests() -> Self {
135 Self {
136 default_env_filter: "warn".to_string(),
137 pretty_stdout: false,
138 enable_hft_layer: false,
139 ..Default::default()
140 }
141 }
142
143 /// Builder: set the default env filter.
144 pub fn with_env_filter(mut self, filter: impl Into<String>) -> Self {
145 self.default_env_filter = filter.into();
146 self
147 }
148
149 /// Builder: set the HFT log directory.
150 pub fn with_hft_dir(mut self, dir: impl Into<PathBuf>) -> Self {
151 self.hft_log_dir = dir.into();
152 self
153 }
154
155 /// Builder: disable HFT file logging.
156 pub fn without_hft(mut self) -> Self {
157 self.enable_hft_layer = false;
158 self
159 }
160
161 /// Builder: use JSON output instead of pretty printing.
162 pub fn with_json_stdout(mut self) -> Self {
163 self.pretty_stdout = false;
164 self
165 }
166}
167
168// ---------------------------------------------------------------------------
169// LoggingGuard — must live until shutdown
170// ---------------------------------------------------------------------------
171
172/// Holds resources that must outlive the logging system.
173///
174/// **Critical**: This guard owns the [`WorkerGuard`] for the non-blocking
175/// HFT file appender. If dropped, the background writer thread stops and
176/// any buffered log entries are lost.
177///
178/// Always bind this to a named variable in `main()`:
179/// ```rust,ignore
180/// let _guard = init_logging(config)?;
181/// ```
182pub struct LoggingGuard {
183 /// The non-blocking writer's guard. Dropping this flushes and stops
184 /// the background writer thread.
185 _hft_guard: Option<WorkerGuard>,
186
187 /// Handle to dynamically reload the stdout layer's filter at runtime.
188 /// Exposed so the API server or operator tooling can change log levels
189 /// without restarting the process.
190 ops_reload_handle: Option<reload::Handle<EnvFilter, Registry>>,
191}
192
193impl LoggingGuard {
194 /// Get a reference to the reload handle for the operational (stdout)
195 /// layer's [`EnvFilter`].
196 ///
197 /// Returns `None` if the stdout layer was not installed with a reload
198 /// handle (shouldn't happen in normal operation).
199 ///
200 /// # Example
201 ///
202 /// ```rust,ignore
203 /// if let Some(handle) = guard.ops_reload_handle() {
204 /// let new_filter = EnvFilter::new("debug,hyper=info");
205 /// handle.reload(new_filter).expect("reload failed");
206 /// tracing::info!("Log level changed to debug at runtime");
207 /// }
208 /// ```
209 pub fn ops_reload_handle(&self) -> Option<&reload::Handle<EnvFilter, Registry>> {
210 self.ops_reload_handle.as_ref()
211 }
212
213 /// Reload the operational log filter at runtime.
214 ///
215 /// Accepts any string that [`EnvFilter`] can parse, e.g.:
216 /// - `"debug"`
217 /// - `"info,janus=trace"`
218 /// - `"warn,janus::supervisor=debug"`
219 ///
220 /// Returns an error if the filter string is invalid or if the reload
221 /// handle is missing.
222 pub fn set_log_level(&self, filter_str: &str) -> anyhow::Result<()> {
223 let handle = self
224 .ops_reload_handle
225 .as_ref()
226 .ok_or_else(|| anyhow::anyhow!("no reload handle available"))?;
227
228 let new_filter = EnvFilter::try_new(filter_str)
229 .map_err(|e| anyhow::anyhow!("invalid filter '{}': {}", filter_str, e))?;
230
231 handle
232 .reload(new_filter)
233 .map_err(|e| anyhow::anyhow!("reload failed: {}", e))?;
234
235 tracing::info!(filter = filter_str, "log level reloaded at runtime");
236 Ok(())
237 }
238
239 /// Create a [`LogLevelController`] that can be installed into
240 /// [`JanusState`](crate::JanusState) for the API to change log
241 /// levels at runtime.
242 ///
243 /// The returned controller shares the same reload handle as this
244 /// guard — it does **not** take ownership of the guard, so the
245 /// guard must still be held alive in `main()`.
246 ///
247 /// Returns `None` if no reload handle is available (shouldn't
248 /// happen in normal operation).
249 ///
250 /// # Example
251 ///
252 /// ```rust,ignore
253 /// let guard = init_logging(config)?;
254 /// if let Some(ctrl) = guard.create_controller() {
255 /// state.set_log_level_controller(ctrl).await;
256 /// }
257 /// ```
258 pub fn create_controller(&self) -> Option<Box<dyn LogLevelController>> {
259 self.ops_reload_handle
260 .as_ref()
261 .map(|handle| -> Box<dyn LogLevelController> {
262 Box::new(ReloadHandleController {
263 handle: handle.clone(),
264 current_filter: Arc::new(std::sync::RwLock::new(None)),
265 })
266 })
267 }
268}
269
270// ---------------------------------------------------------------------------
271// ReloadHandleController — concrete LogLevelController implementation
272// ---------------------------------------------------------------------------
273
274/// Wraps a [`reload::Handle`] as a [`LogLevelController`] so it can be
275/// stored in [`JanusState`](crate::JanusState) without exposing
276/// `tracing_subscriber` internals to the rest of the codebase.
277struct ReloadHandleController {
278 handle: reload::Handle<EnvFilter, Registry>,
279 /// Tracks the last successfully applied filter string.
280 current_filter: Arc<std::sync::RwLock<Option<String>>>,
281}
282
283impl LogLevelController for ReloadHandleController {
284 fn set_log_level(&self, filter_str: &str) -> Result<(), String> {
285 let new_filter = EnvFilter::try_new(filter_str)
286 .map_err(|e| format!("invalid filter '{}': {}", filter_str, e))?;
287
288 self.handle
289 .reload(new_filter)
290 .map_err(|e| format!("reload failed: {}", e))?;
291
292 // Track the successfully applied filter
293 if let Ok(mut current) = self.current_filter.write() {
294 *current = Some(filter_str.to_string());
295 }
296
297 tracing::info!(filter = filter_str, "log level changed via API");
298 Ok(())
299 }
300
301 fn current_filter(&self) -> Option<String> {
302 self.current_filter
303 .read()
304 .ok()
305 .and_then(|guard| guard.clone())
306 }
307}
308
309impl std::fmt::Debug for LoggingGuard {
310 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
311 f.debug_struct("LoggingGuard")
312 .field("has_hft_guard", &self._hft_guard.is_some())
313 .field("has_reload_handle", &self.ops_reload_handle.is_some())
314 .finish()
315 }
316}
317
318// ---------------------------------------------------------------------------
319// Initialization
320// ---------------------------------------------------------------------------
321
322/// Initialize the multi-layer tracing subscriber.
323///
324/// Sets up:
325/// 1. **Operational layer** — stdout with `EnvFilter`, runtime-reloadable
326/// 2. **HFT layer** (optional) — non-blocking rolling file appender filtered
327/// to `janus::hft` target only
328///
329/// Returns a [`LoggingGuard`] that **must** be held alive until shutdown.
330///
331/// # Errors
332///
333/// Returns an error if the HFT log directory cannot be created or if the
334/// tracing subscriber fails to initialize.
335pub fn init_logging(config: LoggingConfig) -> anyhow::Result<LoggingGuard> {
336 // ── Layer 1: Operational Telemetry (stdout) ───────────────────────
337
338 // Parse the env filter from RUST_LOG or fall back to the config default.
339 let env_filter = EnvFilter::try_from_default_env()
340 .unwrap_or_else(|_| EnvFilter::new(&config.default_env_filter));
341
342 // Wrap the filter in a reload layer so we can change levels at runtime.
343 let (env_filter_layer, reload_handle) = reload::Layer::new(env_filter);
344
345 // Build the stdout formatting layer with the env filter applied as a
346 // **per-layer** filter (not a global registry filter). This ensures
347 // the EnvFilter only gates stdout output — the HFT layer's own
348 // Targets filter operates independently and can see TRACE-level
349 // events even when the ops filter is set to "info,janus=debug".
350 let stdout_layer = if config.pretty_stdout {
351 fmt::layer()
352 .pretty()
353 .with_thread_ids(true)
354 .with_thread_names(true)
355 .with_target(true)
356 .with_filter(env_filter_layer)
357 .boxed()
358 } else {
359 // JSON output for structured logging in CI / production
360 fmt::layer()
361 .json()
362 .with_thread_ids(true)
363 .with_target(true)
364 .with_current_span(true)
365 .with_filter(env_filter_layer)
366 .boxed()
367 };
368
369 // ── Layer 2: HFT Data Stream (non-blocking file) ─────────────────
370
371 let (hft_guard, hft_layer) = if config.enable_hft_layer {
372 ensure_dir_exists(&config.hft_log_dir)?;
373
374 // Create a rolling daily file appender
375 let file_appender =
376 tracing_appender::rolling::daily(&config.hft_log_dir, &config.hft_log_prefix);
377
378 // Wrap in a non-blocking writer — spawns a background thread that
379 // drains the ring buffer to disk. The WorkerGuard ensures the
380 // buffer is flushed on drop.
381 let (non_blocking_writer, guard) = tracing_appender::non_blocking(file_appender);
382
383 // Build the HFT layer with a target-based filter so ONLY events
384 // with `target: "janus::hft"` are recorded here.
385 let hft_filter = Targets::new().with_target(&config.hft_target, config.hft_level);
386
387 let layer = fmt::layer()
388 .with_writer(non_blocking_writer)
389 .with_ansi(false) // No ANSI colours in log files
390 .with_target(true)
391 .with_thread_ids(false)
392 .json() // Structured JSON for machine parsing of HFT data
393 .with_filter(hft_filter)
394 .boxed();
395
396 (Some(guard), Some(layer))
397 } else {
398 (None, None)
399 };
400
401 // ── Assemble the subscriber ──────────────────────────────────────
402
403 let registry = Registry::default().with(stdout_layer).with(hft_layer);
404
405 registry
406 .try_init()
407 .map_err(|e| anyhow::anyhow!("failed to initialize tracing subscriber: {}", e))?;
408
409 tracing::info!(
410 pretty = config.pretty_stdout,
411 hft_enabled = config.enable_hft_layer,
412 hft_dir = %config.hft_log_dir.display(),
413 "logging initialized"
414 );
415
416 Ok(LoggingGuard {
417 _hft_guard: hft_guard,
418 ops_reload_handle: Some(reload_handle),
419 })
420}
421
422/// Ensure a directory exists, creating it (and parents) if necessary.
423fn ensure_dir_exists(path: &Path) -> anyhow::Result<()> {
424 if !path.exists() {
425 std::fs::create_dir_all(path)
426 .map_err(|e| anyhow::anyhow!("failed to create log directory {:?}: {}", path, e))?;
427 }
428 Ok(())
429}
430
431// ---------------------------------------------------------------------------
432// Tests
433// ---------------------------------------------------------------------------
434
#[cfg(test)]
mod tests {
    use super::*;

    /// `Default` must produce the documented production settings.
    #[test]
    fn test_default_config() {
        let LoggingConfig {
            default_env_filter,
            pretty_stdout,
            enable_hft_layer,
            hft_log_dir,
            hft_log_prefix,
            hft_target,
            hft_level,
        } = LoggingConfig::default();

        assert_eq!(default_env_filter, "info,janus=debug");
        assert!(pretty_stdout);
        assert!(enable_hft_layer);
        assert_eq!(hft_log_dir, PathBuf::from("./logs/hft"));
        assert_eq!(hft_log_prefix, "hft.log");
        assert_eq!(hft_target, "janus::hft");
        assert_eq!(hft_level, Level::TRACE);
    }

    /// The test preset is stdout-only, JSON, filtered at `warn`.
    #[test]
    fn test_config_for_tests() {
        let preset = LoggingConfig::for_tests();

        assert!(!preset.enable_hft_layer);
        assert!(!preset.pretty_stdout);
        assert_eq!(preset.default_env_filter, "warn");
    }

    /// Each builder changes its own field and leaves the others alone.
    #[test]
    fn test_config_builder() {
        let base = LoggingConfig::default();
        let filtered = base.with_env_filter("debug,hyper=warn");
        let relocated = filtered.with_hft_dir("/tmp/hft-logs");
        let built = relocated.with_json_stdout();

        assert_eq!(built.default_env_filter, "debug,hyper=warn");
        assert_eq!(built.hft_log_dir, PathBuf::from("/tmp/hft-logs"));
        assert!(!built.pretty_stdout);
        assert!(built.enable_hft_layer);
    }

    /// `without_hft` switches the HFT layer off.
    #[test]
    fn test_config_without_hft() {
        let built = LoggingConfig::default().without_hft();

        assert!(!built.enable_hft_layer);
    }

    // Note: `init_logging` is deliberately not exercised here because
    // `tracing_subscriber::try_init()` can only succeed once per process.
    // Integration tests should use `LoggingConfig::for_tests()` and call
    // `init_logging` in a dedicated test binary, or use
    // `tracing::subscriber::with_default`.
}