rsigma-runtime 0.18.0

Streaming runtime for rsigma — event sources, sinks, and log processing pipeline
Documentation
/// Abstraction for runtime metrics so the runtime crate does not depend on
/// `prometheus` directly. The CLI (or any other consumer) provides a concrete
/// implementation backed by Prometheus, OpenTelemetry, or whatever it prefers.
pub trait MetricsHook: Send + Sync {
    /// A JSON line failed to parse.
    fn on_parse_error(&self);
    /// `count` events were successfully evaluated.
    fn on_events_processed(&self, count: u64);
    /// `count` detection rule matches were produced.
    fn on_detection_matches(&self, count: u64);
    /// `count` correlation rule matches were produced.
    fn on_correlation_matches(&self, count: u64);
    /// Observe per-event processing latency in seconds.
    fn observe_processing_latency(&self, seconds: f64);
    /// The input queue depth changed by `delta` (positive = enqueue, negative = dequeue).
    fn on_input_queue_depth_change(&self, delta: i64);
    /// Back-pressure event: a source tried to send but the channel was full.
    fn on_back_pressure(&self);
    /// Observe the batch size used for a single engine lock acquisition.
    fn observe_batch_size(&self, size: u64);
    /// The output queue depth changed by `delta`.
    fn on_output_queue_depth_change(&self, delta: i64);
    /// Observe end-to-end pipeline latency (dequeue → sink) in seconds.
    fn observe_pipeline_latency(&self, seconds: f64);
    /// Report current correlation state entry count.
    fn set_correlation_state_entries(&self, count: u64);

    /// A single detection rule matched. Labels enable per-rule Prometheus counters.
    fn on_detection_match_detail(&self, _rule_title: &str, _level: &str) {}
    /// A single correlation rule matched. Labels enable per-rule Prometheus counters.
    fn on_correlation_match_detail(
        &self,
        _rule_title: &str,
        _level: &str,
        _correlation_type: &str,
    ) {
    }

    /// One enrichment call completed (success, skip, error, timeout, drop).
    /// `kind` is the enricher's declared kind (`detection` / `correlation`).
    /// `status` is one of `success` / `skip` / `error` / `timeout` / `drop`.
    fn on_enrichment_completed(
        &self,
        _enricher_id: &str,
        _kind: &str,
        _status: &str,
        _duration_seconds: f64,
    ) {
    }
    /// The enrichment queue depth changed (positive = a result entered the
    /// pipeline, negative = a result completed). Sum across both kinds.
    fn on_enrichment_queue_depth_change(&self, _delta: i64) {}
    /// HTTP enrichment cache lookup hit a live entry.
    fn on_enrichment_http_cache_hit(&self, _enricher_id: &str) {}
    /// HTTP enrichment cache lookup missed (no entry).
    fn on_enrichment_http_cache_miss(&self, _enricher_id: &str) {}
    /// HTTP enrichment cache lookup found an expired entry and evicted it.
    fn on_enrichment_http_cache_expiration(&self, _enricher_id: &str) {}

    /// Pre-register an enricher's `enricher_id` + `kind` label set so
    /// `rsigma_enrichment_total{...}` and
    /// `rsigma_enrichment_duration_seconds{...}` are emitted with their
    /// `# HELP` and `# TYPE` lines from the very first `/metrics`
    /// scrape, even before the enricher has run. Called once per
    /// configured enricher at pipeline construction.
    fn register_enricher(&self, _enricher_id: &str, _kind: &str) {}
    /// Pre-register the three HTTP-cache counter label sets for an
    /// `HttpEnricher` instance so the cache counters are visible on
    /// `/metrics` from startup, even before any cache event fires.
    fn register_http_enricher_cache(&self, _enricher_id: &str) {}

    /// Pre-register a sink's `sink` label so its delivery metrics appear on
    /// `/metrics` from startup, before the first event flows. `sink` is the
    /// sink's kind label (`stdout` / `file` / `nats` / ...).
    fn register_sink(&self, _sink: &str) {}
    /// A sink's bounded delivery queue changed by `delta` (positive = enqueue,
    /// negative = dequeue into the worker).
    fn on_sink_queue_depth_change(&self, _sink: &str, _delta: i64) {}
    /// A sink retried a delivery after a retryable failure.
    fn on_sink_retry(&self, _sink: &str) {}
    /// A result was dropped because a lossy sink's queue was full
    /// (`on_full=drop`).
    fn on_sink_dropped(&self, _sink: &str) {}
    /// A sink exhausted its retries and routed the result to the DLQ.
    fn on_sink_delivery_failed(&self, _sink: &str) {}

    /// Pre-register a webhook's `webhook_id` label so its request metrics
    /// appear on `/metrics` from the first scrape, before any traffic.
    fn register_webhook(&self, _webhook_id: &str) {}
    /// A webhook request completed with `outcome` (`success` or
    /// `permanent_failure`), taking `duration_secs`. Retryable failures are
    /// covered by the shared `on_sink_retry` / `on_sink_delivery_failed`.
    fn on_webhook_request(&self, _webhook_id: &str, _outcome: &'static str, _duration_secs: f64) {}
    /// A webhook had to wait for its rate-limiter token bucket to refill.
    fn on_webhook_rate_limited(&self, _webhook_id: &str) {}

    /// An alert-pipeline dedup outcome. `action` is one of `emitted` /
    /// `folded` / `repeat` / `resolved`.
    fn on_alert_pipeline_result(&self, _action: &str) {}
    /// Report the current number of active dedup alerts in the store.
    fn set_alert_pipeline_store_entries(&self, _count: i64) {}
    /// An active alert was evicted (resolved out of the store).
    fn on_alert_pipeline_eviction(&self) {}
    /// A dedup summary record (a `repeat` re-emit or a `resolved` record) was
    /// emitted.
    fn on_alert_pipeline_summary_emitted(&self) {}
    /// Observe the alert-pipeline stage duration in seconds.
    fn observe_alert_pipeline_duration(&self, _seconds: f64) {}

    /// An incident was emitted. `trigger` is one of `group_wait` /
    /// `group_interval` / `repeat` / `resolved`.
    fn on_incident_emitted(&self, _trigger: &str) {}
    /// Report the current number of open incidents.
    fn set_incidents_open(&self, _count: i64) {}
    /// An entity-graph guard suppressed a join. `guard` is `stop_value` or
    /// `cardinality_ceiling`.
    fn on_alert_pipeline_overmerge(&self, _guard: &str) {}

    /// A result was muted by an active silence.
    fn on_alert_pipeline_silenced(&self) {}
    /// Report the current number of active silences.
    fn set_silences_active(&self, _count: i64) {}
    /// A result was muted by an inhibition rule. `rule` is the rule name.
    fn on_alert_pipeline_inhibited(&self, _rule: &str) {}
    /// Report the current number of active inhibition sources.
    fn set_inhibit_sources_active(&self, _count: i64) {}

    /// A risk-annotation outcome. `action` is one of `scored` / `no_entity` /
    /// `skipped` (out of scope).
    fn on_risk_annotation(&self, _action: &str) {}
    /// Observe the per-detection resolved risk score.
    fn observe_risk_annotation_score(&self, _score: f64) {}
    /// `count` risk objects were extracted from one detection.
    fn on_risk_objects(&self, _count: u64) {}
    /// Observe the risk-layer stage duration in seconds.
    fn observe_risk_layer_duration(&self, _seconds: f64) {}
    /// A risk incident was emitted. `trigger` is `score` or `tactic_count`.
    fn on_risk_incident_emitted(&self, _trigger: &str) {}
    /// Report the current number of tracked entities.
    fn set_risk_entities_open(&self, _count: i64) {}
    /// Report the current number of retained contributions across all entities.
    fn set_risk_state_entries(&self, _count: i64) {}
    /// A new entity could not be tracked (store full) or an aged-out entity was
    /// pruned.
    fn on_risk_eviction(&self) {}
}

/// No-op implementation for use when metrics are disabled (e.g., `rsigma run`).
pub struct NoopMetrics;

impl MetricsHook for NoopMetrics {
    fn on_parse_error(&self) {}
    fn on_events_processed(&self, _count: u64) {}
    fn on_detection_matches(&self, _count: u64) {}
    fn on_correlation_matches(&self, _count: u64) {}
    fn observe_processing_latency(&self, _seconds: f64) {}
    fn on_input_queue_depth_change(&self, _delta: i64) {}
    fn on_back_pressure(&self) {}
    fn observe_batch_size(&self, _size: u64) {}
    fn on_output_queue_depth_change(&self, _delta: i64) {}
    fn observe_pipeline_latency(&self, _seconds: f64) {}
    fn set_correlation_state_entries(&self, _count: u64) {}
}