Skip to main content

hermod/server/
trace_handler.rs

1//! Trace processing pipeline
2//!
3//! [`handle_traces`] is called once per batch of [`TraceObject`]s received
4//! from a connected node.  It fans the batch out to every configured output:
5//!
6//! 1. **File logging** — writes each trace to the appropriate log file via
7//!    [`LogWriter`], one file per `(node, logRoot, logFormat)` triple.
8//!
9//! 2. **Journal logging** — on Unix/Linux, writes each trace to the systemd
10//!    journal via `/run/systemd/journal/socket` using the native journal
11//!    protocol.  On non-Unix platforms `JournalMode` is silently skipped.
12//!
13//! 3. **Re-forwarding** — if a [`ReForwarder`] is configured, passes the
14//!    (optionally namespace-filtered) batch on to the downstream forwarder.
15
16use crate::protocol::TraceObject;
17use crate::protocol::types::Severity;
18use crate::server::config::LogMode;
19use crate::server::logging::LogWriter;
20use crate::server::node::NodeState;
21use crate::server::reforwarder::ReForwarder;
22use prometheus::GaugeVec;
23use std::collections::HashMap;
24use std::sync::Arc;
25use tracing::warn;
26
27/// Process a batch of received TraceObjects for one node
28pub async fn handle_traces(
29    traces: Vec<TraceObject>,
30    node: &NodeState,
31    writer: &Arc<LogWriter>,
32    logging_params: &[crate::server::config::LoggingParams],
33    reforwarder: Option<&ReForwarder>,
34) {
35    // --- File + Journal logging ---
36    for params in logging_params {
37        match params.log_mode {
38            LogMode::FileMode => {
39                if let Err(e) = writer.write_traces(&node.name, params, &traces) {
40                    warn!("Log write error for node {}: {}", node.name, e);
41                }
42            }
43            LogMode::JournalMode => {
44                #[cfg(unix)]
45                for trace in &traces {
46                    write_to_journal(trace, &node.name);
47                }
48                #[cfg(not(unix))]
49                {
50                    static WARNED: std::sync::OnceLock<()> = std::sync::OnceLock::new();
51                    WARNED.get_or_init(|| {
52                        warn!("JournalMode is not supported on this platform; log entries are discarded");
53                    });
54                }
55            }
56        }
57    }
58
59    // --- Re-forwarding ---
60    if let Some(rf) = reforwarder {
61        rf.forward(&traces).await;
62    }
63
64    // --- Prometheus metrics from trace fields ---
65    push_trace_metrics(&traces, node);
66}
67
68/// Extract numeric fields from each `TraceObject.to_machine` JSON blob and
69/// push them as Prometheus gauges on the node's registry.
70///
71/// Metric names are built as `<namespace>_<field>` where namespace segments
72/// are joined with `_`.  Non-alphanumeric characters are replaced with `_`.
73fn push_trace_metrics(traces: &[TraceObject], node: &NodeState) {
74    let mut cache = node.trace_gauge_cache.lock().unwrap();
75    for trace in traces {
76        let prefix = trace.to_namespace.join("_");
77        if prefix.is_empty() {
78            continue;
79        }
80        let map = match serde_json::from_str::<serde_json::Value>(&trace.to_machine) {
81            Ok(serde_json::Value::Object(m)) => m,
82            _ => continue,
83        };
84        for (field, value) in &map {
85            let f_val = match value {
86                serde_json::Value::Number(n) => match n.as_f64() {
87                    Some(f) => f,
88                    None => continue,
89                },
90                _ => continue,
91            };
92            let metric_name = sanitise_metric_name(&format!("{}_{}", prefix, field));
93            let gauge = get_or_create_gauge(&node.registry, &mut cache, &metric_name);
94            if let Some(g) = gauge {
95                g.with_label_values(&[]).set(f_val);
96            }
97        }
98    }
99}
100
101fn get_or_create_gauge(
102    registry: &prometheus::Registry,
103    cache: &mut HashMap<String, GaugeVec>,
104    name: &str,
105) -> Option<GaugeVec> {
106    if let Some(g) = cache.get(name) {
107        return Some(g.clone());
108    }
109    let opts = prometheus::Opts::new(name.to_string(), name.to_string());
110    let g = GaugeVec::new(opts, &[]).ok()?;
111    registry.register(Box::new(g.clone())).ok()?;
112    cache.insert(name.to_string(), g.clone());
113    Some(g)
114}
115
116fn sanitise_metric_name(name: &str) -> String {
117    name.chars()
118        .map(|c| {
119            if c.is_alphanumeric() || c == '_' {
120                c
121            } else {
122                '_'
123            }
124        })
125        .collect()
126}
127
128// ---------------------------------------------------------------------------
129// Journald integration (Unix only)
130// ---------------------------------------------------------------------------
131
132/// Write a single trace to the systemd journal via the native journal socket.
133///
134/// Uses the native journald protocol over a Unix datagram socket at
135/// `/run/systemd/journal/socket`.  All errors are silently ignored so that
136/// a missing or full journal socket never disrupts trace processing.
137///
138/// Each journal entry includes:
139/// - `PRIORITY` — syslog priority (0–7)
140/// - `SYSLOG_IDENTIFIER` — `"hermod-tracer"`
141/// - `HERMOD_NODE` — the node's display name
142/// - `HERMOD_NAMESPACE` — dot-joined trace namespace
143/// - `MESSAGE` — human text if available, otherwise the machine JSON
144#[cfg(unix)]
145fn write_to_journal(trace: &TraceObject, node_name: &str) {
146    use std::os::unix::net::UnixDatagram;
147
148    let priority = severity_to_journal_priority(trace.to_severity);
149    let namespace = trace.to_namespace.join(".");
150    let raw_message = trace.to_human.as_deref().unwrap_or(&trace.to_machine);
151    // Replace newlines so the message stays on one line (simple key=value format)
152    let message = raw_message.replace('\n', " ");
153
154    let payload = format!(
155        "PRIORITY={priority}\nSYSLOG_IDENTIFIER=hermod-tracer\n\
156         HERMOD_NODE={node_name}\nHERMOD_NAMESPACE={namespace}\n\
157         MESSAGE={message}\n"
158    );
159
160    if let Ok(socket) = UnixDatagram::unbound() {
161        let _ = socket.send_to(payload.as_bytes(), "/run/systemd/journal/socket");
162    }
163}
164
165#[cfg(unix)]
166fn severity_to_journal_priority(sev: Severity) -> u8 {
167    match sev {
168        Severity::Debug => 7,
169        Severity::Info => 6,
170        Severity::Notice => 5,
171        Severity::Warning => 4,
172        Severity::Error => 3,
173        Severity::Critical => 2,
174        Severity::Alert => 1,
175        Severity::Emergency => 0,
176    }
177}