// absurder_sql/storage/observability.rs
use crate::types::DatabaseError;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4
5#[cfg(not(target_arch = "wasm32"))]
6use std::sync::Mutex;
7#[cfg(not(target_arch = "wasm32"))]
8use std::time::Instant;
9
/// Snapshot of storage-layer metrics.
///
/// `Default` yields a snapshot in which every counter is `0` and every rate is
/// `0.0`; it is derived because that is exactly the per-field default.
///
/// NOTE(review): nothing in this file populates these fields, so the per-field
/// descriptions below are inferred from the field names — confirm against the
/// code that builds the snapshot.
#[derive(Debug, Clone, Default)]
pub struct StorageMetrics {
    /// Presumably the number of dirty blocks awaiting sync.
    pub dirty_count: usize,
    /// Presumably the total size, in bytes, of the dirty blocks.
    pub dirty_bytes: usize,
    /// Presumably the total number of completed syncs.
    pub sync_count: u64,
    /// Presumably the number of syncs triggered by a timer.
    pub timer_sync_count: u64,
    /// Presumably the number of syncs triggered by debouncing.
    pub debounce_sync_count: u64,
    /// Presumably the total number of recorded errors.
    pub error_count: u64,
    /// Presumably the total number of checksum verification failures.
    pub checksum_failures: u64,
    /// Presumably the duration of the most recent sync, in milliseconds.
    pub last_sync_duration_ms: u64,
    /// Presumably the sync throughput in blocks per second.
    pub throughput_blocks_per_sec: f64,
    /// Presumably the sync throughput in bytes per second.
    pub throughput_bytes_per_sec: f64,
    /// Presumably errors divided by total operations.
    pub error_rate: f64,
}
43
/// Callback invoked by `record_sync_start` with `(dirty_count, dirty_bytes)`.
pub type SyncStartCallback = Box<dyn Fn(usize, usize) + Send + Sync>;
/// Callback invoked by `record_sync_success` with `(duration_ms, blocks_synced)`.
pub type SyncSuccessCallback = Box<dyn Fn(u64, usize) + Send + Sync>;
/// Callback invoked by `record_sync_failure` with the error that was reported.
pub type SyncFailureCallback = Box<dyn Fn(&DatabaseError) + Send + Sync>;
/// Callback invoked by `record_error` with the error that was reported.
pub type ErrorCallback = Box<dyn Fn(&DatabaseError) + Send + Sync>;
/// Callback invoked by `record_backpressure` with `(level, reason)`.
pub type BackpressureCallback = Box<dyn Fn(&str, &str) + Send + Sync>;

/// Same shape as [`SyncSuccessCallback`] but without the `Send + Sync` bounds;
/// only compiled for `wasm32` targets.
#[cfg(target_arch = "wasm32")]
pub type WasmSyncSuccessCallback = Box<dyn Fn(u64, usize)>;
54
55pub struct ObservabilityManager {
57 pub(super) error_count: Arc<AtomicU64>,
59 pub(super) checksum_failures: Arc<AtomicU64>,
60 pub(super) sync_count: Arc<AtomicU64>,
61
62 pub(super) sync_start_callback: Option<SyncStartCallback>,
64 pub(super) sync_success_callback: Option<SyncSuccessCallback>,
65 pub(super) sync_failure_callback: Option<SyncFailureCallback>,
66 pub(super) error_callback: Option<ErrorCallback>,
67 pub(super) backpressure_callback: Option<BackpressureCallback>,
68
69 #[cfg(target_arch = "wasm32")]
71 pub(super) wasm_sync_success_callback: Option<WasmSyncSuccessCallback>,
72
73 #[cfg(not(target_arch = "wasm32"))]
75 pub(super) last_sync_start: Mutex<Option<Instant>>,
76 pub(super) last_sync_blocks: AtomicU64,
77 pub(super) last_sync_bytes: AtomicU64,
78}
79
80impl Default for ObservabilityManager {
81 fn default() -> Self {
82 Self {
83 error_count: Arc::new(AtomicU64::new(0)),
84 checksum_failures: Arc::new(AtomicU64::new(0)),
85 sync_count: Arc::new(AtomicU64::new(0)),
86 sync_start_callback: None,
87 sync_success_callback: None,
88 sync_failure_callback: None,
89 error_callback: None,
90 backpressure_callback: None,
91 #[cfg(target_arch = "wasm32")]
92 wasm_sync_success_callback: None,
93 #[cfg(not(target_arch = "wasm32"))]
94 last_sync_start: Mutex::new(None),
95 #[cfg(not(target_arch = "wasm32"))]
96 last_sync_blocks: AtomicU64::new(0),
97 #[cfg(not(target_arch = "wasm32"))]
98 last_sync_bytes: AtomicU64::new(0),
99 #[cfg(target_arch = "wasm32")]
100 last_sync_blocks: AtomicU64::new(0),
101 #[cfg(target_arch = "wasm32")]
102 last_sync_bytes: AtomicU64::new(0),
103 }
104 }
105}
106
107impl ObservabilityManager {
108 pub fn new() -> Self {
109 Self::default()
110 }
111
112 pub fn record_error(&self, error: &DatabaseError) {
114 self.error_count.fetch_add(1, Ordering::SeqCst);
115
116 if let Some(ref callback) = self.error_callback {
117 callback(error);
118 }
119 }
120
121 pub fn record_checksum_failure(&self) {
123 self.checksum_failures.fetch_add(1, Ordering::SeqCst);
124 }
125
126 pub fn record_sync_start(&self, dirty_count: usize, dirty_bytes: usize) {
128 #[cfg(not(target_arch = "wasm32"))]
129 {
130 if let Ok(mut guard) = self.last_sync_start.lock() {
131 *guard = Some(Instant::now());
132 }
133 }
134 self.last_sync_blocks
135 .store(dirty_count as u64, Ordering::SeqCst);
136 self.last_sync_bytes
137 .store(dirty_bytes as u64, Ordering::SeqCst);
138
139 if let Some(ref callback) = self.sync_start_callback {
140 callback(dirty_count, dirty_bytes);
141 }
142 }
143
144 pub fn record_sync_success(&self, duration_ms: u64, blocks_synced: usize) {
146 self.sync_count.fetch_add(1, Ordering::SeqCst);
148
149 if let Some(ref callback) = self.sync_success_callback {
150 callback(duration_ms, blocks_synced);
151 }
152
153 #[cfg(target_arch = "wasm32")]
154 {
155 if let Some(ref callback) = self.wasm_sync_success_callback {
156 callback(duration_ms, blocks_synced);
157 }
158 }
159 }
160
161 pub fn record_sync_failure(&self, error: &DatabaseError) {
163 if let Some(ref callback) = self.sync_failure_callback {
164 callback(error);
165 }
166 }
167
168 pub fn record_backpressure(&self, level: &str, reason: &str) {
170 if let Some(ref callback) = self.backpressure_callback {
171 callback(level, reason);
172 }
173 }
174
175 pub fn calculate_throughput(&self, duration_ms: u64) -> (f64, f64) {
177 if duration_ms == 0 {
178 return (0.0, 0.0);
179 }
180
181 let duration_sec = duration_ms as f64 / 1000.0;
182 let blocks = self.last_sync_blocks.load(Ordering::SeqCst) as f64;
183 let bytes = self.last_sync_bytes.load(Ordering::SeqCst) as f64;
184 let blocks_per_sec = blocks / duration_sec;
185 let bytes_per_sec = bytes / duration_sec;
186
187 (blocks_per_sec, bytes_per_sec)
188 }
189
190 pub fn calculate_error_rate(&self, total_operations: u64) -> f64 {
192 if total_operations == 0 {
193 return 0.0;
194 }
195
196 let errors = self.error_count.load(Ordering::SeqCst);
197 errors as f64 / total_operations as f64
198 }
199
200 pub fn get_error_count(&self) -> u64 {
202 self.error_count.load(Ordering::SeqCst)
203 }
204
205 pub fn get_checksum_failures(&self) -> u64 {
207 self.checksum_failures.load(Ordering::SeqCst)
208 }
209
210 pub fn get_sync_count(&self) -> u64 {
212 self.sync_count.load(Ordering::SeqCst)
213 }
214}