sync_engine/
metrics.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! Metrics instrumentation for sync-engine.
5//!
6//! Uses the `metrics` crate for backend-agnostic metrics collection.
7//! The parent daemon is responsible for choosing the exporter (Prometheus, OTEL, etc.)
8//!
9//! # Metric Naming Convention
10//! - `sync_engine_` prefix for all metrics
11//! - `_total` suffix for counters
12//! - `_seconds` suffix for duration histograms
13//! - `_bytes` suffix for size histograms
14//!
15//! # Labels
16//! - `tier`: L1, L2, L3
17//! - `operation`: get, put, delete, batch
//! - `status`: success, error, rejected (the WAL-drain, merkle and search-index counters use success / failure)
19
20use metrics::{counter, gauge, histogram};
21use std::time::{Duration, Instant};
22
23/// Record a successful sync operation
24pub fn record_operation(tier: &str, operation: &str, status: &str) {
25    counter!(
26        "sync_engine_operations_total",
27        "tier" => tier.to_string(),
28        "operation" => operation.to_string(),
29        "status" => status.to_string()
30    )
31    .increment(1);
32}
33
34/// Record operation latency
35pub fn record_latency(tier: &str, operation: &str, duration: Duration) {
36    histogram!(
37        "sync_engine_operation_seconds",
38        "tier" => tier.to_string(),
39        "operation" => operation.to_string()
40    )
41    .record(duration.as_secs_f64());
42}
43
44/// Record batch size
45pub fn record_batch_size(tier: &str, count: usize) {
46    histogram!(
47        "sync_engine_batch_size",
48        "tier" => tier.to_string()
49    )
50    .record(count as f64);
51}
52
53/// Record batch bytes
54pub fn record_batch_bytes(tier: &str, bytes: usize) {
55    histogram!(
56        "sync_engine_batch_bytes",
57        "tier" => tier.to_string()
58    )
59    .record(bytes as f64);
60}
61
62/// Set current L1 cache size in bytes
63pub fn set_l1_cache_bytes(bytes: usize) {
64    gauge!("sync_engine_l1_cache_bytes").set(bytes as f64);
65}
66
67/// Set current L1 cache item count
68pub fn set_l1_cache_items(count: usize) {
69    gauge!("sync_engine_l1_cache_items").set(count as f64);
70}
71
72/// Set WAL pending entries
73pub fn set_wal_entries(count: usize) {
74    gauge!("sync_engine_wal_entries").set(count as f64);
75}
76
77/// Set WAL file size in bytes
78pub fn set_wal_bytes(bytes: u64) {
79    gauge!("sync_engine_wal_bytes").set(bytes as f64);
80}
81
82/// Set cuckoo filter capacity utilization (0.0 - 1.0)
83pub fn set_cuckoo_filter_load(filter: &str, load: f64) {
84    gauge!(
85        "sync_engine_cuckoo_filter_load",
86        "filter" => filter.to_string()
87    )
88    .set(load);
89}
90
91/// Set cuckoo filter entry count
92pub fn set_cuckoo_filter_entries(filter: &str, count: usize) {
93    gauge!(
94        "sync_engine_cuckoo_filter_entries",
95        "filter" => filter.to_string()
96    )
97    .set(count as f64);
98}
99
100/// Record eviction event
101pub fn record_eviction(count: usize, bytes: usize) {
102    counter!("sync_engine_evictions_total").increment(count as u64);
103    counter!("sync_engine_evicted_bytes_total").increment(bytes as u64);
104}
105
106/// Set memory pressure level (0.0 - 1.0)
107pub fn set_memory_pressure(pressure: f64) {
108    gauge!("sync_engine_memory_pressure").set(pressure);
109}
110
111/// Set backpressure level (0 = None, 1 = Low, 2 = Medium, 3 = High, 4 = Critical)
112pub fn set_backpressure_level(level: u8) {
113    gauge!("sync_engine_backpressure_level").set(level as f64);
114}
115
116/// Record circuit breaker state change
117pub fn set_circuit_state(circuit: &str, state: u8) {
118    gauge!(
119        "sync_engine_circuit_breaker_state",
120        "circuit" => circuit.to_string()
121    )
122    .set(state as f64);
123}
124
125/// Record data corruption detection
126pub fn record_corruption(id: &str) {
127    counter!(
128        "sync_engine_corruption_detected_total",
129        "id" => id.to_string()
130    )
131    .increment(1);
132}
133
134/// Record circuit breaker call
135pub fn record_circuit_call(circuit: &str, outcome: &str) {
136    counter!(
137        "sync_engine_circuit_breaker_calls_total",
138        "circuit" => circuit.to_string(),
139        "outcome" => outcome.to_string()
140    )
141    .increment(1);
142}
143
144// ═══════════════════════════════════════════════════════════════════════════
145// ERROR TRACKING - Categorized error counters for alerting
146// ═══════════════════════════════════════════════════════════════════════════
147
148/// Record an error with category for alerting
149pub fn record_error(tier: &str, operation: &str, error_type: &str) {
150    counter!(
151        "sync_engine_errors_total",
152        "tier" => tier.to_string(),
153        "operation" => operation.to_string(),
154        "error_type" => error_type.to_string()
155    )
156    .increment(1);
157}
158
159/// Record a connection/backend error
160pub fn record_connection_error(backend: &str) {
161    counter!(
162        "sync_engine_connection_errors_total",
163        "backend" => backend.to_string()
164    )
165    .increment(1);
166}
167
168/// Record a timeout error
169pub fn record_timeout(tier: &str, operation: &str) {
170    counter!(
171        "sync_engine_timeouts_total",
172        "tier" => tier.to_string(),
173        "operation" => operation.to_string()
174    )
175    .increment(1);
176}
177
178// ═══════════════════════════════════════════════════════════════════════════
179// THROUGHPUT - Bytes and items processed
180// ═══════════════════════════════════════════════════════════════════════════
181
182/// Record bytes written to a tier
183pub fn record_bytes_written(tier: &str, bytes: usize) {
184    counter!(
185        "sync_engine_bytes_written_total",
186        "tier" => tier.to_string()
187    )
188    .increment(bytes as u64);
189}
190
191/// Record bytes read from a tier
192pub fn record_bytes_read(tier: &str, bytes: usize) {
193    counter!(
194        "sync_engine_bytes_read_total",
195        "tier" => tier.to_string()
196    )
197    .increment(bytes as u64);
198}
199
200/// Record items written
201pub fn record_items_written(tier: &str, count: usize) {
202    counter!(
203        "sync_engine_items_written_total",
204        "tier" => tier.to_string()
205    )
206    .increment(count as u64);
207}
208
209// ═══════════════════════════════════════════════════════════════════════════
210// QUEUE DEPTHS - Pending work
211// ═══════════════════════════════════════════════════════════════════════════
212
213/// Set batch queue depth (items pending flush)
214pub fn set_batch_queue_items(count: usize) {
215    gauge!("sync_engine_batch_queue_items").set(count as f64);
216}
217
218/// Set batch queue size in bytes
219pub fn set_batch_queue_bytes(bytes: usize) {
220    gauge!("sync_engine_batch_queue_bytes").set(bytes as f64);
221}
222
223// ═══════════════════════════════════════════════════════════════════════════
224// BACKEND HEALTH - Connection status
225// ═══════════════════════════════════════════════════════════════════════════
226
227/// Set backend health status (1 = healthy, 0 = unhealthy)
228pub fn set_backend_healthy(backend: &str, healthy: bool) {
229    gauge!(
230        "sync_engine_backend_healthy",
231        "backend" => backend.to_string()
232    )
233    .set(if healthy { 1.0 } else { 0.0 });
234}
235
236// ═══════════════════════════════════════════════════════════════════════════
237// CIRCUIT BREAKER - Resilience metrics
238// ═══════════════════════════════════════════════════════════════════════════
239
240/// Record circuit breaker call outcome
241pub fn record_circuit_breaker_call(circuit: &str, outcome: &str) {
242    counter!(
243        "sync_engine_circuit_breaker_calls_total",
244        "circuit" => circuit.to_string(),
245        "outcome" => outcome.to_string()
246    )
247    .increment(1);
248}
249
250// ═══════════════════════════════════════════════════════════════════════════
251// CUCKOO FILTER - Accuracy tracking
252// ═══════════════════════════════════════════════════════════════════════════
253
254/// Record cuckoo filter false positive
255pub fn record_cuckoo_false_positive(filter: &str) {
256    counter!(
257        "sync_engine_cuckoo_false_positive_total",
258        "filter" => filter.to_string()
259    )
260    .increment(1);
261}
262
263/// Record cuckoo filter check
264pub fn record_cuckoo_check(filter: &str, result: &str) {
265    counter!(
266        "sync_engine_cuckoo_checks_total",
267        "filter" => filter.to_string(),
268        "result" => result.to_string()
269    )
270    .increment(1);
271}
272
273// ═══════════════════════════════════════════════════════════════════════════
274// STARTUP - Timing for cold start monitoring
275// ═══════════════════════════════════════════════════════════════════════════
276
277/// Record startup phase duration
278pub fn record_startup_phase(phase: &str, duration: Duration) {
279    histogram!(
280        "sync_engine_startup_seconds",
281        "phase" => phase.to_string()
282    )
283    .record(duration.as_secs_f64());
284}
285
286/// Record total startup time
287pub fn record_startup_total(duration: Duration) {
288    histogram!("sync_engine_startup_total_seconds").record(duration.as_secs_f64());
289}
290
291// ═══════════════════════════════════════════════════════════════════════════
292// BATCH FLUSH - Detailed flush metrics
293// ═══════════════════════════════════════════════════════════════════════════
294
295/// Record batch flush duration
296pub fn record_flush_duration(duration: Duration) {
297    histogram!("sync_engine_flush_seconds").record(duration.as_secs_f64());
298}
299
300/// Set engine state (for monitoring state machine transitions)
301pub fn set_engine_state(state: &str) {
302    // Use a simple counter to track state transitions
303    counter!(
304        "sync_engine_state_transitions_total",
305        "state" => state.to_string()
306    )
307    .increment(1);
308}
309
310/// Record WAL drain operation
311pub fn record_wal_drain(count: usize, success: bool) {
312    let status = if success { "success" } else { "failure" };
313    counter!(
314        "sync_engine_wal_drain_total",
315        "status" => status
316    )
317    .increment(1);
318    
319    if success {
320        counter!("sync_engine_wal_drained_items_total").increment(count as u64);
321    }
322}
323
324/// Record merkle tree operation
325pub fn record_merkle_operation(store: &str, operation: &str, success: bool) {
326    let status = if success { "success" } else { "failure" };
327    counter!(
328        "sync_engine_merkle_operations_total",
329        "store" => store.to_string(),
330        "operation" => operation.to_string(),
331        "status" => status
332    )
333    .increment(1);
334}
335
336// ═══════════════════════════════════════════════════════════════════════════
337// CDC STREAM - Change Data Capture metrics
338// ═══════════════════════════════════════════════════════════════════════════
339
340/// Record CDC entries emitted to stream
341pub fn record_cdc_entries(op: &str, count: usize) {
342    counter!(
343        "sync_engine_cdc_entries_total",
344        "op" => op.to_string()
345    )
346    .increment(count as u64);
347}
348
349// ═══════════════════════════════════════════════════════════════════════════
350// SEARCH - RediSearch and SQL search metrics
351// ═══════════════════════════════════════════════════════════════════════════
352
353/// Record a search query execution
354pub fn record_search_query(backend: &str, status: &str) {
355    counter!(
356        "sync_engine_search_queries_total",
357        "backend" => backend.to_string(),
358        "status" => status.to_string()
359    )
360    .increment(1);
361}
362
363/// Record search query latency
364pub fn record_search_latency(backend: &str, duration: Duration) {
365    histogram!(
366        "sync_engine_search_seconds",
367        "backend" => backend.to_string()
368    )
369    .record(duration.as_secs_f64());
370}
371
372/// Record search result count
373pub fn record_search_results(count: usize) {
374    histogram!("sync_engine_search_results").record(count as f64);
375}
376
377/// Record search cache hit/miss
378pub fn record_search_cache(hit: bool) {
379    let outcome = if hit { "hit" } else { "miss" };
380    counter!(
381        "sync_engine_search_cache_total",
382        "outcome" => outcome
383    )
384    .increment(1);
385}
386
387/// Set search cache stats gauge
388pub fn set_search_cache_stats(entries: usize, hit_rate: f64) {
389    gauge!("sync_engine_search_cache_entries").set(entries as f64);
390    gauge!("sync_engine_search_cache_hit_rate").set(hit_rate);
391}
392
393/// Record index creation/drop
394pub fn record_search_index_operation(operation: &str, success: bool) {
395    let status = if success { "success" } else { "failure" };
396    counter!(
397        "sync_engine_search_index_operations_total",
398        "operation" => operation.to_string(),
399        "status" => status
400    )
401    .increment(1);
402}
403
404/// A timing guard that records latency on drop
405pub struct LatencyTimer {
406    tier: &'static str,
407    operation: &'static str,
408    start: Instant,
409}
410
411impl LatencyTimer {
412    /// Start a new latency timer
413    pub fn new(tier: &'static str, operation: &'static str) -> Self {
414        Self {
415            tier,
416            operation,
417            start: Instant::now(),
418        }
419    }
420}
421
422impl Drop for LatencyTimer {
423    fn drop(&mut self) {
424        record_latency(self.tier, self.operation, self.start.elapsed());
425    }
426}
427
/// Start a `LatencyTimer` for the given tier and operation.
///
/// Expands to `$crate::metrics::LatencyTimer::new(tier, operation)`. Keep the
/// returned guard bound to a named variable for as long as the operation
/// should be timed; the latency is recorded when the guard is dropped.
#[macro_export]
macro_rules! time_operation {
    ($tier_label:expr, $operation_label:expr) => {
        $crate::metrics::LatencyTimer::new($tier_label, $operation_label)
    };
}
435
#[cfg(test)]
mod tests {
    use super::*;

    // Smoke tests: each one only checks that the API compiles and that the
    // calls do not panic. Asserting on emitted values would need a recorder
    // such as the one provided by metrics-util.

    #[test]
    fn test_record_operation() {
        for (tier, operation, status) in [
            ("L1", "get", "success"),
            ("L2", "put", "error"),
            ("L3", "delete", "success"),
        ] {
            record_operation(tier, operation, status);
        }
    }

    #[test]
    fn test_record_latency() {
        for (tier, operation, elapsed) in [
            ("L1", "get", Duration::from_micros(100)),
            ("L2", "put", Duration::from_millis(5)),
            ("L3", "batch", Duration::from_millis(50)),
        ] {
            record_latency(tier, operation, elapsed);
        }
    }

    #[test]
    fn test_record_batch() {
        let tier = "L2";
        record_batch_size(tier, 100);
        record_batch_bytes(tier, 1024 * 50);
    }

    #[test]
    fn test_gauges() {
        set_l1_cache_bytes(1024 * 1024);
        set_l1_cache_items(5000);

        set_wal_entries(42);
        set_wal_bytes(1024 * 100);

        set_memory_pressure(0.75);
        set_backpressure_level(2);
    }

    #[test]
    fn test_cuckoo_filter_metrics() {
        for (filter, load) in [("L2", 0.65), ("L3", 0.45)] {
            set_cuckoo_filter_load(filter, load);
        }
        for (filter, entries) in [("L2", 65000), ("L3", 45000)] {
            set_cuckoo_filter_entries(filter, entries);
        }
    }

    #[test]
    fn test_eviction_metrics() {
        let (items, bytes) = (10, 1024 * 50);
        record_eviction(items, bytes);
    }

    #[test]
    fn test_circuit_breaker_metrics() {
        for (circuit, state) in [("redis", 0), ("mysql", 2)] {
            set_circuit_state(circuit, state);
        }
        for (circuit, outcome) in [("redis", "success"), ("mysql", "rejected")] {
            record_circuit_call(circuit, outcome);
        }
    }

    #[test]
    fn test_wal_drain_metrics() {
        for (drained, succeeded) in [(50, true), (0, false)] {
            record_wal_drain(drained, succeeded);
        }
    }

    #[test]
    fn test_merkle_metrics() {
        for (store, operation, succeeded) in [("sql", "insert", true), ("redis", "batch", false)] {
            record_merkle_operation(store, operation, succeeded);
        }
    }

    #[test]
    fn test_latency_timer() {
        let timer = LatencyTimer::new("L1", "get");
        // Simulate some work while the guard is alive.
        std::thread::sleep(Duration::from_micros(10));
        // Dropping the guard is what records the latency.
        drop(timer);
    }

    #[test]
    fn test_engine_state_tracking() {
        for state in ["Created", "Connecting", "Running"] {
            set_engine_state(state);
        }
    }

    #[test]
    fn test_search_metrics() {
        // Search queries
        for (backend, status) in [("redis", "success"), ("sql", "success"), ("redis", "error")] {
            record_search_query(backend, status);
        }

        // Search latency
        for (backend, elapsed) in [
            ("redis", Duration::from_micros(500)),
            ("sql", Duration::from_millis(5)),
            ("cache", Duration::from_micros(10)),
        ] {
            record_search_latency(backend, elapsed);
        }

        // Search results
        for result_count in [42, 0] {
            record_search_results(result_count);
        }

        // Cache stats
        for was_hit in [true, false] {
            record_search_cache(was_hit);
        }
        set_search_cache_stats(100, 0.85);

        // Index operations
        for (operation, succeeded) in [("create", true), ("drop", true), ("create", false)] {
            record_search_index_operation(operation, succeeded);
        }
    }
}