1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
/// Sink for runtime metrics, kept abstract so that the runtime crate does not
/// depend on `prometheus` directly. The consumer (the CLI or anything else)
/// supplies a concrete implementation backed by Prometheus, OpenTelemetry, or
/// whatever it prefers.
///
/// Methods declared without a body must be provided by every implementor.
/// Methods declared with an empty `{}` body do nothing unless overridden.
pub trait MetricsHook: Send + Sync {
    // ------------------------------------------------------------------
    // Required hooks (no default body).
    // ------------------------------------------------------------------

    /// Parsing of a JSON line failed.
    fn on_parse_error(&self);

    /// Successful evaluation of `count` events.
    fn on_events_processed(&self, count: u64);

    /// Production of `count` detection rule matches.
    fn on_detection_matches(&self, count: u64);

    /// Production of `count` correlation rule matches.
    fn on_correlation_matches(&self, count: u64);

    /// Record the per-event processing latency, expressed in seconds.
    fn observe_processing_latency(&self, seconds: f64);

    /// Input queue depth moved by `delta`: a positive value is an enqueue,
    /// a negative value is a dequeue.
    fn on_input_queue_depth_change(&self, delta: i64);

    /// Back-pressure occurred: a source attempted a send while the channel
    /// was full.
    fn on_back_pressure(&self);

    /// Record the size of the batch handled under one engine lock acquisition.
    fn observe_batch_size(&self, size: u64);

    /// Output queue depth moved by `delta`.
    fn on_output_queue_depth_change(&self, delta: i64);

    /// Record the end-to-end pipeline latency (dequeue → sink), in seconds.
    fn observe_pipeline_latency(&self, seconds: f64);

    /// Publish the current number of correlation state entries.
    fn set_correlation_state_entries(&self, count: u64);

    // ------------------------------------------------------------------
    // Per-rule match detail (default: no-op).
    // ------------------------------------------------------------------

    /// One detection rule matched. The labels make per-rule Prometheus
    /// counters possible.
    fn on_detection_match_detail(&self, _rule_title: &str, _level: &str) {}

    /// One correlation rule matched. The labels make per-rule Prometheus
    /// counters possible.
    fn on_correlation_match_detail(
        &self,
        _rule_title: &str,
        _level: &str,
        _correlation_type: &str,
    ) {
    }

    // ------------------------------------------------------------------
    // Enrichment (default: no-op).
    // ------------------------------------------------------------------

    /// A single enrichment call finished (success, skip, error, timeout, drop).
    ///
    /// * `kind` — the kind declared by the enricher (`detection` /
    ///   `correlation`).
    /// * `status` — one of `success` / `skip` / `error` / `timeout` / `drop`.
    fn on_enrichment_completed(
        &self,
        _enricher_id: &str,
        _kind: &str,
        _status: &str,
        _duration_seconds: f64,
    ) {
    }

    /// Enrichment queue depth moved: positive when a result entered the
    /// pipeline, negative when a result completed. Summed across both kinds.
    fn on_enrichment_queue_depth_change(&self, _delta: i64) {}

    /// An HTTP enrichment cache lookup found a live entry.
    fn on_enrichment_http_cache_hit(&self, _enricher_id: &str) {}

    /// An HTTP enrichment cache lookup found no entry.
    fn on_enrichment_http_cache_miss(&self, _enricher_id: &str) {}

    /// An HTTP enrichment cache lookup found an expired entry, which was
    /// evicted.
    fn on_enrichment_http_cache_expiration(&self, _enricher_id: &str) {}

    /// Pre-register the `enricher_id` + `kind` label set of an enricher, so
    /// that `rsigma_enrichment_total{...}` and
    /// `rsigma_enrichment_duration_seconds{...}` come with their `# HELP` and
    /// `# TYPE` lines on the very first `/metrics` scrape, even if the
    /// enricher has not run yet. Invoked once for each configured enricher
    /// when the pipeline is constructed.
    fn register_enricher(&self, _enricher_id: &str, _kind: &str) {}

    /// Pre-register the three HTTP-cache counter label sets of an
    /// `HttpEnricher` instance, making the cache counters visible on
    /// `/metrics` from startup, ahead of any cache event.
    fn register_http_enricher_cache(&self, _enricher_id: &str) {}

    // ------------------------------------------------------------------
    // Sinks (default: no-op).
    // ------------------------------------------------------------------

    /// Pre-register the `sink` label of a sink, so that its delivery metrics
    /// show up on `/metrics` from startup, ahead of the first event. `sink`
    /// is the kind label of the sink (`stdout` / `file` / `nats` / ...).
    fn register_sink(&self, _sink: &str) {}

    /// The bounded delivery queue of a sink moved by `delta`: positive is an
    /// enqueue, negative is a dequeue into the worker.
    fn on_sink_queue_depth_change(&self, _sink: &str, _delta: i64) {}

    /// A sink retried a delivery following a retryable failure.
    fn on_sink_retry(&self, _sink: &str) {}

    /// A result was dropped since the queue of a lossy sink was full
    /// (`on_full=drop`).
    fn on_sink_dropped(&self, _sink: &str) {}

    /// A sink ran out of retries and sent the result to the DLQ.
    fn on_sink_delivery_failed(&self, _sink: &str) {}

    // ------------------------------------------------------------------
    // Webhooks (default: no-op).
    // ------------------------------------------------------------------

    /// Pre-register the `webhook_id` label of a webhook, so that its request
    /// metrics show up on `/metrics` from the first scrape, ahead of any
    /// traffic.
    fn register_webhook(&self, _webhook_id: &str) {}

    /// A webhook request finished with `outcome` (`success` or
    /// `permanent_failure`) after `duration_secs`. Retryable failures are
    /// reported through the shared `on_sink_retry` /
    /// `on_sink_delivery_failed` hooks instead.
    fn on_webhook_request(
        &self,
        _webhook_id: &str,
        _outcome: &'static str,
        _duration_secs: f64,
    ) {
    }

    /// A webhook was made to wait until its rate-limiter token bucket
    /// refilled.
    fn on_webhook_rate_limited(&self, _webhook_id: &str) {}

    // ------------------------------------------------------------------
    // Alert pipeline: dedup, incidents, silences, inhibition
    // (default: no-op).
    // ------------------------------------------------------------------

    /// Outcome of alert-pipeline dedup. `action` is one of `emitted` /
    /// `folded` / `repeat` / `resolved`.
    fn on_alert_pipeline_result(&self, _action: &str) {}

    /// Publish how many active dedup alerts the store currently holds.
    fn set_alert_pipeline_store_entries(&self, _count: i64) {}

    /// An active alert was evicted, i.e. resolved out of the store.
    fn on_alert_pipeline_eviction(&self) {}

    /// A dedup summary record was emitted: either a `repeat` re-emit or a
    /// `resolved` record.
    fn on_alert_pipeline_summary_emitted(&self) {}

    /// Record the duration of the alert-pipeline stage, in seconds.
    fn observe_alert_pipeline_duration(&self, _seconds: f64) {}

    /// An incident was emitted. `trigger` is one of `group_wait` /
    /// `group_interval` / `repeat` / `resolved`.
    fn on_incident_emitted(&self, _trigger: &str) {}

    /// Publish how many incidents are currently open.
    fn set_incidents_open(&self, _count: i64) {}

    /// A join was suppressed by an entity-graph guard. `guard` is either
    /// `stop_value` or `cardinality_ceiling`.
    fn on_alert_pipeline_overmerge(&self, _guard: &str) {}

    /// An active silence muted a result.
    fn on_alert_pipeline_silenced(&self) {}

    /// Publish how many silences are currently active.
    fn set_silences_active(&self, _count: i64) {}

    /// An inhibition rule muted a result. `rule` carries the rule name.
    fn on_alert_pipeline_inhibited(&self, _rule: &str) {}

    /// Publish how many inhibition sources are currently active.
    fn set_inhibit_sources_active(&self, _count: i64) {}

    // ------------------------------------------------------------------
    // Risk layer (default: no-op).
    // ------------------------------------------------------------------

    /// Outcome of a risk annotation. `action` is one of `scored` /
    /// `no_entity` / `skipped` (out of scope).
    fn on_risk_annotation(&self, _action: &str) {}

    /// Record the resolved risk score of a single detection.
    fn observe_risk_annotation_score(&self, _score: f64) {}

    /// One detection yielded `count` extracted risk objects.
    fn on_risk_objects(&self, _count: u64) {}

    /// Record the duration of the risk-layer stage, in seconds.
    fn observe_risk_layer_duration(&self, _seconds: f64) {}

    /// A risk incident was emitted. `trigger` is either `score` or
    /// `tactic_count`.
    fn on_risk_incident_emitted(&self, _trigger: &str) {}

    /// Publish how many entities are currently tracked.
    fn set_risk_entities_open(&self, _count: i64) {}

    /// Publish how many contributions are currently retained, over all
    /// entities.
    fn set_risk_state_entries(&self, _count: i64) {}

    /// Either a new entity could not be tracked because the store was full,
    /// or an aged-out entity was pruned.
    fn on_risk_eviction(&self) {}
}
/// No-op implementation for use when metrics are disabled (e.g., `rsigma run`).
///
/// This is a field-less unit struct. The standard `Debug`, `Clone`, `Copy`
/// and `Default` traits are derived so the value can be logged, duplicated
/// freely and obtained through `NoopMetrics::default()` or
/// `..Default::default()` in a containing struct.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopMetrics;
// Each hook below has an empty body and ignores its argument, so reporting a
// metric through `NoopMetrics` does nothing. Only the hooks listed here are
// provided explicitly; nothing else of the trait is overridden.
impl MetricsHook for NoopMetrics {
    // Counters.
    fn on_parse_error(&self) {}
    fn on_events_processed(&self, _: u64) {}
    fn on_detection_matches(&self, _: u64) {}
    fn on_correlation_matches(&self, _: u64) {}

    // Latency and queueing.
    fn observe_processing_latency(&self, _: f64) {}
    fn on_input_queue_depth_change(&self, _: i64) {}
    fn on_back_pressure(&self) {}
    fn observe_batch_size(&self, _: u64) {}
    fn on_output_queue_depth_change(&self, _: i64) {}
    fn observe_pipeline_latency(&self, _: f64) {}

    // Gauges.
    fn set_correlation_state_entries(&self, _: u64) {}
}