1#[cfg(target_arch = "wasm32")]
5use super::block_storage::{BlockStorage, DEFAULT_CACHE_CAPACITY, RecoveryReport};
6#[cfg(target_arch = "wasm32")]
7use super::metadata::{ChecksumAlgorithm, ChecksumManager};
8#[cfg(target_arch = "wasm32")]
9use super::vfs_sync;
10#[cfg(target_arch = "wasm32")]
11use crate::types::DatabaseError;
12#[cfg(target_arch = "wasm32")]
13use std::cell::RefCell;
14#[cfg(target_arch = "wasm32")]
15use std::collections::{HashMap, HashSet, VecDeque};
16#[cfg(target_arch = "wasm32")]
17use std::sync::Arc;
18
19#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
21#[derive(serde::Serialize, serde::Deserialize, Default)]
22#[allow(dead_code)]
23struct FsAlloc {
24 allocated: Vec<u64>,
25}
26
27#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
28#[derive(serde::Serialize, serde::Deserialize, Default)]
29#[allow(dead_code)]
30struct FsDealloc {
31 tombstones: Vec<u64>,
32}
33
34#[cfg(target_arch = "wasm32")]
36pub async fn new_wasm(db_name: &str) -> Result<BlockStorage, DatabaseError> {
37 log::info!("Creating BlockStorage for database: {}", db_name);
38
39 let recovery_performed = super::wasm_indexeddb::perform_indexeddb_recovery_scan(db_name)
41 .await
42 .unwrap_or(false);
43 if recovery_performed {
44 log::info!("IndexedDB recovery scan completed for: {}", db_name);
45 }
46
47 match super::wasm_indexeddb::restore_from_indexeddb(db_name).await {
49 Ok(_) => log::info!(
50 "Successfully restored BlockStorage from IndexedDB for: {}",
51 db_name
52 ),
53 Err(e) => log::warn!(
54 "IndexedDB restoration failed for {}: {}",
55 db_name,
56 e.message
57 ),
58 }
59
60 vfs_sync::with_global_storage(|storage_map| {
62 if let Some(db_storage) = storage_map.borrow().get(db_name) {
63 #[cfg(target_arch = "wasm32")]
64 web_sys::console::log_1(
65 &format!(
66 "DEBUG: After restoration, database {} has {} blocks in global storage",
67 db_name,
68 db_storage.len()
69 )
70 .into(),
71 );
72 for (block_id, data) in db_storage.iter() {
73 let preview = if data.len() >= 16 {
74 format!(
75 "{:02x} {:02x} {:02x} {:02x} {:02x} {:02x} {:02x} {:02x} {:02x} {:02x} {:02x} {:02x} {:02x} {:02x} {:02x} {:02x}",
76 data[0],
77 data[1],
78 data[2],
79 data[3],
80 data[4],
81 data[5],
82 data[6],
83 data[7],
84 data[8],
85 data[9],
86 data[10],
87 data[11],
88 data[12],
89 data[13],
90 data[14],
91 data[15]
92 )
93 } else {
94 "short".to_string()
95 };
96 #[cfg(target_arch = "wasm32")]
97 web_sys::console::log_1(
98 &format!(
99 "DEBUG: Block {} preview after restoration: {}",
100 block_id, preview
101 )
102 .into(),
103 );
104
105 if *block_id == 0 {
107 let is_valid = data.len() >= 16 && &data[0..16] == b"SQLite format 3\0";
108 #[cfg(target_arch = "wasm32")]
109 web_sys::console::log_1(
110 &format!("DEBUG: Block 0 SQLite header valid: {}", is_valid).into(),
111 );
112 }
113 }
114
115 #[cfg(target_arch = "wasm32")]
116 web_sys::console::log_1(
117 &format!(
118 "DEBUG: Found {} blocks in global storage for pre-population",
119 db_storage.len()
120 )
121 .into(),
122 );
123 } else {
124 #[cfg(target_arch = "wasm32")]
125 web_sys::console::log_1(
126 &format!(
127 "DEBUG: After restoration, no blocks found for database {}",
128 db_name
129 )
130 .into(),
131 );
132 }
133 });
134
135 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
138 {
139 let mut db_dir = fs_base_dir.clone();
140 db_dir.push(db_name);
141 let _ = fs::create_dir_all(&db_dir);
142 let mut blocks_dir = db_dir.clone();
143 blocks_dir.push("blocks");
144 let _ = fs::create_dir_all(&blocks_dir);
145 println!(
146 "[fs] init base_dir={:?}, db_dir={:?}, blocks_dir={:?}",
147 fs_base_dir, db_dir, blocks_dir
148 );
149 let mut meta_path = db_dir.clone();
151 meta_path.push("metadata.json");
152 if fs::metadata(&meta_path).is_err() {
153 if let Ok(mut f) = fs::File::create(&meta_path) {
154 let _ = f.write_all(br#"{"entries":[]}"#);
155 }
156 }
157 let mut alloc_path = db_dir.clone();
159 alloc_path.push("allocations.json");
160 if fs::metadata(&alloc_path).is_err() {
161 if let Ok(mut f) = fs::File::create(&alloc_path) {
162 let _ = f.write_all(br#"{"allocated":[]}"#);
163 }
164 }
165 let mut dealloc_path = db_dir.clone();
167 dealloc_path.push("deallocated.json");
168 if fs::metadata(&dealloc_path).is_err() {
169 if let Ok(mut f) = fs::File::create(&dealloc_path) {
170 let _ = f.write_all(br#"{"tombstones":[]}"#);
171 }
172 }
173 }
174
175 let (allocated_blocks, next_block_id) = {
177 #[cfg(target_arch = "wasm32")]
179 {
180 let mut allocated_blocks = HashSet::new();
181 let mut next_block_id: u64 = 1;
182
183 vfs_sync::with_global_allocation_map(|allocation_map| {
184 if let Some(existing_allocations) = allocation_map.borrow().get(db_name) {
185 allocated_blocks = existing_allocations.clone();
186 next_block_id = allocated_blocks.iter().max().copied().unwrap_or(0) + 1;
187 log::info!(
188 "Restored {} allocated blocks for database: {}",
189 allocated_blocks.len(),
190 db_name
191 );
192 }
193 });
194
195 (allocated_blocks, next_block_id)
196 }
197
198 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
200 {
201 let mut path = fs_base_dir.clone();
202 path.push(db_name);
203 let mut alloc_path = path.clone();
204 alloc_path.push("allocations.json");
205 let (mut allocated_blocks, mut next_block_id) = (HashSet::new(), 1u64);
206 if let Ok(mut f) = fs::File::open(&alloc_path) {
207 let mut s = String::new();
208 if f.read_to_string(&mut s).is_ok() {
209 if let Ok(parsed) = serde_json::from_str::<FsAlloc>(&s) {
210 for id in parsed.allocated {
211 allocated_blocks.insert(id);
212 }
213 next_block_id = allocated_blocks.iter().max().copied().unwrap_or(0) + 1;
214 log::info!(
215 "[fs] Restored {} allocated blocks for database: {}",
216 allocated_blocks.len(),
217 db_name
218 );
219 }
220 }
221 }
222 (allocated_blocks, next_block_id)
223 }
224
225 #[cfg(all(
227 not(target_arch = "wasm32"),
228 any(test, debug_assertions),
229 not(feature = "fs_persist")
230 ))]
231 {
232 let mut allocated_blocks = HashSet::new();
233 let mut next_block_id: u64 = 1;
234
235 vfs_sync::with_global_allocation_map(|allocation_map| {
236 if let Some(existing_allocations) = allocation_map.get(db_name) {
237 allocated_blocks = existing_allocations.clone();
238 next_block_id = allocated_blocks.iter().max().copied().unwrap_or(0) + 1;
239 log::info!(
240 "[test] Restored {} allocated blocks for database: {}",
241 allocated_blocks.len(),
242 db_name
243 );
244 }
245 });
246
247 (allocated_blocks, next_block_id)
248 }
249
250 #[cfg(all(not(target_arch = "wasm32"), not(any(test, debug_assertions))))]
252 {
253 (HashSet::new(), 1u64)
254 }
255 };
256
257 #[cfg(target_arch = "wasm32")]
259 let checksums_init: HashMap<u64, u64> = {
260 let mut map = HashMap::new();
261 let committed = vfs_sync::with_global_commit_marker(|cm| {
262 cm.borrow().get(db_name).copied().unwrap_or(0)
263 });
264 vfs_sync::with_global_metadata(|meta_map| {
265 if let Some(db_meta) = meta_map.borrow().get(db_name) {
266 for (bid, m) in db_meta.iter() {
267 if (m.version as u64) <= committed {
268 map.insert(*bid, m.checksum);
269 }
270 }
271 log::info!(
272 "Restored {} checksum entries for database: {}",
273 map.len(),
274 db_name
275 );
276 }
277 });
278 map
279 };
280
281 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
283 let checksums_init: HashMap<u64, u64> = {
284 let mut map = HashMap::new();
285 let mut path = fs_base_dir.clone();
286 path.push(db_name);
287 let mut meta_path = path.clone();
288 meta_path.push("metadata.json");
289 if let Ok(mut f) = fs::File::open(&meta_path) {
290 let mut s = String::new();
291 if f.read_to_string(&mut s).is_ok() {
292 if let Ok(val) = serde_json::from_str::<serde_json::Value>(&s) {
293 if let Some(entries) = val.get("entries").and_then(|v| v.as_array()) {
294 for entry in entries.iter() {
295 if let Some(arr) = entry.as_array() {
296 if arr.len() == 2 {
297 let id_opt = arr.get(0).and_then(|v| v.as_u64());
298 let meta_opt = arr.get(1).and_then(|v| v.as_object());
299 if let (Some(bid), Some(meta)) = (id_opt, meta_opt) {
300 if let Some(csum) =
301 meta.get("checksum").and_then(|v| v.as_u64())
302 {
303 map.insert(bid, csum);
304 }
305 }
306 }
307 }
308 }
309 log::info!("[fs] Restored checksum metadata for database: {}", db_name);
310 }
311 }
312 }
313 }
314 map
315 };
316
317 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
319 let checksum_algos_init: HashMap<u64, ChecksumAlgorithm> = {
320 let mut map = HashMap::new();
321 let mut path = fs_base_dir.clone();
322 path.push(db_name);
323 let mut meta_path = path.clone();
324 meta_path.push("metadata.json");
325 if let Ok(mut f) = fs::File::open(&meta_path) {
326 let mut s = String::new();
327 if f.read_to_string(&mut s).is_ok() {
328 if let Ok(val) = serde_json::from_str::<serde_json::Value>(&s) {
329 if let Some(entries) = val.get("entries").and_then(|v| v.as_array()) {
330 for entry in entries.iter() {
331 if let Some(arr) = entry.as_array() {
332 if arr.len() == 2 {
333 let id_opt = arr.get(0).and_then(|v| v.as_u64());
334 let meta_opt = arr.get(1).and_then(|v| v.as_object());
335 if let (Some(bid), Some(meta)) = (id_opt, meta_opt) {
336 let algo_opt = meta.get("algo").and_then(|v| v.as_str());
337 let algo = match algo_opt {
338 Some("FastHash") => Some(ChecksumAlgorithm::FastHash),
339 Some("CRC32") => Some(ChecksumAlgorithm::CRC32),
340 _ => None, };
342 if let Some(a) = algo {
343 map.insert(bid, a);
344 }
345 }
346 }
347 }
348 }
349 log::info!(
350 "[fs] Restored checksum algorithms for database: {}",
351 db_name
352 );
353 }
354 }
355 }
356 }
357 map
358 };
359
360 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
362 let deallocated_init: HashSet<u64> = {
363 let mut set = HashSet::new();
364 let mut path = fs_base_dir.clone();
365 path.push(db_name);
366 let mut dealloc_path = path.clone();
367 dealloc_path.push("deallocated.json");
368 if let Ok(mut f) = fs::File::open(&dealloc_path) {
369 let mut s = String::new();
370 if f.read_to_string(&mut s).is_ok() {
371 if let Ok(parsed) = serde_json::from_str::<FsDealloc>(&s) {
372 for id in parsed.tombstones {
373 set.insert(id);
374 }
375 log::info!(
376 "[fs] Restored {} deallocation tombstones for database: {}",
377 set.len(),
378 db_name
379 );
380 }
381 }
382 }
383 set
384 };
385
386 #[cfg(all(
388 not(target_arch = "wasm32"),
389 any(test, debug_assertions),
390 not(feature = "fs_persist")
391 ))]
392 let checksums_init: HashMap<u64, u64> = {
393 let mut map = HashMap::new();
394 let committed =
395 vfs_sync::with_global_commit_marker(|cm| cm.get(db_name).copied().unwrap_or(0));
396 GLOBAL_METADATA_TEST.with(|meta| {
397 let meta_map = meta.borrow_mut();
398 if let Some(db_meta) = meta_map.get(db_name) {
399 for (bid, m) in db_meta.iter() {
400 if (m.version as u64) <= committed {
401 map.insert(*bid, m.checksum);
402 }
403 }
404 log::info!(
405 "[test] Restored {} checksum entries for database: {}",
406 db_meta.len(),
407 db_name
408 );
409 }
410 });
411 map
412 };
413
414 #[cfg(all(
416 not(target_arch = "wasm32"),
417 any(test, debug_assertions),
418 not(feature = "fs_persist")
419 ))]
420 let checksum_algos_init: HashMap<u64, ChecksumAlgorithm> = {
421 let mut map = HashMap::new();
422 let committed =
423 vfs_sync::with_global_commit_marker(|cm| cm.get(db_name).copied().unwrap_or(0));
424 GLOBAL_METADATA_TEST.with(|meta| {
425 let meta_map = meta.borrow_mut();
426 if let Some(db_meta) = meta_map.get(db_name) {
427 for (bid, m) in db_meta.iter() {
428 if (m.version as u64) <= committed {
429 map.insert(*bid, m.algo);
430 }
431 }
432 }
433 });
434 map
435 };
436
437 #[cfg(all(not(target_arch = "wasm32"), not(any(test, debug_assertions))))]
439 let checksums_init: HashMap<u64, u64> = HashMap::new();
440
441 #[cfg(all(not(target_arch = "wasm32"), not(any(test, debug_assertions))))]
443 let checksum_algos_init: HashMap<u64, ChecksumAlgorithm> = HashMap::new();
444
445 #[cfg(target_arch = "wasm32")]
447 let checksum_algos_init: HashMap<u64, ChecksumAlgorithm> = {
448 let mut map = HashMap::new();
449 let committed = vfs_sync::with_global_commit_marker(|cm| {
450 cm.borrow().get(db_name).copied().unwrap_or(0)
451 });
452 vfs_sync::with_global_metadata(|meta_map| {
453 if let Some(db_meta) = meta_map.borrow().get(db_name) {
454 for (bid, m) in db_meta.iter() {
455 if (m.version as u64) <= committed {
456 map.insert(*bid, m.algo);
457 }
458 }
459 }
460 });
461 map
462 };
463
464 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
466 let checksum_algo_default = match env::var("DATASYNC_CHECKSUM_ALGO").ok().as_deref() {
467 Some("CRC32") => ChecksumAlgorithm::CRC32,
468 _ => ChecksumAlgorithm::FastHash,
469 };
470 #[cfg(not(all(not(target_arch = "wasm32"), feature = "fs_persist")))]
471 let checksum_algo_default = ChecksumAlgorithm::FastHash;
472
473 Ok(BlockStorage {
474 #[cfg(target_arch = "wasm32")]
475 cache: RefCell::new(HashMap::new()),
476 #[cfg(not(target_arch = "wasm32"))]
477 cache: Mutex::new(HashMap::new()),
478
479 #[cfg(target_arch = "wasm32")]
480 dirty_blocks: Arc::new(RefCell::new(HashMap::new())),
481 #[cfg(not(target_arch = "wasm32"))]
482 dirty_blocks: Arc::new(Mutex::new(HashMap::new())),
483
484 #[cfg(target_arch = "wasm32")]
485 allocated_blocks: RefCell::new(allocated_blocks),
486 #[cfg(not(target_arch = "wasm32"))]
487 allocated_blocks: Mutex::new(allocated_blocks),
488
489 next_block_id: std::sync::atomic::AtomicU64::new(next_block_id),
490 capacity: DEFAULT_CACHE_CAPACITY,
491
492 #[cfg(target_arch = "wasm32")]
493 lru_order: RefCell::new(VecDeque::new()),
494 #[cfg(not(target_arch = "wasm32"))]
495 lru_order: Mutex::new(VecDeque::new()),
496 checksum_manager: ChecksumManager::with_data(
497 checksums_init,
498 checksum_algos_init,
499 checksum_algo_default,
500 ),
501 db_name: db_name.to_string(),
502 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
503 base_dir: fs_base_dir,
504
505 #[cfg(all(target_arch = "wasm32", feature = "fs_persist"))]
506 deallocated_blocks: RefCell::new(deallocated_init),
507 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
508 deallocated_blocks: Mutex::new(deallocated_init),
509 #[cfg(all(target_arch = "wasm32", not(feature = "fs_persist")))]
510 deallocated_blocks: RefCell::new(HashSet::new()),
511 #[cfg(all(not(target_arch = "wasm32"), not(feature = "fs_persist")))]
512 deallocated_blocks: Mutex::new(HashSet::new()),
513
514 #[cfg(target_arch = "wasm32")]
515 auto_sync_interval: RefCell::new(None),
516 #[cfg(not(target_arch = "wasm32"))]
517 auto_sync_interval: Mutex::new(None),
518
519 #[cfg(not(target_arch = "wasm32"))]
520 last_auto_sync: Instant::now(),
521
522 #[cfg(target_arch = "wasm32")]
523 policy: RefCell::new(None),
524 #[cfg(not(target_arch = "wasm32"))]
525 policy: Mutex::new(None),
526 #[cfg(not(target_arch = "wasm32"))]
527 auto_sync_stop: None,
528 #[cfg(not(target_arch = "wasm32"))]
529 auto_sync_thread: None,
530 #[cfg(not(target_arch = "wasm32"))]
531 debounce_thread: None,
532 #[cfg(not(target_arch = "wasm32"))]
533 tokio_timer_task: None,
534 #[cfg(not(target_arch = "wasm32"))]
535 tokio_debounce_task: None,
536 #[cfg(not(target_arch = "wasm32"))]
537 last_write_ms: Arc::new(AtomicU64::new(0)),
538 #[cfg(not(target_arch = "wasm32"))]
539 threshold_hit: Arc::new(AtomicBool::new(false)),
540 #[cfg(not(target_arch = "wasm32"))]
541 sync_count: Arc::new(AtomicU64::new(0)),
542 #[cfg(not(target_arch = "wasm32"))]
543 timer_sync_count: Arc::new(AtomicU64::new(0)),
544 #[cfg(not(target_arch = "wasm32"))]
545 debounce_sync_count: Arc::new(AtomicU64::new(0)),
546 #[cfg(not(target_arch = "wasm32"))]
547 last_sync_duration_ms: Arc::new(AtomicU64::new(0)),
548 #[cfg(not(target_arch = "wasm32"))]
549 sync_sender: None,
550 #[cfg(not(target_arch = "wasm32"))]
551 sync_receiver: None,
552 recovery_report: RecoveryReport::default(),
553 #[cfg(target_arch = "wasm32")]
554 leader_election: std::cell::RefCell::new(None),
555 observability: super::observability::ObservabilityManager::new(),
556 #[cfg(feature = "telemetry")]
557 metrics: None,
558 })
559}