absurder_sql/storage/
observability.rs

1use crate::types::DatabaseError;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4
5#[cfg(not(target_arch = "wasm32"))]
6use std::sync::Mutex;
7#[cfg(not(target_arch = "wasm32"))]
8use std::time::Instant;
9
10/// Comprehensive metrics for observability
11#[derive(Debug, Clone)]
12pub struct StorageMetrics {
13    pub dirty_count: usize,
14    pub dirty_bytes: usize,
15    pub sync_count: u64,
16    pub timer_sync_count: u64,
17    pub debounce_sync_count: u64,
18    pub error_count: u64,
19    pub checksum_failures: u64,
20    pub last_sync_duration_ms: u64,
21    pub throughput_blocks_per_sec: f64,
22    pub throughput_bytes_per_sec: f64,
23    pub error_rate: f64,
24}
25
26impl Default for StorageMetrics {
27    fn default() -> Self {
28        Self {
29            dirty_count: 0,
30            dirty_bytes: 0,
31            sync_count: 0,
32            timer_sync_count: 0,
33            debounce_sync_count: 0,
34            error_count: 0,
35            checksum_failures: 0,
36            last_sync_duration_ms: 0,
37            throughput_blocks_per_sec: 0.0,
38            throughput_bytes_per_sec: 0.0,
39            error_rate: 0.0,
40        }
41    }
42}
43
44/// Event callback types
45pub type SyncStartCallback = Box<dyn Fn(usize, usize) + Send + Sync>;
46pub type SyncSuccessCallback = Box<dyn Fn(u64, usize) + Send + Sync>;
47pub type SyncFailureCallback = Box<dyn Fn(&DatabaseError) + Send + Sync>;
48pub type ErrorCallback = Box<dyn Fn(&DatabaseError) + Send + Sync>;
49pub type BackpressureCallback = Box<dyn Fn(&str, &str) + Send + Sync>;
50
51/// WASM-specific callback types (simpler, no Send/Sync requirements)
52#[cfg(target_arch = "wasm32")]
53pub type WasmSyncSuccessCallback = Box<dyn Fn(u64, usize)>;
54
55/// Observability manager for tracking metrics and events
56pub struct ObservabilityManager {
57    // Atomic counters for thread-safe metrics
58    pub(super) error_count: Arc<AtomicU64>,
59    pub(super) checksum_failures: Arc<AtomicU64>,
60    pub(super) sync_count: Arc<AtomicU64>,
61
62    // Event callbacks
63    pub(super) sync_start_callback: Option<SyncStartCallback>,
64    pub(super) sync_success_callback: Option<SyncSuccessCallback>,
65    pub(super) sync_failure_callback: Option<SyncFailureCallback>,
66    pub(super) error_callback: Option<ErrorCallback>,
67    pub(super) backpressure_callback: Option<BackpressureCallback>,
68
69    // WASM-specific callbacks
70    #[cfg(target_arch = "wasm32")]
71    pub(super) wasm_sync_success_callback: Option<WasmSyncSuccessCallback>,
72
73    // Throughput tracking (use interior mutability)
74    #[cfg(not(target_arch = "wasm32"))]
75    pub(super) last_sync_start: Mutex<Option<Instant>>,
76    pub(super) last_sync_blocks: AtomicU64,
77    pub(super) last_sync_bytes: AtomicU64,
78}
79
80impl Default for ObservabilityManager {
81    fn default() -> Self {
82        Self {
83            error_count: Arc::new(AtomicU64::new(0)),
84            checksum_failures: Arc::new(AtomicU64::new(0)),
85            sync_count: Arc::new(AtomicU64::new(0)),
86            sync_start_callback: None,
87            sync_success_callback: None,
88            sync_failure_callback: None,
89            error_callback: None,
90            backpressure_callback: None,
91            #[cfg(target_arch = "wasm32")]
92            wasm_sync_success_callback: None,
93            #[cfg(not(target_arch = "wasm32"))]
94            last_sync_start: Mutex::new(None),
95            #[cfg(not(target_arch = "wasm32"))]
96            last_sync_blocks: AtomicU64::new(0),
97            #[cfg(not(target_arch = "wasm32"))]
98            last_sync_bytes: AtomicU64::new(0),
99            #[cfg(target_arch = "wasm32")]
100            last_sync_blocks: AtomicU64::new(0),
101            #[cfg(target_arch = "wasm32")]
102            last_sync_bytes: AtomicU64::new(0),
103        }
104    }
105}
106
107impl ObservabilityManager {
108    pub fn new() -> Self {
109        Self::default()
110    }
111
112    /// Record an error occurrence
113    pub fn record_error(&self, error: &DatabaseError) {
114        self.error_count.fetch_add(1, Ordering::SeqCst);
115
116        if let Some(ref callback) = self.error_callback {
117            callback(error);
118        }
119    }
120
121    /// Record a checksum failure
122    pub fn record_checksum_failure(&self) {
123        self.checksum_failures.fetch_add(1, Ordering::SeqCst);
124    }
125
126    /// Record sync start
127    pub fn record_sync_start(&self, dirty_count: usize, dirty_bytes: usize) {
128        #[cfg(not(target_arch = "wasm32"))]
129        {
130            if let Ok(mut guard) = self.last_sync_start.lock() {
131                *guard = Some(Instant::now());
132            }
133        }
134        self.last_sync_blocks
135            .store(dirty_count as u64, Ordering::SeqCst);
136        self.last_sync_bytes
137            .store(dirty_bytes as u64, Ordering::SeqCst);
138
139        if let Some(ref callback) = self.sync_start_callback {
140            callback(dirty_count, dirty_bytes);
141        }
142    }
143
144    /// Record sync success
145    pub fn record_sync_success(&self, duration_ms: u64, blocks_synced: usize) {
146        // Increment sync count
147        self.sync_count.fetch_add(1, Ordering::SeqCst);
148
149        if let Some(ref callback) = self.sync_success_callback {
150            callback(duration_ms, blocks_synced);
151        }
152
153        #[cfg(target_arch = "wasm32")]
154        {
155            if let Some(ref callback) = self.wasm_sync_success_callback {
156                callback(duration_ms, blocks_synced);
157            }
158        }
159    }
160
161    /// Record sync failure
162    pub fn record_sync_failure(&self, error: &DatabaseError) {
163        if let Some(ref callback) = self.sync_failure_callback {
164            callback(error);
165        }
166    }
167
168    /// Record backpressure event
169    pub fn record_backpressure(&self, level: &str, reason: &str) {
170        if let Some(ref callback) = self.backpressure_callback {
171            callback(level, reason);
172        }
173    }
174
175    /// Calculate throughput metrics
176    pub fn calculate_throughput(&self, duration_ms: u64) -> (f64, f64) {
177        if duration_ms == 0 {
178            return (0.0, 0.0);
179        }
180
181        let duration_sec = duration_ms as f64 / 1000.0;
182        let blocks = self.last_sync_blocks.load(Ordering::SeqCst) as f64;
183        let bytes = self.last_sync_bytes.load(Ordering::SeqCst) as f64;
184        let blocks_per_sec = blocks / duration_sec;
185        let bytes_per_sec = bytes / duration_sec;
186
187        (blocks_per_sec, bytes_per_sec)
188    }
189
190    /// Calculate error rate
191    pub fn calculate_error_rate(&self, total_operations: u64) -> f64 {
192        if total_operations == 0 {
193            return 0.0;
194        }
195
196        let errors = self.error_count.load(Ordering::SeqCst);
197        errors as f64 / total_operations as f64
198    }
199
200    /// Get current error count
201    pub fn get_error_count(&self) -> u64 {
202        self.error_count.load(Ordering::SeqCst)
203    }
204
205    /// Get current checksum failure count
206    pub fn get_checksum_failures(&self) -> u64 {
207        self.checksum_failures.load(Ordering::SeqCst)
208    }
209
210    /// Get current sync count
211    pub fn get_sync_count(&self) -> u64 {
212        self.sync_count.load(Ordering::SeqCst)
213    }
214}