Skip to main content

hyperi_rustlib/metrics/
dfe.rs

1// Project:   hyperi-rustlib
2// File:      src/metrics/dfe.rs
3// Purpose:   Standard DFE metric definitions with transport labels
4// Language:  Rust
5//
6// License:   FSL-1.1-ALv2
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Standard DFE metrics.
10//!
11//! Pre-defined metric set for DFE pipeline components (receiver, loader, engine).
12//! Call [`DfeMetrics::register`] **after** creating a
13//! [`MetricsManager`](super::MetricsManager) — the manager must exist so that
14//! platform metrics are automatically captured in the manifest registry.
15//!
16//! All methods are `#[inline]` and designed for hot-path use.
17//!
18//! ## Example
19//!
20//! ```rust,no_run
21//! use hyperi_rustlib::metrics::{MetricsManager, DfeMetrics};
22//!
23//! let mgr = MetricsManager::new("myapp");
24//! let dfe = DfeMetrics::register(&mgr);
25//!
26//! dfe.transport_sent("kafka", 100);
27//! dfe.records_received(500);
28//! dfe.scaling_pressure(42.0);
29//! ```
30
31use super::manifest::{MetricDescriptor, MetricType};
32
/// Standard DFE metric set.
///
/// Provides labelled counters, gauges, and histograms covering transport,
/// pipeline, records, scaling, spool, and security concerns.
///
/// Construct via [`DfeMetrics::register`] — this describes all metrics with
/// the global recorder AND pushes descriptors into the manifest registry.
///
/// The handle is zero-sized (its only field is `()`), so it derives `Copy`
/// and `Clone`: hand it to as many components as needed without wrapping it
/// in a reference-counted pointer. `Default` is deliberately not derived, so
/// the private field still prevents construction outside this module.
#[derive(Debug, Clone, Copy)]
pub struct DfeMetrics {
    /// Prevent external construction.
    _private: (),
}
44
45impl DfeMetrics {
46    /// Register all DFE metric descriptions with the global recorder and
47    /// manifest registry.
48    ///
49    /// Call this **once** after creating a [`MetricsManager`](super::MetricsManager).
50    /// The returned handle is cheaply clonable (it's zero-sized — all recording
51    /// goes through the global `metrics!` macros).
52    ///
53    /// **Breaking change (v1.22):** Now takes `&MetricsManager` to ensure
54    /// platform metrics are tightly coupled with the manifest registry.
55    #[must_use]
56    #[allow(clippy::too_many_lines)]
57    pub fn register(manager: &super::MetricsManager) -> Self {
58        let reg = manager.registry();
59
60        // --- Transport ---
61        metrics::describe_counter!(
62            "dfe_transport_sent_total",
63            "Messages successfully sent to transport"
64        );
65        metrics::describe_counter!(
66            "dfe_transport_send_errors_total",
67            "Messages that failed to send"
68        );
69        metrics::describe_counter!(
70            "dfe_transport_backpressured_total",
71            "Messages delayed due to backpressure"
72        );
73        metrics::describe_counter!(
74            "dfe_transport_refused_total",
75            "Messages refused by transport (circuit open, capacity)"
76        );
77        metrics::describe_gauge!(
78            "dfe_transport_healthy",
79            "Transport health (1=healthy, 0=unhealthy)"
80        );
81        metrics::describe_gauge!(
82            "dfe_transport_queue_size",
83            "Current number of messages in transport queue"
84        );
85        metrics::describe_gauge!(
86            "dfe_transport_queue_capacity",
87            "Maximum transport queue capacity"
88        );
89        metrics::describe_gauge!(
90            "dfe_transport_inflight",
91            "Messages currently in-flight (sent but not acked)"
92        );
93        metrics::describe_histogram!(
94            "dfe_transport_send_duration_seconds",
95            metrics::Unit::Seconds,
96            "Time to send a batch to transport"
97        );
98
99        // Push transport descriptors into manifest registry
100        for (name, desc, mt) in [
101            (
102                "dfe_transport_sent_total",
103                "Messages successfully sent to transport",
104                MetricType::Counter,
105            ),
106            (
107                "dfe_transport_send_errors_total",
108                "Messages that failed to send",
109                MetricType::Counter,
110            ),
111            (
112                "dfe_transport_backpressured_total",
113                "Messages delayed due to backpressure",
114                MetricType::Counter,
115            ),
116            (
117                "dfe_transport_refused_total",
118                "Messages refused by transport (circuit open, capacity)",
119                MetricType::Counter,
120            ),
121            (
122                "dfe_transport_healthy",
123                "Transport health (1=healthy, 0=unhealthy)",
124                MetricType::Gauge,
125            ),
126            (
127                "dfe_transport_queue_size",
128                "Current number of messages in transport queue",
129                MetricType::Gauge,
130            ),
131            (
132                "dfe_transport_queue_capacity",
133                "Maximum transport queue capacity",
134                MetricType::Gauge,
135            ),
136            (
137                "dfe_transport_inflight",
138                "Messages currently in-flight (sent but not acked)",
139                MetricType::Gauge,
140            ),
141        ] {
142            reg.push(MetricDescriptor {
143                name: name.into(),
144                metric_type: mt,
145                description: desc.into(),
146                unit: String::new(),
147                labels: vec!["transport".into()],
148                group: "platform".into(),
149                buckets: None,
150                use_cases: vec![],
151                dashboard_hint: None,
152            });
153        }
154        reg.push(MetricDescriptor {
155            name: "dfe_transport_send_duration_seconds".into(),
156            metric_type: MetricType::Histogram,
157            description: "Time to send a batch to transport".into(),
158            unit: "seconds".into(),
159            labels: vec!["transport".into()],
160            group: "platform".into(),
161            buckets: None,
162            use_cases: vec![],
163            dashboard_hint: None,
164        });
165
166        // --- Pipeline ---
167        metrics::describe_gauge!(
168            "dfe_pipeline_ready",
169            "Pipeline readiness (1=ready, 0=not ready)"
170        );
171        metrics::describe_counter!(
172            "dfe_pipeline_stall_seconds_total",
173            "Cumulative seconds the pipeline was stalled"
174        );
175
176        reg.push(MetricDescriptor {
177            name: "dfe_pipeline_ready".into(),
178            metric_type: MetricType::Gauge,
179            description: "Pipeline readiness (1=ready, 0=not ready)".into(),
180            unit: String::new(),
181            labels: vec![],
182            group: "platform".into(),
183            buckets: None,
184            use_cases: vec![],
185            dashboard_hint: None,
186        });
187        reg.push(MetricDescriptor {
188            name: "dfe_pipeline_stall_seconds_total".into(),
189            metric_type: MetricType::Counter,
190            description: "Cumulative seconds the pipeline was stalled".into(),
191            unit: "seconds".into(),
192            labels: vec![],
193            group: "platform".into(),
194            buckets: None,
195            use_cases: vec![],
196            dashboard_hint: None,
197        });
198
199        // --- Records ---
200        metrics::describe_counter!(
201            "dfe_records_received_total",
202            "Records received from all sources"
203        );
204        metrics::describe_counter!(
205            "dfe_records_delivered_total",
206            "Records successfully delivered to sink"
207        );
208        metrics::describe_counter!(
209            "dfe_records_filtered_total",
210            "Records dropped by filter/routing rules"
211        );
212        metrics::describe_counter!("dfe_records_dlq_total", "Records sent to dead letter queue");
213
214        for (name, desc) in [
215            (
216                "dfe_records_received_total",
217                "Records received from all sources",
218            ),
219            (
220                "dfe_records_delivered_total",
221                "Records successfully delivered to sink",
222            ),
223            (
224                "dfe_records_filtered_total",
225                "Records dropped by filter/routing rules",
226            ),
227            ("dfe_records_dlq_total", "Records sent to dead letter queue"),
228        ] {
229            reg.push(MetricDescriptor {
230                name: name.into(),
231                metric_type: MetricType::Counter,
232                description: desc.into(),
233                unit: String::new(),
234                labels: vec![],
235                group: "platform".into(),
236                buckets: None,
237                use_cases: vec![],
238                dashboard_hint: None,
239            });
240        }
241
242        // --- Scaling ---
243        metrics::describe_gauge!(
244            "dfe_scaling_pressure",
245            "Normalised scaling pressure (0-100)"
246        );
247        metrics::describe_gauge!(
248            "dfe_scaling_circuit_open",
249            "Circuit breaker state (1=open, 0=closed)"
250        );
251        metrics::describe_gauge!(
252            "dfe_scaling_memory_pressure",
253            "Memory pressure ratio (0.0-1.0)"
254        );
255
256        for (name, desc) in [
257            (
258                "dfe_scaling_pressure",
259                "Normalised scaling pressure (0-100)",
260            ),
261            (
262                "dfe_scaling_circuit_open",
263                "Circuit breaker state (1=open, 0=closed)",
264            ),
265            (
266                "dfe_scaling_memory_pressure",
267                "Memory pressure ratio (0.0-1.0)",
268            ),
269        ] {
270            reg.push(MetricDescriptor {
271                name: name.into(),
272                metric_type: MetricType::Gauge,
273                description: desc.into(),
274                unit: String::new(),
275                labels: vec![],
276                group: "platform".into(),
277                buckets: None,
278                use_cases: vec![],
279                dashboard_hint: None,
280            });
281        }
282
283        // --- Spool ---
284        metrics::describe_gauge!("dfe_spool_bytes", "Current spool size in bytes");
285        metrics::describe_gauge!("dfe_spool_messages", "Current spool message count");
286        metrics::describe_gauge!(
287            "dfe_spool_disk_available",
288            "Available disk space for spool in bytes"
289        );
290
291        for (name, desc) in [
292            ("dfe_spool_bytes", "Current spool size in bytes"),
293            ("dfe_spool_messages", "Current spool message count"),
294            (
295                "dfe_spool_disk_available",
296                "Available disk space for spool in bytes",
297            ),
298        ] {
299            reg.push(MetricDescriptor {
300                name: name.into(),
301                metric_type: MetricType::Gauge,
302                description: desc.into(),
303                unit: String::new(),
304                labels: vec![],
305                group: "platform".into(),
306                buckets: None,
307                use_cases: vec![],
308                dashboard_hint: None,
309            });
310        }
311
312        // --- Security ---
313        metrics::describe_counter!(
314            "dfe_auth_failures_total",
315            "Authentication failures by reason"
316        );
317        metrics::describe_counter!(
318            "dfe_validation_failures_total",
319            "Validation failures by reason"
320        );
321
322        reg.push(MetricDescriptor {
323            name: "dfe_auth_failures_total".into(),
324            metric_type: MetricType::Counter,
325            description: "Authentication failures by reason".into(),
326            unit: String::new(),
327            labels: vec!["reason".into()],
328            group: "platform".into(),
329            buckets: None,
330            use_cases: vec![],
331            dashboard_hint: None,
332        });
333        reg.push(MetricDescriptor {
334            name: "dfe_validation_failures_total".into(),
335            metric_type: MetricType::Counter,
336            description: "Validation failures by reason".into(),
337            unit: String::new(),
338            labels: vec!["reason".into()],
339            group: "platform".into(),
340            buckets: None,
341            use_cases: vec![],
342            dashboard_hint: None,
343        });
344
345        Self { _private: () }
346    }
347
348    // ── Transport ────────────────────────────────────────────────────
349
350    /// Record messages successfully sent to a transport.
351    #[inline]
352    pub fn transport_sent(&self, transport: &str, count: u64) {
353        metrics::counter!("dfe_transport_sent_total", "transport" => transport.to_string())
354            .increment(count);
355    }
356
357    /// Record send errors for a transport.
358    #[inline]
359    pub fn transport_send_errors(&self, transport: &str, count: u64) {
360        metrics::counter!("dfe_transport_send_errors_total", "transport" => transport.to_string())
361            .increment(count);
362    }
363
364    /// Record backpressure events for a transport.
365    #[inline]
366    pub fn transport_backpressured(&self, transport: &str, count: u64) {
367        metrics::counter!("dfe_transport_backpressured_total", "transport" => transport.to_string())
368            .increment(count);
369    }
370
371    /// Record refused messages for a transport.
372    #[inline]
373    pub fn transport_refused(&self, transport: &str, count: u64) {
374        metrics::counter!("dfe_transport_refused_total", "transport" => transport.to_string())
375            .increment(count);
376    }
377
378    /// Set transport health status.
379    #[inline]
380    pub fn transport_healthy(&self, transport: &str, healthy: bool) {
381        metrics::gauge!("dfe_transport_healthy", "transport" => transport.to_string())
382            .set(if healthy { 1.0 } else { 0.0 });
383    }
384
385    /// Set current transport queue size.
386    #[inline]
387    pub fn transport_queue_size(&self, transport: &str, size: f64) {
388        metrics::gauge!("dfe_transport_queue_size", "transport" => transport.to_string()).set(size);
389    }
390
391    /// Set transport queue capacity.
392    #[inline]
393    pub fn transport_queue_capacity(&self, transport: &str, capacity: f64) {
394        metrics::gauge!("dfe_transport_queue_capacity", "transport" => transport.to_string())
395            .set(capacity);
396    }
397
398    /// Set in-flight message count for a transport.
399    #[inline]
400    pub fn transport_inflight(&self, transport: &str, count: f64) {
401        metrics::gauge!("dfe_transport_inflight", "transport" => transport.to_string()).set(count);
402    }
403
404    /// Record batch send duration for a transport.
405    #[inline]
406    pub fn transport_send_duration(&self, transport: &str, seconds: f64) {
407        metrics::histogram!(
408            "dfe_transport_send_duration_seconds",
409            "transport" => transport.to_string()
410        )
411        .record(seconds);
412    }
413
414    // ── Pipeline ─────────────────────────────────────────────────────
415
416    /// Set pipeline readiness state.
417    #[inline]
418    pub fn pipeline_ready(&self, ready: bool) {
419        metrics::gauge!("dfe_pipeline_ready").set(if ready { 1.0 } else { 0.0 });
420    }
421
422    /// Add stall duration to the cumulative stall counter (whole seconds).
423    #[inline]
424    pub fn pipeline_stall(&self, seconds: u64) {
425        metrics::counter!("dfe_pipeline_stall_seconds_total").increment(seconds);
426    }
427
428    // ── Records ──────────────────────────────────────────────────────
429
430    /// Record incoming records.
431    #[inline]
432    pub fn records_received(&self, count: u64) {
433        metrics::counter!("dfe_records_received_total").increment(count);
434    }
435
436    /// Record successfully delivered records.
437    #[inline]
438    pub fn records_delivered(&self, count: u64) {
439        metrics::counter!("dfe_records_delivered_total").increment(count);
440    }
441
442    /// Record filtered/dropped records.
443    #[inline]
444    pub fn records_filtered(&self, count: u64) {
445        metrics::counter!("dfe_records_filtered_total").increment(count);
446    }
447
448    /// Record records sent to dead letter queue.
449    #[inline]
450    pub fn records_dlq(&self, count: u64) {
451        metrics::counter!("dfe_records_dlq_total").increment(count);
452    }
453
454    // ── Scaling ──────────────────────────────────────────────────────
455
456    /// Set normalised scaling pressure (0-100).
457    #[inline]
458    pub fn scaling_pressure(&self, pressure: f64) {
459        metrics::gauge!("dfe_scaling_pressure").set(pressure);
460    }
461
462    /// Set circuit breaker state.
463    #[inline]
464    pub fn scaling_circuit_open(&self, open: bool) {
465        metrics::gauge!("dfe_scaling_circuit_open").set(if open { 1.0 } else { 0.0 });
466    }
467
468    /// Set memory pressure ratio (0.0-1.0).
469    #[inline]
470    pub fn scaling_memory_pressure(&self, ratio: f64) {
471        metrics::gauge!("dfe_scaling_memory_pressure").set(ratio);
472    }
473
474    // ── Spool ────────────────────────────────────────────────────────
475
476    /// Set current spool size in bytes.
477    #[inline]
478    pub fn spool_bytes(&self, bytes: f64) {
479        metrics::gauge!("dfe_spool_bytes").set(bytes);
480    }
481
482    /// Set current spool message count.
483    #[inline]
484    pub fn spool_messages(&self, count: f64) {
485        metrics::gauge!("dfe_spool_messages").set(count);
486    }
487
488    /// Set available disk space for spool.
489    #[inline]
490    pub fn spool_disk_available(&self, bytes: f64) {
491        metrics::gauge!("dfe_spool_disk_available").set(bytes);
492    }
493
494    // ── Security ─────────────────────────────────────────────────────
495
496    /// Record authentication failure.
497    #[inline]
498    pub fn auth_failure(&self, reason: &str) {
499        metrics::counter!("dfe_auth_failures_total", "reason" => reason.to_string()).increment(1);
500    }
501
502    /// Record validation failure.
503    #[inline]
504    pub fn validation_failure(&self, reason: &str) {
505        metrics::counter!("dfe_validation_failures_total", "reason" => reason.to_string())
506            .increment(1);
507    }
508}
509
#[cfg(test)]
mod tests {
    use super::super::MetricsManager;
    use super::*;

    /// Registering against a fresh manager must complete without panicking.
    #[tokio::test]
    async fn test_register_does_not_panic() {
        let manager = MetricsManager::new("test_app");
        let _handle = DfeMetrics::register(&manager);
    }

    /// Registration must fill the manifest with the expected names, group,
    /// and label sets.
    #[tokio::test]
    async fn test_register_populates_registry() {
        let manager = MetricsManager::new("test_app");
        let _handle = DfeMetrics::register(&manager);
        let manifest = manager.registry().manifest();

        // One representative metric from each section must be present.
        for expected in [
            "dfe_transport_sent_total",
            "dfe_pipeline_ready",
            "dfe_records_received_total",
            "dfe_scaling_pressure",
            "dfe_spool_bytes",
            "dfe_auth_failures_total",
        ] {
            assert!(manifest.metrics.iter().any(|m| m.name == expected));
        }

        // All should be group=platform
        for descriptor in &manifest.metrics {
            assert_eq!(descriptor.group, "platform");
        }

        // Transport metrics carry the "transport" label, security metrics
        // carry the "reason" label.
        for (metric, label) in [
            ("dfe_transport_sent_total", "transport"),
            ("dfe_auth_failures_total", "reason"),
        ] {
            let descriptor = manifest
                .metrics
                .iter()
                .find(|m| m.name == metric)
                .unwrap();
            assert_eq!(descriptor.labels, vec![label]);
        }
    }

    /// Smoke test: every recording method can be invoked after registration.
    #[tokio::test]
    async fn test_methods_callable_without_recorder() {
        let manager = MetricsManager::new("test_app");
        let handle = DfeMetrics::register(&manager);

        // Transport
        let transport = "kafka";
        handle.transport_sent(transport, 1);
        handle.transport_send_errors(transport, 1);
        handle.transport_backpressured(transport, 1);
        handle.transport_refused(transport, 1);
        handle.transport_healthy(transport, true);
        handle.transport_queue_size(transport, 100.0);
        handle.transport_queue_capacity(transport, 1000.0);
        handle.transport_inflight(transport, 50.0);
        handle.transport_send_duration(transport, 0.042);

        // Pipeline
        handle.pipeline_ready(true);
        handle.pipeline_stall(1);

        // Records
        handle.records_received(100);
        handle.records_delivered(99);
        handle.records_filtered(1);
        handle.records_dlq(0);

        // Scaling
        handle.scaling_pressure(42.0);
        handle.scaling_circuit_open(false);
        handle.scaling_memory_pressure(0.65);

        // Spool
        handle.spool_bytes(1024.0);
        handle.spool_messages(10.0);
        handle.spool_disk_available(1_000_000.0);

        // Security
        handle.auth_failure("invalid_token");
        handle.validation_failure("missing_field");
    }
}