use std::collections::HashSet;
use std::fmt;
use std::path::Path;
use std::sync::Mutex;
use bstack::BStack;
use crc32fast::Hasher as CrcHasher;
use crate::Error;
// File magic identifying a bllist file ("BLLS"), checked at open time.
const MAGIC: [u8; 4] = *b"BLLS";
// On-disk format version; a mismatch is reported as corruption.
const VERSION: u32 = 1;
// File header size: 4-byte magic + 4-byte version + 8-byte root offset
// + 8-byte free-list head offset.
const HEADER_SIZE: u64 = 24;
// Per-block header: 4-byte CRC32 followed by an 8-byte next-block offset.
const BLOCK_HEADER_SIZE: usize = 12;
/// Handle to a block: its byte offset within the backing stack.
///
/// Offset 0 is the file header, so 0 doubles as the on-disk "null"
/// sentinel in next pointers and is never handed out as a valid block.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct BlockRef(pub u64);
impl fmt::Display for BlockRef {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "@{}", self.0)
}
}
impl fmt::LowerHex for BlockRef {
    /// Hex rendering with an `@` prefix; formatter flags (width, `#`,
    /// zero-padding) are forwarded to the inner offset.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "@")?;
        fmt::LowerHex::fmt(&self.0, f)
    }
}
impl fmt::UpperHex for BlockRef {
    /// Uppercase-hex rendering with an `@` prefix; formatter flags are
    /// forwarded to the inner offset.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "@")?;
        fmt::UpperHex::fmt(&self.0, f)
    }
}
impl From<u64> for BlockRef {
fn from(offset: u64) -> Self {
BlockRef(offset)
}
}
impl From<BlockRef> for u64 {
fn from(r: BlockRef) -> u64 {
r.0
}
}
/// In-memory view of the 24-byte file header.
struct Header {
    // Offset of the first block of the active list; 0 = empty list.
    root: u64,
    // Offset of the first block of the free list; 0 = empty list.
    free_list_head: u64,
}
impl Header {
    /// Parses and validates the 24-byte on-disk header: magic bytes and
    /// format version are checked before the two little-endian list-head
    /// offsets are decoded.
    fn from_bytes(buf: &[u8; 24]) -> Result<Self, Error> {
        let (magic, rest) = buf.split_at(4);
        if *magic != MAGIC {
            return Err(Error::Corruption(format!(
                "invalid magic bytes: expected {:?}, found {:?}",
                MAGIC, magic
            )));
        }
        let version = u32::from_le_bytes(rest[0..4].try_into().unwrap());
        if version != VERSION {
            return Err(Error::Corruption(format!(
                "unsupported bllist version {version}, expected {VERSION}"
            )));
        }
        let root = u64::from_le_bytes(rest[4..12].try_into().unwrap());
        let free_list_head = u64::from_le_bytes(rest[12..20].try_into().unwrap());
        Ok(Self { root, free_list_head })
    }

    /// Serializes the header into its fixed 24-byte on-disk layout.
    fn to_bytes(&self) -> [u8; 24] {
        let mut out = [0u8; 24];
        out[..4].copy_from_slice(&MAGIC);
        out[4..8].copy_from_slice(&VERSION.to_le_bytes());
        out[8..16].copy_from_slice(&self.root.to_le_bytes());
        out[16..].copy_from_slice(&self.free_list_head.to_le_bytes());
        out
    }
}
/// A singly-linked list of fixed-size blocks persisted in a `BStack`.
///
/// On-disk layout: a 24-byte header at offset 0, followed by blocks of
/// `PAYLOAD_CAPACITY + BLOCK_HEADER_SIZE` bytes. Each block carries a
/// CRC32 over its next pointer plus padded payload.
pub struct FixedBlockList<const PAYLOAD_CAPACITY: usize> {
    // Backing storage; accessed via push/get_into/set/len.
    stack: BStack,
    // Serializes header and list mutations across threads.
    mu: Mutex<()>,
}
impl<const PAYLOAD_CAPACITY: usize> fmt::Debug for FixedBlockList<PAYLOAD_CAPACITY> {
    /// Debug output shows the payload capacity and the derived on-disk
    /// block size, eliding the remaining (non-Debug) fields.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let block_size = PAYLOAD_CAPACITY + BLOCK_HEADER_SIZE;
        let mut s = f.debug_struct("FixedBlockList");
        s.field("payload_capacity", &PAYLOAD_CAPACITY);
        s.field("block_size", &block_size);
        s.finish_non_exhaustive()
    }
}
impl<const PAYLOAD_CAPACITY: usize> fmt::Display for FixedBlockList<PAYLOAD_CAPACITY> {
    /// Concise human-readable form, e.g. `FixedBlockList<52>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "FixedBlockList<{}>", PAYLOAD_CAPACITY)
    }
}
impl<const PAYLOAD_CAPACITY: usize> FixedBlockList<PAYLOAD_CAPACITY> {
/// Opens (or creates) a block list at `path`.
///
/// A zero-length file gets a fresh all-empty header written at logical
/// offset 0. An existing file has its header validated, then any blocks
/// reachable from neither the active list nor the free list (e.g. after
/// a crash between allocation and linking) are swept back onto the free
/// list by `recover_orphans`.
///
/// # Errors
/// `Error::Corruption` for a bad magic/version or a file too small for
/// the header; storage errors from `BStack` are propagated.
pub fn open(path: impl AsRef<Path>) -> Result<Self, Error> {
    // Compile-time guard: a zero payload would make every block pure header.
    const {
        assert!(
            PAYLOAD_CAPACITY > 0,
            "PAYLOAD_CAPACITY must be greater than 0 \
            (block header already occupies 12 bytes)"
        )
    };
    let stack = BStack::open(path)?;
    let total = stack.len()?;
    if total == 0 {
        // Brand-new file: write an empty header (both list heads null).
        let h = Header {
            root: 0,
            free_list_head: 0,
        };
        let offset = stack.push(&h.to_bytes())?;
        debug_assert_eq!(offset, 0, "bllist header must land at logical offset 0");
        return Ok(Self {
            stack,
            mu: Mutex::new(()),
        });
    }
    if total < HEADER_SIZE {
        return Err(Error::Corruption(format!(
            "file payload is {total} bytes, too small for the 24-byte bllist header"
        )));
    }
    let mut hdr_buf = [0u8; 24];
    stack.get_into(0, &mut hdr_buf)?;
    let mut header = Header::from_bytes(&hdr_buf)?;
    // Crash recovery: re-attach unreachable blocks to the free list.
    Self::recover_orphans(&stack, &mut header, total)?;
    Ok(Self {
        stack,
        mu: Mutex::new(()),
    })
}
/// Maximum number of payload bytes a single block can hold.
pub const fn payload_capacity() -> usize {
    PAYLOAD_CAPACITY
}
/// Allocates a block, reusing the free list before growing the file.
///
/// The returned block's contents are not initialized with a valid
/// checksum; callers are expected to `write` before reading.
pub fn alloc(&self) -> Result<BlockRef, Error> {
    let _guard = self.mu.lock().unwrap();
    let mut hdr = self.read_header_locked()?;
    self.alloc_locked(&mut hdr)
}
/// Returns `block` to the free list; its contents are overwritten by the
/// free-list link. The caller must ensure the block is no longer linked
/// into the active list.
pub fn free(&self, block: BlockRef) -> Result<(), Error> {
    self.validate_block_offset(block.0)?;
    let _guard = self.mu.lock().unwrap();
    let mut hdr = self.read_header_locked()?;
    self.free_locked(block, &mut hdr)
}
/// Writes `data` into `block`, zero-padding the payload to capacity.
///
/// The block's existing next pointer is preserved, and the CRC is
/// recomputed over next pointer + full padded payload.
///
/// # Errors
/// `Error::DataTooLarge` when `data` exceeds the capacity;
/// `Error::InvalidBlock` for a misaligned offset.
pub fn write(&self, block: BlockRef, data: &[u8]) -> Result<(), Error> {
    if data.len() > PAYLOAD_CAPACITY {
        return Err(Error::DataTooLarge {
            capacity: PAYLOAD_CAPACITY,
            provided: data.len(),
        });
    }
    self.validate_block_offset(block.0)?;
    let mut buf = vec![0u8; PAYLOAD_CAPACITY + BLOCK_HEADER_SIZE];
    // Keep the current next pointer (bytes 4..12) intact.
    self.stack.get_into(block.0 + 4, &mut buf[4..12])?;
    buf[12..12 + data.len()].copy_from_slice(data);
    // CRC covers everything after the checksum field itself.
    let checksum = crc32fast::hash(&buf[4..]);
    buf[..4].copy_from_slice(&checksum.to_le_bytes());
    self.stack.set(block.0, &buf)?;
    Ok(())
}
/// Reads the full padded payload of `block` into a fresh vector.
///
/// The result is always exactly `PAYLOAD_CAPACITY` bytes; data written
/// shorter than capacity comes back zero-padded.
pub fn read(&self, block: BlockRef) -> Result<Vec<u8>, Error> {
    let mut payload = vec![0u8; PAYLOAD_CAPACITY];
    self.read_into(block, &mut payload)?;
    Ok(payload)
}
/// Reads the first `buf.len()` payload bytes of `block` into `buf`,
/// verifying the stored CRC.
///
/// The CRC on disk covers the next pointer plus the FULL padded payload,
/// so when `buf` is shorter than the capacity the unread tail is assumed
/// to be all zeros. Reading fewer bytes than were actually written (with
/// non-zero bytes in the tail) therefore fails with `ChecksumMismatch`;
/// this behavior is deliberate and pinned by the
/// `read_into_shorter_than_written_fails_crc` test.
pub fn read_into(&self, block: BlockRef, buf: &mut [u8]) -> Result<(), Error> {
    if buf.len() > PAYLOAD_CAPACITY {
        return Err(Error::DataTooLarge {
            capacity: PAYLOAD_CAPACITY,
            provided: buf.len(),
        });
    }
    self.validate_block_offset(block.0)?;
    let mut hdr = [0u8; 12];
    self.stack.get_into(block.0, &mut hdr)?;
    self.stack.get_into(block.0 + 12, buf)?;
    let stored = u32::from_le_bytes(hdr[0..4].try_into().unwrap());
    // Recompute over next pointer + caller's bytes + assumed-zero tail.
    let mut hasher = CrcHasher::new();
    hasher.update(&hdr[4..12]); hasher.update(buf);
    let tail = PAYLOAD_CAPACITY - buf.len();
    if tail > 0 {
        hasher.update(&vec![0u8; tail]);
    }
    if hasher.finalize() != stored {
        return Err(Error::ChecksumMismatch { block: block.0 });
    }
    Ok(())
}
/// Rewrites `block`'s next pointer (`None` encodes as offset 0) and
/// recomputes the CRC; the payload bytes are preserved.
pub fn set_next(&self, block: BlockRef, next: Option<BlockRef>) -> Result<(), Error> {
    self.validate_block_offset(block.0)?;
    let next_val = match next {
        Some(r) => r.0,
        None => 0u64,
    };
    let mut buf = vec![0u8; PAYLOAD_CAPACITY + BLOCK_HEADER_SIZE];
    // Fetch the whole block so the payload survives the rewrite.
    self.stack.get_into(block.0, &mut buf)?;
    buf[4..12].copy_from_slice(&next_val.to_le_bytes());
    let checksum = crc32fast::hash(&buf[4..]);
    buf[..4].copy_from_slice(&checksum.to_le_bytes());
    self.stack.set(block.0, &buf)?;
    Ok(())
}
/// Reads `block`'s next pointer; offset 0 means end of chain.
///
/// Only the 8 pointer bytes are read — the block's CRC is not verified
/// by this call.
pub fn get_next(&self, block: BlockRef) -> Result<Option<BlockRef>, Error> {
    self.validate_block_offset(block.0)?;
    let mut raw = [0u8; 8];
    self.stack.get_into(block.0 + 4, &mut raw)?;
    match u64::from_le_bytes(raw) {
        0 => Ok(None),
        next => Ok(Some(BlockRef(next))),
    }
}
/// First block of the active list, or `None` when the list is empty.
pub fn root(&self) -> Result<Option<BlockRef>, Error> {
    let _guard = self.mu.lock().unwrap();
    let hdr = self.read_header_locked()?;
    match hdr.root {
        0 => Ok(None),
        off => Ok(Some(BlockRef(off))),
    }
}
/// Prepends a new block holding `data` to the active list and returns it.
///
/// The new block is fully written (pointing at the old root) BEFORE the
/// header's root is advanced; a crash between the two writes leaves at
/// worst an orphan block that `open` sweeps back onto the free list.
pub fn push_front(&self, data: &[u8]) -> Result<BlockRef, Error> {
    if data.len() > PAYLOAD_CAPACITY {
        return Err(Error::DataTooLarge {
            capacity: PAYLOAD_CAPACITY,
            provided: data.len(),
        });
    }
    let _guard = self.mu.lock().unwrap();
    let mut hdr = self.read_header_locked()?;
    let prev_root = hdr.root;
    let fresh = self.alloc_locked(&mut hdr)?;
    self.write_block_with_next(fresh.0, prev_root, data)?;
    hdr.root = fresh.0;
    self.write_header_locked(&hdr)?;
    Ok(fresh)
}
/// Removes and returns the payload of the first active block, or `None`
/// when the list is empty.
///
/// The new root is persisted BEFORE the popped block is recycled, so a
/// crash between the two writes orphans the block (reclaimed by `open`)
/// rather than leaving the root pointing at a recycled block. The header
/// is therefore written twice on purpose.
pub fn pop_front(&self) -> Result<Option<Vec<u8>>, Error> {
    let _g = self.mu.lock().unwrap();
    let mut header = self.read_header_locked()?;
    if header.root == 0 {
        return Ok(None);
    }
    let old_root = header.root;
    // CRC-verified read of next pointer + full padded payload.
    let (next, payload) = self.read_block_full(old_root)?;
    header.root = next;
    self.write_header_locked(&header)?;
    // Recycle the popped block (writes the header a second time).
    self.free_locked(BlockRef(old_root), &mut header)?;
    Ok(Some(payload))
}
/// Like `pop_front`, but copies the payload into `buf` without
/// allocating; returns `Ok(false)` when the list is empty.
///
/// Because `read_into` folds an assumed-zero tail into the CRC, a `buf`
/// shorter than the written payload fails with `ChecksumMismatch` — and
/// since that check runs before the root is advanced, the list is left
/// unchanged in that case.
pub fn pop_front_into(&self, buf: &mut [u8]) -> Result<bool, Error> {
    if buf.len() > PAYLOAD_CAPACITY {
        return Err(Error::DataTooLarge {
            capacity: PAYLOAD_CAPACITY,
            provided: buf.len(),
        });
    }
    let _g = self.mu.lock().unwrap();
    let mut header = self.read_header_locked()?;
    if header.root == 0 {
        return Ok(false);
    }
    let old_root = header.root;
    // CRC-verified payload copy; fails before any state changes.
    self.read_into(BlockRef(old_root), buf)?;
    // Raw read of the next pointer (its bytes were covered by the CRC above).
    let mut next_buf = [0u8; 8];
    self.stack.get_into(old_root + 4, &mut next_buf)?;
    let next = u64::from_le_bytes(next_buf);
    header.root = next;
    // Persist the new root before recycling — a crash in between only
    // orphans the block (see `recover_orphans`).
    self.write_header_locked(&header)?;
    self.free_locked(BlockRef(old_root), &mut header)?;
    Ok(true)
}
fn validate_block_offset(&self, offset: u64) -> Result<(), Error> {
let block_size = (PAYLOAD_CAPACITY + BLOCK_HEADER_SIZE) as u64;
if offset < HEADER_SIZE || !(offset - HEADER_SIZE).is_multiple_of(block_size) {
return Err(Error::InvalidBlock);
}
Ok(())
}
/// Re-reads and parses the header at offset 0; caller must hold `mu`.
fn read_header_locked(&self) -> Result<Header, Error> {
    let mut raw = [0u8; 24];
    self.stack.get_into(0, &mut raw)?;
    Header::from_bytes(&raw)
}
/// Persists `header` at offset 0; caller must hold `mu`.
fn write_header_locked(&self, header: &Header) -> Result<(), Error> {
    let bytes = header.to_bytes();
    self.stack.set(0, &bytes)?;
    Ok(())
}
/// Allocation with `mu` held: pops the free list if non-empty, otherwise
/// appends a zeroed block at the end of the file.
///
/// NOTE(review): a freshly appended block is all zeros, whose CRC field
/// (0) does not match the CRC of its contents — reading a block before
/// its first `write` is expected to fail the checksum. Confirm callers
/// always write before reading.
fn alloc_locked(&self, header: &mut Header) -> Result<BlockRef, Error> {
    if header.free_list_head != 0 {
        // Reuse path: unlink the head of the free list and persist the
        // updated head before handing the block out.
        let fh = header.free_list_head;
        let mut next_buf = [0u8; 8];
        self.stack.get_into(fh + 4, &mut next_buf)?;
        header.free_list_head = u64::from_le_bytes(next_buf);
        self.write_header_locked(header)?;
        Ok(BlockRef(fh))
    } else {
        // Growth path: append a zero-filled block; its push offset
        // becomes the block reference.
        let block_size = PAYLOAD_CAPACITY + BLOCK_HEADER_SIZE;
        let zeros = vec![0u8; block_size];
        let offset = self.stack.push(&zeros)?;
        debug_assert!(
            offset >= HEADER_SIZE && (offset - HEADER_SIZE).is_multiple_of(block_size as u64),
            "newly pushed block has misaligned offset {offset}"
        );
        Ok(BlockRef(offset))
    }
}
/// Free with `mu` held: overwrite the block with a free-list link
/// (next = old free head, payload zeroed, CRC valid), then point the
/// header's free list at it.
///
/// The block write lands BEFORE the header write, so a crash in between
/// merely orphans the block for `open` to reclaim.
fn free_locked(&self, block: BlockRef, header: &mut Header) -> Result<(), Error> {
    let block_size = PAYLOAD_CAPACITY + BLOCK_HEADER_SIZE;
    let mut buf = vec![0u8; block_size];
    buf[4..12].copy_from_slice(&header.free_list_head.to_le_bytes());
    // CRC over the link + zeroed payload keeps freed blocks readable by
    // the recovery scan.
    let crc = crc32fast::hash(&buf[4..]);
    buf[0..4].copy_from_slice(&crc.to_le_bytes());
    self.stack.set(block.0, &buf)?;
    header.free_list_head = block.0;
    self.write_header_locked(header)?;
    Ok(())
}
/// Writes a complete block image at `offset`: CRC, next pointer, and
/// `data` zero-padded to capacity. The caller has already validated
/// `data.len()` against `PAYLOAD_CAPACITY`.
fn write_block_with_next(&self, offset: u64, next: u64, data: &[u8]) -> Result<(), Error> {
    let mut image = vec![0u8; PAYLOAD_CAPACITY + BLOCK_HEADER_SIZE];
    image[4..12].copy_from_slice(&next.to_le_bytes());
    image[12..12 + data.len()].copy_from_slice(data);
    let checksum = crc32fast::hash(&image[4..]);
    image[..4].copy_from_slice(&checksum.to_le_bytes());
    self.stack.set(offset, &image)?;
    Ok(())
}
/// Reads a whole block, verifies its CRC, and returns the next pointer
/// together with the full padded payload.
fn read_block_full(&self, offset: u64) -> Result<(u64, Vec<u8>), Error> {
    let block_size = PAYLOAD_CAPACITY + BLOCK_HEADER_SIZE;
    let raw = self.stack.get(offset, offset + block_size as u64)?;
    if raw.len() != block_size {
        return Err(Error::InvalidBlock);
    }
    let stored = u32::from_le_bytes(raw[0..4].try_into().unwrap());
    if crc32fast::hash(&raw[4..]) != stored {
        return Err(Error::ChecksumMismatch { block: offset });
    }
    let next = u64::from_le_bytes(raw[4..12].try_into().unwrap());
    Ok((next, raw[12..].to_vec()))
}
fn recover_orphans(stack: &BStack, header: &mut Header, total: u64) -> Result<(), Error> {
if total <= HEADER_SIZE {
return Ok(());
}
let block_size = PAYLOAD_CAPACITY + BLOCK_HEADER_SIZE;
let num_blocks = ((total - HEADER_SIZE) / block_size as u64) as usize;
let max_steps = num_blocks + 1;
let mut active: HashSet<u64> = HashSet::new();
let mut cur = header.root;
let mut steps = 0usize;
while cur != 0 {
if steps >= max_steps {
return Err(Error::Corruption("cycle detected in active list".into()));
}
let mut block_buf = vec![0u8; block_size];
stack.get_into(cur, &mut block_buf)?;
let stored = u32::from_le_bytes(block_buf[0..4].try_into().unwrap());
if crc32fast::hash(&block_buf[4..]) != stored {
return Err(Error::ChecksumMismatch { block: cur });
}
active.insert(cur);
cur = u64::from_le_bytes(block_buf[4..12].try_into().unwrap());
steps += 1;
}
let mut free_set: HashSet<u64> = HashSet::new();
cur = header.free_list_head;
steps = 0;
while cur != 0 {
if steps >= max_steps {
return Err(Error::Corruption("cycle detected in free list".into()));
}
let mut block_buf = vec![0u8; block_size];
stack.get_into(cur, &mut block_buf)?;
let stored = u32::from_le_bytes(block_buf[0..4].try_into().unwrap());
if crc32fast::hash(&block_buf[4..]) != stored {
return Err(Error::ChecksumMismatch { block: cur });
}
free_set.insert(cur);
cur = u64::from_le_bytes(block_buf[4..12].try_into().unwrap());
steps += 1;
}
let mut found_orphan = false;
for i in 0..num_blocks as u64 {
let offset = HEADER_SIZE + i * block_size as u64;
if active.contains(&offset) || free_set.contains(&offset) {
continue;
}
let mut buf = vec![0u8; block_size];
buf[4..12].copy_from_slice(&header.free_list_head.to_le_bytes());
let crc = crc32fast::hash(&buf[4..]);
buf[0..4].copy_from_slice(&crc.to_le_bytes());
stack.set(offset, &buf)?;
header.free_list_head = offset;
found_orphan = true;
}
if found_orphan {
stack.set(0, &header.to_bytes())?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};

    // All tests use a 52-byte payload, i.e. 64-byte blocks including the
    // 12-byte block header.
    type List = FixedBlockList<52>;
    const CAP: usize = 52;

    // Per-process counter so parallel tests never collide on a path.
    static COUNTER: AtomicU64 = AtomicU64::new(0);

    // Builds a unique temp-file path for the labelled test.
    fn tmp(label: &str) -> std::path::PathBuf {
        let n = COUNTER.fetch_add(1, Ordering::Relaxed);
        let mut p = std::env::temp_dir();
        p.push(format!(
            "bllist_{}_{}_{}.blls",
            std::process::id(),
            label,
            n
        ));
        p
    }

    // A brand-new file opens to an empty list.
    #[test]
    fn fresh_open_empty() {
        let path = tmp("fresh");
        let list = List::open(&path).unwrap();
        assert_eq!(list.root().unwrap(), None);
        let _ = std::fs::remove_file(&path);
    }

    // payload_capacity reflects the const generic parameter.
    #[test]
    fn payload_capacity_const() {
        assert_eq!(List::payload_capacity(), CAP);
        assert_eq!(FixedBlockList::<116>::payload_capacity(), 116);
    }

    // A freed block is handed out again before the file grows.
    #[test]
    fn alloc_free_reuse() {
        let path = tmp("alloc");
        let list = List::open(&path).unwrap();
        let b0 = list.alloc().unwrap();
        let b1 = list.alloc().unwrap();
        let b2 = list.alloc().unwrap();
        list.free(b1).unwrap();
        let b3 = list.alloc().unwrap();
        assert_eq!(b3, b1);
        drop(list);
        let _ = std::fs::remove_file(&path);
        let _ = (b0, b2, b3);
    }

    // Written data reads back, zero-padded up to capacity.
    #[test]
    fn write_read_roundtrip() {
        let path = tmp("rw");
        let list = List::open(&path).unwrap();
        let block = list.alloc().unwrap();
        let data = b"hello, bllist!";
        list.write(block, data).unwrap();
        let out = list.read(block).unwrap();
        assert_eq!(&out[..data.len()], data);
        assert!(out[data.len()..].iter().all(|&b| b == 0));
        let _ = std::fs::remove_file(&path);
    }

    // Overwriting with shorter data zeroes the stale tail.
    #[test]
    fn overwrite_shorter_zeroes_tail() {
        let path = tmp("overwrite");
        let list = List::open(&path).unwrap();
        let block = list.alloc().unwrap();
        list.write(block, b"longer data here!!").unwrap();
        list.write(block, b"short").unwrap();
        let out = list.read(block).unwrap();
        assert_eq!(&out[..5], b"short");
        assert!(out[5..].iter().all(|&b| b == 0));
        let _ = std::fs::remove_file(&path);
    }

    // read_into with a full-capacity buffer returns the exact payload.
    #[test]
    fn read_into_full_capacity() {
        let path = tmp("read_into");
        let list = List::open(&path).unwrap();
        let block = list.alloc().unwrap();
        let data: Vec<u8> = (0..CAP as u8).collect();
        list.write(block, &data).unwrap();
        let mut buf = vec![0u8; CAP];
        list.read_into(block, &mut buf).unwrap();
        assert_eq!(buf, data);
        let _ = std::fs::remove_file(&path);
    }

    // Pins the zero-tail CRC contract: reading fewer bytes than were
    // written (non-zero tail) must fail the checksum.
    #[test]
    fn read_into_shorter_than_written_fails_crc() {
        let path = tmp("read_into_crc");
        let list = List::open(&path).unwrap();
        let block = list.alloc().unwrap();
        list.write(block, &[0xAB; 20]).unwrap();
        let mut buf = vec![0u8; 5];
        let err = list.read_into(block, &mut buf).unwrap_err();
        assert!(matches!(err, Error::ChecksumMismatch { .. }));
        let _ = std::fs::remove_file(&path);
    }

    // Reading exactly the written length passes (tail truly is zero).
    #[test]
    fn read_into_exact_written_length_passes() {
        let path = tmp("read_into_exact");
        let list = List::open(&path).unwrap();
        let block = list.alloc().unwrap();
        list.write(block, &[0xAB; 10]).unwrap();
        let mut buf = vec![0u8; 10];
        list.read_into(block, &mut buf).unwrap();
        assert_eq!(buf, vec![0xAB; 10]);
        let _ = std::fs::remove_file(&path);
    }

    // Flipping one payload byte on disk is caught at open time, because
    // recovery CRC-checks blocks on the active list.
    #[test]
    fn checksum_mismatch_on_corrupt_block() {
        let path = tmp("crc");
        {
            let list = List::open(&path).unwrap();
            list.push_front(b"integrity").unwrap();
        }
        {
            // First block sits right after the 24-byte file header.
            let stack = BStack::open(&path).unwrap();
            let block_offset: u64 = 24;
            let mut byte = [0u8; 1];
            stack.get_into(block_offset + 12, &mut byte).unwrap();
            byte[0] ^= 0xFF;
            stack.set(block_offset + 12, &byte).unwrap();
        }
        let err = List::open(&path).unwrap_err();
        assert!(matches!(err, Error::ChecksumMismatch { .. }));
        let _ = std::fs::remove_file(&path);
    }

    // next pointers round-trip through set_next/get_next.
    #[test]
    fn set_get_next() {
        let path = tmp("next");
        let list = List::open(&path).unwrap();
        let b0 = list.alloc().unwrap();
        let b1 = list.alloc().unwrap();
        assert_eq!(list.get_next(b0).unwrap(), None);
        list.set_next(b0, Some(b1)).unwrap();
        assert_eq!(list.get_next(b0).unwrap(), Some(b1));
        list.set_next(b0, None).unwrap();
        assert_eq!(list.get_next(b0).unwrap(), None);
        let _ = std::fs::remove_file(&path);
    }

    // set_next must not clobber the payload (it rewrites the whole block).
    #[test]
    fn set_next_preserves_payload() {
        let path = tmp("next_payload");
        let list = List::open(&path).unwrap();
        let b0 = list.alloc().unwrap();
        let b1 = list.alloc().unwrap();
        list.write(b0, b"preserved").unwrap();
        list.set_next(b0, Some(b1)).unwrap();
        let out = list.read(b0).unwrap();
        assert_eq!(&out[..9], b"preserved");
        let _ = std::fs::remove_file(&path);
    }

    // push_front/pop_front behave as a stack (LIFO).
    #[test]
    fn push_pop_lifo() {
        let path = tmp("lifo");
        let list = List::open(&path).unwrap();
        list.push_front(b"first").unwrap();
        list.push_front(b"second").unwrap();
        list.push_front(b"third").unwrap();
        let d1 = list.pop_front().unwrap().unwrap();
        assert_eq!(&d1[..5], b"third");
        let d2 = list.pop_front().unwrap().unwrap();
        assert_eq!(&d2[..6], b"second");
        let d3 = list.pop_front().unwrap().unwrap();
        assert_eq!(&d3[..5], b"first");
        assert_eq!(list.pop_front().unwrap(), None);
        let _ = std::fs::remove_file(&path);
    }

    // pop_front_into fills the caller's buffer and reports emptiness.
    #[test]
    fn pop_front_into_basic() {
        let path = tmp("pop_into");
        let list = List::open(&path).unwrap();
        list.push_front(b"hello").unwrap();
        let mut buf = vec![0u8; 5];
        let popped = list.pop_front_into(&mut buf).unwrap();
        assert!(popped);
        assert_eq!(&buf, b"hello");
        let popped2 = list.pop_front_into(&mut buf).unwrap();
        assert!(!popped2);
        let _ = std::fs::remove_file(&path);
    }

    // Popping an empty list is a no-op returning None.
    #[test]
    fn pop_front_empty() {
        let path = tmp("pop_empty");
        let list = List::open(&path).unwrap();
        assert_eq!(list.pop_front().unwrap(), None);
        let _ = std::fs::remove_file(&path);
    }

    // A block present in the file but on neither list (simulated crash)
    // is reclaimed onto the free list at open, so alloc reuses it.
    #[test]
    fn orphan_recovery() {
        let path = tmp("orphan");
        {
            // Hand-craft a file: valid header + one CRC-valid zero block
            // that no list references.
            let stack = BStack::open(&path).unwrap();
            let hdr = Header {
                root: 0,
                free_list_head: 0,
            };
            let off = stack.push(&hdr.to_bytes()).unwrap();
            assert_eq!(off, 0);
            let mut block = [0u8; 64];
            let crc = crc32fast::hash(&block[4..]);
            block[0..4].copy_from_slice(&crc.to_le_bytes());
            stack.push(&block).unwrap();
        }
        let list = List::open(&path).unwrap();
        assert_eq!(list.root().unwrap(), None);
        let b = list.alloc().unwrap();
        assert_eq!(b.0, 24);
        let _ = std::fs::remove_file(&path);
    }

    // Writing more than the payload capacity is rejected.
    #[test]
    fn data_too_large() {
        let path = tmp("toolarge");
        let list = List::open(&path).unwrap();
        let block = list.alloc().unwrap();
        let err = list.write(block, &vec![0u8; CAP + 1]).unwrap_err();
        assert!(matches!(err, Error::DataTooLarge { .. }));
        let _ = std::fs::remove_file(&path);
    }

    // A misaligned offset is rejected before touching storage.
    #[test]
    fn invalid_block_offset() {
        let path = tmp("invalid");
        let list = List::open(&path).unwrap();
        let err = list.read(BlockRef(1)).unwrap_err();
        assert!(matches!(err, Error::InvalidBlock));
        let _ = std::fs::remove_file(&path);
    }

    // Offset 0 (the file header) is never a valid block.
    #[test]
    fn invalid_block_header_offset() {
        let path = tmp("invalid_hdr");
        let list = List::open(&path).unwrap();
        let err = list.read(BlockRef(0)).unwrap_err();
        assert!(matches!(err, Error::InvalidBlock));
        let _ = std::fs::remove_file(&path);
    }

    // Data survives a close/reopen cycle.
    #[test]
    fn reopen_persists_data() {
        let path = tmp("reopen");
        {
            let list = List::open(&path).unwrap();
            list.push_front(b"persisted").unwrap();
        }
        {
            let list = List::open(&path).unwrap();
            let data = list.pop_front().unwrap().unwrap();
            assert_eq!(&data[..9], b"persisted");
            assert_eq!(list.pop_front().unwrap(), None);
        }
        let _ = std::fs::remove_file(&path);
    }

    // An empty write yields an all-zero (but CRC-valid) payload.
    #[test]
    fn write_empty_data() {
        let path = tmp("empty_data");
        let list = List::open(&path).unwrap();
        let block = list.alloc().unwrap();
        list.write(block, &[]).unwrap();
        let out = list.read(block).unwrap();
        assert!(out.iter().all(|&b| b == 0));
        let _ = std::fs::remove_file(&path);
    }
}