Skip to main content

hyperi_rustlib/metrics/
dfe.rs

// Project:   hyperi-rustlib
// File:      src/metrics/dfe.rs
// Purpose:   Standard DFE metric definitions with transport labels
// Language:  Rust
//
// License:   BUSL-1.1
// Copyright: (c) 2026 HYPERI PTY LIMITED

//! Standard DFE metrics for pipeline components (receiver, loader, engine).
//!
//! Call [`DfeMetrics::register`] once, **after** creating a
//! [`MetricsManager`](super::MetricsManager): the manager must already exist so
//! that the platform metric descriptors are also recorded in its manifest
//! registry. The recording methods are `#[inline]` for hot-path use.
//!
//! ## Example
//!
//! ```rust,no_run
//! use hyperi_rustlib::metrics::{MetricsManager, DfeMetrics, TransportKind};
//!
//! let mgr = MetricsManager::new("myapp");
//! let dfe = DfeMetrics::register(&mgr);
//!
//! dfe.transport_sent(TransportKind::Kafka, 100);
//! dfe.records_received(500);
//! dfe.scaling_pressure(42.0);
//! ```

28use super::manifest::{MetricDescriptor, MetricType};
29
/// Standard DFE metric set: labelled counters, gauges, and histograms across
/// transport, pipeline, records, scaling, spool, and security.
///
/// Construct via [`DfeMetrics::register`], which describes every metric with
/// the global recorder AND pushes its descriptor into the manifest registry.
/// The handle holds no state (recording goes through the global `metrics`
/// macros), so it is zero-sized and can be freely copied between components.
#[derive(Debug, Clone, Copy)]
pub struct DfeMetrics {
    /// Prevents construction outside this module; use [`DfeMetrics::register`].
    _private: (),
}
39
40impl DfeMetrics {
41    /// Register all DFE metric descriptions with the global recorder and
42    /// manifest registry. Call **once** after creating a
43    /// [`MetricsManager`](super::MetricsManager). Returned handle is zero-sized
44    /// (recording goes through the global `metrics!` macros).
45    ///
46    /// **Breaking change (v1.22):** takes `&MetricsManager` so platform metrics
47    /// are tightly coupled with the manifest registry.
48    #[must_use]
49    #[allow(clippy::too_many_lines)]
50    pub fn register(manager: &super::MetricsManager) -> Self {
51        let reg = manager.registry();
52
53        // --- Transport ---
54        metrics::describe_counter!(
55            "dfe_transport_sent_total",
56            "Messages successfully sent to transport"
57        );
58        metrics::describe_counter!(
59            "dfe_transport_send_errors_total",
60            "Messages that failed to send"
61        );
62        metrics::describe_counter!(
63            "dfe_transport_backpressured_total",
64            "Messages delayed due to backpressure"
65        );
66        metrics::describe_counter!(
67            "dfe_transport_refused_total",
68            "Messages refused by transport (circuit open, capacity)"
69        );
70        metrics::describe_gauge!(
71            "dfe_transport_healthy",
72            "Transport health (1=healthy, 0=unhealthy)"
73        );
74        metrics::describe_gauge!(
75            "dfe_transport_queue_size",
76            "Current number of messages in transport queue"
77        );
78        metrics::describe_gauge!(
79            "dfe_transport_queue_capacity",
80            "Maximum transport queue capacity"
81        );
82        metrics::describe_gauge!(
83            "dfe_transport_inflight",
84            "Messages currently in-flight (sent but not acked)"
85        );
86        metrics::describe_histogram!(
87            "dfe_transport_send_duration_seconds",
88            metrics::Unit::Seconds,
89            "Time to send a batch to transport"
90        );
91
92        // Push transport descriptors into manifest registry
93        for (name, desc, mt) in [
94            (
95                "dfe_transport_sent_total",
96                "Messages successfully sent to transport",
97                MetricType::Counter,
98            ),
99            (
100                "dfe_transport_send_errors_total",
101                "Messages that failed to send",
102                MetricType::Counter,
103            ),
104            (
105                "dfe_transport_backpressured_total",
106                "Messages delayed due to backpressure",
107                MetricType::Counter,
108            ),
109            (
110                "dfe_transport_refused_total",
111                "Messages refused by transport (circuit open, capacity)",
112                MetricType::Counter,
113            ),
114            (
115                "dfe_transport_healthy",
116                "Transport health (1=healthy, 0=unhealthy)",
117                MetricType::Gauge,
118            ),
119            (
120                "dfe_transport_queue_size",
121                "Current number of messages in transport queue",
122                MetricType::Gauge,
123            ),
124            (
125                "dfe_transport_queue_capacity",
126                "Maximum transport queue capacity",
127                MetricType::Gauge,
128            ),
129            (
130                "dfe_transport_inflight",
131                "Messages currently in-flight (sent but not acked)",
132                MetricType::Gauge,
133            ),
134        ] {
135            reg.push(MetricDescriptor {
136                name: name.into(),
137                metric_type: mt,
138                description: desc.into(),
139                unit: String::new(),
140                labels: vec!["transport".into()],
141                group: "platform".into(),
142                buckets: None,
143                use_cases: vec![],
144                dashboard_hint: None,
145            });
146        }
147        reg.push(MetricDescriptor {
148            name: "dfe_transport_send_duration_seconds".into(),
149            metric_type: MetricType::Histogram,
150            description: "Time to send a batch to transport".into(),
151            unit: "seconds".into(),
152            labels: vec!["transport".into()],
153            group: "platform".into(),
154            buckets: None,
155            use_cases: vec![],
156            dashboard_hint: None,
157        });
158
159        // --- Pipeline ---
160        metrics::describe_gauge!(
161            "dfe_pipeline_ready",
162            "Pipeline readiness (1=ready, 0=not ready)"
163        );
164        metrics::describe_counter!(
165            "dfe_pipeline_stall_seconds_total",
166            "Cumulative seconds the pipeline was stalled"
167        );
168
169        reg.push(MetricDescriptor {
170            name: "dfe_pipeline_ready".into(),
171            metric_type: MetricType::Gauge,
172            description: "Pipeline readiness (1=ready, 0=not ready)".into(),
173            unit: String::new(),
174            labels: vec![],
175            group: "platform".into(),
176            buckets: None,
177            use_cases: vec![],
178            dashboard_hint: None,
179        });
180        reg.push(MetricDescriptor {
181            name: "dfe_pipeline_stall_seconds_total".into(),
182            metric_type: MetricType::Counter,
183            description: "Cumulative seconds the pipeline was stalled".into(),
184            unit: "seconds".into(),
185            labels: vec![],
186            group: "platform".into(),
187            buckets: None,
188            use_cases: vec![],
189            dashboard_hint: None,
190        });
191
192        // --- Records ---
193        metrics::describe_counter!(
194            "dfe_records_received_total",
195            "Records received from all sources"
196        );
197        metrics::describe_counter!(
198            "dfe_records_delivered_total",
199            "Records successfully delivered to sink"
200        );
201        metrics::describe_counter!(
202            "dfe_records_filtered_total",
203            "Records dropped by filter/routing rules"
204        );
205        metrics::describe_counter!("dfe_records_dlq_total", "Records sent to dead letter queue");
206
207        for (name, desc) in [
208            (
209                "dfe_records_received_total",
210                "Records received from all sources",
211            ),
212            (
213                "dfe_records_delivered_total",
214                "Records successfully delivered to sink",
215            ),
216            (
217                "dfe_records_filtered_total",
218                "Records dropped by filter/routing rules",
219            ),
220            ("dfe_records_dlq_total", "Records sent to dead letter queue"),
221        ] {
222            reg.push(MetricDescriptor {
223                name: name.into(),
224                metric_type: MetricType::Counter,
225                description: desc.into(),
226                unit: String::new(),
227                labels: vec![],
228                group: "platform".into(),
229                buckets: None,
230                use_cases: vec![],
231                dashboard_hint: None,
232            });
233        }
234
235        // --- Scaling ---
236        metrics::describe_gauge!(
237            "dfe_scaling_pressure",
238            "Normalised scaling pressure (0-100)"
239        );
240        // 0/1 state gauge (documented), not a bool type. `unit` is empty.
241        metrics::describe_gauge!(
242            "dfe_scaling_circuit_open",
243            "Circuit breaker state (1=open, 0=closed)"
244        );
245        metrics::describe_gauge!(
246            "dfe_scaling_memory_pressure",
247            "Memory pressure ratio (0.0-1.0)"
248        );
249        // dual-emit: drop OLD in next release (MIGRATIONS) -- `_ratio` suffix
250        // for a 0-1 ratio gauge.
251        metrics::describe_gauge!(
252            "dfe_scaling_memory_pressure_ratio",
253            "Memory pressure ratio (0.0-1.0)"
254        );
255
256        for (name, desc) in [
257            (
258                "dfe_scaling_pressure",
259                "Normalised scaling pressure (0-100)",
260            ),
261            (
262                "dfe_scaling_circuit_open",
263                "Circuit breaker state (1=open, 0=closed)",
264            ),
265            (
266                "dfe_scaling_memory_pressure",
267                "Memory pressure ratio (0.0-1.0)",
268            ),
269            // dual-emit: drop OLD in next release (MIGRATIONS)
270            (
271                "dfe_scaling_memory_pressure_ratio",
272                "Memory pressure ratio (0.0-1.0)",
273            ),
274        ] {
275            reg.push(MetricDescriptor {
276                name: name.into(),
277                metric_type: MetricType::Gauge,
278                description: desc.into(),
279                unit: String::new(),
280                labels: vec![],
281                group: "platform".into(),
282                buckets: None,
283                use_cases: vec![],
284                dashboard_hint: None,
285            });
286        }
287
288        // --- Spool ---
289        metrics::describe_gauge!("dfe_spool_bytes", "Current spool size in bytes");
290        metrics::describe_gauge!("dfe_spool_messages", "Current spool message count");
291        metrics::describe_gauge!(
292            "dfe_spool_disk_available",
293            "Available disk space for spool in bytes"
294        );
295        // dual-emit: drop OLD in next release (MIGRATIONS) -- `_bytes` base-unit
296        // suffix; aligns with tiered_sink's `dfe_spool_disk_available_bytes`.
297        metrics::describe_gauge!(
298            "dfe_spool_disk_available_bytes",
299            "Available disk space for spool in bytes"
300        );
301
302        for (name, desc) in [
303            ("dfe_spool_bytes", "Current spool size in bytes"),
304            ("dfe_spool_messages", "Current spool message count"),
305            (
306                "dfe_spool_disk_available",
307                "Available disk space for spool in bytes",
308            ),
309            // dual-emit: drop OLD in next release (MIGRATIONS)
310            (
311                "dfe_spool_disk_available_bytes",
312                "Available disk space for spool in bytes",
313            ),
314        ] {
315            reg.push(MetricDescriptor {
316                name: name.into(),
317                metric_type: MetricType::Gauge,
318                description: desc.into(),
319                unit: String::new(),
320                labels: vec![],
321                group: "platform".into(),
322                buckets: None,
323                use_cases: vec![],
324                dashboard_hint: None,
325            });
326        }
327
328        // --- Security ---
329        metrics::describe_counter!(
330            "dfe_auth_failures_total",
331            "Authentication failures by reason"
332        );
333        metrics::describe_counter!(
334            "dfe_validation_failures_total",
335            "Validation failures by reason"
336        );
337
338        reg.push(MetricDescriptor {
339            name: "dfe_auth_failures_total".into(),
340            metric_type: MetricType::Counter,
341            description: "Authentication failures by reason".into(),
342            unit: String::new(),
343            labels: vec!["reason".into()],
344            group: "platform".into(),
345            buckets: None,
346            use_cases: vec![],
347            dashboard_hint: None,
348        });
349        reg.push(MetricDescriptor {
350            name: "dfe_validation_failures_total".into(),
351            metric_type: MetricType::Counter,
352            description: "Validation failures by reason".into(),
353            unit: String::new(),
354            labels: vec!["reason".into()],
355            group: "platform".into(),
356            buckets: None,
357            use_cases: vec![],
358            dashboard_hint: None,
359        });
360
361        Self { _private: () }
362    }
363
364    // ── Transport ────────────────────────────────────────────────────
365
366    /// Record messages successfully sent to a transport.
367    #[inline]
368    pub fn transport_sent(&self, transport: super::TransportKind, count: u64) {
369        metrics::counter!("dfe_transport_sent_total", "transport" => transport.as_label())
370            .increment(count);
371    }
372
373    /// Record send errors for a transport.
374    #[inline]
375    pub fn transport_send_errors(&self, transport: super::TransportKind, count: u64) {
376        metrics::counter!("dfe_transport_send_errors_total", "transport" => transport.as_label())
377            .increment(count);
378    }
379
380    /// Record backpressure events for a transport.
381    #[inline]
382    pub fn transport_backpressured(&self, transport: &str, count: u64) {
383        metrics::counter!("dfe_transport_backpressured_total", "transport" => transport.to_string())
384            .increment(count);
385    }
386
387    /// Record refused messages for a transport.
388    #[inline]
389    pub fn transport_refused(&self, transport: &str, count: u64) {
390        metrics::counter!("dfe_transport_refused_total", "transport" => transport.to_string())
391            .increment(count);
392    }
393
394    /// Set transport health status.
395    ///
396    /// `dfe_transport_healthy` is a 0/1 state gauge (documented), not a bool
397    /// type -- 1=healthy, 0=unhealthy.
398    #[inline]
399    pub fn transport_healthy(&self, transport: &str, healthy: bool) {
400        metrics::gauge!("dfe_transport_healthy", "transport" => transport.to_string())
401            .set(if healthy { 1.0 } else { 0.0 });
402    }
403
404    /// Set current transport queue size.
405    #[inline]
406    pub fn transport_queue_size(&self, transport: &str, size: f64) {
407        metrics::gauge!("dfe_transport_queue_size", "transport" => transport.to_string()).set(size);
408    }
409
410    /// Set transport queue capacity.
411    #[inline]
412    pub fn transport_queue_capacity(&self, transport: &str, capacity: f64) {
413        metrics::gauge!("dfe_transport_queue_capacity", "transport" => transport.to_string())
414            .set(capacity);
415    }
416
417    /// Set in-flight message count for a transport.
418    #[inline]
419    pub fn transport_inflight(&self, transport: &str, count: f64) {
420        metrics::gauge!("dfe_transport_inflight", "transport" => transport.to_string()).set(count);
421    }
422
423    /// Record batch send duration for a transport.
424    #[inline]
425    pub fn transport_send_duration(&self, transport: &str, seconds: f64) {
426        metrics::histogram!(
427            "dfe_transport_send_duration_seconds",
428            "transport" => transport.to_string()
429        )
430        .record(seconds);
431    }
432
433    // ── Pipeline ─────────────────────────────────────────────────────
434
435    /// Set pipeline readiness state.
436    ///
437    /// `dfe_pipeline_ready` is a 0/1 state gauge (documented), not a bool type
438    /// -- 1=ready, 0=not ready.
439    #[inline]
440    pub fn pipeline_ready(&self, ready: bool) {
441        metrics::gauge!("dfe_pipeline_ready").set(if ready { 1.0 } else { 0.0 });
442    }
443
444    /// Add stall duration to the cumulative stall counter (whole seconds).
445    #[inline]
446    pub fn pipeline_stall(&self, seconds: u64) {
447        metrics::counter!("dfe_pipeline_stall_seconds_total").increment(seconds);
448    }
449
450    // ── Records ──────────────────────────────────────────────────────
451
452    /// Record incoming records.
453    #[inline]
454    pub fn records_received(&self, count: u64) {
455        metrics::counter!("dfe_records_received_total").increment(count);
456    }
457
458    /// Record successfully delivered records.
459    #[inline]
460    pub fn records_delivered(&self, count: u64) {
461        metrics::counter!("dfe_records_delivered_total").increment(count);
462    }
463
464    /// Record filtered/dropped records.
465    #[inline]
466    pub fn records_filtered(&self, count: u64) {
467        metrics::counter!("dfe_records_filtered_total").increment(count);
468    }
469
470    /// Record records sent to dead letter queue.
471    #[inline]
472    pub fn records_dlq(&self, count: u64) {
473        metrics::counter!("dfe_records_dlq_total").increment(count);
474    }
475
476    // ── Scaling ──────────────────────────────────────────────────────
477
478    /// Set normalised scaling pressure (0-100).
479    #[inline]
480    pub fn scaling_pressure(&self, pressure: f64) {
481        metrics::gauge!("dfe_scaling_pressure").set(pressure);
482    }
483
484    /// Set circuit breaker state.
485    ///
486    /// `dfe_scaling_circuit_open` is a 0/1 state gauge (documented), not a bool
487    /// type -- 1=open, 0=closed.
488    #[inline]
489    pub fn scaling_circuit_open(&self, open: bool) {
490        metrics::gauge!("dfe_scaling_circuit_open").set(if open { 1.0 } else { 0.0 });
491    }
492
493    /// Set memory pressure ratio (0.0-1.0).
494    #[inline]
495    pub fn scaling_memory_pressure(&self, ratio: f64) {
496        metrics::gauge!("dfe_scaling_memory_pressure").set(ratio);
497        // dual-emit: drop OLD in next release (MIGRATIONS) -- `_ratio` suffix.
498        metrics::gauge!("dfe_scaling_memory_pressure_ratio").set(ratio);
499    }
500
501    // ── Spool ────────────────────────────────────────────────────────
502
503    /// Set current spool size in bytes.
504    #[inline]
505    pub fn spool_bytes(&self, bytes: f64) {
506        metrics::gauge!("dfe_spool_bytes").set(bytes);
507    }
508
509    /// Set current spool message count.
510    #[inline]
511    pub fn spool_messages(&self, count: f64) {
512        metrics::gauge!("dfe_spool_messages").set(count);
513    }
514
515    /// Set available disk space for spool.
516    #[inline]
517    pub fn spool_disk_available(&self, bytes: f64) {
518        metrics::gauge!("dfe_spool_disk_available").set(bytes);
519        // dual-emit: drop OLD in next release (MIGRATIONS) -- `_bytes` suffix;
520        // aligns with tiered_sink's `dfe_spool_disk_available_bytes`.
521        metrics::gauge!("dfe_spool_disk_available_bytes").set(bytes);
522    }
523
524    // ── Security ─────────────────────────────────────────────────────
525
526    /// Record authentication failure.
527    #[inline]
528    pub fn auth_failure(&self, reason: super::AuthFailureReason) {
529        metrics::counter!("dfe_auth_failures_total", "reason" => reason.as_label()).increment(1);
530    }
531
532    /// Record validation failure.
533    #[inline]
534    pub fn validation_failure(&self, reason: super::ValidationFailureReason) {
535        metrics::counter!("dfe_validation_failures_total", "reason" => reason.as_label())
536            .increment(1);
537    }
538}
539
#[cfg(test)]
mod tests {
    use super::*;

    /// Every metric name `DfeMetrics::register` is expected to publish.
    const EXPECTED_NAMES: [&str; 25] = [
        "dfe_transport_sent_total",
        "dfe_transport_send_errors_total",
        "dfe_transport_backpressured_total",
        "dfe_transport_refused_total",
        "dfe_transport_healthy",
        "dfe_transport_queue_size",
        "dfe_transport_queue_capacity",
        "dfe_transport_inflight",
        "dfe_transport_send_duration_seconds",
        "dfe_pipeline_ready",
        "dfe_pipeline_stall_seconds_total",
        "dfe_records_received_total",
        "dfe_records_delivered_total",
        "dfe_records_filtered_total",
        "dfe_records_dlq_total",
        "dfe_scaling_pressure",
        "dfe_scaling_circuit_open",
        "dfe_scaling_memory_pressure",
        "dfe_scaling_memory_pressure_ratio",
        "dfe_spool_bytes",
        "dfe_spool_messages",
        "dfe_spool_disk_available",
        "dfe_spool_disk_available_bytes",
        "dfe_auth_failures_total",
        "dfe_validation_failures_total",
    ];

    #[tokio::test]
    async fn test_register_does_not_panic() {
        let mgr = super::super::MetricsManager::new_for_test("test_app");
        let _dfe = DfeMetrics::register(&mgr);
    }

    #[tokio::test]
    async fn test_register_populates_registry() {
        let mgr = super::super::MetricsManager::new_for_test("test_app");
        let _dfe = DfeMetrics::register(&mgr);
        let manifest = mgr.registry().manifest();
        let names: Vec<&str> = manifest.metrics.iter().map(|m| m.name.as_str()).collect();

        // Every declared metric must land in the manifest.
        for expected in EXPECTED_NAMES {
            assert!(names.contains(&expected), "missing metric `{expected}`");
        }

        // All should be group=platform.
        for m in &manifest.metrics {
            assert_eq!(m.group, "platform", "metric `{}` has wrong group", m.name);
        }

        // Every transport metric carries exactly the "transport" label.
        for m in manifest
            .metrics
            .iter()
            .filter(|m| m.name.starts_with("dfe_transport_"))
        {
            assert_eq!(m.labels, vec!["transport"], "metric `{}`", m.name);
        }

        // Security metrics carry exactly the "reason" label.
        for name in ["dfe_auth_failures_total", "dfe_validation_failures_total"] {
            let m = manifest
                .metrics
                .iter()
                .find(|m| m.name == name)
                .unwrap_or_else(|| panic!("missing metric `{name}`"));
            assert_eq!(m.labels, vec!["reason"], "metric `{name}`");
        }

        // Time-based metrics report their unit in the manifest.
        for name in [
            "dfe_transport_send_duration_seconds",
            "dfe_pipeline_stall_seconds_total",
        ] {
            let m = manifest
                .metrics
                .iter()
                .find(|m| m.name == name)
                .unwrap_or_else(|| panic!("missing metric `{name}`"));
            assert_eq!(m.unit, "seconds", "metric `{name}`");
        }
    }

    #[tokio::test]
    async fn test_methods_callable_without_recorder() {
        let mgr = super::super::MetricsManager::new("test_app");
        let dfe = DfeMetrics::register(&mgr);

        dfe.transport_sent(super::super::TransportKind::Kafka, 1);
        dfe.transport_send_errors(super::super::TransportKind::Kafka, 1);
        dfe.transport_backpressured("kafka", 1);
        dfe.transport_refused("kafka", 1);
        dfe.transport_healthy("kafka", true);
        dfe.transport_queue_size("kafka", 100.0);
        dfe.transport_queue_capacity("kafka", 1000.0);
        dfe.transport_inflight("kafka", 50.0);
        dfe.transport_send_duration("kafka", 0.042);

        dfe.pipeline_ready(true);
        dfe.pipeline_stall(1);

        dfe.records_received(100);
        dfe.records_delivered(99);
        dfe.records_filtered(1);
        dfe.records_dlq(0);

        dfe.scaling_pressure(42.0);
        dfe.scaling_circuit_open(false);
        dfe.scaling_memory_pressure(0.65);

        dfe.spool_bytes(1024.0);
        dfe.spool_messages(10.0);
        dfe.spool_disk_available(1_000_000.0);

        dfe.auth_failure(super::super::AuthFailureReason::MalformedToken);
        dfe.validation_failure(super::super::ValidationFailureReason::FieldMissing);
    }
}