hermod/server/mod.rs
1//! Full `hermod-tracer` server implementation
2//!
3//! This module provides [`TracerServer`], which accepts trace connections from
4//! Cardano nodes and routes them to file logs, Prometheus metrics, EKG polling,
5//! and optional re-forwarding — feature-for-feature with the Haskell
6//! `cardano-tracer` (excluding RTView and email alerts).
7//!
8//! # Architecture
9//!
10//! ```text
11//! ┌─────────────────────────────────────────────────────────┐
12//! │ TracerServer │
13//! │ │
14//! │ ┌──────────┐ ┌──────────────┐ ┌─────────────────┐ │
15//! │ │ Network │ │ Prometheus │ │ Log rotation │ │
16//! │ │ acceptor │ │ HTTP server │ │ background task │ │
17//! │ └────┬─────┘ └──────────────┘ └─────────────────┘ │
18//! │ │ per-connection │
19//! │ ┌────▼──────────────────────────────────────────────┐ │
20//! │ │ handle_connection (one task per node) │ │
21//! │ │ ├─ trace loop → LogWriter + ReForwarder │ │
22//! │ │ ├─ EKG poller → NodeState::registry │ │
23//! │ │ └─ DataPoint idle (keeps channel alive) │ │
24//! │ └────────────────────────────────────────────────────┘ │
25//! └─────────────────────────────────────────────────────────┘
26//! ```
27//!
28//! # Usage
29//!
30//! Load a [`config::TracerConfig`] from a YAML file and pass it to
31//! [`TracerServer::new`], then `.await` [`TracerServer::run`]:
32//!
33//! ```no_run
34//! use hermod::server::{TracerServer, config::TracerConfig};
35//!
36//! #[tokio::main]
37//! async fn main() -> anyhow::Result<()> {
38//! let config = TracerConfig::from_file("hermod-tracer.yaml".as_ref())?;
39//! TracerServer::new(config).run().await
40//! }
41//! ```
42//!
43//! See `config/hermod-tracer.yaml` in the repository for a fully-annotated
44//! example configuration.
45
46pub mod acceptor;
47pub mod config;
48pub mod datapoint;
49pub mod ekg;
50pub mod logging;
51pub mod node;
52pub mod prometheus;
53pub mod reforwarder;
54pub mod rotation;
55pub mod trace_handler;
56
57use crate::forwarder::{ForwarderAddress, ForwarderConfig, TraceForwarder};
58use crate::server::acceptor::run_network;
59use crate::server::config::TracerConfig;
60use crate::server::logging::LogWriter;
61use crate::server::node::TracerState;
62use crate::server::reforwarder::ReForwarder;
63use crate::server::rotation::run_rotation_loop;
64use std::sync::Arc;
65use tracing::info;
66
67/// The top-level tracer server
68pub struct TracerServer {
69 config: Arc<TracerConfig>,
70 state: Arc<TracerState>,
71}
72
73impl TracerServer {
74 /// Create a new server from the given config
75 pub fn new(config: TracerConfig) -> Self {
76 let config = Arc::new(config);
77 let state = Arc::new(TracerState::new(config.clone()));
78 TracerServer { config, state }
79 }
80
81 /// Run until cancelled
82 pub async fn run(self) -> anyhow::Result<()> {
83 info!("Starting hermod-tracer server");
84
85 let config = self.config.clone();
86 let state = self.state.clone();
87 let writer = Arc::new(LogWriter::new());
88
89 // --- Re-forwarder ---
90 let reforwarder: Option<Arc<ReForwarder>> = if let Some(rf_cfg) = &config.has_forwarding {
91 match &rf_cfg.network {
92 crate::server::config::Network::AcceptAt(addr) => {
93 // hermod-tracer acts as the trace-forward FORWARDER, connecting
94 // out to the downstream acceptor's socket.
95 let fwd_address = match addr {
96 crate::server::config::Address::LocalPipe(p) => {
97 ForwarderAddress::Unix(p.clone())
98 }
99 crate::server::config::Address::RemoteSocket(host, port) => {
100 ForwarderAddress::Tcp(host.clone(), *port)
101 }
102 };
103 let fwd_config = ForwarderConfig {
104 address: fwd_address,
105 queue_size: rf_cfg.forwarder_opts.queue_size,
106 network_magic: config.network_magic as u64,
107 ..Default::default()
108 };
109 let forwarder = TraceForwarder::new(fwd_config);
110 let handle = forwarder.handle();
111 tokio::spawn(async move {
112 let _ = forwarder.run().await;
113 });
114 Some(Arc::new(ReForwarder::new(
115 handle,
116 rf_cfg.namespace_filters.clone(),
117 )))
118 }
119 crate::server::config::Network::ConnectTo(addrs) => {
120 // hermod-tracer listens; downstream acceptors connect to it.
121 // We broadcast forwarded traces to all connected downstreams.
122 let capacity = rf_cfg.forwarder_opts.queue_size.max(1);
123 let (tx, _) = tokio::sync::broadcast::channel(capacity);
124 let rf = Arc::new(ReForwarder::new_inbound(
125 tx.clone(),
126 rf_cfg.namespace_filters.clone(),
127 ));
128 let addrs = addrs.clone();
129 let network_magic = config.network_magic as u64;
130 tokio::spawn(async move {
131 crate::server::reforwarder::run_accepting_loop(&addrs, tx, network_magic)
132 .await;
133 });
134 Some(rf)
135 }
136 }
137 } else {
138 None
139 };
140
141 let mut tasks = tokio::task::JoinSet::new();
142
143 // --- Network (accept/connect loop) ---
144 {
145 let state = state.clone();
146 let writer = writer.clone();
147 let rf = reforwarder.clone();
148 let network = config.network.clone();
149 tasks.spawn(async move {
150 if let Err(e) = run_network(&network, state, writer, rf).await {
151 tracing::error!("Network loop error: {}", e);
152 }
153 });
154 }
155
156 // --- Prometheus HTTP server ---
157 if let Some(ep) = config.has_prometheus.clone() {
158 let state = state.clone();
159 let labels = config.prometheus_labels.clone();
160 let no_suffix = config.metrics_no_suffix.unwrap_or(false);
161 tasks.spawn(async move {
162 if let Err(e) =
163 prometheus::run_prometheus_server(ep, state, labels, no_suffix).await
164 {
165 tracing::error!("Prometheus server error: {}", e);
166 }
167 });
168 }
169
170 // --- Log rotation ---
171 if let Some(rot) = config.rotation.clone() {
172 let writer = writer.clone();
173 let state = state.clone();
174 let logging = config.logging.clone();
175 tasks.spawn(async move {
176 run_rotation_loop(writer, state, rot, logging).await;
177 });
178 }
179
180 // Wait for any task to finish (normally they run forever)
181 tasks.join_next().await;
182 Ok(())
183 }
184}