mod test;
use std::fs::{File, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::Path;
use std::sync::RwLock;
#[cfg(unix)]
use std::os::unix::fs::FileExt;
#[cfg(unix)]
use std::os::unix::io::AsRawFd;
#[cfg(windows)]
use std::os::windows::fs::FileExt as WindowsFileExt;
#[cfg(windows)]
use std::os::windows::io::AsRawHandle;
#[cfg(windows)]
use windows_sys::Win32::Storage::FileSystem::{
LOCKFILE_EXCLUSIVE_LOCK, LOCKFILE_FAIL_IMMEDIATELY, LockFileEx,
};
#[cfg(windows)]
use windows_sys::Win32::System::IO::OVERLAPPED;
/// Full 8-byte magic that `init_header` writes at the start of a fresh file:
/// ASCII `BSTK` followed by the bytes `00 01 03 00`.
// NOTE(review): the trailing bytes look like a format version — confirm.
const MAGIC: [u8; 8] = *b"BSTK\x00\x01\x03\x00";
/// First six bytes of [`MAGIC`]. `read_header` compares only this prefix when
/// validating an existing file, so the last two magic bytes are not checked.
const MAGIC_PREFIX: [u8; 6] = *b"BSTK\x00\x01";
/// Size in bytes of the on-disk header: the 8-byte magic followed by the
/// 8-byte little-endian committed length. Payload bytes start at this offset.
const HEADER_SIZE: u64 = 16;
/// Forces the contents of `file` out to stable storage.
///
/// On macOS the function first issues `fcntl(F_FULLFSYNC)` and reports
/// success when that call does not return `-1`. If it does return `-1`, and
/// on every other target, the function falls back to [`File::sync_data`].
///
/// # Errors
/// Returns the error produced by [`File::sync_data`], if any.
fn durable_sync(file: &File) -> io::Result<()> {
    #[cfg(target_os = "macos")]
    {
        // SAFETY: `file` is borrowed for the whole call, so its raw descriptor
        // stays open while `fcntl` runs.
        let full_sync_failed =
            unsafe { libc::fcntl(file.as_raw_fd(), libc::F_FULLFSYNC) } == -1;
        if !full_sync_failed {
            return Ok(());
        }
    }
    File::sync_data(file)
}
/// Takes an exclusive `flock` on `file` without waiting.
///
/// The call passes `LOCK_EX | LOCK_NB`, so it asks for an exclusive lock and
/// returns immediately rather than blocking when the lock cannot be granted.
///
/// # Errors
/// Returns the last OS error whenever `flock` returns a non-zero value.
#[cfg(unix)]
fn flock_exclusive(file: &File) -> io::Result<()> {
    let fd = file.as_raw_fd();
    // SAFETY: `fd` belongs to `file`, which is borrowed for the whole call and
    // therefore still open.
    match unsafe { libc::flock(fd, libc::LOCK_EX | libc::LOCK_NB) } {
        0 => Ok(()),
        // Read errno immediately, before any other call can overwrite it.
        _ => Err(io::Error::last_os_error()),
    }
}
/// Takes an exclusive `LockFileEx` lock on `file` without waiting.
///
/// The lock is requested with `LOCKFILE_EXCLUSIVE_LOCK |
/// LOCKFILE_FAIL_IMMEDIATELY`, a zeroed `OVERLAPPED`, and `u32::MAX` for both
/// halves of the byte count.
///
/// # Errors
/// Returns the last OS error when `LockFileEx` returns zero.
#[cfg(windows)]
fn lock_file_exclusive(file: &File) -> io::Result<()> {
    let flags = LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY;
    let handle = file.as_raw_handle() as windows_sys::Win32::Foundation::HANDLE;
    // SAFETY: `OVERLAPPED` is a plain C struct; the all-zero bit pattern is a
    // valid value for it.
    let mut overlapped: OVERLAPPED = unsafe { std::mem::zeroed() };
    // SAFETY: `handle` comes from `file`, which outlives the call, and
    // `overlapped` is a live, exclusively borrowed local.
    let locked =
        unsafe { LockFileEx(handle, flags, 0, u32::MAX, u32::MAX, &mut overlapped) } != 0;
    if locked {
        return Ok(());
    }
    Err(io::Error::last_os_error())
}
/// Writes a fresh header at the start of `file`: the 8-byte `MAGIC` followed
/// by a committed length of zero encoded as a little-endian `u64`.
///
/// The data is not synced here; callers that need durability sync afterwards.
///
/// # Errors
/// Propagates any error from seeking or writing.
fn init_header(file: &mut File) -> io::Result<()> {
    let zero_len = 0u64.to_le_bytes();
    file.seek(SeekFrom::Start(0))?;
    // Two consecutive writes: magic first, then the length field.
    for field in [&MAGIC[..], &zero_len[..]] {
        file.write_all(field)?;
    }
    Ok(())
}
/// Overwrites the committed-length field of the header with `len`.
///
/// The field is the 8 bytes starting at byte offset 8 of `file`, stored as a
/// little-endian `u64`. The file cursor is left just past the field.
///
/// # Errors
/// Propagates any error from seeking or writing.
fn write_committed_len(file: &mut File, len: u64) -> io::Result<()> {
    const LEN_FIELD_OFFSET: u64 = 8;
    let encoded = len.to_le_bytes();
    file.seek(SeekFrom::Start(LEN_FIELD_OFFSET))?;
    file.write_all(&encoded)?;
    Ok(())
}
/// Reads exactly `len` bytes of `file` starting at absolute byte `offset`
/// and returns them in a new vector (unix implementation).
///
/// # Errors
/// Propagates the error from `read_exact_at`, e.g. when the file ends before
/// `len` bytes could be read.
#[cfg(unix)]
fn pread_exact(file: &File, offset: u64, len: usize) -> io::Result<Vec<u8>> {
    let mut out = vec![0u8; len];
    file.read_exact_at(out.as_mut_slice(), offset).map(|()| out)
}
/// Reads exactly `len` bytes of `file` starting at absolute byte `offset`
/// and returns them in a new vector (windows implementation).
///
/// `seek_read` may return fewer bytes than requested, so the function keeps
/// calling it until the buffer is full.
///
/// # Errors
/// Propagates any `seek_read` error, and returns `UnexpectedEof` if a read
/// yields zero bytes before the buffer is full.
#[cfg(windows)]
fn pread_exact(file: &File, offset: u64, len: usize) -> io::Result<Vec<u8>> {
    let mut out = vec![0u8; len];
    let mut done = 0usize;
    loop {
        if done >= len {
            break;
        }
        match file.seek_read(&mut out[done..], offset + done as u64)? {
            0 => {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "pread_exact: unexpected EOF",
                ));
            }
            got => done += got,
        }
    }
    Ok(out)
}
/// Fills `buf` completely with the bytes of `file` starting at absolute byte
/// `offset` (unix implementation).
///
/// # Errors
/// Propagates the error from `read_exact_at`, e.g. when the file ends before
/// `buf` could be filled.
#[cfg(unix)]
fn pread_exact_into(file: &File, offset: u64, buf: &mut [u8]) -> io::Result<()> {
    FileExt::read_exact_at(file, buf, offset)
}
/// Fills `buf` completely with the bytes of `file` starting at absolute byte
/// `offset` (windows implementation).
///
/// `seek_read` may return fewer bytes than requested, so the function keeps
/// calling it until `buf` is full.
///
/// # Errors
/// Propagates any `seek_read` error, and returns `UnexpectedEof` if a read
/// yields zero bytes before `buf` is full.
#[cfg(windows)]
fn pread_exact_into(file: &File, offset: u64, buf: &mut [u8]) -> io::Result<()> {
    let wanted = buf.len();
    let mut done = 0usize;
    loop {
        if done >= wanted {
            return Ok(());
        }
        match file.seek_read(&mut buf[done..], offset + done as u64)? {
            0 => {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "pread_exact_into: unexpected EOF",
                ));
            }
            got => done += got,
        }
    }
}
/// Reads the 16-byte header from the start of `file`, validates it, and
/// returns the committed payload length stored in it.
///
/// Only the first six magic bytes are compared (against `MAGIC_PREFIX`);
/// bytes 6 and 7 are ignored. The length is the little-endian `u64` held in
/// bytes 8..16.
///
/// # Errors
/// Propagates seek/read errors (including a short read), and returns
/// `InvalidData` when the magic prefix does not match.
fn read_header(file: &mut File) -> io::Result<u64> {
    let mut header = [0u8; 16];
    file.seek(SeekFrom::Start(0))?;
    file.read_exact(&mut header)?;

    let (magic, len_field) = header.split_at(8);
    if !magic.starts_with(&MAGIC_PREFIX) {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "bstack: bad magic number — not a bstack file or incompatible version",
        ));
    }

    let mut raw_len = [0u8; 8];
    raw_len.copy_from_slice(len_field);
    Ok(u64::from_le_bytes(raw_len))
}
/// A stack of bytes stored in a single file.
///
/// The file starts with a 16-byte header (magic plus committed length) and
/// the payload follows it. Offsets taken and returned by the methods are
/// payload-relative, i.e. they do not include the header.
pub struct BStack {
/// The backing file. Methods that modify the file or move its cursor take
/// the write guard; the positioned-read paths on unix/windows and `len`
/// take the read guard.
lock: RwLock<File>,
}
impl BStack {
pub fn open(path: impl AsRef<Path>) -> io::Result<Self> {
let mut file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(path)?;
#[cfg(unix)]
flock_exclusive(&file)?;
#[cfg(windows)]
lock_file_exclusive(&file)?;
let raw_size = file.metadata()?.len();
if raw_size == 0 {
init_header(&mut file)?;
durable_sync(&file)?;
} else if raw_size < HEADER_SIZE {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"bstack: file is {raw_size} bytes — too small to contain the 16-byte header"
),
));
} else {
let committed_len = read_header(&mut file)?;
let actual_data_len = raw_size - HEADER_SIZE;
if actual_data_len != committed_len {
let correct_len = committed_len.min(actual_data_len);
file.set_len(HEADER_SIZE + correct_len)?;
write_committed_len(&mut file, correct_len)?;
durable_sync(&file)?;
}
}
Ok(BStack {
lock: RwLock::new(file),
})
}
/// Appends `data` to the end of the payload and returns the payload offset
/// at which it was written.
///
/// After the bytes are appended, the committed length in the header is
/// updated and the file is synced. If the append fails, the file is
/// truncated back to its previous size. If the header update or the sync
/// fails, the file is truncated back and the old committed length is
/// rewritten; both rollback steps are best-effort and their errors ignored.
///
/// An empty `data` writes nothing and returns the current payload length.
///
/// # Errors
/// Returns the first I/O error encountered.
///
/// # Panics
/// Panics if the internal lock is poisoned.
pub fn push(&self, data: &[u8]) -> io::Result<u64> {
    let mut guard = self.lock.write().unwrap();
    let old_end = guard.seek(SeekFrom::End(0))?;
    let logical_offset = old_end - HEADER_SIZE;
    if data.is_empty() {
        return Ok(logical_offset);
    }

    // The cursor already sits at end-of-file, so this appends.
    if let Err(write_err) = guard.write_all(data) {
        let _ = guard.set_len(old_end);
        return Err(write_err);
    }

    let committed = logical_offset + data.len() as u64;
    let outcome =
        write_committed_len(&mut guard, committed).and_then(|()| durable_sync(&guard));
    match outcome {
        Ok(()) => Ok(logical_offset),
        Err(commit_err) => {
            let _ = guard.set_len(old_end);
            let _ = write_committed_len(&mut guard, logical_offset);
            Err(commit_err)
        }
    }
}
/// Removes the last `n` bytes of the payload and returns them.
///
/// The bytes are read first; then the file is truncated, the committed
/// length in the header is updated, and the file is synced.
///
/// # Errors
/// Returns `InvalidInput` if `n` exceeds the payload size, or if `n` cannot
/// be represented as `usize` on this platform. Otherwise propagates any I/O
/// error.
///
/// # Panics
/// Panics if the internal lock is poisoned.
pub fn pop(&self, n: u64) -> io::Result<Vec<u8>> {
    let mut file = self.lock.write().unwrap();
    let raw_size = file.seek(SeekFrom::End(0))?;
    let data_size = raw_size - HEADER_SIZE;
    if n > data_size {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("pop({n}) exceeds payload size ({data_size})"),
        ));
    }
    // Checked conversion, done before the file is modified: a plain
    // `n as usize` truncates on 32-bit targets, which would return fewer
    // bytes than are subsequently cut off the file.
    let buf_len = usize::try_from(n).map_err(|_| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("pop({n}) does not fit in a buffer on this platform"),
        )
    })?;
    let new_data_len = data_size - n;
    file.seek(SeekFrom::Start(HEADER_SIZE + new_data_len))?;
    let mut buf = vec![0u8; buf_len];
    file.read_exact(&mut buf)?;
    file.set_len(HEADER_SIZE + new_data_len)?;
    write_committed_len(&mut file, new_data_len)?;
    durable_sync(&file)?;
    Ok(buf)
}
/// Returns a copy of the payload from `offset` to the end, without
/// modifying the file.
///
/// On unix and windows the read is positioned and only takes the read
/// guard; on other targets it seeks and therefore takes the write guard.
/// `offset` equal to the payload size yields an empty vector.
///
/// # Errors
/// Returns `InvalidInput` if `offset` exceeds the payload size, or if the
/// number of bytes to return cannot be represented as `usize` on this
/// platform. Otherwise propagates any I/O error.
///
/// # Panics
/// Panics if the internal lock is poisoned.
pub fn peek(&self, offset: u64) -> io::Result<Vec<u8>> {
    // Checked conversion: a plain `as usize` cast truncates on 32-bit
    // targets and would silently return a short buffer.
    let buffer_len = |bytes: u64| -> io::Result<usize> {
        usize::try_from(bytes).map_err(|_| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("peek: {bytes} bytes do not fit in a buffer on this platform"),
            )
        })
    };
    #[cfg(any(unix, windows))]
    {
        let file = self.lock.read().unwrap();
        let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE);
        if offset > data_size {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("peek offset ({offset}) exceeds payload size ({data_size})"),
            ));
        }
        let len = buffer_len(data_size - offset)?;
        pread_exact(&file, HEADER_SIZE + offset, len)
    }
    #[cfg(not(any(unix, windows)))]
    {
        let mut file = self.lock.write().unwrap();
        let raw_size = file.seek(SeekFrom::End(0))?;
        let data_size = raw_size.saturating_sub(HEADER_SIZE);
        if offset > data_size {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("peek offset ({offset}) exceeds payload size ({data_size})"),
            ));
        }
        let len = buffer_len(data_size - offset)?;
        file.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
        let mut buf = vec![0u8; len];
        file.read_exact(&mut buf)?;
        Ok(buf)
    }
}
/// Returns a copy of the payload bytes in the half-open range
/// `[start, end)`, without modifying the file.
///
/// On unix and windows the read is positioned and only takes the read
/// guard; on other targets it seeks and therefore takes the write guard.
///
/// # Errors
/// Returns `InvalidInput` if `end < start`, if `end` exceeds the payload
/// size, or if `end - start` cannot be represented as `usize` on this
/// platform. Otherwise propagates any I/O error.
///
/// # Panics
/// Panics if the internal lock is poisoned.
pub fn get(&self, start: u64, end: u64) -> io::Result<Vec<u8>> {
    if end < start {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("get: end ({end}) < start ({start})"),
        ));
    }
    // Checked conversion: a plain `as usize` cast truncates on 32-bit
    // targets and would silently return a short buffer.
    let len = usize::try_from(end - start).map_err(|_| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("get: range [{start}, {end}) does not fit in a buffer on this platform"),
        )
    })?;
    #[cfg(any(unix, windows))]
    {
        let file = self.lock.read().unwrap();
        let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE);
        if end > data_size {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("get: end ({end}) exceeds payload size ({data_size})"),
            ));
        }
        pread_exact(&file, HEADER_SIZE + start, len)
    }
    #[cfg(not(any(unix, windows)))]
    {
        let mut file = self.lock.write().unwrap();
        let raw_size = file.seek(SeekFrom::End(0))?;
        let data_size = raw_size.saturating_sub(HEADER_SIZE);
        if end > data_size {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("get: end ({end}) exceeds payload size ({data_size})"),
            ));
        }
        file.seek(SeekFrom::Start(HEADER_SIZE + start))?;
        let mut buf = vec![0u8; len];
        file.read_exact(&mut buf)?;
        Ok(buf)
    }
}
/// Fills `buf` with the payload bytes starting at `offset`, without
/// modifying the file.
///
/// An empty `buf` returns `Ok(())` immediately, with no bounds check and no
/// lock taken. On unix and windows the read is positioned and only takes
/// the read guard; on other targets it seeks and takes the write guard.
///
/// # Errors
/// Returns `InvalidInput` if `offset + buf.len()` overflows `u64` or runs
/// past the end of the payload. Otherwise propagates any I/O error.
///
/// # Panics
/// Panics if the internal lock is poisoned.
pub fn peek_into(&self, offset: u64, buf: &mut [u8]) -> io::Result<()> {
    if buf.is_empty() {
        return Ok(());
    }
    let end = match offset.checked_add(buf.len() as u64) {
        Some(end) => end,
        None => {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "peek_into: offset + len overflows u64",
            ));
        }
    };
    // Shared by both platform branches below.
    let out_of_range = |data_size: u64| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            format!(
                "peek_into: range [{offset}, {end}) exceeds payload size ({data_size})"
            ),
        )
    };
    #[cfg(any(unix, windows))]
    {
        let guard = self.lock.read().unwrap();
        let data_size = guard.metadata()?.len().saturating_sub(HEADER_SIZE);
        if end <= data_size {
            pread_exact_into(&guard, HEADER_SIZE + offset, buf)
        } else {
            Err(out_of_range(data_size))
        }
    }
    #[cfg(not(any(unix, windows)))]
    {
        let mut guard = self.lock.write().unwrap();
        let data_size = guard.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE);
        if end <= data_size {
            guard.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
            guard.read_exact(buf)
        } else {
            Err(out_of_range(data_size))
        }
    }
}
/// Fills `buf` with the payload bytes in `[start, start + buf.len())`,
/// without modifying the file.
///
/// An empty `buf` returns `Ok(())` immediately, with no bounds check and no
/// lock taken. On unix and windows the read is positioned and only takes
/// the read guard; on other targets it seeks and takes the write guard.
///
/// # Errors
/// Returns `InvalidInput` if `start + buf.len()` overflows `u64` or runs
/// past the end of the payload. Otherwise propagates any I/O error.
///
/// # Panics
/// Panics if the internal lock is poisoned.
pub fn get_into(&self, start: u64, buf: &mut [u8]) -> io::Result<()> {
    if buf.is_empty() {
        return Ok(());
    }
    let end = match start.checked_add(buf.len() as u64) {
        Some(end) => end,
        None => {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "get_into: start + len overflows u64",
            ));
        }
    };
    // Shared by both platform branches below.
    let out_of_range = |data_size: u64| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("get_into: end ({end}) exceeds payload size ({data_size})"),
        )
    };
    #[cfg(any(unix, windows))]
    {
        let guard = self.lock.read().unwrap();
        let data_size = guard.metadata()?.len().saturating_sub(HEADER_SIZE);
        if end <= data_size {
            pread_exact_into(&guard, HEADER_SIZE + start, buf)
        } else {
            Err(out_of_range(data_size))
        }
    }
    #[cfg(not(any(unix, windows)))]
    {
        let mut guard = self.lock.write().unwrap();
        let data_size = guard.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE);
        if end <= data_size {
            guard.seek(SeekFrom::Start(HEADER_SIZE + start))?;
            guard.read_exact(buf)
        } else {
            Err(out_of_range(data_size))
        }
    }
}
/// Removes the last `buf.len()` bytes of the payload and copies them into
/// `buf`.
///
/// An empty `buf` returns `Ok(())` without touching the file. Otherwise the
/// bytes are read first; then the file is truncated, the committed length in
/// the header is updated, and the file is synced.
///
/// # Errors
/// Returns `InvalidInput` if `buf.len()` exceeds the payload size. Otherwise
/// propagates any I/O error.
///
/// # Panics
/// Panics if the internal lock is poisoned.
pub fn pop_into(&self, buf: &mut [u8]) -> io::Result<()> {
    if buf.is_empty() {
        return Ok(());
    }
    let n = buf.len() as u64;
    let mut guard = self.lock.write().unwrap();
    let data_size = guard.seek(SeekFrom::End(0))? - HEADER_SIZE;
    // `checked_sub` yields `None` exactly when `n > data_size`.
    let new_data_len = match data_size.checked_sub(n) {
        Some(rest) => rest,
        None => {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("pop_into({n}) exceeds payload size ({data_size})"),
            ));
        }
    };
    let new_end = HEADER_SIZE + new_data_len;
    guard.seek(SeekFrom::Start(new_end))?;
    guard.read_exact(buf)?;
    guard.set_len(new_end)?;
    write_committed_len(&mut guard, new_data_len)?;
    durable_sync(&guard)
}
/// Overwrites existing payload bytes starting at `offset` with `data`, then
/// syncs the file. Only compiled with the `set` feature.
///
/// The write must lie entirely inside the current payload; this method never
/// grows the file. An empty `data` returns `Ok(())` without taking the lock.
///
/// # Errors
/// Returns `InvalidInput` if `offset + data.len()` overflows `u64` or runs
/// past the end of the payload. Otherwise propagates any I/O error.
///
/// # Panics
/// Panics if the internal lock is poisoned.
#[cfg(feature = "set")]
pub fn set(&self, offset: u64, data: &[u8]) -> io::Result<()> {
    if data.is_empty() {
        return Ok(());
    }
    let end = match offset.checked_add(data.len() as u64) {
        Some(end) => end,
        None => {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "set: offset + len overflows u64",
            ));
        }
    };

    let mut guard = self.lock.write().unwrap();
    let data_size = guard.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE);
    if end > data_size {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("set: write end ({end}) exceeds payload size ({data_size})"),
        ));
    }

    guard.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
    guard.write_all(data)?;
    durable_sync(&guard)
}
/// Returns the current payload size in bytes: the file size minus the
/// header, clamped at zero.
///
/// # Errors
/// Propagates the error from querying file metadata.
///
/// # Panics
/// Panics if the internal lock is poisoned.
pub fn len(&self) -> io::Result<u64> {
    let guard = self.lock.read().unwrap();
    let on_disk = guard.metadata()?.len();
    Ok(on_disk.saturating_sub(HEADER_SIZE))
}
/// Returns `true` when the payload holds no bytes.
///
/// # Errors
/// Propagates any error from `len`.
pub fn is_empty(&self) -> io::Result<bool> {
    self.len().map(|payload| payload == 0)
}
}
impl io::Write for BStack {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.push(buf)?;
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl io::Write for &BStack {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.push(buf)?;
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
/// A read cursor over the payload of a [`BStack`].
///
/// It borrows the stack and keeps its own position, which the `Read` and
/// `Seek` implementations advance or move.
pub struct BStackReader<'a> {
/// The stack being read.
stack: &'a BStack,
/// Current position, as a payload-relative byte offset.
offset: u64,
}
impl BStack {
    /// Creates a reader positioned at the start of the payload.
    pub fn reader(&self) -> BStackReader<'_> {
        self.reader_at(0)
    }

    /// Creates a reader positioned at payload offset `offset`.
    ///
    /// The offset is not validated against the current payload size.
    pub fn reader_at(&self, offset: u64) -> BStackReader<'_> {
        let stack = self;
        BStackReader { stack, offset }
    }
}
impl<'a> BStackReader<'a> {
    /// Returns the reader's current payload-relative offset.
    pub fn position(&self) -> u64 {
        let Self { offset, .. } = *self;
        offset
    }
}
impl<'a> From<&'a BStack> for BStackReader<'a> {
fn from(stack: &'a BStack) -> Self {
stack.reader()
}
}
/// Sequential reads from the payload, starting at the reader's offset.
impl<'a> io::Read for BStackReader<'a> {
    /// Copies up to `buf.len()` payload bytes into `buf` and advances the
    /// reader by the number of bytes copied.
    ///
    /// Returns `Ok(0)` when `buf` is empty or the reader is at or past the
    /// end of the payload.
    ///
    /// The payload size is queried and the bytes are fetched under two
    /// separate lock acquisitions, so a concurrent shrink of the stack between
    /// them surfaces as the error returned by `get_into`.
    ///
    /// # Errors
    /// Propagates errors from `BStack::len` and `BStack::get_into`.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        let data_size = self.stack.len()?;
        if self.offset >= data_size {
            return Ok(0);
        }
        let remaining = data_size - self.offset;
        // Clamp in u64 before narrowing. Casting `remaining` straight to
        // `usize` can truncate to 0 on 32-bit targets and report a false EOF.
        // After the clamp the value is at most `buf.len()`, so the cast is
        // lossless.
        let n = remaining.min(buf.len() as u64) as usize;
        self.stack.get_into(self.offset, &mut buf[..n])?;
        self.offset += n as u64;
        Ok(n)
    }
}
impl<'a> io::Seek for BStackReader<'a> {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
let data_size = self.stack.len()? as i128;
let new_offset = match pos {
SeekFrom::Start(n) => n as i128,
SeekFrom::End(n) => data_size + n as i128,
SeekFrom::Current(n) => self.offset as i128 + n as i128,
};
if new_offset < 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"seek before beginning of payload",
));
}
self.offset = new_offset as u64;
Ok(self.offset)
}
}