1#[allow(unused_macros)]
3#[cfg(target_arch = "wasm32")]
4macro_rules! lock_mutex {
5 ($mutex:expr) => {
6 $mutex
7 .try_borrow_mut()
8 .expect("RefCell borrow failed - reentrancy detected in fs_persist.rs")
9 };
10}
11
12#[allow(unused_macros)]
13#[cfg(not(target_arch = "wasm32"))]
14macro_rules! lock_mutex {
15 ($mutex:expr) => {
16 $mutex.lock()
17 };
18}
19
20#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
21use super::metadata::{BlockMetadataPersist, ChecksumAlgorithm};
22#[cfg(any(
23 target_arch = "wasm32",
24 all(
25 not(target_arch = "wasm32"),
26 any(test, debug_assertions),
27 not(feature = "fs_persist")
28 )
29))]
30#[allow(unused_imports)]
31use super::vfs_sync;
32#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
33use crate::types::DatabaseError;
34#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
35use std::collections::HashMap;
36#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
37use std::sync::atomic::Ordering;
38#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
39use std::time::Instant;
40#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
41use std::{
42 env, fs,
43 io::{Read, Write},
44 path::PathBuf,
45};
46
47#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
49#[derive(serde::Serialize, serde::Deserialize, Default)]
50#[allow(dead_code)]
51pub(super) struct FsMeta {
52 pub entries: Vec<(u64, BlockMetadataPersist)>,
53}
54
55#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
56#[derive(serde::Serialize, serde::Deserialize, Default)]
57#[allow(dead_code)]
58pub(super) struct FsAlloc {
59 pub allocated: Vec<u64>,
60}
61
62#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
63#[derive(serde::Serialize, serde::Deserialize, Default)]
64#[allow(dead_code)]
65pub(super) struct FsDealloc {
66 pub tombstones: Vec<u64>,
67}
68
69impl super::BlockStorage {
70 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
72 pub(super) fn fs_persist_sync(&mut self) -> Result<(), DatabaseError> {
73 let dirty_count = lock_mutex!(self.dirty_blocks).len();
75 let dirty_bytes = dirty_count * super::BLOCK_SIZE;
76 self.observability
77 .record_sync_start(dirty_count, dirty_bytes);
78
79 if let Some(ref callback) = self.observability.sync_start_callback {
81 callback(dirty_count, dirty_bytes);
82 }
83
84 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
87 {
88 let base: PathBuf = self.base_dir.clone();
89 let mut db_dir = base.clone();
90 db_dir.push(&self.db_name);
91 let mut blocks_dir = db_dir.clone();
92 blocks_dir.push("blocks");
93 let _ = fs::create_dir_all(&blocks_dir);
94 let mut meta_path = db_dir.clone();
95 meta_path.push("metadata.json");
96 if fs::metadata(&meta_path).is_err() {
97 if let Ok(mut f) = fs::File::create(&meta_path) {
98 let _ = f.write_all(br#"{"entries":[]}"#);
99 }
100 }
101 }
102
103 if self.get_dirty_blocks().lock().is_empty() {
104 log::debug!("No dirty blocks to sync");
105 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
106 {
107 let base: PathBuf = self.base_dir.clone();
109 let mut db_dir = base.clone();
110 db_dir.push(&self.db_name);
111 let mut blocks_dir = db_dir.clone();
112 blocks_dir.push("blocks");
113 let mut meta_path = db_dir.clone();
116 meta_path.push("metadata.json");
117 let mut meta_val: serde_json::Value = serde_json::json!({"entries": []});
118 if let Ok(mut f) = fs::File::open(&meta_path) {
119 let mut s = String::new();
120 if f.read_to_string(&mut s).is_ok() {
121 if let Ok(v) = serde_json::from_str::<serde_json::Value>(&s) {
122 meta_val = v;
123 }
124 }
125 }
126 if !meta_val.is_object() {
128 meta_val = serde_json::json!({"entries": []});
129 }
130 if let Some(entries) = meta_val.get_mut("entries").and_then(|e| e.as_array_mut()) {
132 for ent in entries.iter_mut() {
133 if let Some(arr) = ent.as_array_mut() {
134 if arr.len() == 2 {
135 if let Some(obj) = arr.get_mut(1).and_then(|v| v.as_object_mut()) {
136 let ok = obj
137 .get("algo")
138 .and_then(|v| v.as_str())
139 .map(|s| s == "FastHash" || s == "CRC32")
140 .unwrap_or(false);
141 if !ok {
142 let def = match self.checksum_manager.default_algorithm() {
143 ChecksumAlgorithm::CRC32 => "CRC32",
144 _ => "FastHash",
145 };
146 obj.insert(
147 "algo".into(),
148 serde_json::Value::String(def.into()),
149 );
150 }
151 }
152 }
153 }
154 }
155 }
156 let meta_string = serde_json::to_string(&meta_val).unwrap_or_else(|_| "{}".into());
157 let allocated: std::collections::HashSet<u64> =
158 lock_mutex!(self.allocated_blocks).clone();
159 let mut meta_pending = db_dir.clone();
161 meta_pending.push("metadata.json.pending");
162 log::debug!(
163 "[fs_persist] cleanup-only: writing pending metadata at {:?}",
164 meta_pending
165 );
166 if let Ok(mut f) = fs::File::create(&meta_pending) {
167 let _ = f.write_all(meta_string.as_bytes());
168 let _ = f.sync_all();
169 }
170 let _ = fs::rename(&meta_pending, &meta_path);
171 log::debug!(
172 "[fs_persist] cleanup-only: finalized metadata rename to {:?}",
173 meta_path
174 );
175 let mut alloc_path = db_dir.clone();
176 alloc_path.push("allocations.json");
177 let mut alloc = FsAlloc {
178 allocated: allocated.iter().cloned().collect(),
179 };
180 alloc.allocated.sort_unstable();
181 if let Ok(mut f) = fs::File::create(&alloc_path) {
182 let _ = f.write_all(
183 serde_json::to_string(&alloc)
184 .unwrap_or_else(|_| "{}".into())
185 .as_bytes(),
186 );
187 }
188 log::info!("wrote allocations.json at {:?}", alloc_path);
189 let valid_ids: std::collections::HashSet<u64> =
192 if let Some(entries) = meta_val.get("entries").and_then(|e| e.as_array()) {
193 entries
194 .iter()
195 .filter_map(|ent| {
196 ent.as_array()
197 .and_then(|arr| arr.first())
198 .and_then(|v| v.as_u64())
199 })
200 .collect()
201 } else {
202 std::collections::HashSet::new()
203 };
204 if let Ok(entries) = fs::read_dir(&blocks_dir) {
205 for entry in entries.flatten() {
206 if let Ok(ft) = entry.file_type() {
207 if ft.is_file() {
208 if let Some(name) = entry.file_name().to_str() {
209 if let Some(id_str) = name
210 .strip_prefix("block_")
211 .and_then(|s| s.strip_suffix(".bin"))
212 {
213 if let Ok(id) = id_str.parse::<u64>() {
214 if !valid_ids.contains(&id) {
215 let _ = fs::remove_file(entry.path());
216 }
217 }
218 }
219 }
220 }
221 }
222 }
223 }
224
225 let alt_base: PathBuf = {
227 if let Ok(p) = env::var("ABSURDERSQL_FS_BASE") {
228 PathBuf::from(p)
229 } else if cfg!(any(test, debug_assertions)) {
230 PathBuf::from(format!(".absurdersql_fs/run_{}", std::process::id()))
231 } else {
232 PathBuf::from(".absurdersql_fs")
233 }
234 };
235 if alt_base != self.base_dir {
236 let mut alt_db_dir = alt_base.clone();
237 alt_db_dir.push(&self.db_name);
238 let mut alt_blocks_dir = alt_db_dir.clone();
239 alt_blocks_dir.push("blocks");
240 let _ = fs::create_dir_all(&alt_blocks_dir);
241 let mut alt_meta_pending = alt_db_dir.clone();
243 alt_meta_pending.push("metadata.json.pending");
244 log::debug!(
245 "[fs_persist] cleanup-only (alt): writing pending metadata at {:?}",
246 alt_meta_pending
247 );
248 if let Ok(mut f) = fs::File::create(&alt_meta_pending) {
249 let _ = f.write_all(meta_string.as_bytes());
250 let _ = f.sync_all();
251 }
252 let mut alt_meta_path = alt_db_dir.clone();
253 alt_meta_path.push("metadata.json");
254 let _ = fs::rename(&alt_meta_pending, &alt_meta_path);
255 log::debug!(
256 "[fs_persist] cleanup-only (alt): finalized metadata rename to {:?}",
257 alt_meta_path
258 );
259 let mut alt_alloc_path = alt_db_dir.clone();
260 alt_alloc_path.push("allocations.json");
261 if let Ok(mut f) = fs::File::create(&alt_alloc_path) {
262 let _ = f.write_all(
263 serde_json::to_string(&alloc)
264 .unwrap_or_else(|_| "{}".into())
265 .as_bytes(),
266 );
267 }
268 log::info!("(alt) wrote allocations.json at {:?}", alt_alloc_path);
269 if let Ok(entries) = fs::read_dir(&alt_blocks_dir) {
270 for entry in entries.flatten() {
271 if let Ok(ft) = entry.file_type() {
272 if ft.is_file() {
273 if let Some(name) = entry.file_name().to_str() {
274 if let Some(id_str) = name
275 .strip_prefix("block_")
276 .and_then(|s| s.strip_suffix(".bin"))
277 {
278 if let Ok(id) = id_str.parse::<u64>() {
279 if !valid_ids.contains(&id) {
280 let _ = fs::remove_file(entry.path());
281 }
282 }
283 }
284 }
285 }
286 }
287 }
288 }
289 }
290 }
291 return Ok(());
292 }
293
294 let current_dirty = self.get_dirty_blocks().lock().len();
295 log::info!("Syncing {} dirty blocks", current_dirty);
296
297 #[cfg(target_arch = "wasm32")]
299 {
300 let to_persist: Vec<(u64, Vec<u8>)> = {
301 let dirty = self.get_dirty_blocks().lock();
302 dirty.iter().map(|(k, v)| (*k, v.clone())).collect()
303 };
304 let ids: Vec<u64> = to_persist.iter().map(|(k, _)| *k).collect();
305 let next_commit: u64 = vfs_sync::with_global_commit_marker(|cm| {
307 let cm = cm;
308 let current = cm.get(&self.db_name).copied().unwrap_or(0);
309 #[cfg(target_arch = "wasm32")]
310 log::debug!("Current commit marker for {}: {}", self.db_name, current);
311 current + 1
312 });
313 #[cfg(target_arch = "wasm32")]
314 log::debug!("Next commit marker for {}: {}", self.db_name, next_commit);
315 vfs_sync::with_global_storage(|storage| {
316 let mut storage_map = storage.lock();
317 let db_storage = storage_map
318 .entry(self.db_name.clone())
319 .or_insert_with(HashMap::new);
320 for (block_id, data) in &to_persist {
321 let should_update = if let Some(existing) = db_storage.get(block_id) {
323 if existing != data {
324 let has_committed_metadata = vfs_sync::with_global_metadata(|meta| {
326 let meta_map = meta.borrow_mut();
327 if let Some(db_meta) = meta_map.get(&self.db_name) {
328 if let Some(metadata) = db_meta.get(block_id) {
329 metadata.version > 0
330 } else {
331 false
332 }
333 } else {
334 false
335 }
336 });
337
338 let existing_preview = if existing.len() >= 8 {
339 format!(
340 "{:02x} {:02x} {:02x} {:02x} {:02x} {:02x} {:02x} {:02x}",
341 existing[0],
342 existing[1],
343 existing[2],
344 existing[3],
345 existing[4],
346 existing[5],
347 existing[6],
348 existing[7]
349 )
350 } else {
351 "short".to_string()
352 };
353 let new_preview = if data.len() >= 8 {
354 format!(
355 "{:02x} {:02x} {:02x} {:02x} {:02x} {:02x} {:02x} {:02x}",
356 data[0],
357 data[1],
358 data[2],
359 data[3],
360 data[4],
361 data[5],
362 data[6],
363 data[7]
364 )
365 } else {
366 "short".to_string()
367 };
368
369 if has_committed_metadata {
370 #[cfg(target_arch = "wasm32")]
373 log::debug!(
374 "SYNC preserving committed block {} - existing: {}, cache: {} - NEVER OVERWRITE COMMITTED DATA",
375 block_id,
376 existing_preview,
377 new_preview
378 );
379 false } else {
381 #[cfg(target_arch = "wasm32")]
382 log::debug!(
383 "SYNC updating uncommitted block {} - existing: {}, new: {}",
384 block_id,
385 existing_preview,
386 new_preview
387 );
388 true }
390 } else {
391 true }
393 } else {
394 true };
396
397 if should_update {
398 db_storage.insert(*block_id, data.clone());
399 log::debug!("Persisted block {} to global storage", block_id);
400 }
401 }
402 });
403 vfs_sync::with_global_metadata(|meta| {
405 let mut meta_map = meta.borrow_mut();
406 let db_meta = meta_map
407 .entry(self.db_name.clone())
408 .or_insert_with(HashMap::new);
409 for block_id in ids {
410 if let Some(checksum) = self.checksum_manager.get_checksum(block_id) {
411 let version = next_commit as u32;
413 db_meta.insert(
414 block_id,
415 BlockMetadataPersist {
416 checksum,
417 last_modified_ms: Self::now_millis(),
418 version,
419 algo: self.checksum_manager.get_algorithm(block_id),
420 },
421 );
422 log::debug!("Persisted metadata for block {}", block_id);
423 }
424 }
425 });
426 vfs_sync::with_global_commit_marker(|cm| {
428 let cm_map = cm;
429 cm_map.insert(self.db_name.clone(), next_commit);
430 });
431
432 #[cfg(target_arch = "wasm32")]
434 log::debug!(
435 "Spawning IndexedDB persistence for {} blocks",
436 to_persist.len()
437 );
438 let db_name = self.db_name.clone();
439 wasm_bindgen_futures::spawn_local(async move {
440 use wasm_bindgen::JsCast;
441
442 let global = js_sys::global();
444 let indexed_db_value = match js_sys::Reflect::get(
445 &global,
446 &wasm_bindgen::JsValue::from_str("indexedDB"),
447 ) {
448 Ok(val) => val,
449 Err(_) => {
450 log::error!("IndexedDB property access failed - cannot persist");
451 return;
452 }
453 };
454
455 if indexed_db_value.is_null() || indexed_db_value.is_undefined() {
456 log::warn!(
457 "IndexedDB unavailable for persistence (private browsing?) - data not persisted to IndexedDB"
458 );
459 return;
460 }
461
462 let idb_factory = match indexed_db_value.dyn_into::<web_sys::IdbFactory>() {
463 Ok(factory) => factory,
464 Err(_) => {
465 log::error!("IndexedDB property is not an IdbFactory - cannot persist");
466 return;
467 }
468 };
469
470 let open_req = match idb_factory.open_with_u32("block_storage", 2) {
471 Ok(req) => req,
472 Err(e) => {
473 log::error!("Failed to open IndexedDB for persistence: {:?}", e);
474 return;
475 }
476 };
477
478 let upgrade_handler = js_sys::Function::new_no_args(&format!(
480 "
481 const db = event.target.result;
482 if (!db.objectStoreNames.contains('blocks')) {{
483 db.createObjectStore('blocks');
484 }}
485 if (!db.objectStoreNames.contains('metadata')) {{
486 db.createObjectStore('metadata');
487 }}
488 "
489 ));
490 open_req.set_onupgradeneeded(Some(&upgrade_handler));
491
492 let (tx, rx) = futures::channel::oneshot::channel();
494 let tx = std::rc::Rc::new(std::cell::RefCell::new(Some(tx)));
495
496 let success_tx = tx.clone();
497 let success_callback =
498 wasm_bindgen::closure::Closure::wrap(Box::new(move |event: web_sys::Event| {
499 if let Some(tx) = success_tx.lock().take() {
500 let target = event.target().unwrap();
501 let request: web_sys::IdbOpenDbRequest = target.unchecked_into();
502 let result = request.result().unwrap();
503 let _ = tx.send(Ok(result));
504 }
505 })
506 as Box<dyn FnMut(_)>);
507
508 let error_tx = tx.clone();
509 let error_callback =
510 wasm_bindgen::closure::Closure::wrap(Box::new(move |event: web_sys::Event| {
511 if let Some(tx) = error_tx.lock().take() {
512 let _ = tx.send(Err(format!("IndexedDB open failed: {:?}", event)));
513 }
514 })
515 as Box<dyn FnMut(_)>);
516
517 open_req.set_onsuccess(Some(success_callback.as_ref().unchecked_ref()));
518 open_req.set_onerror(Some(error_callback.as_ref().unchecked_ref()));
519
520 let db_result = rx.await;
521
522 success_callback.forget();
524 error_callback.forget();
525
526 match db_result {
527 Ok(Ok(db_value)) => {
528 #[cfg(target_arch = "wasm32")]
529 log::info!("Successfully opened IndexedDB for persistence");
530 if let Ok(db) = db_value.dyn_into::<web_sys::IdbDatabase>() {
531 let store_names = js_sys::Array::new();
533 store_names.push(&wasm_bindgen::JsValue::from_str("blocks"));
534 store_names.push(&wasm_bindgen::JsValue::from_str("metadata"));
535
536 let transaction = db
537 .transaction_with_str_sequence_and_mode(
538 &store_names,
539 web_sys::IdbTransactionMode::Readwrite,
540 )
541 .unwrap();
542
543 let blocks_store = transaction.object_store("blocks").unwrap();
544 let metadata_store = transaction.object_store("metadata").unwrap();
545
546 for (block_id, data) in &to_persist {
548 let key = wasm_bindgen::JsValue::from_str(&format!(
549 "{}_{}",
550 db_name, block_id
551 ));
552 let value = js_sys::Uint8Array::from(&data[..]);
553 blocks_store.put_with_key(&value, &key).unwrap();
554 #[cfg(target_arch = "wasm32")]
555 log::debug!("Persisted block {} to IndexedDB", block_id);
556 }
557
558 let commit_key = wasm_bindgen::JsValue::from_str(&format!(
560 "{}_commit_marker",
561 db_name
562 ));
563 let commit_value = wasm_bindgen::JsValue::from_f64(next_commit as f64);
564 metadata_store
565 .put_with_key(&commit_value, &commit_key)
566 .unwrap();
567 #[cfg(target_arch = "wasm32")]
568 log::info!("Persisted commit marker {} to IndexedDB", next_commit);
569
570 let (tx_tx, tx_rx) = futures::channel::oneshot::channel();
572 let tx_tx = std::rc::Rc::new(std::cell::RefCell::new(Some(tx_tx)));
573
574 let tx_complete_tx = tx_tx.clone();
575 let tx_complete_callback = wasm_bindgen::closure::Closure::wrap(
576 Box::new(move |_event: web_sys::Event| {
577 if let Some(tx) = tx_complete_tx.lock().take() {
578 let _ = tx.send(Ok(()));
579 }
580 }) as Box<dyn FnMut(_)>,
581 );
582
583 let tx_error_tx = tx_tx.clone();
584 let tx_error_callback = wasm_bindgen::closure::Closure::wrap(Box::new(
585 move |event: web_sys::Event| {
586 if let Some(tx) = tx_error_tx.lock().take() {
587 let _ = tx
588 .send(Err(format!("Transaction failed: {:?}", event)));
589 }
590 },
591 )
592 as Box<dyn FnMut(_)>);
593
594 transaction.set_oncomplete(Some(
595 tx_complete_callback.as_ref().unchecked_ref(),
596 ));
597 transaction
598 .set_onerror(Some(tx_error_callback.as_ref().unchecked_ref()));
599
600 match tx_rx.await {
601 Ok(Ok(_)) => {
602 #[cfg(target_arch = "wasm32")]
603 log::info!("IndexedDB transaction completed successfully");
604 }
605 Ok(Err(e)) => {
606 #[cfg(target_arch = "wasm32")]
607 log::error!("IndexedDB transaction failed: {}", e);
608 }
609 Err(_) => {
610 #[cfg(target_arch = "wasm32")]
611 log::error!("IndexedDB transaction channel failed");
612 }
613 }
614
615 tx_complete_callback.forget();
617 tx_error_callback.forget();
618 } else {
619 #[cfg(target_arch = "wasm32")]
620 log::error!("Failed to cast to IdbDatabase for persistence");
621 }
622 }
623 Ok(Err(e)) => {
624 #[cfg(target_arch = "wasm32")]
625 log::error!("Failed to open IndexedDB for persistence: {}", e);
626 }
627 Err(_) => {
628 #[cfg(target_arch = "wasm32")]
629 log::error!("IndexedDB open channel failed");
630 }
631 }
632 });
633 }
634
635 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
637 {
638 let to_persist: Vec<(u64, Vec<u8>)> = {
639 let dirty = self.get_dirty_blocks().lock();
640 dirty.iter().map(|(k, v)| (*k, v.clone())).collect()
641 };
642 let now_ms = Self::now_millis();
643 let base: PathBuf = self.base_dir.clone();
644 let mut db_dir = base.clone();
645 db_dir.push(&self.db_name);
646 let mut blocks_dir = db_dir.clone();
647 blocks_dir.push("blocks");
648 let mut meta_path = db_dir.clone();
649 meta_path.push("metadata.json");
650 let _ = fs::create_dir_all(&blocks_dir);
652 let mut meta_val: serde_json::Value = serde_json::json!({"entries": []});
654 if let Ok(mut f) = fs::File::open(&meta_path) {
655 let mut s = String::new();
656 if f.read_to_string(&mut s).is_ok() {
657 if let Ok(v) = serde_json::from_str::<serde_json::Value>(&s) {
658 meta_val = v;
659 }
660 }
661 }
662 if !meta_val.is_object() {
663 meta_val = serde_json::json!({"entries": []});
664 }
665 let mut map: HashMap<u64, serde_json::Map<String, serde_json::Value>> = HashMap::new();
666 if let Some(entries) = meta_val.get("entries").and_then(|e| e.as_array()) {
667 for ent in entries.iter() {
668 if let Some(arr) = ent.as_array() {
669 if arr.len() == 2 {
670 if let (Some(id), Some(obj)) = (
671 arr.first().and_then(|v| v.as_u64()),
672 arr.get(1).and_then(|v| v.as_object()),
673 ) {
674 map.insert(id, obj.clone());
675 }
676 }
677 }
678 }
679 }
680 for (block_id, data) in &to_persist {
681 let mut block_file = blocks_dir.clone();
683 block_file.push(format!("block_{}.bin", block_id));
684 if let Ok(mut f) = fs::File::create(&block_file) {
685 let _ = f.write_all(data);
686 }
687 if let Some(checksum) = self.checksum_manager.get_checksum(*block_id) {
689 let version_u64 = map
690 .get(block_id)
691 .and_then(|m| m.get("version"))
692 .and_then(|v| v.as_u64())
693 .unwrap_or(0)
694 .saturating_add(1);
695 let algo = self.checksum_manager.get_algorithm(*block_id);
696 let algo_str = match algo {
697 ChecksumAlgorithm::CRC32 => "CRC32",
698 _ => "FastHash",
699 };
700 let mut obj = serde_json::Map::new();
701 obj.insert("checksum".into(), serde_json::Value::from(checksum));
702 obj.insert("last_modified_ms".into(), serde_json::Value::from(now_ms));
703 obj.insert("version".into(), serde_json::Value::from(version_u64));
704 obj.insert("algo".into(), serde_json::Value::String(algo_str.into()));
705 map.insert(*block_id, obj);
706 }
707 }
708 for (_id, obj) in map.iter_mut() {
710 let ok = obj
711 .get("algo")
712 .and_then(|v| v.as_str())
713 .map(|s| s == "FastHash" || s == "CRC32")
714 .unwrap_or(false);
715 if !ok {
716 let def = match self.checksum_manager.default_algorithm() {
717 ChecksumAlgorithm::CRC32 => "CRC32",
718 _ => "FastHash",
719 };
720 obj.insert("algo".into(), serde_json::Value::String(def.into()));
721 }
722 }
723 let allocated: std::collections::HashSet<u64> =
725 lock_mutex!(self.allocated_blocks).clone();
726 let mut entries_vec: Vec<serde_json::Value> = Vec::new();
728 for (id, obj) in map.iter() {
729 entries_vec.push(serde_json::Value::Array(vec![
730 serde_json::Value::from(*id),
731 serde_json::Value::Object(obj.clone()),
732 ]));
733 }
734 let meta_out = serde_json::json!({"entries": entries_vec});
735 let meta_string = serde_json::to_string(&meta_out).unwrap_or_else(|_| "{}".into());
736 let mut meta_pending = db_dir.clone();
738 meta_pending.push("metadata.json.pending");
739 log::debug!(
740 "[fs_persist] writing pending metadata at {:?}",
741 meta_pending
742 );
743 if let Ok(mut f) = fs::File::create(&meta_pending) {
744 let _ = f.write_all(meta_string.as_bytes());
745 let _ = f.sync_all();
746 }
747 let _ = fs::rename(&meta_pending, &meta_path);
748 log::debug!("[fs_persist] finalized metadata rename to {:?}", meta_path);
749 let mut alloc_path = db_dir.clone();
751 alloc_path.push("allocations.json");
752 let mut alloc = FsAlloc {
753 allocated: allocated.iter().cloned().collect(),
754 };
755 alloc.allocated.sort_unstable();
756 if let Ok(mut f) = fs::File::create(&alloc_path) {
757 let _ = f.write_all(
758 serde_json::to_string(&alloc)
759 .unwrap_or_else(|_| "{}".into())
760 .as_bytes(),
761 );
762 }
763 log::info!("wrote allocations.json at {:?}", alloc_path);
764 let valid_ids: std::collections::HashSet<u64> = map.keys().cloned().collect();
767 if let Ok(entries) = fs::read_dir(&blocks_dir) {
768 for entry in entries.flatten() {
769 if let Ok(ft) = entry.file_type() {
770 if ft.is_file() {
771 if let Some(name) = entry.file_name().to_str() {
772 if let Some(id_str) = name
773 .strip_prefix("block_")
774 .and_then(|s| s.strip_suffix(".bin"))
775 {
776 if let Ok(id) = id_str.parse::<u64>() {
777 if !valid_ids.contains(&id) {
778 let _ = fs::remove_file(entry.path());
779 }
780 }
781 }
782 }
783 }
784 }
785 }
786 }
787
788 let alt_base: PathBuf = {
790 if let Ok(p) = env::var("ABSURDERSQL_FS_BASE") {
791 PathBuf::from(p)
792 } else if cfg!(any(test, debug_assertions)) {
793 PathBuf::from(format!(".absurdersql_fs/run_{}", std::process::id()))
794 } else {
795 PathBuf::from(".absurdersql_fs")
796 }
797 };
798 if alt_base != self.base_dir {
799 let mut alt_db_dir = alt_base.clone();
800 alt_db_dir.push(&self.db_name);
801 let mut alt_blocks_dir = alt_db_dir.clone();
802 alt_blocks_dir.push("blocks");
803 let _ = fs::create_dir_all(&alt_blocks_dir);
804 let alt_to_persist: Vec<(u64, Vec<u8>)> = {
806 let dirty = self.get_dirty_blocks().lock();
807 dirty.iter().map(|(k, v)| (*k, v.clone())).collect()
808 };
809 for (block_id, data) in alt_to_persist.iter() {
810 let mut alt_block_file = alt_blocks_dir.clone();
811 alt_block_file.push(format!("block_{}.bin", block_id));
812 if let Ok(mut f) = fs::File::create(&alt_block_file) {
813 let _ = f.write_all(data);
814 }
815 }
816 let mut alt_meta_pending = alt_db_dir.clone();
818 alt_meta_pending.push("metadata.json.pending");
819 log::debug!(
820 "[fs_persist] (alt) writing pending metadata at {:?}",
821 alt_meta_pending
822 );
823 if let Ok(mut f) = fs::File::create(&alt_meta_pending) {
824 let _ = f.write_all(meta_string.as_bytes());
825 let _ = f.sync_all();
826 }
827 let mut alt_meta_path = alt_db_dir.clone();
828 alt_meta_path.push("metadata.json");
829 let _ = fs::rename(&alt_meta_pending, &alt_meta_path);
830 log::debug!(
831 "[fs_persist] (alt) finalized metadata rename to {:?}",
832 alt_meta_path
833 );
834 let mut alt_alloc_path = alt_db_dir.clone();
836 alt_alloc_path.push("allocations.json");
837 if let Ok(mut f) = fs::File::create(&alt_alloc_path) {
838 let _ = f.write_all(
839 serde_json::to_string(&alloc)
840 .unwrap_or_else(|_| "{}".into())
841 .as_bytes(),
842 );
843 }
844 log::info!("(alt) wrote allocations.json at {:?}", alt_alloc_path);
845 if let Ok(entries) = fs::read_dir(&alt_blocks_dir) {
847 for entry in entries.flatten() {
848 if let Ok(ft) = entry.file_type() {
849 if ft.is_file() {
850 if let Some(name) = entry.file_name().to_str() {
851 if let Some(id_str) = name
852 .strip_prefix("block_")
853 .and_then(|s| s.strip_suffix(".bin"))
854 {
855 if let Ok(id) = id_str.parse::<u64>() {
856 if !valid_ids.contains(&id) {
857 let _ = fs::remove_file(entry.path());
858 }
859 }
860 }
861 }
862 }
863 }
864 }
865 }
866 }
867 }
868
869 #[cfg(all(
871 not(target_arch = "wasm32"),
872 any(test, debug_assertions),
873 not(feature = "fs_persist")
874 ))]
875 {
876 let to_persist: Vec<(u64, Vec<u8>)> = {
877 let dirty = self.get_dirty_blocks().lock();
878 dirty.iter().map(|(k, v)| (*k, v.clone())).collect()
879 };
880 let ids: Vec<u64> = to_persist.iter().map(|(k, _)| *k).collect();
881 let next_commit: u64 = vfs_sync::with_global_commit_marker(|cm| {
883 let cm = cm;
884 cm.get(&self.db_name).copied().unwrap_or(0) + 1
885 });
886 vfs_sync::with_global_storage(|storage| {
887 let mut storage_map = storage.lock();
888 let db_storage = storage_map
889 .entry(self.db_name.clone())
890 .or_insert_with(HashMap::new);
891 for (block_id, data) in &to_persist {
892 db_storage.insert(*block_id, data.clone());
893 log::debug!("[test] Persisted block {} to test-global storage", block_id);
894 }
895 });
896 GLOBAL_METADATA_TEST.with(|meta| {
898 let mut meta_map = meta.borrow_mut();
899 let db_meta = meta_map
900 .entry(self.db_name.clone())
901 .or_insert_with(HashMap::new);
902 for block_id in ids {
903 if let Some(checksum) = self.checksum_manager.get_checksum(block_id) {
904 let version = next_commit as u32;
905 db_meta.insert(
906 block_id,
907 BlockMetadataPersist {
908 checksum,
909 last_modified_ms: Self::now_millis(),
910 version,
911 algo: self.checksum_manager.get_algorithm(block_id),
912 },
913 );
914 log::debug!("[test] Persisted metadata for block {}", block_id);
915 }
916 }
917 });
918 vfs_sync::with_global_commit_marker(|cm| {
920 let cm_map = cm;
921 cm_map.insert(self.db_name.clone(), next_commit);
922 });
923 }
924
925 #[cfg(not(target_arch = "wasm32"))]
926 let start = Instant::now();
927 let dirty_count = {
928 let mut dirty = self.get_dirty_blocks().lock();
929 let count = dirty.len();
930 dirty.clear();
931 count
932 };
933 log::info!(
934 "Successfully synced {} blocks to global storage",
935 dirty_count
936 );
937 #[cfg(not(target_arch = "wasm32"))]
938 {
939 self.sync_count.fetch_add(1, Ordering::SeqCst);
940 let elapsed = start.elapsed();
941 let ms = elapsed.as_millis() as u64;
942 let ms = if ms == 0 { 1 } else { ms };
943 self.last_sync_duration_ms.store(ms, Ordering::SeqCst);
944
945 self.observability.record_sync_success(ms, dirty_count);
947
948 if let Some(ref callback) = self.observability.sync_success_callback {
950 callback(ms, dirty_count);
951 }
952 }
953 self.evict_if_needed();
955 Ok(())
956 }
957}