absurder_sql/storage/
observability.rs1use crate::types::DatabaseError;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::sync::Arc;
4
5#[cfg(not(target_arch = "wasm32"))]
6use std::time::Instant;
7
/// Snapshot of storage counters and derived rates.
///
/// The derived `Default` produces a snapshot in which every counter is `0`
/// and every rate is `0.0`.
#[derive(Debug, Clone, Default)]
pub struct StorageMetrics {
    /// Number of dirty entries (presumably blocks awaiting sync; confirm with the producer).
    pub dirty_count: usize,
    /// Size in bytes of the dirty entries.
    pub dirty_bytes: usize,
    /// Total number of syncs.
    pub sync_count: u64,
    /// Number of syncs attributed to a timer (populated outside this file; confirm semantics).
    pub timer_sync_count: u64,
    /// Number of syncs attributed to debouncing (populated outside this file; confirm semantics).
    pub debounce_sync_count: u64,
    /// Total number of recorded errors.
    pub error_count: u64,
    /// Total number of recorded checksum failures.
    pub checksum_failures: u64,
    /// Duration of the most recent sync, in milliseconds.
    pub last_sync_duration_ms: u64,
    /// Sync throughput in blocks per second.
    pub throughput_blocks_per_sec: f64,
    /// Sync throughput in bytes per second.
    pub throughput_bytes_per_sec: f64,
    /// Errors divided by total operations.
    pub error_rate: f64,
}
41
42pub type SyncStartCallback = Box<dyn Fn(usize, usize) + Send + Sync>;
44pub type SyncSuccessCallback = Box<dyn Fn(u64, usize) + Send + Sync>;
45pub type SyncFailureCallback = Box<dyn Fn(&DatabaseError) + Send + Sync>;
46pub type ErrorCallback = Box<dyn Fn(&DatabaseError) + Send + Sync>;
47pub type BackpressureCallback = Box<dyn Fn(&str, &str) + Send + Sync>;
48
49#[cfg(target_arch = "wasm32")]
51pub type WasmSyncSuccessCallback = Box<dyn Fn(u64, usize)>;
52
53pub struct ObservabilityManager {
55 pub(super) error_count: Arc<AtomicU64>,
57 pub(super) checksum_failures: Arc<AtomicU64>,
58 pub(super) sync_count: Arc<AtomicU64>,
59
60 pub(super) sync_start_callback: Option<SyncStartCallback>,
62 pub(super) sync_success_callback: Option<SyncSuccessCallback>,
63 pub(super) sync_failure_callback: Option<SyncFailureCallback>,
64 pub(super) error_callback: Option<ErrorCallback>,
65 pub(super) backpressure_callback: Option<BackpressureCallback>,
66
67 #[cfg(target_arch = "wasm32")]
69 pub(super) wasm_sync_success_callback: Option<WasmSyncSuccessCallback>,
70
71 #[cfg(not(target_arch = "wasm32"))]
73 pub(super) last_sync_start: Option<Instant>,
74 pub(super) last_sync_blocks: usize,
75 pub(super) last_sync_bytes: usize,
76}
77
78impl Default for ObservabilityManager {
79 fn default() -> Self {
80 Self {
81 error_count: Arc::new(AtomicU64::new(0)),
82 checksum_failures: Arc::new(AtomicU64::new(0)),
83 sync_count: Arc::new(AtomicU64::new(0)),
84 sync_start_callback: None,
85 sync_success_callback: None,
86 sync_failure_callback: None,
87 error_callback: None,
88 backpressure_callback: None,
89 #[cfg(target_arch = "wasm32")]
90 wasm_sync_success_callback: None,
91 #[cfg(not(target_arch = "wasm32"))]
92 last_sync_start: None,
93 last_sync_blocks: 0,
94 last_sync_bytes: 0,
95 }
96 }
97}
98
99impl ObservabilityManager {
100 pub fn new() -> Self {
101 Self::default()
102 }
103
104 pub fn record_error(&self, error: &DatabaseError) {
106 self.error_count.fetch_add(1, Ordering::SeqCst);
107
108 if let Some(ref callback) = self.error_callback {
109 callback(error);
110 }
111 }
112
113 pub fn record_checksum_failure(&self) {
115 self.checksum_failures.fetch_add(1, Ordering::SeqCst);
116 }
117
118 pub fn record_sync_start(&mut self, dirty_count: usize, dirty_bytes: usize) {
120 #[cfg(not(target_arch = "wasm32"))]
121 {
122 self.last_sync_start = Some(Instant::now());
123 }
124 self.last_sync_blocks = dirty_count;
125 self.last_sync_bytes = dirty_bytes;
126
127 if let Some(ref callback) = self.sync_start_callback {
128 callback(dirty_count, dirty_bytes);
129 }
130 }
131
132 pub fn record_sync_success(&mut self, duration_ms: u64, blocks_synced: usize) {
134 self.sync_count.fetch_add(1, Ordering::SeqCst);
136
137 if let Some(ref callback) = self.sync_success_callback {
138 callback(duration_ms, blocks_synced);
139 }
140
141 #[cfg(target_arch = "wasm32")]
142 {
143 if let Some(ref callback) = self.wasm_sync_success_callback {
144 callback(duration_ms, blocks_synced);
145 }
146 }
147 }
148
149 pub fn record_sync_failure(&self, error: &DatabaseError) {
151 if let Some(ref callback) = self.sync_failure_callback {
152 callback(error);
153 }
154 }
155
156 pub fn record_backpressure(&self, level: &str, reason: &str) {
158 if let Some(ref callback) = self.backpressure_callback {
159 callback(level, reason);
160 }
161 }
162
163 pub fn calculate_throughput(&self, duration_ms: u64) -> (f64, f64) {
165 if duration_ms == 0 {
166 return (0.0, 0.0);
167 }
168
169 let duration_sec = duration_ms as f64 / 1000.0;
170 let blocks_per_sec = self.last_sync_blocks as f64 / duration_sec;
171 let bytes_per_sec = self.last_sync_bytes as f64 / duration_sec;
172
173 (blocks_per_sec, bytes_per_sec)
174 }
175
176 pub fn calculate_error_rate(&self, total_operations: u64) -> f64 {
178 if total_operations == 0 {
179 return 0.0;
180 }
181
182 let errors = self.error_count.load(Ordering::SeqCst);
183 errors as f64 / total_operations as f64
184 }
185
186 pub fn get_error_count(&self) -> u64 {
188 self.error_count.load(Ordering::SeqCst)
189 }
190
191 pub fn get_checksum_failures(&self) -> u64 {
193 self.checksum_failures.load(Ordering::SeqCst)
194 }
195
196 pub fn get_sync_count(&self) -> u64 {
198 self.sync_count.load(Ordering::SeqCst)
199 }
200}