use std::path::PathBuf;
use std::process;
use clap::Args;
use crate::daemon;
use crate::exit_code;
// Command-line options for the daemon subcommand, parsed by clap's `Args` derive.
//
// `cmd_daemon` consumes this struct and forwards the values to `run_daemon`,
// which assembles them into `daemon::server::DaemonConfig`.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive macros turn
// `///` doc comments into help text, so doc comments would alter `--help` output.
#[derive(Args, Debug)]
pub(crate) struct DaemonArgs {
// `-r` / `--rules`: required path (no default, not optional).
// Becomes `DaemonConfig::rules_path`.
#[arg(short, long)]
pub rules: PathBuf,
// `-p` / `--pipeline`: repeatable; every occurrence adds one path.
// `run_daemon` loads all of them via `crate::load_pipelines`, then drops the
// entries that `resolve_builtin_pipeline` recognises before storing the rest
// as `DaemonConfig::pipeline_paths`.
#[arg(short = 'p', long = "pipeline")]
pub pipelines: Vec<PathBuf>,
// `--jq`: optional expression handed to `crate::build_event_filter`.
// Mutually exclusive with `--jsonpath`.
#[arg(long = "jq", conflicts_with = "jsonpath")]
pub jq: Option<String>,
// `--jsonpath`: optional expression handed to `crate::build_event_filter`.
// Mutually exclusive with `--jq`.
#[arg(long = "jsonpath", conflicts_with = "jq")]
pub jsonpath: Option<String>,
// `--include-event`: boolean switch, false unless the flag is present.
#[arg(long = "include-event")]
pub include_event: bool,
// `--pretty`: boolean switch, false unless the flag is present.
#[arg(long)]
pub pretty: bool,
// `--api-addr`: socket address string, parsed as `std::net::SocketAddr` in
// `run_daemon`; an unparsable value exits with `exit_code::CONFIG_ERROR`.
// NOTE(review): the default `0.0.0.0:9090` is a wildcard address, i.e. not
// restricted to loopback — confirm that is the intended default exposure.
#[arg(long = "api-addr", default_value = "0.0.0.0:9090")]
pub api_addr: String,
// `--suppress`: optional string forwarded to `crate::build_correlation_config`.
// Its format is not visible in this file.
#[arg(long = "suppress")]
pub suppress: Option<String>,
// `--action`: optional; clap only accepts the literal values `alert` or `reset`.
// Forwarded to `crate::build_correlation_config`.
#[arg(long = "action", value_parser = ["alert", "reset"])]
pub action: Option<String>,
// `--no-detections`: boolean switch forwarded to `crate::build_correlation_config`.
#[arg(long = "no-detections")]
pub no_detections: bool,
// `--correlation-event-mode`: free-form string (default `none`); clap does not
// restrict the value here, it is forwarded to `crate::build_correlation_config`.
#[arg(long = "correlation-event-mode", default_value = "none")]
pub correlation_event_mode: String,
// `--max-correlation-events`: default 10; forwarded to
// `crate::build_correlation_config`.
#[arg(long = "max-correlation-events", default_value = "10")]
pub max_correlation_events: usize,
// `--timestamp-field`: repeatable; forwarded to `crate::build_correlation_config`.
#[arg(long = "timestamp-field")]
pub timestamp_fields: Vec<String>,
// `--state-db`: optional path, stored as `DaemonConfig::state_db`.
#[arg(long = "state-db")]
pub state_db: Option<PathBuf>,
// `--state-save-interval`: default 30; clap rejects 0 (`range(1..)`).
// The unit is not visible here — presumably seconds; TODO confirm.
#[arg(long = "state-save-interval", default_value = "30", value_parser = clap::value_parser!(u64).range(1..))]
pub state_save_interval: u64,
// `--input`: source specifier string, default `stdin`.
#[arg(long = "input", default_value = "stdin")]
pub input: String,
// `--output`: repeatable sink specifier; a single `stdout` entry when omitted.
#[arg(long = "output", default_value = "stdout")]
pub output: Vec<String>,
// `--buffer-size`: default 10000. No lower bound is enforced at parse time.
#[arg(long = "buffer-size", default_value = "10000")]
pub buffer_size: usize,
// `--batch-size`: default 1. No lower bound is enforced at parse time.
#[arg(long = "batch-size", default_value = "1")]
pub batch_size: usize,
// `--drain-timeout`: default 5. The unit is not visible here — presumably
// seconds; TODO confirm.
#[arg(long = "drain-timeout", default_value = "5")]
pub drain_timeout: u64,
// `--dlq`: optional string, stored as `DaemonConfig::dlq`.
#[arg(long = "dlq")]
pub dlq: Option<String>,
// `--input-format`: default `auto`. Validated later by `parse_input_format`,
// which accepts `auto`, `json`, `syslog`, `plain`, plus `logfmt` / `cef` when
// the matching cargo feature is enabled.
#[arg(long = "input-format", default_value = "auto")]
pub input_format: String,
// `--syslog-tz`: default `+00:00`. Parsed by `parse_tz_offset` into an offset
// in seconds that is used for the `auto` and `syslog` input formats.
#[arg(long = "syslog-tz", default_value = "+00:00")]
pub syslog_tz: String,
// --- NATS connection options (only compiled with the `daemon-nats` feature) ---
// `--nats-creds` (or env `NATS_CREDS`): optional path.
#[cfg(feature = "daemon-nats")]
#[arg(long = "nats-creds", env = "NATS_CREDS")]
pub nats_creds: Option<PathBuf>,
// `--nats-token` (or env `NATS_TOKEN`): cannot be combined with `--nats-creds`.
#[cfg(feature = "daemon-nats")]
#[arg(long = "nats-token", env = "NATS_TOKEN", conflicts_with = "nats_creds")]
pub nats_token: Option<String>,
// `--nats-user` (or env `NATS_USER`): needs `--nats-password`; cannot be
// combined with `--nats-creds` or `--nats-token`.
#[cfg(feature = "daemon-nats")]
#[arg(long = "nats-user", env = "NATS_USER", requires = "nats_password", conflicts_with_all = ["nats_creds", "nats_token"])]
pub nats_user: Option<String>,
// `--nats-password` (or env `NATS_PASSWORD`): needs `--nats-user`.
#[cfg(feature = "daemon-nats")]
#[arg(long = "nats-password", env = "NATS_PASSWORD", requires = "nats_user")]
pub nats_password: Option<String>,
// `--nats-nkey` (or env `NATS_NKEY`): cannot be combined with `--nats-creds`,
// `--nats-token` or `--nats-user`.
#[cfg(feature = "daemon-nats")]
#[arg(long = "nats-nkey", env = "NATS_NKEY", conflicts_with_all = ["nats_creds", "nats_token", "nats_user"])]
pub nats_nkey: Option<String>,
// `--nats-tls-cert`: optional path; must be given together with `--nats-tls-key`.
#[cfg(feature = "daemon-nats")]
#[arg(long = "nats-tls-cert", requires = "nats_tls_key")]
pub nats_tls_cert: Option<PathBuf>,
// `--nats-tls-key`: optional path; must be given together with `--nats-tls-cert`.
#[cfg(feature = "daemon-nats")]
#[arg(long = "nats-tls-key", requires = "nats_tls_cert")]
pub nats_tls_key: Option<PathBuf>,
// `--nats-require-tls`: boolean switch, becomes `NatsConnectConfig::require_tls`.
#[cfg(feature = "daemon-nats")]
#[arg(long = "nats-require-tls")]
pub nats_require_tls: bool,
// --- Replay position (`daemon-nats` only) ---
// The three `--replay-from-*` flags are mutually exclusive. `cmd_daemon` maps
// them onto `rsigma_runtime::ReplayPolicy`; when none is given it uses `Resume`.
// `--replay-from-sequence`: maps to `ReplayPolicy::FromSequence`.
#[cfg(feature = "daemon-nats")]
#[arg(long = "replay-from-sequence", conflicts_with_all = ["replay_from_time", "replay_from_latest"])]
pub replay_from_sequence: Option<u64>,
// `--replay-from-time`: parsed as an RFC 3339 timestamp in `cmd_daemon`
// (invalid input exits with `exit_code::CONFIG_ERROR`); maps to
// `ReplayPolicy::FromTime`.
#[cfg(feature = "daemon-nats")]
#[arg(long = "replay-from-time", conflicts_with_all = ["replay_from_sequence", "replay_from_latest"])]
pub replay_from_time: Option<String>,
// `--replay-from-latest`: maps to `ReplayPolicy::Latest`.
#[cfg(feature = "daemon-nats")]
#[arg(long = "replay-from-latest", conflicts_with_all = ["replay_from_sequence", "replay_from_time"])]
pub replay_from_latest: bool,
// `--clear-state`: selects `StateRestoreMode::ForceClear`. Mutually exclusive
// with `--keep-state`; with neither flag `cmd_daemon` selects `Auto`.
#[arg(long = "clear-state", conflicts_with = "keep_state")]
pub clear_state: bool,
// `--keep-state`: selects `StateRestoreMode::ForceKeep`. Mutually exclusive
// with `--clear-state`.
#[arg(long = "keep-state", conflicts_with = "clear_state")]
pub keep_state: bool,
// `--timestamp-fallback`: `wallclock` (default) or `skip`; forwarded by
// reference to `crate::build_correlation_config`.
#[arg(long = "timestamp-fallback", default_value = "wallclock",
value_parser = ["wallclock", "skip"])]
pub timestamp_fallback: String,
// `--consumer-group` (or env `RSIGMA_CONSUMER_GROUP`): optional; `daemon-nats` only.
#[cfg(feature = "daemon-nats")]
#[arg(long = "consumer-group", env = "RSIGMA_CONSUMER_GROUP")]
pub consumer_group: Option<String>,
// `--allow-remote-include`: boolean switch, stored on `DaemonConfig`.
#[arg(long = "allow-remote-include")]
pub allow_remote_include: bool,
// `--bloom-prefilter`: boolean switch, stored on `DaemonConfig`.
#[arg(long = "bloom-prefilter")]
pub bloom_prefilter: bool,
// `--bloom-max-bytes`: optional size, stored on `DaemonConfig`.
#[arg(long = "bloom-max-bytes")]
pub bloom_max_bytes: Option<usize>,
// `--cross-rule-ac`: boolean switch; only compiled with the `daachorse-index` feature.
#[cfg(feature = "daachorse-index")]
#[arg(long = "cross-rule-ac")]
pub cross_rule_ac: bool,
}
/// NATS authentication and TLS options, bundled so they travel from
/// `cmd_daemon` to `run_daemon` as a single argument.
///
/// `run_daemon` copies each field into the matching field of
/// `rsigma_runtime::NatsConnectConfig`. Only compiled with the `daemon-nats`
/// feature.
#[cfg(feature = "daemon-nats")]
pub(crate) struct NatsAuthArgs {
/// Becomes `NatsConnectConfig::credentials_file`.
pub nats_creds: Option<PathBuf>,
/// Becomes `NatsConnectConfig::token`.
pub nats_token: Option<String>,
/// Becomes `NatsConnectConfig::username`.
pub nats_user: Option<String>,
/// Becomes `NatsConnectConfig::password`.
pub nats_password: Option<String>,
/// Becomes `NatsConnectConfig::nkey`.
pub nats_nkey: Option<String>,
/// Becomes `NatsConnectConfig::tls_client_cert`.
pub nats_tls_cert: Option<PathBuf>,
/// Becomes `NatsConnectConfig::tls_client_key`.
pub nats_tls_key: Option<PathBuf>,
/// Becomes `NatsConnectConfig::require_tls`.
pub nats_require_tls: bool,
}
/// Entry point for the daemon subcommand: converts the parsed CLI flags into
/// the values `run_daemon` expects and hands control over to it.
///
/// Most options are forwarded untouched. Three values are derived here:
///
/// - (`daemon-nats` only) the NATS auth/TLS options are bundled into a
///   [`NatsAuthArgs`].
/// - (`daemon-nats` only) the replay policy is chosen from the
///   `--replay-from-*` flags, checked in the order sequence, time, latest, and
///   defaults to `Resume`. `--replay-from-time` must be an RFC 3339 timestamp;
///   otherwise an error is printed to stderr and the process exits with
///   `exit_code::CONFIG_ERROR`.
/// - the state restore mode is chosen from `--clear-state` / `--keep-state`
///   and defaults to `Auto`.
pub(crate) fn cmd_daemon(args: DaemonArgs) {
    #[cfg(feature = "daemon-nats")]
    let nats_auth = NatsAuthArgs {
        nats_creds: args.nats_creds,
        nats_token: args.nats_token,
        nats_user: args.nats_user,
        nats_password: args.nats_password,
        nats_nkey: args.nats_nkey,
        nats_tls_cert: args.nats_tls_cert,
        nats_tls_key: args.nats_tls_key,
        nats_require_tls: args.nats_require_tls,
    };

    // The tuple arms mirror the flag precedence: an explicit sequence wins,
    // then a timestamp, then "latest"; with nothing set the consumer resumes.
    #[cfg(feature = "daemon-nats")]
    let replay_policy = match (
        args.replay_from_sequence,
        args.replay_from_time.as_deref(),
        args.replay_from_latest,
    ) {
        (Some(seq), _, _) => rsigma_runtime::ReplayPolicy::FromSequence(seq),
        (None, Some(ts), _) => {
            match time::OffsetDateTime::parse(ts, &time::format_description::well_known::Rfc3339) {
                Ok(t) => rsigma_runtime::ReplayPolicy::FromTime(t),
                Err(e) => {
                    eprintln!("Invalid --replay-from-time '{ts}': {e}");
                    process::exit(exit_code::CONFIG_ERROR);
                }
            }
        }
        (None, None, true) => rsigma_runtime::ReplayPolicy::Latest,
        (None, None, false) => rsigma_runtime::ReplayPolicy::Resume,
    };

    // `--clear-state` is checked before `--keep-state`.
    let state_restore_mode = match (args.clear_state, args.keep_state) {
        (true, _) => daemon::server::StateRestoreMode::ForceClear,
        (false, true) => daemon::server::StateRestoreMode::ForceKeep,
        (false, false) => daemon::server::StateRestoreMode::Auto,
    };

    // Arguments are positional: keep this list in the same order as the
    // parameters of `run_daemon`.
    run_daemon(
        args.rules,
        args.pipelines,
        args.jq,
        args.jsonpath,
        args.include_event,
        args.pretty,
        args.api_addr,
        args.suppress,
        args.action,
        args.no_detections,
        args.correlation_event_mode,
        args.max_correlation_events,
        args.timestamp_fields,
        args.timestamp_fallback,
        args.state_db,
        args.state_save_interval,
        args.input,
        args.output,
        args.buffer_size,
        args.batch_size,
        args.drain_timeout,
        args.dlq,
        args.input_format,
        args.syslog_tz,
        state_restore_mode,
        #[cfg(feature = "daemon-nats")]
        nats_auth,
        #[cfg(feature = "daemon-nats")]
        replay_policy,
        #[cfg(feature = "daemon-nats")]
        args.consumer_group,
        args.allow_remote_include,
        args.bloom_prefilter,
        args.bloom_max_bytes,
        #[cfg(feature = "daachorse-index")]
        args.cross_rule_ac,
    );
}
/// Builds the daemon configuration from already-parsed CLI values and runs the
/// daemon on a fresh multi-threaded Tokio runtime, blocking until it returns.
///
/// Steps, in order:
/// 1. Install a JSON `tracing` subscriber that writes to stderr. The filter is
///    taken from the default environment variable and falls back to `info`.
/// 2. Load the pipelines, build the event filter, resolve the input format and
///    build the correlation configuration.
/// 3. Parse `api_addr` as a [`std::net::SocketAddr`].
/// 4. Assemble `daemon::server::DaemonConfig` and block on `daemon::run_daemon`.
///
/// # Exits
///
/// Prints an error to stderr and exits with `exit_code::CONFIG_ERROR` when
/// `api_addr` is not a valid socket address or the Tokio runtime cannot be
/// created. `parse_input_format` exits the same way on an unknown format or a
/// malformed `syslog_tz`.
// One parameter per CLI option keeps the call site in `cmd_daemon` explicit,
// hence the lint exemption.
#[allow(clippy::too_many_arguments)]
fn run_daemon(
    rules_path: PathBuf,
    pipeline_paths: Vec<PathBuf>,
    jq: Option<String>,
    jsonpath: Option<String>,
    include_event: bool,
    pretty: bool,
    api_addr: String,
    suppress: Option<String>,
    action: Option<String>,
    no_detections: bool,
    correlation_event_mode: String,
    max_correlation_events: usize,
    timestamp_fields: Vec<String>,
    timestamp_fallback: String,
    state_db: Option<PathBuf>,
    state_save_interval: u64,
    input: String,
    output: Vec<String>,
    buffer_size: usize,
    batch_size: usize,
    drain_timeout: u64,
    dlq: Option<String>,
    input_format: String,
    syslog_tz: String,
    state_restore_mode: daemon::server::StateRestoreMode,
    #[cfg(feature = "daemon-nats")] nats_auth: NatsAuthArgs,
    #[cfg(feature = "daemon-nats")] replay_policy: rsigma_runtime::ReplayPolicy,
    #[cfg(feature = "daemon-nats")] consumer_group: Option<String>,
    allow_remote_include: bool,
    bloom_prefilter: bool,
    bloom_max_bytes: Option<usize>,
    #[cfg(feature = "daachorse-index")] cross_rule_ac: bool,
) {
    use rsigma_eval::resolve_builtin_pipeline;

    // Structured logs go to stderr so they never mix with output on stdout.
    tracing_subscriber::fmt()
        .json()
        .with_env_filter(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
        )
        .with_writer(std::io::stderr)
        .init();

    let pipelines = crate::load_pipelines(&pipeline_paths);
    let event_filter = std::sync::Arc::new(crate::build_event_filter(jq, jsonpath));
    let parsed_input_format = parse_input_format(&input_format, &syslog_tz);

    // The fallback policy is passed by reference; every other argument is
    // moved into the correlation config builder.
    let corr_config = crate::build_correlation_config(
        suppress,
        action,
        no_detections,
        correlation_event_mode,
        max_correlation_events,
        timestamp_fields,
        &timestamp_fallback,
    );

    let addr: std::net::SocketAddr = api_addr.parse().unwrap_or_else(|e| {
        eprintln!("Invalid API address '{api_addr}': {e}");
        process::exit(exit_code::CONFIG_ERROR);
    });

    #[cfg(feature = "daemon-nats")]
    let nats_config = rsigma_runtime::NatsConnectConfig {
        credentials_file: nats_auth.nats_creds,
        token: nats_auth.nats_token,
        username: nats_auth.nats_user,
        password: nats_auth.nats_password,
        nkey: nats_auth.nats_nkey,
        tls_client_cert: nats_auth.nats_tls_cert,
        tls_client_key: nats_auth.nats_tls_key,
        require_tls: nats_auth.nats_require_tls,
        ..Default::default()
    };

    // Keep only the entries that do not resolve to a builtin pipeline; a path
    // that is not valid UTF-8 is looked up as the empty name.
    let file_pipeline_paths: Vec<PathBuf> = pipeline_paths
        .into_iter()
        .filter(|p| resolve_builtin_pipeline(p.to_str().unwrap_or("")).is_none())
        .collect();

    let config = daemon::server::DaemonConfig {
        rules_path,
        pipelines,
        pipeline_paths: file_pipeline_paths,
        corr_config,
        include_event,
        pretty,
        api_addr: addr,
        event_filter,
        state_db,
        state_save_interval,
        input,
        output,
        buffer_size,
        batch_size,
        drain_timeout,
        dlq,
        input_format: parsed_input_format,
        #[cfg(feature = "daemon-nats")]
        nats_config,
        #[cfg(feature = "daemon-nats")]
        replay_policy,
        #[cfg(feature = "daemon-nats")]
        consumer_group,
        state_restore_mode,
        allow_remote_include,
        bloom_prefilter,
        bloom_max_bytes,
        #[cfg(feature = "daachorse-index")]
        cross_rule_ac,
    };

    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap_or_else(|e| {
            eprintln!("Failed to create Tokio runtime: {e}");
            process::exit(exit_code::CONFIG_ERROR);
        });
    rt.block_on(daemon::run_daemon(config));
}
/// Resolves the `--input-format` and `--syslog-tz` strings into an
/// `rsigma_runtime::InputFormat`.
///
/// `syslog_tz` is parsed before the format name is inspected, so a malformed
/// offset terminates the process even for formats that carry no syslog
/// configuration. The resulting offset is attached to the `auto` and `syslog`
/// formats.
///
/// An unrecognised format name prints the list of supported formats to stderr
/// and exits with `exit_code::CONFIG_ERROR`. `logfmt` and `cef` are only
/// recognised when the corresponding cargo feature is enabled.
pub(crate) fn parse_input_format(format_str: &str, syslog_tz: &str) -> rsigma_runtime::InputFormat {
    use rsigma_runtime::input::SyslogConfig;
    use rsigma_runtime::InputFormat;

    let default_tz_offset_secs = parse_tz_offset(syslog_tz);
    let syslog_config = SyslogConfig {
        default_tz_offset_secs,
    };

    match format_str {
        "json" => InputFormat::Json,
        "plain" => InputFormat::Plain,
        "auto" => InputFormat::Auto(syslog_config),
        "syslog" => InputFormat::Syslog(syslog_config),
        #[cfg(feature = "logfmt")]
        "logfmt" => InputFormat::Logfmt,
        #[cfg(feature = "cef")]
        "cef" => InputFormat::Cef,
        unknown => {
            eprintln!("Unknown input format: '{unknown}'");
            eprintln!("Supported formats: auto, json, syslog, plain");
            #[cfg(feature = "logfmt")]
            eprintln!("  (with logfmt feature): logfmt");
            #[cfg(feature = "cef")]
            eprintln!("  (with cef feature): cef");
            process::exit(exit_code::CONFIG_ERROR);
        }
    }
}
fn parse_tz_offset(s: &str) -> i32 {
let s = s.trim();
if s == "UTC" || s == "utc" || s == "Z" || s == "+00:00" {
return 0;
}
let (sign, rest) = if let Some(rest) = s.strip_prefix('+') {
(1i32, rest)
} else if let Some(rest) = s.strip_prefix('-') {
(-1i32, rest)
} else {
eprintln!("Invalid timezone offset: '{s}' (expected +HH:MM or -HH:MM)");
process::exit(exit_code::CONFIG_ERROR);
};
let parts: Vec<&str> = rest.split(':').collect();
if parts.len() != 2 {
eprintln!("Invalid timezone offset: '{s}' (expected +HH:MM or -HH:MM)");
process::exit(exit_code::CONFIG_ERROR);
}
let hours: i32 = parts[0].parse().unwrap_or_else(|_| {
eprintln!("Invalid timezone offset hours: '{}'", parts[0]);
process::exit(exit_code::CONFIG_ERROR);
});
let minutes: i32 = parts[1].parse().unwrap_or_else(|_| {
eprintln!("Invalid timezone offset minutes: '{}'", parts[1]);
process::exit(exit_code::CONFIG_ERROR);
});
sign * (hours * 3600 + minutes * 60)
}