1use metrics::{counter, gauge, histogram};
18use std::time::{Duration, Instant};
19
20pub fn record_operation(tier: &str, operation: &str, status: &str) {
22 counter!(
23 "sync_engine_operations_total",
24 "tier" => tier.to_string(),
25 "operation" => operation.to_string(),
26 "status" => status.to_string()
27 )
28 .increment(1);
29}
30
31pub fn record_latency(tier: &str, operation: &str, duration: Duration) {
33 histogram!(
34 "sync_engine_operation_seconds",
35 "tier" => tier.to_string(),
36 "operation" => operation.to_string()
37 )
38 .record(duration.as_secs_f64());
39}
40
41pub fn record_batch_size(tier: &str, count: usize) {
43 histogram!(
44 "sync_engine_batch_size",
45 "tier" => tier.to_string()
46 )
47 .record(count as f64);
48}
49
50pub fn record_batch_bytes(tier: &str, bytes: usize) {
52 histogram!(
53 "sync_engine_batch_bytes",
54 "tier" => tier.to_string()
55 )
56 .record(bytes as f64);
57}
58
59pub fn set_l1_cache_bytes(bytes: usize) {
61 gauge!("sync_engine_l1_cache_bytes").set(bytes as f64);
62}
63
64pub fn set_l1_cache_items(count: usize) {
66 gauge!("sync_engine_l1_cache_items").set(count as f64);
67}
68
69pub fn set_wal_entries(count: usize) {
71 gauge!("sync_engine_wal_entries").set(count as f64);
72}
73
74pub fn set_wal_bytes(bytes: u64) {
76 gauge!("sync_engine_wal_bytes").set(bytes as f64);
77}
78
79pub fn set_cuckoo_filter_load(filter: &str, load: f64) {
81 gauge!(
82 "sync_engine_cuckoo_filter_load",
83 "filter" => filter.to_string()
84 )
85 .set(load);
86}
87
88pub fn set_cuckoo_filter_entries(filter: &str, count: usize) {
90 gauge!(
91 "sync_engine_cuckoo_filter_entries",
92 "filter" => filter.to_string()
93 )
94 .set(count as f64);
95}
96
97pub fn record_eviction(count: usize, bytes: usize) {
99 counter!("sync_engine_evictions_total").increment(count as u64);
100 counter!("sync_engine_evicted_bytes_total").increment(bytes as u64);
101}
102
103pub fn set_memory_pressure(pressure: f64) {
105 gauge!("sync_engine_memory_pressure").set(pressure);
106}
107
108pub fn set_backpressure_level(level: u8) {
110 gauge!("sync_engine_backpressure_level").set(level as f64);
111}
112
113pub fn set_circuit_state(circuit: &str, state: u8) {
115 gauge!(
116 "sync_engine_circuit_breaker_state",
117 "circuit" => circuit.to_string()
118 )
119 .set(state as f64);
120}
121
122pub fn record_corruption(id: &str) {
124 counter!(
125 "sync_engine_corruption_detected_total",
126 "id" => id.to_string()
127 )
128 .increment(1);
129}
130
131pub fn record_circuit_call(circuit: &str, outcome: &str) {
133 counter!(
134 "sync_engine_circuit_breaker_calls_total",
135 "circuit" => circuit.to_string(),
136 "outcome" => outcome.to_string()
137 )
138 .increment(1);
139}
140
141pub fn record_error(tier: &str, operation: &str, error_type: &str) {
147 counter!(
148 "sync_engine_errors_total",
149 "tier" => tier.to_string(),
150 "operation" => operation.to_string(),
151 "error_type" => error_type.to_string()
152 )
153 .increment(1);
154}
155
156pub fn record_connection_error(backend: &str) {
158 counter!(
159 "sync_engine_connection_errors_total",
160 "backend" => backend.to_string()
161 )
162 .increment(1);
163}
164
165pub fn record_timeout(tier: &str, operation: &str) {
167 counter!(
168 "sync_engine_timeouts_total",
169 "tier" => tier.to_string(),
170 "operation" => operation.to_string()
171 )
172 .increment(1);
173}
174
175pub fn record_bytes_written(tier: &str, bytes: usize) {
181 counter!(
182 "sync_engine_bytes_written_total",
183 "tier" => tier.to_string()
184 )
185 .increment(bytes as u64);
186}
187
188pub fn record_bytes_read(tier: &str, bytes: usize) {
190 counter!(
191 "sync_engine_bytes_read_total",
192 "tier" => tier.to_string()
193 )
194 .increment(bytes as u64);
195}
196
197pub fn record_items_written(tier: &str, count: usize) {
199 counter!(
200 "sync_engine_items_written_total",
201 "tier" => tier.to_string()
202 )
203 .increment(count as u64);
204}
205
206pub fn set_batch_queue_items(count: usize) {
212 gauge!("sync_engine_batch_queue_items").set(count as f64);
213}
214
215pub fn set_batch_queue_bytes(bytes: usize) {
217 gauge!("sync_engine_batch_queue_bytes").set(bytes as f64);
218}
219
220pub fn set_backend_healthy(backend: &str, healthy: bool) {
226 gauge!(
227 "sync_engine_backend_healthy",
228 "backend" => backend.to_string()
229 )
230 .set(if healthy { 1.0 } else { 0.0 });
231}
232
233pub fn record_circuit_breaker_call(circuit: &str, outcome: &str) {
239 counter!(
240 "sync_engine_circuit_breaker_calls_total",
241 "circuit" => circuit.to_string(),
242 "outcome" => outcome.to_string()
243 )
244 .increment(1);
245}
246
247pub fn record_cuckoo_false_positive(filter: &str) {
253 counter!(
254 "sync_engine_cuckoo_false_positive_total",
255 "filter" => filter.to_string()
256 )
257 .increment(1);
258}
259
260pub fn record_cuckoo_check(filter: &str, result: &str) {
262 counter!(
263 "sync_engine_cuckoo_checks_total",
264 "filter" => filter.to_string(),
265 "result" => result.to_string()
266 )
267 .increment(1);
268}
269
270pub fn record_startup_phase(phase: &str, duration: Duration) {
276 histogram!(
277 "sync_engine_startup_seconds",
278 "phase" => phase.to_string()
279 )
280 .record(duration.as_secs_f64());
281}
282
283pub fn record_startup_total(duration: Duration) {
285 histogram!("sync_engine_startup_total_seconds").record(duration.as_secs_f64());
286}
287
288pub fn record_flush_duration(duration: Duration) {
294 histogram!("sync_engine_flush_seconds").record(duration.as_secs_f64());
295}
296
297pub fn set_engine_state(state: &str) {
299 counter!(
301 "sync_engine_state_transitions_total",
302 "state" => state.to_string()
303 )
304 .increment(1);
305}
306
307pub fn record_wal_drain(count: usize, success: bool) {
309 let status = if success { "success" } else { "failure" };
310 counter!(
311 "sync_engine_wal_drain_total",
312 "status" => status
313 )
314 .increment(1);
315
316 if success {
317 counter!("sync_engine_wal_drained_items_total").increment(count as u64);
318 }
319}
320
321pub fn record_merkle_operation(store: &str, operation: &str, success: bool) {
323 let status = if success { "success" } else { "failure" };
324 counter!(
325 "sync_engine_merkle_operations_total",
326 "store" => store.to_string(),
327 "operation" => operation.to_string(),
328 "status" => status
329 )
330 .increment(1);
331}
332
333pub fn record_cdc_entries(op: &str, count: usize) {
339 counter!(
340 "sync_engine_cdc_entries_total",
341 "op" => op.to_string()
342 )
343 .increment(count as u64);
344}
345
346pub fn record_search_query(backend: &str, status: &str) {
352 counter!(
353 "sync_engine_search_queries_total",
354 "backend" => backend.to_string(),
355 "status" => status.to_string()
356 )
357 .increment(1);
358}
359
360pub fn record_search_latency(backend: &str, duration: Duration) {
362 histogram!(
363 "sync_engine_search_seconds",
364 "backend" => backend.to_string()
365 )
366 .record(duration.as_secs_f64());
367}
368
369pub fn record_search_results(count: usize) {
371 histogram!("sync_engine_search_results").record(count as f64);
372}
373
374pub fn record_search_cache(hit: bool) {
376 let outcome = if hit { "hit" } else { "miss" };
377 counter!(
378 "sync_engine_search_cache_total",
379 "outcome" => outcome
380 )
381 .increment(1);
382}
383
384pub fn set_search_cache_stats(entries: usize, hit_rate: f64) {
386 gauge!("sync_engine_search_cache_entries").set(entries as f64);
387 gauge!("sync_engine_search_cache_hit_rate").set(hit_rate);
388}
389
390pub fn record_search_index_operation(operation: &str, success: bool) {
392 let status = if success { "success" } else { "failure" };
393 counter!(
394 "sync_engine_search_index_operations_total",
395 "operation" => operation.to_string(),
396 "status" => status
397 )
398 .increment(1);
399}
400
/// Scope timer that carries the labels and start instant of one latency sample.
///
/// The elapsed time is reported by the type's `Drop` implementation, so the
/// timer has to stay alive for the whole region being measured. It is marked
/// `#[must_use]` because a timer that is never bound to a variable is dropped
/// at the end of its own statement and would measure essentially nothing.
#[derive(Debug)]
#[must_use = "a LatencyTimer reports its latency when dropped; bind it to a variable to time a scope"]
pub struct LatencyTimer {
    /// Value reported under the `tier` label.
    tier: &'static str,
    /// Value reported under the `operation` label.
    operation: &'static str,
    /// Instant from which the elapsed time is measured.
    start: Instant,
}
407
408impl LatencyTimer {
409 pub fn new(tier: &'static str, operation: &'static str) -> Self {
411 Self {
412 tier,
413 operation,
414 start: Instant::now(),
415 }
416 }
417}
418
419impl Drop for LatencyTimer {
420 fn drop(&mut self) {
421 record_latency(self.tier, self.operation, self.start.elapsed());
422 }
423}
424
/// Creates a `LatencyTimer` for the given tier and operation expressions.
///
/// The macro expands to a plain constructor call and evaluates to the timer,
/// so the result must be bound to a variable for it to outlive the statement.
/// The expansion names the type as `$crate::metrics::LatencyTimer`, which
/// relies on this module being reachable at that path.
#[macro_export]
macro_rules! time_operation {
    ($tier:expr, $operation:expr) => {
        $crate::metrics::LatencyTimer::new($tier, $operation)
    };
}
432
#[cfg(test)]
mod tests {
    use super::*;

    // Smoke tests: nothing below asserts on a recorded value. Each test only
    // exercises the helpers and passes as long as none of the calls panics.

    #[test]
    fn test_record_operation() {
        for &(tier, operation, status) in &[
            ("L1", "get", "success"),
            ("L2", "put", "error"),
            ("L3", "delete", "success"),
        ] {
            record_operation(tier, operation, status);
        }
    }

    #[test]
    fn test_record_latency() {
        let samples = [
            ("L1", "get", Duration::from_micros(100)),
            ("L2", "put", Duration::from_millis(5)),
            ("L3", "batch", Duration::from_millis(50)),
        ];
        for &(tier, operation, elapsed) in &samples {
            record_latency(tier, operation, elapsed);
        }
    }

    #[test]
    fn test_record_batch() {
        let tier = "L2";
        record_batch_size(tier, 100);
        record_batch_bytes(tier, 1024 * 50);
    }

    #[test]
    fn test_gauges() {
        set_l1_cache_bytes(1024 * 1024);
        set_l1_cache_items(5000);

        set_wal_entries(42);
        set_wal_bytes(1024 * 100);

        set_memory_pressure(0.75);
        set_backpressure_level(2);
    }

    #[test]
    fn test_cuckoo_filter_metrics() {
        for &(filter, load) in &[("L2", 0.65), ("L3", 0.45)] {
            set_cuckoo_filter_load(filter, load);
        }
        for &(filter, entries) in &[("L2", 65000), ("L3", 45000)] {
            set_cuckoo_filter_entries(filter, entries);
        }
    }

    #[test]
    fn test_eviction_metrics() {
        let (items, bytes) = (10, 1024 * 50);
        record_eviction(items, bytes);
    }

    #[test]
    fn test_circuit_breaker_metrics() {
        for &(circuit, state) in &[("redis", 0), ("mysql", 2)] {
            set_circuit_state(circuit, state);
        }
        for &(circuit, outcome) in &[("redis", "success"), ("mysql", "rejected")] {
            record_circuit_call(circuit, outcome);
        }
    }

    #[test]
    fn test_wal_drain_metrics() {
        for &(count, success) in &[(50, true), (0, false)] {
            record_wal_drain(count, success);
        }
    }

    #[test]
    fn test_merkle_metrics() {
        for &(store, operation, success) in &[("sql", "insert", true), ("redis", "batch", false)] {
            record_merkle_operation(store, operation, success);
        }
    }

    #[test]
    fn test_latency_timer() {
        // Dropping the timer explicitly triggers the latency report.
        let timer = LatencyTimer::new("L1", "get");
        std::thread::sleep(Duration::from_micros(10));
        drop(timer);
    }

    #[test]
    fn test_engine_state_tracking() {
        for &state in &["Created", "Connecting", "Running"] {
            set_engine_state(state);
        }
    }

    #[test]
    fn test_search_metrics() {
        for &(backend, status) in &[("redis", "success"), ("sql", "success"), ("redis", "error")] {
            record_search_query(backend, status);
        }

        let latencies = [
            ("redis", Duration::from_micros(500)),
            ("sql", Duration::from_millis(5)),
            ("cache", Duration::from_micros(10)),
        ];
        for &(backend, elapsed) in &latencies {
            record_search_latency(backend, elapsed);
        }

        for &count in &[42, 0] {
            record_search_results(count);
        }

        for &hit in &[true, false] {
            record_search_cache(hit);
        }
        set_search_cache_stats(100, 0.85);

        for &(operation, success) in &[("create", true), ("drop", true), ("create", false)] {
            record_search_index_operation(operation, success);
        }
    }
}