1#[cfg(target_arch = "wasm32")]
5use std::collections::{HashMap, HashSet, VecDeque};
6#[cfg(target_arch = "wasm32")]
7use std::sync::Arc;
8#[cfg(target_arch = "wasm32")]
9use parking_lot::Mutex;
10#[cfg(target_arch = "wasm32")]
11use crate::types::DatabaseError;
12#[cfg(target_arch = "wasm32")]
13use super::metadata::{ChecksumManager, ChecksumAlgorithm};
14#[cfg(target_arch = "wasm32")]
15use super::block_storage::{BlockStorage, RecoveryReport, DEFAULT_CACHE_CAPACITY};
16#[cfg(target_arch = "wasm32")]
17use super::vfs_sync;
18
19
/// JSON document listing the block ids that are currently allocated.
///
/// Serde maps the single field to the `allocated` key, so the serialized
/// form looks like `{"allocated":[1,2,3]}`.
// NOTE(review): `dead_code` is presumably allowed because not every build
// configuration reads this type — confirm before removing the attribute.
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
#[derive(Default, serde::Serialize, serde::Deserialize)]
#[allow(dead_code)]
struct FsAlloc {
    /// Ids of the allocated blocks.
    allocated: Vec<u64>,
}
25
/// JSON document listing deallocation tombstones (block ids).
///
/// Serde maps the single field to the `tombstones` key, so the serialized
/// form looks like `{"tombstones":[4,5]}`.
// NOTE(review): `dead_code` is presumably allowed because not every build
// configuration reads this type — confirm before removing the attribute.
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
#[derive(Default, serde::Serialize, serde::Deserialize)]
#[allow(dead_code)]
struct FsDealloc {
    /// Block ids recorded as tombstones.
    tombstones: Vec<u64>,
}
30
/// Builds a [`BlockStorage`] for `db_name` in the wasm32 (browser) build.
///
/// The steps run in this order:
/// 1. Run the IndexedDB recovery scan. This is best effort: a failed scan is
///    treated the same as "nothing was recovered".
/// 2. Restore persisted state from IndexedDB. This is also best effort: a
///    failure is logged and construction continues.
/// 3. Print a short hex preview of every block found in global storage for
///    this database to the browser console.
/// 4. Rebuild the allocation set, the next block id and the checksum tables
///    from the process-global maps.
///
/// This function is compiled only for `target_arch = "wasm32"`, so the body
/// contains no native-only (`fs_persist`, thread, timer) setup.
///
/// # Errors
///
/// No code path currently returns `Err`; every fallible step above is
/// downgraded to a log message. The `Result` return type is kept for callers.
#[cfg(target_arch = "wasm32")]
pub async fn new_wasm(db_name: &str) -> Result<BlockStorage, DatabaseError> {
    log::info!("Creating BlockStorage for database: {}", db_name);

    // Best effort: an error from the scan is deliberately mapped to `false`.
    let recovery_performed = super::wasm_indexeddb::perform_indexeddb_recovery_scan(db_name)
        .await
        .unwrap_or(false);
    if recovery_performed {
        log::info!("IndexedDB recovery scan completed for: {}", db_name);
    }

    // Restore exactly once and remember the outcome. Everything below only
    // reads the in-memory global maps, so a second restore would see the
    // same IndexedDB contents and only cost another round trip.
    let restored_from_indexeddb =
        match super::wasm_indexeddb::restore_from_indexeddb(db_name).await {
            Ok(_) => {
                log::info!(
                    "Successfully restored BlockStorage from IndexedDB for: {}",
                    db_name
                );
                true
            }
            Err(e) => {
                log::warn!("IndexedDB restoration failed for {}: {}", db_name, e.message);
                false
            }
        };

    // Diagnostic dump of what the restore left in global storage.
    vfs_sync::with_global_storage(|storage| {
        let storage_map = storage.borrow();
        match storage_map.get(db_name) {
            Some(db_storage) => {
                web_sys::console::log_1(
                    &format!(
                        "DEBUG: After restoration, database {} has {} blocks in global storage",
                        db_name,
                        db_storage.len()
                    )
                    .into(),
                );
                for (block_id, data) in db_storage.iter() {
                    // Show the first eight bytes; shorter blocks are just flagged.
                    let preview = if data.len() >= 8 {
                        format!(
                            "{:02x} {:02x} {:02x} {:02x} {:02x} {:02x} {:02x} {:02x}",
                            data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]
                        )
                    } else {
                        "short".to_string()
                    };
                    web_sys::console::log_1(
                        &format!(
                            "DEBUG: Block {} preview after restoration: {}",
                            block_id, preview
                        )
                        .into(),
                    );
                }
                web_sys::console::log_1(
                    &format!(
                        "DEBUG: Found {} blocks in global storage for pre-population",
                        db_storage.len()
                    )
                    .into(),
                );
            }
            None => {
                web_sys::console::log_1(
                    &format!(
                        "DEBUG: After restoration, no blocks found for database {}",
                        db_name
                    )
                    .into(),
                );
            }
        }
    });

    // Allocation state: start empty with ids beginning at 1, then adopt any
    // allocations already recorded for this database.
    let mut allocated_blocks = HashSet::new();
    let mut next_block_id: u64 = 1;
    vfs_sync::with_global_allocation_map(|allocation_map| {
        let allocation_map = allocation_map.borrow();
        if let Some(existing_allocations) = allocation_map.get(db_name) {
            allocated_blocks = existing_allocations.clone();
            // Continue numbering right after the highest id seen so far.
            next_block_id = allocated_blocks.iter().max().copied().unwrap_or(0) + 1;
            log::info!(
                "Restored {} allocated blocks for database: {}",
                allocated_blocks.len(),
                db_name
            );
        }
    });

    // Only metadata whose version is at or below the commit marker is
    // trusted; entries above it are skipped.
    let committed = vfs_sync::with_global_commit_marker(|cm| {
        let cm = cm.borrow();
        cm.get(db_name).copied().unwrap_or(0)
    });

    // Fill both checksum tables in a single pass over the metadata.
    let mut checksums_init: HashMap<u64, u64> = HashMap::new();
    let mut checksum_algos_init: HashMap<u64, ChecksumAlgorithm> = HashMap::new();
    vfs_sync::with_global_metadata(|meta| {
        let meta_map = meta.borrow();
        if let Some(db_meta) = meta_map.get(db_name) {
            for (bid, m) in db_meta.iter() {
                if (m.version as u64) <= committed {
                    checksums_init.insert(*bid, m.checksum);
                    checksum_algos_init.insert(*bid, m.algo);
                }
            }
            log::info!(
                "Restored {} checksum entries for database: {} (IndexedDB restore: {})",
                checksums_init.len(),
                db_name,
                restored_from_indexeddb
            );
        }
    });

    // The wasm build always uses FastHash as the default algorithm.
    let checksum_algo_default = ChecksumAlgorithm::FastHash;

    Ok(BlockStorage {
        cache: HashMap::new(),
        dirty_blocks: Arc::new(Mutex::new(HashMap::new())),
        allocated_blocks,
        next_block_id,
        capacity: DEFAULT_CACHE_CAPACITY,
        lru_order: VecDeque::new(),
        checksum_manager: ChecksumManager::with_data(
            checksums_init,
            checksum_algos_init,
            checksum_algo_default,
        ),
        db_name: db_name.to_string(),
        deallocated_blocks: HashSet::new(),
        auto_sync_interval: None,
        policy: None,
        recovery_report: RecoveryReport::default(),
        leader_election: None,
        observability: super::observability::ObservabilityManager::new(),
        #[cfg(feature = "telemetry")]
        metrics: None,
    })
}