pub mod acceptor;
pub mod config;
pub mod datapoint;
pub mod ekg;
pub mod logging;
pub mod node;
pub mod prometheus;
pub mod reforwarder;
pub mod rotation;
pub mod trace_handler;
use crate::forwarder::{ForwarderAddress, ForwarderConfig, TraceForwarder};
use crate::server::acceptor::run_network;
use crate::server::config::TracerConfig;
use crate::server::logging::LogWriter;
use crate::server::node::TracerState;
use crate::server::reforwarder::ReForwarder;
use crate::server::rotation::run_rotation_loop;
use std::sync::Arc;
use tracing::info;
/// Top-level tracer server.
///
/// Holds the shared configuration and the shared tracer state; both are
/// reference-counted so that the tasks started by [`TracerServer::run`] can
/// each keep their own handle.
pub struct TracerServer {
    /// Server configuration, shared with every task that needs to read it.
    config: Arc<TracerConfig>,
    /// Tracer state built from the configuration in [`TracerServer::new`].
    state: Arc<TracerState>,
}
impl TracerServer {
pub fn new(config: TracerConfig) -> Self {
let config = Arc::new(config);
let state = Arc::new(TracerState::new(config.clone()));
TracerServer { config, state }
}
pub async fn run(self) -> anyhow::Result<()> {
info!("Starting hermod-tracer server");
let config = self.config.clone();
let state = self.state.clone();
let writer = Arc::new(LogWriter::new());
let reforwarder: Option<Arc<ReForwarder>> = if let Some(rf_cfg) = &config.has_forwarding {
match &rf_cfg.network {
crate::server::config::Network::AcceptAt(addr) => {
let fwd_address = match addr {
crate::server::config::Address::LocalPipe(p) => {
ForwarderAddress::Unix(p.clone())
}
crate::server::config::Address::RemoteSocket(host, port) => {
ForwarderAddress::Tcp(host.clone(), *port)
}
};
let fwd_config = ForwarderConfig {
address: fwd_address,
queue_size: rf_cfg.forwarder_opts.queue_size,
network_magic: config.network_magic as u64,
..Default::default()
};
let forwarder = TraceForwarder::new(fwd_config);
let handle = forwarder.handle();
tokio::spawn(async move {
let _ = forwarder.run().await;
});
Some(Arc::new(ReForwarder::new(
handle,
rf_cfg.namespace_filters.clone(),
)))
}
crate::server::config::Network::ConnectTo(addrs) => {
let capacity = rf_cfg.forwarder_opts.queue_size.max(1);
let (tx, _) = tokio::sync::broadcast::channel(capacity);
let rf = Arc::new(ReForwarder::new_inbound(
tx.clone(),
rf_cfg.namespace_filters.clone(),
));
let addrs = addrs.clone();
let network_magic = config.network_magic as u64;
tokio::spawn(async move {
crate::server::reforwarder::run_accepting_loop(&addrs, tx, network_magic)
.await;
});
Some(rf)
}
}
} else {
None
};
let mut tasks = tokio::task::JoinSet::new();
{
let state = state.clone();
let writer = writer.clone();
let rf = reforwarder.clone();
let network = config.network.clone();
tasks.spawn(async move {
if let Err(e) = run_network(&network, state, writer, rf).await {
tracing::error!("Network loop error: {}", e);
}
});
}
if let Some(ep) = config.has_prometheus.clone() {
let state = state.clone();
let labels = config.prometheus_labels.clone();
let no_suffix = config.metrics_no_suffix.unwrap_or(false);
tasks.spawn(async move {
if let Err(e) =
prometheus::run_prometheus_server(ep, state, labels, no_suffix).await
{
tracing::error!("Prometheus server error: {}", e);
}
});
}
if let Some(rot) = config.rotation.clone() {
let writer = writer.clone();
let state = state.clone();
let logging = config.logging.clone();
tasks.spawn(async move {
run_rotation_loop(writer, state, rot, logging).await;
});
}
tasks.join_next().await;
Ok(())
}
}