use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::time::Instant;
use arc_swap::ArcSwap;
use axum::extract::State;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use axum::routing::{delete, get, post};
use axum::{Json, Router};
use rsigma_eval::{CorrelationConfig, MatchDetailLevel, Pipeline, ProcessResult};
use rsigma_runtime::{
AckToken, EnrichmentPipeline, FieldObserver, FileSink, InputFormat, LogProcessor, MetricsHook,
RawEvent, RuntimeEngine, Sink, StdinSource, StdoutSink, spawn_source,
};
use serde::Serialize;
use tokio::sync::mpsc;
use tower_http::trace::TraceLayer;
#[cfg(feature = "daemon-otlp")]
use tracing::Instrument;
/// A dead-letter record, serialized to JSON and written to the DLQ sink.
///
/// Entries are produced both for input lines that fail to parse and for
/// results that could not be delivered to the output sink.
#[derive(Serialize)]
struct DlqEntry {
/// The raw input payload (parse failures) or the JSON-serialized result
/// (sink delivery failures).
original_event: String,
/// Human-readable reason the entry was dead-lettered.
error: String,
/// RFC 3339 UTC timestamp of when the entry was created.
timestamp: String,
}
use super::health::HealthState;
use super::metrics::Metrics;
use super::reload;
use super::store::{SourcePosition, SqliteStateStore};
use crate::EventFilter;
/// How the daemon treats correlation state found in the state database at
/// startup.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StateRestoreMode {
/// Decide automatically: with the `daemon-nats` feature the decision depends
/// on the replay policy and stored source position; otherwise state is
/// always restored.
Auto,
/// Never restore stored state (`--clear-state`).
ForceClear,
/// Always restore stored state (`--keep-state`).
ForceKeep,
}
/// Shared state handed to every HTTP API handler.
#[derive(Clone)]
struct AppState {
/// The rule-evaluation processor shared with the engine task.
processor: Arc<LogProcessor>,
/// Daemon-wide metrics registry.
metrics: Arc<Metrics>,
/// Readiness flag reported by `/readyz`.
health: HealthState,
/// Sender that wakes the reload task.
reload_tx: mpsc::Sender<()>,
/// Daemon start time, used to compute the uptime gauge.
start_time: Instant,
/// Event queue sender; only `Some` when the input is `http`.
event_tx: Option<mpsc::Sender<RawEvent>>,
/// Refresh-trigger sender for dynamic sources; `Some` only when a refresh
/// scheduler was started.
sources_trigger_tx: Option<mpsc::Sender<rsigma_runtime::sources::refresh::RefreshTrigger>>,
/// Instrumented source resolver; `Some` only when dynamic sources exist.
source_resolver: Option<Arc<super::instrumented_resolver::InstrumentedResolver>>,
/// Event queue sender used by the OTLP/HTTP logs endpoint.
#[cfg(feature = "daemon-otlp")]
otlp_event_tx: mpsc::Sender<RawEvent>,
/// Registry of configured dynamic sources.
source_registry: Arc<rsigma_runtime::sources::registry::DaemonSourceRegistry>,
/// Field observer; `Some` only when field observation is enabled.
field_observer: Option<Arc<FieldObserver>>,
}
/// Complete configuration for [`run_daemon`].
#[derive(Clone)]
pub struct DaemonConfig {
/// Rules file or directory. It is passed to the engine and watched for changes:
/// the directory itself, or the file's parent directory.
pub rules_path: PathBuf,
/// Processing pipelines handed to the engine; any dynamic pipeline enables
/// source resolution.
pub pipelines: Vec<Pipeline>,
/// Pipeline file paths, forwarded to the engine and watched for hot reload.
pub pipeline_paths: Vec<PathBuf>,
/// Correlation configuration passed to the engine.
pub corr_config: CorrelationConfig,
/// Forwarded to `RuntimeEngine::new`; presumably controls whether the
/// matched event is embedded in results (TODO confirm).
pub include_event: bool,
/// Pretty-print flag for stdout output sinks (the DLQ sink always uses
/// `false`).
pub pretty: bool,
/// Address the HTTP API server binds to.
pub api_addr: SocketAddr,
/// Filter applied to each event via `crate::apply_event_filter`.
pub event_filter: Arc<EventFilter>,
/// Optional SQLite database used to persist correlation state.
pub state_db: Option<PathBuf>,
/// Seconds between periodic state snapshots (only used with `state_db`).
pub state_save_interval: u64,
/// Input spec: `stdin`/`stdin://`, `http`, or (with `daemon-nats`)
/// `nats://server[/subject]`.
pub input: String,
/// Output sink specs (`stdout`, `file://<path>`, `nats://...`); an empty
/// list means stdout.
pub output: Vec<String>,
/// Capacity of the internal event, sink, ack and DLQ channels.
pub buffer_size: usize,
/// Maximum number of events the engine takes from the queue per batch.
pub batch_size: usize,
/// Optional DLQ sink spec; when set, unparseable input lines are also
/// dead-lettered.
pub dlq: Option<String>,
/// NATS connection settings; the URL is overridden per `nats://` spec.
#[cfg(feature = "daemon-nats")]
pub nats_config: rsigma_runtime::NatsConnectConfig,
/// Where a NATS source starts reading; also used to decide state restore.
#[cfg(feature = "daemon-nats")]
pub replay_policy: rsigma_runtime::ReplayPolicy,
/// Optional consumer group passed to `NatsSource::connect`.
#[cfg(feature = "daemon-nats")]
pub consumer_group: Option<String>,
/// How stored correlation state is treated at startup.
pub state_restore_mode: StateRestoreMode,
/// Seconds to wait for the pipeline to drain after a shutdown signal.
pub drain_timeout: u64,
/// Format used to parse each input line.
pub input_format: InputFormat,
/// Forwarded to `RuntimeEngine::set_allow_remote_include`.
pub allow_remote_include: bool,
/// Forwarded to `RuntimeEngine::set_bloom_prefilter`.
pub bloom_prefilter: bool,
/// Forwarded to `RuntimeEngine::set_match_detail`.
pub match_detail: MatchDetailLevel,
/// Forwarded to `RuntimeEngine::set_bloom_max_bytes` when set.
pub bloom_max_bytes: Option<usize>,
/// Enables the field observer (exposed under `/api/v1/fields*`).
pub observe_fields: bool,
/// Key limit passed to `FieldObserver::new`.
pub observe_fields_max_keys: usize,
/// Forwarded to `RuntimeEngine::set_cross_rule_ac`.
#[cfg(feature = "daachorse-index")]
pub cross_rule_ac: bool,
/// Optional enrichers config; loaded at startup and rebuilt on reload.
pub enrichers_path: Option<PathBuf>,
/// Registry of dynamic sources resolved and refreshed by the daemon.
pub source_registry: rsigma_runtime::sources::registry::DaemonSourceRegistry,
/// When set, the API is served over TLS and the certificate is reloaded on
/// every reload trigger.
#[cfg(feature = "daemon-tls")]
pub tls_state: Option<super::tls::TlsState>,
}
/// Runs the detection daemon until a shutdown signal arrives or the event
/// source is exhausted.
///
/// Startup steps:
///
/// 1. Open the optional SQLite state store.
/// 2. Build the `RuntimeEngine`. When dynamic sources are configured, resolve
///    them and start the background refresh scheduler.
/// 3. Load rules, optionally restore correlation state (see
///    `decide_state_restore`), and build the optional enrichment pipeline.
/// 4. Start the file watcher, the HTTP API (plus OTLP/TLS when those features
///    are enabled), the reload task and the periodic state-save task.
/// 5. Wire the source -> engine -> sink -> ack pipeline and the DLQ task.
///
/// Configuration failures log an error and terminate the process with
/// `exit_code::CONFIG_ERROR`; a failed initial rule load terminates it with
/// `exit_code::RULE_ERROR`.
///
/// After a shutdown signal, the sink, DLQ and ack tasks get at most
/// `config.drain_timeout` seconds to finish. In every case the correlation
/// state is then saved one final time when a state store is configured.
pub async fn run_daemon(config: DaemonConfig) {
    let metrics = Arc::new(Metrics::new());
    let health = HealthState::new();
    let enrichment_metrics = metrics.clone() as Arc<dyn rsigma_runtime::MetricsHook>;
    // The sink task loads the current enrichment pipeline from this swap on
    // every result; reloads store a freshly built pipeline into it.
    let enrichment_swap: Arc<ArcSwap<Option<Arc<EnrichmentPipeline>>>> =
        Arc::new(ArcSwap::new(Arc::new(None)));

    let state_store = config.state_db.as_ref().map(|path| {
        let store = SqliteStateStore::open(path).unwrap_or_else(|e| {
            tracing::error!(error = %e, path = %path.display(), "Failed to open state database");
            std::process::exit(crate::exit_code::CONFIG_ERROR);
        });
        tracing::info!(path = %path.display(), "State database opened");
        Arc::new(store)
    });

    let mut engine = RuntimeEngine::new(
        config.rules_path.clone(),
        config.pipelines.clone(),
        config.corr_config.clone(),
        config.include_event,
    );
    engine.set_pipeline_paths(config.pipeline_paths.clone());
    engine.set_allow_remote_include(config.allow_remote_include);
    engine.set_match_detail(config.match_detail);
    engine.set_bloom_prefilter(config.bloom_prefilter);
    if let Some(budget) = config.bloom_max_bytes {
        engine.set_bloom_max_bytes(budget);
    }
    #[cfg(feature = "daachorse-index")]
    engine.set_cross_rule_ac(config.cross_rule_ac);

    let has_dynamic =
        !config.source_registry.is_empty() || config.pipelines.iter().any(|p| p.is_dynamic());
    let mut sources_trigger_tx_val: Option<
        mpsc::Sender<rsigma_runtime::sources::refresh::RefreshTrigger>,
    > = None;
    let mut source_resolver_val: Option<Arc<super::instrumented_resolver::InstrumentedResolver>> =
        None;
    if has_dynamic {
        let instrumented = Arc::new(super::instrumented_resolver::InstrumentedResolver::new(
            metrics.clone(),
        ));
        source_resolver_val = Some(instrumented.clone());
        let resolver: Arc<dyn rsigma_runtime::sources::SourceResolver> = instrumented;
        engine.set_source_resolver(resolver.clone());
        if let Err(e) = engine.resolve_dynamic_pipelines().await {
            tracing::error!(error = %e, "Failed to resolve required dynamic sources at startup");
            std::process::exit(crate::exit_code::CONFIG_ERROR);
        }

        // Registry entries with an `External` origin are resolved separately
        // from the pipeline-referenced sources handled above.
        let registry_only_sources: Vec<_> = config
            .source_registry
            .entries()
            .iter()
            .filter(|e| {
                matches!(
                    e.origin,
                    rsigma_runtime::sources::registry::SourceOrigin::External(_)
                )
            })
            .map(|e| e.source.clone())
            .collect();
        if !registry_only_sources.is_empty() {
            match rsigma_runtime::sources::resolve_all(&*resolver, &registry_only_sources).await {
                Ok(_) => {
                    tracing::info!(
                        count = registry_only_sources.len(),
                        "External sources resolved at startup"
                    );
                }
                Err(e) => {
                    tracing::error!(error = %e, "Failed to resolve required external sources at startup");
                    std::process::exit(crate::exit_code::CONFIG_ERROR);
                }
            }
        }

        let all_sources: Vec<_> = config
            .source_registry
            .sources()
            .into_iter()
            .cloned()
            .collect();
        if !all_sources.is_empty() {
            let scheduler = rsigma_runtime::sources::refresh::RefreshScheduler::new();
            sources_trigger_tx_val = Some(scheduler.trigger_sender());
            #[cfg(feature = "daemon-nats")]
            {
                let nats_url = config.nats_config.url.clone();
                let trigger_tx = scheduler.trigger_sender();
                tokio::spawn(async move {
                    let subject = rsigma_runtime::sources::refresh::NATS_CONTROL_SUBJECT;
                    if let Err(e) = rsigma_runtime::sources::refresh::nats_control_loop(
                        &nats_url, subject, trigger_tx,
                    )
                    .await
                    {
                        tracing::warn!(
                            error = %e,
                            "NATS control subject listener failed"
                        );
                    }
                });
            }
            let optional_source_ids: Vec<String> = all_sources
                .iter()
                .filter(|s| !s.required)
                .map(|s| s.id.clone())
                .collect();
            let bg_trigger_tx = scheduler.trigger_sender();
            scheduler.run(all_sources, resolver);
            // Optional sources are refreshed in the background shortly after
            // startup rather than blocking it.
            if !optional_source_ids.is_empty() {
                tokio::spawn(async move {
                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
                    for id in optional_source_ids {
                        let _ = bg_trigger_tx
                            .send(rsigma_runtime::sources::refresh::RefreshTrigger::Single(id))
                            .await;
                    }
                });
            }
        }
    }

    let processor = Arc::new(LogProcessor::new(engine, metrics.clone()));
    match processor.reload_rules() {
        Ok(stats) => {
            tracing::info!(
                detection_rules = stats.detection_rules,
                correlation_rules = stats.correlation_rules,
                path = %config.rules_path.display(),
                "Rules loaded"
            );
            metrics
                .detection_rules_loaded
                .set(stats.detection_rules as i64);
            metrics
                .correlation_rules_loaded
                .set(stats.correlation_rules as i64);
            health.set_ready(true);
        }
        Err(e) => {
            tracing::error!(error = %e, "Failed to load initial rules");
            std::process::exit(crate::exit_code::RULE_ERROR);
        }
    }

    if let Some(ref store) = state_store {
        match store.load().await {
            Ok(Some((snapshot, stored_position))) => {
                let should_restore = decide_state_restore(
                    config.state_restore_mode,
                    stored_position,
                    #[cfg(feature = "daemon-nats")]
                    &config.replay_policy,
                );
                if should_restore {
                    if processor.import_state(&snapshot) {
                        let entries = snapshot.windows.values().map(|g| g.len()).sum::<usize>();
                        tracing::info!(
                            state_entries = entries,
                            "Correlation state restored from database"
                        );
                    } else {
                        tracing::warn!(
                            snapshot_version = snapshot.version,
                            "Incompatible snapshot version, starting with fresh state"
                        );
                    }
                } else {
                    tracing::info!("Correlation state cleared (not restoring)");
                }
            }
            Ok(None) => {
                tracing::info!("No previous correlation state found in database");
            }
            Err(e) => {
                tracing::warn!(error = %e, "Failed to load state from database, starting fresh");
            }
        }
    }

    let initial_source_cache = source_resolver_val.as_ref().map(|r| r.arc_cache());
    if let Some(path) = config.enrichers_path.as_ref() {
        match super::enrichment::load_enrichers_file(path).and_then(|file| {
            super::enrichment::build_enrichers_full(
                file,
                initial_source_cache.clone(),
                enrichment_metrics.clone(),
            )
        }) {
            Ok(p) => {
                tracing::info!(
                    enrichers = p.len(),
                    path = %path.display(),
                    "Enrichment pipeline loaded"
                );
                enrichment_swap.store(Arc::new(Some(Arc::new(p))));
            }
            Err(e) => {
                tracing::error!(error = %e, "Failed to build enrichment pipeline");
                std::process::exit(crate::exit_code::CONFIG_ERROR);
            }
        }
    }

    let (reload_tx, mut reload_rx) = mpsc::channel::<()>(4);
    let pipeline_watch_paths: Vec<&std::path::Path> =
        config.pipeline_paths.iter().map(|p| p.as_path()).collect();
    // A rules file is watched through its parent directory.
    let _watcher = if config.rules_path.is_dir() {
        reload::spawn_file_watcher(&config.rules_path, &pipeline_watch_paths, reload_tx.clone())
    } else {
        reload::spawn_file_watcher(
            config.rules_path.parent().unwrap_or(&config.rules_path),
            &pipeline_watch_paths,
            reload_tx.clone(),
        )
    };

    let start_time = Instant::now();
    let buffer_size = config.buffer_size;
    let (event_tx, mut event_rx) = mpsc::channel::<RawEvent>(buffer_size);
    let http_event_tx = if config.input == "http" {
        Some(event_tx.clone())
    } else {
        None
    };
    #[cfg(feature = "daemon-otlp")]
    let otlp_event_tx = event_tx.clone();

    let field_observer = if config.observe_fields {
        let observer = Arc::new(FieldObserver::new(config.observe_fields_max_keys));
        processor.set_field_observer(Some(observer.clone()));
        tracing::info!(
            max_keys = config.observe_fields_max_keys,
            "Field observer enabled"
        );
        Some(observer)
    } else {
        None
    };

    let app_state = AppState {
        processor: processor.clone(),
        metrics: metrics.clone(),
        health: health.clone(),
        reload_tx: reload_tx.clone(),
        start_time,
        event_tx: http_event_tx,
        sources_trigger_tx: sources_trigger_tx_val.clone(),
        source_resolver: source_resolver_val,
        #[cfg(feature = "daemon-otlp")]
        otlp_event_tx,
        source_registry: Arc::new(config.source_registry.clone()),
        field_observer: field_observer.clone(),
    };
    let app = Router::new()
        .route("/healthz", get(healthz))
        .route("/readyz", get(readyz))
        .route("/metrics", get(metrics_handler))
        .route("/api/v1/rules", get(list_rules))
        .route("/api/v1/status", get(status))
        .route("/api/v1/reload", post(trigger_reload))
        .route("/api/v1/events", post(ingest_events))
        .route("/api/v1/sources", get(list_sources))
        .route("/api/v1/sources/resolve", post(resolve_sources))
        .route(
            "/api/v1/sources/resolve/{source_id}",
            post(resolve_source_by_id),
        )
        .route(
            "/api/v1/sources/cache/{source_id}",
            delete(invalidate_source_cache),
        )
        .route("/api/v1/fields", get(fields_full))
        .route("/api/v1/fields/unknown", get(fields_unknown))
        .route("/api/v1/fields/missing", get(fields_missing))
        .route("/api/v1/fields/observer", delete(fields_observer_reset));
    #[cfg(feature = "daemon-otlp")]
    let app = app.route("/v1/logs", post(otlp_http_logs));
    let app = app.layer(TraceLayer::new_for_http()).with_state(app_state);

    let listener = tokio::net::TcpListener::bind(config.api_addr)
        .await
        .unwrap_or_else(|e| {
            tracing::error!(addr = %config.api_addr, error = %e, "Failed to bind API server");
            std::process::exit(crate::exit_code::CONFIG_ERROR);
        });
    let actual_addr = listener.local_addr().unwrap_or(config.api_addr);

    // Installed eagerly so that handler-registration failures surface at
    // startup; taken exactly once by whichever server branch runs below.
    let mut shutdown_fut: Option<std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>> = {
        #[cfg(unix)]
        {
            use tokio::signal::unix::{SignalKind, signal};
            let sigint = signal(SignalKind::interrupt()).expect("install SIGINT handler");
            let sigterm = signal(SignalKind::terminate()).expect("install SIGTERM handler");
            Some(Box::pin(shutdown_signal(sigint, sigterm)))
        }
        #[cfg(not(unix))]
        {
            Some(Box::pin(shutdown_signal()))
        }
    };

    #[cfg(feature = "daemon-otlp")]
    let otlp_routes = {
        let grpc_service = rsigma_runtime::LogsServiceServer::new(OtlpLogsGrpcService {
            event_tx: event_tx.clone(),
            metrics: metrics.clone(),
        })
        .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
        .send_compressed(tonic::codec::CompressionEncoding::Gzip);
        tonic::service::Routes::from(app).add_service(grpc_service)
    };

    #[cfg(feature = "daemon-tls")]
    let tls_state = config.tls_state.clone();
    #[cfg(feature = "daemon-tls")]
    let tls_enabled = tls_state.is_some();
    #[cfg(not(feature = "daemon-tls"))]
    let tls_enabled = false;
    #[cfg(feature = "daemon-tls")]
    if let Some(ref state) = tls_state {
        update_tls_metrics(&metrics, state.expiry_unix.load(Ordering::Relaxed));
        warn_if_cert_expiring_soon(state.expiry_unix.load(Ordering::Relaxed));
    }
    #[cfg(feature = "daemon-otlp")]
    if tls_enabled {
        tracing::info!(addr = %actual_addr, "API server listening (HTTPS, HTTP/2 + gRPC)");
    } else {
        tracing::info!(addr = %actual_addr, "API server listening (HTTP/2 + gRPC)");
    }
    #[cfg(not(feature = "daemon-otlp"))]
    if tls_enabled {
        tracing::info!(addr = %actual_addr, "API server listening (HTTPS)");
    } else {
        tracing::info!(addr = %actual_addr, "API server listening");
    }

    let sighup_reload_tx = reload_tx.clone();
    let sighup_sources_tx = sources_trigger_tx_val.clone();
    tokio::spawn(async move {
        reload::sighup_listener(sighup_reload_tx, sighup_sources_tx).await;
    });

    let reload_processor = processor.clone();
    let reload_metrics = metrics.clone();
    let reload_health = health.clone();
    let reload_enrichment_swap = enrichment_swap.clone();
    let reload_enrichers_path = config.enrichers_path.clone();
    let reload_enrichment_metrics = enrichment_metrics.clone();
    let reload_source_cache = initial_source_cache.clone();
    #[cfg(feature = "daemon-tls")]
    let reload_tls_state = tls_state.clone();
    #[cfg(feature = "daemon-tls")]
    let reload_tls_metrics = metrics.clone();
    tokio::spawn(async move {
        while reload_rx.recv().await.is_some() {
            // Debounce bursts of change notifications: wait briefly, then
            // discard any requests queued in the meantime.
            tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
            while reload_rx.try_recv().is_ok() {}
            reload_metrics.reloads_total.inc();
            tracing::info!("Reloading rules and pipelines...");
            match reload_processor.reload_rules() {
                Ok(stats) => {
                    tracing::info!(
                        detection_rules = stats.detection_rules,
                        correlation_rules = stats.correlation_rules,
                        path = %reload_processor.rules_path().display(),
                        "Rules and pipelines reloaded"
                    );
                    reload_metrics
                        .detection_rules_loaded
                        .set(stats.detection_rules as i64);
                    reload_metrics
                        .correlation_rules_loaded
                        .set(stats.correlation_rules as i64);
                    reload_health.set_ready(true);
                }
                Err(e) => {
                    tracing::error!(error = %e, "Failed to reload rules");
                    reload_metrics.reloads_failed.inc();
                }
            }
            if let Some(path) = reload_enrichers_path.as_deref() {
                match super::enrichment::load_enrichers_file(path).and_then(|file| {
                    super::enrichment::build_enrichers_full(
                        file,
                        reload_source_cache.clone(),
                        reload_enrichment_metrics.clone(),
                    )
                }) {
                    Ok(new_pipeline) => {
                        let prev_count = reload_enrichment_swap
                            .load()
                            .as_ref()
                            .as_ref()
                            .map(|p| p.len())
                            .unwrap_or(0);
                        let new_count = new_pipeline.len();
                        reload_enrichment_swap.store(Arc::new(Some(Arc::new(new_pipeline))));
                        tracing::info!(
                            previous = prev_count,
                            current = new_count,
                            path = %path.display(),
                            "Enrichment pipeline reloaded"
                        );
                    }
                    Err(e) => {
                        tracing::error!(
                            error = %e,
                            path = %path.display(),
                            "Failed to reload enrichers config; keeping previous pipeline"
                        );
                        reload_metrics.reloads_failed.inc();
                    }
                }
            }
            #[cfg(feature = "daemon-tls")]
            if let Some(ref state) = reload_tls_state {
                match state.reload() {
                    Ok(new_expiry) => {
                        update_tls_metrics(&reload_tls_metrics, new_expiry);
                        warn_if_cert_expiring_soon(new_expiry);
                        tracing::info!(not_after = new_expiry, "TLS certificate hot-reloaded");
                    }
                    Err(e) => {
                        tracing::error!(
                            error = %e,
                            "Failed to reload TLS certificate; keeping previous one active"
                        );
                        reload_metrics.reloads_failed.inc();
                    }
                }
            }
        }
    });

    // Highest acknowledged stream position, persisted alongside state
    // snapshots. Sequence 0 means "no position recorded".
    let high_water_seq = Arc::new(AtomicU64::new(0));
    let high_water_ts = Arc::new(AtomicI64::new(0));
    if let Some(ref store) = state_store {
        let save_processor = processor.clone();
        let save_store = store.clone();
        let save_interval_secs = config.state_save_interval;
        let save_hw_seq = high_water_seq.clone();
        let save_hw_ts = high_water_ts.clone();
        tokio::spawn(async move {
            // `tokio::time::interval` panics on a zero period, which would
            // silently kill this task and stop periodic saves; clamp to 1s.
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(
                save_interval_secs.max(1),
            ));
            // The first tick completes immediately; consume it so the first
            // snapshot happens one full interval after startup.
            interval.tick().await;
            loop {
                interval.tick().await;
                if let Some(snapshot) = save_processor.export_state() {
                    let position = source_position_from_atomics(&save_hw_seq, &save_hw_ts);
                    let snapshot_size = serde_json::to_vec(&snapshot).map(|v| v.len()).unwrap_or(0);
                    let window_count = snapshot.windows.len();
                    let save_start = std::time::Instant::now();
                    if let Err(e) = save_store.save(&snapshot, position.as_ref()).await {
                        tracing::warn!(
                            error = %e,
                            size_bytes = snapshot_size,
                            windows = window_count,
                            "Failed to save periodic state snapshot",
                        );
                    } else {
                        tracing::debug!(
                            duration_ms = save_start.elapsed().as_millis() as u64,
                            size_bytes = snapshot_size,
                            windows = window_count,
                            "Periodic state snapshot saved",
                        );
                    }
                }
            }
        });
    }

    let (sink_tx, mut sink_rx) = mpsc::channel::<(ProcessResult, Vec<AckToken>)>(buffer_size);
    let (ack_tx, mut ack_rx) = mpsc::channel::<AckToken>(buffer_size);
    #[cfg_attr(not(feature = "daemon-otlp"), allow(unused_mut))]
    let mut source_handle: Option<tokio::task::JoinHandle<()>> = match config.input.as_str() {
        "stdin" | "stdin://" => {
            let h = spawn_source(
                StdinSource::new(),
                event_tx,
                Some(metrics.clone() as Arc<dyn rsigma_runtime::MetricsHook>),
            );
            tracing::info!(input = "stdin", "Event source started");
            Some(h)
        }
        "http" => {
            // HTTP ingestion uses the clone stored in `AppState`.
            drop(event_tx);
            tracing::info!(input = "http", "Event source started (POST /api/v1/events)");
            None
        }
        #[cfg(feature = "daemon-nats")]
        input if input.starts_with("nats://") => {
            let (url, subject) = parse_nats_url(input);
            let mut nats_cfg = config.nats_config.clone();
            nats_cfg.url = url.clone();
            match rsigma_runtime::NatsSource::connect(
                &nats_cfg,
                &subject,
                &config.replay_policy,
                config.consumer_group.as_deref(),
            )
            .await
            {
                Ok(source) => {
                    let h = spawn_source(
                        source,
                        event_tx,
                        Some(metrics.clone() as Arc<dyn rsigma_runtime::MetricsHook>),
                    );
                    tracing::info!(url = url, subject = subject, "NATS source started");
                    Some(h)
                }
                Err(e) => {
                    tracing::error!(error = %e, url = url, "Failed to connect NATS source");
                    std::process::exit(crate::exit_code::CONFIG_ERROR);
                }
            }
        }
        other => {
            tracing::error!(
                input = other,
                "Unsupported input source (supported: stdin, http, nats://)"
            );
            std::process::exit(crate::exit_code::CONFIG_ERROR);
        }
    };

    let (dlq_tx, mut dlq_rx) = mpsc::channel::<DlqEntry>(buffer_size);
    let dlq_sink = if let Some(ref dlq_spec) = config.dlq {
        let sink = build_sink(dlq_spec, false, &config).await;
        tracing::info!(dlq = dlq_spec, "Dead-letter queue enabled");
        Some(sink)
    } else {
        None
    };

    // With OTLP the event channel has extra senders (gRPC/HTTP), so source
    // completion is signalled explicitly instead of via channel closure.
    #[cfg(feature = "daemon-otlp")]
    let source_done_notify = std::sync::Arc::new(tokio::sync::Notify::new());
    #[cfg(feature = "daemon-otlp")]
    if let Some(h) = source_handle.take() {
        let done = source_done_notify.clone();
        tokio::spawn(async move {
            let _ = h.await;
            done.notify_one();
        });
    }

    let engine_processor = processor.clone();
    let engine_metrics = metrics.clone();
    let event_filter = config.event_filter.clone();
    let batch_size = config.batch_size;
    let input_format = config.input_format.clone();
    let engine_ack_tx = ack_tx.clone();
    let engine_dlq_tx = dlq_tx.clone();
    let dlq_enabled = config.dlq.is_some();
    #[cfg(feature = "daemon-otlp")]
    let engine_source_done = source_done_notify.clone();
    let engine_handle = tokio::spawn(async move {
        let filter_fn = move |v: &serde_json::Value| crate::apply_event_filter(v, &event_filter);
        #[cfg(feature = "daemon-otlp")]
        let source_done = engine_source_done;
        #[cfg(feature = "daemon-otlp")]
        let mut source_finished = false;
        loop {
            let pipeline_start = std::time::Instant::now();
            let first = {
                #[cfg(feature = "daemon-otlp")]
                {
                    if source_finished {
                        match event_rx.recv().await {
                            Some(e) => e,
                            None => break,
                        }
                    } else {
                        tokio::select! {
                            event = event_rx.recv() => match event {
                                Some(e) => e,
                                None => break,
                            },
                            _ = source_done.notified() => {
                                // Stop accepting new events but drain what
                                // is already buffered.
                                source_finished = true;
                                event_rx.close();
                                match event_rx.recv().await {
                                    Some(e) => e,
                                    None => break,
                                }
                            }
                        }
                    }
                }
                #[cfg(not(feature = "daemon-otlp"))]
                match event_rx.recv().await {
                    Some(raw_event) => raw_event,
                    None => break,
                }
            };
            engine_metrics.on_input_queue_depth_change(-1);
            // Opportunistically batch whatever is already queued, without
            // waiting for more events.
            let mut batch = Vec::with_capacity(batch_size.min(64));
            batch.push(first);
            while batch.len() < batch_size {
                match event_rx.try_recv() {
                    Ok(raw_event) => {
                        engine_metrics.on_input_queue_depth_change(-1);
                        batch.push(raw_event);
                    }
                    Err(_) => break,
                }
            }
            let initial_batch_size = batch.len();
            engine_metrics.observe_batch_size(initial_batch_size as u64);
            let batch_span = tracing::debug_span!(
                "process_batch",
                batch_size = initial_batch_size,
                input_format = ?input_format,
            );
            // Evaluates to `true` when a downstream channel has closed and
            // the engine should stop.
            let shutdown = tracing::Instrument::instrument(
                async {
                    let mut valid_payloads = Vec::with_capacity(batch.len());
                    let mut valid_tokens = Vec::with_capacity(batch.len());
                    for raw_event in batch {
                        if dlq_enabled
                            && !raw_event.payload.trim().is_empty()
                            && rsigma_runtime::parse_line(&raw_event.payload, &input_format)
                                .is_none()
                        {
                            tracing::debug!("Event routed to DLQ: parse error");
                            if engine_dlq_tx
                                .send(DlqEntry {
                                    original_event: raw_event.payload,
                                    error: "parse error".to_string(),
                                    timestamp: chrono::Utc::now().to_rfc3339(),
                                })
                                .await
                                .is_err()
                            {
                                tracing::warn!("DLQ channel closed, parse-error event dropped");
                            }
                            if engine_ack_tx.send(raw_event.ack_token).await.is_err() {
                                return true;
                            }
                            continue;
                        }
                        valid_payloads.push(raw_event.payload);
                        valid_tokens.push(raw_event.ack_token);
                    }
                    if valid_payloads.is_empty() {
                        return false;
                    }
                    let process_start = std::time::Instant::now();
                    let results: Vec<ProcessResult> = engine_processor.process_batch_with_format(
                        &valid_payloads,
                        &input_format,
                        Some(&filter_fn),
                    );
                    let process_elapsed_ms = process_start.elapsed().as_millis() as u64;
                    let match_count = results.iter().filter(|r| !r.is_empty()).count();
                    tracing::debug!(
                        batch_size = valid_payloads.len(),
                        matches = match_count,
                        elapsed_ms = process_elapsed_ms,
                        "Batch processed",
                    );
                    for (result, ack_token) in results.into_iter().zip(valid_tokens) {
                        // Non-matching events are acknowledged immediately;
                        // matches are acknowledged by the sink after output.
                        if result.is_empty() {
                            if engine_ack_tx.send(ack_token).await.is_err() {
                                tracing::debug!("Ack channel closed, engine shutting down");
                                return true;
                            }
                            continue;
                        }
                        engine_metrics.on_output_queue_depth_change(1);
                        if sink_tx.send((result, vec![ack_token])).await.is_err() {
                            tracing::debug!("Sink channel closed, engine shutting down");
                            return true;
                        }
                    }
                    false
                },
                batch_span,
            )
            .await;
            engine_metrics.observe_pipeline_latency(pipeline_start.elapsed().as_secs_f64());
            if shutdown {
                break;
            }
        }
        tracing::info!("Event source exhausted, engine shutting down");
    });

    let pretty = config.pretty;
    let output_specs = if config.output.is_empty() {
        vec!["stdout".to_string()]
    } else {
        config.output.clone()
    };
    let mut sinks = Vec::new();
    for spec in &output_specs {
        sinks.push(build_sink(spec, pretty, &config).await);
    }
    let sink = if sinks.len() == 1 {
        sinks.pop().unwrap()
    } else {
        Sink::FanOut(sinks)
    };
    tracing::info!(output = ?output_specs, "Sink started");

    let sink_metrics = metrics.clone();
    let sink_dlq_tx = dlq_tx;
    let enrichment_swap_for_sink = enrichment_swap.clone();
    let sink_handle = tokio::spawn(async move {
        let mut sink = sink;
        while let Some((mut result, ack_tokens)) = sink_rx.recv().await {
            sink_metrics.on_output_queue_depth_change(-1);
            let pipeline_snapshot = enrichment_swap_for_sink.load_full();
            if let Some(pipeline) = pipeline_snapshot.as_ref()
                && !pipeline.is_empty()
            {
                pipeline.run(&mut result).await;
                // Enrichment may filter out every match; acknowledge without
                // writing anything.
                if result.is_empty() {
                    for token in ack_tokens {
                        if ack_tx.send(token).await.is_err() {
                            tracing::debug!("Ack channel closed");
                            return;
                        }
                    }
                    continue;
                }
            }
            if let Err(e) = sink.send(&result).await {
                tracing::warn!(error = %e, "Error writing to sink");
                let serialized = serde_json::to_string(&result).unwrap_or_default();
                let _ = sink_dlq_tx
                    .send(DlqEntry {
                        original_event: serialized,
                        error: format!("sink delivery failure: {e}"),
                        timestamp: chrono::Utc::now().to_rfc3339(),
                    })
                    .await;
            }
            for token in ack_tokens {
                if ack_tx.send(token).await.is_err() {
                    tracing::debug!("Ack channel closed");
                    return;
                }
            }
        }
    });

    let dlq_metrics = metrics.clone();
    let dlq_handle = tokio::spawn(async move {
        tracing::debug!("DLQ task started");
        let mut dlq_sink = dlq_sink;
        let mut no_sink_logged = false;
        while let Some(entry) = dlq_rx.recv().await {
            dlq_metrics.dlq_events.inc();
            if let Some(ref mut sink) = dlq_sink {
                let json = serde_json::to_string(&entry).unwrap_or_default();
                if let Err(e) = sink.send_raw(&json).await {
                    tracing::warn!(error = %e, "Failed to write to DLQ sink");
                }
            } else if !no_sink_logged {
                tracing::debug!("DLQ entry counted but no sink configured (use --dlq to persist)");
                no_sink_logged = true;
            }
        }
        tracing::debug!("DLQ task finished");
    });

    #[cfg(feature = "daemon-nats")]
    let (ack_hw_seq, ack_hw_ts) = (high_water_seq.clone(), high_water_ts.clone());
    let ack_handle = tokio::spawn(async move {
        while let Some(token) = ack_rx.recv().await {
            #[cfg(feature = "daemon-nats")]
            if let Some((seq, ts)) = token.nats_stream_position() {
                ack_hw_seq.fetch_max(seq, Ordering::Relaxed);
                ack_hw_ts.fetch_max(ts, Ordering::Relaxed);
            }
            token.ack().await;
        }
    });

    let drain_duration = std::time::Duration::from_secs(config.drain_timeout);
    #[cfg(feature = "daemon-otlp")]
    let unified_app: axum::Router = otlp_routes.into_axum_router();
    #[cfg(not(feature = "daemon-otlp"))]
    let unified_app: axum::Router = app;
    let mut serve_handle = {
        #[cfg(feature = "daemon-tls")]
        {
            if let Some(state) = tls_state {
                let tls_listener = super::tls::RustlsListener::new(
                    listener,
                    state.config.clone(),
                    metrics.tls_active_connections.clone(),
                );
                let shutdown_fut = shutdown_fut.take().expect("shutdown future consumed once");
                tokio::spawn(async move {
                    if let Err(e) = axum::serve(tls_listener, unified_app)
                        .with_graceful_shutdown(shutdown_fut)
                        .await
                    {
                        tracing::error!(error = %e, "server error");
                    }
                })
            } else {
                let shutdown_fut = shutdown_fut.take().expect("shutdown future consumed once");
                tokio::spawn(async move {
                    if let Err(e) = axum::serve(listener, unified_app)
                        .with_graceful_shutdown(shutdown_fut)
                        .await
                    {
                        tracing::error!(error = %e, "server error");
                    }
                })
            }
        }
        #[cfg(not(feature = "daemon-tls"))]
        {
            let shutdown_fut = shutdown_fut.take().expect("shutdown future consumed once");
            tokio::spawn(async move {
                if let Err(e) = axum::serve(listener, unified_app)
                    .with_graceful_shutdown(shutdown_fut)
                    .await
                {
                    tracing::error!(error = %e, "server error");
                }
            })
        }
    };

    // The server finishing means a shutdown signal arrived; the engine
    // finishing first means the event source was exhausted.
    let shutdown_triggered = tokio::select! {
        _ = &mut serve_handle => true,
        _ = engine_handle => {
            tracing::info!("Streaming pipeline complete");
            serve_handle.abort();
            false
        }
    };
    if shutdown_triggered {
        tracing::info!("Shutdown signal received, draining pipeline...");
        if let Some(h) = source_handle {
            h.abort();
            tracing::info!("Source task aborted");
        }
        #[cfg(feature = "daemon-otlp")]
        source_done_notify.notify_one();
        let drain = async {
            let _ = sink_handle.await;
            tracing::debug!("Sink task finished");
            let _ = dlq_handle.await;
            tracing::debug!("DLQ task finished");
            let _ = ack_handle.await;
            tracing::debug!("Ack task finished");
        };
        if tokio::time::timeout(drain_duration, drain).await.is_err() {
            tracing::warn!(
                timeout_secs = config.drain_timeout,
                "Drain timeout exceeded, some events may be lost"
            );
        }
    } else {
        let _ = sink_handle.await;
        tracing::debug!("Sink task finished");
        let _ = dlq_handle.await;
        tracing::debug!("DLQ task finished");
        let _ = ack_handle.await;
        tracing::debug!("Ack task finished");
    }

    if let Some(ref store) = state_store
        && let Some(snapshot) = processor.export_state()
    {
        let position = source_position_from_atomics(&high_water_seq, &high_water_ts);
        match store.save(&snapshot, position.as_ref()).await {
            Ok(()) => {
                if let Some(ref pos) = position {
                    tracing::info!(
                        source_sequence = pos.sequence,
                        "Correlation state saved to database on shutdown"
                    );
                } else {
                    tracing::info!("Correlation state saved to database on shutdown");
                }
            }
            Err(e) => tracing::error!(error = %e, "Failed to save state on shutdown"),
        }
    }
}
/// Completes as soon as either SIGINT or SIGTERM is delivered.
///
/// Both signal streams are registered by the caller beforehand, so a failure
/// to install a handler surfaces at startup instead of inside this future.
#[cfg(unix)]
async fn shutdown_signal(
    mut sigint: tokio::signal::unix::Signal,
    mut sigterm: tokio::signal::unix::Signal,
) {
    let interrupted = sigint.recv();
    let terminated = sigterm.recv();
    // Whichever completes first wins; the other future is simply dropped.
    tokio::select! {
        _ = terminated => {}
        _ = interrupted => {}
    }
    tracing::info!("Shutdown signal received");
}
/// Completes when Ctrl-C is received (non-Unix platforms).
///
/// If listening for Ctrl-C fails, the error is logged and the future
/// completes anyway. Previously the error was swallowed and a misleading
/// "Shutdown signal received" was logged.
#[cfg(not(unix))]
async fn shutdown_signal() {
    match tokio::signal::ctrl_c().await {
        Ok(()) => tracing::info!("Shutdown signal received"),
        Err(e) => tracing::error!(error = %e, "Failed to listen for Ctrl-C; shutting down"),
    }
}
async fn build_sink(
spec: &str,
pretty: bool,
#[cfg_attr(not(feature = "daemon-nats"), allow(unused))] config: &DaemonConfig,
) -> Sink {
if spec == "stdout" || spec == "stdout://" {
return Sink::Stdout(StdoutSink::new(pretty));
}
if let Some(path) = spec.strip_prefix("file://") {
let path = std::path::Path::new(path);
return match FileSink::open(path) {
Ok(file_sink) => {
tracing::info!(path = %path.display(), "File sink opened");
Sink::File(file_sink)
}
Err(e) => {
tracing::error!(path = %path.display(), error = %e, "Failed to open file sink");
std::process::exit(crate::exit_code::CONFIG_ERROR);
}
};
}
#[cfg(feature = "daemon-nats")]
if spec.starts_with("nats://") {
let (url, subject) = parse_nats_url(spec);
let mut nats_cfg = config.nats_config.clone();
nats_cfg.url = url.clone();
return match rsigma_runtime::NatsSink::connect(&nats_cfg, &subject).await {
Ok(nats_sink) => {
tracing::info!(url = url, subject = subject, "NATS sink started");
Sink::Nats(Box::new(nats_sink))
}
Err(e) => {
tracing::error!(error = %e, url = url, "Failed to connect NATS sink");
std::process::exit(crate::exit_code::CONFIG_ERROR);
}
};
}
tracing::error!(
output = spec,
"Unsupported output sink (supported: stdout, file://<path>, nats://)"
);
std::process::exit(crate::exit_code::CONFIG_ERROR);
}
/// Splits a `nats://server[/subject]` spec into `(server_url, subject)`.
///
/// The server part is everything up to the first `/` after the scheme. It is
/// returned with the `nats://` scheme (re)attached. When no subject is given,
/// either because there is no `/` or because nothing follows it, the full
/// wildcard subject `>` is returned.
#[cfg(feature = "daemon-nats")]
fn parse_nats_url(url: &str) -> (String, String) {
    let without_scheme = url.strip_prefix("nats://").unwrap_or(url);
    let (server, subject) = without_scheme
        .split_once('/')
        .unwrap_or((without_scheme, ""));
    // An empty subject (e.g. `nats://host:4222/`) is not a valid NATS
    // subject; use the same wildcard as when the subject is omitted.
    let subject = if subject.is_empty() { ">" } else { subject };
    (format!("nats://{server}"), subject.to_string())
}
/// Decides whether stored correlation state should be restored at startup.
///
/// The explicit `ForceClear` and `ForceKeep` modes always win. In `Auto`
/// mode, with the `daemon-nats` feature, the decision depends on the replay
/// policy:
///
/// * `Resume` restores.
/// * `Latest` starts fresh.
/// * `FromSequence` / `FromTime` restore only when replay begins strictly
///   after the stored position, so events are not counted twice.
///
/// Without the feature, `Auto` always restores. Every decision is logged.
fn decide_state_restore(
    mode: StateRestoreMode,
    stored_position: Option<SourcePosition>,
    #[cfg(feature = "daemon-nats")] replay_policy: &rsigma_runtime::ReplayPolicy,
) -> bool {
    match mode {
        StateRestoreMode::ForceClear => {
            tracing::info!("State restore skipped (--clear-state)");
            return false;
        }
        StateRestoreMode::ForceKeep => {
            tracing::info!("State restore forced (--keep-state)");
            return true;
        }
        StateRestoreMode::Auto => {}
    }

    #[cfg(feature = "daemon-nats")]
    {
        use rsigma_runtime::ReplayPolicy;
        match replay_policy {
            ReplayPolicy::Resume => true,
            ReplayPolicy::Latest => {
                tracing::info!("State restore skipped (--replay-from-latest starts fresh)");
                false
            }
            ReplayPolicy::FromSequence(replay_seq) => {
                let Some(pos) = stored_position else {
                    tracing::info!(
                        "State restore skipped (no stored position to compare against replay)"
                    );
                    return false;
                };
                if *replay_seq > pos.sequence {
                    tracing::info!(
                        replay_from = replay_seq,
                        stored_sequence = pos.sequence,
                        "Restoring state (replay starts after stored position)"
                    );
                    true
                } else {
                    tracing::info!(
                        replay_from = replay_seq,
                        stored_sequence = pos.sequence,
                        "State restore skipped (replay starts at or before stored position, would double-count)"
                    );
                    false
                }
            }
            ReplayPolicy::FromTime(replay_time) => {
                let replay_ts = replay_time.unix_timestamp();
                let Some(pos) = stored_position else {
                    tracing::info!(
                        "State restore skipped (no stored position to compare against replay)"
                    );
                    return false;
                };
                if replay_ts > pos.timestamp {
                    tracing::info!(
                        replay_from_ts = replay_ts,
                        stored_ts = pos.timestamp,
                        "Restoring state (replay starts after stored timestamp)"
                    );
                    true
                } else {
                    tracing::info!(
                        replay_from_ts = replay_ts,
                        stored_ts = pos.timestamp,
                        "State restore skipped (replay starts at or before stored timestamp, would double-count)"
                    );
                    false
                }
            }
        }
    }
    #[cfg(not(feature = "daemon-nats"))]
    {
        let _ = stored_position;
        true
    }
}
/// Sets the certificate-expiry gauge to the number of seconds remaining until
/// `expiry_unix` (a Unix timestamp). The value is negative once the
/// certificate has expired.
#[cfg(feature = "daemon-tls")]
pub(crate) fn update_tls_metrics(metrics: &Metrics, expiry_unix: i64) {
    let seconds_left = expiry_unix - chrono::Utc::now().timestamp();
    metrics
        .tls_certificate_expiry_seconds
        .set(seconds_left as f64);
}
/// Logs a warning if the TLS server certificate has expired or expires within 30 days.
///
/// `expiry_unix` is the certificate's expiry as a Unix timestamp (seconds).
/// Nothing is logged when more than 30 days remain.
#[cfg(feature = "daemon-tls")]
pub(crate) fn warn_if_cert_expiring_soon(expiry_unix: i64) {
    const WARN_WINDOW_SECS: i64 = 30 * 24 * 3600;

    let remaining = expiry_unix - chrono::Utc::now().timestamp();

    if remaining < 0 {
        tracing::warn!(
            expiry_unix,
            "TLS server certificate has already expired; clients will reject the handshake"
        );
        return;
    }

    if remaining < WARN_WINDOW_SECS {
        // Whole days, rounded down.
        tracing::warn!(
            expiry_unix,
            days_remaining = remaining / 86400,
            "TLS server certificate expires in less than 30 days; rotate it soon"
        );
    }
}
/// Reads a `SourcePosition` out of a sequence/timestamp atomic pair.
///
/// A sequence of `0` means no position has been recorded yet, so `None` is
/// returned and the timestamp is not read. The two loads are independent
/// `Relaxed` loads, so under concurrent updates the pair is not guaranteed to
/// come from the same write.
fn source_position_from_atomics(seq: &AtomicU64, ts: &AtomicI64) -> Option<SourcePosition> {
    match seq.load(Ordering::Relaxed) {
        0 => None,
        sequence => Some(SourcePosition {
            sequence,
            timestamp: ts.load(Ordering::Relaxed),
        }),
    }
}
/// Liveness probe: always responds `200` with `{"status": "ok"}`.
async fn healthz() -> impl IntoResponse {
    let body = serde_json::json!({ "status": "ok" });
    Json(body)
}
/// Readiness probe.
///
/// Returns `200 {"status": "ready", "rules_loaded": true}` once
/// `HealthState::is_ready()` is true, and
/// `503 {"status": "not_ready", "rules_loaded": false}` before that.
async fn readyz(State(state): State<AppState>) -> Response {
    let ready = state.health.is_ready();
    let (code, label) = if ready {
        (StatusCode::OK, "ready")
    } else {
        (StatusCode::SERVICE_UNAVAILABLE, "not_ready")
    };
    let body = serde_json::json!({ "status": label, "rules_loaded": ready });
    (code, Json(body)).into_response()
}
/// Prometheus scrape endpoint.
///
/// Refreshes the values computed on demand before encoding:
/// * the uptime metric;
/// * the field-observer metrics, when field observation is enabled.
///
/// It then returns `state.metrics` in the text exposition format
/// (`text/plain; version=0.0.4`).
async fn metrics_handler(State(state): State<AppState>) -> impl IntoResponse {
    let metrics = &state.metrics;
    metrics
        .uptime_seconds
        .set(state.start_time.elapsed().as_secs_f64());

    if let Some(observer) = &state.field_observer {
        metrics.update_field_observer_metrics(&observer.snapshot());
    }

    let content_type = (
        axum::http::header::CONTENT_TYPE,
        "text/plain; version=0.0.4; charset=utf-8",
    );
    ([content_type], metrics.encode())
}
/// Reports the detection/correlation rule counts and the rules path currently loaded.
async fn list_rules(State(state): State<AppState>) -> impl IntoResponse {
    let processor = &state.processor;
    let stats = processor.stats();
    let rules_path = processor.rules_path().display().to_string();
    Json(serde_json::json!({
        "detection_rules": stats.detection_rules,
        "correlation_rules": stats.correlation_rules,
        "rules_path": rules_path,
    }))
}
/// JSON body returned by the `status` handler.
#[derive(Serialize)]
struct StatusResponse {
/// `"running"` once the health state reports ready, `"loading"` before that.
status: String,
/// Number of loaded detection rules.
detection_rules: usize,
/// Number of loaded correlation rules.
correlation_rules: usize,
/// Entry count of correlation state, as reported by the processor's stats.
correlation_state_entries: usize,
/// Counter value of `events_processed` in the daemon metrics.
events_processed: u64,
/// Counter value of `detection_matches` in the daemon metrics.
detection_matches: u64,
/// Counter value of `correlation_matches` in the daemon metrics.
correlation_matches: u64,
/// Seconds since the daemon's recorded start time.
uptime_seconds: f64,
/// Dynamic-source roll-up; omitted from the JSON when no source resolver is configured.
#[serde(skip_serializing_if = "Option::is_none")]
dynamic_sources: Option<DynamicSourcesSummary>,
}
/// Roll-up of dynamic source-resolution metrics, embedded in `StatusResponse`.
#[derive(Serialize)]
struct DynamicSourcesSummary {
/// Number of child series in the `source_last_resolved` metric family, i.e.
/// sources that have a last-resolved series — presumably those resolved at
/// least once. NOTE(review): not necessarily every configured source.
total: usize,
/// Sum over all label sets of `source_resolves_total`.
resolves_total: u64,
/// Sum over all label sets of `source_resolve_errors`.
errors_total: u64,
/// Value of the `source_cache_hits` counter.
cache_hits: u64,
}
/// Daemon status summary.
///
/// Reports:
/// * the readiness phase;
/// * rule counts and correlation-state size;
/// * processing counters and uptime;
/// * a roll-up of the source-resolution metrics, when a dynamic source
///   resolver is configured.
async fn status(State(state): State<AppState>) -> impl IntoResponse {
    /// Sums every labelled child of a counter family, truncating each child's
    /// value to `u64` before adding. Yields `0` when no family is collected.
    fn counter_family_total<C: prometheus::core::Collector + ?Sized>(collector: &C) -> u64 {
        collector
            .collect()
            .first()
            .map(|family| {
                family
                    .get_metric()
                    .iter()
                    .map(|metric| metric.get_counter().get_value() as u64)
                    .sum()
            })
            .unwrap_or(0)
    }

    let stats = state.processor.stats();
    let metrics = &state.metrics;

    let dynamic_sources = state.source_resolver.is_some().then(|| {
        use prometheus::core::Collector;
        // One child series per label set present in the family.
        let total = metrics
            .source_last_resolved
            .collect()
            .first()
            .map(|family| family.get_metric().len())
            .unwrap_or(0);
        DynamicSourcesSummary {
            total,
            resolves_total: counter_family_total(&metrics.source_resolves_total),
            errors_total: counter_family_total(&metrics.source_resolve_errors),
            cache_hits: metrics.source_cache_hits.get(),
        }
    });

    let phase = if state.health.is_ready() {
        "running"
    } else {
        "loading"
    };

    Json(StatusResponse {
        status: phase.to_string(),
        detection_rules: stats.detection_rules,
        correlation_rules: stats.correlation_rules,
        correlation_state_entries: stats.state_entries,
        events_processed: metrics.events_processed.get(),
        detection_matches: metrics.detection_matches.get(),
        correlation_matches: metrics.correlation_matches.get(),
        uptime_seconds: state.start_time.elapsed().as_secs_f64(),
        dynamic_sources,
    })
}
async fn trigger_reload(State(state): State<AppState>) -> impl IntoResponse {
match state.reload_tx.try_send(()) {
Ok(()) => (
StatusCode::OK,
Json(serde_json::json!({ "status": "reload_triggered" })),
),
Err(_) => (
StatusCode::TOO_MANY_REQUESTS,
Json(serde_json::json!({ "status": "reload_already_pending" })),
),
}
}
/// Lists every entry in the daemon's source registry.
///
/// Each item reports:
/// * its id and origin;
/// * a short type name: the `Debug` form of `source_type`, cut at the first
///   `{` and trimmed;
/// * the `Debug` form of its refresh setting;
/// * whether it is required.
async fn list_sources(State(state): State<AppState>) -> impl IntoResponse {
    let sources_info: Vec<serde_json::Value> = state
        .source_registry
        .entries()
        .into_iter()
        .map(|entry| {
            let type_debug = format!("{:?}", entry.source.source_type);
            let type_name = type_debug.split('{').next().unwrap_or("Unknown").trim();
            serde_json::json!({
                "source_id": entry.source.id,
                "origin": entry.origin.to_string(),
                "type": type_name,
                "refresh": format!("{:?}", entry.source.refresh),
                "required": entry.source.required,
            })
        })
        .collect();
    Json(serde_json::json!({ "sources": sources_info }))
}
async fn resolve_sources(State(state): State<AppState>) -> impl IntoResponse {
use rsigma_runtime::sources::refresh::RefreshTrigger;
let Some(tx) = &state.sources_trigger_tx else {
return (
StatusCode::NOT_FOUND,
Json(serde_json::json!({ "error": "no dynamic sources configured" })),
);
};
match tx.try_send(RefreshTrigger::All) {
Ok(()) => (
StatusCode::OK,
Json(serde_json::json!({ "status": "resolve_triggered" })),
),
Err(_) => (
StatusCode::TOO_MANY_REQUESTS,
Json(serde_json::json!({ "status": "resolve_already_pending" })),
),
}
}
async fn resolve_source_by_id(
State(state): State<AppState>,
axum::extract::Path(source_id): axum::extract::Path<String>,
) -> impl IntoResponse {
use rsigma_runtime::sources::refresh::RefreshTrigger;
let Some(tx) = &state.sources_trigger_tx else {
return (
StatusCode::NOT_FOUND,
Json(serde_json::json!({ "error": "no dynamic sources configured" })),
);
};
match tx.try_send(RefreshTrigger::Single(source_id.clone())) {
Ok(()) => (
StatusCode::OK,
Json(serde_json::json!({ "status": "resolve_triggered", "source_id": source_id })),
),
Err(_) => (
StatusCode::TOO_MANY_REQUESTS,
Json(serde_json::json!({ "status": "resolve_already_pending" })),
),
}
}
/// Drops the cached resolution for the source id given in the path.
///
/// Returns `404` when no dynamic sources are configured. Otherwise it returns
/// `200 {"status": "invalidated", "source_id": ...}`; the id is passed to the
/// cache as-is.
async fn invalidate_source_cache(
    State(state): State<AppState>,
    axum::extract::Path(source_id): axum::extract::Path<String>,
) -> impl IntoResponse {
    match state.source_resolver.as_ref() {
        None => (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({ "error": "no dynamic sources configured" })),
        ),
        Some(resolver) => {
            resolver.cache().invalidate(&source_id);
            (
                StatusCode::OK,
                Json(serde_json::json!({ "status": "invalidated", "source_id": source_id })),
            )
        }
    }
}
/// Page size used by the field-coverage listings when `?limit=` is absent.
const FIELDS_DEFAULT_LIMIT: usize = 100;
/// Upper bound applied to any client-supplied `?limit=`.
const FIELDS_MAX_LIMIT: usize = 1000;
/// Maximum number of rule titles listed per missing field. The full count is still reported.
const MISSING_RULE_TITLES_CAP: usize = 10;
/// Query-string pagination parameters accepted by the field-coverage handlers.
#[derive(serde::Deserialize, Default)]
struct FieldsQuery {
    /// Requested page size. Defaults to `FIELDS_DEFAULT_LIMIT` and is capped at `FIELDS_MAX_LIMIT`.
    limit: Option<usize>,
    /// Number of leading items to skip; defaults to `0`.
    offset: Option<usize>,
}
impl FieldsQuery {
    /// Effective page size after applying the default and the upper cap.
    fn limit(&self) -> usize {
        let requested = self.limit.unwrap_or(FIELDS_DEFAULT_LIMIT);
        std::cmp::min(requested, FIELDS_MAX_LIMIT)
    }
    /// Effective starting offset (`0` when not supplied).
    fn offset(&self) -> usize {
        self.offset.unwrap_or_default()
    }
}
/// `503` response shared by the `/api/v1/fields/*` handlers when the daemon
/// runs without `--observe-fields`.
fn observation_disabled_response() -> Response {
    let status = StatusCode::SERVICE_UNAVAILABLE;
    let body = serde_json::json!({
        "error": "field observation disabled",
        "hint": "restart the daemon with --observe-fields to enable /api/v1/fields/*",
    });
    (status, Json(body)).into_response()
}
/// Builds the JSON description of one rule field that appears in `coverage.missing`.
///
/// `rule_count` is the full number of rule titles in `origin`. Only the first
/// `MISSING_RULE_TITLES_CAP` titles are listed, and `truncated` says whether
/// any were left out.
fn missing_field_payload(field: &str, origin: &rsigma_eval::FieldOrigin) -> serde_json::Value {
    let all_titles: Vec<&str> = origin.rule_titles.iter().map(String::as_str).collect();
    let rule_count = all_titles.len();
    let shown = &all_titles[..rule_count.min(MISSING_RULE_TITLES_CAP)];
    let sources = origin
        .sources
        .iter()
        .map(|source| source.as_str())
        .collect::<Vec<&str>>();
    serde_json::json!({
        "field": field,
        "rule_count": rule_count,
        "sources": sources,
        "rule_titles": shown,
        "truncated": rule_count > MISSING_RULE_TITLES_CAP,
    })
}
/// Cuts one page out of `items`.
///
/// Returns `(page, total, next_offset)`:
/// * `page` holds at most `limit` items starting at `offset`;
/// * `total` is `items.len()`;
/// * `next_offset` is where the following page starts, or `None` when there
///   is no further page.
///
/// An `offset` at or past the end yields an empty page. A `limit` of `0`
/// also yields an empty page with `next_offset == None`. Advertising the same
/// offset again would send a client that follows `next_offset` into an
/// endless loop.
fn paginate<T>(items: Vec<T>, offset: usize, limit: usize) -> (Vec<T>, usize, Option<usize>) {
    let total = items.len();
    if offset >= total {
        return (Vec::new(), total, None);
    }
    // `saturating_add` keeps huge client-supplied limits from overflowing.
    let end = offset.saturating_add(limit).min(total);
    let page: Vec<T> = items.into_iter().skip(offset).take(end - offset).collect();
    // Only point at a next page when this page actually advanced.
    let next_offset = (end < total && end > offset).then_some(end);
    (page, total, next_offset)
}
/// Full field-coverage report.
///
/// Compares the keys recorded by the field observer against the field set of
/// the loaded rules and returns a summary plus two paginated lists:
/// * `unknown`: entries from `coverage.unknown`, each with an observed
///   `count`;
/// * `missing`: entries from `coverage.missing`, described by
///   `missing_field_payload`.
///
/// Presumably these are observed-but-unreferenced and
/// referenced-but-unobserved fields respectively; the exact semantics live in
/// `coverage()`. Both lists use the same `offset`/`limit`. Returns `503` when
/// field observation is disabled.
async fn fields_full(
    State(state): State<AppState>,
    axum::extract::Query(query): axum::extract::Query<FieldsQuery>,
) -> Response {
    let observer = match state.field_observer.as_ref() {
        Some(observer) => observer,
        None => return observation_disabled_response(),
    };

    // Fresh snapshot; also refresh the field-observer metrics from it.
    let snapshot = observer.snapshot();
    state.metrics.update_field_observer_metrics(&snapshot);

    let rule_field_set = state.processor.rule_field_set();
    let coverage = snapshot.coverage(&rule_field_set);
    let (offset, limit) = (query.offset(), query.limit());

    let mut unknown_entries: Vec<serde_json::Value> = Vec::new();
    for entry in coverage.unknown.iter() {
        let field: &str = &entry.field;
        unknown_entries.push(serde_json::json!({ "field": field, "count": entry.count }));
    }

    let mut missing_entries: Vec<serde_json::Value> = Vec::new();
    for (name, origin) in coverage.missing.iter() {
        missing_entries.push(missing_field_payload(name, origin));
    }

    let (unknown_items, unknown_total, unknown_next) = paginate(unknown_entries, offset, limit);
    let (missing_items, missing_total, missing_next) = paginate(missing_entries, offset, limit);

    let summary = serde_json::json!({
        "events_observed": snapshot.events_observed,
        "unique_keys_observed": snapshot.unique_keys,
        "rule_fields_loaded": rule_field_set.len(),
        "overflow_dropped": snapshot.overflow_dropped,
        "max_keys": snapshot.max_keys,
        "uptime_seconds": snapshot.uptime_seconds,
        "intersection_count": coverage.intersection_count,
        "unknown_count": unknown_total,
        "missing_count": missing_total,
    });
    let body = serde_json::json!({
        "summary": summary,
        "unknown": {
            "items": unknown_items,
            "total": unknown_total,
            "offset": offset,
            "limit": limit,
            "next_offset": unknown_next,
        },
        "missing": {
            "items": missing_items,
            "total": missing_total,
            "offset": offset,
            "limit": limit,
            "next_offset": missing_next,
        },
    });
    (StatusCode::OK, Json(body)).into_response()
}
/// Paginated list of `coverage.unknown` entries (`{"field", "count"}`).
///
/// Refreshes the field-observer metrics from the snapshot it uses. Returns
/// `503` when field observation is disabled.
async fn fields_unknown(
    State(state): State<AppState>,
    axum::extract::Query(query): axum::extract::Query<FieldsQuery>,
) -> Response {
    let observer = match &state.field_observer {
        Some(observer) => observer,
        None => return observation_disabled_response(),
    };
    let snapshot = observer.snapshot();
    state.metrics.update_field_observer_metrics(&snapshot);

    let rule_fields = state.processor.rule_field_set();
    let coverage = snapshot.coverage(&rule_fields);

    let mut entries: Vec<serde_json::Value> = Vec::new();
    for unknown in coverage.unknown.iter() {
        let field: &str = &unknown.field;
        entries.push(serde_json::json!({ "field": field, "count": unknown.count }));
    }

    let (offset, limit) = (query.offset(), query.limit());
    let (items, total, next_offset) = paginate(entries, offset, limit);
    let body = serde_json::json!({
        "items": items,
        "total": total,
        "offset": offset,
        "limit": limit,
        "next_offset": next_offset,
    });
    (StatusCode::OK, Json(body)).into_response()
}
/// Paginated list of `coverage.missing` entries, each built by `missing_field_payload`.
///
/// Refreshes the field-observer metrics from the snapshot it uses. Returns
/// `503` when field observation is disabled.
async fn fields_missing(
    State(state): State<AppState>,
    axum::extract::Query(query): axum::extract::Query<FieldsQuery>,
) -> Response {
    let observer = match &state.field_observer {
        Some(observer) => observer,
        None => return observation_disabled_response(),
    };
    let snapshot = observer.snapshot();
    state.metrics.update_field_observer_metrics(&snapshot);

    let rule_fields = state.processor.rule_field_set();
    let coverage = snapshot.coverage(&rule_fields);

    let mut entries: Vec<serde_json::Value> = Vec::new();
    for (name, origin) in coverage.missing.iter() {
        entries.push(missing_field_payload(name, origin));
    }

    let (offset, limit) = (query.offset(), query.limit());
    let (items, total, next_offset) = paginate(entries, offset, limit);
    let body = serde_json::json!({
        "items": items,
        "total": total,
        "offset": offset,
        "limit": limit,
        "next_offset": next_offset,
    });
    (StatusCode::OK, Json(body)).into_response()
}
/// Resets the field observer and reports what it held before the reset.
///
/// After resetting, the field-observer metrics are refreshed from a new
/// snapshot. Returns `503` when field observation is disabled.
async fn fields_observer_reset(State(state): State<AppState>) -> Response {
    let observer = match &state.field_observer {
        Some(observer) => observer,
        None => return observation_disabled_response(),
    };

    let (previous_keys, previous_events) = observer.reset();
    state
        .metrics
        .update_field_observer_metrics(&observer.snapshot());

    let body = serde_json::json!({
        "status": "reset",
        "previous_keys": previous_keys,
        "previous_events": previous_events,
    });
    (StatusCode::OK, Json(body)).into_response()
}
/// Ingests a newline-delimited request body into the HTTP input channel.
///
/// Blank (whitespace-only) lines are skipped. Every other line is forwarded
/// verbatim as one `RawEvent` with `AckToken::Noop`.
///
/// Responses:
/// * `404` when HTTP ingestion is not enabled (`event_tx` is `None`).
/// * `413` when any line exceeds `MAX_LINE_BYTES`. The whole body is checked
///   before anything is enqueued, so the request is rejected as a unit.
///   Earlier lines are not left half-ingested, so a client retry does not
///   duplicate them.
/// * `503` when the event channel closes mid-request; `accepted` reports how
///   many lines were enqueued before that.
/// * `200 {"accepted": n}` otherwise.
async fn ingest_events(State(state): State<AppState>, body: String) -> Response {
    // Upper bound on a single event line, in bytes (1 MiB).
    const MAX_LINE_BYTES: usize = 1_048_576;

    let Some(event_tx) = state.event_tx.as_ref() else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "event ingestion not enabled (start with --input http)"
            })),
        )
            .into_response();
    };

    // Validate first so an oversized line cannot leave a partially ingested batch.
    let has_oversized_line = body
        .lines()
        .filter(|line| !line.trim().is_empty())
        .any(|line| line.len() > MAX_LINE_BYTES);
    if has_oversized_line {
        return (
            StatusCode::PAYLOAD_TOO_LARGE,
            Json(serde_json::json!({
                "error": "line exceeds maximum size",
                "max_bytes": MAX_LINE_BYTES,
            })),
        )
            .into_response();
    }

    let mut accepted = 0u64;
    for line in body.lines().filter(|line| !line.trim().is_empty()) {
        let raw_event = RawEvent {
            payload: line.to_string(),
            ack_token: AckToken::Noop,
        };
        if event_tx.send(raw_event).await.is_err() {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(serde_json::json!({
                    "error": "event channel closed",
                    "accepted": accepted,
                })),
            )
                .into_response();
        }
        accepted += 1;
    }

    (
        StatusCode::OK,
        Json(serde_json::json!({ "accepted": accepted })),
    )
        .into_response()
}
/// OTLP/HTTP logs receiver: decodes an `ExportLogsServiceRequest` and forwards
/// every log record to `otlp_event_tx`.
///
/// Accepted bodies:
/// * protobuf (`application/x-protobuf` or `application/protobuf`; also
///   assumed when `Content-Type` is absent);
/// * JSON (`application/json`).
///
/// The body may be gzip-compressed (see `otlp_maybe_decompress`). Media types
/// are matched case-insensitively, as RFC 9110 §8.3.1 requires; previously
/// e.g. `Application/JSON` was rejected.
///
/// Responses:
/// * `415` for any other content type;
/// * `400` for decompression or decode failures;
/// * `503` when the event channel is closed;
/// * `200` with an empty `partialSuccess` otherwise.
///
/// Every failure path increments `otlp_errors{transport="http", reason}`;
/// successful decodes increment `otlp_requests` and `otlp_log_records`.
#[cfg(feature = "daemon-otlp")]
async fn otlp_http_logs(
    State(state): State<AppState>,
    headers: axum::http::HeaderMap,
    body: axum::body::Bytes,
) -> Response {
    let content_type = headers
        .get(axum::http::header::CONTENT_TYPE)
        .and_then(|v| v.to_str().ok())
        .unwrap_or("application/x-protobuf")
        .to_string();
    // Match on a normalised copy; keep `content_type` verbatim for the error message.
    let media_type = content_type.trim_start().to_ascii_lowercase();
    let is_proto = media_type.starts_with("application/x-protobuf")
        || media_type.starts_with("application/protobuf");
    let is_json = media_type.starts_with("application/json");
    let encoding = if is_proto { "protobuf" } else { "json" };
    let span = tracing::debug_span!("otlp_ingest", transport = "http", encoding);
    async move {
        if !is_proto && !is_json {
            state
                .metrics
                .otlp_errors
                .with_label_values(&["http", "unsupported_content_type"])
                .inc();
            return (
                StatusCode::UNSUPPORTED_MEDIA_TYPE,
                Json(serde_json::json!({
                    "error": format!(
                        "unsupported content-type: {content_type} \
                         (expected application/x-protobuf or application/json)"
                    )
                })),
            )
                .into_response();
        }
        let body = match otlp_maybe_decompress(&headers, body) {
            Ok(b) => b,
            Err(e) => {
                state
                    .metrics
                    .otlp_errors
                    .with_label_values(&["http", "decompression"])
                    .inc();
                return (
                    StatusCode::BAD_REQUEST,
                    Json(serde_json::json!({
                        "error": format!("decompression error: {e}")
                    })),
                )
                    .into_response();
            }
        };
        let request = if is_proto {
            use prost::Message;
            match rsigma_runtime::ExportLogsServiceRequest::decode(body) {
                Ok(req) => req,
                Err(e) => {
                    state
                        .metrics
                        .otlp_errors
                        .with_label_values(&["http", "decode"])
                        .inc();
                    return (
                        StatusCode::BAD_REQUEST,
                        Json(serde_json::json!({
                            "error": format!("protobuf decode error: {e}")
                        })),
                    )
                        .into_response();
                }
            }
        } else {
            match serde_json::from_slice::<rsigma_runtime::ExportLogsServiceRequest>(&body) {
                Ok(req) => req,
                Err(e) => {
                    state
                        .metrics
                        .otlp_errors
                        .with_label_values(&["http", "decode"])
                        .inc();
                    return (
                        StatusCode::BAD_REQUEST,
                        Json(serde_json::json!({
                            "error": format!("JSON decode error: {e}")
                        })),
                    )
                        .into_response();
                }
            }
        };
        state
            .metrics
            .otlp_requests
            .with_label_values(&["http", encoding])
            .inc();
        let raw_events = rsigma_runtime::logs_request_to_raw_events(&request);
        let record_count = raw_events.len();
        state.metrics.otlp_log_records.inc_by(record_count as u64);
        tracing::debug!(record_count, "OTLP logs ingested");
        for event in raw_events {
            if state.otlp_event_tx.send(event).await.is_err() {
                state
                    .metrics
                    .otlp_errors
                    .with_label_values(&["http", "channel_closed"])
                    .inc();
                return (
                    StatusCode::SERVICE_UNAVAILABLE,
                    Json(serde_json::json!({
                        "error": "event channel closed"
                    })),
                )
                    .into_response();
            }
        }
        (
            StatusCode::OK,
            Json(serde_json::json!({
                "partialSuccess": {
                    "rejectedLogRecords": 0,
                    "errorMessage": ""
                }
            })),
        )
            .into_response()
    }
    .instrument(span)
    .await
}
/// Inflates an OTLP/HTTP request body when `Content-Encoding` says it is gzip.
///
/// * `gzip` and its legacy alias `x-gzip` are matched case-insensitively and
///   with surrounding whitespace ignored (RFC 9110 §8.4.1).
/// * Any other or absent encoding returns `body` unchanged.
///
/// # Errors
///
/// * The gzip stream is malformed.
/// * The inflated size exceeds `MAX_DECOMPRESSED_BYTES`. The body is untrusted
///   network input, so without a cap a small "gzip bomb" could exhaust memory.
#[cfg(feature = "daemon-otlp")]
fn otlp_maybe_decompress(
    headers: &axum::http::HeaderMap,
    body: axum::body::Bytes,
) -> Result<axum::body::Bytes, std::io::Error> {
    use std::io::Read;

    // Upper bound on the inflated body size (64 MiB). NOTE(review): generous
    // relative to typical OTLP batch limits; tune if deployments need more.
    const MAX_DECOMPRESSED_BYTES: u64 = 64 * 1024 * 1024;

    let content_encoding = headers
        .get(axum::http::header::CONTENT_ENCODING)
        .and_then(|v| v.to_str().ok())
        .map(str::trim)
        .unwrap_or("");
    let is_gzip = content_encoding.eq_ignore_ascii_case("gzip")
        || content_encoding.eq_ignore_ascii_case("x-gzip");
    if !is_gzip {
        return Ok(body);
    }

    let mut decompressed = Vec::new();
    // Read one byte past the cap so "exactly at the limit" and "over it" are distinguishable.
    flate2::read::GzDecoder::new(&body[..])
        .take(MAX_DECOMPRESSED_BYTES + 1)
        .read_to_end(&mut decompressed)?;
    if decompressed.len() as u64 > MAX_DECOMPRESSED_BYTES {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("decompressed body exceeds {MAX_DECOMPRESSED_BYTES} bytes"),
        ));
    }
    Ok(axum::body::Bytes::from(decompressed))
}
/// gRPC OTLP `LogsService` implementation that converts log records to
/// `RawEvent`s and sends them on `event_tx`.
#[cfg(feature = "daemon-otlp")]
struct OtlpLogsGrpcService {
/// Destination channel for converted log records.
event_tx: mpsc::Sender<RawEvent>,
/// Shared daemon metrics (OTLP request, record, and error counters).
metrics: Arc<Metrics>,
}
#[cfg(feature = "daemon-otlp")]
#[tonic::async_trait]
impl rsigma_runtime::LogsService for OtlpLogsGrpcService {
    /// Handles one OTLP gRPC `Export` call.
    ///
    /// Counts the request and its log records, then forwards each converted
    /// event on `event_tx`. If the channel is closed, it increments
    /// `otlp_errors{grpc, channel_closed}` and fails with
    /// `Status::unavailable`. Any events sent before that are not rolled back.
    async fn export(
        &self,
        request: tonic::Request<rsigma_runtime::ExportLogsServiceRequest>,
    ) -> Result<tonic::Response<rsigma_runtime::ExportLogsServiceResponse>, tonic::Status> {
        let span = tracing::debug_span!("otlp_ingest", transport = "grpc", encoding = "protobuf");
        let work = async move {
            let metrics = &self.metrics;
            metrics
                .otlp_requests
                .with_label_values(&["grpc", "protobuf"])
                .inc();

            let raw_events = rsigma_runtime::logs_request_to_raw_events(&request.into_inner());
            let record_count = raw_events.len();
            metrics.otlp_log_records.inc_by(record_count as u64);
            tracing::debug!(record_count, "OTLP logs ingested");

            for event in raw_events {
                if self.event_tx.send(event).await.is_err() {
                    metrics
                        .otlp_errors
                        .with_label_values(&["grpc", "channel_closed"])
                        .inc();
                    return Err(tonic::Status::unavailable("event channel closed"));
                }
            }

            Ok(tonic::Response::new(
                rsigma_runtime::ExportLogsServiceResponse::default(),
            ))
        };
        work.instrument(span).await
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // --- decide_state_restore: explicit overrides ---

    #[test]
    fn force_clear_always_skips() {
        let result = decide_state_restore(
            StateRestoreMode::ForceClear,
            Some(SourcePosition {
                sequence: 100,
                timestamp: 1000,
            }),
            #[cfg(feature = "daemon-nats")]
            &rsigma_runtime::ReplayPolicy::Resume,
        );
        assert!(!result);
    }

    #[test]
    fn force_keep_always_restores() {
        let result = decide_state_restore(
            StateRestoreMode::ForceKeep,
            None,
            #[cfg(feature = "daemon-nats")]
            &rsigma_runtime::ReplayPolicy::Latest,
        );
        assert!(result);
    }

    // --- decide_state_restore: Auto mode against NATS replay policies ---

    #[cfg(feature = "daemon-nats")]
    mod nats_auto {
        use super::*;
        use rsigma_runtime::ReplayPolicy;

        #[test]
        fn resume_restores() {
            assert!(decide_state_restore(
                StateRestoreMode::Auto,
                None,
                &ReplayPolicy::Resume,
            ));
        }

        #[test]
        fn latest_skips() {
            assert!(!decide_state_restore(
                StateRestoreMode::Auto,
                Some(SourcePosition {
                    sequence: 100,
                    timestamp: 1000,
                }),
                &ReplayPolicy::Latest,
            ));
        }

        #[test]
        fn forward_sequence_restores() {
            assert!(decide_state_restore(
                StateRestoreMode::Auto,
                Some(SourcePosition {
                    sequence: 100,
                    timestamp: 1000,
                }),
                &ReplayPolicy::FromSequence(101),
            ));
        }

        #[test]
        fn backward_sequence_skips() {
            assert!(!decide_state_restore(
                StateRestoreMode::Auto,
                Some(SourcePosition {
                    sequence: 100,
                    timestamp: 1000,
                }),
                &ReplayPolicy::FromSequence(50),
            ));
        }

        #[test]
        fn equal_sequence_skips() {
            assert!(!decide_state_restore(
                StateRestoreMode::Auto,
                Some(SourcePosition {
                    sequence: 100,
                    timestamp: 1000,
                }),
                &ReplayPolicy::FromSequence(100),
            ));
        }

        #[test]
        fn forward_time_restores() {
            let future = time::OffsetDateTime::from_unix_timestamp(2000).unwrap();
            assert!(decide_state_restore(
                StateRestoreMode::Auto,
                Some(SourcePosition {
                    sequence: 100,
                    timestamp: 1000,
                }),
                &ReplayPolicy::FromTime(future),
            ));
        }

        #[test]
        fn backward_time_skips() {
            let past = time::OffsetDateTime::from_unix_timestamp(500).unwrap();
            assert!(!decide_state_restore(
                StateRestoreMode::Auto,
                Some(SourcePosition {
                    sequence: 100,
                    timestamp: 1000,
                }),
                &ReplayPolicy::FromTime(past),
            ));
        }

        // Replaying from exactly the stored timestamp would re-count that second.
        #[test]
        fn equal_time_skips() {
            let same = time::OffsetDateTime::from_unix_timestamp(1000).unwrap();
            assert!(!decide_state_restore(
                StateRestoreMode::Auto,
                Some(SourcePosition {
                    sequence: 100,
                    timestamp: 1000,
                }),
                &ReplayPolicy::FromTime(same),
            ));
        }

        #[test]
        fn no_stored_position_skips_on_replay() {
            assert!(!decide_state_restore(
                StateRestoreMode::Auto,
                None,
                &ReplayPolicy::FromSequence(42),
            ));
        }

        #[test]
        fn no_stored_position_skips_on_time_replay() {
            let when = time::OffsetDateTime::from_unix_timestamp(2000).unwrap();
            assert!(!decide_state_restore(
                StateRestoreMode::Auto,
                None,
                &ReplayPolicy::FromTime(when),
            ));
        }
    }

    #[cfg(not(feature = "daemon-nats"))]
    #[test]
    fn auto_without_nats_restores() {
        assert!(decide_state_restore(StateRestoreMode::Auto, None));
    }

    // --- source_position_from_atomics ---

    #[test]
    fn source_position_absent_until_sequence_recorded() {
        let seq = AtomicU64::new(0);
        let ts = AtomicI64::new(1234);
        assert!(source_position_from_atomics(&seq, &ts).is_none());

        seq.store(7, Ordering::Relaxed);
        let pos = source_position_from_atomics(&seq, &ts).expect("sequence is non-zero");
        assert_eq!(pos.sequence, 7);
        assert_eq!(pos.timestamp, 1234);
    }

    // --- paginate ---

    #[test]
    fn paginate_first_middle_and_last_pages() {
        assert_eq!(paginate(vec![1, 2, 3, 4, 5], 0, 2), (vec![1, 2], 5, Some(2)));
        assert_eq!(paginate(vec![1, 2, 3, 4, 5], 2, 2), (vec![3, 4], 5, Some(4)));
        assert_eq!(paginate(vec![1, 2, 3, 4, 5], 4, 2), (vec![5], 5, None));
    }

    #[test]
    fn paginate_offset_past_end_is_empty() {
        assert_eq!(paginate(vec![1, 2, 3], 3, 10), (Vec::<i32>::new(), 3, None));
        assert_eq!(
            paginate(vec![1, 2, 3], usize::MAX, usize::MAX),
            (Vec::<i32>::new(), 3, None)
        );
    }

    #[test]
    fn paginate_huge_limit_does_not_overflow() {
        assert_eq!(paginate(vec![1, 2, 3], 1, usize::MAX), (vec![2, 3], 3, None));
    }
}