use crate::protocol::TraceObject;
use crate::protocol::types::Severity;
use crate::server::config::LogMode;
use crate::server::logging::LogWriter;
use crate::server::node::NodeState;
use crate::server::reforwarder::ReForwarder;
use prometheus::GaugeVec;
use std::collections::HashMap;
use std::sync::Arc;
use tracing::warn;
pub async fn handle_traces(
traces: Vec<TraceObject>,
node: &NodeState,
writer: &Arc<LogWriter>,
logging_params: &[crate::server::config::LoggingParams],
reforwarder: Option<&ReForwarder>,
) {
for params in logging_params {
match params.log_mode {
LogMode::FileMode => {
if let Err(e) = writer.write_traces(&node.name, params, &traces) {
warn!("Log write error for node {}: {}", node.name, e);
}
}
LogMode::JournalMode => {
#[cfg(unix)]
for trace in &traces {
write_to_journal(trace, &node.name);
}
#[cfg(not(unix))]
{
static WARNED: std::sync::OnceLock<()> = std::sync::OnceLock::new();
WARNED.get_or_init(|| {
warn!("JournalMode is not supported on this platform; log entries are discarded");
});
}
}
}
}
if let Some(rf) = reforwarder {
rf.forward(&traces).await;
}
push_trace_metrics(&traces, node);
}
/// Publishes the numeric fields of each trace as Prometheus gauges on the
/// node's registry.
///
/// For every trace, `to_machine` is parsed as JSON. When it is a JSON object,
/// each top-level field holding a number is exported as a gauge named
/// `<namespace joined with '_'>_<field>` (passed through
/// `sanitise_metric_name`). The following are skipped silently:
///
/// * traces whose namespace joins to an empty string,
/// * traces whose `to_machine` is not valid JSON or not a JSON object,
/// * fields whose value is not a number representable as `f64`,
/// * gauges that could not be created or registered.
fn push_trace_metrics(traces: &[TraceObject], node: &NodeState) {
    // A poisoned lock only tells us that some thread panicked while holding
    // the guard. Recover the guard instead of panicking again, otherwise a
    // single earlier panic would make every later batch for this node panic.
    let mut cache = node
        .trace_gauge_cache
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);

    for trace in traces {
        let prefix = trace.to_namespace.join("_");
        if prefix.is_empty() {
            continue;
        }

        // Only JSON objects carry named fields that can become metrics.
        let Ok(serde_json::Value::Object(fields)) =
            serde_json::from_str::<serde_json::Value>(&trace.to_machine)
        else {
            continue;
        };

        for (field, value) in &fields {
            // `as_f64` yields `None` for every non-numeric JSON value.
            let Some(f_val) = value.as_f64() else {
                continue;
            };

            let metric_name = sanitise_metric_name(&format!("{}_{}", prefix, field));
            if let Some(g) = get_or_create_gauge(&node.registry, &mut cache, &metric_name) {
                g.with_label_values(&[]).set(f_val);
            }
        }
    }
}
/// Returns the label-less gauge called `name`, creating it on first use.
///
/// A gauge already present in `cache` is returned as a clone. Otherwise a new
/// `GaugeVec` is built with `name` used as both metric name and help text,
/// registered with `registry`, stored in `cache`, and returned.
///
/// Returns `None` when the gauge cannot be constructed or when registration
/// fails; in that case nothing is added to `cache`.
fn get_or_create_gauge(
    registry: &prometheus::Registry,
    cache: &mut HashMap<String, GaugeVec>,
    name: &str,
) -> Option<GaugeVec> {
    if let Some(existing) = cache.get(name).cloned() {
        return Some(existing);
    }

    let created = GaugeVec::new(
        prometheus::Opts::new(name.to_owned(), name.to_owned()),
        &[],
    )
    .ok()?;

    // Register before caching so a rejected gauge is never handed out later.
    registry.register(Box::new(created.clone())).ok()?;
    cache.insert(name.to_owned(), created.clone());
    Some(created)
}
/// Rewrites `name` so that it fits the Prometheus metric-name grammar
/// `[a-zA-Z_:][a-zA-Z0-9_:]*`.
///
/// Every character other than an ASCII letter, ASCII digit or `_` is replaced
/// by `_`. This includes non-ASCII letters and digits, which Prometheus does
/// not accept. When the input starts with an ASCII digit, a `_` is prepended,
/// because a metric name must not begin with a digit. An empty input yields an
/// empty string.
fn sanitise_metric_name(name: &str) -> String {
    // One extra byte for the optional leading underscore.
    let mut sanitised = String::with_capacity(name.len() + 1);
    if name.starts_with(|c: char| c.is_ascii_digit()) {
        sanitised.push('_');
    }
    sanitised.extend(name.chars().map(|c| {
        if c.is_ascii_alphanumeric() || c == '_' {
            c
        } else {
            '_'
        }
    }));
    sanitised
}
/// Sends one trace to the systemd journal as a best-effort datagram.
///
/// The entry is encoded as newline-delimited `KEY=VALUE` lines and sent to
/// `/run/systemd/journal/socket`. It carries the journal priority derived from
/// the trace severity, a fixed `SYSLOG_IDENTIFIER`, the originating node, the
/// dot-joined namespace and the message (`to_human` when present, otherwise
/// `to_machine`).
///
/// Because a newline terminates a field in this encoding, newlines inside any
/// interpolated value are replaced with spaces. This keeps a value from
/// truncating the entry or smuggling in extra journal fields.
///
/// Failures to create the socket or to send the datagram are ignored.
#[cfg(unix)]
fn write_to_journal(trace: &TraceObject, node_name: &str) {
    use std::os::unix::net::UnixDatagram;

    let priority = severity_to_journal_priority(trace.to_severity);
    // Node name and namespace get the same newline treatment as the message:
    // each of them ends up on its own `KEY=VALUE` line below.
    let node_name = node_name.replace('\n', " ");
    let namespace = trace.to_namespace.join(".").replace('\n', " ");
    let raw_message = trace.to_human.as_deref().unwrap_or(&trace.to_machine);
    let message = raw_message.replace('\n', " ");

    let payload = format!(
        "PRIORITY={priority}\nSYSLOG_IDENTIFIER=hermod-tracer\n\
         HERMOD_NODE={node_name}\nHERMOD_NAMESPACE={namespace}\n\
         MESSAGE={message}\n"
    );

    // Journal output is best-effort: a missing or unreachable journal socket
    // must not disturb trace handling.
    if let Ok(socket) = UnixDatagram::unbound() {
        let _ = socket.send_to(payload.as_bytes(), "/run/systemd/journal/socket");
    }
}
/// Maps a trace severity to the numeric value written to the journal
/// `PRIORITY` field.
///
/// The result ranges from `0` for `Emergency` up to `7` for `Debug`; a lower
/// number denotes a more severe entry.
#[cfg(unix)]
fn severity_to_journal_priority(sev: Severity) -> u8 {
    // Listed from most to least severe; the match stays exhaustive so adding a
    // `Severity` variant forces this table to be revisited.
    let level: u8 = match sev {
        Severity::Emergency => 0,
        Severity::Alert => 1,
        Severity::Critical => 2,
        Severity::Error => 3,
        Severity::Warning => 4,
        Severity::Notice => 5,
        Severity::Info => 6,
        Severity::Debug => 7,
    };
    level
}