absurder_sql/storage/
constructors.rs

//! Constructor functions for `BlockStorage`.
//! This module contains the platform-specific constructor implementations.
3
4#[cfg(target_arch = "wasm32")]
5use std::collections::{HashMap, HashSet, VecDeque};
6#[cfg(target_arch = "wasm32")]
7use std::sync::Arc;
8#[cfg(target_arch = "wasm32")]
9use parking_lot::Mutex;
10#[cfg(target_arch = "wasm32")]
11use crate::types::DatabaseError;
12#[cfg(target_arch = "wasm32")]
13use super::metadata::{ChecksumManager, ChecksumAlgorithm};
14#[cfg(target_arch = "wasm32")]
15use super::block_storage::{BlockStorage, RecoveryReport, DEFAULT_CACHE_CAPACITY};
16#[cfg(target_arch = "wasm32")]
17use super::vfs_sync;
18
19
// On-disk JSON schema used by the native `fs_persist` backend.

/// Contents of `allocations.json`: the ids of all allocated blocks.
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
#[derive(Default, serde::Deserialize, serde::Serialize)]
// The only reader of this type in this module sits inside code that is compiled out on
// native targets, so the unused-code lint is silenced instead of dropping the schema.
#[allow(dead_code)]
struct FsAlloc {
    allocated: Vec<u64>,
}
25
/// Contents of `deallocated.json`: block ids recorded as deallocation tombstones.
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
#[derive(Default, serde::Deserialize, serde::Serialize)]
// The only reader of this type in this module sits inside code that is compiled out on
// native targets, so the unused-code lint is silenced instead of dropping the schema.
#[allow(dead_code)]
struct FsDealloc {
    tombstones: Vec<u64>,
}
30
/// Create a new [`BlockStorage`] instance for the WASM platform.
///
/// Construction proceeds in this order:
/// 1. Run the IndexedDB recovery scan for `db_name`. A scan error is treated the same as
///    "no recovery performed" and does not abort construction.
/// 2. Restore persisted state from IndexedDB into the global VFS maps, exactly once. A
///    restore error is logged as a warning and construction continues with whatever state
///    is already in memory.
/// 3. Rebuild allocation tracking from the global allocation map.
/// 4. Rebuild per-block checksums and checksum algorithms from the global metadata,
///    keeping only entries whose version does not exceed the database's commit marker.
///
/// # Errors
///
/// The current implementation always returns `Ok`; the `Result` return type is part of
/// the public signature.
#[cfg(target_arch = "wasm32")]
pub async fn new_wasm(db_name: &str) -> Result<BlockStorage, DatabaseError> {
    log::info!("Creating BlockStorage for database: {}", db_name);

    // Recovery is best-effort: an error from the scan is folded into `false`.
    let recovery_performed = super::wasm_indexeddb::perform_indexeddb_recovery_scan(db_name)
        .await
        .unwrap_or(false);
    if recovery_performed {
        log::info!("IndexedDB recovery scan completed for: {}", db_name);
    }

    // Restore exactly once and keep the outcome; it is also included in the
    // checksum-restore log line so a degraded startup is visible.
    let restored_from_indexeddb =
        match super::wasm_indexeddb::restore_from_indexeddb(db_name).await {
            Ok(_) => {
                log::info!("Successfully restored BlockStorage from IndexedDB for: {}", db_name);
                true
            }
            Err(e) => {
                log::warn!("IndexedDB restoration failed for {}: {}", db_name, e.message);
                false
            }
        };

    log_restored_blocks(db_name);

    let (allocated_blocks, next_block_id) = restore_allocation_state(db_name);
    let (checksums_init, checksum_algos_init) =
        restore_checksum_state(db_name, restored_from_indexeddb);

    Ok(BlockStorage {
        cache: HashMap::new(),
        dirty_blocks: Arc::new(Mutex::new(HashMap::new())),
        allocated_blocks,
        next_block_id,
        capacity: DEFAULT_CACHE_CAPACITY,
        lru_order: VecDeque::new(),
        checksum_manager: ChecksumManager::with_data(
            checksums_init,
            checksum_algos_init,
            // The DATASYNC_CHECKSUM_ALGO override is native/fs_persist only; WASM always
            // defaults to FastHash.
            ChecksumAlgorithm::FastHash,
        ),
        db_name: db_name.to_string(),
        deallocated_blocks: HashSet::new(),
        auto_sync_interval: None,
        policy: None,
        recovery_report: RecoveryReport::default(),
        leader_election: None,
        observability: super::observability::ObservabilityManager::new(),
        #[cfg(feature = "telemetry")]
        metrics: None,
    })
}

/// Emit debug-level diagnostics about the blocks held in global storage for `db_name`
/// after the IndexedDB restore: the block count plus an 8-byte hex preview per block.
#[cfg(target_arch = "wasm32")]
fn log_restored_blocks(db_name: &str) {
    // The walk touches every block, so skip it entirely unless debug output is wanted.
    if !log::log_enabled!(log::Level::Debug) {
        return;
    }
    vfs_sync::with_global_storage(|storage| {
        let storage_map = storage.borrow();
        match storage_map.get(db_name) {
            Some(db_storage) => {
                log::debug!(
                    "After restoration, database {} has {} blocks in global storage",
                    db_name,
                    db_storage.len()
                );
                for (block_id, data) in db_storage.iter() {
                    let preview = if data.len() >= 8 {
                        format!(
                            "{:02x} {:02x} {:02x} {:02x} {:02x} {:02x} {:02x} {:02x}",
                            data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]
                        )
                    } else {
                        "short".to_string()
                    };
                    log::debug!("Block {} preview after restoration: {}", block_id, preview);
                }
            }
            None => log::debug!("After restoration, no blocks found for database {}", db_name),
        }
    });
}

/// Rebuild allocation tracking for `db_name` from the global allocation map.
///
/// Returns the allocated block ids and the next id to hand out: one past the highest
/// allocated id, or `1` when the database has no recorded allocations.
#[cfg(target_arch = "wasm32")]
fn restore_allocation_state(db_name: &str) -> (HashSet<u64>, u64) {
    let mut allocated_blocks = HashSet::new();
    vfs_sync::with_global_allocation_map(|allocation_map| {
        let allocation_map = allocation_map.borrow();
        if let Some(existing_allocations) = allocation_map.get(db_name) {
            allocated_blocks = existing_allocations.clone();
            log::info!(
                "Restored {} allocated blocks for database: {}",
                allocated_blocks.len(),
                db_name
            );
        }
    });
    let next_block_id = allocated_blocks
        .iter()
        .max()
        .map_or(1, |&max_id| max_id + 1);
    (allocated_blocks, next_block_id)
}

/// Rebuild the per-block checksum map and checksum-algorithm map for `db_name` from the
/// global metadata in a single pass.
///
/// Only entries whose `version` is at or below the database's commit marker are kept;
/// a missing commit marker is treated as `0`. `restored_from_indexeddb` is used only
/// for logging.
#[cfg(target_arch = "wasm32")]
fn restore_checksum_state(
    db_name: &str,
    restored_from_indexeddb: bool,
) -> (HashMap<u64, u64>, HashMap<u64, ChecksumAlgorithm>) {
    let committed = vfs_sync::with_global_commit_marker(|cm| {
        let cm = cm.borrow();
        cm.get(db_name).copied().unwrap_or(0)
    });

    let mut checksums = HashMap::new();
    let mut algorithms = HashMap::new();
    vfs_sync::with_global_metadata(|meta| {
        let meta_map = meta.borrow();
        if let Some(db_meta) = meta_map.get(db_name) {
            for (&block_id, m) in db_meta.iter() {
                // Skip metadata written by transactions newer than the last commit.
                if (m.version as u64) <= committed {
                    checksums.insert(block_id, m.checksum);
                    algorithms.insert(block_id, m.algo);
                }
            }
            log::info!(
                "Restored {} checksum entries for database: {} (IndexedDB restore: {})",
                checksums.len(),
                db_name,
                restored_from_indexeddb
            );
        }
    });
    (checksums, algorithms)
}
450        metrics: None,
451    })
452}