Skip to main content

hermod/server/
mod.rs

1//! Full `hermod-tracer` server implementation
2//!
3//! This module provides [`TracerServer`], which accepts trace connections from
4//! Cardano nodes and routes them to file logs, Prometheus metrics, EKG polling,
5//! and optional re-forwarding — feature-for-feature with the Haskell
6//! `cardano-tracer` (excluding RTView and email alerts).
7//!
8//! # Architecture
9//!
10//! ```text
11//! ┌─────────────────────────────────────────────────────────┐
12//! │                     TracerServer                         │
13//! │                                                          │
14//! │  ┌──────────┐   ┌──────────────┐   ┌─────────────────┐ │
15//! │  │ Network  │   │  Prometheus  │   │ Log rotation    │ │
16//! │  │ acceptor │   │  HTTP server │   │ background task │ │
17//! │  └────┬─────┘   └──────────────┘   └─────────────────┘ │
18//! │       │ per-connection                                   │
19//! │  ┌────▼──────────────────────────────────────────────┐  │
20//! │  │  handle_connection (one task per node)             │  │
21//! │  │   ├─ trace loop  →  LogWriter + ReForwarder        │  │
22//! │  │   ├─ EKG poller  →  NodeState::registry            │  │
23//! │  │   └─ DataPoint idle (keeps channel alive)          │  │
24//! │  └────────────────────────────────────────────────────┘  │
25//! └─────────────────────────────────────────────────────────┘
26//! ```
27//!
28//! # Usage
29//!
30//! Load a [`config::TracerConfig`] from a YAML file and pass it to
31//! [`TracerServer::new`], then `.await` [`TracerServer::run`]:
32//!
33//! ```no_run
34//! use hermod::server::{TracerServer, config::TracerConfig};
35//!
36//! #[tokio::main]
37//! async fn main() -> anyhow::Result<()> {
38//!     let config = TracerConfig::from_file("hermod-tracer.yaml".as_ref())?;
39//!     TracerServer::new(config).run().await
40//! }
41//! ```
42//!
43//! See `config/hermod-tracer.yaml` in the repository for a fully-annotated
44//! example configuration.
45
46pub mod acceptor;
47pub mod config;
48pub mod datapoint;
49pub mod ekg;
50pub mod logging;
51pub mod node;
52pub mod prometheus;
53pub mod reforwarder;
54pub mod rotation;
55pub mod trace_handler;
56
57use crate::forwarder::{ForwarderAddress, ForwarderConfig, TraceForwarder};
58use crate::server::acceptor::run_network;
59use crate::server::config::TracerConfig;
60use crate::server::logging::LogWriter;
61use crate::server::node::TracerState;
62use crate::server::reforwarder::ReForwarder;
63use crate::server::rotation::run_rotation_loop;
64use std::sync::Arc;
65use tracing::info;
66
/// The top-level tracer server.
///
/// Built with [`TracerServer::new`] and driven by [`TracerServer::run`].
/// Both fields are reference-counted so `run` can hand clones to the
/// background tasks it spawns.
pub struct TracerServer {
    /// Server configuration; the same `Arc` is also passed to
    /// `TracerState::new` when the server is constructed.
    config: Arc<TracerConfig>,
    /// Tracer state created from `config`; cloned into the network,
    /// Prometheus and log-rotation tasks started by `run`.
    state: Arc<TracerState>,
}
72
73impl TracerServer {
74    /// Create a new server from the given config
75    pub fn new(config: TracerConfig) -> Self {
76        let config = Arc::new(config);
77        let state = Arc::new(TracerState::new(config.clone()));
78        TracerServer { config, state }
79    }
80
81    /// Run until cancelled
82    pub async fn run(self) -> anyhow::Result<()> {
83        info!("Starting hermod-tracer server");
84
85        let config = self.config.clone();
86        let state = self.state.clone();
87        let writer = Arc::new(LogWriter::new());
88
89        // --- Re-forwarder ---
90        let reforwarder: Option<Arc<ReForwarder>> = if let Some(rf_cfg) = &config.has_forwarding {
91            match &rf_cfg.network {
92                crate::server::config::Network::AcceptAt(addr) => {
93                    // hermod-tracer acts as the trace-forward FORWARDER, connecting
94                    // out to the downstream acceptor's socket.
95                    let fwd_address = match addr {
96                        crate::server::config::Address::LocalPipe(p) => {
97                            ForwarderAddress::Unix(p.clone())
98                        }
99                        crate::server::config::Address::RemoteSocket(host, port) => {
100                            ForwarderAddress::Tcp(host.clone(), *port)
101                        }
102                    };
103                    let fwd_config = ForwarderConfig {
104                        address: fwd_address,
105                        queue_size: rf_cfg.forwarder_opts.queue_size,
106                        network_magic: config.network_magic as u64,
107                        ..Default::default()
108                    };
109                    let forwarder = TraceForwarder::new(fwd_config);
110                    let handle = forwarder.handle();
111                    tokio::spawn(async move {
112                        let _ = forwarder.run().await;
113                    });
114                    Some(Arc::new(ReForwarder::new(
115                        handle,
116                        rf_cfg.namespace_filters.clone(),
117                    )))
118                }
119                crate::server::config::Network::ConnectTo(addrs) => {
120                    // hermod-tracer listens; downstream acceptors connect to it.
121                    // We broadcast forwarded traces to all connected downstreams.
122                    let capacity = rf_cfg.forwarder_opts.queue_size.max(1);
123                    let (tx, _) = tokio::sync::broadcast::channel(capacity);
124                    let rf = Arc::new(ReForwarder::new_inbound(
125                        tx.clone(),
126                        rf_cfg.namespace_filters.clone(),
127                    ));
128                    let addrs = addrs.clone();
129                    let network_magic = config.network_magic as u64;
130                    tokio::spawn(async move {
131                        crate::server::reforwarder::run_accepting_loop(&addrs, tx, network_magic)
132                            .await;
133                    });
134                    Some(rf)
135                }
136            }
137        } else {
138            None
139        };
140
141        let mut tasks = tokio::task::JoinSet::new();
142
143        // --- Network (accept/connect loop) ---
144        {
145            let state = state.clone();
146            let writer = writer.clone();
147            let rf = reforwarder.clone();
148            let network = config.network.clone();
149            tasks.spawn(async move {
150                if let Err(e) = run_network(&network, state, writer, rf).await {
151                    tracing::error!("Network loop error: {}", e);
152                }
153            });
154        }
155
156        // --- Prometheus HTTP server ---
157        if let Some(ep) = config.has_prometheus.clone() {
158            let state = state.clone();
159            let labels = config.prometheus_labels.clone();
160            let no_suffix = config.metrics_no_suffix.unwrap_or(false);
161            tasks.spawn(async move {
162                if let Err(e) =
163                    prometheus::run_prometheus_server(ep, state, labels, no_suffix).await
164                {
165                    tracing::error!("Prometheus server error: {}", e);
166                }
167            });
168        }
169
170        // --- Log rotation ---
171        if let Some(rot) = config.rotation.clone() {
172            let writer = writer.clone();
173            let state = state.clone();
174            let logging = config.logging.clone();
175            tasks.spawn(async move {
176                run_rotation_loop(writer, state, rot, logging).await;
177            });
178        }
179
180        // Wait for any task to finish (normally they run forever)
181        tasks.join_next().await;
182        Ok(())
183    }
184}