absurder_sql/storage/
sync_operations.rs

1//! Sync operations for BlockStorage
2//! This module contains the core sync implementation logic
3
4// Reentrancy-safe lock macros
5#[cfg(target_arch = "wasm32")]
6macro_rules! lock_mutex {
7    ($mutex:expr) => {
8        $mutex
9            .try_borrow_mut()
10            .expect("RefCell borrow failed - reentrancy detected in sync_operations.rs")
11    };
12}
13
14#[cfg(not(target_arch = "wasm32"))]
15macro_rules! lock_mutex {
16    ($mutex:expr) => {
17        $mutex.lock()
18    };
19}
20
21#[allow(unused_macros)]
22#[cfg(target_arch = "wasm32")]
23macro_rules! try_lock_mutex {
24    ($mutex:expr) => {
25        $mutex
26    };
27}
28
29#[allow(unused_macros)]
30#[cfg(not(target_arch = "wasm32"))]
31macro_rules! try_lock_mutex {
32    ($mutex:expr) => {
33        $mutex.lock()
34    };
35}
36
37use super::block_storage::BlockStorage;
38use crate::types::DatabaseError;
39
40#[cfg(all(
41    not(target_arch = "wasm32"),
42    any(test, debug_assertions),
43    not(feature = "fs_persist")
44))]
45use std::collections::HashMap;
46#[cfg(all(not(target_arch = "wasm32"), not(feature = "fs_persist")))]
47use std::sync::atomic::Ordering;
48
49#[cfg(all(
50    not(target_arch = "wasm32"),
51    any(test, debug_assertions),
52    not(feature = "fs_persist")
53))]
54use super::metadata::BlockMetadataPersist;
55#[cfg(any(
56    target_arch = "wasm32",
57    all(
58        not(target_arch = "wasm32"),
59        any(test, debug_assertions),
60        not(feature = "fs_persist")
61    )
62))]
63use super::vfs_sync;
64
65#[cfg(target_arch = "wasm32")]
66use super::metadata::BlockMetadataPersist;
67#[cfg(target_arch = "wasm32")]
68use std::collections::HashMap;
69
70#[cfg(all(
71    not(target_arch = "wasm32"),
72    any(test, debug_assertions),
73    not(feature = "fs_persist")
74))]
75use super::block_storage::GLOBAL_METADATA_TEST;
76
77/// Internal sync implementation shared by sync() and sync_now()
78pub fn sync_implementation_impl(storage: &mut BlockStorage) -> Result<(), DatabaseError> {
79    #[cfg(all(
80        not(target_arch = "wasm32"),
81        any(test, debug_assertions),
82        not(feature = "fs_persist")
83    ))]
84    let start = std::time::Instant::now();
85
86    // Record sync start for observability
87    let dirty_count = lock_mutex!(storage.dirty_blocks).len();
88    let dirty_bytes = dirty_count * super::block_storage::BLOCK_SIZE;
89    storage
90        .observability
91        .record_sync_start(dirty_count, dirty_bytes);
92
93    // Invoke sync start callback if set
94    #[cfg(not(target_arch = "wasm32"))]
95    if let Some(ref callback) = storage.observability.sync_start_callback {
96        callback(dirty_count, dirty_bytes);
97    }
98
99    // Early return for native release builds without fs_persist
100    #[cfg(all(
101        not(target_arch = "wasm32"),
102        not(any(test, debug_assertions)),
103        not(feature = "fs_persist")
104    ))]
105    {
106        // In release mode without fs_persist, just clear dirty blocks
107        lock_mutex!(storage.dirty_blocks).clear();
108        storage.sync_count.fetch_add(1, Ordering::SeqCst);
109        return Ok(());
110    }
111
112    // Call the existing fs_persist implementation for native builds
113    #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
114    {
115        storage.fs_persist_sync()
116    }
117
118    // For native non-fs_persist builds (test/debug only), use simple in-memory sync with commit marker handling
119    #[cfg(all(
120        not(target_arch = "wasm32"),
121        any(test, debug_assertions),
122        not(feature = "fs_persist")
123    ))]
124    {
125        let current_dirty = lock_mutex!(storage.dirty_blocks).len();
126        log::info!(
127            "Syncing {} dirty blocks (native non-fs_persist)",
128            current_dirty
129        );
130
131        let to_persist: Vec<(u64, Vec<u8>)> = {
132            let dirty = lock_mutex!(storage.dirty_blocks);
133            dirty.iter().map(|(k, v)| (*k, v.clone())).collect()
134        };
135        let ids: Vec<u64> = to_persist.iter().map(|(k, _)| *k).collect();
136        let blocks_synced = ids.len(); // Capture length before moving ids
137
138        // Determine next commit version for native test path
139        let next_commit: u64 = vfs_sync::with_global_commit_marker(|cm| {
140            #[cfg(target_arch = "wasm32")]
141            let cm = cm;
142            #[cfg(not(target_arch = "wasm32"))]
143            let cm = cm.borrow();
144            let current = cm.get(&storage.db_name).copied().unwrap_or(0);
145            current + 1
146        });
147
148        // Store blocks in global test storage with versioning
149        vfs_sync::with_global_storage(|gs| {
150            #[cfg(target_arch = "wasm32")]
151            let storage_map = gs;
152            #[cfg(not(target_arch = "wasm32"))]
153            let mut storage_map = gs.borrow_mut();
154            let db_storage = storage_map
155                .entry(storage.db_name.clone())
156                .or_insert_with(HashMap::new);
157            for (block_id, data) in to_persist {
158                db_storage.insert(block_id, data);
159            }
160        });
161
162        // Store metadata with per-commit versioning
163        GLOBAL_METADATA_TEST.with(|meta| {
164            #[cfg(target_arch = "wasm32")]
165            let mut meta_map = meta.borrow_mut();
166            #[cfg(not(target_arch = "wasm32"))]
167            let mut meta_map = meta.lock();
168            let db_meta = meta_map
169                .entry(storage.db_name.clone())
170                .or_insert_with(HashMap::new);
171            for block_id in ids {
172                if let Some(checksum) = storage.checksum_manager.get_checksum(block_id) {
173                    // Use the per-commit version so entries remain invisible until the commit marker advances
174                    let version = next_commit as u32;
175                    db_meta.insert(
176                        block_id,
177                        BlockMetadataPersist {
178                            checksum,
179                            last_modified_ms: std::time::SystemTime::now()
180                                .duration_since(std::time::UNIX_EPOCH)
181                                .unwrap_or_default()
182                                .as_millis() as u64,
183                            version,
184                            algo: storage.checksum_manager.get_algorithm(block_id),
185                        },
186                    );
187                }
188            }
189        });
190
191        // Atomically advance the commit marker after all data and metadata are persisted
192        vfs_sync::with_global_commit_marker(|cm| {
193            #[cfg(target_arch = "wasm32")]
194            let cm_map = cm;
195            #[cfg(not(target_arch = "wasm32"))]
196            let mut cm_map = cm.borrow_mut();
197            cm_map.insert(storage.db_name.clone(), next_commit);
198        });
199
200        // Clear dirty blocks
201        {
202            let mut dirty = lock_mutex!(storage.dirty_blocks);
203            dirty.clear();
204        }
205
206        // Update sync metrics
207        storage.sync_count.fetch_add(1, Ordering::SeqCst);
208        let elapsed = start.elapsed();
209        let ms = elapsed.as_millis() as u64;
210        let ms = if ms == 0 { 1 } else { ms };
211        storage.last_sync_duration_ms.store(ms, Ordering::SeqCst);
212
213        // Record sync success for observability
214        storage.observability.record_sync_success(ms, blocks_synced);
215
216        // Invoke sync success callback if set
217        if let Some(ref callback) = storage.observability.sync_success_callback {
218            callback(ms, blocks_synced);
219        }
220
221        storage.evict_if_needed();
222        return Ok(());
223    }
224
225    #[cfg(target_arch = "wasm32")]
226    {
227        // WASM implementation
228        let current_dirty = lock_mutex!(storage.dirty_blocks).len();
229        log::info!("Syncing {} dirty blocks (WASM)", current_dirty);
230
231        // For WASM, persist dirty blocks to global storage
232        let to_persist: Vec<(u64, Vec<u8>)> = {
233            let dirty = lock_mutex!(storage.dirty_blocks);
234            dirty.iter().map(|(k, v)| (*k, v.clone())).collect()
235        };
236        let ids: Vec<u64> = to_persist.iter().map(|(k, _)| *k).collect();
237        // Determine next commit version so that all metadata written in this sync share the same version
238        let next_commit: u64 = vfs_sync::with_global_commit_marker(|cm| {
239            let cm = cm;
240            let current = cm.borrow().get(&storage.db_name).copied().unwrap_or(0);
241            current + 1
242        });
243        vfs_sync::with_global_storage(|gs| {
244            let mut storage_map = gs.borrow_mut();
245            let db_storage = storage_map
246                .entry(storage.db_name.clone())
247                .or_insert_with(HashMap::new);
248            for (block_id, data) in &to_persist {
249                // Check if block already exists in global storage with committed data
250                let should_update = if let Some(existing) = db_storage.get(block_id) {
251                    if existing != data {
252                        // Check if existing data has committed metadata (version > 0)
253                        let has_committed_metadata = vfs_sync::with_global_metadata(|meta| {
254                            if let Some(db_meta) = meta.borrow().get(&storage.db_name) {
255                                if let Some(metadata) = db_meta.get(block_id) {
256                                    metadata.version > 0
257                                } else {
258                                    false
259                                }
260                            } else {
261                                false
262                            }
263                        });
264
265                        if has_committed_metadata {
266                            // CRITICAL FIX: Never overwrite committed data to prevent corruption
267                            false // Never overwrite committed data
268                        } else {
269                            true // Update uncommitted data
270                        }
271                    } else {
272                        true // Same data, safe to update
273                    }
274                } else {
275                    true // No existing data, safe to insert
276                };
277
278                if should_update {
279                    db_storage.insert(*block_id, data.clone());
280                }
281            }
282        });
283        // Persist corresponding metadata entries
284        vfs_sync::with_global_metadata(|meta| {
285            let mut meta_guard = meta.borrow_mut();
286            let db_meta = meta_guard
287                .entry(storage.db_name.clone())
288                .or_insert_with(HashMap::new);
289            for block_id in ids {
290                if let Some(checksum) = storage.checksum_manager.get_checksum(block_id) {
291                    // Use the per-commit version so entries remain invisible until the commit marker advances
292                    let version = next_commit as u32;
293                    db_meta.insert(
294                        block_id,
295                        BlockMetadataPersist {
296                            checksum,
297                            last_modified_ms: BlockStorage::now_millis(),
298                            version,
299                            algo: storage.checksum_manager.get_algorithm(block_id),
300                        },
301                    );
302                }
303            }
304        });
305        // Atomically advance the commit marker after all data and metadata are persisted
306        vfs_sync::with_global_commit_marker(|cm| {
307            let cm_map = cm;
308            cm_map
309                .borrow_mut()
310                .insert(storage.db_name.clone(), next_commit);
311        });
312
313        // Spawn async IndexedDB persistence (fire and forget for sync compatibility)
314        let db_name = storage.db_name.clone();
315        wasm_bindgen_futures::spawn_local(async move {
316            use wasm_bindgen::JsCast;
317
318            // Get IndexedDB factory (works in both Window and Worker contexts)
319            let global = js_sys::global();
320            let indexed_db_value = match js_sys::Reflect::get(
321                &global,
322                &wasm_bindgen::JsValue::from_str("indexedDB"),
323            ) {
324                Ok(val) => val,
325                Err(_) => {
326                    log::error!("IndexedDB property access failed - cannot persist");
327                    return;
328                }
329            };
330
331            if indexed_db_value.is_null() || indexed_db_value.is_undefined() {
332                log::warn!(
333                    "IndexedDB unavailable for sync (private browsing?) - data not persisted to IndexedDB"
334                );
335                return;
336            }
337
338            let idb_factory = match indexed_db_value.dyn_into::<web_sys::IdbFactory>() {
339                Ok(factory) => factory,
340                Err(_) => {
341                    log::error!("IndexedDB property is not an IdbFactory - cannot persist");
342                    return;
343                }
344            };
345
346            let open_req = match idb_factory.open_with_u32("block_storage", 2) {
347                Ok(req) => req,
348                Err(e) => {
349                    log::error!("Failed to open IndexedDB for sync: {:?}", e);
350                    return;
351                }
352            };
353
354            // Set up upgrade handler to create object stores if needed
355            let upgrade_handler = js_sys::Function::new_no_args(&format!(
356                "
357                    const db = event.target.result;
358                    if (!db.objectStoreNames.contains('blocks')) {{
359                        db.createObjectStore('blocks');
360                    }}
361                    if (!db.objectStoreNames.contains('metadata')) {{
362                        db.createObjectStore('metadata');
363                    }}
364                    "
365            ));
366            open_req.set_onupgradeneeded(Some(&upgrade_handler));
367
368            // Use event-based approach for opening database
369            let (tx, rx) = futures::channel::oneshot::channel();
370            let tx = std::rc::Rc::new(std::cell::RefCell::new(Some(tx)));
371
372            let success_tx = tx.clone();
373            let success_callback =
374                wasm_bindgen::closure::Closure::wrap(Box::new(move |event: web_sys::Event| {
375                    if let Some(tx) = success_tx.borrow_mut().take() {
376                        let target = event.target().unwrap();
377                        let request: web_sys::IdbOpenDbRequest = target.unchecked_into();
378                        let result = request.result().unwrap();
379                        let _ = tx.send(Ok(result));
380                    }
381                }) as Box<dyn FnMut(_)>);
382
383            let error_tx = tx.clone();
384            let error_callback =
385                wasm_bindgen::closure::Closure::wrap(Box::new(move |event: web_sys::Event| {
386                    if let Some(tx) = error_tx.borrow_mut().take() {
387                        let _ = tx.send(Err(format!("IndexedDB open failed: {:?}", event)));
388                    }
389                }) as Box<dyn FnMut(_)>);
390
391            open_req.set_onsuccess(Some(success_callback.as_ref().unchecked_ref()));
392            open_req.set_onerror(Some(error_callback.as_ref().unchecked_ref()));
393
394            let db_result = rx.await;
395
396            // Keep closures alive
397            success_callback.forget();
398            error_callback.forget();
399
400            match db_result {
401                Ok(Ok(db_value)) => {
402                    if let Ok(db) = db_value.dyn_into::<web_sys::IdbDatabase>() {
403                        // Start transaction for both blocks and metadata
404                        let store_names = js_sys::Array::new();
405                        store_names.push(&wasm_bindgen::JsValue::from_str("blocks"));
406                        store_names.push(&wasm_bindgen::JsValue::from_str("metadata"));
407
408                        let transaction = db
409                            .transaction_with_str_sequence_and_mode(
410                                &store_names,
411                                web_sys::IdbTransactionMode::Readwrite,
412                            )
413                            .unwrap();
414
415                        let blocks_store = transaction.object_store("blocks").unwrap();
416                        let metadata_store = transaction.object_store("metadata").unwrap();
417
418                        // Persist all blocks
419                        for (block_id, data) in &to_persist {
420                            let key = wasm_bindgen::JsValue::from_str(&format!(
421                                "{}_{}",
422                                db_name, block_id
423                            ));
424                            let value = js_sys::Uint8Array::from(&data[..]);
425                            blocks_store.put_with_key(&value, &key).unwrap();
426                        }
427
428                        // Persist commit marker
429                        let commit_key =
430                            wasm_bindgen::JsValue::from_str(&format!("{}_commit_marker", db_name));
431                        let commit_value = wasm_bindgen::JsValue::from_f64(next_commit as f64);
432                        metadata_store
433                            .put_with_key(&commit_value, &commit_key)
434                            .unwrap();
435
436                        // Use event-based approach for transaction completion
437                        let (tx_tx, tx_rx) = futures::channel::oneshot::channel();
438                        let tx_tx = std::rc::Rc::new(std::cell::RefCell::new(Some(tx_tx)));
439
440                        let tx_complete_tx = tx_tx.clone();
441                        let tx_complete_callback = wasm_bindgen::closure::Closure::wrap(Box::new(
442                            move |_event: web_sys::Event| {
443                                if let Some(tx) = tx_complete_tx.borrow_mut().take() {
444                                    let _ = tx.send(Ok(()));
445                                }
446                            },
447                        )
448                            as Box<dyn FnMut(_)>);
449
450                        let tx_error_tx = tx_tx.clone();
451                        let tx_error_callback = wasm_bindgen::closure::Closure::wrap(Box::new(
452                            move |event: web_sys::Event| {
453                                if let Some(tx) = tx_error_tx.borrow_mut().take() {
454                                    let _ =
455                                        tx.send(Err(format!("Transaction failed: {:?}", event)));
456                                }
457                            },
458                        )
459                            as Box<dyn FnMut(_)>);
460
461                        transaction
462                            .set_oncomplete(Some(tx_complete_callback.as_ref().unchecked_ref()));
463                        transaction.set_onerror(Some(tx_error_callback.as_ref().unchecked_ref()));
464
465                        let _ = tx_rx.await;
466
467                        // Keep closures alive
468                        tx_complete_callback.forget();
469                        tx_error_callback.forget();
470                    }
471                }
472                _ => {} // Silently ignore errors in background persistence
473            }
474        });
475        // Clear dirty blocks after successful persistence
476        {
477            let mut dirty = lock_mutex!(storage.dirty_blocks);
478            dirty.clear();
479        }
480
481        // Record sync success for observability (WASM)
482        // For WASM, we don't have precise timing, so use a default duration
483        storage.observability.record_sync_success(1, current_dirty);
484
485        // Invoke WASM sync success callback if set
486        #[cfg(target_arch = "wasm32")]
487        if let Some(ref callback) = storage.observability.wasm_sync_success_callback {
488            callback(1, current_dirty);
489        }
490
491        storage.evict_if_needed();
492        Ok(())
493    }
494}