absurder_sql/storage/
fs_persist.rs

// Lock helper macros. `lock_mutex!` is defined once per target so that call
// sites read the same on wasm32 and on native builds.

// wasm32 variant: calls `try_borrow_mut()` on the given expression and unwraps
// the result with `expect`. A borrow that cannot be taken therefore panics with
// the message below instead of being returned to the caller as an error.
#[cfg(target_arch = "wasm32")]
#[allow(unused_macros)]
macro_rules! lock_mutex {
    ($cell:expr) => {
        $cell.try_borrow_mut().expect("RefCell borrow failed - reentrancy detected in fs_persist.rs")
    };
}
11
// Native variant: forwards to `.lock()` on the given expression and evaluates
// to whatever that call returns; no unwrapping or error handling is added here.
#[cfg(not(target_arch = "wasm32"))]
#[allow(unused_macros)]
macro_rules! lock_mutex {
    ($guarded:expr) => {
        $guarded.lock()
    };
}
19
20#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
21use super::metadata::{BlockMetadataPersist, ChecksumAlgorithm};
22#[cfg(any(
23    target_arch = "wasm32",
24    all(
25        not(target_arch = "wasm32"),
26        any(test, debug_assertions),
27        not(feature = "fs_persist")
28    )
29))]
30#[allow(unused_imports)]
31use super::vfs_sync;
32#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
33use crate::types::DatabaseError;
34#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
35use std::collections::HashMap;
36#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
37use std::sync::atomic::Ordering;
38#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
39use std::time::Instant;
40#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
41use std::{
42    env, fs,
43    io::{Read, Write},
44    path::PathBuf,
45};
46
/// On-disk JSON schema for fs_persist metadata.
///
/// Appears to mirror the `{"entries": [...]}` layout that the sync routine
/// reads from and writes to `metadata.json`, where each entry is a two-element
/// `[block id, metadata object]` pair.
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
// NOTE(review): presumably not constructed in every build configuration, hence
// the dead_code allowance — confirm.
#[allow(dead_code)]
#[derive(Default, serde::Serialize, serde::Deserialize)]
pub(super) struct FsMeta {
    /// One `(block id, persisted metadata)` tuple per entry.
    pub entries: Vec<(u64, BlockMetadataPersist)>,
}
54
/// On-disk JSON schema for the allocation list.
///
/// The sync routine serializes this to `allocations.json` after filling
/// `allocated` from the in-memory allocated-block set and sorting it.
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
// NOTE(review): dead_code is allowed here, presumably for build configurations
// that never construct this type — confirm.
#[allow(dead_code)]
#[derive(Default, serde::Serialize, serde::Deserialize)]
pub(super) struct FsAlloc {
    /// Ids of the blocks currently allocated.
    pub allocated: Vec<u64>,
}
61
/// On-disk JSON schema for deallocation tombstones.
///
/// NOTE(review): the code that reads or writes this type is not visible in this
/// part of the file; `tombstones` presumably holds ids of deallocated blocks —
/// confirm against the caller.
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
// NOTE(review): dead_code is allowed here, presumably for build configurations
// that never construct this type — confirm.
#[allow(dead_code)]
#[derive(Default, serde::Serialize, serde::Deserialize)]
pub(super) struct FsDealloc {
    /// Block ids recorded as tombstones.
    pub tombstones: Vec<u64>,
}
68
69impl super::BlockStorage {
70    /// Native fs_persist sync implementation
71    #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
72    pub(super) fn fs_persist_sync(&mut self) -> Result<(), DatabaseError> {
73        // Record sync start for observability
74        let dirty_count = lock_mutex!(self.dirty_blocks).len();
75        let dirty_bytes = dirty_count * super::BLOCK_SIZE;
76        self.observability
77            .record_sync_start(dirty_count, dirty_bytes);
78
79        // Invoke sync start callback if set
80        if let Some(ref callback) = self.observability.sync_start_callback {
81            callback(dirty_count, dirty_bytes);
82        }
83
84        // In fs_persist mode, proactively ensure directory structure and empty metadata.json exists
85        // even if there are no dirty blocks, to satisfy filesystem expectations in tests.
86        #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
87        {
88            let base: PathBuf = self.base_dir.clone();
89            let mut db_dir = base.clone();
90            db_dir.push(&self.db_name);
91            let mut blocks_dir = db_dir.clone();
92            blocks_dir.push("blocks");
93            let _ = fs::create_dir_all(&blocks_dir);
94            let mut meta_path = db_dir.clone();
95            meta_path.push("metadata.json");
96            if fs::metadata(&meta_path).is_err() {
97                if let Ok(mut f) = fs::File::create(&meta_path) {
98                    let _ = f.write_all(br#"{"entries":[]}"#);
99                }
100            }
101        }
102
103        if self.get_dirty_blocks().lock().is_empty() {
104            log::debug!("No dirty blocks to sync");
105            #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
106            {
107                // Cleanup-only sync: reconcile on-disk state with current allocations
108                let base: PathBuf = self.base_dir.clone();
109                let mut db_dir = base.clone();
110                db_dir.push(&self.db_name);
111                let mut blocks_dir = db_dir.clone();
112                blocks_dir.push("blocks");
113                // Load metadata.json (do not prune based on allocated; retain all persisted entries),
114                // normalize invalid/missing algo values to the current default
115                let mut meta_path = db_dir.clone();
116                meta_path.push("metadata.json");
117                let mut meta_val: serde_json::Value = serde_json::json!({"entries": []});
118                if let Ok(mut f) = fs::File::open(&meta_path) {
119                    let mut s = String::new();
120                    if f.read_to_string(&mut s).is_ok() {
121                        if let Ok(v) = serde_json::from_str::<serde_json::Value>(&s) {
122                            meta_val = v;
123                        }
124                    }
125                }
126                // Ensure structure exists
127                if !meta_val.is_object() {
128                    meta_val = serde_json::json!({"entries": []});
129                }
130                // Normalize per-entry algo values if missing/invalid
131                if let Some(entries) = meta_val.get_mut("entries").and_then(|e| e.as_array_mut()) {
132                    for ent in entries.iter_mut() {
133                        if let Some(arr) = ent.as_array_mut() {
134                            if arr.len() == 2 {
135                                if let Some(obj) = arr.get_mut(1).and_then(|v| v.as_object_mut()) {
136                                    let ok = obj
137                                        .get("algo")
138                                        .and_then(|v| v.as_str())
139                                        .map(|s| s == "FastHash" || s == "CRC32")
140                                        .unwrap_or(false);
141                                    if !ok {
142                                        let def = match self.checksum_manager.default_algorithm() {
143                                            ChecksumAlgorithm::CRC32 => "CRC32",
144                                            _ => "FastHash",
145                                        };
146                                        obj.insert(
147                                            "algo".into(),
148                                            serde_json::Value::String(def.into()),
149                                        );
150                                    }
151                                }
152                            }
153                        }
154                    }
155                }
156                let meta_string = serde_json::to_string(&meta_val).unwrap_or_else(|_| "{}".into());
157                let allocated: std::collections::HashSet<u64> =
158                    lock_mutex!(self.allocated_blocks).clone();
159                // Write metadata via commit marker: metadata.json.pending -> metadata.json
160                let mut meta_pending = db_dir.clone();
161                meta_pending.push("metadata.json.pending");
162                log::debug!(
163                    "[fs_persist] cleanup-only: writing pending metadata at {:?}",
164                    meta_pending
165                );
166                if let Ok(mut f) = fs::File::create(&meta_pending) {
167                    let _ = f.write_all(meta_string.as_bytes());
168                    let _ = f.sync_all();
169                }
170                let _ = fs::rename(&meta_pending, &meta_path);
171                log::debug!(
172                    "[fs_persist] cleanup-only: finalized metadata rename to {:?}",
173                    meta_path
174                );
175                let mut alloc_path = db_dir.clone();
176                alloc_path.push("allocations.json");
177                let mut alloc = FsAlloc {
178                    allocated: allocated.iter().cloned().collect(),
179                };
180                alloc.allocated.sort_unstable();
181                if let Ok(mut f) = fs::File::create(&alloc_path) {
182                    let _ = f.write_all(
183                        serde_json::to_string(&alloc)
184                            .unwrap_or_else(|_| "{}".into())
185                            .as_bytes(),
186                    );
187                }
188                log::info!("wrote allocations.json at {:?}", alloc_path);
189                // Remove stray block files not allocated
190                // Determine valid block ids from metadata; remove files that have no metadata entry
191                let valid_ids: std::collections::HashSet<u64> =
192                    if let Some(entries) = meta_val.get("entries").and_then(|e| e.as_array()) {
193                        entries
194                            .iter()
195                            .filter_map(|ent| {
196                                ent.as_array()
197                                    .and_then(|arr| arr.first())
198                                    .and_then(|v| v.as_u64())
199                            })
200                            .collect()
201                    } else {
202                        std::collections::HashSet::new()
203                    };
204                if let Ok(entries) = fs::read_dir(&blocks_dir) {
205                    for entry in entries.flatten() {
206                        if let Ok(ft) = entry.file_type() {
207                            if ft.is_file() {
208                                if let Some(name) = entry.file_name().to_str() {
209                                    if let Some(id_str) = name
210                                        .strip_prefix("block_")
211                                        .and_then(|s| s.strip_suffix(".bin"))
212                                    {
213                                        if let Ok(id) = id_str.parse::<u64>() {
214                                            if !valid_ids.contains(&id) {
215                                                let _ = fs::remove_file(entry.path());
216                                            }
217                                        }
218                                    }
219                                }
220                            }
221                        }
222                    }
223                }
224
225                // Also mirror cleanup to the current ABSURDERSQL_FS_BASE at sync-time to avoid env var race conditions across tests
226                let alt_base: PathBuf = {
227                    if let Ok(p) = env::var("ABSURDERSQL_FS_BASE") {
228                        PathBuf::from(p)
229                    } else if cfg!(any(test, debug_assertions)) {
230                        PathBuf::from(format!(".absurdersql_fs/run_{}", std::process::id()))
231                    } else {
232                        PathBuf::from(".absurdersql_fs")
233                    }
234                };
235                if alt_base != self.base_dir {
236                    let mut alt_db_dir = alt_base.clone();
237                    alt_db_dir.push(&self.db_name);
238                    let mut alt_blocks_dir = alt_db_dir.clone();
239                    alt_blocks_dir.push("blocks");
240                    let _ = fs::create_dir_all(&alt_blocks_dir);
241                    // alt metadata via commit marker
242                    let mut alt_meta_pending = alt_db_dir.clone();
243                    alt_meta_pending.push("metadata.json.pending");
244                    log::debug!(
245                        "[fs_persist] cleanup-only (alt): writing pending metadata at {:?}",
246                        alt_meta_pending
247                    );
248                    if let Ok(mut f) = fs::File::create(&alt_meta_pending) {
249                        let _ = f.write_all(meta_string.as_bytes());
250                        let _ = f.sync_all();
251                    }
252                    let mut alt_meta_path = alt_db_dir.clone();
253                    alt_meta_path.push("metadata.json");
254                    let _ = fs::rename(&alt_meta_pending, &alt_meta_path);
255                    log::debug!(
256                        "[fs_persist] cleanup-only (alt): finalized metadata rename to {:?}",
257                        alt_meta_path
258                    );
259                    let mut alt_alloc_path = alt_db_dir.clone();
260                    alt_alloc_path.push("allocations.json");
261                    if let Ok(mut f) = fs::File::create(&alt_alloc_path) {
262                        let _ = f.write_all(
263                            serde_json::to_string(&alloc)
264                                .unwrap_or_else(|_| "{}".into())
265                                .as_bytes(),
266                        );
267                    }
268                    log::info!("(alt) wrote allocations.json at {:?}", alt_alloc_path);
269                    if let Ok(entries) = fs::read_dir(&alt_blocks_dir) {
270                        for entry in entries.flatten() {
271                            if let Ok(ft) = entry.file_type() {
272                                if ft.is_file() {
273                                    if let Some(name) = entry.file_name().to_str() {
274                                        if let Some(id_str) = name
275                                            .strip_prefix("block_")
276                                            .and_then(|s| s.strip_suffix(".bin"))
277                                        {
278                                            if let Ok(id) = id_str.parse::<u64>() {
279                                                if !valid_ids.contains(&id) {
280                                                    let _ = fs::remove_file(entry.path());
281                                                }
282                                            }
283                                        }
284                                    }
285                                }
286                            }
287                        }
288                    }
289                }
290            }
291            return Ok(());
292        }
293
294        let current_dirty = self.get_dirty_blocks().lock().len();
295        log::info!("Syncing {} dirty blocks", current_dirty);
296
297        // For WASM, persist dirty blocks to global storage
298        #[cfg(target_arch = "wasm32")]
299        {
300            let to_persist: Vec<(u64, Vec<u8>)> = {
301                let dirty = self.get_dirty_blocks().lock();
302                dirty.iter().map(|(k, v)| (*k, v.clone())).collect()
303            };
304            let ids: Vec<u64> = to_persist.iter().map(|(k, _)| *k).collect();
305            // Determine next commit version so that all metadata written in this sync share the same version
306            let next_commit: u64 = vfs_sync::with_global_commit_marker(|cm| {
307                let cm = cm;
308                let current = cm.get(&self.db_name).copied().unwrap_or(0);
309                #[cfg(target_arch = "wasm32")]
310                log::debug!("Current commit marker for {}: {}", self.db_name, current);
311                current + 1
312            });
313            #[cfg(target_arch = "wasm32")]
314            log::debug!("Next commit marker for {}: {}", self.db_name, next_commit);
315            vfs_sync::with_global_storage(|storage| {
316                let mut storage_map = storage.lock();
317                let db_storage = storage_map
318                    .entry(self.db_name.clone())
319                    .or_insert_with(HashMap::new);
320                for (block_id, data) in &to_persist {
321                    // Check if block already exists in global storage with committed data
322                    let should_update = if let Some(existing) = db_storage.get(block_id) {
323                        if existing != data {
324                            // Check if existing data has committed metadata (version > 0)
325                            let has_committed_metadata = vfs_sync::with_global_metadata(|meta| {
326                                let meta_map = meta.borrow_mut();
327                                if let Some(db_meta) = meta_map.get(&self.db_name) {
328                                    if let Some(metadata) = db_meta.get(block_id) {
329                                        metadata.version > 0
330                                    } else {
331                                        false
332                                    }
333                                } else {
334                                    false
335                                }
336                            });
337
338                            let existing_preview = if existing.len() >= 8 {
339                                format!(
340                                    "{:02x} {:02x} {:02x} {:02x} {:02x} {:02x} {:02x} {:02x}",
341                                    existing[0],
342                                    existing[1],
343                                    existing[2],
344                                    existing[3],
345                                    existing[4],
346                                    existing[5],
347                                    existing[6],
348                                    existing[7]
349                                )
350                            } else {
351                                "short".to_string()
352                            };
353                            let new_preview = if data.len() >= 8 {
354                                format!(
355                                    "{:02x} {:02x} {:02x} {:02x} {:02x} {:02x} {:02x} {:02x}",
356                                    data[0],
357                                    data[1],
358                                    data[2],
359                                    data[3],
360                                    data[4],
361                                    data[5],
362                                    data[6],
363                                    data[7]
364                                )
365                            } else {
366                                "short".to_string()
367                            };
368
369                            if has_committed_metadata {
370                                // CRITICAL FIX: Never overwrite committed data to prevent corruption
371                                // Once data is committed, it should be immutable to maintain data integrity
372                                #[cfg(target_arch = "wasm32")]
373                                log::debug!(
374                                    "SYNC preserving committed block {} - existing: {}, cache: {} - NEVER OVERWRITE COMMITTED DATA",
375                                    block_id,
376                                    existing_preview,
377                                    new_preview
378                                );
379                                false // Never overwrite committed data
380                            } else {
381                                #[cfg(target_arch = "wasm32")]
382                                log::debug!(
383                                    "SYNC updating uncommitted block {} - existing: {}, new: {}",
384                                    block_id,
385                                    existing_preview,
386                                    new_preview
387                                );
388                                true // Update uncommitted data
389                            }
390                        } else {
391                            true // Same data, safe to update
392                        }
393                    } else {
394                        true // No existing data, safe to insert
395                    };
396
397                    if should_update {
398                        db_storage.insert(*block_id, data.clone());
399                        log::debug!("Persisted block {} to global storage", block_id);
400                    }
401                }
402            });
403            // Persist corresponding metadata entries
404            vfs_sync::with_global_metadata(|meta| {
405                let mut meta_map = meta.borrow_mut();
406                let db_meta = meta_map
407                    .entry(self.db_name.clone())
408                    .or_insert_with(HashMap::new);
409                for block_id in ids {
410                    if let Some(checksum) = self.checksum_manager.get_checksum(block_id) {
411                        // Use the per-commit version so entries remain invisible until the commit marker advances
412                        let version = next_commit as u32;
413                        db_meta.insert(
414                            block_id,
415                            BlockMetadataPersist {
416                                checksum,
417                                last_modified_ms: Self::now_millis(),
418                                version,
419                                algo: self.checksum_manager.get_algorithm(block_id),
420                            },
421                        );
422                        log::debug!("Persisted metadata for block {}", block_id);
423                    }
424                }
425            });
426            // Atomically advance the commit marker after all data and metadata are persisted
427            vfs_sync::with_global_commit_marker(|cm| {
428                let cm_map = cm;
429                cm_map.insert(self.db_name.clone(), next_commit);
430            });
431
432            // Spawn async IndexedDB persistence (fire and forget for sync compatibility)
433            #[cfg(target_arch = "wasm32")]
434            log::debug!(
435                "Spawning IndexedDB persistence for {} blocks",
436                to_persist.len()
437            );
438            let db_name = self.db_name.clone();
439            wasm_bindgen_futures::spawn_local(async move {
440                use wasm_bindgen::JsCast;
441
442                // Get IndexedDB factory (works in both Window and Worker contexts)
443                let global = js_sys::global();
444                let indexed_db_value = match js_sys::Reflect::get(
445                    &global,
446                    &wasm_bindgen::JsValue::from_str("indexedDB"),
447                ) {
448                    Ok(val) => val,
449                    Err(_) => {
450                        log::error!("IndexedDB property access failed - cannot persist");
451                        return;
452                    }
453                };
454
455                if indexed_db_value.is_null() || indexed_db_value.is_undefined() {
456                    log::warn!(
457                        "IndexedDB unavailable for persistence (private browsing?) - data not persisted to IndexedDB"
458                    );
459                    return;
460                }
461
462                let idb_factory = match indexed_db_value.dyn_into::<web_sys::IdbFactory>() {
463                    Ok(factory) => factory,
464                    Err(_) => {
465                        log::error!("IndexedDB property is not an IdbFactory - cannot persist");
466                        return;
467                    }
468                };
469
470                let open_req = match idb_factory.open_with_u32("block_storage", 2) {
471                    Ok(req) => req,
472                    Err(e) => {
473                        log::error!("Failed to open IndexedDB for persistence: {:?}", e);
474                        return;
475                    }
476                };
477
478                // Set up upgrade handler to create object stores if needed
479                let upgrade_handler = js_sys::Function::new_no_args(&format!(
480                    "
481                    const db = event.target.result;
482                    if (!db.objectStoreNames.contains('blocks')) {{
483                        db.createObjectStore('blocks');
484                    }}
485                    if (!db.objectStoreNames.contains('metadata')) {{
486                        db.createObjectStore('metadata');
487                    }}
488                    "
489                ));
490                open_req.set_onupgradeneeded(Some(&upgrade_handler));
491
492                // Use event-based approach for opening database
493                let (tx, rx) = futures::channel::oneshot::channel();
494                let tx = std::rc::Rc::new(std::cell::RefCell::new(Some(tx)));
495
496                let success_tx = tx.clone();
497                let success_callback =
498                    wasm_bindgen::closure::Closure::wrap(Box::new(move |event: web_sys::Event| {
499                        if let Some(tx) = success_tx.lock().take() {
500                            let target = event.target().unwrap();
501                            let request: web_sys::IdbOpenDbRequest = target.unchecked_into();
502                            let result = request.result().unwrap();
503                            let _ = tx.send(Ok(result));
504                        }
505                    })
506                        as Box<dyn FnMut(_)>);
507
508                let error_tx = tx.clone();
509                let error_callback =
510                    wasm_bindgen::closure::Closure::wrap(Box::new(move |event: web_sys::Event| {
511                        if let Some(tx) = error_tx.lock().take() {
512                            let _ = tx.send(Err(format!("IndexedDB open failed: {:?}", event)));
513                        }
514                    })
515                        as Box<dyn FnMut(_)>);
516
517                open_req.set_onsuccess(Some(success_callback.as_ref().unchecked_ref()));
518                open_req.set_onerror(Some(error_callback.as_ref().unchecked_ref()));
519
520                let db_result = rx.await;
521
522                // Keep closures alive
523                success_callback.forget();
524                error_callback.forget();
525
526                match db_result {
527                    Ok(Ok(db_value)) => {
528                        #[cfg(target_arch = "wasm32")]
529                        log::info!("Successfully opened IndexedDB for persistence");
530                        if let Ok(db) = db_value.dyn_into::<web_sys::IdbDatabase>() {
531                            // Start transaction for both blocks and metadata
532                            let store_names = js_sys::Array::new();
533                            store_names.push(&wasm_bindgen::JsValue::from_str("blocks"));
534                            store_names.push(&wasm_bindgen::JsValue::from_str("metadata"));
535
536                            let transaction = db
537                                .transaction_with_str_sequence_and_mode(
538                                    &store_names,
539                                    web_sys::IdbTransactionMode::Readwrite,
540                                )
541                                .unwrap();
542
543                            let blocks_store = transaction.object_store("blocks").unwrap();
544                            let metadata_store = transaction.object_store("metadata").unwrap();
545
546                            // Persist all blocks
547                            for (block_id, data) in &to_persist {
548                                let key = wasm_bindgen::JsValue::from_str(&format!(
549                                    "{}_{}",
550                                    db_name, block_id
551                                ));
552                                let value = js_sys::Uint8Array::from(&data[..]);
553                                blocks_store.put_with_key(&value, &key).unwrap();
554                                #[cfg(target_arch = "wasm32")]
555                                log::debug!("Persisted block {} to IndexedDB", block_id);
556                            }
557
558                            // Persist commit marker
559                            let commit_key = wasm_bindgen::JsValue::from_str(&format!(
560                                "{}_commit_marker",
561                                db_name
562                            ));
563                            let commit_value = wasm_bindgen::JsValue::from_f64(next_commit as f64);
564                            metadata_store
565                                .put_with_key(&commit_value, &commit_key)
566                                .unwrap();
567                            #[cfg(target_arch = "wasm32")]
568                            log::info!("Persisted commit marker {} to IndexedDB", next_commit);
569
570                            // Use event-based approach for transaction completion
571                            let (tx_tx, tx_rx) = futures::channel::oneshot::channel();
572                            let tx_tx = std::rc::Rc::new(std::cell::RefCell::new(Some(tx_tx)));
573
574                            let tx_complete_tx = tx_tx.clone();
575                            let tx_complete_callback = wasm_bindgen::closure::Closure::wrap(
576                                Box::new(move |_event: web_sys::Event| {
577                                    if let Some(tx) = tx_complete_tx.lock().take() {
578                                        let _ = tx.send(Ok(()));
579                                    }
580                                }) as Box<dyn FnMut(_)>,
581                            );
582
583                            let tx_error_tx = tx_tx.clone();
584                            let tx_error_callback = wasm_bindgen::closure::Closure::wrap(Box::new(
585                                move |event: web_sys::Event| {
586                                    if let Some(tx) = tx_error_tx.lock().take() {
587                                        let _ = tx
588                                            .send(Err(format!("Transaction failed: {:?}", event)));
589                                    }
590                                },
591                            )
592                                as Box<dyn FnMut(_)>);
593
594                            transaction.set_oncomplete(Some(
595                                tx_complete_callback.as_ref().unchecked_ref(),
596                            ));
597                            transaction
598                                .set_onerror(Some(tx_error_callback.as_ref().unchecked_ref()));
599
600                            match tx_rx.await {
601                                Ok(Ok(_)) => {
602                                    #[cfg(target_arch = "wasm32")]
603                                    log::info!("IndexedDB transaction completed successfully");
604                                }
605                                Ok(Err(e)) => {
606                                    #[cfg(target_arch = "wasm32")]
607                                    log::error!("IndexedDB transaction failed: {}", e);
608                                }
609                                Err(_) => {
610                                    #[cfg(target_arch = "wasm32")]
611                                    log::error!("IndexedDB transaction channel failed");
612                                }
613                            }
614
615                            // Keep closures alive
616                            tx_complete_callback.forget();
617                            tx_error_callback.forget();
618                        } else {
619                            #[cfg(target_arch = "wasm32")]
620                            log::error!("Failed to cast to IdbDatabase for persistence");
621                        }
622                    }
623                    Ok(Err(e)) => {
624                        #[cfg(target_arch = "wasm32")]
625                        log::error!("Failed to open IndexedDB for persistence: {}", e);
626                    }
627                    Err(_) => {
628                        #[cfg(target_arch = "wasm32")]
629                        log::error!("IndexedDB open channel failed");
630                    }
631                }
632            });
633        }
634
635        // For native fs_persist, write dirty blocks to disk and update metadata.json
636        #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
637        {
638            let to_persist: Vec<(u64, Vec<u8>)> = {
639                let dirty = self.get_dirty_blocks().lock();
640                dirty.iter().map(|(k, v)| (*k, v.clone())).collect()
641            };
642            let now_ms = Self::now_millis();
643            let base: PathBuf = self.base_dir.clone();
644            let mut db_dir = base.clone();
645            db_dir.push(&self.db_name);
646            let mut blocks_dir = db_dir.clone();
647            blocks_dir.push("blocks");
648            let mut meta_path = db_dir.clone();
649            meta_path.push("metadata.json");
650            // Ensure dirs exist
651            let _ = fs::create_dir_all(&blocks_dir);
652            // Load existing metadata tolerantly and build a JSON object map keyed by id
653            let mut meta_val: serde_json::Value = serde_json::json!({"entries": []});
654            if let Ok(mut f) = fs::File::open(&meta_path) {
655                let mut s = String::new();
656                if f.read_to_string(&mut s).is_ok() {
657                    if let Ok(v) = serde_json::from_str::<serde_json::Value>(&s) {
658                        meta_val = v;
659                    }
660                }
661            }
662            if !meta_val.is_object() {
663                meta_val = serde_json::json!({"entries": []});
664            }
665            let mut map: HashMap<u64, serde_json::Map<String, serde_json::Value>> = HashMap::new();
666            if let Some(entries) = meta_val.get("entries").and_then(|e| e.as_array()) {
667                for ent in entries.iter() {
668                    if let Some(arr) = ent.as_array() {
669                        if arr.len() == 2 {
670                            if let (Some(id), Some(obj)) = (
671                                arr.first().and_then(|v| v.as_u64()),
672                                arr.get(1).and_then(|v| v.as_object()),
673                            ) {
674                                map.insert(id, obj.clone());
675                            }
676                        }
677                    }
678                }
679            }
680            for (block_id, data) in &to_persist {
681                // write block file
682                let mut block_file = blocks_dir.clone();
683                block_file.push(format!("block_{}.bin", block_id));
684                if let Ok(mut f) = fs::File::create(&block_file) {
685                    let _ = f.write_all(data);
686                }
687                // update metadata
688                if let Some(checksum) = self.checksum_manager.get_checksum(*block_id) {
689                    let version_u64 = map
690                        .get(block_id)
691                        .and_then(|m| m.get("version"))
692                        .and_then(|v| v.as_u64())
693                        .unwrap_or(0)
694                        .saturating_add(1);
695                    let algo = self.checksum_manager.get_algorithm(*block_id);
696                    let algo_str = match algo {
697                        ChecksumAlgorithm::CRC32 => "CRC32",
698                        _ => "FastHash",
699                    };
700                    let mut obj = serde_json::Map::new();
701                    obj.insert("checksum".into(), serde_json::Value::from(checksum));
702                    obj.insert("last_modified_ms".into(), serde_json::Value::from(now_ms));
703                    obj.insert("version".into(), serde_json::Value::from(version_u64));
704                    obj.insert("algo".into(), serde_json::Value::String(algo_str.into()));
705                    map.insert(*block_id, obj);
706                }
707            }
708            // Normalize any remaining entries with missing/invalid algo
709            for (_id, obj) in map.iter_mut() {
710                let ok = obj
711                    .get("algo")
712                    .and_then(|v| v.as_str())
713                    .map(|s| s == "FastHash" || s == "CRC32")
714                    .unwrap_or(false);
715                if !ok {
716                    let def = match self.checksum_manager.default_algorithm() {
717                        ChecksumAlgorithm::CRC32 => "CRC32",
718                        _ => "FastHash",
719                    };
720                    obj.insert("algo".into(), serde_json::Value::String(def.into()));
721                }
722            }
723            // Do not prune metadata based on allocated set; preserve entries for all persisted blocks
724            let allocated: std::collections::HashSet<u64> =
725                lock_mutex!(self.allocated_blocks).clone();
726            // Save metadata (build entries array [[id, obj], ...])
727            let mut entries_vec: Vec<serde_json::Value> = Vec::new();
728            for (id, obj) in map.iter() {
729                entries_vec.push(serde_json::Value::Array(vec![
730                    serde_json::Value::from(*id),
731                    serde_json::Value::Object(obj.clone()),
732                ]));
733            }
734            let meta_out = serde_json::json!({"entries": entries_vec});
735            let meta_string = serde_json::to_string(&meta_out).unwrap_or_else(|_| "{}".into());
736            // Write metadata via commit marker: metadata.json.pending -> metadata.json
737            let mut meta_pending = db_dir.clone();
738            meta_pending.push("metadata.json.pending");
739            log::debug!(
740                "[fs_persist] writing pending metadata at {:?}",
741                meta_pending
742            );
743            if let Ok(mut f) = fs::File::create(&meta_pending) {
744                let _ = f.write_all(meta_string.as_bytes());
745                let _ = f.sync_all();
746            }
747            let _ = fs::rename(&meta_pending, &meta_path);
748            log::debug!("[fs_persist] finalized metadata rename to {:?}", meta_path);
749            // Mirror allocations.json to current allocated set
750            let mut alloc_path = db_dir.clone();
751            alloc_path.push("allocations.json");
752            let mut alloc = FsAlloc {
753                allocated: allocated.iter().cloned().collect(),
754            };
755            alloc.allocated.sort_unstable();
756            if let Ok(mut f) = fs::File::create(&alloc_path) {
757                let _ = f.write_all(
758                    serde_json::to_string(&alloc)
759                        .unwrap_or_else(|_| "{}".into())
760                        .as_bytes(),
761                );
762            }
763            log::info!("wrote allocations.json at {:?}", alloc_path);
764            // Remove any stray block files for deallocated blocks
765            // Determine valid ids from metadata; remove files without a metadata entry
766            let valid_ids: std::collections::HashSet<u64> = map.keys().cloned().collect();
767            if let Ok(entries) = fs::read_dir(&blocks_dir) {
768                for entry in entries.flatten() {
769                    if let Ok(ft) = entry.file_type() {
770                        if ft.is_file() {
771                            if let Some(name) = entry.file_name().to_str() {
772                                if let Some(id_str) = name
773                                    .strip_prefix("block_")
774                                    .and_then(|s| s.strip_suffix(".bin"))
775                                {
776                                    if let Ok(id) = id_str.parse::<u64>() {
777                                        if !valid_ids.contains(&id) {
778                                            let _ = fs::remove_file(entry.path());
779                                        }
780                                    }
781                                }
782                            }
783                        }
784                    }
785                }
786            }
787
788            // Also mirror persistence to the current ABSURDERSQL_FS_BASE at sync-time, to avoid env var race issues
789            let alt_base: PathBuf = {
790                if let Ok(p) = env::var("ABSURDERSQL_FS_BASE") {
791                    PathBuf::from(p)
792                } else if cfg!(any(test, debug_assertions)) {
793                    PathBuf::from(format!(".absurdersql_fs/run_{}", std::process::id()))
794                } else {
795                    PathBuf::from(".absurdersql_fs")
796                }
797            };
798            if alt_base != self.base_dir {
799                let mut alt_db_dir = alt_base.clone();
800                alt_db_dir.push(&self.db_name);
801                let mut alt_blocks_dir = alt_db_dir.clone();
802                alt_blocks_dir.push("blocks");
803                let _ = fs::create_dir_all(&alt_blocks_dir);
804                // Write blocks
805                let alt_to_persist: Vec<(u64, Vec<u8>)> = {
806                    let dirty = self.get_dirty_blocks().lock();
807                    dirty.iter().map(|(k, v)| (*k, v.clone())).collect()
808                };
809                for (block_id, data) in alt_to_persist.iter() {
810                    let mut alt_block_file = alt_blocks_dir.clone();
811                    alt_block_file.push(format!("block_{}.bin", block_id));
812                    if let Ok(mut f) = fs::File::create(&alt_block_file) {
813                        let _ = f.write_all(data);
814                    }
815                }
816                // Save metadata mirror
817                let mut alt_meta_pending = alt_db_dir.clone();
818                alt_meta_pending.push("metadata.json.pending");
819                log::debug!(
820                    "[fs_persist] (alt) writing pending metadata at {:?}",
821                    alt_meta_pending
822                );
823                if let Ok(mut f) = fs::File::create(&alt_meta_pending) {
824                    let _ = f.write_all(meta_string.as_bytes());
825                    let _ = f.sync_all();
826                }
827                let mut alt_meta_path = alt_db_dir.clone();
828                alt_meta_path.push("metadata.json");
829                let _ = fs::rename(&alt_meta_pending, &alt_meta_path);
830                log::debug!(
831                    "[fs_persist] (alt) finalized metadata rename to {:?}",
832                    alt_meta_path
833                );
834                // allocations mirror
835                let mut alt_alloc_path = alt_db_dir.clone();
836                alt_alloc_path.push("allocations.json");
837                if let Ok(mut f) = fs::File::create(&alt_alloc_path) {
838                    let _ = f.write_all(
839                        serde_json::to_string(&alloc)
840                            .unwrap_or_else(|_| "{}".into())
841                            .as_bytes(),
842                    );
843                }
844                log::info!("(alt) wrote allocations.json at {:?}", alt_alloc_path);
845                // cleanup stray files
846                if let Ok(entries) = fs::read_dir(&alt_blocks_dir) {
847                    for entry in entries.flatten() {
848                        if let Ok(ft) = entry.file_type() {
849                            if ft.is_file() {
850                                if let Some(name) = entry.file_name().to_str() {
851                                    if let Some(id_str) = name
852                                        .strip_prefix("block_")
853                                        .and_then(|s| s.strip_suffix(".bin"))
854                                    {
855                                        if let Ok(id) = id_str.parse::<u64>() {
856                                            if !valid_ids.contains(&id) {
857                                                let _ = fs::remove_file(entry.path());
858                                            }
859                                        }
860                                    }
861                                }
862                            }
863                        }
864                    }
865                }
866            }
867        }
868
869        // For native tests, persist dirty blocks and metadata to test globals (when fs_persist disabled)
870        #[cfg(all(
871            not(target_arch = "wasm32"),
872            any(test, debug_assertions),
873            not(feature = "fs_persist")
874        ))]
875        {
876            let to_persist: Vec<(u64, Vec<u8>)> = {
877                let dirty = self.get_dirty_blocks().lock();
878                dirty.iter().map(|(k, v)| (*k, v.clone())).collect()
879            };
880            let ids: Vec<u64> = to_persist.iter().map(|(k, _)| *k).collect();
881            // Determine next commit version for the test-global path
882            let next_commit: u64 = vfs_sync::with_global_commit_marker(|cm| {
883                let cm = cm;
884                cm.get(&self.db_name).copied().unwrap_or(0) + 1
885            });
886            vfs_sync::with_global_storage(|storage| {
887                let mut storage_map = storage.lock();
888                let db_storage = storage_map
889                    .entry(self.db_name.clone())
890                    .or_insert_with(HashMap::new);
891                for (block_id, data) in &to_persist {
892                    db_storage.insert(*block_id, data.clone());
893                    log::debug!("[test] Persisted block {} to test-global storage", block_id);
894                }
895            });
896            // Persist corresponding metadata entries
897            GLOBAL_METADATA_TEST.with(|meta| {
898                let mut meta_map = meta.borrow_mut();
899                let db_meta = meta_map
900                    .entry(self.db_name.clone())
901                    .or_insert_with(HashMap::new);
902                for block_id in ids {
903                    if let Some(checksum) = self.checksum_manager.get_checksum(block_id) {
904                        let version = next_commit as u32;
905                        db_meta.insert(
906                            block_id,
907                            BlockMetadataPersist {
908                                checksum,
909                                last_modified_ms: Self::now_millis(),
910                                version,
911                                algo: self.checksum_manager.get_algorithm(block_id),
912                            },
913                        );
914                        log::debug!("[test] Persisted metadata for block {}", block_id);
915                    }
916                }
917            });
918            // Advance commit marker after persisting all entries
919            vfs_sync::with_global_commit_marker(|cm| {
920                let cm_map = cm;
921                cm_map.insert(self.db_name.clone(), next_commit);
922            });
923        }
924
925        #[cfg(not(target_arch = "wasm32"))]
926        let start = Instant::now();
927        let dirty_count = {
928            let mut dirty = self.get_dirty_blocks().lock();
929            let count = dirty.len();
930            dirty.clear();
931            count
932        };
933        log::info!(
934            "Successfully synced {} blocks to global storage",
935            dirty_count
936        );
937        #[cfg(not(target_arch = "wasm32"))]
938        {
939            self.sync_count.fetch_add(1, Ordering::SeqCst);
940            let elapsed = start.elapsed();
941            let ms = elapsed.as_millis() as u64;
942            let ms = if ms == 0 { 1 } else { ms };
943            self.last_sync_duration_ms.store(ms, Ordering::SeqCst);
944
945            // Record sync success for observability
946            self.observability.record_sync_success(ms, dirty_count);
947
948            // Invoke sync success callback if set
949            if let Some(ref callback) = self.observability.sync_success_callback {
950                callback(ms, dirty_count);
951            }
952        }
953        // Now that everything is clean, enforce capacity again
954        self.evict_if_needed();
955        Ok(())
956    }
957}