use crate::wal::block::Block;
use crate::wal::config::{DEFAULT_BLOCK_SIZE, MAX_ALLOC, MAX_FILE_SIZE, debug_print};
use crate::wal::paths::WalPathManager;
use crate::wal::storage::{SharedMmap, SharedMmapKeeper};
use std::cell::UnsafeCell;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU16, Ordering};
use std::sync::{Arc, OnceLock, RwLock};
use super::DELETION_TX;
/// Hands out `Block`s from the current WAL file, rolling over to a fresh file
/// when the current one is exhausted. Shared across threads; all mutation of
/// `next_block` is serialized by the `lock` spinlock.
pub(super) struct BlockAllocator {
    // Allocation cursor (next block id / offset / file). Only accessed while
    // `lock` is held — see `get_next_available_block` / `alloc_block`.
    next_block: UnsafeCell<Block>,
    // Spinlock flag: `true` while some thread is mutating `next_block`.
    lock: AtomicBool,
    // Factory for new WAL segment files (see `WalPathManager::create_new_file`).
    paths: Arc<WalPathManager>,
}
impl BlockAllocator {
    /// Creates an allocator backed by a freshly created WAL file; the first
    /// handed-out block will be id 1 at offset 0 of that file.
    ///
    /// # Errors
    /// Propagates I/O errors from file creation or memory-mapping.
    pub(super) fn new(paths: Arc<WalPathManager>) -> std::io::Result<Self> {
        let file1 = paths.create_new_file()?;
        let mmap: Arc<SharedMmap> = SharedMmapKeeper::get_mmap_arc(&file1)?;
        debug_print!(
            "[alloc] init: created file={}, max_file_size={}B, block_size={}B",
            file1,
            MAX_FILE_SIZE,
            DEFAULT_BLOCK_SIZE
        );
        Ok(BlockAllocator {
            next_block: UnsafeCell::new(Block {
                id: 1,
                offset: 0,
                limit: DEFAULT_BLOCK_SIZE,
                file_path: file1,
                mmap,
                used: 0,
            }),
            lock: AtomicBool::new(false),
            paths,
        })
    }

    /// Moves the allocator cursor onto a newly created file and returns the
    /// path of the file it previously pointed at.
    ///
    /// On error the cursor is left pointing at the old file so the caller can
    /// unlock and propagate without corrupting allocator state. Must only be
    /// called while the spinlock is held.
    fn roll_to_new_file(&self, data: &mut Block) -> std::io::Result<String> {
        let new_path = self.paths.create_new_file()?;
        let new_mmap = SharedMmapKeeper::get_mmap_arc(&new_path)?;
        let prev_path = std::mem::replace(&mut data.file_path, new_path);
        data.mmap = new_mmap;
        data.offset = 0;
        // Reset `used` too — previously only the fixed-size path did this.
        data.used = 0;
        Ok(prev_path)
    }

    /// Hands out the next fixed-size (`DEFAULT_BLOCK_SIZE`) block, rolling
    /// over to a new file when the current one cannot fit another block.
    ///
    /// # Errors
    /// Propagates I/O errors from file rollover.
    ///
    /// # Safety
    /// Internal synchronization is handled by the spinlock; the fn is kept
    /// `unsafe` for interface compatibility with existing callers.
    pub(super) unsafe fn get_next_available_block(&self) -> std::io::Result<Block> {
        self.lock();
        // SAFETY: the spinlock acquired above gives exclusive access to the cell.
        let data = unsafe { &mut *self.next_block.get() };
        // Roll over when the next fixed-size block would run past the end of
        // the file (equivalent to the old `offset >= MAX_FILE_SIZE` check when
        // MAX_FILE_SIZE is a multiple of DEFAULT_BLOCK_SIZE, strictly safer
        // otherwise).
        if data.offset + DEFAULT_BLOCK_SIZE > MAX_FILE_SIZE {
            // BUGFIX: using `?` here used to return with the spinlock still
            // held, deadlocking every later allocation. Unlock before
            // propagating, and only mark the old file fully allocated once the
            // rollover actually succeeded.
            match self.roll_to_new_file(data) {
                Ok(prev_path) => {
                    FileStateTracker::set_fully_allocated(prev_path);
                    debug_print!("[alloc] rolled over to new file: {}", data.file_path);
                }
                Err(e) => {
                    self.unlock();
                    return Err(e);
                }
            }
        }
        BlockStateTracker::register_block(data.id as usize, &data.file_path);
        FileStateTracker::register_file_if_absent(&data.file_path);
        FileStateTracker::add_block_to_file_state(&data.file_path);
        FileStateTracker::set_block_locked(data.id as usize);
        let ret = data.clone();
        // Advance the cursor for the next caller.
        data.offset += DEFAULT_BLOCK_SIZE;
        data.id += 1;
        self.unlock();
        debug_print!(
            "[alloc] handout: block_id={}, file={}, offset={}, limit={}",
            ret.id,
            ret.file_path,
            ret.offset,
            ret.limit
        );
        Ok(ret)
    }

    /// Hands out a block large enough for `want_bytes`, rounded up to a whole
    /// number of `DEFAULT_BLOCK_SIZE` units, rolling over to a new file when
    /// the current one cannot fit the request.
    ///
    /// # Errors
    /// `InvalidInput` when `want_bytes` is 0 or exceeds `MAX_ALLOC`; otherwise
    /// propagates I/O errors from file rollover.
    ///
    /// # Safety
    /// Same contract as [`Self::get_next_available_block`].
    pub(super) unsafe fn alloc_block(&self, want_bytes: u64) -> std::io::Result<Block> {
        if want_bytes == 0 || want_bytes > MAX_ALLOC {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "invalid allocation size, a single entry can't be more than 1gb",
            ));
        }
        // Round up to a whole number of allocation units.
        let alloc_units = (want_bytes + DEFAULT_BLOCK_SIZE - 1) / DEFAULT_BLOCK_SIZE;
        let alloc_size = alloc_units * DEFAULT_BLOCK_SIZE;
        debug_print!(
            "[alloc] alloc_block: want_bytes={}, units={}, size={}",
            want_bytes,
            alloc_units,
            alloc_size
        );
        self.lock();
        // SAFETY: the spinlock acquired above gives exclusive access to the cell.
        let data = unsafe { &mut *self.next_block.get() };
        if data.offset + alloc_size > MAX_FILE_SIZE {
            // BUGFIX: same lock-leak as in `get_next_available_block` — `?`
            // previously returned with the spinlock still held.
            match self.roll_to_new_file(data) {
                Ok(prev_path) => {
                    FileStateTracker::set_fully_allocated(prev_path);
                    debug_print!(
                        "[alloc] file rollover for sized alloc -> {}",
                        data.file_path
                    );
                }
                Err(e) => {
                    self.unlock();
                    return Err(e);
                }
            }
        }
        let ret = Block {
            id: data.id,
            file_path: data.file_path.clone(),
            offset: data.offset,
            limit: alloc_size,
            mmap: data.mmap.clone(),
            used: 0,
        };
        BlockStateTracker::register_block(ret.id as usize, &ret.file_path);
        FileStateTracker::register_file_if_absent(&ret.file_path);
        FileStateTracker::add_block_to_file_state(&ret.file_path);
        FileStateTracker::set_block_locked(ret.id as usize);
        data.offset += alloc_size;
        data.id += 1;
        self.unlock();
        debug_print!(
            "[alloc] handout(sized): block_id={}, file={}, offset={}, limit={}",
            ret.id,
            ret.file_path,
            ret.offset,
            ret.limit
        );
        Ok(ret)
    }

    /// Acquires the allocator spinlock (busy-waits; critical sections are short).
    fn lock(&self) {
        while self
            .lock
            .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            std::hint::spin_loop();
        }
    }

    /// Releases the allocator spinlock.
    fn unlock(&self) {
        self.lock.store(false, Ordering::Release);
    }
}
// SAFETY: `next_block` is only mutated inside `get_next_available_block` /
// `alloc_block`, both of which serialize access through the `lock` spinlock,
// so concurrent shared access cannot race on the UnsafeCell.
// NOTE(review): soundness also assumes the contained `Block` (incl. its
// `Arc<SharedMmap>`) is itself Send — confirm in `wal::block` / `wal::storage`.
unsafe impl Sync for BlockAllocator {}
unsafe impl Send for BlockAllocator {}
/// Queues `file_path` for deletion once it is eligible: the allocator has
/// moved past the file, at least one block was allocated from it, none of
/// those blocks are still locked, and all of them have been checkpointed.
pub(super) fn flush_check(file_path: String) {
    let Some((locked, checkpointed, total, fully_allocated)) =
        FileStateTracker::get_state_snapshot(&file_path)
    else {
        return;
    };
    let ready_to_delete =
        fully_allocated && locked == 0 && total > 0 && checkpointed >= total;
    if !ready_to_delete {
        return;
    }
    // Best effort: if the deletion channel isn't initialized (or the receiver
    // is gone), the send outcome is deliberately ignored.
    if let Some(tx) = DELETION_TX.get() {
        let _ = tx.send(file_path);
    }
}
/// Per-block bookkeeping held in `BlockStateTracker`'s global map.
struct BlockState {
    // Set to true (via `set_checkpointed_true`) once the block's contents
    // have been checkpointed.
    is_checkpointed: AtomicBool,
    // Path of the WAL file this block was allocated from.
    file_path: String,
}
/// Namespace for the global block-id → `BlockState` map.
pub(super) struct BlockStateTracker {}
impl BlockStateTracker {
    /// Lazily-initialized global map from block id to its state.
    fn map() -> &'static RwLock<HashMap<usize, BlockState>> {
        static MAP: OnceLock<RwLock<HashMap<usize, BlockState>>> = OnceLock::new();
        MAP.get_or_init(|| RwLock::new(HashMap::new()))
    }
    /// Records a new block and the file it was carved from. A block id that
    /// is already present keeps its existing entry; a poisoned lock is
    /// silently ignored.
    pub(super) fn register_block(block_id: usize, file_path: &str) {
        if let Ok(mut guard) = Self::map().write() {
            guard.entry(block_id).or_insert_with(|| BlockState {
                is_checkpointed: AtomicBool::new(false),
                file_path: file_path.to_string(),
            });
        }
    }
    /// Looks up which WAL file `block_id` belongs to.
    pub(super) fn get_file_path_for_block(block_id: usize) -> Option<String> {
        Self::map()
            .read()
            .ok()?
            .get(&block_id)
            .map(|state| state.file_path.clone())
    }
    /// Marks `block_id` as checkpointed, then bumps the owning file's
    /// checkpoint counter and runs a deletion check for that file.
    pub(super) fn set_checkpointed_true(block_id: usize) {
        // Flip the flag and clone the path out while the read guard is held;
        // the guard is dropped before touching the file-level trackers.
        let path = Self::map().read().ok().and_then(|guard| {
            guard.get(&block_id).map(|state| {
                state.is_checkpointed.store(true, Ordering::Release);
                state.file_path.clone()
            })
        });
        if let Some(path) = path {
            FileStateTracker::inc_checkpoint_for_file(&path);
            flush_check(path);
        }
    }
}
/// Per-file counters consulted by `flush_check` to decide when a WAL file
/// can be deleted.
struct FileState {
    // Number of this file's blocks currently handed out (locked).
    locked_block_ctr: AtomicU16,
    // Number of this file's blocks that have been checkpointed.
    checkpoint_block_ctr: AtomicU16,
    // Total number of blocks allocated from this file so far.
    total_blocks: AtomicU16,
    // True once the allocator has rolled over past this file, i.e. no further
    // blocks will ever be allocated from it.
    is_fully_allocated: AtomicBool,
}
pub(super) struct FileStateTracker {}
impl FileStateTracker {
fn map() -> &'static RwLock<HashMap<String, FileState>> {
static MAP: OnceLock<RwLock<HashMap<String, FileState>>> = OnceLock::new();
MAP.get_or_init(|| RwLock::new(HashMap::new()))
}
pub(super) fn register_file_if_absent(file_path: &str) {
let map = Self::map();
let mut w = map.write().expect("file state map write lock poisoned");
w.entry(file_path.to_string()).or_insert_with(|| FileState {
locked_block_ctr: AtomicU16::new(0),
checkpoint_block_ctr: AtomicU16::new(0),
total_blocks: AtomicU16::new(0),
is_fully_allocated: AtomicBool::new(false),
});
}
pub(super) fn add_block_to_file_state(file_path: &str) {
Self::register_file_if_absent(file_path);
let map = Self::map();
if let Ok(r) = map.read() {
if let Some(st) = r.get(file_path) {
st.total_blocks.fetch_add(1, Ordering::AcqRel);
}
}
}
pub(super) fn set_fully_allocated(file_path: String) {
Self::register_file_if_absent(&file_path);
let map = Self::map();
if let Ok(r) = map.read() {
if let Some(st) = r.get(&file_path) {
st.is_fully_allocated.store(true, Ordering::Release);
}
}
flush_check(file_path);
}
pub(super) fn set_block_locked(block_id: usize) {
if let Some(path) = BlockStateTracker::get_file_path_for_block(block_id) {
let map = Self::map();
if let Ok(r) = map.read() {
if let Some(st) = r.get(&path) {
st.locked_block_ctr.fetch_add(1, Ordering::AcqRel);
}
}
}
}
pub(super) fn set_block_unlocked(block_id: usize) {
if let Some(path) = BlockStateTracker::get_file_path_for_block(block_id) {
let map = Self::map();
if let Ok(r) = map.read() {
if let Some(st) = r.get(&path) {
st.locked_block_ctr.fetch_sub(1, Ordering::AcqRel);
}
}
flush_check(path);
}
}
pub(super) fn inc_checkpoint_for_file(file_path: &str) {
let map = Self::map();
if let Ok(r) = map.read() {
if let Some(st) = r.get(file_path) {
st.checkpoint_block_ctr.fetch_add(1, Ordering::AcqRel);
}
}
}
pub(super) fn get_state_snapshot(file_path: &str) -> Option<(u16, u16, u16, bool)> {
let map = Self::map();
let r = map.read().ok()?;
let st = r.get(file_path)?;
let locked = st.locked_block_ctr.load(Ordering::Acquire);
let checkpointed = st.checkpoint_block_ctr.load(Ordering::Acquire);
let total = st.total_blocks.load(Ordering::Acquire);
let fully = st.is_fully_allocated.load(Ordering::Acquire);
Some((locked, checkpointed, total, fully))
}
}