Skip to main content

reddb_server/telemetry/
mod.rs

1//! Structured logging façade over `tracing` + `tracing-subscriber`.
2//!
3//! Entry point for the `red` binary and any embedder that wants RedDB
4//! to manage logging on its behalf. Sets up two layers:
5//!
6//! - **stderr** — pretty (TTY) or JSON (piped/CI) formatted logs
7//! - **file**   — optional daily-rotating file in `log_dir`, non-blocking
8//!   writer backed by `tracing-appender`
9//!
10//! A background janitor purges rotated files older than
11//! `rotation_keep_days` every hour.
12//!
13//! The returned `Option<TelemetryGuard>` must live for the process
14//! lifetime — dropping it flushes the non-blocking buffer so no log
15//! lines are lost on graceful shutdown.
16
17use std::path::PathBuf;
18
19use tracing_appender::non_blocking::{NonBlockingBuilder, WorkerGuard};
20use tracing_subscriber::fmt::writer::MakeWriterExt;
21use tracing_subscriber::prelude::*;
22use tracing_subscriber::{fmt, EnvFilter, Registry};
23
/// Capacity, in buffered lines, of each non-blocking log writer queue.
///
/// Deliberately larger than the 128k default of `tracing-appender`:
/// the writers are configured as lossy, so under log-heavy bursts
/// (bulk imports, CDC storms) a full queue silently drops lines.
///
/// Sizing: 1M entries × ~200 bytes per event ≈ 200 MB worst-case RAM
/// per writer — acceptable for a server process, and far cheaper
/// than losing log lines.
const LOG_BUFFER_LINES: usize = 1_000_000;
32
33pub mod admin_intent_log;
34pub mod janitor;
35pub mod operator_event;
36pub mod operator_event_router;
37#[cfg(feature = "otel")]
38pub mod otel;
39pub mod slow_query_logger;
40pub mod span;
41
/// Rendering used for log output (stderr and, when enabled, the file
/// sink).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogFormat {
    /// Human-readable, coloured lines.
    Pretty,
    /// NDJSON, suitable for Loki / ELK / Datadog.
    Json,
}

impl LogFormat {
    /// Resolve a user-supplied format name, ignoring ASCII case.
    ///
    /// Accepted aliases:
    /// - `"pretty"`, `"text"`, `"human"` → [`LogFormat::Pretty`]
    /// - `"json"`, `"ndjson"` → [`LogFormat::Json`]
    ///
    /// Any other input — including the empty string or a name with
    /// surrounding whitespace — yields `None`.
    pub fn parse(s: &str) -> Option<Self> {
        const PRETTY_ALIASES: [&str; 3] = ["pretty", "text", "human"];
        const JSON_ALIASES: [&str; 2] = ["json", "ndjson"];

        // ASCII-case-insensitive comparison against an alias table.
        let is_one_of = |aliases: &[&str]| aliases.iter().any(|alias| s.eq_ignore_ascii_case(alias));

        if is_one_of(&PRETTY_ALIASES) {
            Some(Self::Pretty)
        } else if is_one_of(&JSON_ALIASES) {
            Some(Self::Json)
        } else {
            None
        }
    }
}
59
60#[derive(Debug, Clone)]
61pub struct TelemetryConfig {
62    /// Directory for rotating log files. `None` = stderr-only (CLI
63    /// one-shot / embedded default).
64    pub log_dir: Option<PathBuf>,
65    /// Prefix for rotated files; defaults to `"reddb.log"` when empty.
66    pub file_prefix: String,
67    /// `RUST_LOG`-style filter expression. Example:
68    /// `"info,reddb::wire=debug"`.
69    pub level_filter: String,
70    /// stderr output format. File output always matches.
71    pub format: LogFormat,
72    /// How many rotated files to keep (older ones deleted by janitor).
73    pub rotation_keep_days: u16,
74    /// Service name stamped on every record under the `service` field.
75    pub service_name: &'static str,
76
77    /// Per-invocation intent flags — set by the CLI parser to record
78    /// which fields the operator explicitly passed. The config merge
79    /// (red_config → CLI) uses these to decide whether a persisted
80    /// `red.logging.*` value should be promoted.
81    ///
82    /// Not serialised; always recomputed per process start.
83    pub level_explicit: bool,
84    pub format_explicit: bool,
85    pub rotation_keep_days_explicit: bool,
86    pub file_prefix_explicit: bool,
87    pub log_dir_explicit: bool,
88    /// `--no-log-file` was passed: file sink must stay off regardless
89    /// of `red.logging.dir`.
90    pub log_file_disabled: bool,
91}
92
93impl Default for TelemetryConfig {
94    fn default() -> Self {
95        Self {
96            log_dir: None,
97            file_prefix: "reddb.log".to_string(),
98            level_filter: "info".to_string(),
99            format: LogFormat::Pretty,
100            rotation_keep_days: 14,
101            service_name: "reddb",
102            level_explicit: false,
103            format_explicit: false,
104            rotation_keep_days_explicit: false,
105            file_prefix_explicit: false,
106            log_dir_explicit: false,
107            log_file_disabled: false,
108        }
109    }
110}
111
112/// Opaque handle that keeps the non-blocking log writers alive.
113/// Drop at process exit to flush the buffered records for stderr
114/// AND the rotating file sink. Both writers run on their own
115/// dedicated background threads — the hot path only pushes onto
116/// an MPSC channel, never touches stdio syscalls directly.
117pub struct TelemetryGuard {
118    _stderr_worker: Option<WorkerGuard>,
119    _file_worker: Option<WorkerGuard>,
120}
121
122/// Install the global `tracing` subscriber. Idempotent: if another
123/// subscriber is already registered (e.g. an embedder set up its own),
124/// we silently return `None` and let the caller proceed.
125///
126/// RedDB embedders that want to own the subscriber should simply not
127/// call this.
128pub fn init(cfg: TelemetryConfig) -> Option<TelemetryGuard> {
129    let env_filter =
130        EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(&cfg.level_filter));
131
132    // stderr — wrapped in `non_blocking` so the hot path never
133    // blocks on `write(2)` syscalls. A dedicated thread owns the
134    // stderr file descriptor and drains an MPSC channel; on a
135    // full buffer we'd rather drop a line than stall a request.
136    let (stderr_writer, stderr_worker) = NonBlockingBuilder::default()
137        .buffered_lines_limit(LOG_BUFFER_LINES)
138        .lossy(true)
139        .finish(std::io::stderr());
140
141    // Optional file layer + worker guard
142    let (file_writer_opt, file_worker) = match cfg.log_dir.as_ref() {
143        Some(dir) => {
144            if let Err(err) = std::fs::create_dir_all(dir) {
145                // Surface the failure to stderr directly — the
146                // subscriber isn't up yet, so tracing::warn! wouldn't
147                // land anywhere. Skip file logging and continue with
148                // stderr-only.
149                eprintln!(
150                    "telemetry: failed to create log dir {}: {err}",
151                    dir.display()
152                );
153                (None, None)
154            } else {
155                let file_appender = tracing_appender::rolling::daily(dir, &cfg.file_prefix);
156                let (writer, guard) = NonBlockingBuilder::default()
157                    .buffered_lines_limit(LOG_BUFFER_LINES)
158                    .lossy(true)
159                    .finish(file_appender);
160
161                // Spawn retention janitor (if tokio runtime active).
162                if cfg.rotation_keep_days > 0 {
163                    janitor::spawn(dir.clone(), cfg.file_prefix.clone(), cfg.rotation_keep_days);
164                }
165                (Some(writer), Some(guard))
166            }
167        }
168        None => (None, None),
169    };
170
171    // Build the subscriber. We commit to one format branch at a time —
172    // mixing pretty + json per-layer is rarely useful, and the branching
173    // keeps the type signatures tractable.
174    let result = match cfg.format {
175        LogFormat::Pretty => {
176            let stderr_layer = fmt::layer()
177                .with_writer(stderr_writer.clone())
178                .with_target(true)
179                .with_thread_ids(false)
180                .with_thread_names(false);
181            let base = Registry::default().with(env_filter).with(stderr_layer);
182            if let Some(writer) = file_writer_opt.clone() {
183                let file_layer = fmt::layer()
184                    .with_writer(writer.with_max_level(tracing::Level::TRACE))
185                    .with_target(true)
186                    .with_ansi(false);
187                base.with(file_layer).try_init()
188            } else {
189                base.try_init()
190            }
191        }
192        LogFormat::Json => {
193            let stderr_json = fmt::layer()
194                .with_writer(stderr_writer.clone())
195                .with_target(true)
196                .with_thread_ids(false)
197                .with_thread_names(false)
198                .json()
199                .with_current_span(true)
200                .with_span_list(false);
201            let base = Registry::default().with(env_filter).with(stderr_json);
202            if let Some(writer) = file_writer_opt {
203                let file_json = fmt::layer()
204                    .with_writer(writer.with_max_level(tracing::Level::TRACE))
205                    .with_target(true)
206                    .json()
207                    .with_current_span(true)
208                    .with_span_list(false);
209                base.with(file_json).try_init()
210            } else {
211                base.try_init()
212            }
213        }
214    };
215
216    if result.is_err() {
217        // Subscriber already set — library mode. That's fine.
218        return None;
219    }
220
221    // Root event so users know telemetry is alive.
222    tracing::info!(
223        service = cfg.service_name,
224        log_dir = cfg.log_dir.as_ref().map(|p| p.display().to_string()).unwrap_or_else(|| "<none>".into()),
225        format = ?cfg.format,
226        "telemetry initialised"
227    );
228
229    Some(TelemetryGuard {
230        _stderr_worker: Some(stderr_worker),
231        _file_worker: file_worker,
232    })
233}