1use super::manifest::{MetricDescriptor, MetricType};
29
/// Handle for recording the standard DFE platform metrics: transport,
/// pipeline readiness/stalls, record flow, scaling, spool, and
/// auth/validation failures.
///
/// Obtain one through [`DfeMetrics::register`]. The private unit field stops
/// code outside this module from building the handle with a struct literal.
pub struct DfeMetrics {
    // Zero-sized; exists only to make the struct non-constructible elsewhere.
    _private: (),
}
39
40impl DfeMetrics {
41 #[must_use]
49 #[allow(clippy::too_many_lines)]
50 pub fn register(manager: &super::MetricsManager) -> Self {
51 let reg = manager.registry();
52
53 metrics::describe_counter!(
55 "dfe_transport_sent_total",
56 "Messages successfully sent to transport"
57 );
58 metrics::describe_counter!(
59 "dfe_transport_send_errors_total",
60 "Messages that failed to send"
61 );
62 metrics::describe_counter!(
63 "dfe_transport_backpressured_total",
64 "Messages delayed due to backpressure"
65 );
66 metrics::describe_counter!(
67 "dfe_transport_refused_total",
68 "Messages refused by transport (circuit open, capacity)"
69 );
70 metrics::describe_gauge!(
71 "dfe_transport_healthy",
72 "Transport health (1=healthy, 0=unhealthy)"
73 );
74 metrics::describe_gauge!(
75 "dfe_transport_queue_size",
76 "Current number of messages in transport queue"
77 );
78 metrics::describe_gauge!(
79 "dfe_transport_queue_capacity",
80 "Maximum transport queue capacity"
81 );
82 metrics::describe_gauge!(
83 "dfe_transport_inflight",
84 "Messages currently in-flight (sent but not acked)"
85 );
86 metrics::describe_histogram!(
87 "dfe_transport_send_duration_seconds",
88 metrics::Unit::Seconds,
89 "Time to send a batch to transport"
90 );
91
92 for (name, desc, mt) in [
94 (
95 "dfe_transport_sent_total",
96 "Messages successfully sent to transport",
97 MetricType::Counter,
98 ),
99 (
100 "dfe_transport_send_errors_total",
101 "Messages that failed to send",
102 MetricType::Counter,
103 ),
104 (
105 "dfe_transport_backpressured_total",
106 "Messages delayed due to backpressure",
107 MetricType::Counter,
108 ),
109 (
110 "dfe_transport_refused_total",
111 "Messages refused by transport (circuit open, capacity)",
112 MetricType::Counter,
113 ),
114 (
115 "dfe_transport_healthy",
116 "Transport health (1=healthy, 0=unhealthy)",
117 MetricType::Gauge,
118 ),
119 (
120 "dfe_transport_queue_size",
121 "Current number of messages in transport queue",
122 MetricType::Gauge,
123 ),
124 (
125 "dfe_transport_queue_capacity",
126 "Maximum transport queue capacity",
127 MetricType::Gauge,
128 ),
129 (
130 "dfe_transport_inflight",
131 "Messages currently in-flight (sent but not acked)",
132 MetricType::Gauge,
133 ),
134 ] {
135 reg.push(MetricDescriptor {
136 name: name.into(),
137 metric_type: mt,
138 description: desc.into(),
139 unit: String::new(),
140 labels: vec!["transport".into()],
141 group: "platform".into(),
142 buckets: None,
143 use_cases: vec![],
144 dashboard_hint: None,
145 });
146 }
147 reg.push(MetricDescriptor {
148 name: "dfe_transport_send_duration_seconds".into(),
149 metric_type: MetricType::Histogram,
150 description: "Time to send a batch to transport".into(),
151 unit: "seconds".into(),
152 labels: vec!["transport".into()],
153 group: "platform".into(),
154 buckets: None,
155 use_cases: vec![],
156 dashboard_hint: None,
157 });
158
159 metrics::describe_gauge!(
161 "dfe_pipeline_ready",
162 "Pipeline readiness (1=ready, 0=not ready)"
163 );
164 metrics::describe_counter!(
165 "dfe_pipeline_stall_seconds_total",
166 "Cumulative seconds the pipeline was stalled"
167 );
168
169 reg.push(MetricDescriptor {
170 name: "dfe_pipeline_ready".into(),
171 metric_type: MetricType::Gauge,
172 description: "Pipeline readiness (1=ready, 0=not ready)".into(),
173 unit: String::new(),
174 labels: vec![],
175 group: "platform".into(),
176 buckets: None,
177 use_cases: vec![],
178 dashboard_hint: None,
179 });
180 reg.push(MetricDescriptor {
181 name: "dfe_pipeline_stall_seconds_total".into(),
182 metric_type: MetricType::Counter,
183 description: "Cumulative seconds the pipeline was stalled".into(),
184 unit: "seconds".into(),
185 labels: vec![],
186 group: "platform".into(),
187 buckets: None,
188 use_cases: vec![],
189 dashboard_hint: None,
190 });
191
192 metrics::describe_counter!(
194 "dfe_records_received_total",
195 "Records received from all sources"
196 );
197 metrics::describe_counter!(
198 "dfe_records_delivered_total",
199 "Records successfully delivered to sink"
200 );
201 metrics::describe_counter!(
202 "dfe_records_filtered_total",
203 "Records dropped by filter/routing rules"
204 );
205 metrics::describe_counter!("dfe_records_dlq_total", "Records sent to dead letter queue");
206
207 for (name, desc) in [
208 (
209 "dfe_records_received_total",
210 "Records received from all sources",
211 ),
212 (
213 "dfe_records_delivered_total",
214 "Records successfully delivered to sink",
215 ),
216 (
217 "dfe_records_filtered_total",
218 "Records dropped by filter/routing rules",
219 ),
220 ("dfe_records_dlq_total", "Records sent to dead letter queue"),
221 ] {
222 reg.push(MetricDescriptor {
223 name: name.into(),
224 metric_type: MetricType::Counter,
225 description: desc.into(),
226 unit: String::new(),
227 labels: vec![],
228 group: "platform".into(),
229 buckets: None,
230 use_cases: vec![],
231 dashboard_hint: None,
232 });
233 }
234
235 metrics::describe_gauge!(
237 "dfe_scaling_pressure",
238 "Normalised scaling pressure (0-100)"
239 );
240 metrics::describe_gauge!(
241 "dfe_scaling_circuit_open",
242 "Circuit breaker state (1=open, 0=closed)"
243 );
244 metrics::describe_gauge!(
245 "dfe_scaling_memory_pressure",
246 "Memory pressure ratio (0.0-1.0)"
247 );
248
249 for (name, desc) in [
250 (
251 "dfe_scaling_pressure",
252 "Normalised scaling pressure (0-100)",
253 ),
254 (
255 "dfe_scaling_circuit_open",
256 "Circuit breaker state (1=open, 0=closed)",
257 ),
258 (
259 "dfe_scaling_memory_pressure",
260 "Memory pressure ratio (0.0-1.0)",
261 ),
262 ] {
263 reg.push(MetricDescriptor {
264 name: name.into(),
265 metric_type: MetricType::Gauge,
266 description: desc.into(),
267 unit: String::new(),
268 labels: vec![],
269 group: "platform".into(),
270 buckets: None,
271 use_cases: vec![],
272 dashboard_hint: None,
273 });
274 }
275
276 metrics::describe_gauge!("dfe_spool_bytes", "Current spool size in bytes");
278 metrics::describe_gauge!("dfe_spool_messages", "Current spool message count");
279 metrics::describe_gauge!(
280 "dfe_spool_disk_available",
281 "Available disk space for spool in bytes"
282 );
283
284 for (name, desc) in [
285 ("dfe_spool_bytes", "Current spool size in bytes"),
286 ("dfe_spool_messages", "Current spool message count"),
287 (
288 "dfe_spool_disk_available",
289 "Available disk space for spool in bytes",
290 ),
291 ] {
292 reg.push(MetricDescriptor {
293 name: name.into(),
294 metric_type: MetricType::Gauge,
295 description: desc.into(),
296 unit: String::new(),
297 labels: vec![],
298 group: "platform".into(),
299 buckets: None,
300 use_cases: vec![],
301 dashboard_hint: None,
302 });
303 }
304
305 metrics::describe_counter!(
307 "dfe_auth_failures_total",
308 "Authentication failures by reason"
309 );
310 metrics::describe_counter!(
311 "dfe_validation_failures_total",
312 "Validation failures by reason"
313 );
314
315 reg.push(MetricDescriptor {
316 name: "dfe_auth_failures_total".into(),
317 metric_type: MetricType::Counter,
318 description: "Authentication failures by reason".into(),
319 unit: String::new(),
320 labels: vec!["reason".into()],
321 group: "platform".into(),
322 buckets: None,
323 use_cases: vec![],
324 dashboard_hint: None,
325 });
326 reg.push(MetricDescriptor {
327 name: "dfe_validation_failures_total".into(),
328 metric_type: MetricType::Counter,
329 description: "Validation failures by reason".into(),
330 unit: String::new(),
331 labels: vec!["reason".into()],
332 group: "platform".into(),
333 buckets: None,
334 use_cases: vec![],
335 dashboard_hint: None,
336 });
337
338 Self { _private: () }
339 }
340
341 #[inline]
345 pub fn transport_sent(&self, transport: super::TransportKind, count: u64) {
346 metrics::counter!("dfe_transport_sent_total", "transport" => transport.as_label())
347 .increment(count);
348 }
349
350 #[inline]
352 pub fn transport_send_errors(&self, transport: super::TransportKind, count: u64) {
353 metrics::counter!("dfe_transport_send_errors_total", "transport" => transport.as_label())
354 .increment(count);
355 }
356
357 #[inline]
359 pub fn transport_backpressured(&self, transport: &str, count: u64) {
360 metrics::counter!("dfe_transport_backpressured_total", "transport" => transport.to_string())
361 .increment(count);
362 }
363
364 #[inline]
366 pub fn transport_refused(&self, transport: &str, count: u64) {
367 metrics::counter!("dfe_transport_refused_total", "transport" => transport.to_string())
368 .increment(count);
369 }
370
371 #[inline]
373 pub fn transport_healthy(&self, transport: &str, healthy: bool) {
374 metrics::gauge!("dfe_transport_healthy", "transport" => transport.to_string())
375 .set(if healthy { 1.0 } else { 0.0 });
376 }
377
378 #[inline]
380 pub fn transport_queue_size(&self, transport: &str, size: f64) {
381 metrics::gauge!("dfe_transport_queue_size", "transport" => transport.to_string()).set(size);
382 }
383
384 #[inline]
386 pub fn transport_queue_capacity(&self, transport: &str, capacity: f64) {
387 metrics::gauge!("dfe_transport_queue_capacity", "transport" => transport.to_string())
388 .set(capacity);
389 }
390
391 #[inline]
393 pub fn transport_inflight(&self, transport: &str, count: f64) {
394 metrics::gauge!("dfe_transport_inflight", "transport" => transport.to_string()).set(count);
395 }
396
397 #[inline]
399 pub fn transport_send_duration(&self, transport: &str, seconds: f64) {
400 metrics::histogram!(
401 "dfe_transport_send_duration_seconds",
402 "transport" => transport.to_string()
403 )
404 .record(seconds);
405 }
406
407 #[inline]
411 pub fn pipeline_ready(&self, ready: bool) {
412 metrics::gauge!("dfe_pipeline_ready").set(if ready { 1.0 } else { 0.0 });
413 }
414
415 #[inline]
417 pub fn pipeline_stall(&self, seconds: u64) {
418 metrics::counter!("dfe_pipeline_stall_seconds_total").increment(seconds);
419 }
420
421 #[inline]
425 pub fn records_received(&self, count: u64) {
426 metrics::counter!("dfe_records_received_total").increment(count);
427 }
428
429 #[inline]
431 pub fn records_delivered(&self, count: u64) {
432 metrics::counter!("dfe_records_delivered_total").increment(count);
433 }
434
435 #[inline]
437 pub fn records_filtered(&self, count: u64) {
438 metrics::counter!("dfe_records_filtered_total").increment(count);
439 }
440
441 #[inline]
443 pub fn records_dlq(&self, count: u64) {
444 metrics::counter!("dfe_records_dlq_total").increment(count);
445 }
446
447 #[inline]
451 pub fn scaling_pressure(&self, pressure: f64) {
452 metrics::gauge!("dfe_scaling_pressure").set(pressure);
453 }
454
455 #[inline]
457 pub fn scaling_circuit_open(&self, open: bool) {
458 metrics::gauge!("dfe_scaling_circuit_open").set(if open { 1.0 } else { 0.0 });
459 }
460
461 #[inline]
463 pub fn scaling_memory_pressure(&self, ratio: f64) {
464 metrics::gauge!("dfe_scaling_memory_pressure").set(ratio);
465 }
466
467 #[inline]
471 pub fn spool_bytes(&self, bytes: f64) {
472 metrics::gauge!("dfe_spool_bytes").set(bytes);
473 }
474
475 #[inline]
477 pub fn spool_messages(&self, count: f64) {
478 metrics::gauge!("dfe_spool_messages").set(count);
479 }
480
481 #[inline]
483 pub fn spool_disk_available(&self, bytes: f64) {
484 metrics::gauge!("dfe_spool_disk_available").set(bytes);
485 }
486
487 #[inline]
491 pub fn auth_failure(&self, reason: super::AuthFailureReason) {
492 metrics::counter!("dfe_auth_failures_total", "reason" => reason.as_label()).increment(1);
493 }
494
495 #[inline]
497 pub fn validation_failure(&self, reason: super::ValidationFailureReason) {
498 metrics::counter!("dfe_validation_failures_total", "reason" => reason.as_label())
499 .increment(1);
500 }
501}
502
#[cfg(test)]
mod tests {
    use super::*;

    /// Every metric name `DfeMetrics::register` is expected to publish.
    const EXPECTED_NAMES: [&str; 23] = [
        "dfe_transport_sent_total",
        "dfe_transport_send_errors_total",
        "dfe_transport_backpressured_total",
        "dfe_transport_refused_total",
        "dfe_transport_healthy",
        "dfe_transport_queue_size",
        "dfe_transport_queue_capacity",
        "dfe_transport_inflight",
        "dfe_transport_send_duration_seconds",
        "dfe_pipeline_ready",
        "dfe_pipeline_stall_seconds_total",
        "dfe_records_received_total",
        "dfe_records_delivered_total",
        "dfe_records_filtered_total",
        "dfe_records_dlq_total",
        "dfe_scaling_pressure",
        "dfe_scaling_circuit_open",
        "dfe_scaling_memory_pressure",
        "dfe_spool_bytes",
        "dfe_spool_messages",
        "dfe_spool_disk_available",
        "dfe_auth_failures_total",
        "dfe_validation_failures_total",
    ];

    #[tokio::test]
    async fn test_register_does_not_panic() {
        let mgr = super::super::MetricsManager::new_for_test("test_app");
        let _dfe = DfeMetrics::register(&mgr);
    }

    /// Checks names, group, labels, and units of the registered descriptors.
    #[tokio::test]
    async fn test_register_populates_registry() {
        let mgr = super::super::MetricsManager::new_for_test("test_app");
        let _dfe = DfeMetrics::register(&mgr);
        let manifest = mgr.registry().manifest();

        let find = |name: &str| {
            manifest
                .metrics
                .iter()
                .find(|m| m.name == name)
                .unwrap_or_else(|| panic!("{name} missing from manifest"))
        };

        for name in EXPECTED_NAMES {
            find(name);
        }
        for m in &manifest.metrics {
            assert_eq!(m.group, "platform");
        }

        // Labels: transport family, reason family, and unlabelled metrics.
        assert_eq!(find("dfe_transport_sent_total").labels, vec!["transport"]);
        assert_eq!(find("dfe_auth_failures_total").labels, vec!["reason"]);
        assert!(find("dfe_records_received_total").labels.is_empty());

        // Units: only the two duration-style metrics declare one.
        assert_eq!(find("dfe_transport_send_duration_seconds").unit, "seconds");
        assert_eq!(find("dfe_pipeline_stall_seconds_total").unit, "seconds");
        assert_eq!(find("dfe_transport_sent_total").unit, "");
    }

    #[tokio::test]
    async fn test_methods_callable_without_recorder() {
        let mgr = super::super::MetricsManager::new("test_app");
        let dfe = DfeMetrics::register(&mgr);

        dfe.transport_sent(super::super::TransportKind::Kafka, 1);
        dfe.transport_send_errors(super::super::TransportKind::Kafka, 1);
        dfe.transport_backpressured("kafka", 1);
        dfe.transport_refused("kafka", 1);
        dfe.transport_healthy("kafka", true);
        dfe.transport_queue_size("kafka", 100.0);
        dfe.transport_queue_capacity("kafka", 1000.0);
        dfe.transport_inflight("kafka", 50.0);
        dfe.transport_send_duration("kafka", 0.042);

        dfe.pipeline_ready(true);
        dfe.pipeline_stall(1);

        dfe.records_received(100);
        dfe.records_delivered(99);
        dfe.records_filtered(1);
        dfe.records_dlq(0);

        dfe.scaling_pressure(42.0);
        dfe.scaling_circuit_open(false);
        dfe.scaling_memory_pressure(0.65);

        dfe.spool_bytes(1024.0);
        dfe.spool_messages(10.0);
        dfe.spool_disk_available(1_000_000.0);

        dfe.auth_failure(super::super::AuthFailureReason::MalformedToken);
        dfe.validation_failure(super::super::ValidationFailureReason::FieldMissing);
    }
}