replication_engine/
metrics.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! Metrics for observability.
5//!
6//! Exports Prometheus-compatible metrics for:
7//! - Peer connection status
8//! - Stream tailing performance
9//! - Replication lag
10//! - Deduplication stats
11//! - Circuit breaker state
12//! - Batch processing stats
13//!
14//! # Metric Naming Convention
15//!
16//! All metrics are prefixed with `replication_` and follow Prometheus conventions:
17//! - Counters end in `_total`
18//! - Gauges represent current state
19//! - Histograms track distributions (duration, size)
20//!
21//! # Usage
22//!
23//! ```rust,no_run
24//! use replication_engine::metrics;
25//! use std::time::Duration;
26//!
27//! // In hot_path after reading events
28//! metrics::record_cdc_events_read("peer-1", 42);
29//!
30//! // In batch processor after flush
31//! metrics::record_batch_flush("peer-1", 100, 85, 5, 8, 2, Duration::from_millis(50));
32//! ```
33
34use metrics::{counter, gauge, histogram};
35use std::time::Duration;
36
37/// Record a peer connection event.
38pub fn record_peer_connection(peer_id: &str, success: bool) {
39    let status = if success { "success" } else { "failure" };
40    counter!("replication_peer_connections_total", "peer_id" => peer_id.to_string(), "status" => status).increment(1);
41}
42
43/// Record peer connection state.
44pub fn record_peer_state(peer_id: &str, state: &str) {
45    // This is a state gauge - we set it to 1 for the current state
46    // In practice, you'd want to use a label for the state
47    gauge!("replication_peer_state", "peer_id" => peer_id.to_string(), "state" => state.to_string()).set(1.0);
48}
49
50/// Record peer ping latency (for idle peer health checks).
51pub fn record_peer_ping_latency(peer_id: &str, latency: Duration) {
52    histogram!("replication_peer_ping_latency_seconds", "peer_id" => peer_id.to_string())
53        .record(latency.as_secs_f64());
54}
55
56/// Record peer ping result.
57pub fn record_peer_ping(peer_id: &str, success: bool) {
58    let status = if success { "success" } else { "failure" };
59    counter!("replication_peer_pings_total", "peer_id" => peer_id.to_string(), "status" => status).increment(1);
60}
61
62/// Record peer circuit breaker state change.
63pub fn record_peer_circuit_state(peer_id: &str, state: &str) {
64    // Use a counter to track state transitions
65    counter!("replication_peer_circuit_transitions_total", "peer_id" => peer_id.to_string(), "state" => state.to_string()).increment(1);
66}
67
68/// Record CDC events read from a peer.
69pub fn record_cdc_events_read(peer_id: &str, count: usize) {
70    counter!("replication_cdc_events_read_total", "peer_id" => peer_id.to_string()).increment(count as u64);
71}
72
73/// Record CDC events applied (not deduplicated).
74pub fn record_cdc_events_applied(peer_id: &str, count: usize) {
75    counter!("replication_cdc_events_applied_total", "peer_id" => peer_id.to_string()).increment(count as u64);
76}
77
78/// Record CDC events deduplicated (skipped).
79pub fn record_cdc_events_deduped(peer_id: &str, count: usize) {
80    counter!("replication_cdc_events_deduped_total", "peer_id" => peer_id.to_string()).increment(count as u64);
81}
82
83/// Record stream read (XREAD) latency.
84pub fn record_stream_read_latency(peer_id: &str, duration: Duration) {
85    histogram!("replication_stream_read_duration_seconds", "peer_id" => peer_id.to_string())
86        .record(duration.as_secs_f64());
87}
88
89/// Record peer Redis operation latency by operation type.
90/// Useful for tracking Merkle queries, item fetches, etc.
91pub fn record_peer_operation_latency(peer_id: &str, operation: &str, duration: Duration) {
92    histogram!(
93        "replication_peer_operation_duration_seconds",
94        "peer_id" => peer_id.to_string(),
95        "operation" => operation.to_string()
96    )
97    .record(duration.as_secs_f64());
98}
99
100/// Record event processing latency.
101pub fn record_event_processing_latency(peer_id: &str, duration: Duration) {
102    histogram!("replication_event_processing_duration_seconds", "peer_id" => peer_id.to_string())
103        .record(duration.as_secs_f64());
104}
105
106/// Record cursor persistence.
107pub fn record_cursor_persist(peer_id: &str, success: bool) {
108    let status = if success { "success" } else { "failure" };
109    counter!("replication_cursor_persists_total", "peer_id" => peer_id.to_string(), "status" => status).increment(1);
110}
111
112/// Record cursor flush batch (debounced writes).
113pub fn record_cursor_flush(flushed: usize, errors: usize) {
114    counter!("replication_cursor_flushes_total").increment(1);
115    counter!("replication_cursor_flushed_count").increment(flushed as u64);
116    if errors > 0 {
117        counter!("replication_cursor_flush_errors_total").increment(errors as u64);
118    }
119}
120
121/// Record cursor SQLite retry (for SQLITE_BUSY/SQLITE_LOCKED).
122pub fn cursor_retries_total(operation: &str) {
123    counter!("replication_cursor_retries_total", "operation" => operation.to_string()).increment(1);
124}
125
126/// Record replication lag (time since last successful sync).
127pub fn record_replication_lag(peer_id: &str, lag_seconds: f64) {
128    gauge!("replication_lag_seconds", "peer_id" => peer_id.to_string()).set(lag_seconds);
129}
130
131/// Record replication lag in events (how many events behind stream head).
132pub fn record_replication_lag_events(peer_id: &str, lag_events: u64) {
133    gauge!("replication_lag_events", "peer_id" => peer_id.to_string()).set(lag_events as f64);
134}
135
136/// Record replication lag in milliseconds (based on stream ID timestamps).
137pub fn record_replication_lag_ms(peer_id: &str, lag_ms: u64) {
138    gauge!("replication_lag_ms", "peer_id" => peer_id.to_string()).set(lag_ms as f64);
139}
140
141/// Record current adaptive batch size for a peer.
142pub fn record_adaptive_batch_size(peer_id: &str, batch_size: usize) {
143    gauge!("replication_adaptive_batch_size", "peer_id" => peer_id.to_string()).set(batch_size as f64);
144}
145
146/// Record cold path repair cycle.
147pub fn record_repair_cycle(items_repaired: usize, duration: Duration) {
148    counter!("replication_repair_items_total").increment(items_repaired as u64);
149    histogram!("replication_repair_cycle_duration_seconds").record(duration.as_secs_f64());
150}
151
152/// Record cold path repair cycle skipped.
153pub fn record_repair_skipped(reason: &str) {
154    counter!("replication_repair_skipped_total", "reason" => reason.to_string()).increment(1);
155}
156
157/// Record errors by type.
158pub fn record_error(peer_id: &str, error_type: &str) {
159    counter!("replication_errors_total", "peer_id" => peer_id.to_string(), "error_type" => error_type.to_string()).increment(1);
160}
161
162/// Gauge for number of connected peers.
163pub fn set_connected_peers(count: usize) {
164    gauge!("replication_connected_peers").set(count as f64);
165}
166
167/// Gauge for engine state.
168pub fn set_engine_state(state: &str) {
169    // Encode state as numeric for alerting (0=stopped, 1=running, 2=degraded, etc.)
170    let value = match state {
171        "Created" => 0.0,
172        "Connecting" => 1.0,
173        "Running" => 2.0,
174        "ShuttingDown" => 3.0,
175        "Stopped" => 4.0,
176        "Failed" => 5.0,
177        _ => -1.0,
178    };
179    gauge!("replication_engine_state").set(value);
180}
181
182// =============================================================================
183// Circuit Breaker Metrics
184// =============================================================================
185
186/// Record circuit breaker call outcome.
187pub fn record_circuit_call(circuit_name: &str, outcome: &str) {
188    counter!(
189        "replication_circuit_calls_total",
190        "circuit" => circuit_name.to_string(),
191        "outcome" => outcome.to_string()
192    )
193    .increment(1);
194}
195
196/// Set circuit breaker state gauge (0=closed, 1=half_open, 2=open).
197pub fn set_circuit_state(circuit_name: &str, state: &str) {
198    let value = match state {
199        "closed" => 0.0,
200        "half_open" => 1.0,
201        "open" => 2.0,
202        _ => -1.0,
203    };
204    gauge!("replication_circuit_state", "circuit" => circuit_name.to_string()).set(value);
205}
206
207/// Record circuit breaker rejection (circuit was open).
208pub fn record_circuit_rejection(circuit_name: &str) {
209    counter!(
210        "replication_circuit_rejections_total",
211        "circuit" => circuit_name.to_string()
212    )
213    .increment(1);
214}
215
216// =============================================================================
217// Batch Processing Metrics
218// =============================================================================
219
220/// Record batch flush with detailed stats.
221pub fn record_batch_flush(
222    peer_id: &str,
223    total: usize,
224    submitted: usize,
225    deleted: usize,
226    skipped: usize,
227    errors: usize,
228    duration: Duration,
229) {
230    let peer = peer_id.to_string();
231
232    counter!("replication_batch_events_total", "peer_id" => peer.clone())
233        .increment(total as u64);
234    counter!("replication_batch_submitted_total", "peer_id" => peer.clone())
235        .increment(submitted as u64);
236    counter!("replication_batch_deleted_total", "peer_id" => peer.clone())
237        .increment(deleted as u64);
238    counter!("replication_batch_skipped_total", "peer_id" => peer.clone())
239        .increment(skipped as u64);
240
241    if errors > 0 {
242        counter!("replication_batch_errors_total", "peer_id" => peer.clone())
243            .increment(errors as u64);
244    }
245
246    histogram!("replication_batch_flush_duration_seconds", "peer_id" => peer.clone())
247        .record(duration.as_secs_f64());
248    histogram!("replication_batch_size", "peer_id" => peer).record(total as f64);
249}
250
251/// Record batch dedup stats (for monitoring dedup efficiency).
252pub fn record_batch_dedup(peer_id: &str, before_dedup: usize, after_dedup: usize) {
253    let deduped = before_dedup.saturating_sub(after_dedup);
254    if deduped > 0 {
255        counter!("replication_batch_deduped_total", "peer_id" => peer_id.to_string())
256            .increment(deduped as u64);
257    }
258}
259
260// =============================================================================
261// Stream Trim Metrics
262// =============================================================================
263
264/// Record stream trimmed event (potential data gap).
265pub fn record_stream_trimmed(peer_id: &str) {
266    counter!("replication_stream_trimmed_total", "peer_id" => peer_id.to_string()).increment(1);
267}
268
269/// Record backpressure pause (sync-engine under load).
270pub fn record_backpressure_pause(peer_id: &str) {
271    counter!("replication_backpressure_pauses_total", "peer_id" => peer_id.to_string()).increment(1);
272}
273
274/// Record stream read result.
275pub fn record_stream_read(peer_id: &str, events_count: usize, duration: Duration) {
276    counter!("replication_stream_reads_total", "peer_id" => peer_id.to_string()).increment(1);
277    if events_count > 0 {
278        counter!("replication_stream_events_read_total", "peer_id" => peer_id.to_string())
279            .increment(events_count as u64);
280    }
281    histogram!("replication_stream_read_duration_seconds", "peer_id" => peer_id.to_string())
282        .record(duration.as_secs_f64());
283}
284
285// =============================================================================
286// Cold Path Repair Metrics
287// =============================================================================
288
289/// Record cold path repair cycle completion.
290pub fn record_repair_cycle_complete(
291    peers_checked: usize,
292    peers_in_sync: usize,
293    items_fetched: usize,
294    items_submitted: usize,
295    errors: usize,
296    duration: Duration,
297) {
298    counter!("replication_repair_cycles_total").increment(1);
299    counter!("replication_repair_peers_checked_total").increment(peers_checked as u64);
300    counter!("replication_repair_peers_in_sync_total").increment(peers_in_sync as u64);
301    counter!("replication_repair_items_fetched_total").increment(items_fetched as u64);
302    counter!("replication_repair_items_submitted_total").increment(items_submitted as u64);
303
304    if errors > 0 {
305        counter!("replication_repair_errors_total").increment(errors as u64);
306    }
307
308    histogram!("replication_repair_cycle_duration_seconds").record(duration.as_secs_f64());
309}
310
311/// Record divergent peer detected during repair.
312pub fn record_merkle_divergence(peer_id: &str) {
313    counter!("replication_merkle_divergence_total", "peer_id" => peer_id.to_string()).increment(1);
314}
315
316// =============================================================================
317// SLO Violation Metrics
318// =============================================================================
319
320/// Record an SLO violation (latency threshold exceeded).
321///
322/// Labels:
323/// - `peer_id`: The peer that violated the SLO
324/// - `slo_type`: The type of SLO violated (stream_read, peer_op, batch_flush)
325/// - `latency_ms`: The actual latency observed
326pub fn record_slo_violation(peer_id: &str, slo_type: &str, latency_ms: u64) {
327    counter!(
328        "replication_slo_violations_total",
329        "peer_id" => peer_id.to_string(),
330        "slo_type" => slo_type.to_string()
331    )
332    .increment(1);
333
334    histogram!(
335        "replication_slo_violation_latency_ms",
336        "peer_id" => peer_id.to_string(),
337        "slo_type" => slo_type.to_string()
338    )
339    .record(latency_ms as f64);
340}
341
342/// Set current replication lag gauge (for SLO monitoring).
343pub fn set_replication_lag_slo(peer_id: &str, lag_ms: u64, threshold_ms: u64) {
344    gauge!("replication_lag_slo_ms", "peer_id" => peer_id.to_string()).set(lag_ms as f64);
345    
346    // Track whether we're over threshold
347    let is_violation = if lag_ms > threshold_ms { 1.0 } else { 0.0 };
348    gauge!("replication_lag_slo_violation", "peer_id" => peer_id.to_string()).set(is_violation);
349}
350
#[cfg(test)]
mod tests {
    use super::*;

    // Smoke tests. The `metrics` crate uses global state, so these only check
    // that every helper accepts representative and edge-case inputs (empty
    // IDs, zero counts, zero durations, unknown states) without panicking.
    // Asserting on recorded values would require installing a recorder such
    // as metrics-util's DebuggingRecorder.

    #[test]
    fn test_record_peer_connection_success() {
        for (peer, ok) in [("peer-1", true), ("peer-2", false)] {
            record_peer_connection(peer, ok);
        }
    }

    #[test]
    fn test_record_peer_connection_empty_peer_id() {
        // An empty peer_id must be accepted.
        record_peer_connection("", true);
    }

    #[test]
    fn test_record_peer_state() {
        for state in ["connected", "disconnected", "connecting"] {
            record_peer_state("peer-1", state);
        }
    }

    #[test]
    fn test_record_peer_ping_latency() {
        let latencies = [
            Duration::from_millis(50),
            Duration::from_secs(1),
            Duration::ZERO,
        ];
        for latency in latencies {
            record_peer_ping_latency("peer-1", latency);
        }
    }

    #[test]
    fn test_record_peer_ping() {
        for ok in [true, false] {
            record_peer_ping("peer-1", ok);
        }
    }

    #[test]
    fn test_record_peer_circuit_state() {
        for state in ["closed", "open", "half_open"] {
            record_peer_circuit_state("peer-1", state);
        }
    }

    #[test]
    fn test_record_cdc_events() {
        record_cdc_events_read("peer-1", 100);
        record_cdc_events_read("peer-1", 0);
        record_cdc_events_applied("peer-1", 50);
        record_cdc_events_deduped("peer-1", 10);
    }

    #[test]
    fn test_record_stream_read_latency() {
        for took in [Duration::from_millis(100), Duration::from_micros(500)] {
            record_stream_read_latency("peer-1", took);
        }
    }

    #[test]
    fn test_record_peer_operation_latency() {
        let cases = [
            ("merkle_query", Duration::from_millis(25)),
            ("item_fetch", Duration::from_millis(100)),
            ("xread", Duration::from_secs(5)),
        ];
        for (operation, took) in cases {
            record_peer_operation_latency("peer-1", operation, took);
        }
    }

    #[test]
    fn test_record_event_processing_latency() {
        record_event_processing_latency("peer-1", Duration::from_micros(100));
    }

    #[test]
    fn test_record_cursor_persist() {
        for ok in [true, false] {
            record_cursor_persist("peer-1", ok);
        }
    }

    #[test]
    fn test_record_cursor_flush() {
        for (flushed, errors) in [(10, 0), (5, 2), (0, 0)] {
            record_cursor_flush(flushed, errors);
        }
    }

    #[test]
    fn test_cursor_retries_total() {
        for operation in ["get", "set", "flush"] {
            cursor_retries_total(operation);
        }
    }

    #[test]
    fn test_record_replication_lag() {
        for lag in [5.5, 0.0, 1000.0] {
            record_replication_lag("peer-1", lag);
        }
    }

    #[test]
    fn test_record_replication_lag_events() {
        for lag in [100, 0] {
            record_replication_lag_events("peer-1", lag);
        }
    }

    #[test]
    fn test_record_replication_lag_ms() {
        for lag in [500, 0] {
            record_replication_lag_ms("peer-1", lag);
        }
    }

    #[test]
    fn test_record_adaptive_batch_size() {
        for size in [100, 1000] {
            record_adaptive_batch_size("peer-1", size);
        }
    }

    #[test]
    fn test_record_repair_cycle() {
        for (items, took) in [(50, Duration::from_secs(10)), (0, Duration::ZERO)] {
            record_repair_cycle(items, took);
        }
    }

    #[test]
    fn test_record_repair_skipped() {
        for reason in ["no_peers", "all_healthy", "disabled"] {
            record_repair_skipped(reason);
        }
    }

    #[test]
    fn test_record_error() {
        for kind in ["connection_failed", "timeout", "parse_error"] {
            record_error("peer-1", kind);
        }
    }

    #[test]
    fn test_set_connected_peers() {
        for count in [0, 5, 100] {
            set_connected_peers(count);
        }
    }

    #[test]
    fn test_set_engine_state_all_states() {
        // Every known state, followed by one unknown state (maps to -1).
        let states = [
            "Created",
            "Connecting",
            "Running",
            "ShuttingDown",
            "Stopped",
            "Failed",
            "Unknown",
        ];
        for state in states {
            set_engine_state(state);
        }
    }

    #[test]
    fn test_record_circuit_call() {
        for outcome in ["success", "failure", "rejected"] {
            record_circuit_call("peer-1-circuit", outcome);
        }
    }

    #[test]
    fn test_set_circuit_state_all_states() {
        // Known states first, then an unknown one (maps to -1).
        for state in ["closed", "half_open", "open", "unknown"] {
            set_circuit_state("peer-1-circuit", state);
        }
    }

    #[test]
    fn test_record_circuit_rejection() {
        record_circuit_rejection("peer-1-circuit");
    }

    #[test]
    fn test_record_batch_flush() {
        let cases = [
            // Normal batch.
            (100, 80, 10, 5, 5, Duration::from_millis(50)),
            // Zero errors.
            (50, 50, 0, 0, 0, Duration::from_millis(10)),
            // Empty batch.
            (0, 0, 0, 0, 0, Duration::ZERO),
        ];
        for (total, submitted, deleted, skipped, errors, took) in cases {
            record_batch_flush("peer-1", total, submitted, deleted, skipped, errors, took);
        }
    }

    #[test]
    fn test_record_batch_dedup() {
        // Normal dedup, no dedup, and after > before (must saturate, not underflow).
        for (before, after) in [(100, 80), (50, 50), (10, 20)] {
            record_batch_dedup("peer-1", before, after);
        }
    }

    #[test]
    fn test_record_stream_trimmed() {
        record_stream_trimmed("peer-1");
    }

    #[test]
    fn test_record_backpressure_pause() {
        record_backpressure_pause("peer-1");
        record_backpressure_pause("");
    }

    #[test]
    fn test_record_stream_read() {
        for (events, took) in [(100, Duration::from_millis(50)), (0, Duration::from_millis(5))] {
            record_stream_read("peer-1", events, took);
        }
    }

    #[test]
    fn test_record_repair_cycle_complete() {
        let cases = [
            // Normal repair.
            (5, 4, 100, 90, 10, Duration::from_secs(30)),
            // No errors.
            (3, 3, 0, 0, 0, Duration::from_secs(5)),
            // All errors.
            (1, 0, 50, 0, 50, Duration::from_secs(60)),
        ];
        for (checked, in_sync, fetched, submitted, errors, took) in cases {
            record_repair_cycle_complete(checked, in_sync, fetched, submitted, errors, took);
        }
    }

    #[test]
    fn test_record_merkle_divergence() {
        for peer in ["peer-1", "peer-2"] {
            record_merkle_divergence(peer);
        }
    }

    #[test]
    fn test_record_slo_violation() {
        for (slo_type, latency_ms) in [("stream_read", 150), ("peer_op", 500), ("batch_flush", 1000)] {
            record_slo_violation("peer-1", slo_type, latency_ms);
        }
    }

    #[test]
    fn test_set_replication_lag_slo_under_threshold() {
        // Under threshold: no violation.
        set_replication_lag_slo("peer-1", 50, 100);
    }

    #[test]
    fn test_set_replication_lag_slo_over_threshold() {
        // Over threshold: violation.
        set_replication_lag_slo("peer-1", 150, 100);
    }

    #[test]
    fn test_set_replication_lag_slo_at_threshold() {
        // Exactly at threshold: no violation (the check is strictly `>`).
        set_replication_lag_slo("peer-1", 100, 100);
    }
}