1use metrics::{counter, gauge, histogram};
35use std::time::Duration;
36
37pub fn record_peer_connection(peer_id: &str, success: bool) {
39 let status = if success { "success" } else { "failure" };
40 counter!("replication_peer_connections_total", "peer_id" => peer_id.to_string(), "status" => status).increment(1);
41}
42
43pub fn record_peer_state(peer_id: &str, state: &str) {
45 gauge!("replication_peer_state", "peer_id" => peer_id.to_string(), "state" => state.to_string()).set(1.0);
48}
49
50pub fn record_peer_ping_latency(peer_id: &str, latency: Duration) {
52 histogram!("replication_peer_ping_latency_seconds", "peer_id" => peer_id.to_string())
53 .record(latency.as_secs_f64());
54}
55
56pub fn record_peer_ping(peer_id: &str, success: bool) {
58 let status = if success { "success" } else { "failure" };
59 counter!("replication_peer_pings_total", "peer_id" => peer_id.to_string(), "status" => status).increment(1);
60}
61
62pub fn record_peer_circuit_state(peer_id: &str, state: &str) {
64 counter!("replication_peer_circuit_transitions_total", "peer_id" => peer_id.to_string(), "state" => state.to_string()).increment(1);
66}
67
68pub fn record_cdc_events_read(peer_id: &str, count: usize) {
70 counter!("replication_cdc_events_read_total", "peer_id" => peer_id.to_string()).increment(count as u64);
71}
72
73pub fn record_cdc_events_applied(peer_id: &str, count: usize) {
75 counter!("replication_cdc_events_applied_total", "peer_id" => peer_id.to_string()).increment(count as u64);
76}
77
78pub fn record_cdc_events_deduped(peer_id: &str, count: usize) {
80 counter!("replication_cdc_events_deduped_total", "peer_id" => peer_id.to_string()).increment(count as u64);
81}
82
83pub fn record_stream_read_latency(peer_id: &str, duration: Duration) {
85 histogram!("replication_stream_read_duration_seconds", "peer_id" => peer_id.to_string())
86 .record(duration.as_secs_f64());
87}
88
89pub fn record_peer_operation_latency(peer_id: &str, operation: &str, duration: Duration) {
92 histogram!(
93 "replication_peer_operation_duration_seconds",
94 "peer_id" => peer_id.to_string(),
95 "operation" => operation.to_string()
96 )
97 .record(duration.as_secs_f64());
98}
99
100pub fn record_event_processing_latency(peer_id: &str, duration: Duration) {
102 histogram!("replication_event_processing_duration_seconds", "peer_id" => peer_id.to_string())
103 .record(duration.as_secs_f64());
104}
105
106pub fn record_cursor_persist(peer_id: &str, success: bool) {
108 let status = if success { "success" } else { "failure" };
109 counter!("replication_cursor_persists_total", "peer_id" => peer_id.to_string(), "status" => status).increment(1);
110}
111
112pub fn record_cursor_flush(flushed: usize, errors: usize) {
114 counter!("replication_cursor_flushes_total").increment(1);
115 counter!("replication_cursor_flushed_count").increment(flushed as u64);
116 if errors > 0 {
117 counter!("replication_cursor_flush_errors_total").increment(errors as u64);
118 }
119}
120
121pub fn cursor_retries_total(operation: &str) {
123 counter!("replication_cursor_retries_total", "operation" => operation.to_string()).increment(1);
124}
125
126pub fn record_replication_lag(peer_id: &str, lag_seconds: f64) {
128 gauge!("replication_lag_seconds", "peer_id" => peer_id.to_string()).set(lag_seconds);
129}
130
131pub fn record_replication_lag_events(peer_id: &str, lag_events: u64) {
133 gauge!("replication_lag_events", "peer_id" => peer_id.to_string()).set(lag_events as f64);
134}
135
136pub fn record_replication_lag_ms(peer_id: &str, lag_ms: u64) {
138 gauge!("replication_lag_ms", "peer_id" => peer_id.to_string()).set(lag_ms as f64);
139}
140
141pub fn record_adaptive_batch_size(peer_id: &str, batch_size: usize) {
143 gauge!("replication_adaptive_batch_size", "peer_id" => peer_id.to_string()).set(batch_size as f64);
144}
145
146pub fn record_repair_cycle(items_repaired: usize, duration: Duration) {
148 counter!("replication_repair_items_total").increment(items_repaired as u64);
149 histogram!("replication_repair_cycle_duration_seconds").record(duration.as_secs_f64());
150}
151
152pub fn record_repair_skipped(reason: &str) {
154 counter!("replication_repair_skipped_total", "reason" => reason.to_string()).increment(1);
155}
156
157pub fn record_error(peer_id: &str, error_type: &str) {
159 counter!("replication_errors_total", "peer_id" => peer_id.to_string(), "error_type" => error_type.to_string()).increment(1);
160}
161
162pub fn set_connected_peers(count: usize) {
164 gauge!("replication_connected_peers").set(count as f64);
165}
166
167pub fn set_engine_state(state: &str) {
169 let value = match state {
171 "Created" => 0.0,
172 "Connecting" => 1.0,
173 "Running" => 2.0,
174 "ShuttingDown" => 3.0,
175 "Stopped" => 4.0,
176 "Failed" => 5.0,
177 _ => -1.0,
178 };
179 gauge!("replication_engine_state").set(value);
180}
181
182pub fn record_circuit_call(circuit_name: &str, outcome: &str) {
188 counter!(
189 "replication_circuit_calls_total",
190 "circuit" => circuit_name.to_string(),
191 "outcome" => outcome.to_string()
192 )
193 .increment(1);
194}
195
196pub fn set_circuit_state(circuit_name: &str, state: &str) {
198 let value = match state {
199 "closed" => 0.0,
200 "half_open" => 1.0,
201 "open" => 2.0,
202 _ => -1.0,
203 };
204 gauge!("replication_circuit_state", "circuit" => circuit_name.to_string()).set(value);
205}
206
207pub fn record_circuit_rejection(circuit_name: &str) {
209 counter!(
210 "replication_circuit_rejections_total",
211 "circuit" => circuit_name.to_string()
212 )
213 .increment(1);
214}
215
216pub fn record_batch_flush(
222 peer_id: &str,
223 total: usize,
224 submitted: usize,
225 deleted: usize,
226 skipped: usize,
227 errors: usize,
228 duration: Duration,
229) {
230 let peer = peer_id.to_string();
231
232 counter!("replication_batch_events_total", "peer_id" => peer.clone())
233 .increment(total as u64);
234 counter!("replication_batch_submitted_total", "peer_id" => peer.clone())
235 .increment(submitted as u64);
236 counter!("replication_batch_deleted_total", "peer_id" => peer.clone())
237 .increment(deleted as u64);
238 counter!("replication_batch_skipped_total", "peer_id" => peer.clone())
239 .increment(skipped as u64);
240
241 if errors > 0 {
242 counter!("replication_batch_errors_total", "peer_id" => peer.clone())
243 .increment(errors as u64);
244 }
245
246 histogram!("replication_batch_flush_duration_seconds", "peer_id" => peer.clone())
247 .record(duration.as_secs_f64());
248 histogram!("replication_batch_size", "peer_id" => peer).record(total as f64);
249}
250
251pub fn record_batch_dedup(peer_id: &str, before_dedup: usize, after_dedup: usize) {
253 let deduped = before_dedup.saturating_sub(after_dedup);
254 if deduped > 0 {
255 counter!("replication_batch_deduped_total", "peer_id" => peer_id.to_string())
256 .increment(deduped as u64);
257 }
258}
259
260pub fn record_stream_trimmed(peer_id: &str) {
266 counter!("replication_stream_trimmed_total", "peer_id" => peer_id.to_string()).increment(1);
267}
268
269pub fn record_backpressure_pause(peer_id: &str) {
271 counter!("replication_backpressure_pauses_total", "peer_id" => peer_id.to_string()).increment(1);
272}
273
274pub fn record_stream_read(peer_id: &str, events_count: usize, duration: Duration) {
276 counter!("replication_stream_reads_total", "peer_id" => peer_id.to_string()).increment(1);
277 if events_count > 0 {
278 counter!("replication_stream_events_read_total", "peer_id" => peer_id.to_string())
279 .increment(events_count as u64);
280 }
281 histogram!("replication_stream_read_duration_seconds", "peer_id" => peer_id.to_string())
282 .record(duration.as_secs_f64());
283}
284
285pub fn record_repair_cycle_complete(
291 peers_checked: usize,
292 peers_in_sync: usize,
293 items_fetched: usize,
294 items_submitted: usize,
295 errors: usize,
296 duration: Duration,
297) {
298 counter!("replication_repair_cycles_total").increment(1);
299 counter!("replication_repair_peers_checked_total").increment(peers_checked as u64);
300 counter!("replication_repair_peers_in_sync_total").increment(peers_in_sync as u64);
301 counter!("replication_repair_items_fetched_total").increment(items_fetched as u64);
302 counter!("replication_repair_items_submitted_total").increment(items_submitted as u64);
303
304 if errors > 0 {
305 counter!("replication_repair_errors_total").increment(errors as u64);
306 }
307
308 histogram!("replication_repair_cycle_duration_seconds").record(duration.as_secs_f64());
309}
310
311pub fn record_merkle_divergence(peer_id: &str) {
313 counter!("replication_merkle_divergence_total", "peer_id" => peer_id.to_string()).increment(1);
314}
315
316pub fn record_slo_violation(peer_id: &str, slo_type: &str, latency_ms: u64) {
327 counter!(
328 "replication_slo_violations_total",
329 "peer_id" => peer_id.to_string(),
330 "slo_type" => slo_type.to_string()
331 )
332 .increment(1);
333
334 histogram!(
335 "replication_slo_violation_latency_ms",
336 "peer_id" => peer_id.to_string(),
337 "slo_type" => slo_type.to_string()
338 )
339 .record(latency_ms as f64);
340}
341
342pub fn set_replication_lag_slo(peer_id: &str, lag_ms: u64, threshold_ms: u64) {
344 gauge!("replication_lag_slo_ms", "peer_id" => peer_id.to_string()).set(lag_ms as f64);
345
346 let is_violation = if lag_ms > threshold_ms { 1.0 } else { 0.0 };
348 gauge!("replication_lag_slo_violation", "peer_id" => peer_id.to_string()).set(is_violation);
349}
350
#[cfg(test)]
mod tests {
    use super::*;

    // Smoke tests: none of these install a metrics recorder, so each case only
    // checks that the helper runs without panicking for the given inputs.

    #[test]
    fn test_record_peer_connection_success() {
        for (peer, ok) in [("peer-1", true), ("peer-2", false)] {
            record_peer_connection(peer, ok);
        }
    }

    #[test]
    fn test_record_peer_connection_empty_peer_id() {
        record_peer_connection("", true);
    }

    #[test]
    fn test_record_peer_state() {
        for state in ["connected", "disconnected", "connecting"] {
            record_peer_state("peer-1", state);
        }
    }

    #[test]
    fn test_record_peer_ping_latency() {
        let samples = [Duration::from_millis(50), Duration::from_secs(1), Duration::ZERO];
        for latency in samples {
            record_peer_ping_latency("peer-1", latency);
        }
    }

    #[test]
    fn test_record_peer_ping() {
        for ok in [true, false] {
            record_peer_ping("peer-1", ok);
        }
    }

    #[test]
    fn test_record_peer_circuit_state() {
        for state in ["closed", "open", "half_open"] {
            record_peer_circuit_state("peer-1", state);
        }
    }

    #[test]
    fn test_record_cdc_events() {
        for count in [100, 0] {
            record_cdc_events_read("peer-1", count);
        }
        record_cdc_events_applied("peer-1", 50);
        record_cdc_events_deduped("peer-1", 10);
    }

    #[test]
    fn test_record_stream_read_latency() {
        for elapsed in [Duration::from_millis(100), Duration::from_micros(500)] {
            record_stream_read_latency("peer-1", elapsed);
        }
    }

    #[test]
    fn test_record_peer_operation_latency() {
        let cases = [
            ("merkle_query", Duration::from_millis(25)),
            ("item_fetch", Duration::from_millis(100)),
            ("xread", Duration::from_secs(5)),
        ];
        for (operation, elapsed) in cases {
            record_peer_operation_latency("peer-1", operation, elapsed);
        }
    }

    #[test]
    fn test_record_event_processing_latency() {
        record_event_processing_latency("peer-1", Duration::from_micros(100));
    }

    #[test]
    fn test_record_cursor_persist() {
        for ok in [true, false] {
            record_cursor_persist("peer-1", ok);
        }
    }

    #[test]
    fn test_record_cursor_flush() {
        for (flushed, errors) in [(10, 0), (5, 2), (0, 0)] {
            record_cursor_flush(flushed, errors);
        }
    }

    #[test]
    fn test_cursor_retries_total() {
        for operation in ["get", "set", "flush"] {
            cursor_retries_total(operation);
        }
    }

    #[test]
    fn test_record_replication_lag() {
        for lag in [5.5, 0.0, 1000.0] {
            record_replication_lag("peer-1", lag);
        }
    }

    #[test]
    fn test_record_replication_lag_events() {
        for lag in [100, 0] {
            record_replication_lag_events("peer-1", lag);
        }
    }

    #[test]
    fn test_record_replication_lag_ms() {
        for lag in [500, 0] {
            record_replication_lag_ms("peer-1", lag);
        }
    }

    #[test]
    fn test_record_adaptive_batch_size() {
        for size in [100, 1000] {
            record_adaptive_batch_size("peer-1", size);
        }
    }

    #[test]
    fn test_record_repair_cycle() {
        for (items, elapsed) in [(50, Duration::from_secs(10)), (0, Duration::ZERO)] {
            record_repair_cycle(items, elapsed);
        }
    }

    #[test]
    fn test_record_repair_skipped() {
        for reason in ["no_peers", "all_healthy", "disabled"] {
            record_repair_skipped(reason);
        }
    }

    #[test]
    fn test_record_error() {
        for kind in ["connection_failed", "timeout", "parse_error"] {
            record_error("peer-1", kind);
        }
    }

    #[test]
    fn test_set_connected_peers() {
        for count in [0, 5, 100] {
            set_connected_peers(count);
        }
    }

    #[test]
    fn test_set_engine_state_all_states() {
        // "Unknown" exercises the fallback (-1) branch.
        let states = [
            "Created",
            "Connecting",
            "Running",
            "ShuttingDown",
            "Stopped",
            "Failed",
            "Unknown",
        ];
        for state in states {
            set_engine_state(state);
        }
    }

    #[test]
    fn test_record_circuit_call() {
        for outcome in ["success", "failure", "rejected"] {
            record_circuit_call("peer-1-circuit", outcome);
        }
    }

    #[test]
    fn test_set_circuit_state_all_states() {
        // "unknown" exercises the fallback (-1) branch.
        for state in ["closed", "half_open", "open", "unknown"] {
            set_circuit_state("peer-1-circuit", state);
        }
    }

    #[test]
    fn test_record_circuit_rejection() {
        record_circuit_rejection("peer-1-circuit");
    }

    #[test]
    fn test_record_batch_flush() {
        let cases = [
            (100, 80, 10, 5, 5, Duration::from_millis(50)),
            (50, 50, 0, 0, 0, Duration::from_millis(10)),
            (0, 0, 0, 0, 0, Duration::ZERO),
        ];
        for (total, submitted, deleted, skipped, errors, elapsed) in cases {
            record_batch_flush("peer-1", total, submitted, deleted, skipped, errors, elapsed);
        }
    }

    #[test]
    fn test_record_batch_dedup() {
        // The last case has after > before and must not underflow.
        for (before, after) in [(100, 80), (50, 50), (10, 20)] {
            record_batch_dedup("peer-1", before, after);
        }
    }

    #[test]
    fn test_record_stream_trimmed() {
        record_stream_trimmed("peer-1");
    }

    #[test]
    fn test_record_stream_read() {
        for (events, elapsed) in [(100, Duration::from_millis(50)), (0, Duration::from_millis(5))] {
            record_stream_read("peer-1", events, elapsed);
        }
    }

    #[test]
    fn test_record_repair_cycle_complete() {
        let cases = [
            (5, 4, 100, 90, 10, Duration::from_secs(30)),
            (3, 3, 0, 0, 0, Duration::from_secs(5)),
            (1, 0, 50, 0, 50, Duration::from_secs(60)),
        ];
        for (checked, in_sync, fetched, submitted, errors, elapsed) in cases {
            record_repair_cycle_complete(checked, in_sync, fetched, submitted, errors, elapsed);
        }
    }

    #[test]
    fn test_record_merkle_divergence() {
        for peer in ["peer-1", "peer-2"] {
            record_merkle_divergence(peer);
        }
    }

    #[test]
    fn test_record_slo_violation() {
        for (kind, latency) in [("stream_read", 150), ("peer_op", 500), ("batch_flush", 1000)] {
            record_slo_violation("peer-1", kind, latency);
        }
    }

    #[test]
    fn test_set_replication_lag_slo_under_threshold() {
        set_replication_lag_slo("peer-1", 50, 100);
    }

    #[test]
    fn test_set_replication_lag_slo_over_threshold() {
        set_replication_lag_slo("peer-1", 150, 100);
    }

    #[test]
    fn test_set_replication_lag_slo_at_threshold() {
        set_replication_lag_slo("peer-1", 100, 100);
    }
}