bstack/lib.rs
1//! A persistent, fsync-durable binary stack backed by a single file.
2//!
3//! # Overview
4//!
5//! [`BStack`] treats a file as a flat byte buffer that grows and shrinks from
6//! the tail. Every mutating operation — [`push`](BStack::push),
7//! [`extend`](BStack::extend), [`pop`](BStack::pop), [`discard`](BStack::discard), and (with the `set`
8//! feature) [`set`](BStack::set) and [`zero`](BStack::zero) — calls a *durable sync* before returning,
9//! so the data survives a process crash or an unclean system shutdown.
10//! Read-only operations — [`peek`](BStack::peek),
11//! [`peek_into`](BStack::peek_into), [`get`](BStack::get), and
12//! [`get_into`](BStack::get_into) — never modify the file and on Unix and
13//! Windows can run concurrently with each other.
14//! [`pop_into`](BStack::pop_into) is the buffer-passing counterpart of `pop`,
15//! carrying the same durability and atomicity guarantees.
16//! [`discard`](BStack::discard) is like `pop` but discards the removed bytes
17//! without reading or returning them, avoiding any allocation or copy.
18//!
19//! The crate depends on **`libc`** (Unix) and **`windows-sys`** (Windows) for
20//! platform-specific syscalls, and uses **no `unsafe` code beyond the required
21//! FFI calls**.
22//!
23//! # File format
24//!
25//! Every file begins with a fixed 16-byte header:
26//!
27//! ```text
28//! ┌────────────────────────┬──────────────┬──────────────┐
29//! │ header (16 B) │ payload 0 │ payload 1 │ ...
30//! │ magic[8] | clen[8 LE] │ │ │
31//! └────────────────────────┴──────────────┴──────────────┘
32//! ^ ^ ^ ^
33//! file offset 0 offset 16 16+n0 EOF
34//! ```
35//!
36//! * **`magic`** — 8 bytes: `BSTK` + major(1 B) + minor(1 B) + patch(1 B) + reserved(1 B).
37//! This version writes `BSTK\x00\x01\x05\x00` (0.1.5). [`open`](BStack::open)
38//! accepts any file whose first 6 bytes match `BSTK\x00\x01` (any 0.1.x) and
39//! rejects anything with a different major or minor.
40//! * **`clen`** — little-endian `u64` recording the *committed* payload length.
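//!   It is rewritten by every operation that changes the payload size
//!   ([`push`](BStack::push), [`extend`](BStack::extend), [`pop`](BStack::pop),
//!   [`pop_into`](BStack::pop_into), [`discard`](BStack::discard)) and is used
//!   for crash recovery on the next [`open`](BStack::open).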
44//!
45//! All user-visible offsets are **logical** (0-based from the start of the
46//! payload region, i.e. from file byte 16).
47//!
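/*!
As a rough illustration of the layout described above, the sketch below builds
and parses a 16-byte header using only the standard library. The helper names
(`encode_header`, `parse_header`, `to_file_offset`) are hypothetical and are not
part of this crate's API.

```rust
/// Size of the fixed header: magic[8] + clen[8].
const HEADER_SIZE: u64 = 16;

/// Build a header for version `major.minor.patch` with committed length `clen`.
fn encode_header(major: u8, minor: u8, patch: u8, clen: u64) -> [u8; 16] {
    let mut hdr = [0u8; 16];
    hdr[0..4].copy_from_slice(b"BSTK");
    hdr[4] = major;
    hdr[5] = minor;
    hdr[6] = patch;
    // hdr[7] is the reserved byte and stays zero.
    hdr[8..16].copy_from_slice(&clen.to_le_bytes());
    hdr
}

/// Return `(major, minor, patch, clen)` when the first six bytes identify a
/// 0.1.x file, or `None` for any other major or minor version.
fn parse_header(hdr: &[u8; 16]) -> Option<(u8, u8, u8, u64)> {
    if &hdr[0..6] != b"BSTK\x00\x01" {
        return None;
    }
    let mut clen = [0u8; 8];
    clen.copy_from_slice(&hdr[8..16]);
    Some((hdr[4], hdr[5], hdr[6], u64::from_le_bytes(clen)))
}

/// Translate a logical payload offset into an absolute file offset.
fn to_file_offset(logical: u64) -> u64 {
    HEADER_SIZE + logical
}

fn main() {
    let hdr = encode_header(0, 1, 5, 10);
    assert_eq!(&hdr[0..8], b"BSTK\x00\x01\x05\x00");
    assert_eq!(parse_header(&hdr), Some((0, 1, 5, 10)));
    // A 0.2.x header is rejected because the minor version differs.
    assert_eq!(parse_header(&encode_header(0, 2, 0, 0)), None);
    assert_eq!(to_file_offset(0), 16);
}
```

*/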
48//! # Crash recovery
49//!
50//! On [`open`](BStack::open), the header's committed length is compared against
51//! the actual file size:
52//!
53//! | Condition | Cause | Recovery |
54//! |-----------|-------|----------|
55//! | `file_size − 16 > clen` | partial tail write (push crashed before header update) | truncate to `16 + clen` |
56//! | `file_size − 16 < clen` | partial truncation (pop crashed before header update) | set `clen = file_size − 16` |
57//!
58//! After recovery a `durable_sync` ensures the repaired state is on stable
59//! storage before any caller can observe or modify the file.
60//!
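/*!
The decision table above can be sketched as a pure function. The `Recovery`
enum and `plan_recovery` are hypothetical names used only for illustration; the
sketch also leaves out the empty-file case, which `open` treats as a brand-new
stack.

```rust
use std::cmp::Ordering;

const HEADER_SIZE: u64 = 16;

#[derive(Debug, PartialEq)]
enum Recovery {
    /// File size and committed length agree; nothing to repair.
    Consistent,
    /// Uncommitted tail bytes exist; truncate the file to this size.
    TruncateFileTo(u64),
    /// The file is shorter than the header claims; lower `clen` to this value.
    ShrinkClenTo(u64),
}

/// Decide how to repair a file of `file_size` bytes whose header records
/// `clen`. Returns `None` when the file cannot even hold the header.
fn plan_recovery(file_size: u64, clen: u64) -> Option<Recovery> {
    let actual = file_size.checked_sub(HEADER_SIZE)?;
    Some(match actual.cmp(&clen) {
        Ordering::Equal => Recovery::Consistent,
        Ordering::Greater => Recovery::TruncateFileTo(HEADER_SIZE + clen),
        Ordering::Less => Recovery::ShrinkClenTo(actual),
    })
}

fn main() {
    assert_eq!(plan_recovery(26, 10), Some(Recovery::Consistent));
    assert_eq!(plan_recovery(30, 10), Some(Recovery::TruncateFileTo(26)));
    assert_eq!(plan_recovery(20, 10), Some(Recovery::ShrinkClenTo(4)));
    assert_eq!(plan_recovery(8, 0), None);
}
```

Both repair branches end with the smaller of the two lengths, which matches
the `min` used by the recovery code in `open`.

*/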
61//! # Durability
62//!
63//! | Operation | Syscall sequence |
64//! |-----------|-----------------|
65//! | `push` | `lseek(END)` → `write(data)` → `lseek(8)` → `write(clen)` → `durable_sync` |
66//! | `extend` | `lseek(END)` → `set_len(new_end)` → `lseek(8)` → `write(clen)` → `durable_sync` |
67//! | `pop`, `pop_into` | `lseek` → `read` → `ftruncate` → `lseek(8)` → `write(clen)` → `durable_sync` |
68//! | `discard` | `ftruncate` → `lseek(8)` → `write(clen)` → `durable_sync` |
69//! | `set` *(feature)* | `lseek(offset)` → `write(data)` → `durable_sync` |
70//! | `zero` *(feature)* | `lseek(offset)` → `write(zeros)` → `durable_sync` |
71//! | `peek`, `peek_into`, `get`, `get_into` | `pread(2)` on Unix; `ReadFile`+`OVERLAPPED` on Windows; `lseek` → `read` elsewhere (no sync — read-only) |
72//!
73//! **`durable_sync` on macOS** issues `fcntl(F_FULLFSYNC)`, which flushes the
74//! drive's hardware write cache. Plain `fdatasync` is not sufficient on macOS
75//! because the kernel may acknowledge it before the drive controller has
76//! committed the data. If `F_FULLFSYNC` is not supported by the device the
77//! implementation falls back to `sync_data` (`fdatasync`).
78//!
79//! **`durable_sync` on other Unix** calls `sync_data` (`fdatasync`), which is
80//! sufficient on Linux and BSD.
81//!
82//! **`durable_sync` on Windows** calls `sync_data`, which maps to
83//! `FlushFileBuffers`. This flushes the kernel write-back cache and waits for
84//! the drive to acknowledge, providing equivalent durability to `fdatasync`.
85//!
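/*!
A minimal sketch of the `push` row of the table above, written against
`std::fs::File` only. It assumes the file already starts with a valid header,
uses plain `sync_data` in place of `durable_sync` (so it does not issue
`F_FULLFSYNC` on macOS), and omits the rollback path. `append_durably` and
`demo` are hypothetical names, not crate APIs.

```rust
use std::fs::{File, OpenOptions};
use std::io::{self, Seek, SeekFrom, Write};

const HEADER_SIZE: u64 = 16;

/// Append `data`, record the new committed length at file offset 8, then sync.
/// Returns the logical offset at which `data` begins.
fn append_durably(file: &mut File, data: &[u8]) -> io::Result<u64> {
    let file_end = file.seek(SeekFrom::End(0))?;
    let logical_offset = file_end - HEADER_SIZE;
    file.write_all(data)?;
    let new_len = logical_offset + data.len() as u64;
    file.seek(SeekFrom::Start(8))?;
    file.write_all(&new_len.to_le_bytes())?;
    file.sync_data()?;
    Ok(logical_offset)
}

/// Create a scratch file, append twice, and report both offsets and the size.
fn demo() -> io::Result<(u64, u64, u64)> {
    let name = format!("bstack_sketch_{}.bin", std::process::id());
    let path = std::env::temp_dir().join(name);
    let mut file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(true)
        .open(&path)?;
    // Header: magic for 0.1.5 followed by a committed length of zero.
    file.write_all(b"BSTK\x00\x01\x05\x00")?;
    file.write_all(&0u64.to_le_bytes())?;
    let off0 = append_durably(&mut file, b"hello")?;
    let off1 = append_durably(&mut file, b"world")?;
    let size = file.metadata()?.len();
    drop(file);
    std::fs::remove_file(&path)?;
    Ok((off0, off1, size))
}

fn main() -> io::Result<()> {
    // 16 header bytes plus two 5-byte payloads.
    assert_eq!(demo()?, (0, 5, 26));
    Ok(())
}
```

*/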
86//! # Multi-process safety
87//!
88//! On Unix, [`open`](BStack::open) acquires an **exclusive advisory `flock`**
89//! on the file (`LOCK_EX | LOCK_NB`). If another process already holds the
90//! lock, `open` returns immediately with [`io::ErrorKind::WouldBlock`] rather
91//! than blocking indefinitely. The lock is released automatically when the
92//! [`BStack`] is dropped (the underlying file descriptor is closed).
93//!
94//! On Windows, [`open`](BStack::open) acquires an **exclusive `LockFileEx`**
95//! lock (`LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY`) covering the
96//! entire file range. If another process already holds the lock, `open`
97//! returns immediately with [`io::ErrorKind::WouldBlock`]
98//! (`ERROR_LOCK_VIOLATION`). The lock is released when the [`BStack`] is
99//! dropped (the underlying file handle is closed).
100//!
//! > **Note:** `flock` on Unix is advisory: it keeps out other openers that
//! > also request the lock, but it does not stop a process that skips the lock
//! > from reading or writing the file. `LockFileEx` on Windows is a mandatory
//! > byte-range lock, so the OS rejects reads and writes to the locked range
//! > made through any other handle.
105//!
106//! # Correct usage
107//!
108//! bstack files must only be opened through this crate or a compatible
109//! implementation that understands the file format, the header protocol, and
110//! the locking semantics. Reading or writing the underlying file with raw
111//! tools or syscalls while a [`BStack`] instance is live — or manually editing
112//! the header fields — can silently corrupt the committed-length sentinel or
113//! bypass the advisory lock.
114//!
115//! **The authors make no guarantees about the behaviour of this crate —
116//! including freedom from data loss or logical corruption — when the file has
117//! been accessed outside of this crate's controlled interface.**
118//!
119//! # Thread safety
120//!
121//! `BStack` wraps the file in a [`std::sync::RwLock`].
122//!
123//! | Operation | Lock (Unix / Windows) | Lock (other) |
124//! |-----------|-----------------------|--------------|
125//! | `push`, `extend`, `pop`, `pop_into`, `discard` | write | write |
126//! | `set`, `zero` *(feature)* | write | write |
127//! | `peek`, `peek_into`, `get`, `get_into` | **read** | write |
128//! | `len` | read | read |
129//!
130//! On Unix and Windows, `peek`, `peek_into`, `get`, and `get_into` use a
131//! cursor-safe positional read (`pread(2)` on Unix; `ReadFile` with
132//! `OVERLAPPED` on Windows) that does not modify the file-position cursor.
//! This lets any number of these calls run in parallel under the read lock,
//! while every mutating operation takes the write lock and therefore excludes
//! both readers and other writers.
136//!
137//! On other platforms a seek is required, so `peek`, `peek_into`, `get`, and
138//! `get_into` fall back to the write lock and all reads serialise.
139//!
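/*!
The locking model in the table above can be sketched with an in-memory
stand-in. `MemStack` is a hypothetical type that keeps its bytes in a `Vec`
instead of a file; it only illustrates which lock each kind of operation takes
and is not how `BStack` stores data.

```rust
use std::sync::RwLock;
use std::thread;

struct MemStack {
    lock: RwLock<Vec<u8>>,
}

impl MemStack {
    fn new() -> Self {
        MemStack { lock: RwLock::new(Vec::new()) }
    }

    /// Mutators take the write lock and exclude readers and other writers.
    fn push(&self, data: &[u8]) -> usize {
        let mut buf = self.lock.write().unwrap();
        let offset = buf.len();
        buf.extend_from_slice(data);
        offset
    }

    /// Readers take the read lock, so several can run at the same time.
    fn get(&self, start: usize, end: usize) -> Option<Vec<u8>> {
        let buf = self.lock.read().unwrap();
        buf.get(start..end).map(|s| s.to_vec())
    }
}

/// Run `readers` concurrent `get(0, 5)` calls and collect their results.
fn read_in_parallel(stack: &MemStack, readers: usize) -> Vec<Option<Vec<u8>>> {
    thread::scope(|scope| {
        let handles: Vec<_> = (0..readers)
            .map(|_| scope.spawn(|| stack.get(0, 5)))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}

fn main() {
    let stack = MemStack::new();
    assert_eq!(stack.push(b"hello"), 0);
    assert_eq!(stack.push(b"world"), 5);
    let results = read_in_parallel(&stack, 4);
    assert!(results.iter().all(|r| r.as_deref() == Some(&b"hello"[..])));
}
```

*/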
140//! # Standard I/O adapters
141//!
142//! ## Writing
143//!
144//! `BStack` implements [`std::io::Write`] (and so does `&BStack`, mirroring
145//! [`std::io::Write` for `&File`]). Each call to `write` is forwarded to
146//! [`push`](BStack::push), so every write is atomically appended and durably
147//! synced before returning. `flush` is a no-op.
148//!
149//! ```no_run
150//! use std::io::Write;
151//! use bstack::BStack;
152//!
153//! # fn main() -> std::io::Result<()> {
154//! let mut stack = BStack::open("log.bin")?;
155//! stack.write_all(b"hello")?;
156//! stack.write_all(b"world")?;
157//! # Ok(())
158//! # }
159//! ```
160//!
161//! ## Reading
162//!
163//! [`BStackReader`] wraps a `&BStack` with a cursor and implements
164//! [`std::io::Read`] and [`std::io::Seek`]. Use [`BStack::reader`] or
165//! [`BStack::reader_at`] to construct one.
166//!
167//! ```no_run
168//! use std::io::{Read, Seek, SeekFrom};
169//! use bstack::BStack;
170//!
171//! # fn main() -> std::io::Result<()> {
172//! let stack = BStack::open("log.bin")?;
173//! stack.push(b"hello world")?;
174//!
175//! let mut reader = stack.reader();
176//! let mut buf = [0u8; 5];
177//! reader.read_exact(&mut buf)?; // b"hello"
178//! reader.seek(SeekFrom::Start(6))?;
179//! reader.read_exact(&mut buf)?; // b"world"
180//! # Ok(())
181//! # }
182//! ```
183//!
184//! # Trait implementations
185//!
186//! ## `BStack`
187//!
188//! | Trait | Semantics |
189//! |-------|-----------|
190//! | `Debug` | Shows `version` (semver string from the magic header, e.g. `"0.1.5"`) and `len` (`Option<u64>`, `None` on I/O failure). |
191//! | `PartialEq` / `Eq` | **Pointer identity.** Two values are equal iff they are the same instance. No two distinct `BStack` values in one process can refer to the same file. |
192//! | `Hash` | Hashes the instance address — consistent with pointer-identity `PartialEq`. |
193//!
194//! ## `BStackReader`
195//!
196//! | Trait | Semantics |
197//! |-------|-----------|
198//! | `PartialEq` / `Eq` | Equal when both the `BStack` pointer (identity) and the cursor `offset` match. |
199//! | `Hash` | Hashes `(BStack pointer, offset)` — consistent with `PartialEq`. |
200//! | `PartialOrd` / `Ord` | Ordered by `BStack` instance address, then by cursor `offset`. Groups all readers over the same stack and within that group orders by position. |
201//!
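/*!
Pointer-identity equality, as described in the tables above, can be sketched
as follows. `Handle` is a hypothetical stand-in type; the sketch only shows the
technique (compare and hash the instance address) and is not taken from the
crate's own `impl` blocks.

```rust
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

struct Handle {
    _name: String,
}

impl Handle {
    fn new(name: &str) -> Self {
        Handle { _name: name.to_string() }
    }
}

impl PartialEq for Handle {
    /// Two handles are equal only if they are the same instance.
    fn eq(&self, other: &Self) -> bool {
        std::ptr::eq(self, other)
    }
}

impl Eq for Handle {}

impl Hash for Handle {
    /// Hash the address so that `Hash` agrees with `PartialEq`.
    fn hash<H: Hasher>(&self, state: &mut H) {
        (self as *const Handle).hash(state);
    }
}

/// Count how many distinct instances appear in `handles`.
fn distinct_count(handles: &[&Handle]) -> usize {
    handles.iter().copied().collect::<HashSet<&Handle>>().len()
}

fn main() {
    let a = Handle::new("log.bin");
    let b = Handle::new("log.bin");
    // Same contents, different instances: not equal.
    assert!(a != b);
    assert_eq!(distinct_count(&[&a, &b, &a]), 2);
}
```

*/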
202//! # Feature flags
203//!
204//! | Feature | Description |
205//! |---------|-------------|
//! | `set` | Enables [`BStack::set`] and [`BStack::zero`]: in-place overwrite of existing payload bytes, with caller-supplied data or with zeros, without changing the file size. |
207//! | `alloc` | Enables [`BStackAllocator`], [`BStackSlice`], [`BStackSliceReader`], and [`LinearBStackAllocator`] — region-based allocation over a `BStack` payload. |
208//!
209//! Enable with:
210//!
211//! ```toml
212//! [dependencies]
213//! bstack = { version = "0.1", features = ["set"] }
214//! # or
215//! bstack = { version = "0.1", features = ["alloc"] }
216//! # or both
217//! bstack = { version = "0.1", features = ["alloc", "set"] }
218//! ```
219//!
220//! # Allocator (`alloc` feature)
221//!
222//! The `alloc` feature adds a region-management layer on top of [`BStack`].
223//!
224//! ## Key types
225//!
226//! * [`BStackAllocator`] — trait for types that own a [`BStack`] and manage
227//! contiguous byte regions within its payload. Requires `stack()`,
228//! `into_stack()`, `alloc()`, and `realloc()`; provides a default no-op
229//! `dealloc()` and delegation helpers `len()` / `is_empty()`.
230//!
231//! * [`BStackSlice`]`<'a, A>` — lightweight `Copy` handle (allocator reference +
232//! offset + length) to a contiguous region. Exposes `read`, `read_into`,
233//! `read_range_into`, `subslice`, `subslice_range`, `reader`, `reader_at`,
234//! and (with the `set` feature) `write`, `write_range`, `zero`, `zero_range`.
235//!
236//! * [`BStackSliceReader`]`<'a, A>` — cursor-based reader over a
237//! [`BStackSlice`], implementing [`io::Read`] and [`io::Seek`] in the
238//! slice's coordinate space.
239//!
240//! * [`LinearBStackAllocator`] — reference bump allocator that appends regions
241//! sequentially. `realloc` is O(1) for the tail allocation and returns
242//! `Unsupported` for non-tail slices. `dealloc` reclaims the tail via
243//! [`BStack::discard`]; non-tail deallocations are a no-op. Every operation
244//! maps to exactly one [`BStack`] call and is crash-safe by inheritance.
245//!
246//! ## Lifetime model
247//!
248//! `BStackSlice<'a, A>` borrows the **allocator** for `'a`, not the
249//! [`BStack`] directly. As a result the borrow checker statically prevents
250//! calling [`BStackAllocator::into_stack`] — which consumes the allocator by
251//! value — while any slice is still in scope.
252//!
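/*!
The borrow relationship can be sketched with stand-in types. `Stack`,
`Allocator`, and `Slice` below are hypothetical simplifications and do not
mirror the real signatures; the point is only that a handle borrowing the
allocator blocks a by-value `into_stack` while the handle is still used.

```rust
struct Stack {
    bytes: Vec<u8>,
}

struct Allocator {
    stack: Stack,
}

/// A `Copy` handle that borrows the allocator, not the stack.
#[derive(Clone, Copy)]
struct Slice<'a> {
    alloc: &'a Allocator,
    offset: usize,
    len: usize,
}

impl Allocator {
    fn new(stack: Stack) -> Self {
        Allocator { stack }
    }

    fn slice(&self, offset: usize, len: usize) -> Slice<'_> {
        Slice { alloc: self, offset, len }
    }

    /// Consumes the allocator, so no `Slice` may still be in use.
    fn into_stack(self) -> Stack {
        self.stack
    }
}

impl<'a> Slice<'a> {
    fn read(&self) -> &'a [u8] {
        &self.alloc.stack.bytes[self.offset..self.offset + self.len]
    }
}

/// Read a region, then reclaim the stack once the slice is no longer used.
fn read_then_reclaim(bytes: &[u8], offset: usize, len: usize) -> (Vec<u8>, usize) {
    let alloc = Allocator::new(Stack { bytes: bytes.to_vec() });
    let slice = alloc.slice(offset, len);
    let copy = slice.read().to_vec();
    // Using `slice` after the next line would be rejected by the borrow
    // checker, because `into_stack` moves `alloc` while it is still borrowed.
    let stack = alloc.into_stack();
    (copy, stack.bytes.len())
}

fn main() {
    let (copy, total) = read_then_reclaim(b"hello world", 6, 5);
    assert_eq!(copy, b"world");
    assert_eq!(total, 11);
}
```

*/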
253//! ## Quick example
254//!
//! ```ignore
256//! use bstack::{BStack, BStackAllocator, LinearBStackAllocator};
257//!
258//! # fn main() -> std::io::Result<()> {
259//! let alloc = LinearBStackAllocator::new(BStack::open("data.bstack")?);
260//!
261//! let slice = alloc.alloc(128)?; // reserve 128 zero bytes
262//! let data = slice.read()?; // read them back
263//! alloc.dealloc(slice)?; // release (tail, so O(1))
264//!
265//! let stack = alloc.into_stack(); // reclaim the BStack
266//! # Ok(())
267//! # }
268//! ```
269//!
270//! # Examples
271//!
272//! ```no_run
273//! use bstack::BStack;
274//!
275//! # fn main() -> std::io::Result<()> {
276//! let stack = BStack::open("log.bin")?;
277//!
278//! // push returns the logical byte offset where the payload starts.
279//! let off0 = stack.push(b"hello")?; // 0
280//! let off1 = stack.push(b"world")?; // 5
281//!
282//! assert_eq!(stack.len()?, 10);
283//!
284//! // peek reads from a logical offset to the end without removing anything.
285//! assert_eq!(stack.peek(off1)?, b"world");
286//!
287//! // get reads an arbitrary half-open logical byte range.
288//! assert_eq!(stack.get(3, 8)?, b"lowor");
289//!
290//! // pop removes bytes from the tail and returns them.
291//! assert_eq!(stack.pop(5)?, b"world");
292//! assert_eq!(stack.len()?, 5);
293//! # Ok(())
294//! # }
295//! ```
296
297mod test;
298
299#[cfg(feature = "alloc")]
300mod alloc;
301#[cfg(feature = "alloc")]
302pub use alloc::{BStackAllocator, BStackSlice, BStackSliceReader, LinearBStackAllocator};
303#[cfg(all(feature = "alloc", feature = "set"))]
304pub use alloc::{BStackSliceWriter, FirstFitBStackAllocator};
305
306use std::fmt;
307use std::fs::{File, OpenOptions};
308use std::hash::{Hash, Hasher};
309use std::io::{self, Read, Seek, SeekFrom, Write};
310use std::path::Path;
311use std::sync::RwLock;
312
313#[cfg(unix)]
314use std::os::unix::fs::FileExt;
315#[cfg(unix)]
316use std::os::unix::io::AsRawFd;
317
318#[cfg(windows)]
319use std::os::windows::fs::FileExt as WindowsFileExt;
320#[cfg(windows)]
321use std::os::windows::io::AsRawHandle;
322#[cfg(windows)]
323use windows_sys::Win32::Storage::FileSystem::{
324 LOCKFILE_EXCLUSIVE_LOCK, LOCKFILE_FAIL_IMMEDIATELY, LockFileEx,
325};
326#[cfg(windows)]
327use windows_sys::Win32::System::IO::OVERLAPPED;
328
329/// Full magic for files written by this version (`BSTK` + major 0 + minor 1 + patch 5 + 0).
330const MAGIC: [u8; 8] = *b"BSTK\x00\x01\x05\x00";
331
332/// Compatibility prefix checked on open: `BSTK` + major 0 + minor 1.
333/// Any file whose first 6 bytes match is considered a compatible 0.1.x file.
334const MAGIC_PREFIX: [u8; 6] = *b"BSTK\x00\x01";
335
336/// Bytes occupied by the file header (magic[8] + committed_len[8]).
337const HEADER_SIZE: u64 = 16;
338
339/// Flush all in-flight writes to stable storage.
340///
341/// On macOS this uses `F_FULLFSYNC` to flush the drive's hardware write cache,
342/// which `fdatasync` alone does not guarantee. Falls back to `sync_data` if
343/// `F_FULLFSYNC` returns an error (e.g. the device doesn't support it).
/// On all other platforms this delegates to `sync_data` (`fdatasync` on Unix,
/// `FlushFileBuffers` on Windows).
345fn durable_sync(file: &File) -> io::Result<()> {
346 #[cfg(target_os = "macos")]
347 {
348 let ret = unsafe { libc::fcntl(file.as_raw_fd(), libc::F_FULLFSYNC) };
349 if ret != -1 {
350 return Ok(());
351 }
352 // Device does not support F_FULLFSYNC; fall back to fdatasync.
353 }
354 file.sync_data()
355}
356
357/// Acquire an exclusive, non-blocking advisory flock on `file`.
358///
359/// Returns `Err(WouldBlock)` if another process already holds the lock.
360#[cfg(unix)]
361fn flock_exclusive(file: &File) -> io::Result<()> {
362 let ret = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) };
363 if ret == 0 {
364 Ok(())
365 } else {
366 Err(io::Error::last_os_error())
367 }
368}
369
370/// Acquire an exclusive, non-blocking `LockFileEx` lock on `file`.
371///
372/// Locks the entire file range (offset 0, length `u64::MAX`).
373/// Returns `Err(WouldBlock)` if another process already holds the lock
374/// (`ERROR_LOCK_VIOLATION` maps to `io::ErrorKind::WouldBlock` in Rust).
375#[cfg(windows)]
376fn lock_file_exclusive(file: &File) -> io::Result<()> {
377 let handle = file.as_raw_handle() as windows_sys::Win32::Foundation::HANDLE;
378 // OVERLAPPED is required by LockFileEx even for synchronous handles.
379 // Offset fields (0, 0) anchor the lock at byte 0 of the file.
380 let mut overlapped: OVERLAPPED = unsafe { std::mem::zeroed() };
381 let ret = unsafe {
382 LockFileEx(
383 handle,
384 LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY,
385 0, // reserved, must be zero
386 u32::MAX, // nNumberOfBytesToLockLow ─┐ lock entire
387 u32::MAX, // nNumberOfBytesToLockHigh ─┘ file space
388 &mut overlapped,
389 )
390 };
391 if ret != 0 {
392 Ok(())
393 } else {
394 Err(io::Error::last_os_error())
395 }
396}
397
398/// Write the 16-byte header into a brand-new (empty) file.
399fn init_header(file: &mut File) -> io::Result<()> {
400 file.seek(SeekFrom::Start(0))?;
401 file.write_all(&MAGIC)?;
402 file.write_all(&0u64.to_le_bytes())
403}
404
405/// Overwrite the committed-length field at file offset 8.
406fn write_committed_len(file: &mut File, len: u64) -> io::Result<()> {
407 file.seek(SeekFrom::Start(8))?;
408 file.write_all(&len.to_le_bytes())
409}
410
411/// Read `len` bytes from absolute file position `offset` without modifying
412/// the file-position cursor, so the caller only needs a shared (read) lock.
413///
414/// On Unix this uses `pread(2)` via `read_exact_at`.
415/// On Windows this uses `ReadFile` with an `OVERLAPPED` offset (via
416/// `seek_read`), which is also cursor-safe on synchronous handles.
417#[cfg(unix)]
418fn pread_exact(file: &File, offset: u64, len: usize) -> io::Result<Vec<u8>> {
419 let mut buf = vec![0u8; len];
420 file.read_exact_at(&mut buf, offset)?;
421 Ok(buf)
422}
423
424/// Windows counterpart of `pread_exact` — see the shared doc comment above.
425#[cfg(windows)]
426fn pread_exact(file: &File, offset: u64, len: usize) -> io::Result<Vec<u8>> {
427 let mut buf = vec![0u8; len];
428 let mut filled = 0usize;
429 while filled < len {
430 let n = file.seek_read(&mut buf[filled..], offset + filled as u64)?;
431 if n == 0 {
432 return Err(io::Error::new(
433 io::ErrorKind::UnexpectedEof,
434 "pread_exact: unexpected EOF",
435 ));
436 }
437 filled += n;
438 }
439 Ok(buf)
440}
441
442/// Fill `buf` from absolute file position `offset` without modifying the
443/// file-position cursor. Unix uses `pread(2)` via `read_exact_at`;
444/// Windows uses `ReadFile` with an `OVERLAPPED` offset via `seek_read`.
445#[cfg(unix)]
446fn pread_exact_into(file: &File, offset: u64, buf: &mut [u8]) -> io::Result<()> {
447 file.read_exact_at(buf, offset)
448}
449
450/// Windows counterpart of `pread_exact_into`.
451#[cfg(windows)]
452fn pread_exact_into(file: &File, offset: u64, buf: &mut [u8]) -> io::Result<()> {
453 let len = buf.len();
454 let mut filled = 0usize;
455 while filled < len {
456 let n = file.seek_read(&mut buf[filled..], offset + filled as u64)?;
457 if n == 0 {
458 return Err(io::Error::new(
459 io::ErrorKind::UnexpectedEof,
460 "pread_exact_into: unexpected EOF",
461 ));
462 }
463 filled += n;
464 }
465 Ok(())
466}
467
468/// Read and validate the header; return the committed payload length.
469fn read_header(file: &mut File) -> io::Result<u64> {
470 file.seek(SeekFrom::Start(0))?;
471 let mut hdr = [0u8; 16];
472 file.read_exact(&mut hdr)?;
473 if hdr[0..6] != MAGIC_PREFIX {
474 return Err(io::Error::new(
475 io::ErrorKind::InvalidData,
476 "bstack: bad magic number — not a bstack file or incompatible version",
477 ));
478 }
479 Ok(u64::from_le_bytes(hdr[8..16].try_into().unwrap()))
480}
481
482// ---------------------------------------------------------------------------
483
484/// A persistent, fsync-durable binary stack backed by a single file.
485///
486/// See the [crate-level documentation](crate) for the file format, durability
487/// guarantees, crash recovery, multi-process safety, and thread-safety model.
488pub struct BStack {
489 lock: RwLock<File>,
490}
491
492impl BStack {
493 /// Open or create a stack file at `path`.
494 ///
495 /// On a **new** file the 16-byte header is written and durably synced
496 /// before returning.
497 ///
498 /// On an **existing** file the header is validated and, if a previous crash
499 /// left the file in an inconsistent state, the file is repaired and durably
500 /// synced before returning (see *Crash recovery* in the crate docs).
501 ///
    /// On Unix an **exclusive advisory `flock`** is acquired, and on Windows an
    /// exclusive `LockFileEx` lock; if another process already holds the lock
    /// this function returns immediately with [`io::ErrorKind::WouldBlock`].
505 ///
506 /// # Errors
507 ///
508 /// * [`io::ErrorKind::WouldBlock`] — another process holds the exclusive
    ///   lock (Unix and Windows).
510 /// * [`io::ErrorKind::InvalidData`] — the file exists but its header magic
511 /// is wrong (not a bstack file, or created by an incompatible version),
512 /// or the file is too short to contain a valid header.
513 /// * Any [`io::Error`] from [`OpenOptions::open`], `read`, `write`, or
514 /// `durable_sync`.
515 pub fn open(path: impl AsRef<Path>) -> io::Result<Self> {
516 let mut file = OpenOptions::new()
517 .read(true)
518 .write(true)
519 .create(true)
520 .truncate(false)
521 .open(path)?;
522
523 #[cfg(unix)]
524 flock_exclusive(&file)?;
525
526 #[cfg(windows)]
527 lock_file_exclusive(&file)?;
528
529 let raw_size = file.metadata()?.len();
530
531 if raw_size == 0 {
532 init_header(&mut file)?;
533 durable_sync(&file)?;
534 } else if raw_size < HEADER_SIZE {
535 return Err(io::Error::new(
536 io::ErrorKind::InvalidData,
537 format!(
538 "bstack: file is {raw_size} bytes — too small to contain the 16-byte header"
539 ),
540 ));
541 } else {
542 let committed_len = read_header(&mut file)?;
543 let actual_data_len = raw_size - HEADER_SIZE;
544 if actual_data_len != committed_len {
545 // Recover: use whichever length is smaller (the committed
546 // value is the last successfully synced boundary).
547 let correct_len = committed_len.min(actual_data_len);
548 file.set_len(HEADER_SIZE + correct_len)?;
549 write_committed_len(&mut file, correct_len)?;
550 durable_sync(&file)?;
551 }
552 }
553
554 Ok(BStack {
555 lock: RwLock::new(file),
556 })
557 }
558
559 /// Append `data` to the end of the file.
560 ///
561 /// Returns the **logical** byte offset at which `data` begins — i.e. the
562 /// payload size immediately before the write. An empty slice is valid; it
563 /// writes nothing and returns the current end offset.
564 ///
565 /// # Atomicity
566 ///
567 /// Either the full payload is written, the header committed-length is
568 /// updated, and the whole thing is durably synced, or the file is
569 /// left unchanged (best-effort rollback via `ftruncate` + header reset).
570 ///
571 /// # Errors
572 ///
    /// Returns any [`io::Error`] from `seek`, `write_all`, the header update, or
    /// `durable_sync`. Errors raised by the rollback itself are ignored.
575 pub fn push(&self, data: &[u8]) -> io::Result<u64> {
576 let mut file = self.lock.write().unwrap();
577 let file_end = file.seek(SeekFrom::End(0))?;
578 let logical_offset = file_end - HEADER_SIZE;
579
580 if data.is_empty() {
581 return Ok(logical_offset);
582 }
583
584 if let Err(e) = file.write_all(data) {
585 let _ = file.set_len(file_end);
586 return Err(e);
587 }
588
589 let new_len = logical_offset + data.len() as u64;
590 if let Err(e) = write_committed_len(&mut file, new_len).and_then(|_| durable_sync(&file)) {
591 // Roll back: truncate data and reset header.
592 let _ = file.set_len(file_end);
593 let _ = write_committed_len(&mut file, logical_offset);
594 return Err(e);
595 }
596
597 Ok(logical_offset)
598 }
599
600 /// Append `n` zero bytes to the end of the file.
601 ///
602 /// Returns the **logical** byte offset at which the zeros begin — i.e. the
603 /// payload size immediately before the write. `n = 0` is valid; it writes
604 /// nothing and returns the current end offset.
605 ///
606 /// # Atomicity
607 ///
608 /// Either the file is extended, the header committed-length is updated,
609 /// and the whole thing is durably synced, or the file is left unchanged
610 /// (best-effort rollback via `ftruncate` + header reset).
611 ///
612 /// # Errors
613 ///
    /// Returns any [`io::Error`] from `seek`, `set_len`, the header update, or
    /// `durable_sync`. Errors raised by the rollback itself are ignored.
616 pub fn extend(&self, n: u64) -> io::Result<u64> {
617 let mut file = self.lock.write().unwrap();
618 let file_end = file.seek(SeekFrom::End(0))?;
619 let logical_offset = file_end - HEADER_SIZE;
620
621 if n == 0 {
622 return Ok(logical_offset);
623 }
624
625 let new_file_end = file_end + n;
626 file.set_len(new_file_end)?;
627
628 let new_len = logical_offset + n;
629 if let Err(e) = write_committed_len(&mut file, new_len).and_then(|_| durable_sync(&file)) {
630 // Roll back: truncate and reset header.
631 let _ = file.set_len(file_end);
632 let _ = write_committed_len(&mut file, logical_offset);
633 return Err(e);
634 }
635
636 Ok(logical_offset)
637 }
638
639 /// Remove and return the last `n` bytes of the file.
640 ///
641 /// `n = 0` is valid: no bytes are removed and an empty `Vec` is returned.
642 /// `n` may span across multiple previous [`push`](Self::push) boundaries.
643 ///
644 /// # Atomicity
645 ///
646 /// The bytes are read before the file is truncated. The committed-length
647 /// in the header is updated and durably synced after the truncation.
648 ///
649 /// # Errors
650 ///
651 /// Returns [`io::ErrorKind::InvalidInput`] if `n` exceeds the current
652 /// payload size. Also propagates any I/O error from `read_exact`,
653 /// `set_len`, `write_all`, or `durable_sync`.
654 pub fn pop(&self, n: u64) -> io::Result<Vec<u8>> {
655 let mut file = self.lock.write().unwrap();
656 let raw_size = file.seek(SeekFrom::End(0))?;
657 let data_size = raw_size - HEADER_SIZE;
658 if n > data_size {
659 return Err(io::Error::new(
660 io::ErrorKind::InvalidInput,
661 format!("pop({n}) exceeds payload size ({data_size})"),
662 ));
663 }
664 let new_data_len = data_size - n;
665 file.seek(SeekFrom::Start(HEADER_SIZE + new_data_len))?;
666 let mut buf = vec![0u8; n as usize];
667 file.read_exact(&mut buf)?;
668 file.set_len(HEADER_SIZE + new_data_len)?;
669 write_committed_len(&mut file, new_data_len)?;
670 durable_sync(&file)?;
671 Ok(buf)
672 }
673
674 /// Return a copy of every payload byte from `offset` to the end of the
675 /// file.
676 ///
677 /// `offset` is a **logical** offset (as returned by [`push`](Self::push)).
678 /// `offset == len()` is valid and returns an empty `Vec`. The file is not
679 /// modified.
680 ///
681 /// # Concurrency
682 ///
683 /// On Unix and Windows this uses a cursor-safe positional read (`pread(2)`
684 /// on Unix; `ReadFile`+`OVERLAPPED` on Windows), so the method takes only
685 /// the **read lock**, allowing multiple concurrent `peek` and `get` calls
686 /// to run in parallel.
687 ///
688 /// On other platforms a seek is required; the method falls back to the
689 /// write lock and concurrent reads serialise.
690 ///
691 /// # Errors
692 ///
693 /// Returns [`io::ErrorKind::InvalidInput`] if `offset` exceeds the current
694 /// payload size.
695 pub fn peek(&self, offset: u64) -> io::Result<Vec<u8>> {
696 #[cfg(any(unix, windows))]
697 {
698 let file = self.lock.read().unwrap();
699 let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE);
700 if offset > data_size {
701 return Err(io::Error::new(
702 io::ErrorKind::InvalidInput,
703 format!("peek offset ({offset}) exceeds payload size ({data_size})"),
704 ));
705 }
706 pread_exact(&file, HEADER_SIZE + offset, (data_size - offset) as usize)
707 }
708 #[cfg(not(any(unix, windows)))]
709 {
710 let mut file = self.lock.write().unwrap();
711 let raw_size = file.seek(SeekFrom::End(0))?;
712 let data_size = raw_size.saturating_sub(HEADER_SIZE);
713 if offset > data_size {
714 return Err(io::Error::new(
715 io::ErrorKind::InvalidInput,
716 format!("peek offset ({offset}) exceeds payload size ({data_size})"),
717 ));
718 }
719 file.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
720 let mut buf = vec![0u8; (data_size - offset) as usize];
721 file.read_exact(&mut buf)?;
722 Ok(buf)
723 }
724 }
725
726 /// Return a copy of the bytes in the half-open logical range `[start, end)`.
727 ///
728 /// `start == end` is valid and returns an empty `Vec`. The file is not
729 /// modified.
730 ///
731 /// # Concurrency
732 ///
733 /// Same as [`peek`](Self::peek): on Unix and Windows the read lock is
734 /// taken and concurrent `get`/`peek`/`len` calls may run in parallel. On
735 /// other platforms the write lock is taken and reads serialise.
736 ///
737 /// # Errors
738 ///
739 /// Returns [`io::ErrorKind::InvalidInput`] if `end < start` or if `end`
740 /// exceeds the current payload size.
741 pub fn get(&self, start: u64, end: u64) -> io::Result<Vec<u8>> {
742 if end < start {
743 return Err(io::Error::new(
744 io::ErrorKind::InvalidInput,
745 format!("get: end ({end}) < start ({start})"),
746 ));
747 }
748 #[cfg(any(unix, windows))]
749 {
750 let file = self.lock.read().unwrap();
751 let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE);
752 if end > data_size {
753 return Err(io::Error::new(
754 io::ErrorKind::InvalidInput,
755 format!("get: end ({end}) exceeds payload size ({data_size})"),
756 ));
757 }
758 pread_exact(&file, HEADER_SIZE + start, (end - start) as usize)
759 }
760 #[cfg(not(any(unix, windows)))]
761 {
762 let mut file = self.lock.write().unwrap();
763 let raw_size = file.seek(SeekFrom::End(0))?;
764 let data_size = raw_size.saturating_sub(HEADER_SIZE);
765 if end > data_size {
766 return Err(io::Error::new(
767 io::ErrorKind::InvalidInput,
768 format!("get: end ({end}) exceeds payload size ({data_size})"),
769 ));
770 }
771 file.seek(SeekFrom::Start(HEADER_SIZE + start))?;
772 let mut buf = vec![0u8; (end - start) as usize];
773 file.read_exact(&mut buf)?;
774 Ok(buf)
775 }
776 }
777
778 /// Fill `buf` with bytes from logical `offset` to `offset + buf.len()`.
779 ///
780 /// Reads exactly `buf.len()` bytes from `offset` into the caller-supplied
781 /// buffer. An empty buffer is a valid no-op. The file is not modified.
782 ///
783 /// Use this instead of [`peek`](Self::peek) when the destination buffer is
784 /// already allocated and you want to avoid the extra heap allocation.
785 ///
786 /// # Concurrency
787 ///
788 /// Same as [`peek`](Self::peek): on Unix and Windows only the read lock is
789 /// taken; on other platforms the write lock serialises all reads.
790 ///
791 /// # Errors
792 ///
793 /// Returns [`io::ErrorKind::InvalidInput`] if `offset + buf.len()` overflows
794 /// `u64` or exceeds the current payload size.
    pub fn peek_into(&self, offset: u64, buf: &mut [u8]) -> io::Result<()> {
        if buf.is_empty() {
            return Ok(());
        }
        let len = buf.len() as u64;
        let end = offset.checked_add(len).ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "peek_into: offset + len overflows u64",
            )
        })?;
        #[cfg(any(unix, windows))]
        {
            let file = self.lock.read().unwrap();
            let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE);
            if end > data_size {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    format!(
                        "peek_into: range [{offset}, {end}) exceeds payload size ({data_size})"
                    ),
                ));
            }
            pread_exact_into(&file, HEADER_SIZE + offset, buf)
        }
        #[cfg(not(any(unix, windows)))]
        {
            let mut file = self.lock.write().unwrap();
            let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE);
            if end > data_size {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    format!(
                        "peek_into: range [{offset}, {end}) exceeds payload size ({data_size})"
                    ),
                ));
            }
            file.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
            file.read_exact(buf)
        }
    }

    /// Fill `buf` with bytes from the half-open logical range
    /// `[start, start + buf.len())`.
    ///
    /// An empty buffer is a valid no-op. The file is not modified.
    ///
    /// Use this instead of [`get`](Self::get) when the destination buffer is
    /// already allocated and you want to avoid the extra heap allocation.
    ///
    /// # Concurrency
    ///
    /// Same as [`get`](Self::get): on Unix and Windows only the read lock is
    /// taken; on other platforms the write lock serialises all reads.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::InvalidInput`] if `start + buf.len()` overflows
    /// `u64` or exceeds the current payload size.
    pub fn get_into(&self, start: u64, buf: &mut [u8]) -> io::Result<()> {
        if buf.is_empty() {
            return Ok(());
        }
        let len = buf.len() as u64;
        let end = start.checked_add(len).ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "get_into: start + len overflows u64",
            )
        })?;
        #[cfg(any(unix, windows))]
        {
            let file = self.lock.read().unwrap();
            let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE);
            if end > data_size {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    format!("get_into: end ({end}) exceeds payload size ({data_size})"),
                ));
            }
            pread_exact_into(&file, HEADER_SIZE + start, buf)
        }
        #[cfg(not(any(unix, windows)))]
        {
            let mut file = self.lock.write().unwrap();
            let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE);
            if end > data_size {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    format!("get_into: end ({end}) exceeds payload size ({data_size})"),
                ));
            }
            file.seek(SeekFrom::Start(HEADER_SIZE + start))?;
            file.read_exact(buf)
        }
    }

    /// Remove the last `buf.len()` bytes from the file and write them into `buf`.
    ///
    /// An empty buffer is a valid no-op: no bytes are removed.
    ///
    /// Use this instead of [`pop`](Self::pop) when the destination buffer is
    /// already allocated and you want to avoid the extra heap allocation.
    ///
    /// # Atomicity
    ///
    /// Same guarantees as [`pop`](Self::pop).
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::InvalidInput`] if `buf.len()` exceeds the
    /// current payload size. Also propagates any I/O error from `read_exact`,
    /// `set_len`, `write_all`, or `durable_sync`.
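    /**
# Examples

A rough in-memory sketch of the copy-then-truncate order, assuming a `Vec<u8>`
stands in for the file. `pop_tail_into` is a hypothetical name, and the sketch
performs no header update and no durable sync.

```rust
use std::io;

/// Move the last `buf.len()` bytes of `payload` into `buf`, then shrink it.
/// Hypothetical stand-in: a `Vec<u8>` plays the role of the file.
fn pop_tail_into(payload: &mut Vec<u8>, buf: &mut [u8]) -> io::Result<()> {
    if buf.len() > payload.len() {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "requested more bytes than the payload holds",
        ));
    }
    let new_len = payload.len() - buf.len();
    // Copy the tail out first, then truncate, mirroring read-then-`set_len`.
    buf.copy_from_slice(&payload[new_len..]);
    payload.truncate(new_len);
    Ok(())
}
```

Popping 5 bytes from `b"hello world"` fills the buffer with `b"world"` and
leaves `b"hello "` behind; a request larger than the payload fails without
changing it.
    */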
    pub fn pop_into(&self, buf: &mut [u8]) -> io::Result<()> {
        if buf.is_empty() {
            return Ok(());
        }
        let n = buf.len() as u64;
        let mut file = self.lock.write().unwrap();
        let raw_size = file.seek(SeekFrom::End(0))?;
        // Saturate, as the read paths do, so a short file cannot underflow.
        let data_size = raw_size.saturating_sub(HEADER_SIZE);
        if n > data_size {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("pop_into({n}) exceeds payload size ({data_size})"),
            ));
        }
        let new_data_len = data_size - n;
        file.seek(SeekFrom::Start(HEADER_SIZE + new_data_len))?;
        file.read_exact(buf)?;
        file.set_len(HEADER_SIZE + new_data_len)?;
        write_committed_len(&mut file, new_data_len)?;
        durable_sync(&file)?;
        Ok(())
    }

    /// Remove (discard) the last `n` bytes from the file without returning them.
    ///
    /// Equivalent to [`pop`](Self::pop), except that the removed bytes are
    /// neither read nor returned, so no buffer is allocated. `n = 0` is a
    /// valid no-op.
    ///
    /// # Atomicity
    ///
    /// Same guarantees as [`pop`](Self::pop).
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::InvalidInput`] if `n` exceeds the current
    /// payload size. Also propagates any I/O error from `set_len`,
    /// `write_all`, or `durable_sync`.
    pub fn discard(&self, n: u64) -> io::Result<()> {
        if n == 0 {
            return Ok(());
        }
        let mut file = self.lock.write().unwrap();
        let raw_size = file.seek(SeekFrom::End(0))?;
        // Saturate, as the read paths do, so a short file cannot underflow.
        let data_size = raw_size.saturating_sub(HEADER_SIZE);
        if n > data_size {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("discard({n}) exceeds payload size ({data_size})"),
            ));
        }
        let new_data_len = data_size - n;
        file.set_len(HEADER_SIZE + new_data_len)?;
        write_committed_len(&mut file, new_data_len)?;
        durable_sync(&file)?;
        Ok(())
    }

    /// Overwrite the payload in place with `data`, starting at logical `offset`.
    ///
    /// The file size is never changed: if `offset + data.len()` would exceed
    /// the current payload size, the call is rejected. An empty slice is a
    /// valid no-op.
    ///
    /// # Feature flag
    ///
    /// Only available when the `set` Cargo feature is enabled.
    ///
    /// # Durability
    ///
    /// Equivalent to `push`/`pop`: the overwritten bytes are durably synced
    /// before the call returns.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::InvalidInput`] if `offset + data.len()`
    /// exceeds the current payload size, or if the addition overflows `u64`.
    /// Propagates any I/O error from `write_all` or `durable_sync`.
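    /**
# Examples

A small in-memory sketch of the "overwrite without resizing" rule, assuming a
byte slice stands in for the payload. `overwrite_in_place` is a hypothetical
helper; it does no file I/O and no sync.

```rust
use std::io;

/// Overwrite `payload[offset..offset + data.len()]` with `data` without
/// changing the payload length. Hypothetical in-memory analogue.
fn overwrite_in_place(payload: &mut [u8], offset: u64, data: &[u8]) -> io::Result<()> {
    let end = offset
        .checked_add(data.len() as u64)
        .filter(|&end| end <= payload.len() as u64)
        .ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidInput, "write range exceeds payload size")
        })?;
    // Both bounds fit in `usize` because `offset <= end <= payload.len()`.
    payload[offset as usize..end as usize].copy_from_slice(data);
    Ok(())
}
```

Writing `b"WORLD"` at offset 6 of `b"hello world"` gives `b"hello WORLD"`; the
same write at offset 7 would run past the end and is rejected.
    */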
    #[cfg(feature = "set")]
    pub fn set(&self, offset: u64, data: &[u8]) -> io::Result<()> {
        if data.is_empty() {
            return Ok(());
        }
        let end = offset.checked_add(data.len() as u64).ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "set: offset + len overflows u64",
            )
        })?;
        let mut file = self.lock.write().unwrap();
        let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE);
        if end > data_size {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("set: write end ({end}) exceeds payload size ({data_size})"),
            ));
        }
        file.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
        file.write_all(data)?;
        durable_sync(&file)
    }

    /// Overwrite `n` bytes with zeros in place starting at logical `offset`.
    ///
    /// The file size is never changed: if `offset + n` would exceed
    /// the current payload size the call is rejected. `n = 0` is a
    /// valid no-op.
    ///
    /// # Feature flag
    ///
    /// Only available when the `set` Cargo feature is enabled.
    ///
    /// # Durability
    ///
    /// Equivalent to `push`/`pop`: the overwritten bytes are durably synced
    /// before the call returns.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::InvalidInput`] if `offset + n`
    /// exceeds the current payload size, or if the addition overflows `u64`.
    /// Propagates any I/O error from `write_all` or `durable_sync`.
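    /**
# Examples

As an illustration only: a zero fill can also be written in bounded chunks so
that memory use stays constant for large `n`. `write_zeros` and its 8 KiB
`CHUNK` are assumptions of this sketch, not part of the crate, and the sketch
leaves out the bounds check and the durable sync.

```rust
use std::io::{self, Write};

/// Write `n` zero bytes to `out` using a fixed-size scratch buffer.
/// Hypothetical helper, not part of this crate.
fn write_zeros<W: Write>(out: &mut W, n: u64) -> io::Result<()> {
    const CHUNK: usize = 8 * 1024;
    let zeros = [0u8; CHUNK];
    let mut remaining = n;
    while remaining > 0 {
        // The step never exceeds `CHUNK`, so the cast cannot truncate.
        let step = remaining.min(CHUNK as u64) as usize;
        out.write_all(&zeros[..step])?;
        remaining -= step as u64;
    }
    Ok(())
}
```

Any `Write` target works here, for example a `std::io::Cursor` positioned at
the first byte to clear.
    */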
    #[cfg(feature = "set")]
    pub fn zero(&self, offset: u64, n: u64) -> io::Result<()> {
        if n == 0 {
            return Ok(());
        }
        let end = offset.checked_add(n).ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "zero: offset + n overflows u64",
            )
        })?;
        let mut file = self.lock.write().unwrap();
        let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE);
        if end > data_size {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("zero: write end ({end}) exceeds payload size ({data_size})"),
            ));
        }
        file.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
        let zeros = vec![0u8; n as usize];
        file.write_all(&zeros)?;
        durable_sync(&file)
    }

    /// Return the current **logical** payload size in bytes (excludes the
    /// 16-byte header).
    ///
    /// Takes the read lock, so it can run concurrently with other `len` calls
    /// but blocks while any write-lock operation is in progress. The returned
    /// value always reflects a clean operation boundary.
    ///
    /// # Errors
    ///
    /// Propagates any [`io::Error`] from [`File::metadata`].
    pub fn len(&self) -> io::Result<u64> {
        let file = self.lock.read().unwrap();
        Ok(file.metadata()?.len().saturating_sub(HEADER_SIZE))
    }

    /// Return `true` if the stack contains no payload bytes.
    ///
    /// # Errors
    ///
    /// Propagates any [`io::Error`] from [`File::metadata`].
    pub fn is_empty(&self) -> io::Result<bool> {
        Ok(self.len()? == 0)
    }
}

// ---------------------------------------------------------------------------
// io::Write

/// Appends bytes to the stack.
///
/// Each call to [`write`](io::Write::write) is equivalent to [`push`](BStack::push):
/// all bytes are written atomically and durably synced before returning.
/// Calling `write_all` or chaining multiple `write` calls therefore issues
/// one `durable_sync` per `write` call. Callers that need to batch many small
/// writes without per-write syncs should accumulate data and call `push`
/// directly.
///
/// [`flush`](io::Write::flush) is a no-op because every `write` is already
/// durable.
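/**
# Examples

A standalone sketch of the batching advice above. `CountingSink` is a
hypothetical stand-in that counts `write` calls, each of which represents one
durable sync; `write_each` and `write_batched` are likewise illustrative names.

```rust
use std::io::{self, Write};

/// Hypothetical sink that treats every `write` call as one durable operation.
#[derive(Default)]
struct CountingSink {
    bytes: Vec<u8>,
    syncs: usize,
}

impl Write for CountingSink {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.bytes.extend_from_slice(buf);
        self.syncs += 1; // stands in for one `durable_sync`
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Write each record separately: one sync per record.
fn write_each(sink: &mut CountingSink, records: &[&[u8]]) -> io::Result<()> {
    for record in records {
        sink.write_all(record)?;
    }
    Ok(())
}

/// Concatenate the records first, then hand them over in a single call.
fn write_batched(sink: &mut CountingSink, records: &[&[u8]]) -> io::Result<()> {
    let batch: Vec<u8> = records.concat();
    sink.write_all(&batch)
}
```

For three non-empty records, `write_each` ends with `syncs == 3` and
`write_batched` with `syncs == 1`, while both sinks hold the same bytes.
*/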
impl io::Write for BStack {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.push(buf)?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Shared-reference counterpart of `impl Write for BStack`.
///
/// Because [`push`](BStack::push) takes `&self` (interior mutability via
/// `RwLock`), the `Write` implementation is also available on `&BStack`,
/// mirroring the standard library's `impl Write for &File`.
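/**
# Examples

A minimal sketch of the same pattern on a hypothetical `SharedLog` type: since
appending only needs `&self`, `Write` can be implemented for the shared
reference. All names here are illustrative and not part of this crate.

```rust
use std::io::{self, Write};
use std::sync::RwLock;

/// Hypothetical append-only log with interior mutability via `RwLock`.
struct SharedLog {
    inner: RwLock<Vec<u8>>,
}

impl SharedLog {
    fn new() -> Self {
        SharedLog { inner: RwLock::new(Vec::new()) }
    }

    /// Append through a shared reference.
    fn push(&self, data: &[u8]) {
        self.inner.write().unwrap().extend_from_slice(data);
    }

    /// Copy out the bytes appended so far.
    fn snapshot(&self) -> Vec<u8> {
        self.inner.read().unwrap().clone()
    }
}

/// `Write` on `&SharedLog` lets callers write without a `&mut SharedLog`.
impl<'a> Write for &'a SharedLog {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.push(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Write one line through a shared handle.
fn log_line(mut log: &SharedLog, line: &str) -> io::Result<()> {
    writeln!(log, "{line}")
}
```

Several callers can hold `&SharedLog` at once and each use `log_line`; only
the local binding of the reference has to be `mut`.
*/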
impl io::Write for &BStack {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.push(buf)?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl fmt::Debug for BStack {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("BStack")
            .field(
                "version",
                &format!("{}.{}.{}", MAGIC[4], MAGIC[5], MAGIC[6]),
            )
            .field("len", &self.len().ok())
            .finish_non_exhaustive()
    }
}

impl Eq for BStack {}

/// Two `BStack` instances are equal iff they are the **same instance** in memory.
///
/// Because [`BStack::open`] acquires an exclusive advisory lock, no two
/// `BStack` values within one process can refer to the same file at the same
/// time. Pointer identity is therefore the only meaningful equality: a stack
/// is equal to itself and to nothing else.
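/**
# Examples

A standalone sketch of pointer-identity equality with a matching address
hash, using a hypothetical `Handle` type in place of the stack. The helper
`address_hash` is also illustrative.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical handle compared by address rather than by contents.
struct Handle {
    name: String,
}

impl PartialEq for Handle {
    fn eq(&self, other: &Self) -> bool {
        std::ptr::eq(self, other)
    }
}

impl Eq for Handle {}

impl Hash for Handle {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Hash the address so that equal handles hash equally.
        (self as *const Handle).hash(state);
    }
}

/// Hash a handle with the standard library's default hasher.
fn address_hash(handle: &Handle) -> u64 {
    let mut hasher = DefaultHasher::new();
    handle.hash(&mut hasher);
    hasher.finish()
}
```

Two live handles with identical `name` fields still compare unequal, while
every handle equals itself and hashes consistently with that equality.
*/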
impl PartialEq for BStack {
    fn eq(&self, other: &Self) -> bool {
        std::ptr::eq(self, other)
    }
}

/// Hashes the instance address, consistent with the pointer-identity [`PartialEq`].
impl Hash for BStack {
    fn hash<H: Hasher>(&self, state: &mut H) {
        (self as *const BStack).hash(state);
    }
}

/// A cursor-based reader over a [`BStack`] payload.
///
/// `BStackReader` implements [`io::Read`] and [`io::Seek`], allowing the
/// stack's payload to be consumed through any interface that expects a
/// readable, seekable byte stream.
///
/// # Construction
///
/// ```no_run
/// use bstack::BStack;
///
/// # fn main() -> std::io::Result<()> {
/// let stack = BStack::open("log.bin")?;
/// stack.push(b"hello world")?;
///
/// // Start reading from the beginning.
/// let mut reader = stack.reader();
///
/// // Or start from an arbitrary offset.
/// let mut mid = stack.reader_at(6);
/// # Ok(())
/// # }
/// ```
///
/// # Concurrency
///
/// `BStackReader` borrows the stack immutably, so multiple readers can coexist
/// and run concurrently with each other and with [`peek`](BStack::peek) /
/// [`get`](BStack::get) calls. Concurrent [`push`](BStack::push) or
/// [`pop`](BStack::pop) operations are not blocked by an active reader, but
/// reads interleaved with writes may observe different snapshots of the
/// payload across calls; callers are responsible for synchronisation when
/// that matters.
pub struct BStackReader<'a> {
    stack: &'a BStack,
    offset: u64,
}

impl BStack {
    /// Create a [`BStackReader`] positioned at the start of the payload.
    pub fn reader(&self) -> BStackReader<'_> {
        BStackReader {
            stack: self,
            offset: 0,
        }
    }

    /// Create a [`BStackReader`] positioned at `offset` bytes into the payload.
    ///
    /// An `offset` past the current end is allowed; [`read`](io::Read::read)
    /// will return `Ok(0)` until new data is pushed past that point.
    pub fn reader_at(&self, offset: u64) -> BStackReader<'_> {
        BStackReader {
            stack: self,
            offset,
        }
    }
}

impl<'a> BStackReader<'a> {
    /// Return the current logical read offset within the payload.
    pub fn position(&self) -> u64 {
        self.offset
    }
}

impl<'a> From<&'a BStack> for BStackReader<'a> {
    fn from(stack: &'a BStack) -> Self {
        stack.reader()
    }
}

impl<'a> From<BStackReader<'a>> for &'a BStack {
    fn from(val: BStackReader<'a>) -> Self {
        val.stack
    }
}

/// Two readers are equal when they point to the **same `BStack` instance**
/// (pointer identity) and share the same cursor `offset`.
impl<'a> PartialEq for BStackReader<'a> {
    fn eq(&self, other: &Self) -> bool {
        self.stack == other.stack && self.offset == other.offset
    }
}

impl<'a> Eq for BStackReader<'a> {}

/// Hashes `(BStack pointer, offset)`, consistent with [`PartialEq`].
impl<'a> Hash for BStackReader<'a> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.stack.hash(state);
        self.offset.hash(state);
    }
}

impl<'a> PartialOrd for BStackReader<'a> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

/// Ordered by `BStack` instance address, then by cursor `offset`.
///
/// The address component groups all readers over the same stack together,
/// and within that group the natural read order (smaller offset first) applies.
/// This ordering is consistent with the pointer-identity [`PartialEq`].
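/**
# Examples

A standalone sketch of the "address first, then offset" ordering. `Pos` is a
hypothetical cursor over a borrowed `Vec<u8>` that stands in for the stack
reference, and `compare` is an illustrative free function.

```rust
use std::cmp::Ordering;

/// Hypothetical cursor; `source` plays the role of the stack reference.
struct Pos<'a> {
    source: &'a Vec<u8>,
    offset: u64,
}

/// Order by the address of `source` first, then by `offset`.
fn compare<'a>(a: &Pos<'a>, b: &Pos<'a>) -> Ordering {
    let a_ptr = a.source as *const Vec<u8> as usize;
    let b_ptr = b.source as *const Vec<u8> as usize;
    a_ptr.cmp(&b_ptr).then(a.offset.cmp(&b.offset))
}
```

Sorting cursors over one source with `compare` arranges them by ascending
offset. The relative order of cursors over different sources depends on where
those sources live in memory, so it is stable only while they stay in place.
*/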
impl<'a> Ord for BStackReader<'a> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let self_ptr = self.stack as *const BStack as usize;
        let other_ptr = other.stack as *const BStack as usize;
        self_ptr
            .cmp(&other_ptr)
            .then(self.offset.cmp(&other.offset))
    }
}

impl<'a> io::Read for BStackReader<'a> {
    /// Read bytes from the current position into `buf`.
    ///
    /// Returns the number of bytes read, which may be less than `buf.len()` if
    /// the end of the payload is reached. Returns `Ok(0)` at EOF.
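    /**
# Examples

A short in-memory sketch of the clamped (short) read described above, assuming
a byte slice stands in for the payload. `read_clamped` is a hypothetical
helper, not part of this crate.

```rust
/// Copy as many bytes as are available from `payload[offset..]` into `buf`
/// and return the count; `0` signals the end of the payload.
fn read_clamped(payload: &[u8], offset: &mut u64, buf: &mut [u8]) -> usize {
    let size = payload.len() as u64;
    if buf.is_empty() || *offset >= size {
        return 0;
    }
    // `*offset < size`, and `size` came from a `usize`, so the cast is lossless.
    let start = *offset as usize;
    let n = buf.len().min(payload.len() - start);
    buf[..n].copy_from_slice(&payload[start..start + n]);
    *offset += n as u64;
    n
}
```

Reading into an 8-byte buffer from offset 6 of `b"hello world"` returns 5 and
advances the offset to 11; the next call returns 0.
    */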
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        let data_size = self.stack.len()?;
        if self.offset >= data_size {
            return Ok(0);
        }
        // Clamp in `u64` first so the cast cannot truncate on 32-bit targets.
        let available = data_size - self.offset;
        let n = (buf.len() as u64).min(available) as usize;
        self.stack.get_into(self.offset, &mut buf[..n])?;
        self.offset += n as u64;
        Ok(n)
    }
}

impl<'a> io::Seek for BStackReader<'a> {
    /// Move the read cursor.
    ///
    /// Any [`SeekFrom`] variant may place the cursor past the current end of
    /// the payload; subsequent [`read`](io::Read::read) calls will return
    /// `Ok(0)` until the payload grows past that point. Seeking before the
    /// start of the payload returns [`io::ErrorKind::InvalidInput`].
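    /**
# Examples

A standalone sketch of the seek arithmetic: widening to `i128` keeps every
`u64 + i64` sum exact, so range checks can be made before narrowing back.
`resolve_seek` is a hypothetical free function, and rejecting targets above
`u64::MAX` is a choice made by this sketch.

```rust
use std::io::{self, SeekFrom};

/// Resolve `pos` against the current `offset` and the payload `size`.
/// Hypothetical helper, not part of this crate.
fn resolve_seek(offset: u64, size: u64, pos: SeekFrom) -> io::Result<u64> {
    let target = match pos {
        SeekFrom::Start(n) => n as i128,
        SeekFrom::End(n) => size as i128 + n as i128,
        SeekFrom::Current(n) => offset as i128 + n as i128,
    };
    // Reject targets that are negative or do not fit in `u64`.
    if target < 0 || target > u64::MAX as i128 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "seek target out of range",
        ));
    }
    Ok(target as u64)
}
```

With `offset = 3` and `size = 10`, `SeekFrom::End(-4)` resolves to 6 and
`SeekFrom::Current(12)` to 15 (past the end, which is allowed), while
`SeekFrom::Current(-4)` is rejected.
    */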
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        let data_size = self.stack.len()? as i128;
        let new_offset = match pos {
            SeekFrom::Start(n) => n as i128,
            SeekFrom::End(n) => data_size + n as i128,
            SeekFrom::Current(n) => self.offset as i128 + n as i128,
        };
        if new_offset < 0 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "seek before beginning of payload",
            ));
        }
        // Reject targets above `u64::MAX` instead of letting the cast wrap.
        if new_offset > u64::MAX as i128 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "seek position overflows u64",
            ));
        }
        self.offset = new_offset as u64;
        Ok(self.offset)
    }
}