absurder_sql/storage/
sync_operations.rs1use crate::types::DatabaseError;
5use super::block_storage::BlockStorage;
6
7#[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
8use std::collections::HashMap;
9#[cfg(all(not(target_arch = "wasm32"), not(feature = "fs_persist")))]
10use std::sync::atomic::Ordering;
11
12#[cfg(any(target_arch = "wasm32", all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist"))))]
13use super::vfs_sync;
14#[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
15use super::metadata::BlockMetadataPersist;
16
17#[cfg(target_arch = "wasm32")]
18use std::collections::HashMap;
19#[cfg(target_arch = "wasm32")]
20use super::metadata::BlockMetadataPersist;
21
22#[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
23use super::block_storage::GLOBAL_METADATA_TEST;
24
25pub fn sync_implementation_impl(storage: &mut BlockStorage) -> Result<(), DatabaseError> {
27 #[cfg(target_arch = "wasm32")]
28 use wasm_bindgen::JsCast;
29 #[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
30 let start = std::time::Instant::now();
31
32 let dirty_count = storage.dirty_blocks.lock().len();
34 let dirty_bytes = dirty_count * super::block_storage::BLOCK_SIZE;
35 storage.observability.record_sync_start(dirty_count, dirty_bytes);
36
37 #[cfg(not(target_arch = "wasm32"))]
39 if let Some(ref callback) = storage.observability.sync_start_callback {
40 callback(dirty_count, dirty_bytes);
41 }
42
43 #[cfg(all(not(target_arch = "wasm32"), not(any(test, debug_assertions)), not(feature = "fs_persist")))]
45 {
46 storage.dirty_blocks.lock().clear();
48 storage.sync_count.fetch_add(1, Ordering::SeqCst);
49 return Ok(());
50 }
51
52 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
54 {
55 return storage.fs_persist_sync();
56 }
57
58 #[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
60 {
61 let current_dirty = storage.dirty_blocks.lock().len();
62 log::info!("Syncing {} dirty blocks (native non-fs_persist)", current_dirty);
63
64 let to_persist: Vec<(u64, Vec<u8>)> = {
65 let dirty = storage.dirty_blocks.lock();
66 dirty.iter().map(|(k,v)| (*k, v.clone())).collect()
67 };
68 let ids: Vec<u64> = to_persist.iter().map(|(k, _)| *k).collect();
69 let blocks_synced = ids.len(); let next_commit: u64 = vfs_sync::with_global_commit_marker(|cm| {
73 let cm = cm.borrow();
74 let current = cm.get(&storage.db_name).copied().unwrap_or(0);
75 current + 1
76 });
77
78 vfs_sync::with_global_storage(|gs| {
80 let mut storage_map = gs.borrow_mut();
81 let db_storage = storage_map.entry(storage.db_name.clone()).or_insert_with(HashMap::new);
82 for (block_id, data) in to_persist {
83 db_storage.insert(block_id, data);
84 }
85 });
86
87 GLOBAL_METADATA_TEST.with(|meta| {
89 let mut meta_map = meta.borrow_mut();
90 let db_meta = meta_map.entry(storage.db_name.clone()).or_insert_with(HashMap::new);
91 for block_id in ids {
92 if let Some(checksum) = storage.checksum_manager.get_checksum(block_id) {
93 let version = next_commit as u32;
95 db_meta.insert(
96 block_id,
97 BlockMetadataPersist {
98 checksum,
99 last_modified_ms: std::time::SystemTime::now()
100 .duration_since(std::time::UNIX_EPOCH)
101 .unwrap_or_default()
102 .as_millis() as u64,
103 version,
104 algo: storage.checksum_manager.get_algorithm(block_id),
105 },
106 );
107 }
108 }
109 });
110
111 vfs_sync::with_global_commit_marker(|cm| {
113 let mut cm_map = cm.borrow_mut();
114 cm_map.insert(storage.db_name.clone(), next_commit);
115 });
116
117 {
119 let mut dirty = storage.dirty_blocks.lock();
120 dirty.clear();
121 }
122
123 storage.sync_count.fetch_add(1, Ordering::SeqCst);
125 let elapsed = start.elapsed();
126 let ms = elapsed.as_millis() as u64;
127 let ms = if ms == 0 { 1 } else { ms };
128 storage.last_sync_duration_ms.store(ms, Ordering::SeqCst);
129
130 storage.observability.record_sync_success(ms, blocks_synced);
132
133 if let Some(ref callback) = storage.observability.sync_success_callback {
135 callback(ms, blocks_synced);
136 }
137
138 storage.evict_if_needed();
139 return Ok(());
140 }
141
142 #[cfg(target_arch = "wasm32")]
143 {
144 let current_dirty = storage.dirty_blocks.lock().len();
146 log::info!("Syncing {} dirty blocks (WASM)", current_dirty);
147
148 let to_persist: Vec<(u64, Vec<u8>)> = {
150 let dirty = storage.dirty_blocks.lock();
151 dirty.iter().map(|(k,v)| (*k, v.clone())).collect()
152 };
153 let ids: Vec<u64> = to_persist.iter().map(|(k, _)| *k).collect();
154 let next_commit: u64 = vfs_sync::with_global_commit_marker(|cm| {
156 let cm = cm.borrow();
157 let current = cm.get(&storage.db_name).copied().unwrap_or(0);
158 current + 1
159 });
160 vfs_sync::with_global_storage(|gs| {
161 let mut storage_map = gs.borrow_mut();
162 let db_storage = storage_map.entry(storage.db_name.clone()).or_insert_with(HashMap::new);
163 for (block_id, data) in &to_persist {
164 let should_update = if let Some(existing) = db_storage.get(block_id) {
166 if existing != data {
167 let has_committed_metadata = vfs_sync::with_global_metadata(|meta| {
169 let meta_map = meta.borrow();
170 if let Some(db_meta) = meta_map.get(&storage.db_name) {
171 if let Some(metadata) = db_meta.get(block_id) {
172 metadata.version > 0
173 } else {
174 false
175 }
176 } else {
177 false
178 }
179 });
180
181 if has_committed_metadata {
182 false } else {
185 true }
187 } else {
188 true }
190 } else {
191 true };
193
194 if should_update {
195 db_storage.insert(*block_id, data.clone());
196 }
197 }
198 });
199 vfs_sync::with_global_metadata(|meta| {
201 let mut meta_map = meta.borrow_mut();
202 let db_meta = meta_map.entry(storage.db_name.clone()).or_insert_with(HashMap::new);
203 for block_id in ids {
204 if let Some(checksum) = storage.checksum_manager.get_checksum(block_id) {
205 let version = next_commit as u32;
207 db_meta.insert(
208 block_id,
209 BlockMetadataPersist {
210 checksum,
211 last_modified_ms: BlockStorage::now_millis(),
212 version,
213 algo: storage.checksum_manager.get_algorithm(block_id),
214 },
215 );
216 }
217 }
218 });
219 vfs_sync::with_global_commit_marker(|cm| {
221 let mut cm_map = cm.borrow_mut();
222 cm_map.insert(storage.db_name.clone(), next_commit);
223 });
224
225 let db_name = storage.db_name.clone();
227 wasm_bindgen_futures::spawn_local(async move {
228 let Some(window) = web_sys::window() else {
229 log::error!("Window unavailable for IndexedDB sync - cannot persist");
230 return;
231 };
232 let idb_factory = match window.indexed_db() {
233 Ok(Some(factory)) => factory,
234 Ok(None) => {
235 log::warn!("IndexedDB unavailable for sync (private browsing?) - data not persisted to IndexedDB");
236 return;
237 },
238 Err(_) => {
239 log::error!("IndexedDB access denied for sync - data not persisted to IndexedDB");
240 return;
241 }
242 };
243 let open_req = match idb_factory.open_with_u32("block_storage", 2) {
244 Ok(req) => req,
245 Err(e) => {
246 log::error!("Failed to open IndexedDB for sync: {:?}", e);
247 return;
248 }
249 };
250
251 let upgrade_handler = js_sys::Function::new_no_args(&format!(
253 "
254 const db = event.target.result;
255 if (!db.objectStoreNames.contains('blocks')) {{
256 db.createObjectStore('blocks');
257 }}
258 if (!db.objectStoreNames.contains('metadata')) {{
259 db.createObjectStore('metadata');
260 }}
261 "
262 ));
263 open_req.set_onupgradeneeded(Some(&upgrade_handler));
264
265 let (tx, rx) = futures::channel::oneshot::channel();
267 let tx = std::rc::Rc::new(std::cell::RefCell::new(Some(tx)));
268
269 let success_tx = tx.clone();
270 let success_callback = wasm_bindgen::closure::Closure::wrap(Box::new(move |event: web_sys::Event| {
271 if let Some(tx) = success_tx.borrow_mut().take() {
272 let target = event.target().unwrap();
273 let request: web_sys::IdbOpenDbRequest = target.unchecked_into();
274 let result = request.result().unwrap();
275 let _ = tx.send(Ok(result));
276 }
277 }) as Box<dyn FnMut(_)>);
278
279 let error_tx = tx.clone();
280 let error_callback = wasm_bindgen::closure::Closure::wrap(Box::new(move |event: web_sys::Event| {
281 if let Some(tx) = error_tx.borrow_mut().take() {
282 let _ = tx.send(Err(format!("IndexedDB open failed: {:?}", event)));
283 }
284 }) as Box<dyn FnMut(_)>);
285
286 open_req.set_onsuccess(Some(success_callback.as_ref().unchecked_ref()));
287 open_req.set_onerror(Some(error_callback.as_ref().unchecked_ref()));
288
289 let db_result = rx.await;
290
291 success_callback.forget();
293 error_callback.forget();
294
295 match db_result {
296 Ok(Ok(db_value)) => {
297 if let Ok(db) = db_value.dyn_into::<web_sys::IdbDatabase>() {
298 let store_names = js_sys::Array::new();
300 store_names.push(&wasm_bindgen::JsValue::from_str("blocks"));
301 store_names.push(&wasm_bindgen::JsValue::from_str("metadata"));
302
303 let transaction = db.transaction_with_str_sequence_and_mode(
304 &store_names,
305 web_sys::IdbTransactionMode::Readwrite
306 ).unwrap();
307
308 let blocks_store = transaction.object_store("blocks").unwrap();
309 let metadata_store = transaction.object_store("metadata").unwrap();
310
311 for (block_id, data) in &to_persist {
313 let key = wasm_bindgen::JsValue::from_str(&format!("{}_{}", db_name, block_id));
314 let value = js_sys::Uint8Array::from(&data[..]);
315 blocks_store.put_with_key(&value, &key).unwrap();
316 }
317
318 let commit_key = wasm_bindgen::JsValue::from_str(&format!("{}_commit_marker", db_name));
320 let commit_value = wasm_bindgen::JsValue::from_f64(next_commit as f64);
321 metadata_store.put_with_key(&commit_value, &commit_key).unwrap();
322
323 let (tx_tx, tx_rx) = futures::channel::oneshot::channel();
325 let tx_tx = std::rc::Rc::new(std::cell::RefCell::new(Some(tx_tx)));
326
327 let tx_complete_tx = tx_tx.clone();
328 let tx_complete_callback = wasm_bindgen::closure::Closure::wrap(Box::new(move |_event: web_sys::Event| {
329 if let Some(tx) = tx_complete_tx.borrow_mut().take() {
330 let _ = tx.send(Ok(()));
331 }
332 }) as Box<dyn FnMut(_)>);
333
334 let tx_error_tx = tx_tx.clone();
335 let tx_error_callback = wasm_bindgen::closure::Closure::wrap(Box::new(move |event: web_sys::Event| {
336 if let Some(tx) = tx_error_tx.borrow_mut().take() {
337 let _ = tx.send(Err(format!("Transaction failed: {:?}", event)));
338 }
339 }) as Box<dyn FnMut(_)>);
340
341 transaction.set_oncomplete(Some(tx_complete_callback.as_ref().unchecked_ref()));
342 transaction.set_onerror(Some(tx_error_callback.as_ref().unchecked_ref()));
343
344 let _ = tx_rx.await;
345
346 tx_complete_callback.forget();
348 tx_error_callback.forget();
349 }
350 }
351 _ => {}
352 }
354 });
355 {
357 let mut dirty = storage.dirty_blocks.lock();
358 dirty.clear();
359 }
360
361 storage.observability.record_sync_success(1, current_dirty);
364
365 #[cfg(target_arch = "wasm32")]
367 if let Some(ref callback) = storage.observability.wasm_sync_success_callback {
368 callback(1, current_dirty);
369 }
370
371 storage.evict_if_needed();
372 Ok(())
373 }
374 }