hermod/server/
trace_handler.rs1use crate::protocol::TraceObject;
17use crate::protocol::types::Severity;
18use crate::server::config::LogMode;
19use crate::server::logging::LogWriter;
20use crate::server::node::NodeState;
21use crate::server::reforwarder::ReForwarder;
22use prometheus::GaugeVec;
23use std::collections::HashMap;
24use std::sync::Arc;
25use tracing::warn;
26
27pub async fn handle_traces(
29 traces: Vec<TraceObject>,
30 node: &NodeState,
31 writer: &Arc<LogWriter>,
32 logging_params: &[crate::server::config::LoggingParams],
33 reforwarder: Option<&ReForwarder>,
34) {
35 for params in logging_params {
37 match params.log_mode {
38 LogMode::FileMode => {
39 if let Err(e) = writer.write_traces(&node.name, params, &traces) {
40 warn!("Log write error for node {}: {}", node.name, e);
41 }
42 }
43 LogMode::JournalMode => {
44 #[cfg(unix)]
45 for trace in &traces {
46 write_to_journal(trace, &node.name);
47 }
48 #[cfg(not(unix))]
49 {
50 static WARNED: std::sync::OnceLock<()> = std::sync::OnceLock::new();
51 WARNED.get_or_init(|| {
52 warn!("JournalMode is not supported on this platform; log entries are discarded");
53 });
54 }
55 }
56 }
57 }
58
59 if let Some(rf) = reforwarder {
61 rf.forward(&traces).await;
62 }
63
64 push_trace_metrics(&traces, node);
66}
67
68fn push_trace_metrics(traces: &[TraceObject], node: &NodeState) {
74 let mut cache = node.trace_gauge_cache.lock().unwrap();
75 for trace in traces {
76 let prefix = trace.to_namespace.join("_");
77 if prefix.is_empty() {
78 continue;
79 }
80 let map = match serde_json::from_str::<serde_json::Value>(&trace.to_machine) {
81 Ok(serde_json::Value::Object(m)) => m,
82 _ => continue,
83 };
84 for (field, value) in &map {
85 let f_val = match value {
86 serde_json::Value::Number(n) => match n.as_f64() {
87 Some(f) => f,
88 None => continue,
89 },
90 _ => continue,
91 };
92 let metric_name = sanitise_metric_name(&format!("{}_{}", prefix, field));
93 let gauge = get_or_create_gauge(&node.registry, &mut cache, &metric_name);
94 if let Some(g) = gauge {
95 g.with_label_values(&[]).set(f_val);
96 }
97 }
98 }
99}
100
101fn get_or_create_gauge(
102 registry: &prometheus::Registry,
103 cache: &mut HashMap<String, GaugeVec>,
104 name: &str,
105) -> Option<GaugeVec> {
106 if let Some(g) = cache.get(name) {
107 return Some(g.clone());
108 }
109 let opts = prometheus::Opts::new(name.to_string(), name.to_string());
110 let g = GaugeVec::new(opts, &[]).ok()?;
111 registry.register(Box::new(g.clone())).ok()?;
112 cache.insert(name.to_string(), g.clone());
113 Some(g)
114}
115
116fn sanitise_metric_name(name: &str) -> String {
117 name.chars()
118 .map(|c| {
119 if c.is_alphanumeric() || c == '_' {
120 c
121 } else {
122 '_'
123 }
124 })
125 .collect()
126}
127
128#[cfg(unix)]
145fn write_to_journal(trace: &TraceObject, node_name: &str) {
146 use std::os::unix::net::UnixDatagram;
147
148 let priority = severity_to_journal_priority(trace.to_severity);
149 let namespace = trace.to_namespace.join(".");
150 let raw_message = trace.to_human.as_deref().unwrap_or(&trace.to_machine);
151 let message = raw_message.replace('\n', " ");
153
154 let payload = format!(
155 "PRIORITY={priority}\nSYSLOG_IDENTIFIER=hermod-tracer\n\
156 HERMOD_NODE={node_name}\nHERMOD_NAMESPACE={namespace}\n\
157 MESSAGE={message}\n"
158 );
159
160 if let Ok(socket) = UnixDatagram::unbound() {
161 let _ = socket.send_to(payload.as_bytes(), "/run/systemd/journal/socket");
162 }
163}
164
165#[cfg(unix)]
166fn severity_to_journal_priority(sev: Severity) -> u8 {
167 match sev {
168 Severity::Debug => 7,
169 Severity::Info => 6,
170 Severity::Notice => 5,
171 Severity::Warning => 4,
172 Severity::Error => 3,
173 Severity::Critical => 2,
174 Severity::Alert => 1,
175 Severity::Emergency => 0,
176 }
177}