absurder_sql/storage/
constructors.rs

1//! Constructor functions for BlockStorage
2//! This module contains platform-specific constructor implementations
3
4#[cfg(target_arch = "wasm32")]
5use super::block_storage::{BlockStorage, DEFAULT_CACHE_CAPACITY, RecoveryReport};
6#[cfg(target_arch = "wasm32")]
7use super::metadata::{ChecksumAlgorithm, ChecksumManager};
8#[cfg(target_arch = "wasm32")]
9use super::vfs_sync;
10#[cfg(target_arch = "wasm32")]
11use crate::types::DatabaseError;
12#[cfg(target_arch = "wasm32")]
13use std::cell::RefCell;
14#[cfg(target_arch = "wasm32")]
15use std::collections::{HashMap, HashSet, VecDeque};
16#[cfg(target_arch = "wasm32")]
17use std::sync::Arc;
18
/// On-disk JSON schema of the fs_persist `allocations.json` file
/// (shape: `{"allocated":[...]}`). Only built for native targets with the
/// `fs_persist` feature enabled.
#[cfg(all(feature = "fs_persist", not(target_arch = "wasm32")))]
#[derive(Default, serde::Deserialize, serde::Serialize)]
#[allow(dead_code)]
struct FsAlloc {
    /// Ids of the blocks recorded as allocated.
    allocated: Vec<u64>,
}
26
/// On-disk JSON schema of the fs_persist `deallocated.json` file
/// (shape: `{"tombstones":[...]}`). Only built for native targets with the
/// `fs_persist` feature enabled.
#[cfg(all(feature = "fs_persist", not(target_arch = "wasm32")))]
#[derive(Default, serde::Deserialize, serde::Serialize)]
#[allow(dead_code)]
struct FsDealloc {
    /// Ids of the blocks recorded as deallocated.
    tombstones: Vec<u64>,
}
33
/// Create a new `BlockStorage` instance for the WASM platform.
///
/// The constructor performs the following steps, in order:
///
/// 1. Runs the IndexedDB recovery scan for `db_name` (a failed scan is treated
///    as "no recovery performed").
/// 2. Attempts to restore the database from IndexedDB; a failure is logged as a
///    warning and construction continues.
/// 3. Dumps a debug summary of the restored blocks to the browser console,
///    including whether block 0 starts with the SQLite header magic.
/// 4. Restores the allocation set and the next block id from the global
///    allocation map.
/// 5. Restores checksums and per-block checksum algorithms from the global
///    metadata map, keeping only entries whose version is not newer than the
///    commit marker for `db_name`.
///
/// # Arguments
///
/// * `db_name` - Key used for every lookup in the global storage, allocation,
///   metadata and commit-marker maps, and stored on the returned instance.
///
/// # Errors
///
/// Every path through this function currently returns `Ok`: recovery and
/// restoration failures are logged and otherwise ignored.
#[cfg(target_arch = "wasm32")]
pub async fn new_wasm(db_name: &str) -> Result<BlockStorage, DatabaseError> {
    /// Write one debug line to the browser console.
    fn console_debug(message: String) {
        web_sys::console::log_1(&message.into());
    }

    /// Render the first 16 bytes as space-separated two-digit lowercase hex,
    /// or return `"short"` when fewer than 16 bytes are available.
    fn hex_preview(bytes: &[u8]) -> String {
        match bytes.get(..16) {
            Some(head) => head
                .iter()
                .map(|byte| format!("{:02x}", byte))
                .collect::<Vec<_>>()
                .join(" "),
            None => "short".to_string(),
        }
    }

    log::info!("Creating BlockStorage for database: {}", db_name);

    // Perform IndexedDB recovery scan first; a scan error counts as "nothing recovered".
    let recovery_performed = super::wasm_indexeddb::perform_indexeddb_recovery_scan(db_name)
        .await
        .unwrap_or(false);
    if recovery_performed {
        log::info!("IndexedDB recovery scan completed for: {}", db_name);
    }

    // Try to restore from IndexedDB. Failure is not fatal: the storage simply
    // starts from whatever is already present in the global maps.
    match super::wasm_indexeddb::restore_from_indexeddb(db_name).await {
        Ok(_) => log::info!(
            "Successfully restored BlockStorage from IndexedDB for: {}",
            db_name
        ),
        Err(e) => log::warn!(
            "IndexedDB restoration failed for {}: {}",
            db_name,
            e.message
        ),
    }

    // Debug: log what is in global storage after restoration.
    vfs_sync::with_global_storage(|storage_map| {
        if let Some(db_storage) = storage_map.borrow().get(db_name) {
            console_debug(format!(
                "DEBUG: After restoration, database {} has {} blocks in global storage",
                db_name,
                db_storage.len()
            ));

            for (block_id, data) in db_storage.iter() {
                let bytes: &[u8] = &data[..];
                console_debug(format!(
                    "DEBUG: Block {} preview after restoration: {}",
                    block_id,
                    hex_preview(bytes)
                ));

                // CRITICAL: Check SQLite magic bytes on block 0.
                // `starts_with` is false for blocks shorter than the 16-byte magic.
                if *block_id == 0 {
                    let is_valid = bytes.starts_with(b"SQLite format 3\0");
                    console_debug(format!(
                        "DEBUG: Block 0 SQLite header valid: {}",
                        is_valid
                    ));
                }
            }

            console_debug(format!(
                "DEBUG: Found {} blocks in global storage for pre-population",
                db_storage.len()
            ));
        } else {
            console_debug(format!(
                "DEBUG: After restoration, no blocks found for database {}",
                db_name
            ));
        }
    });

    // Initialize allocation tracking from the global allocation map.
    // Without an entry for this database we start empty with block id 1.
    let mut allocated_blocks = HashSet::new();
    let mut next_block_id: u64 = 1;
    vfs_sync::with_global_allocation_map(|allocation_map| {
        if let Some(existing_allocations) = allocation_map.borrow().get(db_name) {
            allocated_blocks = existing_allocations.clone();
            // Continue numbering right after the highest known block id.
            next_block_id = allocated_blocks
                .iter()
                .max()
                .copied()
                .map_or(1, |max_id| max_id + 1);
            log::info!(
                "Restored {} allocated blocks for database: {}",
                allocated_blocks.len(),
                db_name
            );
        }
    });

    // Restore persisted checksums and per-block algorithms in a single pass.
    // Entries with a version newer than the commit marker are skipped.
    let committed = vfs_sync::with_global_commit_marker(|cm| {
        cm.borrow().get(db_name).copied().unwrap_or(0)
    });
    let mut checksums_init: HashMap<u64, u64> = HashMap::new();
    let mut checksum_algos_init: HashMap<u64, ChecksumAlgorithm> = HashMap::new();
    vfs_sync::with_global_metadata(|meta_map| {
        if let Some(db_meta) = meta_map.borrow().get(db_name) {
            for (bid, m) in db_meta.iter() {
                if (m.version as u64) <= committed {
                    checksums_init.insert(*bid, m.checksum);
                    checksum_algos_init.insert(*bid, m.algo);
                }
            }
            log::info!(
                "Restored {} checksum entries for database: {}",
                checksums_init.len(),
                db_name
            );
        }
    });

    // Blocks without a restored per-block algorithm fall back to FastHash.
    let checksum_algo_default = ChecksumAlgorithm::FastHash;

    Ok(BlockStorage {
        cache: RefCell::new(HashMap::new()),
        dirty_blocks: Arc::new(RefCell::new(HashMap::new())),
        allocated_blocks: RefCell::new(allocated_blocks),
        next_block_id: std::sync::atomic::AtomicU64::new(next_block_id),
        capacity: DEFAULT_CACHE_CAPACITY,
        lru_order: RefCell::new(VecDeque::new()),
        checksum_manager: ChecksumManager::with_data(
            checksums_init,
            checksum_algos_init,
            checksum_algo_default,
        ),
        db_name: db_name.to_string(),
        // No deallocation tombstones are restored on this path, so the set
        // starts empty regardless of whether `fs_persist` is enabled.
        deallocated_blocks: RefCell::new(HashSet::new()),
        auto_sync_interval: RefCell::new(None),
        policy: RefCell::new(None),
        recovery_report: RecoveryReport::default(),
        leader_election: RefCell::new(None),
        observability: super::observability::ObservabilityManager::new(),
        #[cfg(feature = "telemetry")]
        metrics: None,
    })
}