Skip to main content

hyperi_rustlib/metrics/
dfe.rs

1// Project:   hyperi-rustlib
2// File:      src/metrics/dfe.rs
3// Purpose:   Standard DFE metric definitions with transport labels
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Standard DFE metrics for pipeline components (receiver, loader, engine).
10//!
11//! Call [`DfeMetrics::register`] **after** creating a
12//! [`MetricsManager`](super::MetricsManager): the manager must exist so platform
13//! metrics land in the manifest registry. Methods are `#[inline]` for hot-path use.
14//!
15//! ## Example
16//!
17//! ```rust,no_run
18//! use hyperi_rustlib::metrics::{MetricsManager, DfeMetrics, TransportKind};
19//!
20//! let mgr = MetricsManager::new("myapp");
21//! let dfe = DfeMetrics::register(&mgr);
22//!
23//! dfe.transport_sent(TransportKind::Kafka, 100);
24//! dfe.records_received(500);
25//! dfe.scaling_pressure(42.0);
26//! ```
27
28use super::manifest::{MetricDescriptor, MetricType};
29
/// Standard DFE metric set: labelled counters, gauges, and histograms across
/// transport, pipeline, records, scaling, spool, and security.
///
/// Construct via [`DfeMetrics::register`] -- describes all metrics with the
/// global recorder AND pushes descriptors into the manifest registry.
///
/// The handle holds no state of its own (its only field is `()`), so it is
/// zero-sized. It derives `Debug` and `Clone` so it can be embedded in
/// application structs that derive those traits. `Copy` is deliberately not
/// derived to leave room for future cached state.
#[derive(Debug, Clone)]
pub struct DfeMetrics {
    /// Prevent external construction.
    _private: (),
}
39
40impl DfeMetrics {
41    /// Register all DFE metric descriptions with the global recorder and
42    /// manifest registry. Call **once** after creating a
43    /// [`MetricsManager`](super::MetricsManager). Returned handle is zero-sized
44    /// (recording goes through the global `metrics!` macros).
45    ///
46    /// **Breaking change (v1.22):** takes `&MetricsManager` so platform metrics
47    /// are tightly coupled with the manifest registry.
48    #[must_use]
49    #[allow(clippy::too_many_lines)]
50    pub fn register(manager: &super::MetricsManager) -> Self {
51        let reg = manager.registry();
52
53        // --- Transport ---
54        metrics::describe_counter!(
55            "dfe_transport_sent_total",
56            "Messages successfully sent to transport"
57        );
58        metrics::describe_counter!(
59            "dfe_transport_send_errors_total",
60            "Messages that failed to send"
61        );
62        metrics::describe_counter!(
63            "dfe_transport_backpressured_total",
64            "Messages delayed due to backpressure"
65        );
66        metrics::describe_counter!(
67            "dfe_transport_refused_total",
68            "Messages refused by transport (circuit open, capacity)"
69        );
70        metrics::describe_gauge!(
71            "dfe_transport_healthy",
72            "Transport health (1=healthy, 0=unhealthy)"
73        );
74        metrics::describe_gauge!(
75            "dfe_transport_queue_size",
76            "Current number of messages in transport queue"
77        );
78        metrics::describe_gauge!(
79            "dfe_transport_queue_capacity",
80            "Maximum transport queue capacity"
81        );
82        metrics::describe_gauge!(
83            "dfe_transport_inflight",
84            "Messages currently in-flight (sent but not acked)"
85        );
86        metrics::describe_histogram!(
87            "dfe_transport_send_duration_seconds",
88            metrics::Unit::Seconds,
89            "Time to send a batch to transport"
90        );
91
92        // Push transport descriptors into manifest registry
93        for (name, desc, mt) in [
94            (
95                "dfe_transport_sent_total",
96                "Messages successfully sent to transport",
97                MetricType::Counter,
98            ),
99            (
100                "dfe_transport_send_errors_total",
101                "Messages that failed to send",
102                MetricType::Counter,
103            ),
104            (
105                "dfe_transport_backpressured_total",
106                "Messages delayed due to backpressure",
107                MetricType::Counter,
108            ),
109            (
110                "dfe_transport_refused_total",
111                "Messages refused by transport (circuit open, capacity)",
112                MetricType::Counter,
113            ),
114            (
115                "dfe_transport_healthy",
116                "Transport health (1=healthy, 0=unhealthy)",
117                MetricType::Gauge,
118            ),
119            (
120                "dfe_transport_queue_size",
121                "Current number of messages in transport queue",
122                MetricType::Gauge,
123            ),
124            (
125                "dfe_transport_queue_capacity",
126                "Maximum transport queue capacity",
127                MetricType::Gauge,
128            ),
129            (
130                "dfe_transport_inflight",
131                "Messages currently in-flight (sent but not acked)",
132                MetricType::Gauge,
133            ),
134        ] {
135            reg.push(MetricDescriptor {
136                name: name.into(),
137                metric_type: mt,
138                description: desc.into(),
139                unit: String::new(),
140                labels: vec!["transport".into()],
141                group: "platform".into(),
142                buckets: None,
143                use_cases: vec![],
144                dashboard_hint: None,
145            });
146        }
147        reg.push(MetricDescriptor {
148            name: "dfe_transport_send_duration_seconds".into(),
149            metric_type: MetricType::Histogram,
150            description: "Time to send a batch to transport".into(),
151            unit: "seconds".into(),
152            labels: vec!["transport".into()],
153            group: "platform".into(),
154            buckets: None,
155            use_cases: vec![],
156            dashboard_hint: None,
157        });
158
159        // --- Pipeline ---
160        metrics::describe_gauge!(
161            "dfe_pipeline_ready",
162            "Pipeline readiness (1=ready, 0=not ready)"
163        );
164        metrics::describe_counter!(
165            "dfe_pipeline_stall_seconds_total",
166            "Cumulative seconds the pipeline was stalled"
167        );
168
169        reg.push(MetricDescriptor {
170            name: "dfe_pipeline_ready".into(),
171            metric_type: MetricType::Gauge,
172            description: "Pipeline readiness (1=ready, 0=not ready)".into(),
173            unit: String::new(),
174            labels: vec![],
175            group: "platform".into(),
176            buckets: None,
177            use_cases: vec![],
178            dashboard_hint: None,
179        });
180        reg.push(MetricDescriptor {
181            name: "dfe_pipeline_stall_seconds_total".into(),
182            metric_type: MetricType::Counter,
183            description: "Cumulative seconds the pipeline was stalled".into(),
184            unit: "seconds".into(),
185            labels: vec![],
186            group: "platform".into(),
187            buckets: None,
188            use_cases: vec![],
189            dashboard_hint: None,
190        });
191
192        // --- Records ---
193        metrics::describe_counter!(
194            "dfe_records_received_total",
195            "Records received from all sources"
196        );
197        metrics::describe_counter!(
198            "dfe_records_delivered_total",
199            "Records successfully delivered to sink"
200        );
201        metrics::describe_counter!(
202            "dfe_records_filtered_total",
203            "Records dropped by filter/routing rules"
204        );
205        metrics::describe_counter!("dfe_records_dlq_total", "Records sent to dead letter queue");
206
207        for (name, desc) in [
208            (
209                "dfe_records_received_total",
210                "Records received from all sources",
211            ),
212            (
213                "dfe_records_delivered_total",
214                "Records successfully delivered to sink",
215            ),
216            (
217                "dfe_records_filtered_total",
218                "Records dropped by filter/routing rules",
219            ),
220            ("dfe_records_dlq_total", "Records sent to dead letter queue"),
221        ] {
222            reg.push(MetricDescriptor {
223                name: name.into(),
224                metric_type: MetricType::Counter,
225                description: desc.into(),
226                unit: String::new(),
227                labels: vec![],
228                group: "platform".into(),
229                buckets: None,
230                use_cases: vec![],
231                dashboard_hint: None,
232            });
233        }
234
235        // --- Scaling ---
236        metrics::describe_gauge!(
237            "dfe_scaling_pressure",
238            "Normalised scaling pressure (0-100)"
239        );
240        metrics::describe_gauge!(
241            "dfe_scaling_circuit_open",
242            "Circuit breaker state (1=open, 0=closed)"
243        );
244        metrics::describe_gauge!(
245            "dfe_scaling_memory_pressure",
246            "Memory pressure ratio (0.0-1.0)"
247        );
248
249        for (name, desc) in [
250            (
251                "dfe_scaling_pressure",
252                "Normalised scaling pressure (0-100)",
253            ),
254            (
255                "dfe_scaling_circuit_open",
256                "Circuit breaker state (1=open, 0=closed)",
257            ),
258            (
259                "dfe_scaling_memory_pressure",
260                "Memory pressure ratio (0.0-1.0)",
261            ),
262        ] {
263            reg.push(MetricDescriptor {
264                name: name.into(),
265                metric_type: MetricType::Gauge,
266                description: desc.into(),
267                unit: String::new(),
268                labels: vec![],
269                group: "platform".into(),
270                buckets: None,
271                use_cases: vec![],
272                dashboard_hint: None,
273            });
274        }
275
276        // --- Spool ---
277        metrics::describe_gauge!("dfe_spool_bytes", "Current spool size in bytes");
278        metrics::describe_gauge!("dfe_spool_messages", "Current spool message count");
279        metrics::describe_gauge!(
280            "dfe_spool_disk_available",
281            "Available disk space for spool in bytes"
282        );
283
284        for (name, desc) in [
285            ("dfe_spool_bytes", "Current spool size in bytes"),
286            ("dfe_spool_messages", "Current spool message count"),
287            (
288                "dfe_spool_disk_available",
289                "Available disk space for spool in bytes",
290            ),
291        ] {
292            reg.push(MetricDescriptor {
293                name: name.into(),
294                metric_type: MetricType::Gauge,
295                description: desc.into(),
296                unit: String::new(),
297                labels: vec![],
298                group: "platform".into(),
299                buckets: None,
300                use_cases: vec![],
301                dashboard_hint: None,
302            });
303        }
304
305        // --- Security ---
306        metrics::describe_counter!(
307            "dfe_auth_failures_total",
308            "Authentication failures by reason"
309        );
310        metrics::describe_counter!(
311            "dfe_validation_failures_total",
312            "Validation failures by reason"
313        );
314
315        reg.push(MetricDescriptor {
316            name: "dfe_auth_failures_total".into(),
317            metric_type: MetricType::Counter,
318            description: "Authentication failures by reason".into(),
319            unit: String::new(),
320            labels: vec!["reason".into()],
321            group: "platform".into(),
322            buckets: None,
323            use_cases: vec![],
324            dashboard_hint: None,
325        });
326        reg.push(MetricDescriptor {
327            name: "dfe_validation_failures_total".into(),
328            metric_type: MetricType::Counter,
329            description: "Validation failures by reason".into(),
330            unit: String::new(),
331            labels: vec!["reason".into()],
332            group: "platform".into(),
333            buckets: None,
334            use_cases: vec![],
335            dashboard_hint: None,
336        });
337
338        Self { _private: () }
339    }
340
341    // ── Transport ────────────────────────────────────────────────────
342
343    /// Record messages successfully sent to a transport.
344    #[inline]
345    pub fn transport_sent(&self, transport: super::TransportKind, count: u64) {
346        metrics::counter!("dfe_transport_sent_total", "transport" => transport.as_label())
347            .increment(count);
348    }
349
350    /// Record send errors for a transport.
351    #[inline]
352    pub fn transport_send_errors(&self, transport: super::TransportKind, count: u64) {
353        metrics::counter!("dfe_transport_send_errors_total", "transport" => transport.as_label())
354            .increment(count);
355    }
356
357    /// Record backpressure events for a transport.
358    #[inline]
359    pub fn transport_backpressured(&self, transport: &str, count: u64) {
360        metrics::counter!("dfe_transport_backpressured_total", "transport" => transport.to_string())
361            .increment(count);
362    }
363
364    /// Record refused messages for a transport.
365    #[inline]
366    pub fn transport_refused(&self, transport: &str, count: u64) {
367        metrics::counter!("dfe_transport_refused_total", "transport" => transport.to_string())
368            .increment(count);
369    }
370
371    /// Set transport health status.
372    #[inline]
373    pub fn transport_healthy(&self, transport: &str, healthy: bool) {
374        metrics::gauge!("dfe_transport_healthy", "transport" => transport.to_string())
375            .set(if healthy { 1.0 } else { 0.0 });
376    }
377
378    /// Set current transport queue size.
379    #[inline]
380    pub fn transport_queue_size(&self, transport: &str, size: f64) {
381        metrics::gauge!("dfe_transport_queue_size", "transport" => transport.to_string()).set(size);
382    }
383
384    /// Set transport queue capacity.
385    #[inline]
386    pub fn transport_queue_capacity(&self, transport: &str, capacity: f64) {
387        metrics::gauge!("dfe_transport_queue_capacity", "transport" => transport.to_string())
388            .set(capacity);
389    }
390
391    /// Set in-flight message count for a transport.
392    #[inline]
393    pub fn transport_inflight(&self, transport: &str, count: f64) {
394        metrics::gauge!("dfe_transport_inflight", "transport" => transport.to_string()).set(count);
395    }
396
397    /// Record batch send duration for a transport.
398    #[inline]
399    pub fn transport_send_duration(&self, transport: &str, seconds: f64) {
400        metrics::histogram!(
401            "dfe_transport_send_duration_seconds",
402            "transport" => transport.to_string()
403        )
404        .record(seconds);
405    }
406
407    // ── Pipeline ─────────────────────────────────────────────────────
408
409    /// Set pipeline readiness state.
410    #[inline]
411    pub fn pipeline_ready(&self, ready: bool) {
412        metrics::gauge!("dfe_pipeline_ready").set(if ready { 1.0 } else { 0.0 });
413    }
414
415    /// Add stall duration to the cumulative stall counter (whole seconds).
416    #[inline]
417    pub fn pipeline_stall(&self, seconds: u64) {
418        metrics::counter!("dfe_pipeline_stall_seconds_total").increment(seconds);
419    }
420
421    // ── Records ──────────────────────────────────────────────────────
422
423    /// Record incoming records.
424    #[inline]
425    pub fn records_received(&self, count: u64) {
426        metrics::counter!("dfe_records_received_total").increment(count);
427    }
428
429    /// Record successfully delivered records.
430    #[inline]
431    pub fn records_delivered(&self, count: u64) {
432        metrics::counter!("dfe_records_delivered_total").increment(count);
433    }
434
435    /// Record filtered/dropped records.
436    #[inline]
437    pub fn records_filtered(&self, count: u64) {
438        metrics::counter!("dfe_records_filtered_total").increment(count);
439    }
440
441    /// Record records sent to dead letter queue.
442    #[inline]
443    pub fn records_dlq(&self, count: u64) {
444        metrics::counter!("dfe_records_dlq_total").increment(count);
445    }
446
447    // ── Scaling ──────────────────────────────────────────────────────
448
449    /// Set normalised scaling pressure (0-100).
450    #[inline]
451    pub fn scaling_pressure(&self, pressure: f64) {
452        metrics::gauge!("dfe_scaling_pressure").set(pressure);
453    }
454
455    /// Set circuit breaker state.
456    #[inline]
457    pub fn scaling_circuit_open(&self, open: bool) {
458        metrics::gauge!("dfe_scaling_circuit_open").set(if open { 1.0 } else { 0.0 });
459    }
460
461    /// Set memory pressure ratio (0.0-1.0).
462    #[inline]
463    pub fn scaling_memory_pressure(&self, ratio: f64) {
464        metrics::gauge!("dfe_scaling_memory_pressure").set(ratio);
465    }
466
467    // ── Spool ────────────────────────────────────────────────────────
468
469    /// Set current spool size in bytes.
470    #[inline]
471    pub fn spool_bytes(&self, bytes: f64) {
472        metrics::gauge!("dfe_spool_bytes").set(bytes);
473    }
474
475    /// Set current spool message count.
476    #[inline]
477    pub fn spool_messages(&self, count: f64) {
478        metrics::gauge!("dfe_spool_messages").set(count);
479    }
480
481    /// Set available disk space for spool.
482    #[inline]
483    pub fn spool_disk_available(&self, bytes: f64) {
484        metrics::gauge!("dfe_spool_disk_available").set(bytes);
485    }
486
487    // ── Security ─────────────────────────────────────────────────────
488
489    /// Record authentication failure.
490    #[inline]
491    pub fn auth_failure(&self, reason: super::AuthFailureReason) {
492        metrics::counter!("dfe_auth_failures_total", "reason" => reason.as_label()).increment(1);
493    }
494
495    /// Record validation failure.
496    #[inline]
497    pub fn validation_failure(&self, reason: super::ValidationFailureReason) {
498        metrics::counter!("dfe_validation_failures_total", "reason" => reason.as_label())
499            .increment(1);
500    }
501}
502
#[cfg(test)]
mod tests {
    use super::super::{AuthFailureReason, MetricsManager, TransportKind, ValidationFailureReason};
    use super::*;

    /// Registration against a test manager must complete without panicking.
    #[tokio::test]
    async fn test_register_does_not_panic() {
        let manager = MetricsManager::new_for_test("test_app");
        let _dfe = DfeMetrics::register(&manager);
    }

    /// Registration fills the manifest with platform-group descriptors that
    /// carry the expected label keys.
    #[tokio::test]
    async fn test_register_populates_registry() {
        let manager = MetricsManager::new_for_test("test_app");
        let _dfe = DfeMetrics::register(&manager);
        let manifest = manager.registry().manifest();

        // Look up a descriptor by exact metric name.
        let lookup = |wanted: &str| manifest.metrics.iter().find(|m| m.name == wanted);

        for wanted in [
            "dfe_transport_sent_total",
            "dfe_pipeline_ready",
            "dfe_records_received_total",
            "dfe_scaling_pressure",
            "dfe_spool_bytes",
            "dfe_auth_failures_total",
        ] {
            assert!(lookup(wanted).is_some(), "missing metric {wanted}");
        }

        // Every registered DFE metric belongs to the platform group.
        assert!(manifest.metrics.iter().all(|m| m.group == "platform"));

        // Transport metrics are keyed by "transport", security ones by "reason".
        assert_eq!(
            lookup("dfe_transport_sent_total").unwrap().labels,
            vec!["transport"]
        );
        assert_eq!(
            lookup("dfe_auth_failures_total").unwrap().labels,
            vec!["reason"]
        );
    }

    /// Every recording method can be invoked on a registered handle.
    #[tokio::test]
    async fn test_methods_callable_without_recorder() {
        let manager = MetricsManager::new("test_app");
        let metrics = DfeMetrics::register(&manager);

        // Transport
        metrics.transport_sent(TransportKind::Kafka, 1);
        metrics.transport_send_errors(TransportKind::Kafka, 1);
        metrics.transport_backpressured("kafka", 1);
        metrics.transport_refused("kafka", 1);
        metrics.transport_healthy("kafka", true);
        metrics.transport_queue_size("kafka", 100.0);
        metrics.transport_queue_capacity("kafka", 1000.0);
        metrics.transport_inflight("kafka", 50.0);
        metrics.transport_send_duration("kafka", 0.042);

        // Pipeline
        metrics.pipeline_ready(true);
        metrics.pipeline_stall(1);

        // Records
        metrics.records_received(100);
        metrics.records_delivered(99);
        metrics.records_filtered(1);
        metrics.records_dlq(0);

        // Scaling
        metrics.scaling_pressure(42.0);
        metrics.scaling_circuit_open(false);
        metrics.scaling_memory_pressure(0.65);

        // Spool
        metrics.spool_bytes(1024.0);
        metrics.spool_messages(10.0);
        metrics.spool_disk_available(1_000_000.0);

        // Security
        metrics.auth_failure(AuthFailureReason::MalformedToken);
        metrics.validation_failure(ValidationFailureReason::FieldMissing);
    }
}
575
576        dfe.auth_failure(super::super::AuthFailureReason::MalformedToken);
577        dfe.validation_failure(super::super::ValidationFailureReason::FieldMissing);
578    }
579}