sync_engine/
metrics.rs

//! Metrics instrumentation for sync-engine.
//!
//! Uses the `metrics` crate for backend-agnostic metrics collection.
//! The parent daemon is responsible for choosing the exporter (Prometheus, OTEL, etc.).
//!
//! # Metric Naming Convention
//! - `sync_engine_` prefix for all metrics
//! - `_total` suffix for counters
//! - `_seconds` suffix for duration histograms
//! - `_bytes` suffix for byte-size metrics (histograms and gauges)
//!
//! # Labels
//! - `tier`: L1, L2, L3
//! - `operation`: get, put, delete, batch
//! - `status`: success, error, rejected (helpers taking a `success: bool` emit success/failure)
//!
//! Individual metrics may also carry metric-specific labels such as `backend`, `circuit`,
//! `filter`, `outcome`, `phase`, or `store`.

17use metrics::{counter, gauge, histogram};
18use std::time::{Duration, Instant};
19
20/// Record a successful sync operation
21pub fn record_operation(tier: &str, operation: &str, status: &str) {
22    counter!(
23        "sync_engine_operations_total",
24        "tier" => tier.to_string(),
25        "operation" => operation.to_string(),
26        "status" => status.to_string()
27    )
28    .increment(1);
29}
30
31/// Record operation latency
32pub fn record_latency(tier: &str, operation: &str, duration: Duration) {
33    histogram!(
34        "sync_engine_operation_seconds",
35        "tier" => tier.to_string(),
36        "operation" => operation.to_string()
37    )
38    .record(duration.as_secs_f64());
39}
40
41/// Record batch size
42pub fn record_batch_size(tier: &str, count: usize) {
43    histogram!(
44        "sync_engine_batch_size",
45        "tier" => tier.to_string()
46    )
47    .record(count as f64);
48}
49
50/// Record batch bytes
51pub fn record_batch_bytes(tier: &str, bytes: usize) {
52    histogram!(
53        "sync_engine_batch_bytes",
54        "tier" => tier.to_string()
55    )
56    .record(bytes as f64);
57}
58
59/// Set current L1 cache size in bytes
60pub fn set_l1_cache_bytes(bytes: usize) {
61    gauge!("sync_engine_l1_cache_bytes").set(bytes as f64);
62}
63
64/// Set current L1 cache item count
65pub fn set_l1_cache_items(count: usize) {
66    gauge!("sync_engine_l1_cache_items").set(count as f64);
67}
68
69/// Set WAL pending entries
70pub fn set_wal_entries(count: usize) {
71    gauge!("sync_engine_wal_entries").set(count as f64);
72}
73
74/// Set WAL file size in bytes
75pub fn set_wal_bytes(bytes: u64) {
76    gauge!("sync_engine_wal_bytes").set(bytes as f64);
77}
78
79/// Set cuckoo filter capacity utilization (0.0 - 1.0)
80pub fn set_cuckoo_filter_load(filter: &str, load: f64) {
81    gauge!(
82        "sync_engine_cuckoo_filter_load",
83        "filter" => filter.to_string()
84    )
85    .set(load);
86}
87
88/// Set cuckoo filter entry count
89pub fn set_cuckoo_filter_entries(filter: &str, count: usize) {
90    gauge!(
91        "sync_engine_cuckoo_filter_entries",
92        "filter" => filter.to_string()
93    )
94    .set(count as f64);
95}
96
97/// Record eviction event
98pub fn record_eviction(count: usize, bytes: usize) {
99    counter!("sync_engine_evictions_total").increment(count as u64);
100    counter!("sync_engine_evicted_bytes_total").increment(bytes as u64);
101}
102
103/// Set memory pressure level (0.0 - 1.0)
104pub fn set_memory_pressure(pressure: f64) {
105    gauge!("sync_engine_memory_pressure").set(pressure);
106}
107
108/// Set backpressure level (0 = None, 1 = Low, 2 = Medium, 3 = High, 4 = Critical)
109pub fn set_backpressure_level(level: u8) {
110    gauge!("sync_engine_backpressure_level").set(level as f64);
111}
112
113/// Record circuit breaker state change
114pub fn set_circuit_state(circuit: &str, state: u8) {
115    gauge!(
116        "sync_engine_circuit_breaker_state",
117        "circuit" => circuit.to_string()
118    )
119    .set(state as f64);
120}
121
122/// Record data corruption detection
123pub fn record_corruption(id: &str) {
124    counter!(
125        "sync_engine_corruption_detected_total",
126        "id" => id.to_string()
127    )
128    .increment(1);
129}
130
131/// Record circuit breaker call
132pub fn record_circuit_call(circuit: &str, outcome: &str) {
133    counter!(
134        "sync_engine_circuit_breaker_calls_total",
135        "circuit" => circuit.to_string(),
136        "outcome" => outcome.to_string()
137    )
138    .increment(1);
139}
140
141// ═══════════════════════════════════════════════════════════════════════════
142// ERROR TRACKING - Categorized error counters for alerting
143// ═══════════════════════════════════════════════════════════════════════════
144
145/// Record an error with category for alerting
146pub fn record_error(tier: &str, operation: &str, error_type: &str) {
147    counter!(
148        "sync_engine_errors_total",
149        "tier" => tier.to_string(),
150        "operation" => operation.to_string(),
151        "error_type" => error_type.to_string()
152    )
153    .increment(1);
154}
155
156/// Record a connection/backend error
157pub fn record_connection_error(backend: &str) {
158    counter!(
159        "sync_engine_connection_errors_total",
160        "backend" => backend.to_string()
161    )
162    .increment(1);
163}
164
165/// Record a timeout error
166pub fn record_timeout(tier: &str, operation: &str) {
167    counter!(
168        "sync_engine_timeouts_total",
169        "tier" => tier.to_string(),
170        "operation" => operation.to_string()
171    )
172    .increment(1);
173}
174
175// ═══════════════════════════════════════════════════════════════════════════
176// THROUGHPUT - Bytes and items processed
177// ═══════════════════════════════════════════════════════════════════════════
178
179/// Record bytes written to a tier
180pub fn record_bytes_written(tier: &str, bytes: usize) {
181    counter!(
182        "sync_engine_bytes_written_total",
183        "tier" => tier.to_string()
184    )
185    .increment(bytes as u64);
186}
187
188/// Record bytes read from a tier
189pub fn record_bytes_read(tier: &str, bytes: usize) {
190    counter!(
191        "sync_engine_bytes_read_total",
192        "tier" => tier.to_string()
193    )
194    .increment(bytes as u64);
195}
196
197/// Record items written
198pub fn record_items_written(tier: &str, count: usize) {
199    counter!(
200        "sync_engine_items_written_total",
201        "tier" => tier.to_string()
202    )
203    .increment(count as u64);
204}
205
206// ═══════════════════════════════════════════════════════════════════════════
207// QUEUE DEPTHS - Pending work
208// ═══════════════════════════════════════════════════════════════════════════
209
210/// Set batch queue depth (items pending flush)
211pub fn set_batch_queue_items(count: usize) {
212    gauge!("sync_engine_batch_queue_items").set(count as f64);
213}
214
215/// Set batch queue size in bytes
216pub fn set_batch_queue_bytes(bytes: usize) {
217    gauge!("sync_engine_batch_queue_bytes").set(bytes as f64);
218}
219
220// ═══════════════════════════════════════════════════════════════════════════
221// BACKEND HEALTH - Connection status
222// ═══════════════════════════════════════════════════════════════════════════
223
224/// Set backend health status (1 = healthy, 0 = unhealthy)
225pub fn set_backend_healthy(backend: &str, healthy: bool) {
226    gauge!(
227        "sync_engine_backend_healthy",
228        "backend" => backend.to_string()
229    )
230    .set(if healthy { 1.0 } else { 0.0 });
231}
232
233// ═══════════════════════════════════════════════════════════════════════════
234// CIRCUIT BREAKER - Resilience metrics
235// ═══════════════════════════════════════════════════════════════════════════
236
237/// Record circuit breaker call outcome
238pub fn record_circuit_breaker_call(circuit: &str, outcome: &str) {
239    counter!(
240        "sync_engine_circuit_breaker_calls_total",
241        "circuit" => circuit.to_string(),
242        "outcome" => outcome.to_string()
243    )
244    .increment(1);
245}
246
247// ═══════════════════════════════════════════════════════════════════════════
248// CUCKOO FILTER - Accuracy tracking
249// ═══════════════════════════════════════════════════════════════════════════
250
251/// Record cuckoo filter false positive
252pub fn record_cuckoo_false_positive(filter: &str) {
253    counter!(
254        "sync_engine_cuckoo_false_positive_total",
255        "filter" => filter.to_string()
256    )
257    .increment(1);
258}
259
260/// Record cuckoo filter check
261pub fn record_cuckoo_check(filter: &str, result: &str) {
262    counter!(
263        "sync_engine_cuckoo_checks_total",
264        "filter" => filter.to_string(),
265        "result" => result.to_string()
266    )
267    .increment(1);
268}
269
270// ═══════════════════════════════════════════════════════════════════════════
271// STARTUP - Timing for cold start monitoring
272// ═══════════════════════════════════════════════════════════════════════════
273
274/// Record startup phase duration
275pub fn record_startup_phase(phase: &str, duration: Duration) {
276    histogram!(
277        "sync_engine_startup_seconds",
278        "phase" => phase.to_string()
279    )
280    .record(duration.as_secs_f64());
281}
282
283/// Record total startup time
284pub fn record_startup_total(duration: Duration) {
285    histogram!("sync_engine_startup_total_seconds").record(duration.as_secs_f64());
286}
287
288// ═══════════════════════════════════════════════════════════════════════════
289// BATCH FLUSH - Detailed flush metrics
290// ═══════════════════════════════════════════════════════════════════════════
291
292/// Record batch flush duration
293pub fn record_flush_duration(duration: Duration) {
294    histogram!("sync_engine_flush_seconds").record(duration.as_secs_f64());
295}
296
297/// Set engine state (for monitoring state machine transitions)
298pub fn set_engine_state(state: &str) {
299    // Use a simple counter to track state transitions
300    counter!(
301        "sync_engine_state_transitions_total",
302        "state" => state.to_string()
303    )
304    .increment(1);
305}
306
307/// Record WAL drain operation
308pub fn record_wal_drain(count: usize, success: bool) {
309    let status = if success { "success" } else { "failure" };
310    counter!(
311        "sync_engine_wal_drain_total",
312        "status" => status
313    )
314    .increment(1);
315    
316    if success {
317        counter!("sync_engine_wal_drained_items_total").increment(count as u64);
318    }
319}
320
321/// Record merkle tree operation
322pub fn record_merkle_operation(store: &str, operation: &str, success: bool) {
323    let status = if success { "success" } else { "failure" };
324    counter!(
325        "sync_engine_merkle_operations_total",
326        "store" => store.to_string(),
327        "operation" => operation.to_string(),
328        "status" => status
329    )
330    .increment(1);
331}
332
333// ═══════════════════════════════════════════════════════════════════════════
334// CDC STREAM - Change Data Capture metrics
335// ═══════════════════════════════════════════════════════════════════════════
336
337/// Record CDC entries emitted to stream
338pub fn record_cdc_entries(op: &str, count: usize) {
339    counter!(
340        "sync_engine_cdc_entries_total",
341        "op" => op.to_string()
342    )
343    .increment(count as u64);
344}
345
346// ═══════════════════════════════════════════════════════════════════════════
347// SEARCH - RediSearch and SQL search metrics
348// ═══════════════════════════════════════════════════════════════════════════
349
350/// Record a search query execution
351pub fn record_search_query(backend: &str, status: &str) {
352    counter!(
353        "sync_engine_search_queries_total",
354        "backend" => backend.to_string(),
355        "status" => status.to_string()
356    )
357    .increment(1);
358}
359
360/// Record search query latency
361pub fn record_search_latency(backend: &str, duration: Duration) {
362    histogram!(
363        "sync_engine_search_seconds",
364        "backend" => backend.to_string()
365    )
366    .record(duration.as_secs_f64());
367}
368
369/// Record search result count
370pub fn record_search_results(count: usize) {
371    histogram!("sync_engine_search_results").record(count as f64);
372}
373
374/// Record search cache hit/miss
375pub fn record_search_cache(hit: bool) {
376    let outcome = if hit { "hit" } else { "miss" };
377    counter!(
378        "sync_engine_search_cache_total",
379        "outcome" => outcome
380    )
381    .increment(1);
382}
383
384/// Set search cache stats gauge
385pub fn set_search_cache_stats(entries: usize, hit_rate: f64) {
386    gauge!("sync_engine_search_cache_entries").set(entries as f64);
387    gauge!("sync_engine_search_cache_hit_rate").set(hit_rate);
388}
389
390/// Record index creation/drop
391pub fn record_search_index_operation(operation: &str, success: bool) {
392    let status = if success { "success" } else { "failure" };
393    counter!(
394        "sync_engine_search_index_operations_total",
395        "operation" => operation.to_string(),
396        "status" => status
397    )
398    .increment(1);
399}
400
401/// A timing guard that records latency on drop
402pub struct LatencyTimer {
403    tier: &'static str,
404    operation: &'static str,
405    start: Instant,
406}
407
408impl LatencyTimer {
409    /// Start a new latency timer
410    pub fn new(tier: &'static str, operation: &'static str) -> Self {
411        Self {
412            tier,
413            operation,
414            start: Instant::now(),
415        }
416    }
417}
418
419impl Drop for LatencyTimer {
420    fn drop(&mut self) {
421        record_latency(self.tier, self.operation, self.start.elapsed());
422    }
423}
424
/// Convenience macro for timing operations.
///
/// Expands to `LatencyTimer::new($tier, $op)` from this crate's `metrics` module; both
/// arguments must be `&'static str`. Bind the result so the latency is recorded when the
/// binding goes out of scope:
///
/// ```ignore
/// let _timer = time_operation!("L1", "get");
/// ```
///
/// A trailing comma after the last argument is accepted.
#[macro_export]
macro_rules! time_operation {
    ($tier:expr, $op:expr $(,)?) => {
        $crate::metrics::LatencyTimer::new($tier, $op)
    };
}
432
#[cfg(test)]
mod tests {
    use super::*;

    // Note: These tests verify the API compiles and doesn't panic.
    // In production, you'd use metrics-util's Recorder for assertions.

    #[test]
    fn test_record_operation() {
        record_operation("L1", "get", "success");
        record_operation("L2", "put", "error");
        record_operation("L3", "delete", "success");
    }

    #[test]
    fn test_record_latency() {
        record_latency("L1", "get", Duration::from_micros(100));
        record_latency("L2", "put", Duration::from_millis(5));
        record_latency("L3", "batch", Duration::from_millis(50));
    }

    #[test]
    fn test_record_batch() {
        record_batch_size("L2", 100);
        record_batch_bytes("L2", 1024 * 50);
    }

    #[test]
    fn test_gauges() {
        set_l1_cache_bytes(1024 * 1024);
        set_l1_cache_items(5000);
        set_wal_entries(42);
        set_wal_bytes(1024 * 100);
        set_memory_pressure(0.75);
        set_backpressure_level(2);
    }

    #[test]
    fn test_cuckoo_filter_metrics() {
        set_cuckoo_filter_load("L2", 0.65);
        set_cuckoo_filter_load("L3", 0.45);
        set_cuckoo_filter_entries("L2", 65000);
        set_cuckoo_filter_entries("L3", 45000);
    }

    #[test]
    fn test_cuckoo_accuracy_metrics() {
        record_cuckoo_check("L3", "hit");
        record_cuckoo_check("L3", "miss");
        record_cuckoo_false_positive("L3");
    }

    #[test]
    fn test_eviction_metrics() {
        record_eviction(10, 1024 * 50);
        record_eviction(0, 0);
    }

    #[test]
    fn test_circuit_breaker_metrics() {
        set_circuit_state("redis", 0);
        set_circuit_state("mysql", 2);
        record_circuit_call("redis", "success");
        record_circuit_call("mysql", "rejected");
        record_circuit_breaker_call("redis", "success");
    }

    #[test]
    fn test_error_metrics() {
        record_error("L2", "put", "timeout");
        record_error("L3", "get", "serialization");
        record_connection_error("redis");
        record_timeout("L3", "batch");
    }

    #[test]
    fn test_corruption_metrics() {
        record_corruption("item-1");
        record_corruption("");
    }

    #[test]
    fn test_throughput_metrics() {
        record_bytes_written("L2", 4096);
        record_bytes_read("L2", 2048);
        record_items_written("L3", 10);
    }

    #[test]
    fn test_queue_and_health_metrics() {
        set_batch_queue_items(12);
        set_batch_queue_bytes(1024 * 8);
        set_backend_healthy("redis", true);
        set_backend_healthy("mysql", false);
    }

    #[test]
    fn test_startup_and_flush_metrics() {
        record_startup_phase("connect", Duration::from_millis(20));
        record_startup_total(Duration::from_millis(150));
        record_flush_duration(Duration::from_millis(3));
    }

    #[test]
    fn test_cdc_metrics() {
        record_cdc_entries("put", 5);
        record_cdc_entries("delete", 0);
    }

    #[test]
    fn test_wal_drain_metrics() {
        record_wal_drain(50, true);
        record_wal_drain(0, false);
    }

    #[test]
    fn test_merkle_metrics() {
        record_merkle_operation("sql", "insert", true);
        record_merkle_operation("redis", "batch", false);
    }

    #[test]
    fn test_latency_timer() {
        {
            let _timer = LatencyTimer::new("L1", "get");
            // Simulate some work
            std::thread::sleep(Duration::from_micros(10));
        }
        // Timer recorded on drop
    }

    #[test]
    fn test_engine_state_tracking() {
        set_engine_state("Created");
        set_engine_state("Connecting");
        set_engine_state("Running");
    }

    #[test]
    fn test_search_metrics() {
        // Search queries
        record_search_query("redis", "success");
        record_search_query("sql", "success");
        record_search_query("redis", "error");

        // Search latency
        record_search_latency("redis", Duration::from_micros(500));
        record_search_latency("sql", Duration::from_millis(5));
        record_search_latency("cache", Duration::from_micros(10));

        // Search results
        record_search_results(42);
        record_search_results(0);

        // Cache stats
        record_search_cache(true);
        record_search_cache(false);
        set_search_cache_stats(100, 0.85);

        // Index operations
        record_search_index_operation("create", true);
        record_search_index_operation("drop", true);
        record_search_index_operation("create", false);
    }
}