#[cfg(target_arch = "wasm32")]
macro_rules! lock_mutex {
($mutex:expr) => {
$mutex
.try_borrow_mut()
.expect("RefCell borrow failed - reentrancy detected in allocation.rs")
};
}
#[cfg(not(target_arch = "wasm32"))]
macro_rules! lock_mutex {
($mutex:expr) => {
$mutex.lock()
};
}
use super::block_storage::BlockStorage;
use crate::types::DatabaseError;
#[cfg(any(
target_arch = "wasm32",
all(
not(target_arch = "wasm32"),
any(test, debug_assertions),
not(feature = "fs_persist")
)
))]
use std::collections::HashSet;
use std::sync::atomic::Ordering;
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
use std::{
fs,
io::{Read, Write},
path::PathBuf,
};
#[cfg(target_arch = "wasm32")]
use super::vfs_sync;
#[cfg(all(
not(target_arch = "wasm32"),
any(test, debug_assertions),
not(feature = "fs_persist")
))]
use super::vfs_sync;
#[cfg(all(
not(target_arch = "wasm32"),
any(test, debug_assertions),
not(feature = "fs_persist")
))]
use super::block_storage::GLOBAL_METADATA_TEST;
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
#[derive(serde::Serialize, serde::Deserialize, Default)]
#[allow(dead_code)]
struct FsAlloc {
allocated: Vec<u64>,
}
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
#[derive(serde::Serialize, serde::Deserialize, Default)]
#[allow(dead_code)]
struct FsDealloc {
tombstones: Vec<u64>,
}
pub async fn allocate_block_impl(storage: &mut BlockStorage) -> Result<u64, DatabaseError> {
let block_id = storage.next_block_id.fetch_add(1, Ordering::SeqCst);
lock_mutex!(storage.allocated_blocks).insert(block_id);
#[cfg(target_arch = "wasm32")]
{
vfs_sync::with_global_allocation_map(|allocation_map| {
let mut map = allocation_map.borrow_mut();
let db_allocations = map
.entry(storage.db_name.clone())
.or_insert_with(HashSet::new);
db_allocations.insert(block_id);
});
}
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
{
let base: PathBuf = storage.base_dir.clone();
let mut db_dir = base.clone();
db_dir.push(&storage.db_name);
let _ = fs::create_dir_all(&db_dir);
let mut blocks_dir = db_dir.clone();
blocks_dir.push("blocks");
let _ = fs::create_dir_all(&blocks_dir);
let mut alloc_path = db_dir.clone();
alloc_path.push("allocations.json");
let mut alloc = FsAlloc::default();
if let Ok(mut f) = fs::File::open(&alloc_path) {
let mut s = String::new();
if f.read_to_string(&mut s).is_ok() {
let _ = serde_json::from_str::<FsAlloc>(&s).map(|a| {
alloc = a;
});
}
}
if !alloc.allocated.contains(&block_id) {
alloc.allocated.push(block_id);
}
if let Ok(mut f) = fs::File::create(&alloc_path) {
let _ = f.write_all(
serde_json::to_string(&alloc)
.unwrap_or_else(|_| "{}".into())
.as_bytes(),
);
}
let mut dealloc_path = db_dir.clone();
dealloc_path.push("deallocated.json");
lock_mutex!(storage.deallocated_blocks).remove(&block_id);
let mut dealloc = FsDealloc::default();
if let Ok(mut f) = fs::File::open(&dealloc_path) {
let mut s = String::new();
if f.read_to_string(&mut s).is_ok() {
let _ = serde_json::from_str::<FsDealloc>(&s).map(|d| {
dealloc = d;
});
}
}
dealloc.tombstones = lock_mutex!(storage.deallocated_blocks)
.iter()
.cloned()
.collect();
dealloc.tombstones.sort_unstable();
if let Ok(mut f) = fs::File::create(&dealloc_path) {
let _ = f.write_all(
serde_json::to_string(&dealloc)
.unwrap_or_else(|_| "{}".into())
.as_bytes(),
);
}
}
#[cfg(all(
not(target_arch = "wasm32"),
any(test, debug_assertions),
not(feature = "fs_persist")
))]
{
vfs_sync::with_global_allocation_map(|allocation_map| {
let mut map = allocation_map.borrow_mut();
let db_allocations = map
.entry(storage.db_name.clone())
.or_insert_with(HashSet::new);
db_allocations.insert(block_id);
});
}
#[cfg(feature = "telemetry")]
if let Some(ref metrics) = storage.metrics {
metrics.blocks_allocated_total().inc();
let total_memory = (lock_mutex!(storage.allocated_blocks).len() as f64)
* (super::block_storage::BLOCK_SIZE as f64);
metrics.memory_bytes().set(total_memory);
}
log::info!(
"Allocated block: {} (total allocated: {})",
block_id,
lock_mutex!(storage.allocated_blocks).len()
);
Ok(block_id)
}
pub async fn deallocate_block_impl(
storage: &mut BlockStorage,
block_id: u64,
) -> Result<(), DatabaseError> {
if !lock_mutex!(storage.allocated_blocks).contains(&block_id) {
return Err(DatabaseError::new(
"BLOCK_NOT_ALLOCATED",
&format!("Block {} is not allocated", block_id),
));
}
lock_mutex!(storage.allocated_blocks).remove(&block_id);
lock_mutex!(storage.cache).remove(&block_id);
lock_mutex!(storage.dirty_blocks).remove(&block_id);
storage.checksum_manager.remove_checksum(block_id);
#[cfg(target_arch = "wasm32")]
{
vfs_sync::with_global_storage(|storage_map| {
if let Some(db_storage) = storage_map.borrow_mut().get_mut(&storage.db_name) {
db_storage.remove(&block_id);
}
});
vfs_sync::with_global_allocation_map(|allocation_map| {
if let Some(db_allocations) = allocation_map.borrow_mut().get_mut(&storage.db_name) {
db_allocations.remove(&block_id);
}
});
vfs_sync::with_global_metadata(|meta_map| {
if let Some(db_meta) = meta_map.borrow_mut().get_mut(&storage.db_name) {
db_meta.remove(&block_id);
}
});
}
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
{
let base: PathBuf = storage.base_dir.clone();
let mut db_dir = base.clone();
db_dir.push(&storage.db_name);
let mut blocks_dir = db_dir.clone();
blocks_dir.push("blocks");
let mut block_path = blocks_dir.clone();
block_path.push(format!("block_{}.bin", block_id));
let _ = fs::remove_file(&block_path);
let mut alloc_path = db_dir.clone();
alloc_path.push("allocations.json");
let mut alloc = FsAlloc::default();
if let Ok(mut f) = fs::File::open(&alloc_path) {
let mut s = String::new();
if f.read_to_string(&mut s).is_ok() {
let _ = serde_json::from_str::<FsAlloc>(&s).map(|a| {
alloc = a;
});
}
}
alloc.allocated.retain(|&id| id != block_id);
if let Ok(mut f) = fs::File::create(&alloc_path) {
let _ = f.write_all(
serde_json::to_string(&alloc)
.unwrap_or_else(|_| "{}".into())
.as_bytes(),
);
}
let mut meta_path = db_dir.clone();
meta_path.push("metadata.json");
let mut meta_val: serde_json::Value = serde_json::json!({"entries": []});
if let Ok(mut f) = fs::File::open(&meta_path) {
let mut s = String::new();
if f.read_to_string(&mut s).is_ok() {
if let Ok(v) = serde_json::from_str::<serde_json::Value>(&s) {
meta_val = v;
}
}
}
if !meta_val.is_object() {
meta_val = serde_json::json!({"entries": []});
}
if let Some(entries) = meta_val.get_mut("entries").and_then(|v| v.as_array_mut()) {
entries.retain(|ent| {
ent.as_array()
.and_then(|arr| arr.first())
.and_then(|v| v.as_u64())
.map(|bid| bid != block_id)
.unwrap_or(true)
});
}
let meta_string = serde_json::to_string(&meta_val).unwrap_or_else(|_| "{}".into());
if let Ok(mut f) = fs::File::create(&meta_path) {
let _ = f.write_all(meta_string.as_bytes());
}
let mut dealloc_path = db_dir.clone();
dealloc_path.push("deallocated.json");
lock_mutex!(storage.deallocated_blocks).insert(block_id);
let mut dealloc = FsDealloc::default();
if let Ok(mut f) = fs::File::open(&dealloc_path) {
let mut s = String::new();
if f.read_to_string(&mut s).is_ok() {
let _ = serde_json::from_str::<FsDealloc>(&s).map(|d| {
dealloc = d;
});
}
}
dealloc.tombstones = lock_mutex!(storage.deallocated_blocks)
.iter()
.cloned()
.collect();
dealloc.tombstones.sort_unstable();
if let Ok(mut f) = fs::File::create(&dealloc_path) {
let _ = f.write_all(
serde_json::to_string(&dealloc)
.unwrap_or_else(|_| "{}".into())
.as_bytes(),
);
}
}
#[cfg(all(
not(target_arch = "wasm32"),
any(test, debug_assertions),
not(feature = "fs_persist")
))]
{
vfs_sync::with_global_storage(|gs| {
let mut storage_map = gs.borrow_mut();
if let Some(db_storage) = storage_map.get_mut(&storage.db_name) {
db_storage.remove(&block_id);
}
});
vfs_sync::with_global_allocation_map(|allocation_map| {
if let Some(db_allocations) = allocation_map.borrow_mut().get_mut(&storage.db_name) {
db_allocations.remove(&block_id);
}
});
GLOBAL_METADATA_TEST.with(|meta| {
let mut meta_map = meta.lock();
if let Some(db_meta) = meta_map.get_mut(&storage.db_name) {
db_meta.remove(&block_id);
}
});
}
let current = storage.next_block_id.load(Ordering::SeqCst);
if block_id < current {
storage.next_block_id.store(block_id, Ordering::SeqCst);
}
#[cfg(feature = "telemetry")]
if let Some(ref metrics) = storage.metrics {
metrics.blocks_deallocated_total().inc();
let total_memory = (lock_mutex!(storage.allocated_blocks).len() as f64)
* (super::block_storage::BLOCK_SIZE as f64);
metrics.memory_bytes().set(total_memory);
}
log::info!(
"Deallocated block: {} (total allocated: {})",
block_id,
lock_mutex!(storage.allocated_blocks).len()
);
Ok(())
}