// absurder_sql/storage/sync_operations.rs
/// WASM flavour of `lock_mutex!`.
///
/// Expands to `try_borrow_mut()` on the given expression and panics with a
/// reentrancy message when the mutable borrow cannot be obtained, instead of
/// returning the failure to the caller.
#[cfg(target_arch = "wasm32")]
macro_rules! lock_mutex {
    ($cell:expr) => {
        $cell
            .try_borrow_mut()
            .expect("RefCell borrow failed - reentrancy detected in sync_operations.rs")
    };
}
13
/// Native flavour of `lock_mutex!`.
///
/// Expands to a plain `.lock()` call on the given expression and hands back
/// whatever that call returns.
// NOTE(review): callers in this file use the result directly (e.g. `.len()`),
// so the lock type presumably returns its guard without a `Result` — confirm.
#[cfg(not(target_arch = "wasm32"))]
macro_rules! lock_mutex {
    ($lockable:expr) => {
        $lockable.lock()
    };
}
20
/// WASM flavour of `try_lock_mutex!`.
///
/// Expands to the given expression unchanged; no borrow is taken here, so the
/// caller is left to borrow from the value itself.
// NOTE(review): this differs from the native variant, which calls `.lock()` —
// confirm the asymmetry is intended.
#[allow(unused_macros)]
#[cfg(target_arch = "wasm32")]
macro_rules! try_lock_mutex {
    ($cell:expr) => {
        $cell
    };
}
28
/// Native flavour of `try_lock_mutex!`.
///
/// Expands to `.lock()` on the given expression, exactly like the native
/// `lock_mutex!`; despite the name it does not call a `try_lock` method.
#[allow(unused_macros)]
#[cfg(not(target_arch = "wasm32"))]
macro_rules! try_lock_mutex {
    ($lockable:expr) => {
        $lockable.lock()
    };
}
36
37use super::block_storage::BlockStorage;
38use crate::types::DatabaseError;
39
40#[cfg(all(
41 not(target_arch = "wasm32"),
42 any(test, debug_assertions),
43 not(feature = "fs_persist")
44))]
45use std::collections::HashMap;
46#[cfg(all(not(target_arch = "wasm32"), not(feature = "fs_persist")))]
47use std::sync::atomic::Ordering;
48
49#[cfg(all(
50 not(target_arch = "wasm32"),
51 any(test, debug_assertions),
52 not(feature = "fs_persist")
53))]
54use super::metadata::BlockMetadataPersist;
55#[cfg(any(
56 target_arch = "wasm32",
57 all(
58 not(target_arch = "wasm32"),
59 any(test, debug_assertions),
60 not(feature = "fs_persist")
61 )
62))]
63use super::vfs_sync;
64
65#[cfg(target_arch = "wasm32")]
66use super::metadata::BlockMetadataPersist;
67#[cfg(target_arch = "wasm32")]
68use std::collections::HashMap;
69
70#[cfg(all(
71 not(target_arch = "wasm32"),
72 any(test, debug_assertions),
73 not(feature = "fs_persist")
74))]
75use super::block_storage::GLOBAL_METADATA_TEST;
76
77pub fn sync_implementation_impl(storage: &mut BlockStorage) -> Result<(), DatabaseError> {
79 #[cfg(all(
80 not(target_arch = "wasm32"),
81 any(test, debug_assertions),
82 not(feature = "fs_persist")
83 ))]
84 let start = std::time::Instant::now();
85
86 let dirty_count = lock_mutex!(storage.dirty_blocks).len();
88 let dirty_bytes = dirty_count * super::block_storage::BLOCK_SIZE;
89 storage
90 .observability
91 .record_sync_start(dirty_count, dirty_bytes);
92
93 #[cfg(not(target_arch = "wasm32"))]
95 if let Some(ref callback) = storage.observability.sync_start_callback {
96 callback(dirty_count, dirty_bytes);
97 }
98
99 #[cfg(all(
101 not(target_arch = "wasm32"),
102 not(any(test, debug_assertions)),
103 not(feature = "fs_persist")
104 ))]
105 {
106 lock_mutex!(storage.dirty_blocks).clear();
108 storage.sync_count.fetch_add(1, Ordering::SeqCst);
109 return Ok(());
110 }
111
112 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
114 {
115 storage.fs_persist_sync()
116 }
117
118 #[cfg(all(
120 not(target_arch = "wasm32"),
121 any(test, debug_assertions),
122 not(feature = "fs_persist")
123 ))]
124 {
125 let current_dirty = lock_mutex!(storage.dirty_blocks).len();
126 log::info!(
127 "Syncing {} dirty blocks (native non-fs_persist)",
128 current_dirty
129 );
130
131 let to_persist: Vec<(u64, Vec<u8>)> = {
132 let dirty = lock_mutex!(storage.dirty_blocks);
133 dirty.iter().map(|(k, v)| (*k, v.clone())).collect()
134 };
135 let ids: Vec<u64> = to_persist.iter().map(|(k, _)| *k).collect();
136 let blocks_synced = ids.len(); let next_commit: u64 = vfs_sync::with_global_commit_marker(|cm| {
140 #[cfg(target_arch = "wasm32")]
141 let cm = cm;
142 #[cfg(not(target_arch = "wasm32"))]
143 let cm = cm.borrow();
144 let current = cm.get(&storage.db_name).copied().unwrap_or(0);
145 current + 1
146 });
147
148 vfs_sync::with_global_storage(|gs| {
150 #[cfg(target_arch = "wasm32")]
151 let storage_map = gs;
152 #[cfg(not(target_arch = "wasm32"))]
153 let mut storage_map = gs.borrow_mut();
154 let db_storage = storage_map
155 .entry(storage.db_name.clone())
156 .or_insert_with(HashMap::new);
157 for (block_id, data) in to_persist {
158 db_storage.insert(block_id, data);
159 }
160 });
161
162 GLOBAL_METADATA_TEST.with(|meta| {
164 #[cfg(target_arch = "wasm32")]
165 let mut meta_map = meta.borrow_mut();
166 #[cfg(not(target_arch = "wasm32"))]
167 let mut meta_map = meta.lock();
168 let db_meta = meta_map
169 .entry(storage.db_name.clone())
170 .or_insert_with(HashMap::new);
171 for block_id in ids {
172 if let Some(checksum) = storage.checksum_manager.get_checksum(block_id) {
173 let version = next_commit as u32;
175 db_meta.insert(
176 block_id,
177 BlockMetadataPersist {
178 checksum,
179 last_modified_ms: std::time::SystemTime::now()
180 .duration_since(std::time::UNIX_EPOCH)
181 .unwrap_or_default()
182 .as_millis() as u64,
183 version,
184 algo: storage.checksum_manager.get_algorithm(block_id),
185 },
186 );
187 }
188 }
189 });
190
191 vfs_sync::with_global_commit_marker(|cm| {
193 #[cfg(target_arch = "wasm32")]
194 let cm_map = cm;
195 #[cfg(not(target_arch = "wasm32"))]
196 let mut cm_map = cm.borrow_mut();
197 cm_map.insert(storage.db_name.clone(), next_commit);
198 });
199
200 {
202 let mut dirty = lock_mutex!(storage.dirty_blocks);
203 dirty.clear();
204 }
205
206 storage.sync_count.fetch_add(1, Ordering::SeqCst);
208 let elapsed = start.elapsed();
209 let ms = elapsed.as_millis() as u64;
210 let ms = if ms == 0 { 1 } else { ms };
211 storage.last_sync_duration_ms.store(ms, Ordering::SeqCst);
212
213 storage.observability.record_sync_success(ms, blocks_synced);
215
216 if let Some(ref callback) = storage.observability.sync_success_callback {
218 callback(ms, blocks_synced);
219 }
220
221 storage.evict_if_needed();
222 return Ok(());
223 }
224
225 #[cfg(target_arch = "wasm32")]
226 {
227 let current_dirty = lock_mutex!(storage.dirty_blocks).len();
229 log::info!("Syncing {} dirty blocks (WASM)", current_dirty);
230
231 let to_persist: Vec<(u64, Vec<u8>)> = {
233 let dirty = lock_mutex!(storage.dirty_blocks);
234 dirty.iter().map(|(k, v)| (*k, v.clone())).collect()
235 };
236 let ids: Vec<u64> = to_persist.iter().map(|(k, _)| *k).collect();
237 let next_commit: u64 = vfs_sync::with_global_commit_marker(|cm| {
239 let cm = cm;
240 let current = cm.borrow().get(&storage.db_name).copied().unwrap_or(0);
241 current + 1
242 });
243 vfs_sync::with_global_storage(|gs| {
244 let mut storage_map = gs.borrow_mut();
245 let db_storage = storage_map
246 .entry(storage.db_name.clone())
247 .or_insert_with(HashMap::new);
248 for (block_id, data) in &to_persist {
249 let should_update = if let Some(existing) = db_storage.get(block_id) {
251 if existing != data {
252 let has_committed_metadata = vfs_sync::with_global_metadata(|meta| {
254 if let Some(db_meta) = meta.borrow().get(&storage.db_name) {
255 if let Some(metadata) = db_meta.get(block_id) {
256 metadata.version > 0
257 } else {
258 false
259 }
260 } else {
261 false
262 }
263 });
264
265 if has_committed_metadata {
266 false } else {
269 true }
271 } else {
272 true }
274 } else {
275 true };
277
278 if should_update {
279 db_storage.insert(*block_id, data.clone());
280 }
281 }
282 });
283 vfs_sync::with_global_metadata(|meta| {
285 let mut meta_guard = meta.borrow_mut();
286 let db_meta = meta_guard
287 .entry(storage.db_name.clone())
288 .or_insert_with(HashMap::new);
289 for block_id in ids {
290 if let Some(checksum) = storage.checksum_manager.get_checksum(block_id) {
291 let version = next_commit as u32;
293 db_meta.insert(
294 block_id,
295 BlockMetadataPersist {
296 checksum,
297 last_modified_ms: BlockStorage::now_millis(),
298 version,
299 algo: storage.checksum_manager.get_algorithm(block_id),
300 },
301 );
302 }
303 }
304 });
305 vfs_sync::with_global_commit_marker(|cm| {
307 let cm_map = cm;
308 cm_map
309 .borrow_mut()
310 .insert(storage.db_name.clone(), next_commit);
311 });
312
313 let db_name = storage.db_name.clone();
315 wasm_bindgen_futures::spawn_local(async move {
316 use wasm_bindgen::JsCast;
317
318 let global = js_sys::global();
320 let indexed_db_value = match js_sys::Reflect::get(
321 &global,
322 &wasm_bindgen::JsValue::from_str("indexedDB"),
323 ) {
324 Ok(val) => val,
325 Err(_) => {
326 log::error!("IndexedDB property access failed - cannot persist");
327 return;
328 }
329 };
330
331 if indexed_db_value.is_null() || indexed_db_value.is_undefined() {
332 log::warn!(
333 "IndexedDB unavailable for sync (private browsing?) - data not persisted to IndexedDB"
334 );
335 return;
336 }
337
338 let idb_factory = match indexed_db_value.dyn_into::<web_sys::IdbFactory>() {
339 Ok(factory) => factory,
340 Err(_) => {
341 log::error!("IndexedDB property is not an IdbFactory - cannot persist");
342 return;
343 }
344 };
345
346 let open_req = match idb_factory.open_with_u32("block_storage", 2) {
347 Ok(req) => req,
348 Err(e) => {
349 log::error!("Failed to open IndexedDB for sync: {:?}", e);
350 return;
351 }
352 };
353
354 let upgrade_handler = js_sys::Function::new_no_args(&format!(
356 "
357 const db = event.target.result;
358 if (!db.objectStoreNames.contains('blocks')) {{
359 db.createObjectStore('blocks');
360 }}
361 if (!db.objectStoreNames.contains('metadata')) {{
362 db.createObjectStore('metadata');
363 }}
364 "
365 ));
366 open_req.set_onupgradeneeded(Some(&upgrade_handler));
367
368 let (tx, rx) = futures::channel::oneshot::channel();
370 let tx = std::rc::Rc::new(std::cell::RefCell::new(Some(tx)));
371
372 let success_tx = tx.clone();
373 let success_callback =
374 wasm_bindgen::closure::Closure::wrap(Box::new(move |event: web_sys::Event| {
375 if let Some(tx) = success_tx.borrow_mut().take() {
376 let target = event.target().unwrap();
377 let request: web_sys::IdbOpenDbRequest = target.unchecked_into();
378 let result = request.result().unwrap();
379 let _ = tx.send(Ok(result));
380 }
381 }) as Box<dyn FnMut(_)>);
382
383 let error_tx = tx.clone();
384 let error_callback =
385 wasm_bindgen::closure::Closure::wrap(Box::new(move |event: web_sys::Event| {
386 if let Some(tx) = error_tx.borrow_mut().take() {
387 let _ = tx.send(Err(format!("IndexedDB open failed: {:?}", event)));
388 }
389 }) as Box<dyn FnMut(_)>);
390
391 open_req.set_onsuccess(Some(success_callback.as_ref().unchecked_ref()));
392 open_req.set_onerror(Some(error_callback.as_ref().unchecked_ref()));
393
394 let db_result = rx.await;
395
396 success_callback.forget();
398 error_callback.forget();
399
400 match db_result {
401 Ok(Ok(db_value)) => {
402 if let Ok(db) = db_value.dyn_into::<web_sys::IdbDatabase>() {
403 let store_names = js_sys::Array::new();
405 store_names.push(&wasm_bindgen::JsValue::from_str("blocks"));
406 store_names.push(&wasm_bindgen::JsValue::from_str("metadata"));
407
408 let transaction = db
409 .transaction_with_str_sequence_and_mode(
410 &store_names,
411 web_sys::IdbTransactionMode::Readwrite,
412 )
413 .unwrap();
414
415 let blocks_store = transaction.object_store("blocks").unwrap();
416 let metadata_store = transaction.object_store("metadata").unwrap();
417
418 for (block_id, data) in &to_persist {
420 let key = wasm_bindgen::JsValue::from_str(&format!(
421 "{}_{}",
422 db_name, block_id
423 ));
424 let value = js_sys::Uint8Array::from(&data[..]);
425 blocks_store.put_with_key(&value, &key).unwrap();
426 }
427
428 let commit_key =
430 wasm_bindgen::JsValue::from_str(&format!("{}_commit_marker", db_name));
431 let commit_value = wasm_bindgen::JsValue::from_f64(next_commit as f64);
432 metadata_store
433 .put_with_key(&commit_value, &commit_key)
434 .unwrap();
435
436 let (tx_tx, tx_rx) = futures::channel::oneshot::channel();
438 let tx_tx = std::rc::Rc::new(std::cell::RefCell::new(Some(tx_tx)));
439
440 let tx_complete_tx = tx_tx.clone();
441 let tx_complete_callback = wasm_bindgen::closure::Closure::wrap(Box::new(
442 move |_event: web_sys::Event| {
443 if let Some(tx) = tx_complete_tx.borrow_mut().take() {
444 let _ = tx.send(Ok(()));
445 }
446 },
447 )
448 as Box<dyn FnMut(_)>);
449
450 let tx_error_tx = tx_tx.clone();
451 let tx_error_callback = wasm_bindgen::closure::Closure::wrap(Box::new(
452 move |event: web_sys::Event| {
453 if let Some(tx) = tx_error_tx.borrow_mut().take() {
454 let _ =
455 tx.send(Err(format!("Transaction failed: {:?}", event)));
456 }
457 },
458 )
459 as Box<dyn FnMut(_)>);
460
461 transaction
462 .set_oncomplete(Some(tx_complete_callback.as_ref().unchecked_ref()));
463 transaction.set_onerror(Some(tx_error_callback.as_ref().unchecked_ref()));
464
465 let _ = tx_rx.await;
466
467 tx_complete_callback.forget();
469 tx_error_callback.forget();
470 }
471 }
472 _ => {} }
474 });
475 {
477 let mut dirty = lock_mutex!(storage.dirty_blocks);
478 dirty.clear();
479 }
480
481 storage.observability.record_sync_success(1, current_dirty);
484
485 #[cfg(target_arch = "wasm32")]
487 if let Some(ref callback) = storage.observability.wasm_sync_success_callback {
488 callback(1, current_dirty);
489 }
490
491 storage.evict_if_needed();
492 Ok(())
493 }
494}