use std::sync::Arc;
use tokio::sync::{Mutex, mpsc};
use tokio::time::{Duration, interval};
use crate::config::Config;
use crate::correlate::Trace;
use crate::correlate::window::{TraceWindow, WindowConfig};
use crate::detect;
use crate::detect::DetectConfig;
use crate::event::SpanEvent;
use crate::normalize;
use crate::report::GreenSummary;
use crate::report::metrics::MetricsState;
use crate::score;
/// Errors that can make [`run`] fail while it is setting up its listeners.
#[derive(Debug, thiserror::Error)]
pub enum DaemonError {
/// The configured listen address combined with a port did not parse as a
/// `std::net::SocketAddr`.
#[error("invalid listen address: {0}")]
InvalidAddr(#[from] std::net::AddrParseError),
/// Binding the TCP listener used for the HTTP server failed.
#[error("failed to bind HTTP listener: {0}")]
HttpBind(std::io::Error),
/// Binding the TCP listener used for the gRPC server failed.
#[error("failed to bind gRPC listener: {0}")]
GrpcBind(std::io::Error),
}
/// Runs the daemon until Ctrl-C is received.
///
/// Startup binds the OTLP HTTP and gRPC listeners and spawns one task per
/// server; on Unix a third task serves the JSON socket. All ingestion paths
/// push batches of [`SpanEvent`]s into a bounded channel that this function
/// drains in its main loop:
///
/// * incoming batches are sampled, normalized and pushed into the trace
///   window; traces the window hands back from `push` are analyzed right away;
/// * a periodic tick evicts expired traces and analyzes them;
/// * Ctrl-C drains every remaining trace, analyzes it and ends the loop.
///
/// # Errors
///
/// * [`DaemonError::InvalidAddr`] if the listen address and a port do not form
///   a valid socket address.
/// * [`DaemonError::HttpBind`] / [`DaemonError::GrpcBind`] if the respective
///   listener cannot be bound.
// Setup and the event loop share a lot of local state, so they stay in one function.
#[allow(clippy::too_many_lines)]
pub async fn run(config: Config) -> Result<(), DaemonError> {
    let (tx, mut rx) = mpsc::channel::<Vec<SpanEvent>>(1024);
    let window = Arc::new(Mutex::new(TraceWindow::new(WindowConfig {
        max_events_per_trace: config.max_events_per_trace,
        trace_ttl_ms: config.trace_ttl_ms,
        max_active_traces: config.max_active_traces,
    })));
    let max_payload = config.max_payload_size;
    let metrics = Arc::new(MetricsState::new());

    // Parse and bind both listeners up front so configuration errors surface
    // as a `DaemonError` instead of inside a detached task.
    let grpc_addr: std::net::SocketAddr =
        format!("{}:{}", config.listen_addr, config.listen_port_grpc).parse()?;
    let http_addr: std::net::SocketAddr =
        format!("{}:{}", config.listen_addr, config.listen_port).parse()?;
    let http_listener = tokio::net::TcpListener::bind(http_addr)
        .await
        .map_err(DaemonError::HttpBind)?;
    let grpc_incoming =
        tonic::transport::server::TcpIncoming::bind(grpc_addr).map_err(DaemonError::GrpcBind)?;

    // OTLP over gRPC.
    let grpc_service = crate::ingest::otlp::OtlpGrpcService::new(tx.clone());
    tokio::spawn(async move {
        use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer;
        tracing::info!("OTLP gRPC listening on {grpc_addr}");
        if let Err(e) = tonic::transport::Server::builder()
            .timeout(Duration::from_secs(60))
            .add_service(
                TraceServiceServer::new(grpc_service).max_decoding_message_size(max_payload),
            )
            .serve_with_incoming(grpc_incoming)
            .await
        {
            tracing::error!("gRPC server error: {e}");
        }
    });

    // OTLP over HTTP plus the metrics route, behind a 60 s request timeout.
    let otlp_router = crate::ingest::otlp::otlp_http_router(tx.clone(), max_payload);
    let metrics_router = crate::report::metrics::metrics_route(metrics.clone());
    let http_router = otlp_router.merge(metrics_router).layer(
        tower::ServiceBuilder::new()
            .layer(axum::error_handling::HandleErrorLayer::new(|_| async {
                tracing::debug!("HTTP request timed out");
                axum::http::StatusCode::REQUEST_TIMEOUT
            }))
            .layer(tower::timeout::TimeoutLayer::new(Duration::from_secs(60))),
    );
    tokio::spawn(async move {
        tracing::info!("OTLP HTTP listening on {http_addr}");
        if let Err(e) = axum::serve(http_listener, http_router).await {
            tracing::error!("HTTP server error: {e}");
        }
    });

    // Newline-delimited JSON over a Unix domain socket (Unix only).
    #[cfg(unix)]
    {
        let socket_path = config.json_socket.clone();
        let socket_tx = tx.clone();
        let max_payload = config.max_payload_size;
        tokio::spawn(async move {
            run_json_socket(&socket_path, socket_tx, max_payload).await;
        });
    }
    #[cfg(not(unix))]
    {
        tracing::warn!("JSON socket ingestion not available on this platform; use OTLP HTTP/gRPC");
    }

    let detect_config = DetectConfig::from(&config);
    let green_region = config.green_region.clone();
    let green_enabled = config.green_enabled;
    let sampling_rate = config.sampling_rate;

    // Sweep for expired traces twice per TTL, but never more often than every 100 ms.
    let evict_ms = config.trace_ttl_ms / 2;
    let mut ticker = interval(Duration::from_millis(evict_ms.max(100)));

    // Create the Ctrl-C future once and keep polling the same instance.
    // Building a fresh `ctrl_c()` future on every loop iteration leaves no
    // listener alive while a branch body runs, so a signal delivered in that
    // gap could go unnoticed.
    let shutdown = tokio::signal::ctrl_c();
    tokio::pin!(shutdown);

    loop {
        tokio::select! {
            Some(events) = rx.recv() => {
                let events = apply_sampling(events, sampling_rate);
                let event_count = events.len();
                let normalized: Vec<_> = events
                    .into_iter()
                    .map(normalize::normalize)
                    .collect();
                let now_ms = current_time_ms();
                let mut lru_evicted = Vec::new();
                {
                    // Hold the window lock only while inserting; analysis runs after release.
                    let mut w = window.lock().await;
                    for event in normalized {
                        if let Some(evicted) = w.push(event, now_ms) {
                            lru_evicted.push(evicted);
                        }
                    }
                    metrics.active_traces.set(w.active_traces() as f64);
                }
                metrics.events_processed_total.inc_by(event_count as f64);
                if !lru_evicted.is_empty() {
                    process_traces(
                        lru_evicted,
                        &detect_config,
                        green_enabled,
                        green_region.as_deref(),
                        &metrics,
                    );
                }
            }
            _ = ticker.tick() => {
                let now_ms = current_time_ms();
                let expired = {
                    let mut w = window.lock().await;
                    let expired = w.evict_expired(now_ms);
                    metrics.active_traces.set(w.active_traces() as f64);
                    expired
                };
                process_traces(
                    expired,
                    &detect_config,
                    green_enabled,
                    green_region.as_deref(),
                    &metrics,
                );
            }
            _ = &mut shutdown => {
                tracing::info!("Shutting down daemon, processing remaining traces...");
                let remaining = {
                    let mut w = window.lock().await;
                    w.drain_all()
                };
                process_traces(
                    remaining,
                    &detect_config,
                    green_enabled,
                    green_region.as_deref(),
                    &metrics,
                );
                break;
            }
        }
    }
    Ok(())
}
/// Analyzes a batch of completed traces and publishes the results.
///
/// Each `(trace_id, spans)` pair becomes a [`Trace`]. The batch is run through
/// detection, then through green scoring when `green_enabled` is set;
/// otherwise a disabled [`GreenSummary`] carrying only the span count is used.
/// Counters and the waste-ratio gauge in `metrics` are updated, exemplars are
/// recorded, and every finding is written to stdout as one JSON document per
/// line. An empty batch is a no-op.
fn process_traces(
    traces: Vec<(String, Vec<normalize::NormalizedEvent>)>,
    detect_config: &DetectConfig,
    green_enabled: bool,
    green_region: Option<&str>,
    metrics: &MetricsState,
) {
    let trace_count = traces.len();
    if trace_count == 0 {
        return;
    }

    let mut batch: Vec<Trace> = Vec::with_capacity(trace_count);
    for (trace_id, spans) in traces {
        batch.push(Trace { trace_id, spans });
    }

    let detected = detect::detect(&batch, detect_config);
    let (findings, green_summary) = if green_enabled {
        score::score_green(&batch, detected, green_region)
    } else {
        let total_io_ops = batch.iter().map(|t| t.spans.len()).sum();
        (detected, GreenSummary::disabled(total_io_ops))
    };

    metrics.traces_analyzed_total.inc_by(trace_count as f64);
    metrics
        .total_io_ops
        .inc_by(green_summary.total_io_ops as f64);
    metrics
        .avoidable_io_ops
        .inc_by(green_summary.avoidable_io_ops as f64);

    // The ratio is derived from the cumulative counters, not from this batch alone.
    let total_so_far = metrics.total_io_ops.get();
    if total_so_far > 0.0 {
        let avoidable_so_far = metrics.avoidable_io_ops.get();
        metrics.io_waste_ratio.set(avoidable_so_far / total_so_far);
    }
    metrics.record_exemplars(&findings, &green_summary);

    {
        use std::io::Write;
        // Lock stdout once for the whole batch rather than once per finding.
        let mut out = std::io::stdout().lock();
        for finding in &findings {
            let labels = [finding.finding_type.as_str(), finding.severity.as_str()];
            metrics.findings_total.with_label_values(&labels).inc();
            if serde_json::to_writer(&mut out, finding).is_ok() {
                let _ = writeln!(out);
            }
        }
    }
}
/// Drops events whose trace is not selected by the sampling `rate`.
///
/// A `rate` of `1.0` or more returns `events` untouched. Otherwise the
/// keep/drop decision is taken once per distinct trace id via
/// [`should_sample`] and reused for every further event of that trace in the
/// batch. Kept events retain their original order.
fn apply_sampling(events: Vec<SpanEvent>, rate: f64) -> Vec<SpanEvent> {
    if rate >= 1.0 {
        return events;
    }

    let mut decisions: std::collections::HashMap<String, bool> = std::collections::HashMap::new();
    let mut kept = Vec::new();
    for event in events {
        let keep = match decisions.get(event.trace_id.as_str()) {
            Some(&cached) => cached,
            None => {
                let fresh = should_sample(&event.trace_id, rate);
                decisions.insert(event.trace_id.clone(), fresh);
                fresh
            }
        };
        if keep {
            kept.push(event);
        }
    }
    kept
}
/// Decides deterministically whether the trace `trace_id` is sampled.
///
/// Rates of `1.0` and above always keep; rates of `0.0` and below always
/// drop. In between, the bytes of `trace_id` are hashed with 64-bit FNV-1a,
/// the hash is scaled into `[0, 1]`, and the trace is kept when that value is
/// strictly below `rate`. The same id therefore always gets the same answer
/// for a given rate.
fn should_sample(trace_id: &str, rate: f64) -> bool {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0100_0000_01b3;

    if rate >= 1.0 {
        return true;
    }
    if rate <= 0.0 {
        return false;
    }

    let digest = trace_id.bytes().fold(FNV_OFFSET_BASIS, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    });
    let position = digest as f64 / u64::MAX as f64;
    position < rate
}
/// Returns the wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn current_time_ms() -> u64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
/// Serves newline-delimited JSON ingestion on the Unix domain socket at `path`.
///
/// Any existing file at `path` is removed, the socket is bound and its mode is
/// restricted to `0o600`. If binding or the permission change fails, the error
/// is logged and the function returns without listening. Each accepted
/// connection is handled on its own task by [`handle_json_connection`], with
/// at most 128 connections in flight at once.
///
/// * `path` - filesystem path of the socket.
/// * `tx` - channel that parsed event batches are forwarded to.
/// * `max_payload_size` - per-line size limit passed on to the connection handler.
#[cfg(unix)]
async fn run_json_socket(path: &str, tx: mpsc::Sender<Vec<SpanEvent>>, max_payload_size: usize) {
    use tokio::net::UnixListener;

    /// Pause after a failed `accept` so a persistent error (for example file
    /// descriptor exhaustion) does not become a busy loop that floods the log.
    const ACCEPT_ERROR_BACKOFF: std::time::Duration = std::time::Duration::from_millis(100);

    // Best effort: clear a stale socket left behind by a previous run.
    let _ = std::fs::remove_file(path);
    let listener = match UnixListener::bind(path) {
        Ok(l) => l,
        Err(e) => {
            tracing::error!("Failed to bind Unix socket {path}: {e}");
            return;
        }
    };
    {
        use std::os::unix::fs::PermissionsExt;
        if let Err(e) = std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600)) {
            tracing::error!(
                "Failed to set socket permissions on {path}: {e} — refusing to listen on insecure socket"
            );
            let _ = std::fs::remove_file(path);
            return;
        }
    }
    tracing::info!("JSON socket listening on {path}");

    // Caps the number of connections being handled concurrently.
    let semaphore = Arc::new(tokio::sync::Semaphore::new(128));
    loop {
        match listener.accept().await {
            Ok((stream, _)) => {
                let tx = tx.clone();
                // `acquire_owned` only fails once the semaphore is closed.
                let Ok(permit) = semaphore.clone().acquire_owned().await else {
                    break;
                };
                tokio::spawn(async move {
                    handle_json_connection(stream, tx, max_payload_size).await;
                    drop(permit);
                });
            }
            Err(e) => {
                tracing::error!("Unix socket accept error: {e}");
                tokio::time::sleep(ACCEPT_ERROR_BACKOFF).await;
            }
        }
    }
}
#[cfg(unix)]
async fn handle_json_connection(
stream: tokio::net::UnixStream,
tx: mpsc::Sender<Vec<SpanEvent>>,
max_payload_size: usize,
) {
use tokio::io::{AsyncBufReadExt, AsyncReadExt};
const CONNECTION_LIMIT_FACTOR: u64 = 16;
let limited = stream.take(max_payload_size as u64 * CONNECTION_LIMIT_FACTOR);
let reader = tokio::io::BufReader::new(limited);
let mut lines = reader.lines();
let ingest = crate::ingest::json::JsonIngest::new(max_payload_size);
while let Ok(Some(line)) = lines.next_line().await {
if line.len() > max_payload_size {
tracing::warn!("JSON socket: line exceeds max payload size, skipping");
continue;
}
match crate::ingest::IngestSource::ingest(&ingest, line.as_bytes()) {
Ok(events) if !events.is_empty() => {
if tx.send(events).await.is_err() {
tracing::warn!("JSON socket: event channel closed");
break;
}
}
Ok(_) => {}
Err(e) => {
tracing::debug!("JSON socket: failed to parse line: {e}");
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::correlate::window::WindowConfig;
    use crate::event::{EventSource, EventType, SpanEvent};
    use crate::normalize;

    /// Builds a normalized SQL `SELECT` span for `trace_id` whose statement is `target`.
    fn make_normalized(trace_id: &str, target: &str) -> normalize::NormalizedEvent {
        let source = EventSource {
            endpoint: "GET /test".to_string(),
            method: "Test::test".to_string(),
        };
        let raw = SpanEvent {
            timestamp: "2025-07-10T14:32:01.123Z".to_string(),
            trace_id: trace_id.to_string(),
            span_id: "s1".to_string(),
            parent_span_id: None,
            service: "test".to_string(),
            event_type: EventType::Sql,
            operation: "SELECT".to_string(),
            target: target.to_string(),
            duration_us: 100,
            source,
            status_code: None,
        };
        normalize::normalize(raw)
    }

    /// Detection thresholds shared by the `process_traces` tests.
    fn default_detect_config() -> DetectConfig {
        DetectConfig {
            n_plus_one_threshold: 5,
            window_ms: 500,
            slow_threshold_ms: 500,
            slow_min_occurrences: 3,
            max_fanout: 20,
        }
    }

    /// Six same-shaped `order_item` lookups on trace `t1`, differing only in the id.
    fn order_item_selects() -> Vec<normalize::NormalizedEvent> {
        let mut events = Vec::new();
        for i in 1..=6 {
            let sql = format!("SELECT * FROM order_item WHERE order_id = {i}");
            events.push(make_normalized("t1", &sql));
        }
        events
    }

    #[test]
    fn process_traces_empty_does_nothing() {
        let metrics = MetricsState::new();
        process_traces(Vec::new(), &default_detect_config(), true, None, &metrics);
    }

    #[test]
    fn process_traces_with_n_plus_one() {
        let batch = vec![("t1".to_string(), order_item_selects())];
        let metrics = MetricsState::new();
        process_traces(batch, &default_detect_config(), true, None, &metrics);
    }

    #[test]
    fn process_traces_clean_no_finding() {
        let spans = vec![
            make_normalized("t1", "SELECT * FROM users WHERE id = 1"),
            make_normalized("t1", "SELECT * FROM orders WHERE id = 2"),
        ];
        let batch = vec![("t1".to_string(), spans)];
        let metrics = MetricsState::new();
        process_traces(batch, &default_detect_config(), true, None, &metrics);
    }

    #[test]
    fn current_time_ms_returns_nonzero() {
        let ms = current_time_ms();
        assert!(ms > 0, "current_time_ms should return a positive value");
    }

    #[test]
    fn evict_expired_returns_traces() {
        let mut w = TraceWindow::new(WindowConfig {
            trace_ttl_ms: 100,
            ..Default::default()
        });
        w.push(make_normalized("t1", "SELECT 1"), 0);
        assert_eq!(w.active_traces(), 1);

        // Half the TTL has passed: nothing may be evicted yet.
        let too_early = w.evict_expired(50);
        assert!(too_early.is_empty());
        assert_eq!(w.active_traces(), 1);

        // Past the TTL: the single trace comes back with its one span.
        let expired = w.evict_expired(150);
        assert_eq!(expired.len(), 1);
        assert_eq!(expired[0].0, "t1");
        assert_eq!(expired[0].1.len(), 1);
        assert_eq!(w.active_traces(), 0);
    }

    #[test]
    fn process_traces_updates_metrics() {
        let batch = vec![("t1".to_string(), order_item_selects())];
        let metrics = MetricsState::new();
        process_traces(batch, &default_detect_config(), true, None, &metrics);

        let output = metrics.render();
        assert!(output.contains("perf_sentinel_traces_analyzed_total"));
        assert!(output.contains("perf_sentinel_findings_total"));
    }

    #[test]
    fn process_traces_green_disabled() {
        let batch = vec![("t1".to_string(), order_item_selects())];
        let metrics = MetricsState::new();
        process_traces(batch, &default_detect_config(), false, None, &metrics);

        assert!((metrics.avoidable_io_ops.get() - 0.0).abs() < f64::EPSILON);
        assert!(metrics.total_io_ops.get() > 0.0);
    }

    #[test]
    fn should_sample_deterministic() {
        let first = should_sample("trace-abc-123", 0.5);
        let second = should_sample("trace-abc-123", 0.5);
        assert_eq!(first, second);
    }

    #[test]
    fn should_sample_rate_zero_drops_all() {
        for id in ["any-trace", "another-trace"] {
            assert!(!should_sample(id, 0.0));
        }
    }

    #[test]
    fn should_sample_rate_one_keeps_all() {
        for id in ["any-trace", "another-trace"] {
            assert!(should_sample(id, 1.0));
        }
    }

    #[test]
    fn should_sample_rate_half_splits() {
        let mut sampled = 0;
        for i in 0..1000 {
            if should_sample(&format!("trace-{i}"), 0.5) {
                sampled += 1;
            }
        }
        assert!(
            (300..=700).contains(&sampled),
            "expected ~500 sampled, got {sampled}"
        );
    }
}