1use super::manifest::{MetricDescriptor, MetricType};
32
33pub struct DfeMetrics {
41 _private: (),
43}
44
45impl DfeMetrics {
46 #[must_use]
56 #[allow(clippy::too_many_lines)]
57 pub fn register(manager: &super::MetricsManager) -> Self {
58 let reg = manager.registry();
59
60 metrics::describe_counter!(
62 "dfe_transport_sent_total",
63 "Messages successfully sent to transport"
64 );
65 metrics::describe_counter!(
66 "dfe_transport_send_errors_total",
67 "Messages that failed to send"
68 );
69 metrics::describe_counter!(
70 "dfe_transport_backpressured_total",
71 "Messages delayed due to backpressure"
72 );
73 metrics::describe_counter!(
74 "dfe_transport_refused_total",
75 "Messages refused by transport (circuit open, capacity)"
76 );
77 metrics::describe_gauge!(
78 "dfe_transport_healthy",
79 "Transport health (1=healthy, 0=unhealthy)"
80 );
81 metrics::describe_gauge!(
82 "dfe_transport_queue_size",
83 "Current number of messages in transport queue"
84 );
85 metrics::describe_gauge!(
86 "dfe_transport_queue_capacity",
87 "Maximum transport queue capacity"
88 );
89 metrics::describe_gauge!(
90 "dfe_transport_inflight",
91 "Messages currently in-flight (sent but not acked)"
92 );
93 metrics::describe_histogram!(
94 "dfe_transport_send_duration_seconds",
95 metrics::Unit::Seconds,
96 "Time to send a batch to transport"
97 );
98
99 for (name, desc, mt) in [
101 (
102 "dfe_transport_sent_total",
103 "Messages successfully sent to transport",
104 MetricType::Counter,
105 ),
106 (
107 "dfe_transport_send_errors_total",
108 "Messages that failed to send",
109 MetricType::Counter,
110 ),
111 (
112 "dfe_transport_backpressured_total",
113 "Messages delayed due to backpressure",
114 MetricType::Counter,
115 ),
116 (
117 "dfe_transport_refused_total",
118 "Messages refused by transport (circuit open, capacity)",
119 MetricType::Counter,
120 ),
121 (
122 "dfe_transport_healthy",
123 "Transport health (1=healthy, 0=unhealthy)",
124 MetricType::Gauge,
125 ),
126 (
127 "dfe_transport_queue_size",
128 "Current number of messages in transport queue",
129 MetricType::Gauge,
130 ),
131 (
132 "dfe_transport_queue_capacity",
133 "Maximum transport queue capacity",
134 MetricType::Gauge,
135 ),
136 (
137 "dfe_transport_inflight",
138 "Messages currently in-flight (sent but not acked)",
139 MetricType::Gauge,
140 ),
141 ] {
142 reg.push(MetricDescriptor {
143 name: name.into(),
144 metric_type: mt,
145 description: desc.into(),
146 unit: String::new(),
147 labels: vec!["transport".into()],
148 group: "platform".into(),
149 buckets: None,
150 use_cases: vec![],
151 dashboard_hint: None,
152 });
153 }
154 reg.push(MetricDescriptor {
155 name: "dfe_transport_send_duration_seconds".into(),
156 metric_type: MetricType::Histogram,
157 description: "Time to send a batch to transport".into(),
158 unit: "seconds".into(),
159 labels: vec!["transport".into()],
160 group: "platform".into(),
161 buckets: None,
162 use_cases: vec![],
163 dashboard_hint: None,
164 });
165
166 metrics::describe_gauge!(
168 "dfe_pipeline_ready",
169 "Pipeline readiness (1=ready, 0=not ready)"
170 );
171 metrics::describe_counter!(
172 "dfe_pipeline_stall_seconds_total",
173 "Cumulative seconds the pipeline was stalled"
174 );
175
176 reg.push(MetricDescriptor {
177 name: "dfe_pipeline_ready".into(),
178 metric_type: MetricType::Gauge,
179 description: "Pipeline readiness (1=ready, 0=not ready)".into(),
180 unit: String::new(),
181 labels: vec![],
182 group: "platform".into(),
183 buckets: None,
184 use_cases: vec![],
185 dashboard_hint: None,
186 });
187 reg.push(MetricDescriptor {
188 name: "dfe_pipeline_stall_seconds_total".into(),
189 metric_type: MetricType::Counter,
190 description: "Cumulative seconds the pipeline was stalled".into(),
191 unit: "seconds".into(),
192 labels: vec![],
193 group: "platform".into(),
194 buckets: None,
195 use_cases: vec![],
196 dashboard_hint: None,
197 });
198
199 metrics::describe_counter!(
201 "dfe_records_received_total",
202 "Records received from all sources"
203 );
204 metrics::describe_counter!(
205 "dfe_records_delivered_total",
206 "Records successfully delivered to sink"
207 );
208 metrics::describe_counter!(
209 "dfe_records_filtered_total",
210 "Records dropped by filter/routing rules"
211 );
212 metrics::describe_counter!("dfe_records_dlq_total", "Records sent to dead letter queue");
213
214 for (name, desc) in [
215 (
216 "dfe_records_received_total",
217 "Records received from all sources",
218 ),
219 (
220 "dfe_records_delivered_total",
221 "Records successfully delivered to sink",
222 ),
223 (
224 "dfe_records_filtered_total",
225 "Records dropped by filter/routing rules",
226 ),
227 ("dfe_records_dlq_total", "Records sent to dead letter queue"),
228 ] {
229 reg.push(MetricDescriptor {
230 name: name.into(),
231 metric_type: MetricType::Counter,
232 description: desc.into(),
233 unit: String::new(),
234 labels: vec![],
235 group: "platform".into(),
236 buckets: None,
237 use_cases: vec![],
238 dashboard_hint: None,
239 });
240 }
241
242 metrics::describe_gauge!(
244 "dfe_scaling_pressure",
245 "Normalised scaling pressure (0-100)"
246 );
247 metrics::describe_gauge!(
248 "dfe_scaling_circuit_open",
249 "Circuit breaker state (1=open, 0=closed)"
250 );
251 metrics::describe_gauge!(
252 "dfe_scaling_memory_pressure",
253 "Memory pressure ratio (0.0-1.0)"
254 );
255
256 for (name, desc) in [
257 (
258 "dfe_scaling_pressure",
259 "Normalised scaling pressure (0-100)",
260 ),
261 (
262 "dfe_scaling_circuit_open",
263 "Circuit breaker state (1=open, 0=closed)",
264 ),
265 (
266 "dfe_scaling_memory_pressure",
267 "Memory pressure ratio (0.0-1.0)",
268 ),
269 ] {
270 reg.push(MetricDescriptor {
271 name: name.into(),
272 metric_type: MetricType::Gauge,
273 description: desc.into(),
274 unit: String::new(),
275 labels: vec![],
276 group: "platform".into(),
277 buckets: None,
278 use_cases: vec![],
279 dashboard_hint: None,
280 });
281 }
282
283 metrics::describe_gauge!("dfe_spool_bytes", "Current spool size in bytes");
285 metrics::describe_gauge!("dfe_spool_messages", "Current spool message count");
286 metrics::describe_gauge!(
287 "dfe_spool_disk_available",
288 "Available disk space for spool in bytes"
289 );
290
291 for (name, desc) in [
292 ("dfe_spool_bytes", "Current spool size in bytes"),
293 ("dfe_spool_messages", "Current spool message count"),
294 (
295 "dfe_spool_disk_available",
296 "Available disk space for spool in bytes",
297 ),
298 ] {
299 reg.push(MetricDescriptor {
300 name: name.into(),
301 metric_type: MetricType::Gauge,
302 description: desc.into(),
303 unit: String::new(),
304 labels: vec![],
305 group: "platform".into(),
306 buckets: None,
307 use_cases: vec![],
308 dashboard_hint: None,
309 });
310 }
311
312 metrics::describe_counter!(
314 "dfe_auth_failures_total",
315 "Authentication failures by reason"
316 );
317 metrics::describe_counter!(
318 "dfe_validation_failures_total",
319 "Validation failures by reason"
320 );
321
322 reg.push(MetricDescriptor {
323 name: "dfe_auth_failures_total".into(),
324 metric_type: MetricType::Counter,
325 description: "Authentication failures by reason".into(),
326 unit: String::new(),
327 labels: vec!["reason".into()],
328 group: "platform".into(),
329 buckets: None,
330 use_cases: vec![],
331 dashboard_hint: None,
332 });
333 reg.push(MetricDescriptor {
334 name: "dfe_validation_failures_total".into(),
335 metric_type: MetricType::Counter,
336 description: "Validation failures by reason".into(),
337 unit: String::new(),
338 labels: vec!["reason".into()],
339 group: "platform".into(),
340 buckets: None,
341 use_cases: vec![],
342 dashboard_hint: None,
343 });
344
345 Self { _private: () }
346 }
347
348 #[inline]
352 pub fn transport_sent(&self, transport: &str, count: u64) {
353 metrics::counter!("dfe_transport_sent_total", "transport" => transport.to_string())
354 .increment(count);
355 }
356
357 #[inline]
359 pub fn transport_send_errors(&self, transport: &str, count: u64) {
360 metrics::counter!("dfe_transport_send_errors_total", "transport" => transport.to_string())
361 .increment(count);
362 }
363
364 #[inline]
366 pub fn transport_backpressured(&self, transport: &str, count: u64) {
367 metrics::counter!("dfe_transport_backpressured_total", "transport" => transport.to_string())
368 .increment(count);
369 }
370
371 #[inline]
373 pub fn transport_refused(&self, transport: &str, count: u64) {
374 metrics::counter!("dfe_transport_refused_total", "transport" => transport.to_string())
375 .increment(count);
376 }
377
378 #[inline]
380 pub fn transport_healthy(&self, transport: &str, healthy: bool) {
381 metrics::gauge!("dfe_transport_healthy", "transport" => transport.to_string())
382 .set(if healthy { 1.0 } else { 0.0 });
383 }
384
385 #[inline]
387 pub fn transport_queue_size(&self, transport: &str, size: f64) {
388 metrics::gauge!("dfe_transport_queue_size", "transport" => transport.to_string()).set(size);
389 }
390
391 #[inline]
393 pub fn transport_queue_capacity(&self, transport: &str, capacity: f64) {
394 metrics::gauge!("dfe_transport_queue_capacity", "transport" => transport.to_string())
395 .set(capacity);
396 }
397
398 #[inline]
400 pub fn transport_inflight(&self, transport: &str, count: f64) {
401 metrics::gauge!("dfe_transport_inflight", "transport" => transport.to_string()).set(count);
402 }
403
404 #[inline]
406 pub fn transport_send_duration(&self, transport: &str, seconds: f64) {
407 metrics::histogram!(
408 "dfe_transport_send_duration_seconds",
409 "transport" => transport.to_string()
410 )
411 .record(seconds);
412 }
413
414 #[inline]
418 pub fn pipeline_ready(&self, ready: bool) {
419 metrics::gauge!("dfe_pipeline_ready").set(if ready { 1.0 } else { 0.0 });
420 }
421
422 #[inline]
424 pub fn pipeline_stall(&self, seconds: u64) {
425 metrics::counter!("dfe_pipeline_stall_seconds_total").increment(seconds);
426 }
427
428 #[inline]
432 pub fn records_received(&self, count: u64) {
433 metrics::counter!("dfe_records_received_total").increment(count);
434 }
435
436 #[inline]
438 pub fn records_delivered(&self, count: u64) {
439 metrics::counter!("dfe_records_delivered_total").increment(count);
440 }
441
442 #[inline]
444 pub fn records_filtered(&self, count: u64) {
445 metrics::counter!("dfe_records_filtered_total").increment(count);
446 }
447
448 #[inline]
450 pub fn records_dlq(&self, count: u64) {
451 metrics::counter!("dfe_records_dlq_total").increment(count);
452 }
453
454 #[inline]
458 pub fn scaling_pressure(&self, pressure: f64) {
459 metrics::gauge!("dfe_scaling_pressure").set(pressure);
460 }
461
462 #[inline]
464 pub fn scaling_circuit_open(&self, open: bool) {
465 metrics::gauge!("dfe_scaling_circuit_open").set(if open { 1.0 } else { 0.0 });
466 }
467
468 #[inline]
470 pub fn scaling_memory_pressure(&self, ratio: f64) {
471 metrics::gauge!("dfe_scaling_memory_pressure").set(ratio);
472 }
473
474 #[inline]
478 pub fn spool_bytes(&self, bytes: f64) {
479 metrics::gauge!("dfe_spool_bytes").set(bytes);
480 }
481
482 #[inline]
484 pub fn spool_messages(&self, count: f64) {
485 metrics::gauge!("dfe_spool_messages").set(count);
486 }
487
488 #[inline]
490 pub fn spool_disk_available(&self, bytes: f64) {
491 metrics::gauge!("dfe_spool_disk_available").set(bytes);
492 }
493
494 #[inline]
498 pub fn auth_failure(&self, reason: &str) {
499 metrics::counter!("dfe_auth_failures_total", "reason" => reason.to_string()).increment(1);
500 }
501
502 #[inline]
504 pub fn validation_failure(&self, reason: &str) {
505 metrics::counter!("dfe_validation_failures_total", "reason" => reason.to_string())
506 .increment(1);
507 }
508}
509
510#[cfg(test)]
511mod tests {
512 use super::*;
513
514 #[tokio::test]
515 async fn test_register_does_not_panic() {
516 let mgr = super::super::MetricsManager::new("test_app");
517 let _dfe = DfeMetrics::register(&mgr);
518 }
519
520 #[tokio::test]
521 async fn test_register_populates_registry() {
522 let mgr = super::super::MetricsManager::new("test_app");
523 let _dfe = DfeMetrics::register(&mgr);
524 let manifest = mgr.registry().manifest();
525 let names: Vec<&str> = manifest.metrics.iter().map(|m| m.name.as_str()).collect();
526 assert!(names.contains(&"dfe_transport_sent_total"));
527 assert!(names.contains(&"dfe_pipeline_ready"));
528 assert!(names.contains(&"dfe_records_received_total"));
529 assert!(names.contains(&"dfe_scaling_pressure"));
530 assert!(names.contains(&"dfe_spool_bytes"));
531 assert!(names.contains(&"dfe_auth_failures_total"));
532 for m in &manifest.metrics {
534 assert_eq!(m.group, "platform");
535 }
536 let sent = manifest
538 .metrics
539 .iter()
540 .find(|m| m.name == "dfe_transport_sent_total")
541 .unwrap();
542 assert_eq!(sent.labels, vec!["transport"]);
543 let auth = manifest
545 .metrics
546 .iter()
547 .find(|m| m.name == "dfe_auth_failures_total")
548 .unwrap();
549 assert_eq!(auth.labels, vec!["reason"]);
550 }
551
552 #[tokio::test]
553 async fn test_methods_callable_without_recorder() {
554 let mgr = super::super::MetricsManager::new("test_app");
555 let dfe = DfeMetrics::register(&mgr);
556
557 dfe.transport_sent("kafka", 1);
558 dfe.transport_send_errors("kafka", 1);
559 dfe.transport_backpressured("kafka", 1);
560 dfe.transport_refused("kafka", 1);
561 dfe.transport_healthy("kafka", true);
562 dfe.transport_queue_size("kafka", 100.0);
563 dfe.transport_queue_capacity("kafka", 1000.0);
564 dfe.transport_inflight("kafka", 50.0);
565 dfe.transport_send_duration("kafka", 0.042);
566
567 dfe.pipeline_ready(true);
568 dfe.pipeline_stall(1);
569
570 dfe.records_received(100);
571 dfe.records_delivered(99);
572 dfe.records_filtered(1);
573 dfe.records_dlq(0);
574
575 dfe.scaling_pressure(42.0);
576 dfe.scaling_circuit_open(false);
577 dfe.scaling_memory_pressure(0.65);
578
579 dfe.spool_bytes(1024.0);
580 dfe.spool_messages(10.0);
581 dfe.spool_disk_available(1_000_000.0);
582
583 dfe.auth_failure("invalid_token");
584 dfe.validation_failure("missing_field");
585 }
586}