absurder_sql/storage/
observability.rs

1use crate::types::DatabaseError;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::sync::Arc;
4
5#[cfg(not(target_arch = "wasm32"))]
6use std::time::Instant;
7
8/// Comprehensive metrics for observability
9#[derive(Debug, Clone)]
10pub struct StorageMetrics {
11    pub dirty_count: usize,
12    pub dirty_bytes: usize,
13    pub sync_count: u64,
14    pub timer_sync_count: u64,
15    pub debounce_sync_count: u64,
16    pub error_count: u64,
17    pub checksum_failures: u64,
18    pub last_sync_duration_ms: u64,
19    pub throughput_blocks_per_sec: f64,
20    pub throughput_bytes_per_sec: f64,
21    pub error_rate: f64,
22}
23
24impl Default for StorageMetrics {
25    fn default() -> Self {
26        Self {
27            dirty_count: 0,
28            dirty_bytes: 0,
29            sync_count: 0,
30            timer_sync_count: 0,
31            debounce_sync_count: 0,
32            error_count: 0,
33            checksum_failures: 0,
34            last_sync_duration_ms: 0,
35            throughput_blocks_per_sec: 0.0,
36            throughput_bytes_per_sec: 0.0,
37            error_rate: 0.0,
38        }
39    }
40}
41
42/// Event callback types
43pub type SyncStartCallback = Box<dyn Fn(usize, usize) + Send + Sync>;
44pub type SyncSuccessCallback = Box<dyn Fn(u64, usize) + Send + Sync>;
45pub type SyncFailureCallback = Box<dyn Fn(&DatabaseError) + Send + Sync>;
46pub type ErrorCallback = Box<dyn Fn(&DatabaseError) + Send + Sync>;
47pub type BackpressureCallback = Box<dyn Fn(&str, &str) + Send + Sync>;
48
49/// WASM-specific callback types (simpler, no Send/Sync requirements)
50#[cfg(target_arch = "wasm32")]
51pub type WasmSyncSuccessCallback = Box<dyn Fn(u64, usize)>;
52
53/// Observability manager for tracking metrics and events
54pub struct ObservabilityManager {
55    // Atomic counters for thread-safe metrics
56    pub(super) error_count: Arc<AtomicU64>,
57    pub(super) checksum_failures: Arc<AtomicU64>,
58    pub(super) sync_count: Arc<AtomicU64>,
59    
60    // Event callbacks
61    pub(super) sync_start_callback: Option<SyncStartCallback>,
62    pub(super) sync_success_callback: Option<SyncSuccessCallback>,
63    pub(super) sync_failure_callback: Option<SyncFailureCallback>,
64    pub(super) error_callback: Option<ErrorCallback>,
65    pub(super) backpressure_callback: Option<BackpressureCallback>,
66    
67    // WASM-specific callbacks
68    #[cfg(target_arch = "wasm32")]
69    pub(super) wasm_sync_success_callback: Option<WasmSyncSuccessCallback>,
70    
71    // Throughput tracking
72    #[cfg(not(target_arch = "wasm32"))]
73    pub(super) last_sync_start: Option<Instant>,
74    pub(super) last_sync_blocks: usize,
75    pub(super) last_sync_bytes: usize,
76}
77
78impl Default for ObservabilityManager {
79    fn default() -> Self {
80        Self {
81            error_count: Arc::new(AtomicU64::new(0)),
82            checksum_failures: Arc::new(AtomicU64::new(0)),
83            sync_count: Arc::new(AtomicU64::new(0)),
84            sync_start_callback: None,
85            sync_success_callback: None,
86            sync_failure_callback: None,
87            error_callback: None,
88            backpressure_callback: None,
89            #[cfg(target_arch = "wasm32")]
90            wasm_sync_success_callback: None,
91            #[cfg(not(target_arch = "wasm32"))]
92            last_sync_start: None,
93            last_sync_blocks: 0,
94            last_sync_bytes: 0,
95        }
96    }
97}
98
99impl ObservabilityManager {
100    pub fn new() -> Self {
101        Self::default()
102    }
103    
104    /// Record an error occurrence
105    pub fn record_error(&self, error: &DatabaseError) {
106        self.error_count.fetch_add(1, Ordering::SeqCst);
107        
108        if let Some(ref callback) = self.error_callback {
109            callback(error);
110        }
111    }
112    
113    /// Record a checksum failure
114    pub fn record_checksum_failure(&self) {
115        self.checksum_failures.fetch_add(1, Ordering::SeqCst);
116    }
117    
118    /// Record sync start
119    pub fn record_sync_start(&mut self, dirty_count: usize, dirty_bytes: usize) {
120        #[cfg(not(target_arch = "wasm32"))]
121        {
122            self.last_sync_start = Some(Instant::now());
123        }
124        self.last_sync_blocks = dirty_count;
125        self.last_sync_bytes = dirty_bytes;
126        
127        if let Some(ref callback) = self.sync_start_callback {
128            callback(dirty_count, dirty_bytes);
129        }
130    }
131    
132    /// Record sync success
133    pub fn record_sync_success(&mut self, duration_ms: u64, blocks_synced: usize) {
134        // Increment sync count
135        self.sync_count.fetch_add(1, Ordering::SeqCst);
136        
137        if let Some(ref callback) = self.sync_success_callback {
138            callback(duration_ms, blocks_synced);
139        }
140        
141        #[cfg(target_arch = "wasm32")]
142        {
143            if let Some(ref callback) = self.wasm_sync_success_callback {
144                callback(duration_ms, blocks_synced);
145            }
146        }
147    }
148    
149    /// Record sync failure
150    pub fn record_sync_failure(&self, error: &DatabaseError) {
151        if let Some(ref callback) = self.sync_failure_callback {
152            callback(error);
153        }
154    }
155    
156    /// Record backpressure event
157    pub fn record_backpressure(&self, level: &str, reason: &str) {
158        if let Some(ref callback) = self.backpressure_callback {
159            callback(level, reason);
160        }
161    }
162    
163    /// Calculate throughput metrics
164    pub fn calculate_throughput(&self, duration_ms: u64) -> (f64, f64) {
165        if duration_ms == 0 {
166            return (0.0, 0.0);
167        }
168        
169        let duration_sec = duration_ms as f64 / 1000.0;
170        let blocks_per_sec = self.last_sync_blocks as f64 / duration_sec;
171        let bytes_per_sec = self.last_sync_bytes as f64 / duration_sec;
172        
173        (blocks_per_sec, bytes_per_sec)
174    }
175    
176    /// Calculate error rate
177    pub fn calculate_error_rate(&self, total_operations: u64) -> f64 {
178        if total_operations == 0 {
179            return 0.0;
180        }
181        
182        let errors = self.error_count.load(Ordering::SeqCst);
183        errors as f64 / total_operations as f64
184    }
185    
186    /// Get current error count
187    pub fn get_error_count(&self) -> u64 {
188        self.error_count.load(Ordering::SeqCst)
189    }
190    
191    /// Get current checksum failure count
192    pub fn get_checksum_failures(&self) -> u64 {
193        self.checksum_failures.load(Ordering::SeqCst)
194    }
195    
196    /// Get current sync count
197    pub fn get_sync_count(&self) -> u64 {
198        self.sync_count.load(Ordering::SeqCst)
199    }
200}