absurder_sql/storage/
sync_operations.rs

1//! Sync operations for BlockStorage
2//! This module contains the core sync implementation logic
3
4use crate::types::DatabaseError;
5use super::block_storage::BlockStorage;
6
7#[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
8use std::collections::HashMap;
9#[cfg(all(not(target_arch = "wasm32"), not(feature = "fs_persist")))]
10use std::sync::atomic::Ordering;
11
12#[cfg(any(target_arch = "wasm32", all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist"))))]
13use super::vfs_sync;
14#[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
15use super::metadata::BlockMetadataPersist;
16
17#[cfg(target_arch = "wasm32")]
18use std::collections::HashMap;
19#[cfg(target_arch = "wasm32")]
20use super::metadata::BlockMetadataPersist;
21
22#[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
23use super::block_storage::GLOBAL_METADATA_TEST;
24
25/// Internal sync implementation shared by sync() and sync_now()
26pub fn sync_implementation_impl(storage: &mut BlockStorage) -> Result<(), DatabaseError> {
27        #[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
28        let start = std::time::Instant::now();
29        
30        // Record sync start for observability
31        let dirty_count = storage.dirty_blocks.lock().len();
32        let dirty_bytes = dirty_count * super::block_storage::BLOCK_SIZE;
33        storage.observability.record_sync_start(dirty_count, dirty_bytes);
34        
35        // Invoke sync start callback if set
36        #[cfg(not(target_arch = "wasm32"))]
37        if let Some(ref callback) = storage.observability.sync_start_callback {
38            callback(dirty_count, dirty_bytes);
39        }
40        
41        // Early return for native release builds without fs_persist
42        #[cfg(all(not(target_arch = "wasm32"), not(any(test, debug_assertions)), not(feature = "fs_persist")))]
43        {
44            // In release mode without fs_persist, just clear dirty blocks
45            storage.dirty_blocks.lock().clear();
46            storage.sync_count.fetch_add(1, Ordering::SeqCst);
47            return Ok(());
48        }
49        
50        // Call the existing fs_persist implementation for native builds
51        #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
52        {
53            return storage.fs_persist_sync();
54        }
55        
56        // For native non-fs_persist builds (test/debug only), use simple in-memory sync with commit marker handling
57        #[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
58        {
59            let current_dirty = storage.dirty_blocks.lock().len();
60            log::info!("Syncing {} dirty blocks (native non-fs_persist)", current_dirty);
61            
62            let to_persist: Vec<(u64, Vec<u8>)> = {
63                let dirty = storage.dirty_blocks.lock();
64                dirty.iter().map(|(k,v)| (*k, v.clone())).collect()
65            };
66            let ids: Vec<u64> = to_persist.iter().map(|(k, _)| *k).collect();
67            let blocks_synced = ids.len(); // Capture length before moving ids
68            
69            // Determine next commit version for native test path
70            let next_commit: u64 = vfs_sync::with_global_commit_marker(|cm| {
71                let cm = cm.borrow();
72                let current = cm.get(&storage.db_name).copied().unwrap_or(0);
73                current + 1
74            });
75            
76            // Store blocks in global test storage with versioning
77            vfs_sync::with_global_storage(|gs| {
78                let mut storage_map = gs.borrow_mut();
79                let db_storage = storage_map.entry(storage.db_name.clone()).or_insert_with(HashMap::new);
80                for (block_id, data) in to_persist {
81                    db_storage.insert(block_id, data);
82                }
83            });
84            
85            // Store metadata with per-commit versioning
86            GLOBAL_METADATA_TEST.with(|meta| {
87                let mut meta_map = meta.borrow_mut();
88                let db_meta = meta_map.entry(storage.db_name.clone()).or_insert_with(HashMap::new);
89                for block_id in ids {
90                    if let Some(checksum) = storage.checksum_manager.get_checksum(block_id) {
91                        // Use the per-commit version so entries remain invisible until the commit marker advances
92                        let version = next_commit as u32;
93                        db_meta.insert(
94                            block_id,
95                            BlockMetadataPersist {
96                                checksum,
97                                last_modified_ms: std::time::SystemTime::now()
98                                    .duration_since(std::time::UNIX_EPOCH)
99                                    .unwrap_or_default()
100                                    .as_millis() as u64,
101                                version,
102                                algo: storage.checksum_manager.get_algorithm(block_id),
103                            },
104                        );
105                    }
106                }
107            });
108            
109            // Atomically advance the commit marker after all data and metadata are persisted
110            vfs_sync::with_global_commit_marker(|cm| {
111                let mut cm_map = cm.borrow_mut();
112                cm_map.insert(storage.db_name.clone(), next_commit);
113            });
114            
115            // Clear dirty blocks
116            {
117                let mut dirty = storage.dirty_blocks.lock();
118                dirty.clear();
119            }
120            
121            // Update sync metrics
122            storage.sync_count.fetch_add(1, Ordering::SeqCst);
123            let elapsed = start.elapsed();
124            let ms = elapsed.as_millis() as u64;
125            let ms = if ms == 0 { 1 } else { ms };
126            storage.last_sync_duration_ms.store(ms, Ordering::SeqCst);
127            
128            // Record sync success for observability
129            storage.observability.record_sync_success(ms, blocks_synced);
130            
131            // Invoke sync success callback if set
132            if let Some(ref callback) = storage.observability.sync_success_callback {
133                callback(ms, blocks_synced);
134            }
135            
136            storage.evict_if_needed();
137            return Ok(());
138        }
139        
140        #[cfg(target_arch = "wasm32")]
141        {
142            // WASM implementation
143            let current_dirty = storage.dirty_blocks.lock().len();
144            log::info!("Syncing {} dirty blocks (WASM)", current_dirty);
145            
146            // For WASM, persist dirty blocks to global storage
147            let to_persist: Vec<(u64, Vec<u8>)> = {
148                let dirty = storage.dirty_blocks.lock();
149                dirty.iter().map(|(k,v)| (*k, v.clone())).collect()
150            };
151            let ids: Vec<u64> = to_persist.iter().map(|(k, _)| *k).collect();
152            // Determine next commit version so that all metadata written in this sync share the same version
153            let next_commit: u64 = vfs_sync::with_global_commit_marker(|cm| {
154                let cm = cm.borrow();
155                let current = cm.get(&storage.db_name).copied().unwrap_or(0);
156                current + 1
157            });
158            vfs_sync::with_global_storage(|gs| {
159                let mut storage_map = gs.borrow_mut();
160                let db_storage = storage_map.entry(storage.db_name.clone()).or_insert_with(HashMap::new);
161                for (block_id, data) in &to_persist {
162                    // Check if block already exists in global storage with committed data
163                    let should_update = if let Some(existing) = db_storage.get(block_id) {
164                        if existing != data {
165                            // Check if existing data has committed metadata (version > 0)
166                            let has_committed_metadata = vfs_sync::with_global_metadata(|meta| {
167                                let meta_map = meta.borrow();
168                                if let Some(db_meta) = meta_map.get(&storage.db_name) {
169                                    if let Some(metadata) = db_meta.get(block_id) {
170                                        metadata.version > 0
171                                    } else {
172                                        false
173                                    }
174                                } else {
175                                    false
176                                }
177                            });
178                            
179                            if has_committed_metadata {
180                                // CRITICAL FIX: Never overwrite committed data to prevent corruption
181                                false // Never overwrite committed data
182                            } else {
183                                true // Update uncommitted data
184                            }
185                        } else {
186                            true // Same data, safe to update
187                        }
188                    } else {
189                        true // No existing data, safe to insert
190                    };
191                    
192                    if should_update {
193                        db_storage.insert(*block_id, data.clone());
194                    }
195                }
196            });
197            // Persist corresponding metadata entries
198            vfs_sync::with_global_metadata(|meta| {
199                let mut meta_map = meta.borrow_mut();
200                let db_meta = meta_map.entry(storage.db_name.clone()).or_insert_with(HashMap::new);
201                for block_id in ids {
202                    if let Some(checksum) = storage.checksum_manager.get_checksum(block_id) {
203                        // Use the per-commit version so entries remain invisible until the commit marker advances
204                        let version = next_commit as u32;
205                        db_meta.insert(
206                            block_id,
207                            BlockMetadataPersist {
208                                checksum,
209                                last_modified_ms: BlockStorage::now_millis(),
210                                version,
211                                algo: storage.checksum_manager.get_algorithm(block_id),
212                            },
213                        );
214                    }
215                }
216            });
217            // Atomically advance the commit marker after all data and metadata are persisted
218            vfs_sync::with_global_commit_marker(|cm| {
219                let mut cm_map = cm.borrow_mut();
220                cm_map.insert(storage.db_name.clone(), next_commit);
221            });
222            
223            // Spawn async IndexedDB persistence (fire and forget for sync compatibility)
224            let db_name = storage.db_name.clone();
225            wasm_bindgen_futures::spawn_local(async move {
226                use wasm_bindgen::JsCast;
227                
228                // Get IndexedDB factory (works in both Window and Worker contexts)
229                let global = js_sys::global();
230                let indexed_db_value = match js_sys::Reflect::get(&global, &wasm_bindgen::JsValue::from_str("indexedDB")) {
231                    Ok(val) => val,
232                    Err(_) => {
233                        log::error!("IndexedDB property access failed - cannot persist");
234                        return;
235                    }
236                };
237                
238                if indexed_db_value.is_null() || indexed_db_value.is_undefined() {
239                    log::warn!("IndexedDB unavailable for sync (private browsing?) - data not persisted to IndexedDB");
240                    return;
241                }
242                
243                let idb_factory = match indexed_db_value.dyn_into::<web_sys::IdbFactory>() {
244                    Ok(factory) => factory,
245                    Err(_) => {
246                        log::error!("IndexedDB property is not an IdbFactory - cannot persist");
247                        return;
248                    }
249                };
250                
251                let open_req = match idb_factory.open_with_u32("block_storage", 2) {
252                    Ok(req) => req,
253                    Err(e) => {
254                        log::error!("Failed to open IndexedDB for sync: {:?}", e);
255                        return;
256                    }
257                };
258                
259                // Set up upgrade handler to create object stores if needed
260                let upgrade_handler = js_sys::Function::new_no_args(&format!(
261                    "
262                    const db = event.target.result;
263                    if (!db.objectStoreNames.contains('blocks')) {{
264                        db.createObjectStore('blocks');
265                    }}
266                    if (!db.objectStoreNames.contains('metadata')) {{
267                        db.createObjectStore('metadata');
268                    }}
269                    "
270                ));
271                open_req.set_onupgradeneeded(Some(&upgrade_handler));
272                
273                // Use event-based approach for opening database
274                let (tx, rx) = futures::channel::oneshot::channel();
275                let tx = std::rc::Rc::new(std::cell::RefCell::new(Some(tx)));
276                
277                let success_tx = tx.clone();
278                let success_callback = wasm_bindgen::closure::Closure::wrap(Box::new(move |event: web_sys::Event| {
279                    if let Some(tx) = success_tx.borrow_mut().take() {
280                        let target = event.target().unwrap();
281                        let request: web_sys::IdbOpenDbRequest = target.unchecked_into();
282                        let result = request.result().unwrap();
283                        let _ = tx.send(Ok(result));
284                    }
285                }) as Box<dyn FnMut(_)>);
286                
287                let error_tx = tx.clone();
288                let error_callback = wasm_bindgen::closure::Closure::wrap(Box::new(move |event: web_sys::Event| {
289                    if let Some(tx) = error_tx.borrow_mut().take() {
290                        let _ = tx.send(Err(format!("IndexedDB open failed: {:?}", event)));
291                    }
292                }) as Box<dyn FnMut(_)>);
293                
294                open_req.set_onsuccess(Some(success_callback.as_ref().unchecked_ref()));
295                open_req.set_onerror(Some(error_callback.as_ref().unchecked_ref()));
296                
297                let db_result = rx.await;
298                
299                // Keep closures alive
300                success_callback.forget();
301                error_callback.forget();
302                
303                match db_result {
304                    Ok(Ok(db_value)) => {
305                        if let Ok(db) = db_value.dyn_into::<web_sys::IdbDatabase>() {
306                            // Start transaction for both blocks and metadata
307                            let store_names = js_sys::Array::new();
308                            store_names.push(&wasm_bindgen::JsValue::from_str("blocks"));
309                            store_names.push(&wasm_bindgen::JsValue::from_str("metadata"));
310                            
311                            let transaction = db.transaction_with_str_sequence_and_mode(
312                                &store_names,
313                                web_sys::IdbTransactionMode::Readwrite
314                            ).unwrap();
315                            
316                            let blocks_store = transaction.object_store("blocks").unwrap();
317                            let metadata_store = transaction.object_store("metadata").unwrap();
318                            
319                            // Persist all blocks
320                            for (block_id, data) in &to_persist {
321                                let key = wasm_bindgen::JsValue::from_str(&format!("{}_{}", db_name, block_id));
322                                let value = js_sys::Uint8Array::from(&data[..]);
323                                blocks_store.put_with_key(&value, &key).unwrap();
324                            }
325                            
326                            // Persist commit marker
327                            let commit_key = wasm_bindgen::JsValue::from_str(&format!("{}_commit_marker", db_name));
328                            let commit_value = wasm_bindgen::JsValue::from_f64(next_commit as f64);
329                            metadata_store.put_with_key(&commit_value, &commit_key).unwrap();
330                            
331                            // Use event-based approach for transaction completion
332                            let (tx_tx, tx_rx) = futures::channel::oneshot::channel();
333                            let tx_tx = std::rc::Rc::new(std::cell::RefCell::new(Some(tx_tx)));
334                            
335                            let tx_complete_tx = tx_tx.clone();
336                            let tx_complete_callback = wasm_bindgen::closure::Closure::wrap(Box::new(move |_event: web_sys::Event| {
337                                if let Some(tx) = tx_complete_tx.borrow_mut().take() {
338                                    let _ = tx.send(Ok(()));
339                                }
340                            }) as Box<dyn FnMut(_)>);
341                            
342                            let tx_error_tx = tx_tx.clone();
343                            let tx_error_callback = wasm_bindgen::closure::Closure::wrap(Box::new(move |event: web_sys::Event| {
344                                if let Some(tx) = tx_error_tx.borrow_mut().take() {
345                                    let _ = tx.send(Err(format!("Transaction failed: {:?}", event)));
346                                }
347                            }) as Box<dyn FnMut(_)>);
348                            
349                            transaction.set_oncomplete(Some(tx_complete_callback.as_ref().unchecked_ref()));
350                            transaction.set_onerror(Some(tx_error_callback.as_ref().unchecked_ref()));
351                            
352                            let _ = tx_rx.await;
353                            
354                            // Keep closures alive
355                            tx_complete_callback.forget();
356                            tx_error_callback.forget();
357                        }
358                    }
359                    _ => {}
360                    // Silently ignore errors in background persistence
361                }
362            });
363            // Clear dirty blocks after successful persistence
364            {
365                let mut dirty = storage.dirty_blocks.lock();
366                dirty.clear();
367            }
368            
369            // Record sync success for observability (WASM)
370            // For WASM, we don't have precise timing, so use a default duration
371            storage.observability.record_sync_success(1, current_dirty);
372            
373            // Invoke WASM sync success callback if set
374            #[cfg(target_arch = "wasm32")]
375            if let Some(ref callback) = storage.observability.wasm_sync_success_callback {
376                callback(1, current_dirty);
377            }
378            
379            storage.evict_if_needed();
380            Ok(())
381        }
382    }