absurder_sql/storage/
sync_operations.rs1use crate::types::DatabaseError;
5use super::block_storage::BlockStorage;
6
7#[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
8use std::collections::HashMap;
9#[cfg(all(not(target_arch = "wasm32"), not(feature = "fs_persist")))]
10use std::sync::atomic::Ordering;
11
12#[cfg(any(target_arch = "wasm32", all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist"))))]
13use super::vfs_sync;
14#[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
15use super::metadata::BlockMetadataPersist;
16
17#[cfg(target_arch = "wasm32")]
18use std::collections::HashMap;
19#[cfg(target_arch = "wasm32")]
20use super::metadata::BlockMetadataPersist;
21
22#[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
23use super::block_storage::GLOBAL_METADATA_TEST;
24
25pub fn sync_implementation_impl(storage: &mut BlockStorage) -> Result<(), DatabaseError> {
27 #[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
28 let start = std::time::Instant::now();
29
30 let dirty_count = storage.dirty_blocks.lock().len();
32 let dirty_bytes = dirty_count * super::block_storage::BLOCK_SIZE;
33 storage.observability.record_sync_start(dirty_count, dirty_bytes);
34
35 #[cfg(not(target_arch = "wasm32"))]
37 if let Some(ref callback) = storage.observability.sync_start_callback {
38 callback(dirty_count, dirty_bytes);
39 }
40
41 #[cfg(all(not(target_arch = "wasm32"), not(any(test, debug_assertions)), not(feature = "fs_persist")))]
43 {
44 storage.dirty_blocks.lock().clear();
46 storage.sync_count.fetch_add(1, Ordering::SeqCst);
47 return Ok(());
48 }
49
50 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
52 {
53 return storage.fs_persist_sync();
54 }
55
56 #[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
58 {
59 let current_dirty = storage.dirty_blocks.lock().len();
60 log::info!("Syncing {} dirty blocks (native non-fs_persist)", current_dirty);
61
62 let to_persist: Vec<(u64, Vec<u8>)> = {
63 let dirty = storage.dirty_blocks.lock();
64 dirty.iter().map(|(k,v)| (*k, v.clone())).collect()
65 };
66 let ids: Vec<u64> = to_persist.iter().map(|(k, _)| *k).collect();
67 let blocks_synced = ids.len(); let next_commit: u64 = vfs_sync::with_global_commit_marker(|cm| {
71 let cm = cm.borrow();
72 let current = cm.get(&storage.db_name).copied().unwrap_or(0);
73 current + 1
74 });
75
76 vfs_sync::with_global_storage(|gs| {
78 let mut storage_map = gs.borrow_mut();
79 let db_storage = storage_map.entry(storage.db_name.clone()).or_insert_with(HashMap::new);
80 for (block_id, data) in to_persist {
81 db_storage.insert(block_id, data);
82 }
83 });
84
85 GLOBAL_METADATA_TEST.with(|meta| {
87 let mut meta_map = meta.borrow_mut();
88 let db_meta = meta_map.entry(storage.db_name.clone()).or_insert_with(HashMap::new);
89 for block_id in ids {
90 if let Some(checksum) = storage.checksum_manager.get_checksum(block_id) {
91 let version = next_commit as u32;
93 db_meta.insert(
94 block_id,
95 BlockMetadataPersist {
96 checksum,
97 last_modified_ms: std::time::SystemTime::now()
98 .duration_since(std::time::UNIX_EPOCH)
99 .unwrap_or_default()
100 .as_millis() as u64,
101 version,
102 algo: storage.checksum_manager.get_algorithm(block_id),
103 },
104 );
105 }
106 }
107 });
108
109 vfs_sync::with_global_commit_marker(|cm| {
111 let mut cm_map = cm.borrow_mut();
112 cm_map.insert(storage.db_name.clone(), next_commit);
113 });
114
115 {
117 let mut dirty = storage.dirty_blocks.lock();
118 dirty.clear();
119 }
120
121 storage.sync_count.fetch_add(1, Ordering::SeqCst);
123 let elapsed = start.elapsed();
124 let ms = elapsed.as_millis() as u64;
125 let ms = if ms == 0 { 1 } else { ms };
126 storage.last_sync_duration_ms.store(ms, Ordering::SeqCst);
127
128 storage.observability.record_sync_success(ms, blocks_synced);
130
131 if let Some(ref callback) = storage.observability.sync_success_callback {
133 callback(ms, blocks_synced);
134 }
135
136 storage.evict_if_needed();
137 return Ok(());
138 }
139
140 #[cfg(target_arch = "wasm32")]
141 {
142 let current_dirty = storage.dirty_blocks.lock().len();
144 log::info!("Syncing {} dirty blocks (WASM)", current_dirty);
145
146 let to_persist: Vec<(u64, Vec<u8>)> = {
148 let dirty = storage.dirty_blocks.lock();
149 dirty.iter().map(|(k,v)| (*k, v.clone())).collect()
150 };
151 let ids: Vec<u64> = to_persist.iter().map(|(k, _)| *k).collect();
152 let next_commit: u64 = vfs_sync::with_global_commit_marker(|cm| {
154 let cm = cm.borrow();
155 let current = cm.get(&storage.db_name).copied().unwrap_or(0);
156 current + 1
157 });
158 vfs_sync::with_global_storage(|gs| {
159 let mut storage_map = gs.borrow_mut();
160 let db_storage = storage_map.entry(storage.db_name.clone()).or_insert_with(HashMap::new);
161 for (block_id, data) in &to_persist {
162 let should_update = if let Some(existing) = db_storage.get(block_id) {
164 if existing != data {
165 let has_committed_metadata = vfs_sync::with_global_metadata(|meta| {
167 let meta_map = meta.borrow();
168 if let Some(db_meta) = meta_map.get(&storage.db_name) {
169 if let Some(metadata) = db_meta.get(block_id) {
170 metadata.version > 0
171 } else {
172 false
173 }
174 } else {
175 false
176 }
177 });
178
179 if has_committed_metadata {
180 false } else {
183 true }
185 } else {
186 true }
188 } else {
189 true };
191
192 if should_update {
193 db_storage.insert(*block_id, data.clone());
194 }
195 }
196 });
197 vfs_sync::with_global_metadata(|meta| {
199 let mut meta_map = meta.borrow_mut();
200 let db_meta = meta_map.entry(storage.db_name.clone()).or_insert_with(HashMap::new);
201 for block_id in ids {
202 if let Some(checksum) = storage.checksum_manager.get_checksum(block_id) {
203 let version = next_commit as u32;
205 db_meta.insert(
206 block_id,
207 BlockMetadataPersist {
208 checksum,
209 last_modified_ms: BlockStorage::now_millis(),
210 version,
211 algo: storage.checksum_manager.get_algorithm(block_id),
212 },
213 );
214 }
215 }
216 });
217 vfs_sync::with_global_commit_marker(|cm| {
219 let mut cm_map = cm.borrow_mut();
220 cm_map.insert(storage.db_name.clone(), next_commit);
221 });
222
223 let db_name = storage.db_name.clone();
225 wasm_bindgen_futures::spawn_local(async move {
226 use wasm_bindgen::JsCast;
227
228 let global = js_sys::global();
230 let indexed_db_value = match js_sys::Reflect::get(&global, &wasm_bindgen::JsValue::from_str("indexedDB")) {
231 Ok(val) => val,
232 Err(_) => {
233 log::error!("IndexedDB property access failed - cannot persist");
234 return;
235 }
236 };
237
238 if indexed_db_value.is_null() || indexed_db_value.is_undefined() {
239 log::warn!("IndexedDB unavailable for sync (private browsing?) - data not persisted to IndexedDB");
240 return;
241 }
242
243 let idb_factory = match indexed_db_value.dyn_into::<web_sys::IdbFactory>() {
244 Ok(factory) => factory,
245 Err(_) => {
246 log::error!("IndexedDB property is not an IdbFactory - cannot persist");
247 return;
248 }
249 };
250
251 let open_req = match idb_factory.open_with_u32("block_storage", 2) {
252 Ok(req) => req,
253 Err(e) => {
254 log::error!("Failed to open IndexedDB for sync: {:?}", e);
255 return;
256 }
257 };
258
259 let upgrade_handler = js_sys::Function::new_no_args(&format!(
261 "
262 const db = event.target.result;
263 if (!db.objectStoreNames.contains('blocks')) {{
264 db.createObjectStore('blocks');
265 }}
266 if (!db.objectStoreNames.contains('metadata')) {{
267 db.createObjectStore('metadata');
268 }}
269 "
270 ));
271 open_req.set_onupgradeneeded(Some(&upgrade_handler));
272
273 let (tx, rx) = futures::channel::oneshot::channel();
275 let tx = std::rc::Rc::new(std::cell::RefCell::new(Some(tx)));
276
277 let success_tx = tx.clone();
278 let success_callback = wasm_bindgen::closure::Closure::wrap(Box::new(move |event: web_sys::Event| {
279 if let Some(tx) = success_tx.borrow_mut().take() {
280 let target = event.target().unwrap();
281 let request: web_sys::IdbOpenDbRequest = target.unchecked_into();
282 let result = request.result().unwrap();
283 let _ = tx.send(Ok(result));
284 }
285 }) as Box<dyn FnMut(_)>);
286
287 let error_tx = tx.clone();
288 let error_callback = wasm_bindgen::closure::Closure::wrap(Box::new(move |event: web_sys::Event| {
289 if let Some(tx) = error_tx.borrow_mut().take() {
290 let _ = tx.send(Err(format!("IndexedDB open failed: {:?}", event)));
291 }
292 }) as Box<dyn FnMut(_)>);
293
294 open_req.set_onsuccess(Some(success_callback.as_ref().unchecked_ref()));
295 open_req.set_onerror(Some(error_callback.as_ref().unchecked_ref()));
296
297 let db_result = rx.await;
298
299 success_callback.forget();
301 error_callback.forget();
302
303 match db_result {
304 Ok(Ok(db_value)) => {
305 if let Ok(db) = db_value.dyn_into::<web_sys::IdbDatabase>() {
306 let store_names = js_sys::Array::new();
308 store_names.push(&wasm_bindgen::JsValue::from_str("blocks"));
309 store_names.push(&wasm_bindgen::JsValue::from_str("metadata"));
310
311 let transaction = db.transaction_with_str_sequence_and_mode(
312 &store_names,
313 web_sys::IdbTransactionMode::Readwrite
314 ).unwrap();
315
316 let blocks_store = transaction.object_store("blocks").unwrap();
317 let metadata_store = transaction.object_store("metadata").unwrap();
318
319 for (block_id, data) in &to_persist {
321 let key = wasm_bindgen::JsValue::from_str(&format!("{}_{}", db_name, block_id));
322 let value = js_sys::Uint8Array::from(&data[..]);
323 blocks_store.put_with_key(&value, &key).unwrap();
324 }
325
326 let commit_key = wasm_bindgen::JsValue::from_str(&format!("{}_commit_marker", db_name));
328 let commit_value = wasm_bindgen::JsValue::from_f64(next_commit as f64);
329 metadata_store.put_with_key(&commit_value, &commit_key).unwrap();
330
331 let (tx_tx, tx_rx) = futures::channel::oneshot::channel();
333 let tx_tx = std::rc::Rc::new(std::cell::RefCell::new(Some(tx_tx)));
334
335 let tx_complete_tx = tx_tx.clone();
336 let tx_complete_callback = wasm_bindgen::closure::Closure::wrap(Box::new(move |_event: web_sys::Event| {
337 if let Some(tx) = tx_complete_tx.borrow_mut().take() {
338 let _ = tx.send(Ok(()));
339 }
340 }) as Box<dyn FnMut(_)>);
341
342 let tx_error_tx = tx_tx.clone();
343 let tx_error_callback = wasm_bindgen::closure::Closure::wrap(Box::new(move |event: web_sys::Event| {
344 if let Some(tx) = tx_error_tx.borrow_mut().take() {
345 let _ = tx.send(Err(format!("Transaction failed: {:?}", event)));
346 }
347 }) as Box<dyn FnMut(_)>);
348
349 transaction.set_oncomplete(Some(tx_complete_callback.as_ref().unchecked_ref()));
350 transaction.set_onerror(Some(tx_error_callback.as_ref().unchecked_ref()));
351
352 let _ = tx_rx.await;
353
354 tx_complete_callback.forget();
356 tx_error_callback.forget();
357 }
358 }
359 _ => {}
360 }
362 });
363 {
365 let mut dirty = storage.dirty_blocks.lock();
366 dirty.clear();
367 }
368
369 storage.observability.record_sync_success(1, current_dirty);
372
373 #[cfg(target_arch = "wasm32")]
375 if let Some(ref callback) = storage.observability.wasm_sync_success_callback {
376 callback(1, current_dirty);
377 }
378
379 storage.evict_if_needed();
380 Ok(())
381 }
382 }