1use metrics::{counter, gauge, histogram};
21use std::time::{Duration, Instant};
22
23pub fn record_operation(tier: &str, operation: &str, status: &str) {
25 counter!(
26 "sync_engine_operations_total",
27 "tier" => tier.to_string(),
28 "operation" => operation.to_string(),
29 "status" => status.to_string()
30 )
31 .increment(1);
32}
33
34pub fn record_latency(tier: &str, operation: &str, duration: Duration) {
36 histogram!(
37 "sync_engine_operation_seconds",
38 "tier" => tier.to_string(),
39 "operation" => operation.to_string()
40 )
41 .record(duration.as_secs_f64());
42}
43
44pub fn record_batch_size(tier: &str, count: usize) {
46 histogram!(
47 "sync_engine_batch_size",
48 "tier" => tier.to_string()
49 )
50 .record(count as f64);
51}
52
53pub fn record_batch_bytes(tier: &str, bytes: usize) {
55 histogram!(
56 "sync_engine_batch_bytes",
57 "tier" => tier.to_string()
58 )
59 .record(bytes as f64);
60}
61
62pub fn set_l1_cache_bytes(bytes: usize) {
64 gauge!("sync_engine_l1_cache_bytes").set(bytes as f64);
65}
66
67pub fn set_l1_cache_items(count: usize) {
69 gauge!("sync_engine_l1_cache_items").set(count as f64);
70}
71
72pub fn set_wal_entries(count: usize) {
74 gauge!("sync_engine_wal_entries").set(count as f64);
75}
76
77pub fn set_wal_bytes(bytes: u64) {
79 gauge!("sync_engine_wal_bytes").set(bytes as f64);
80}
81
82pub fn set_cuckoo_filter_load(filter: &str, load: f64) {
84 gauge!(
85 "sync_engine_cuckoo_filter_load",
86 "filter" => filter.to_string()
87 )
88 .set(load);
89}
90
91pub fn set_cuckoo_filter_entries(filter: &str, count: usize) {
93 gauge!(
94 "sync_engine_cuckoo_filter_entries",
95 "filter" => filter.to_string()
96 )
97 .set(count as f64);
98}
99
100pub fn record_eviction(count: usize, bytes: usize) {
102 counter!("sync_engine_evictions_total").increment(count as u64);
103 counter!("sync_engine_evicted_bytes_total").increment(bytes as u64);
104}
105
106pub fn set_memory_pressure(pressure: f64) {
108 gauge!("sync_engine_memory_pressure").set(pressure);
109}
110
111pub fn set_backpressure_level(level: u8) {
113 gauge!("sync_engine_backpressure_level").set(level as f64);
114}
115
116pub fn set_circuit_state(circuit: &str, state: u8) {
118 gauge!(
119 "sync_engine_circuit_breaker_state",
120 "circuit" => circuit.to_string()
121 )
122 .set(state as f64);
123}
124
125pub fn record_corruption(id: &str) {
127 counter!(
128 "sync_engine_corruption_detected_total",
129 "id" => id.to_string()
130 )
131 .increment(1);
132}
133
134pub fn record_circuit_call(circuit: &str, outcome: &str) {
136 counter!(
137 "sync_engine_circuit_breaker_calls_total",
138 "circuit" => circuit.to_string(),
139 "outcome" => outcome.to_string()
140 )
141 .increment(1);
142}
143
144pub fn record_error(tier: &str, operation: &str, error_type: &str) {
150 counter!(
151 "sync_engine_errors_total",
152 "tier" => tier.to_string(),
153 "operation" => operation.to_string(),
154 "error_type" => error_type.to_string()
155 )
156 .increment(1);
157}
158
159pub fn record_connection_error(backend: &str) {
161 counter!(
162 "sync_engine_connection_errors_total",
163 "backend" => backend.to_string()
164 )
165 .increment(1);
166}
167
168pub fn record_timeout(tier: &str, operation: &str) {
170 counter!(
171 "sync_engine_timeouts_total",
172 "tier" => tier.to_string(),
173 "operation" => operation.to_string()
174 )
175 .increment(1);
176}
177
178pub fn record_bytes_written(tier: &str, bytes: usize) {
184 counter!(
185 "sync_engine_bytes_written_total",
186 "tier" => tier.to_string()
187 )
188 .increment(bytes as u64);
189}
190
191pub fn record_bytes_read(tier: &str, bytes: usize) {
193 counter!(
194 "sync_engine_bytes_read_total",
195 "tier" => tier.to_string()
196 )
197 .increment(bytes as u64);
198}
199
200pub fn record_items_written(tier: &str, count: usize) {
202 counter!(
203 "sync_engine_items_written_total",
204 "tier" => tier.to_string()
205 )
206 .increment(count as u64);
207}
208
209pub fn set_batch_queue_items(count: usize) {
215 gauge!("sync_engine_batch_queue_items").set(count as f64);
216}
217
218pub fn set_batch_queue_bytes(bytes: usize) {
220 gauge!("sync_engine_batch_queue_bytes").set(bytes as f64);
221}
222
223pub fn set_backend_healthy(backend: &str, healthy: bool) {
229 gauge!(
230 "sync_engine_backend_healthy",
231 "backend" => backend.to_string()
232 )
233 .set(if healthy { 1.0 } else { 0.0 });
234}
235
236pub fn record_circuit_breaker_call(circuit: &str, outcome: &str) {
242 counter!(
243 "sync_engine_circuit_breaker_calls_total",
244 "circuit" => circuit.to_string(),
245 "outcome" => outcome.to_string()
246 )
247 .increment(1);
248}
249
250pub fn record_cuckoo_false_positive(filter: &str) {
256 counter!(
257 "sync_engine_cuckoo_false_positive_total",
258 "filter" => filter.to_string()
259 )
260 .increment(1);
261}
262
263pub fn record_cuckoo_check(filter: &str, result: &str) {
265 counter!(
266 "sync_engine_cuckoo_checks_total",
267 "filter" => filter.to_string(),
268 "result" => result.to_string()
269 )
270 .increment(1);
271}
272
273pub fn record_startup_phase(phase: &str, duration: Duration) {
279 histogram!(
280 "sync_engine_startup_seconds",
281 "phase" => phase.to_string()
282 )
283 .record(duration.as_secs_f64());
284}
285
286pub fn record_startup_total(duration: Duration) {
288 histogram!("sync_engine_startup_total_seconds").record(duration.as_secs_f64());
289}
290
291pub fn record_flush_duration(duration: Duration) {
297 histogram!("sync_engine_flush_seconds").record(duration.as_secs_f64());
298}
299
300pub fn set_engine_state(state: &str) {
302 counter!(
304 "sync_engine_state_transitions_total",
305 "state" => state.to_string()
306 )
307 .increment(1);
308}
309
310pub fn record_wal_drain(count: usize, success: bool) {
312 let status = if success { "success" } else { "failure" };
313 counter!(
314 "sync_engine_wal_drain_total",
315 "status" => status
316 )
317 .increment(1);
318
319 if success {
320 counter!("sync_engine_wal_drained_items_total").increment(count as u64);
321 }
322}
323
324pub fn record_merkle_operation(store: &str, operation: &str, success: bool) {
326 let status = if success { "success" } else { "failure" };
327 counter!(
328 "sync_engine_merkle_operations_total",
329 "store" => store.to_string(),
330 "operation" => operation.to_string(),
331 "status" => status
332 )
333 .increment(1);
334}
335
336pub fn record_cdc_entries(op: &str, count: usize) {
342 counter!(
343 "sync_engine_cdc_entries_total",
344 "op" => op.to_string()
345 )
346 .increment(count as u64);
347}
348
349pub fn record_search_query(backend: &str, status: &str) {
355 counter!(
356 "sync_engine_search_queries_total",
357 "backend" => backend.to_string(),
358 "status" => status.to_string()
359 )
360 .increment(1);
361}
362
363pub fn record_search_latency(backend: &str, duration: Duration) {
365 histogram!(
366 "sync_engine_search_seconds",
367 "backend" => backend.to_string()
368 )
369 .record(duration.as_secs_f64());
370}
371
372pub fn record_search_results(count: usize) {
374 histogram!("sync_engine_search_results").record(count as f64);
375}
376
377pub fn record_search_cache(hit: bool) {
379 let outcome = if hit { "hit" } else { "miss" };
380 counter!(
381 "sync_engine_search_cache_total",
382 "outcome" => outcome
383 )
384 .increment(1);
385}
386
387pub fn set_search_cache_stats(entries: usize, hit_rate: f64) {
389 gauge!("sync_engine_search_cache_entries").set(entries as f64);
390 gauge!("sync_engine_search_cache_hit_rate").set(hit_rate);
391}
392
393pub fn record_search_index_operation(operation: &str, success: bool) {
395 let status = if success { "success" } else { "failure" };
396 counter!(
397 "sync_engine_search_index_operations_total",
398 "operation" => operation.to_string(),
399 "status" => status
400 )
401 .increment(1);
402}
403
/// Scope timer that remembers a `tier` label, an `operation` label and the
/// [`Instant`] at which it was created.
///
/// The timer reports its elapsed time when it is dropped, so it must stay
/// alive for the whole region being measured. It is `#[must_use]` so that a
/// timer created as a bare statement, and therefore dropped at once,
/// triggers a compiler warning. Bind it to a named variable such as `_timer`.
#[must_use = "bind the timer to a variable; dropping it immediately records a near-zero latency"]
#[derive(Debug)]
pub struct LatencyTimer {
    /// Value of the `tier` label attached to the recorded sample.
    tier: &'static str,
    /// Value of the `operation` label attached to the recorded sample.
    operation: &'static str,
    /// Moment the timer was created.
    start: Instant,
}
410
411impl LatencyTimer {
412 pub fn new(tier: &'static str, operation: &'static str) -> Self {
414 Self {
415 tier,
416 operation,
417 start: Instant::now(),
418 }
419 }
420}
421
422impl Drop for LatencyTimer {
423 fn drop(&mut self) {
424 record_latency(self.tier, self.operation, self.start.elapsed());
425 }
426}
427
/// Creates a `LatencyTimer` for the given tier and operation.
///
/// Expands to `$crate::metrics::LatencyTimer::new($tier, $op)`, so the
/// result must be bound to a variable that lives as long as the region being
/// timed. A trailing comma after the second argument is accepted.
#[macro_export]
macro_rules! time_operation {
    ($tier:expr, $op:expr $(,)?) => {
        $crate::metrics::LatencyTimer::new($tier, $op)
    };
}
435
#[cfg(test)]
mod tests {
    use super::*;

    // Smoke tests: each test only calls the metric helpers and makes no
    // assertions about recorded values, so a test fails only if a call panics.

    #[test]
    fn test_record_operation() {
        for (tier, operation, status) in [
            ("L1", "get", "success"),
            ("L2", "put", "error"),
            ("L3", "delete", "success"),
        ] {
            record_operation(tier, operation, status);
        }
    }

    #[test]
    fn test_record_latency() {
        for (tier, operation, elapsed) in [
            ("L1", "get", Duration::from_micros(100)),
            ("L2", "put", Duration::from_millis(5)),
            ("L3", "batch", Duration::from_millis(50)),
        ] {
            record_latency(tier, operation, elapsed);
        }
    }

    #[test]
    fn test_record_batch() {
        let tier = "L2";
        record_batch_size(tier, 100);
        record_batch_bytes(tier, 1024 * 50);
    }

    #[test]
    fn test_gauges() {
        // Cache gauges.
        set_l1_cache_bytes(1024 * 1024);
        set_l1_cache_items(5000);

        // WAL gauges.
        set_wal_entries(42);
        set_wal_bytes(1024 * 100);

        // Pressure gauges.
        set_memory_pressure(0.75);
        set_backpressure_level(2);
    }

    #[test]
    fn test_cuckoo_filter_metrics() {
        for (filter, load) in [("L2", 0.65), ("L3", 0.45)] {
            set_cuckoo_filter_load(filter, load);
        }
        for (filter, entries) in [("L2", 65000), ("L3", 45000)] {
            set_cuckoo_filter_entries(filter, entries);
        }
    }

    #[test]
    fn test_eviction_metrics() {
        let evicted_items = 10;
        let evicted_bytes = 1024 * 50;
        record_eviction(evicted_items, evicted_bytes);
    }

    #[test]
    fn test_circuit_breaker_metrics() {
        for (circuit, state) in [("redis", 0), ("mysql", 2)] {
            set_circuit_state(circuit, state);
        }
        for (circuit, outcome) in [("redis", "success"), ("mysql", "rejected")] {
            record_circuit_call(circuit, outcome);
        }
    }

    #[test]
    fn test_wal_drain_metrics() {
        for (count, success) in [(50, true), (0, false)] {
            record_wal_drain(count, success);
        }
    }

    #[test]
    fn test_merkle_metrics() {
        for (store, operation, success) in [("sql", "insert", true), ("redis", "batch", false)] {
            record_merkle_operation(store, operation, success);
        }
    }

    #[test]
    fn test_latency_timer() {
        let timer = LatencyTimer::new("L1", "get");
        std::thread::sleep(Duration::from_micros(10));
        // Dropping the timer is what reports the measured latency.
        drop(timer);
    }

    #[test]
    fn test_engine_state_tracking() {
        for state in ["Created", "Connecting", "Running"] {
            set_engine_state(state);
        }
    }

    #[test]
    fn test_search_metrics() {
        // Query counters.
        for (backend, status) in [("redis", "success"), ("sql", "success"), ("redis", "error")] {
            record_search_query(backend, status);
        }

        // Latency samples.
        for (backend, elapsed) in [
            ("redis", Duration::from_micros(500)),
            ("sql", Duration::from_millis(5)),
            ("cache", Duration::from_micros(10)),
        ] {
            record_search_latency(backend, elapsed);
        }

        // Result-count samples.
        for result_count in [42, 0] {
            record_search_results(result_count);
        }

        // Cache hit/miss counters and gauges.
        for hit in [true, false] {
            record_search_cache(hit);
        }
        set_search_cache_stats(100, 0.85);

        // Index operation counters.
        for (operation, success) in [("create", true), ("drop", true), ("create", false)] {
            record_search_index_operation(operation, success);
        }
    }
}