absurder_sql/storage/
sync_operations.rs

1//! Sync operations for BlockStorage
2//! This module contains the core sync implementation logic
3
4use crate::types::DatabaseError;
5use super::block_storage::BlockStorage;
6
7#[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
8use std::collections::HashMap;
9#[cfg(all(not(target_arch = "wasm32"), not(feature = "fs_persist")))]
10use std::sync::atomic::Ordering;
11
12#[cfg(any(target_arch = "wasm32", all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist"))))]
13use super::vfs_sync;
14#[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
15use super::metadata::BlockMetadataPersist;
16
17#[cfg(target_arch = "wasm32")]
18use std::collections::HashMap;
19#[cfg(target_arch = "wasm32")]
20use super::metadata::BlockMetadataPersist;
21
22#[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
23use super::block_storage::GLOBAL_METADATA_TEST;
24
25/// Internal sync implementation shared by sync() and sync_now()
26pub fn sync_implementation_impl(storage: &mut BlockStorage) -> Result<(), DatabaseError> {
27        #[cfg(target_arch = "wasm32")]
28        use wasm_bindgen::JsCast;
29        #[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
30        let start = std::time::Instant::now();
31        
32        // Record sync start for observability
33        let dirty_count = storage.dirty_blocks.lock().len();
34        let dirty_bytes = dirty_count * super::block_storage::BLOCK_SIZE;
35        storage.observability.record_sync_start(dirty_count, dirty_bytes);
36        
37        // Invoke sync start callback if set
38        #[cfg(not(target_arch = "wasm32"))]
39        if let Some(ref callback) = storage.observability.sync_start_callback {
40            callback(dirty_count, dirty_bytes);
41        }
42        
43        // Early return for native release builds without fs_persist
44        #[cfg(all(not(target_arch = "wasm32"), not(any(test, debug_assertions)), not(feature = "fs_persist")))]
45        {
46            // In release mode without fs_persist, just clear dirty blocks
47            storage.dirty_blocks.lock().clear();
48            storage.sync_count.fetch_add(1, Ordering::SeqCst);
49            return Ok(());
50        }
51        
52        // Call the existing fs_persist implementation for native builds
53        #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
54        {
55            return storage.fs_persist_sync();
56        }
57        
58        // For native non-fs_persist builds (test/debug only), use simple in-memory sync with commit marker handling
59        #[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
60        {
61            let current_dirty = storage.dirty_blocks.lock().len();
62            log::info!("Syncing {} dirty blocks (native non-fs_persist)", current_dirty);
63            
64            let to_persist: Vec<(u64, Vec<u8>)> = {
65                let dirty = storage.dirty_blocks.lock();
66                dirty.iter().map(|(k,v)| (*k, v.clone())).collect()
67            };
68            let ids: Vec<u64> = to_persist.iter().map(|(k, _)| *k).collect();
69            let blocks_synced = ids.len(); // Capture length before moving ids
70            
71            // Determine next commit version for native test path
72            let next_commit: u64 = vfs_sync::with_global_commit_marker(|cm| {
73                let cm = cm.borrow();
74                let current = cm.get(&storage.db_name).copied().unwrap_or(0);
75                current + 1
76            });
77            
78            // Store blocks in global test storage with versioning
79            vfs_sync::with_global_storage(|gs| {
80                let mut storage_map = gs.borrow_mut();
81                let db_storage = storage_map.entry(storage.db_name.clone()).or_insert_with(HashMap::new);
82                for (block_id, data) in to_persist {
83                    db_storage.insert(block_id, data);
84                }
85            });
86            
87            // Store metadata with per-commit versioning
88            GLOBAL_METADATA_TEST.with(|meta| {
89                let mut meta_map = meta.borrow_mut();
90                let db_meta = meta_map.entry(storage.db_name.clone()).or_insert_with(HashMap::new);
91                for block_id in ids {
92                    if let Some(checksum) = storage.checksum_manager.get_checksum(block_id) {
93                        // Use the per-commit version so entries remain invisible until the commit marker advances
94                        let version = next_commit as u32;
95                        db_meta.insert(
96                            block_id,
97                            BlockMetadataPersist {
98                                checksum,
99                                last_modified_ms: std::time::SystemTime::now()
100                                    .duration_since(std::time::UNIX_EPOCH)
101                                    .unwrap_or_default()
102                                    .as_millis() as u64,
103                                version,
104                                algo: storage.checksum_manager.get_algorithm(block_id),
105                            },
106                        );
107                    }
108                }
109            });
110            
111            // Atomically advance the commit marker after all data and metadata are persisted
112            vfs_sync::with_global_commit_marker(|cm| {
113                let mut cm_map = cm.borrow_mut();
114                cm_map.insert(storage.db_name.clone(), next_commit);
115            });
116            
117            // Clear dirty blocks
118            {
119                let mut dirty = storage.dirty_blocks.lock();
120                dirty.clear();
121            }
122            
123            // Update sync metrics
124            storage.sync_count.fetch_add(1, Ordering::SeqCst);
125            let elapsed = start.elapsed();
126            let ms = elapsed.as_millis() as u64;
127            let ms = if ms == 0 { 1 } else { ms };
128            storage.last_sync_duration_ms.store(ms, Ordering::SeqCst);
129            
130            // Record sync success for observability
131            storage.observability.record_sync_success(ms, blocks_synced);
132            
133            // Invoke sync success callback if set
134            if let Some(ref callback) = storage.observability.sync_success_callback {
135                callback(ms, blocks_synced);
136            }
137            
138            storage.evict_if_needed();
139            return Ok(());
140        }
141        
142        #[cfg(target_arch = "wasm32")]
143        {
144            // WASM implementation
145            let current_dirty = storage.dirty_blocks.lock().len();
146            log::info!("Syncing {} dirty blocks (WASM)", current_dirty);
147            
148            // For WASM, persist dirty blocks to global storage
149            let to_persist: Vec<(u64, Vec<u8>)> = {
150                let dirty = storage.dirty_blocks.lock();
151                dirty.iter().map(|(k,v)| (*k, v.clone())).collect()
152            };
153            let ids: Vec<u64> = to_persist.iter().map(|(k, _)| *k).collect();
154            // Determine next commit version so that all metadata written in this sync share the same version
155            let next_commit: u64 = vfs_sync::with_global_commit_marker(|cm| {
156                let cm = cm.borrow();
157                let current = cm.get(&storage.db_name).copied().unwrap_or(0);
158                current + 1
159            });
160            vfs_sync::with_global_storage(|gs| {
161                let mut storage_map = gs.borrow_mut();
162                let db_storage = storage_map.entry(storage.db_name.clone()).or_insert_with(HashMap::new);
163                for (block_id, data) in &to_persist {
164                    // Check if block already exists in global storage with committed data
165                    let should_update = if let Some(existing) = db_storage.get(block_id) {
166                        if existing != data {
167                            // Check if existing data has committed metadata (version > 0)
168                            let has_committed_metadata = vfs_sync::with_global_metadata(|meta| {
169                                let meta_map = meta.borrow();
170                                if let Some(db_meta) = meta_map.get(&storage.db_name) {
171                                    if let Some(metadata) = db_meta.get(block_id) {
172                                        metadata.version > 0
173                                    } else {
174                                        false
175                                    }
176                                } else {
177                                    false
178                                }
179                            });
180                            
181                            if has_committed_metadata {
182                                // CRITICAL FIX: Never overwrite committed data to prevent corruption
183                                false // Never overwrite committed data
184                            } else {
185                                true // Update uncommitted data
186                            }
187                        } else {
188                            true // Same data, safe to update
189                        }
190                    } else {
191                        true // No existing data, safe to insert
192                    };
193                    
194                    if should_update {
195                        db_storage.insert(*block_id, data.clone());
196                    }
197                }
198            });
199            // Persist corresponding metadata entries
200            vfs_sync::with_global_metadata(|meta| {
201                let mut meta_map = meta.borrow_mut();
202                let db_meta = meta_map.entry(storage.db_name.clone()).or_insert_with(HashMap::new);
203                for block_id in ids {
204                    if let Some(checksum) = storage.checksum_manager.get_checksum(block_id) {
205                        // Use the per-commit version so entries remain invisible until the commit marker advances
206                        let version = next_commit as u32;
207                        db_meta.insert(
208                            block_id,
209                            BlockMetadataPersist {
210                                checksum,
211                                last_modified_ms: BlockStorage::now_millis(),
212                                version,
213                                algo: storage.checksum_manager.get_algorithm(block_id),
214                            },
215                        );
216                    }
217                }
218            });
219            // Atomically advance the commit marker after all data and metadata are persisted
220            vfs_sync::with_global_commit_marker(|cm| {
221                let mut cm_map = cm.borrow_mut();
222                cm_map.insert(storage.db_name.clone(), next_commit);
223            });
224            
225            // Spawn async IndexedDB persistence (fire and forget for sync compatibility)
226            let db_name = storage.db_name.clone();
227            wasm_bindgen_futures::spawn_local(async move {
228                let Some(window) = web_sys::window() else {
229                    log::error!("Window unavailable for IndexedDB sync - cannot persist");
230                    return;
231                };
232                let idb_factory = match window.indexed_db() {
233                    Ok(Some(factory)) => factory,
234                    Ok(None) => {
235                        log::warn!("IndexedDB unavailable for sync (private browsing?) - data not persisted to IndexedDB");
236                        return;
237                    },
238                    Err(_) => {
239                        log::error!("IndexedDB access denied for sync - data not persisted to IndexedDB");
240                        return;
241                    }
242                };
243                let open_req = match idb_factory.open_with_u32("block_storage", 2) {
244                    Ok(req) => req,
245                    Err(e) => {
246                        log::error!("Failed to open IndexedDB for sync: {:?}", e);
247                        return;
248                    }
249                };
250                
251                // Set up upgrade handler to create object stores if needed
252                let upgrade_handler = js_sys::Function::new_no_args(&format!(
253                    "
254                    const db = event.target.result;
255                    if (!db.objectStoreNames.contains('blocks')) {{
256                        db.createObjectStore('blocks');
257                    }}
258                    if (!db.objectStoreNames.contains('metadata')) {{
259                        db.createObjectStore('metadata');
260                    }}
261                    "
262                ));
263                open_req.set_onupgradeneeded(Some(&upgrade_handler));
264                
265                // Use event-based approach for opening database
266                let (tx, rx) = futures::channel::oneshot::channel();
267                let tx = std::rc::Rc::new(std::cell::RefCell::new(Some(tx)));
268                
269                let success_tx = tx.clone();
270                let success_callback = wasm_bindgen::closure::Closure::wrap(Box::new(move |event: web_sys::Event| {
271                    if let Some(tx) = success_tx.borrow_mut().take() {
272                        let target = event.target().unwrap();
273                        let request: web_sys::IdbOpenDbRequest = target.unchecked_into();
274                        let result = request.result().unwrap();
275                        let _ = tx.send(Ok(result));
276                    }
277                }) as Box<dyn FnMut(_)>);
278                
279                let error_tx = tx.clone();
280                let error_callback = wasm_bindgen::closure::Closure::wrap(Box::new(move |event: web_sys::Event| {
281                    if let Some(tx) = error_tx.borrow_mut().take() {
282                        let _ = tx.send(Err(format!("IndexedDB open failed: {:?}", event)));
283                    }
284                }) as Box<dyn FnMut(_)>);
285                
286                open_req.set_onsuccess(Some(success_callback.as_ref().unchecked_ref()));
287                open_req.set_onerror(Some(error_callback.as_ref().unchecked_ref()));
288                
289                let db_result = rx.await;
290                
291                // Keep closures alive
292                success_callback.forget();
293                error_callback.forget();
294                
295                match db_result {
296                    Ok(Ok(db_value)) => {
297                        if let Ok(db) = db_value.dyn_into::<web_sys::IdbDatabase>() {
298                            // Start transaction for both blocks and metadata
299                            let store_names = js_sys::Array::new();
300                            store_names.push(&wasm_bindgen::JsValue::from_str("blocks"));
301                            store_names.push(&wasm_bindgen::JsValue::from_str("metadata"));
302                            
303                            let transaction = db.transaction_with_str_sequence_and_mode(
304                                &store_names,
305                                web_sys::IdbTransactionMode::Readwrite
306                            ).unwrap();
307                            
308                            let blocks_store = transaction.object_store("blocks").unwrap();
309                            let metadata_store = transaction.object_store("metadata").unwrap();
310                            
311                            // Persist all blocks
312                            for (block_id, data) in &to_persist {
313                                let key = wasm_bindgen::JsValue::from_str(&format!("{}_{}", db_name, block_id));
314                                let value = js_sys::Uint8Array::from(&data[..]);
315                                blocks_store.put_with_key(&value, &key).unwrap();
316                            }
317                            
318                            // Persist commit marker
319                            let commit_key = wasm_bindgen::JsValue::from_str(&format!("{}_commit_marker", db_name));
320                            let commit_value = wasm_bindgen::JsValue::from_f64(next_commit as f64);
321                            metadata_store.put_with_key(&commit_value, &commit_key).unwrap();
322                            
323                            // Use event-based approach for transaction completion
324                            let (tx_tx, tx_rx) = futures::channel::oneshot::channel();
325                            let tx_tx = std::rc::Rc::new(std::cell::RefCell::new(Some(tx_tx)));
326                            
327                            let tx_complete_tx = tx_tx.clone();
328                            let tx_complete_callback = wasm_bindgen::closure::Closure::wrap(Box::new(move |_event: web_sys::Event| {
329                                if let Some(tx) = tx_complete_tx.borrow_mut().take() {
330                                    let _ = tx.send(Ok(()));
331                                }
332                            }) as Box<dyn FnMut(_)>);
333                            
334                            let tx_error_tx = tx_tx.clone();
335                            let tx_error_callback = wasm_bindgen::closure::Closure::wrap(Box::new(move |event: web_sys::Event| {
336                                if let Some(tx) = tx_error_tx.borrow_mut().take() {
337                                    let _ = tx.send(Err(format!("Transaction failed: {:?}", event)));
338                                }
339                            }) as Box<dyn FnMut(_)>);
340                            
341                            transaction.set_oncomplete(Some(tx_complete_callback.as_ref().unchecked_ref()));
342                            transaction.set_onerror(Some(tx_error_callback.as_ref().unchecked_ref()));
343                            
344                            let _ = tx_rx.await;
345                            
346                            // Keep closures alive
347                            tx_complete_callback.forget();
348                            tx_error_callback.forget();
349                        }
350                    }
351                    _ => {}
352                    // Silently ignore errors in background persistence
353                }
354            });
355            // Clear dirty blocks after successful persistence
356            {
357                let mut dirty = storage.dirty_blocks.lock();
358                dirty.clear();
359            }
360            
361            // Record sync success for observability (WASM)
362            // For WASM, we don't have precise timing, so use a default duration
363            storage.observability.record_sync_success(1, current_dirty);
364            
365            // Invoke WASM sync success callback if set
366            #[cfg(target_arch = "wasm32")]
367            if let Some(ref callback) = storage.observability.wasm_sync_success_callback {
368                callback(1, current_dirty);
369            }
370            
371            storage.evict_if_needed();
372            Ok(())
373        }
374    }