1use super::manifest::{MetricDescriptor, MetricType};
29
30pub struct DfeMetrics {
36 _private: (),
38}
39
40impl DfeMetrics {
41 #[must_use]
49 #[allow(clippy::too_many_lines)]
50 pub fn register(manager: &super::MetricsManager) -> Self {
51 let reg = manager.registry();
52
53 metrics::describe_counter!(
55 "dfe_transport_sent_total",
56 "Messages successfully sent to transport"
57 );
58 metrics::describe_counter!(
59 "dfe_transport_send_errors_total",
60 "Messages that failed to send"
61 );
62 metrics::describe_counter!(
63 "dfe_transport_backpressured_total",
64 "Messages delayed due to backpressure"
65 );
66 metrics::describe_counter!(
67 "dfe_transport_refused_total",
68 "Messages refused by transport (circuit open, capacity)"
69 );
70 metrics::describe_gauge!(
71 "dfe_transport_healthy",
72 "Transport health (1=healthy, 0=unhealthy)"
73 );
74 metrics::describe_gauge!(
75 "dfe_transport_queue_size",
76 "Current number of messages in transport queue"
77 );
78 metrics::describe_gauge!(
79 "dfe_transport_queue_capacity",
80 "Maximum transport queue capacity"
81 );
82 metrics::describe_gauge!(
83 "dfe_transport_inflight",
84 "Messages currently in-flight (sent but not acked)"
85 );
86 metrics::describe_histogram!(
87 "dfe_transport_send_duration_seconds",
88 metrics::Unit::Seconds,
89 "Time to send a batch to transport"
90 );
91
92 for (name, desc, mt) in [
94 (
95 "dfe_transport_sent_total",
96 "Messages successfully sent to transport",
97 MetricType::Counter,
98 ),
99 (
100 "dfe_transport_send_errors_total",
101 "Messages that failed to send",
102 MetricType::Counter,
103 ),
104 (
105 "dfe_transport_backpressured_total",
106 "Messages delayed due to backpressure",
107 MetricType::Counter,
108 ),
109 (
110 "dfe_transport_refused_total",
111 "Messages refused by transport (circuit open, capacity)",
112 MetricType::Counter,
113 ),
114 (
115 "dfe_transport_healthy",
116 "Transport health (1=healthy, 0=unhealthy)",
117 MetricType::Gauge,
118 ),
119 (
120 "dfe_transport_queue_size",
121 "Current number of messages in transport queue",
122 MetricType::Gauge,
123 ),
124 (
125 "dfe_transport_queue_capacity",
126 "Maximum transport queue capacity",
127 MetricType::Gauge,
128 ),
129 (
130 "dfe_transport_inflight",
131 "Messages currently in-flight (sent but not acked)",
132 MetricType::Gauge,
133 ),
134 ] {
135 reg.push(MetricDescriptor {
136 name: name.into(),
137 metric_type: mt,
138 description: desc.into(),
139 unit: String::new(),
140 labels: vec!["transport".into()],
141 group: "platform".into(),
142 buckets: None,
143 use_cases: vec![],
144 dashboard_hint: None,
145 });
146 }
147 reg.push(MetricDescriptor {
148 name: "dfe_transport_send_duration_seconds".into(),
149 metric_type: MetricType::Histogram,
150 description: "Time to send a batch to transport".into(),
151 unit: "seconds".into(),
152 labels: vec!["transport".into()],
153 group: "platform".into(),
154 buckets: None,
155 use_cases: vec![],
156 dashboard_hint: None,
157 });
158
159 metrics::describe_gauge!(
161 "dfe_pipeline_ready",
162 "Pipeline readiness (1=ready, 0=not ready)"
163 );
164 metrics::describe_counter!(
165 "dfe_pipeline_stall_seconds_total",
166 "Cumulative seconds the pipeline was stalled"
167 );
168
169 reg.push(MetricDescriptor {
170 name: "dfe_pipeline_ready".into(),
171 metric_type: MetricType::Gauge,
172 description: "Pipeline readiness (1=ready, 0=not ready)".into(),
173 unit: String::new(),
174 labels: vec![],
175 group: "platform".into(),
176 buckets: None,
177 use_cases: vec![],
178 dashboard_hint: None,
179 });
180 reg.push(MetricDescriptor {
181 name: "dfe_pipeline_stall_seconds_total".into(),
182 metric_type: MetricType::Counter,
183 description: "Cumulative seconds the pipeline was stalled".into(),
184 unit: "seconds".into(),
185 labels: vec![],
186 group: "platform".into(),
187 buckets: None,
188 use_cases: vec![],
189 dashboard_hint: None,
190 });
191
192 metrics::describe_counter!(
194 "dfe_records_received_total",
195 "Records received from all sources"
196 );
197 metrics::describe_counter!(
198 "dfe_records_delivered_total",
199 "Records successfully delivered to sink"
200 );
201 metrics::describe_counter!(
202 "dfe_records_filtered_total",
203 "Records dropped by filter/routing rules"
204 );
205 metrics::describe_counter!("dfe_records_dlq_total", "Records sent to dead letter queue");
206
207 for (name, desc) in [
208 (
209 "dfe_records_received_total",
210 "Records received from all sources",
211 ),
212 (
213 "dfe_records_delivered_total",
214 "Records successfully delivered to sink",
215 ),
216 (
217 "dfe_records_filtered_total",
218 "Records dropped by filter/routing rules",
219 ),
220 ("dfe_records_dlq_total", "Records sent to dead letter queue"),
221 ] {
222 reg.push(MetricDescriptor {
223 name: name.into(),
224 metric_type: MetricType::Counter,
225 description: desc.into(),
226 unit: String::new(),
227 labels: vec![],
228 group: "platform".into(),
229 buckets: None,
230 use_cases: vec![],
231 dashboard_hint: None,
232 });
233 }
234
235 metrics::describe_gauge!(
237 "dfe_scaling_pressure",
238 "Normalised scaling pressure (0-100)"
239 );
240 metrics::describe_gauge!(
242 "dfe_scaling_circuit_open",
243 "Circuit breaker state (1=open, 0=closed)"
244 );
245 metrics::describe_gauge!(
246 "dfe_scaling_memory_pressure",
247 "Memory pressure ratio (0.0-1.0)"
248 );
249 metrics::describe_gauge!(
252 "dfe_scaling_memory_pressure_ratio",
253 "Memory pressure ratio (0.0-1.0)"
254 );
255
256 for (name, desc) in [
257 (
258 "dfe_scaling_pressure",
259 "Normalised scaling pressure (0-100)",
260 ),
261 (
262 "dfe_scaling_circuit_open",
263 "Circuit breaker state (1=open, 0=closed)",
264 ),
265 (
266 "dfe_scaling_memory_pressure",
267 "Memory pressure ratio (0.0-1.0)",
268 ),
269 (
271 "dfe_scaling_memory_pressure_ratio",
272 "Memory pressure ratio (0.0-1.0)",
273 ),
274 ] {
275 reg.push(MetricDescriptor {
276 name: name.into(),
277 metric_type: MetricType::Gauge,
278 description: desc.into(),
279 unit: String::new(),
280 labels: vec![],
281 group: "platform".into(),
282 buckets: None,
283 use_cases: vec![],
284 dashboard_hint: None,
285 });
286 }
287
288 metrics::describe_gauge!("dfe_spool_bytes", "Current spool size in bytes");
290 metrics::describe_gauge!("dfe_spool_messages", "Current spool message count");
291 metrics::describe_gauge!(
292 "dfe_spool_disk_available",
293 "Available disk space for spool in bytes"
294 );
295 metrics::describe_gauge!(
298 "dfe_spool_disk_available_bytes",
299 "Available disk space for spool in bytes"
300 );
301
302 for (name, desc) in [
303 ("dfe_spool_bytes", "Current spool size in bytes"),
304 ("dfe_spool_messages", "Current spool message count"),
305 (
306 "dfe_spool_disk_available",
307 "Available disk space for spool in bytes",
308 ),
309 (
311 "dfe_spool_disk_available_bytes",
312 "Available disk space for spool in bytes",
313 ),
314 ] {
315 reg.push(MetricDescriptor {
316 name: name.into(),
317 metric_type: MetricType::Gauge,
318 description: desc.into(),
319 unit: String::new(),
320 labels: vec![],
321 group: "platform".into(),
322 buckets: None,
323 use_cases: vec![],
324 dashboard_hint: None,
325 });
326 }
327
328 metrics::describe_counter!(
330 "dfe_auth_failures_total",
331 "Authentication failures by reason"
332 );
333 metrics::describe_counter!(
334 "dfe_validation_failures_total",
335 "Validation failures by reason"
336 );
337
338 reg.push(MetricDescriptor {
339 name: "dfe_auth_failures_total".into(),
340 metric_type: MetricType::Counter,
341 description: "Authentication failures by reason".into(),
342 unit: String::new(),
343 labels: vec!["reason".into()],
344 group: "platform".into(),
345 buckets: None,
346 use_cases: vec![],
347 dashboard_hint: None,
348 });
349 reg.push(MetricDescriptor {
350 name: "dfe_validation_failures_total".into(),
351 metric_type: MetricType::Counter,
352 description: "Validation failures by reason".into(),
353 unit: String::new(),
354 labels: vec!["reason".into()],
355 group: "platform".into(),
356 buckets: None,
357 use_cases: vec![],
358 dashboard_hint: None,
359 });
360
361 Self { _private: () }
362 }
363
364 #[inline]
368 pub fn transport_sent(&self, transport: super::TransportKind, count: u64) {
369 metrics::counter!("dfe_transport_sent_total", "transport" => transport.as_label())
370 .increment(count);
371 }
372
373 #[inline]
375 pub fn transport_send_errors(&self, transport: super::TransportKind, count: u64) {
376 metrics::counter!("dfe_transport_send_errors_total", "transport" => transport.as_label())
377 .increment(count);
378 }
379
380 #[inline]
382 pub fn transport_backpressured(&self, transport: &str, count: u64) {
383 metrics::counter!("dfe_transport_backpressured_total", "transport" => transport.to_string())
384 .increment(count);
385 }
386
387 #[inline]
389 pub fn transport_refused(&self, transport: &str, count: u64) {
390 metrics::counter!("dfe_transport_refused_total", "transport" => transport.to_string())
391 .increment(count);
392 }
393
394 #[inline]
399 pub fn transport_healthy(&self, transport: &str, healthy: bool) {
400 metrics::gauge!("dfe_transport_healthy", "transport" => transport.to_string())
401 .set(if healthy { 1.0 } else { 0.0 });
402 }
403
404 #[inline]
406 pub fn transport_queue_size(&self, transport: &str, size: f64) {
407 metrics::gauge!("dfe_transport_queue_size", "transport" => transport.to_string()).set(size);
408 }
409
410 #[inline]
412 pub fn transport_queue_capacity(&self, transport: &str, capacity: f64) {
413 metrics::gauge!("dfe_transport_queue_capacity", "transport" => transport.to_string())
414 .set(capacity);
415 }
416
417 #[inline]
419 pub fn transport_inflight(&self, transport: &str, count: f64) {
420 metrics::gauge!("dfe_transport_inflight", "transport" => transport.to_string()).set(count);
421 }
422
423 #[inline]
425 pub fn transport_send_duration(&self, transport: &str, seconds: f64) {
426 metrics::histogram!(
427 "dfe_transport_send_duration_seconds",
428 "transport" => transport.to_string()
429 )
430 .record(seconds);
431 }
432
433 #[inline]
440 pub fn pipeline_ready(&self, ready: bool) {
441 metrics::gauge!("dfe_pipeline_ready").set(if ready { 1.0 } else { 0.0 });
442 }
443
444 #[inline]
446 pub fn pipeline_stall(&self, seconds: u64) {
447 metrics::counter!("dfe_pipeline_stall_seconds_total").increment(seconds);
448 }
449
450 #[inline]
454 pub fn records_received(&self, count: u64) {
455 metrics::counter!("dfe_records_received_total").increment(count);
456 }
457
458 #[inline]
460 pub fn records_delivered(&self, count: u64) {
461 metrics::counter!("dfe_records_delivered_total").increment(count);
462 }
463
464 #[inline]
466 pub fn records_filtered(&self, count: u64) {
467 metrics::counter!("dfe_records_filtered_total").increment(count);
468 }
469
470 #[inline]
472 pub fn records_dlq(&self, count: u64) {
473 metrics::counter!("dfe_records_dlq_total").increment(count);
474 }
475
476 #[inline]
480 pub fn scaling_pressure(&self, pressure: f64) {
481 metrics::gauge!("dfe_scaling_pressure").set(pressure);
482 }
483
484 #[inline]
489 pub fn scaling_circuit_open(&self, open: bool) {
490 metrics::gauge!("dfe_scaling_circuit_open").set(if open { 1.0 } else { 0.0 });
491 }
492
493 #[inline]
495 pub fn scaling_memory_pressure(&self, ratio: f64) {
496 metrics::gauge!("dfe_scaling_memory_pressure").set(ratio);
497 metrics::gauge!("dfe_scaling_memory_pressure_ratio").set(ratio);
499 }
500
501 #[inline]
505 pub fn spool_bytes(&self, bytes: f64) {
506 metrics::gauge!("dfe_spool_bytes").set(bytes);
507 }
508
509 #[inline]
511 pub fn spool_messages(&self, count: f64) {
512 metrics::gauge!("dfe_spool_messages").set(count);
513 }
514
515 #[inline]
517 pub fn spool_disk_available(&self, bytes: f64) {
518 metrics::gauge!("dfe_spool_disk_available").set(bytes);
519 metrics::gauge!("dfe_spool_disk_available_bytes").set(bytes);
522 }
523
524 #[inline]
528 pub fn auth_failure(&self, reason: super::AuthFailureReason) {
529 metrics::counter!("dfe_auth_failures_total", "reason" => reason.as_label()).increment(1);
530 }
531
532 #[inline]
534 pub fn validation_failure(&self, reason: super::ValidationFailureReason) {
535 metrics::counter!("dfe_validation_failures_total", "reason" => reason.as_label())
536 .increment(1);
537 }
538}
539
540#[cfg(test)]
541mod tests {
542 use super::*;
543
544 #[tokio::test]
545 async fn test_register_does_not_panic() {
546 let mgr = super::super::MetricsManager::new_for_test("test_app");
547 let _dfe = DfeMetrics::register(&mgr);
548 }
549
550 #[tokio::test]
551 async fn test_register_populates_registry() {
552 let mgr = super::super::MetricsManager::new_for_test("test_app");
553 let _dfe = DfeMetrics::register(&mgr);
554 let manifest = mgr.registry().manifest();
555 let names: Vec<&str> = manifest.metrics.iter().map(|m| m.name.as_str()).collect();
556 assert!(names.contains(&"dfe_transport_sent_total"));
557 assert!(names.contains(&"dfe_pipeline_ready"));
558 assert!(names.contains(&"dfe_records_received_total"));
559 assert!(names.contains(&"dfe_scaling_pressure"));
560 assert!(names.contains(&"dfe_spool_bytes"));
561 assert!(names.contains(&"dfe_auth_failures_total"));
562 for m in &manifest.metrics {
564 assert_eq!(m.group, "platform");
565 }
566 let sent = manifest
568 .metrics
569 .iter()
570 .find(|m| m.name == "dfe_transport_sent_total")
571 .unwrap();
572 assert_eq!(sent.labels, vec!["transport"]);
573 let auth = manifest
575 .metrics
576 .iter()
577 .find(|m| m.name == "dfe_auth_failures_total")
578 .unwrap();
579 assert_eq!(auth.labels, vec!["reason"]);
580 }
581
582 #[tokio::test]
583 async fn test_methods_callable_without_recorder() {
584 let mgr = super::super::MetricsManager::new("test_app");
585 let dfe = DfeMetrics::register(&mgr);
586
587 dfe.transport_sent(super::super::TransportKind::Kafka, 1);
588 dfe.transport_send_errors(super::super::TransportKind::Kafka, 1);
589 dfe.transport_backpressured("kafka", 1);
590 dfe.transport_refused("kafka", 1);
591 dfe.transport_healthy("kafka", true);
592 dfe.transport_queue_size("kafka", 100.0);
593 dfe.transport_queue_capacity("kafka", 1000.0);
594 dfe.transport_inflight("kafka", 50.0);
595 dfe.transport_send_duration("kafka", 0.042);
596
597 dfe.pipeline_ready(true);
598 dfe.pipeline_stall(1);
599
600 dfe.records_received(100);
601 dfe.records_delivered(99);
602 dfe.records_filtered(1);
603 dfe.records_dlq(0);
604
605 dfe.scaling_pressure(42.0);
606 dfe.scaling_circuit_open(false);
607 dfe.scaling_memory_pressure(0.65);
608
609 dfe.spool_bytes(1024.0);
610 dfe.spool_messages(10.0);
611 dfe.spool_disk_available(1_000_000.0);
612
613 dfe.auth_failure(super::super::AuthFailureReason::MalformedToken);
614 dfe.validation_failure(super::super::ValidationFailureReason::FieldMissing);
615 }
616}