
// bstack/lib.rs

//! A persistent, fsync-durable binary stack backed by a single file.
//!
//! # Overview
//!
//! [`BStack`] treats a file as a flat byte buffer that grows and shrinks from
//! the tail.  Every mutating operation — [`push`](BStack::push),
//! [`extend`](BStack::extend), [`pop`](BStack::pop), [`discard`](BStack::discard), and (with the `set`
//! feature) [`set`](BStack::set) and [`zero`](BStack::zero) — calls a *durable sync* before returning,
//! so the data survives a process crash or an unclean system shutdown.
//! Read-only operations — [`peek`](BStack::peek),
//! [`peek_into`](BStack::peek_into), [`get`](BStack::get), and
//! [`get_into`](BStack::get_into) — never modify the file and on Unix and
//! Windows can run concurrently with each other.
//! [`pop_into`](BStack::pop_into) is the buffer-passing counterpart of `pop`,
//! carrying the same durability and atomicity guarantees.
//! [`discard`](BStack::discard) is like `pop` but discards the removed bytes
//! without reading or returning them, avoiding any allocation or copy.
//!
//! The crate depends on **`libc`** (Unix) and **`windows-sys`** (Windows) for
//! platform-specific syscalls, and uses **no `unsafe` code beyond the required
//! FFI calls**.
//!
//! # File format
//!
//! Every file begins with a fixed 16-byte header:
//!
//! ```text
//! ┌────────────────────────┬──────────────┬──────────────┐
//! │      header (16 B)     │  payload 0   │  payload 1   │  ...
//! │  magic[8] | clen[8 LE] │              │              │
//! └────────────────────────┴──────────────┴──────────────┘
//! ^                        ^              ^              ^
//! file offset 0         offset 16      16+n0          EOF
//! ```
//!
//! * **`magic`** — 8 bytes: `BSTK` + major(1 B) + minor(1 B) + patch(1 B) + reserved(1 B).
//!   This version writes `BSTK\x00\x01\x05\x00` (0.1.5).  [`open`](BStack::open)
//!   accepts any file whose first 6 bytes match `BSTK\x00\x01` (any 0.1.x) and
//!   rejects anything with a different major or minor.
//! * **`clen`** — little-endian `u64` recording the *committed* payload length.
//!   It is updated atomically with each [`push`](BStack::push) or
//!   [`pop`](BStack::pop) and is used for crash recovery on the next
//!   [`open`](BStack::open).
//!
//! All user-visible offsets are **logical** (0-based from the start of the
//! payload region, i.e. from file byte 16).
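//!
//! As a sketch (illustrative only, not the crate's internal parser), the
//! header layout above can be decoded with nothing but `u64::from_le_bytes`:
//!
//! ```rust
//! /// Decode (major, minor, patch, committed_len) from a raw 16-byte header,
//! /// or `None` if the `BSTK` magic is missing.
//! fn parse_header(hdr: &[u8; 16]) -> Option<(u8, u8, u8, u64)> {
//!     if &hdr[0..4] != b"BSTK" {
//!         return None; // not a bstack file
//!     }
//!     // bytes 4..7 = major, minor, patch; bytes 8..16 = committed length (LE).
//!     let clen = u64::from_le_bytes(hdr[8..16].try_into().unwrap());
//!     Some((hdr[4], hdr[5], hdr[6], clen))
//! }
//!
//! // A freshly initialised 0.1.5 file: magic followed by clen = 0.
//! let mut hdr = [0u8; 16];
//! hdr[..8].copy_from_slice(b"BSTK\x00\x01\x05\x00");
//! assert_eq!(parse_header(&hdr), Some((0, 1, 5, 0)));
//! ```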
//!
//! # Crash recovery
//!
//! On [`open`](BStack::open), the header's committed length is compared against
//! the actual file size:
//!
//! | Condition | Cause | Recovery |
//! |-----------|-------|----------|
//! | `file_size − 16 > clen` | partial tail write (push crashed before header update) | truncate to `16 + clen` |
//! | `file_size − 16 < clen` | partial truncation (pop crashed before header update) | set `clen = file_size − 16` |
//!
//! After recovery a `durable_sync` ensures the repaired state is on stable
//! storage before any caller can observe or modify the file.
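//!
//! Both rows of the table reduce to "keep the smaller of the two lengths".
//! A minimal sketch of that rule (illustrative; `open` performs the
//! equivalent repair internally):
//!
//! ```rust
//! /// Payload length the file is repaired to, given the header's committed
//! /// length and the file's actual on-disk size.
//! fn recovered_len(clen: u64, file_size: u64) -> u64 {
//!     // Partial tail write → fall back to clen;
//!     // partial truncation → lower clen to what is actually on disk.
//!     clen.min(file_size.saturating_sub(16))
//! }
//!
//! assert_eq!(recovered_len(5, 16 + 9), 5); // push crashed mid-append
//! assert_eq!(recovered_len(9, 16 + 5), 5); // pop crashed mid-truncate
//! ```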
//!
//! # Durability
//!
//! | Operation | Syscall sequence |
//! |-----------|-----------------|
//! | `push` | `lseek(END)` → `write(data)` → `lseek(8)` → `write(clen)` → `durable_sync` |
//! | `extend` | `lseek(END)` → `set_len(new_end)` → `lseek(8)` → `write(clen)` → `durable_sync` |
//! | `pop`, `pop_into` | `lseek` → `read` → `ftruncate` → `lseek(8)` → `write(clen)` → `durable_sync` |
//! | `discard` | `ftruncate` → `lseek(8)` → `write(clen)` → `durable_sync` |
//! | `set` *(feature)* | `lseek(offset)` → `write(data)` → `durable_sync` |
//! | `zero` *(feature)* | `lseek(offset)` → `write(zeros)` → `durable_sync` |
//! | `peek`, `peek_into`, `get`, `get_into` | `pread(2)` on Unix; `ReadFile`+`OVERLAPPED` on Windows; `lseek` → `read` elsewhere (no sync — read-only) |
//!
//! **`durable_sync` on macOS** issues `fcntl(F_FULLFSYNC)`, which flushes the
//! drive's hardware write cache.  Plain `fdatasync` is not sufficient on macOS
//! because the kernel may acknowledge it before the drive controller has
//! committed the data.  If `F_FULLFSYNC` is not supported by the device the
//! implementation falls back to `sync_data` (`fdatasync`).
//!
//! **`durable_sync` on other Unix** calls `sync_data` (`fdatasync`), which is
//! sufficient on Linux and BSD.
//!
//! **`durable_sync` on Windows** calls `sync_data`, which maps to
//! `FlushFileBuffers`.  This flushes the kernel write-back cache and waits for
//! the drive to acknowledge, providing equivalent durability to `fdatasync`.
//!
//! # Multi-process safety
//!
//! On Unix, [`open`](BStack::open) acquires an **exclusive advisory `flock`**
//! on the file (`LOCK_EX | LOCK_NB`).  If another process already holds the
//! lock, `open` returns immediately with [`io::ErrorKind::WouldBlock`] rather
//! than blocking indefinitely.  The lock is released automatically when the
//! [`BStack`] is dropped (the underlying file descriptor is closed).
//!
//! On Windows, [`open`](BStack::open) acquires an **exclusive `LockFileEx`**
//! lock (`LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY`) covering the
//! entire file range.  If another process already holds the lock, `open`
//! returns immediately with [`io::ErrorKind::WouldBlock`]
//! (`ERROR_LOCK_VIOLATION`).  The lock is released when the [`BStack`] is
//! dropped (the underlying file handle is closed).
//!
//! > **Note:** Both `flock` (Unix) and `LockFileEx` (Windows) are advisory
//! > and per-process.  They prevent well-behaved concurrent opens across
//! > processes but do not protect against processes that bypass the lock or
//! > against raw writes to the file.
//!
//! # Correct usage
//!
//! bstack files must only be opened through this crate or a compatible
//! implementation that understands the file format, the header protocol, and
//! the locking semantics.  Reading or writing the underlying file with raw
//! tools or syscalls while a [`BStack`] instance is live — or manually editing
//! the header fields — can silently corrupt the committed-length sentinel or
//! bypass the advisory lock.
//!
//! **The authors make no guarantees about the behaviour of this crate —
//! including freedom from data loss or logical corruption — when the file has
//! been accessed outside of this crate's controlled interface.**
//!
//! # Thread safety
//!
//! `BStack` wraps the file in a [`std::sync::RwLock`].
//!
//! | Operation | Lock (Unix / Windows) | Lock (other) |
//! |-----------|-----------------------|--------------|
//! | `push`, `extend`, `pop`, `pop_into`, `discard` | write | write |
//! | `set`, `zero` *(feature)* | write | write |
//! | `peek`, `peek_into`, `get`, `get_into` | **read** | write |
//! | `len` | read | read |
//!
//! On Unix and Windows, `peek`, `peek_into`, `get`, and `get_into` use a
//! cursor-safe positional read (`pread(2)` on Unix; `ReadFile` with
//! `OVERLAPPED` on Windows) that does not modify the file-position cursor.
//! This allows multiple concurrent calls to any of these methods to run in
//! parallel while any ongoing `push`, `pop`, or `pop_into` still serialises
//! all writers via the write lock.
//!
//! On other platforms a seek is required, so `peek`, `peek_into`, `get`, and
//! `get_into` fall back to the write lock and all reads serialise.
//!
//! # Standard I/O adapters
//!
//! ## Writing
//!
//! `BStack` implements [`std::io::Write`] (and so does `&BStack`, mirroring
//! the standard library's `impl Write for &File`).  Each call to `write` is forwarded to
//! [`push`](BStack::push), so every write is atomically appended and durably
//! synced before returning.  `flush` is a no-op.
//!
//! ```no_run
//! use std::io::Write;
//! use bstack::BStack;
//!
//! # fn main() -> std::io::Result<()> {
//! let mut stack = BStack::open("log.bin")?;
//! stack.write_all(b"hello")?;
//! stack.write_all(b"world")?;
//! # Ok(())
//! # }
//! ```
//!
//! ## Reading
//!
//! [`BStackReader`] wraps a `&BStack` with a cursor and implements
//! [`std::io::Read`] and [`std::io::Seek`].  Use [`BStack::reader`] or
//! [`BStack::reader_at`] to construct one.
//!
//! ```no_run
//! use std::io::{Read, Seek, SeekFrom};
//! use bstack::BStack;
//!
//! # fn main() -> std::io::Result<()> {
//! let stack = BStack::open("log.bin")?;
//! stack.push(b"hello world")?;
//!
//! let mut reader = stack.reader();
//! let mut buf = [0u8; 5];
//! reader.read_exact(&mut buf)?;  // b"hello"
//! reader.seek(SeekFrom::Start(6))?;
//! reader.read_exact(&mut buf)?;  // b"world"
//! # Ok(())
//! # }
//! ```
//!
//! # Trait implementations
//!
//! ## `BStack`
//!
//! | Trait | Semantics |
//! |-------|-----------|
//! | `Debug` | Shows `version` (semver string from the magic header, e.g. `"0.1.5"`) and `len` (`Option<u64>`, `None` on I/O failure). |
//! | `PartialEq` / `Eq` | **Pointer identity.** Two values are equal iff they are the same instance. No two distinct `BStack` values in one process can refer to the same file. |
//! | `Hash` | Hashes the instance address — consistent with pointer-identity `PartialEq`. |
//!
//! ## `BStackReader`
//!
//! | Trait | Semantics |
//! |-------|-----------|
//! | `PartialEq` / `Eq` | Equal when both the `BStack` pointer (identity) and the cursor `offset` match. |
//! | `Hash` | Hashes `(BStack pointer, offset)` — consistent with `PartialEq`. |
//! | `PartialOrd` / `Ord` | Ordered by `BStack` instance address, then by cursor `offset`. Groups all readers over the same stack and within that group orders by position. |
//!
//! # Feature flags
//!
//! | Feature | Description |
//! |---------|-------------|
//! | `set`   | Enables [`BStack::set`] and [`BStack::zero`] — in-place overwrite of existing payload bytes (or with zeros) without changing the file size. |
//! | `alloc` | Enables [`BStackAllocator`], [`BStackSlice`], [`BStackSliceReader`], and [`LinearBStackAllocator`] — region-based allocation over a `BStack` payload. |
//!
//! Enable with:
//!
//! ```toml
//! [dependencies]
//! bstack = { version = "0.1", features = ["set"] }
//! # or
//! bstack = { version = "0.1", features = ["alloc"] }
//! # or both
//! bstack = { version = "0.1", features = ["alloc", "set"] }
//! ```
//!
//! # Allocator (`alloc` feature)
//!
//! The `alloc` feature adds a region-management layer on top of [`BStack`].
//!
//! ## Key types
//!
//! * [`BStackAllocator`] — trait for types that own a [`BStack`] and manage
//!   contiguous byte regions within its payload.  Requires `stack()`,
//!   `into_stack()`, `alloc()`, and `realloc()`; provides a default no-op
//!   `dealloc()` and delegation helpers `len()` / `is_empty()`.
//!
//! * [`BStackSlice`]`<'a, A>` — lightweight `Copy` handle (allocator reference +
//!   offset + length) to a contiguous region.  Exposes `read`, `read_into`,
//!   `read_range_into`, `subslice`, `subslice_range`, `reader`, `reader_at`,
//!   and (with the `set` feature) `write`, `write_range`, `zero`, `zero_range`.
//!
//! * [`BStackSliceReader`]`<'a, A>` — cursor-based reader over a
//!   [`BStackSlice`], implementing [`io::Read`] and [`io::Seek`] in the
//!   slice's coordinate space.
//!
//! * [`LinearBStackAllocator`] — reference bump allocator that appends regions
//!   sequentially.  `realloc` is O(1) for the tail allocation and returns
//!   `Unsupported` for non-tail slices.  `dealloc` reclaims the tail via
//!   [`BStack::discard`]; non-tail deallocations are a no-op.  Every operation
//!   maps to exactly one [`BStack`] call and is crash-safe by inheritance.
//!
//! ## Lifetime model
//!
//! `BStackSlice<'a, A>` borrows the **allocator** for `'a`, not the
//! [`BStack`] directly.  As a result the borrow checker statically prevents
//! calling [`BStackAllocator::into_stack`] — which consumes the allocator by
//! value — while any slice is still in scope.
//!
//! ## Quick example
//!
//! ```ignore
//! use bstack::{BStack, BStackAllocator, LinearBStackAllocator};
//!
//! # fn main() -> std::io::Result<()> {
//! let alloc = LinearBStackAllocator::new(BStack::open("data.bstack")?);
//!
//! let slice = alloc.alloc(128)?;          // reserve 128 zero bytes
//! let data  = slice.read()?;              // read them back
//! alloc.dealloc(slice)?;                  // release (tail, so O(1))
//!
//! let stack = alloc.into_stack();         // reclaim the BStack
//! # Ok(())
//! # }
//! ```
//!
//! # Examples
//!
//! ```no_run
//! use bstack::BStack;
//!
//! # fn main() -> std::io::Result<()> {
//! let stack = BStack::open("log.bin")?;
//!
//! // push returns the logical byte offset where the payload starts.
//! let off0 = stack.push(b"hello")?;  // 0
//! let off1 = stack.push(b"world")?;  // 5
//!
//! assert_eq!(stack.len()?, 10);
//!
//! // peek reads from a logical offset to the end without removing anything.
//! assert_eq!(stack.peek(off1)?, b"world");
//!
//! // get reads an arbitrary half-open logical byte range.
//! assert_eq!(stack.get(3, 8)?, b"lowor");
//!
//! // pop removes bytes from the tail and returns them.
//! assert_eq!(stack.pop(5)?, b"world");
//! assert_eq!(stack.len()?, 5);
//! # Ok(())
//! # }
//! ```

mod test;

#[cfg(feature = "alloc")]
mod alloc;
#[cfg(feature = "alloc")]
pub use alloc::{BStackAllocator, BStackSlice, BStackSliceReader, LinearBStackAllocator};
#[cfg(all(feature = "alloc", feature = "set"))]
pub use alloc::{BStackSliceWriter, FirstFitBStackAllocator};

use std::fmt;
use std::fs::{File, OpenOptions};
use std::hash::{Hash, Hasher};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::Path;
use std::sync::RwLock;

#[cfg(unix)]
use std::os::unix::fs::FileExt;
#[cfg(unix)]
use std::os::unix::io::AsRawFd;

#[cfg(windows)]
use std::os::windows::fs::FileExt as WindowsFileExt;
#[cfg(windows)]
use std::os::windows::io::AsRawHandle;
#[cfg(windows)]
use windows_sys::Win32::Storage::FileSystem::{
    LOCKFILE_EXCLUSIVE_LOCK, LOCKFILE_FAIL_IMMEDIATELY, LockFileEx,
};
#[cfg(windows)]
use windows_sys::Win32::System::IO::OVERLAPPED;

/// Full magic for files written by this version (`BSTK` + major 0 + minor 1 + patch 5 + 0).
const MAGIC: [u8; 8] = *b"BSTK\x00\x01\x05\x00";

/// Compatibility prefix checked on open: `BSTK` + major 0 + minor 1.
/// Any file whose first 6 bytes match is considered a compatible 0.1.x file.
const MAGIC_PREFIX: [u8; 6] = *b"BSTK\x00\x01";

/// Bytes occupied by the file header (magic[8] + committed_len[8]).
const HEADER_SIZE: u64 = 16;

/// Flush all in-flight writes to stable storage.
///
/// On macOS this uses `F_FULLFSYNC` to flush the drive's hardware write cache,
/// which `fdatasync` alone does not guarantee.  Falls back to `sync_data` if
/// `F_FULLFSYNC` returns an error (e.g. the device doesn't support it).
/// On all other platforms this delegates to `sync_data` (`fdatasync`).
fn durable_sync(file: &File) -> io::Result<()> {
    #[cfg(target_os = "macos")]
    {
        let ret = unsafe { libc::fcntl(file.as_raw_fd(), libc::F_FULLFSYNC) };
        if ret != -1 {
            return Ok(());
        }
        // Device does not support F_FULLFSYNC; fall back to fdatasync.
    }
    file.sync_data()
}

/// Acquire an exclusive, non-blocking advisory flock on `file`.
///
/// Returns `Err(WouldBlock)` if another process already holds the lock.
#[cfg(unix)]
fn flock_exclusive(file: &File) -> io::Result<()> {
    let ret = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) };
    if ret == 0 {
        Ok(())
    } else {
        Err(io::Error::last_os_error())
    }
}

/// Acquire an exclusive, non-blocking `LockFileEx` lock on `file`.
///
/// Locks the entire file range (offset 0, length `u64::MAX`).
/// Returns `Err(WouldBlock)` if another process already holds the lock
/// (`ERROR_LOCK_VIOLATION` maps to `io::ErrorKind::WouldBlock` in Rust).
#[cfg(windows)]
fn lock_file_exclusive(file: &File) -> io::Result<()> {
    let handle = file.as_raw_handle() as windows_sys::Win32::Foundation::HANDLE;
    // OVERLAPPED is required by LockFileEx even for synchronous handles.
    // Offset fields (0, 0) anchor the lock at byte 0 of the file.
    let mut overlapped: OVERLAPPED = unsafe { std::mem::zeroed() };
    let ret = unsafe {
        LockFileEx(
            handle,
            LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY,
            0,        // reserved, must be zero
            u32::MAX, // nNumberOfBytesToLockLow  ─┐ lock entire
            u32::MAX, // nNumberOfBytesToLockHigh ─┘ file space
            &mut overlapped,
        )
    };
    if ret != 0 {
        Ok(())
    } else {
        Err(io::Error::last_os_error())
    }
}

/// Write the 16-byte header into a brand-new (empty) file.
fn init_header(file: &mut File) -> io::Result<()> {
    file.seek(SeekFrom::Start(0))?;
    file.write_all(&MAGIC)?;
    file.write_all(&0u64.to_le_bytes())
}

/// Overwrite the committed-length field at file offset 8.
fn write_committed_len(file: &mut File, len: u64) -> io::Result<()> {
    file.seek(SeekFrom::Start(8))?;
    file.write_all(&len.to_le_bytes())
}

/// Read `len` bytes from absolute file position `offset` without modifying
/// the file-position cursor, so the caller only needs a shared (read) lock.
///
/// On Unix this uses `pread(2)` via `read_exact_at`.
/// On Windows this uses `ReadFile` with an `OVERLAPPED` offset (via
/// `seek_read`), which is also cursor-safe on synchronous handles.
#[cfg(unix)]
fn pread_exact(file: &File, offset: u64, len: usize) -> io::Result<Vec<u8>> {
    let mut buf = vec![0u8; len];
    file.read_exact_at(&mut buf, offset)?;
    Ok(buf)
}

/// Windows counterpart of `pread_exact` — see the shared doc comment above.
#[cfg(windows)]
fn pread_exact(file: &File, offset: u64, len: usize) -> io::Result<Vec<u8>> {
    let mut buf = vec![0u8; len];
    let mut filled = 0usize;
    while filled < len {
        let n = file.seek_read(&mut buf[filled..], offset + filled as u64)?;
        if n == 0 {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "pread_exact: unexpected EOF",
            ));
        }
        filled += n;
    }
    Ok(buf)
}

/// Fill `buf` from absolute file position `offset` without modifying the
/// file-position cursor.  Unix uses `pread(2)` via `read_exact_at`;
/// Windows uses `ReadFile` with an `OVERLAPPED` offset via `seek_read`.
#[cfg(unix)]
fn pread_exact_into(file: &File, offset: u64, buf: &mut [u8]) -> io::Result<()> {
    file.read_exact_at(buf, offset)
}

/// Windows counterpart of `pread_exact_into`.
#[cfg(windows)]
fn pread_exact_into(file: &File, offset: u64, buf: &mut [u8]) -> io::Result<()> {
    let len = buf.len();
    let mut filled = 0usize;
    while filled < len {
        let n = file.seek_read(&mut buf[filled..], offset + filled as u64)?;
        if n == 0 {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "pread_exact_into: unexpected EOF",
            ));
        }
        filled += n;
    }
    Ok(())
}

/// Read and validate the header; return the committed payload length.
fn read_header(file: &mut File) -> io::Result<u64> {
    file.seek(SeekFrom::Start(0))?;
    let mut hdr = [0u8; 16];
    file.read_exact(&mut hdr)?;
    if hdr[0..6] != MAGIC_PREFIX {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "bstack: bad magic number — not a bstack file or incompatible version",
        ));
    }
    Ok(u64::from_le_bytes(hdr[8..16].try_into().unwrap()))
}

// ---------------------------------------------------------------------------

/// A persistent, fsync-durable binary stack backed by a single file.
///
/// See the [crate-level documentation](crate) for the file format, durability
/// guarantees, crash recovery, multi-process safety, and thread-safety model.
pub struct BStack {
    lock: RwLock<File>,
}

impl BStack {
    /// Open or create a stack file at `path`.
    ///
    /// On a **new** file the 16-byte header is written and durably synced
    /// before returning.
    ///
    /// On an **existing** file the header is validated and, if a previous crash
    /// left the file in an inconsistent state, the file is repaired and durably
    /// synced before returning (see *Crash recovery* in the crate docs).
    ///
    /// On Unix an **exclusive advisory `flock`** is acquired (on Windows, an
    /// exclusive `LockFileEx` lock); if another process already holds the lock
    /// this function returns immediately with [`io::ErrorKind::WouldBlock`].
    ///
    /// # Errors
    ///
    /// * [`io::ErrorKind::WouldBlock`] — another process holds the exclusive
    ///   lock (Unix and Windows).
    /// * [`io::ErrorKind::InvalidData`] — the file exists but its header magic
    ///   is wrong (not a bstack file, or created by an incompatible version),
    ///   or the file is too short to contain a valid header.
    /// * Any [`io::Error`] from [`OpenOptions::open`], `read`, `write`, or
    ///   `durable_sync`.
    pub fn open(path: impl AsRef<Path>) -> io::Result<Self> {
        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(path)?;

        #[cfg(unix)]
        flock_exclusive(&file)?;

        #[cfg(windows)]
        lock_file_exclusive(&file)?;

        let raw_size = file.metadata()?.len();

        if raw_size == 0 {
            init_header(&mut file)?;
            durable_sync(&file)?;
        } else if raw_size < HEADER_SIZE {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!(
                    "bstack: file is {raw_size} bytes — too small to contain the 16-byte header"
                ),
            ));
        } else {
            let committed_len = read_header(&mut file)?;
            let actual_data_len = raw_size - HEADER_SIZE;
            if actual_data_len != committed_len {
                // Recover: use whichever length is smaller (the committed
                // value is the last successfully synced boundary).
                let correct_len = committed_len.min(actual_data_len);
                file.set_len(HEADER_SIZE + correct_len)?;
                write_committed_len(&mut file, correct_len)?;
                durable_sync(&file)?;
            }
        }

        Ok(BStack {
            lock: RwLock::new(file),
        })
    }

    /// Append `data` to the end of the file.
    ///
    /// Returns the **logical** byte offset at which `data` begins — i.e. the
    /// payload size immediately before the write.  An empty slice is valid; it
    /// writes nothing and returns the current end offset.
    ///
    /// # Atomicity
    ///
    /// Either the full payload is written, the header committed-length is
    /// updated, and the whole thing is durably synced, or the file is
    /// left unchanged (best-effort rollback via `ftruncate` + header reset).
    ///
    /// # Errors
    ///
    /// Returns any [`io::Error`] from `write_all`, the committed-length
    /// update, or `durable_sync`; errors from the best-effort rollback itself
    /// are ignored.
    pub fn push(&self, data: &[u8]) -> io::Result<u64> {
        let mut file = self.lock.write().unwrap();
        let file_end = file.seek(SeekFrom::End(0))?;
        let logical_offset = file_end - HEADER_SIZE;

        if data.is_empty() {
            return Ok(logical_offset);
        }

        if let Err(e) = file.write_all(data) {
            let _ = file.set_len(file_end);
            return Err(e);
        }

        let new_len = logical_offset + data.len() as u64;
        if let Err(e) = write_committed_len(&mut file, new_len).and_then(|_| durable_sync(&file)) {
            // Roll back: truncate data and reset header.
            let _ = file.set_len(file_end);
            let _ = write_committed_len(&mut file, logical_offset);
            return Err(e);
        }

        Ok(logical_offset)
    }

    /// Append `n` zero bytes to the end of the file.
    ///
    /// Returns the **logical** byte offset at which the zeros begin — i.e. the
    /// payload size immediately before the write.  `n = 0` is valid; it writes
    /// nothing and returns the current end offset.
    ///
    /// # Atomicity
    ///
    /// Either the file is extended, the header committed-length is updated,
    /// and the whole thing is durably synced, or the file is left unchanged
    /// (best-effort rollback via `ftruncate` + header reset).
    ///
    /// # Errors
    ///
    /// Returns any [`io::Error`] from `set_len`, the committed-length update,
    /// or `durable_sync`; errors from the best-effort rollback itself are
    /// ignored.
616    pub fn extend(&self, n: u64) -> io::Result<u64> {
617        let mut file = self.lock.write().unwrap();
618        let file_end = file.seek(SeekFrom::End(0))?;
619        let logical_offset = file_end - HEADER_SIZE;
620
621        if n == 0 {
622            return Ok(logical_offset);
623        }
624
625        let new_file_end = file_end + n;
626        file.set_len(new_file_end)?;
627
628        let new_len = logical_offset + n;
629        if let Err(e) = write_committed_len(&mut file, new_len).and_then(|_| durable_sync(&file)) {
630            // Roll back: truncate and reset header.
631            let _ = file.set_len(file_end);
632            let _ = write_committed_len(&mut file, logical_offset);
633            return Err(e);
634        }
635
636        Ok(logical_offset)
637    }
638
639    /// Remove and return the last `n` bytes of the file.
640    ///
641    /// `n = 0` is valid: no bytes are removed and an empty `Vec` is returned.
642    /// `n` may span across multiple previous [`push`](Self::push) boundaries.
643    ///
644    /// # Atomicity
645    ///
646    /// The bytes are read before the file is truncated.  The committed-length
647    /// in the header is updated and durably synced after the truncation.
648    ///
649    /// # Errors
650    ///
651    /// Returns [`io::ErrorKind::InvalidInput`] if `n` exceeds the current
652    /// payload size.  Also propagates any I/O error from `read_exact`,
653    /// `set_len`, `write_all`, or `durable_sync`.
654    pub fn pop(&self, n: u64) -> io::Result<Vec<u8>> {
655        let mut file = self.lock.write().unwrap();
656        let raw_size = file.seek(SeekFrom::End(0))?;
657        let data_size = raw_size - HEADER_SIZE;
658        if n > data_size {
659            return Err(io::Error::new(
660                io::ErrorKind::InvalidInput,
661                format!("pop({n}) exceeds payload size ({data_size})"),
662            ));
663        }
664        let new_data_len = data_size - n;
665        file.seek(SeekFrom::Start(HEADER_SIZE + new_data_len))?;
666        let mut buf = vec![0u8; n as usize];
667        file.read_exact(&mut buf)?;
668        file.set_len(HEADER_SIZE + new_data_len)?;
669        write_committed_len(&mut file, new_data_len)?;
670        durable_sync(&file)?;
671        Ok(buf)
672    }
673
674    /// Return a copy of every payload byte from `offset` to the end of the
675    /// file.
676    ///
677    /// `offset` is a **logical** offset (as returned by [`push`](Self::push)).
678    /// `offset == len()` is valid and returns an empty `Vec`.  The file is not
679    /// modified.
680    ///
681    /// # Concurrency
682    ///
683    /// On Unix and Windows this uses a cursor-safe positional read (`pread(2)`
684    /// on Unix; `ReadFile`+`OVERLAPPED` on Windows), so the method takes only
685    /// the **read lock**, allowing multiple concurrent `peek` and `get` calls
686    /// to run in parallel.
687    ///
688    /// On other platforms a seek is required; the method falls back to the
689    /// write lock and concurrent reads serialise.
690    ///
691    /// # Errors
692    ///
693    /// Returns [`io::ErrorKind::InvalidInput`] if `offset` exceeds the current
694    /// payload size.
695    pub fn peek(&self, offset: u64) -> io::Result<Vec<u8>> {
696        #[cfg(any(unix, windows))]
697        {
698            let file = self.lock.read().unwrap();
699            let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE);
700            if offset > data_size {
701                return Err(io::Error::new(
702                    io::ErrorKind::InvalidInput,
703                    format!("peek offset ({offset}) exceeds payload size ({data_size})"),
704                ));
705            }
706            pread_exact(&file, HEADER_SIZE + offset, (data_size - offset) as usize)
707        }
708        #[cfg(not(any(unix, windows)))]
709        {
710            let mut file = self.lock.write().unwrap();
711            let raw_size = file.seek(SeekFrom::End(0))?;
712            let data_size = raw_size.saturating_sub(HEADER_SIZE);
713            if offset > data_size {
714                return Err(io::Error::new(
715                    io::ErrorKind::InvalidInput,
716                    format!("peek offset ({offset}) exceeds payload size ({data_size})"),
717                ));
718            }
719            file.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
720            let mut buf = vec![0u8; (data_size - offset) as usize];
721            file.read_exact(&mut buf)?;
722            Ok(buf)
723        }
724    }
725
726    /// Return a copy of the bytes in the half-open logical range `[start, end)`.
727    ///
728    /// `start == end` is valid and returns an empty `Vec`.  The file is not
729    /// modified.
730    ///
731    /// # Concurrency
732    ///
733    /// Same as [`peek`](Self::peek): on Unix and Windows the read lock is
734    /// taken and concurrent `get`/`peek`/`len` calls may run in parallel.  On
735    /// other platforms the write lock is taken and reads serialise.
736    ///
737    /// # Errors
738    ///
739    /// Returns [`io::ErrorKind::InvalidInput`] if `end < start` or if `end`
740    /// exceeds the current payload size.
    pub fn get(&self, start: u64, end: u64) -> io::Result<Vec<u8>> {
        if end < start {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("get: end ({end}) < start ({start})"),
            ));
        }
        #[cfg(any(unix, windows))]
        {
            let file = self.lock.read().unwrap();
            let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE);
            if end > data_size {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    format!("get: end ({end}) exceeds payload size ({data_size})"),
                ));
            }
            pread_exact(&file, HEADER_SIZE + start, (end - start) as usize)
        }
        #[cfg(not(any(unix, windows)))]
        {
            let mut file = self.lock.write().unwrap();
            let raw_size = file.seek(SeekFrom::End(0))?;
            let data_size = raw_size.saturating_sub(HEADER_SIZE);
            if end > data_size {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    format!("get: end ({end}) exceeds payload size ({data_size})"),
                ));
            }
            file.seek(SeekFrom::Start(HEADER_SIZE + start))?;
            let mut buf = vec![0u8; (end - start) as usize];
            file.read_exact(&mut buf)?;
            Ok(buf)
        }
    }

    /// Fill `buf` with bytes from logical `offset` to `offset + buf.len()`.
    ///
    /// Reads exactly `buf.len()` bytes from `offset` into the caller-supplied
    /// buffer.  An empty buffer is a valid no-op.  The file is not modified.
    ///
    /// Use this instead of [`peek`](Self::peek) when the destination buffer is
    /// already allocated and you want to avoid the extra heap allocation.
    ///
    /// # Concurrency
    ///
    /// Same as [`peek`](Self::peek): on Unix and Windows only the read lock is
    /// taken; on other platforms the write lock serialises all reads.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::InvalidInput`] if `offset + buf.len()` overflows
    /// `u64` or exceeds the current payload size.
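    ///
    /// # Examples
    ///
    /// A minimal sketch (the file name and contents are illustrative):
    ///
    /// ```no_run
    /// use bstack::BStack;
    ///
    /// # fn main() -> std::io::Result<()> {
    /// let stack = BStack::open("data.bin")?;
    /// stack.push(b"hello world")?;
    /// // Read 5 bytes at offset 6 into a caller-owned buffer; no Vec is allocated.
    /// let mut buf = [0u8; 5];
    /// stack.peek_into(6, &mut buf)?;
    /// assert_eq!(&buf, b"world");
    /// # Ok(())
    /// # }
    /// ```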
    pub fn peek_into(&self, offset: u64, buf: &mut [u8]) -> io::Result<()> {
        if buf.is_empty() {
            return Ok(());
        }
        let len = buf.len() as u64;
        let end = offset.checked_add(len).ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "peek_into: offset + len overflows u64",
            )
        })?;
        #[cfg(any(unix, windows))]
        {
            let file = self.lock.read().unwrap();
            let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE);
            if end > data_size {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    format!(
                        "peek_into: range [{offset}, {end}) exceeds payload size ({data_size})"
                    ),
                ));
            }
            pread_exact_into(&file, HEADER_SIZE + offset, buf)
        }
        #[cfg(not(any(unix, windows)))]
        {
            let mut file = self.lock.write().unwrap();
            let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE);
            if end > data_size {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    format!(
                        "peek_into: range [{offset}, {end}) exceeds payload size ({data_size})"
                    ),
                ));
            }
            file.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
            file.read_exact(buf)
        }
    }

    /// Fill `buf` with bytes from the half-open logical range
    /// `[start, start + buf.len())`.
    ///
    /// An empty buffer is a valid no-op.  The file is not modified.
    ///
    /// Use this instead of [`get`](Self::get) when the destination buffer is
    /// already allocated and you want to avoid the extra heap allocation.
    ///
    /// # Concurrency
    ///
    /// Same as [`get`](Self::get): on Unix and Windows only the read lock is
    /// taken; on other platforms the write lock serialises all reads.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::InvalidInput`] if `start + buf.len()` overflows
    /// `u64` or exceeds the current payload size.
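    ///
    /// # Examples
    ///
    /// A minimal sketch reading one fixed-width record (the record layout is
    /// illustrative):
    ///
    /// ```no_run
    /// use bstack::BStack;
    ///
    /// # fn main() -> std::io::Result<()> {
    /// let stack = BStack::open("records.bin")?;
    /// stack.push(&42u32.to_le_bytes())?;
    /// // Fill a stack-allocated buffer with the 4-byte record at offset 0.
    /// let mut record = [0u8; 4];
    /// stack.get_into(0, &mut record)?;
    /// assert_eq!(u32::from_le_bytes(record), 42);
    /// # Ok(())
    /// # }
    /// ```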
    pub fn get_into(&self, start: u64, buf: &mut [u8]) -> io::Result<()> {
        if buf.is_empty() {
            return Ok(());
        }
        let len = buf.len() as u64;
        let end = start.checked_add(len).ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "get_into: start + len overflows u64",
            )
        })?;
        #[cfg(any(unix, windows))]
        {
            let file = self.lock.read().unwrap();
            let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE);
            if end > data_size {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    format!("get_into: end ({end}) exceeds payload size ({data_size})"),
                ));
            }
            pread_exact_into(&file, HEADER_SIZE + start, buf)
        }
        #[cfg(not(any(unix, windows)))]
        {
            let mut file = self.lock.write().unwrap();
            let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE);
            if end > data_size {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    format!("get_into: end ({end}) exceeds payload size ({data_size})"),
                ));
            }
            file.seek(SeekFrom::Start(HEADER_SIZE + start))?;
            file.read_exact(buf)
        }
    }

    /// Remove the last `buf.len()` bytes from the file and write them into `buf`.
    ///
    /// An empty buffer is a valid no-op: no bytes are removed.
    ///
    /// Use this instead of [`pop`](Self::pop) when the destination buffer is
    /// already allocated and you want to avoid the extra heap allocation.
    ///
    /// # Atomicity
    ///
    /// Same guarantees as [`pop`](Self::pop).
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::InvalidInput`] if `buf.len()` exceeds the
    /// current payload size.  Also propagates any I/O error from `read_exact`,
    /// `set_len`, `write_all`, or `durable_sync`.
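    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a freshly created (empty) stack:
    ///
    /// ```no_run
    /// use bstack::BStack;
    ///
    /// # fn main() -> std::io::Result<()> {
    /// let stack = BStack::open("data.bin")?;
    /// stack.push(b"abcdef")?;
    /// // Remove the last 3 bytes directly into a caller-owned buffer.
    /// let mut tail = [0u8; 3];
    /// stack.pop_into(&mut tail)?;
    /// assert_eq!(&tail, b"def");
    /// assert_eq!(stack.len()?, 3);
    /// # Ok(())
    /// # }
    /// ```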
    pub fn pop_into(&self, buf: &mut [u8]) -> io::Result<()> {
        if buf.is_empty() {
            return Ok(());
        }
        let n = buf.len() as u64;
        let mut file = self.lock.write().unwrap();
        let raw_size = file.seek(SeekFrom::End(0))?;
        let data_size = raw_size.saturating_sub(HEADER_SIZE);
        if n > data_size {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("pop_into({n}) exceeds payload size ({data_size})"),
            ));
        }
        let new_data_len = data_size - n;
        file.seek(SeekFrom::Start(HEADER_SIZE + new_data_len))?;
        file.read_exact(buf)?;
        file.set_len(HEADER_SIZE + new_data_len)?;
        write_committed_len(&mut file, new_data_len)?;
        durable_sync(&file)?;
        Ok(())
    }

    /// Remove (discard) the last `n` bytes from the file without returning them.
    ///
    /// Equivalent to [`pop`](Self::pop) but avoids allocating a buffer for the
    /// removed bytes.  `n = 0` is valid and is a no-op.
    ///
    /// # Atomicity
    ///
    /// Same guarantees as [`pop`](Self::pop).
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::InvalidInput`] if `n` exceeds the current
    /// payload size.  Also propagates any I/O error from `set_len`,
    /// `write_all`, or `durable_sync`.
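    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a freshly created (empty) stack:
    ///
    /// ```no_run
    /// use bstack::BStack;
    ///
    /// # fn main() -> std::io::Result<()> {
    /// let stack = BStack::open("data.bin")?;
    /// stack.push(b"abcdef")?;
    /// // Drop the last 4 bytes without reading or copying them.
    /// stack.discard(4)?;
    /// assert_eq!(stack.len()?, 2);
    /// # Ok(())
    /// # }
    /// ```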
    pub fn discard(&self, n: u64) -> io::Result<()> {
        if n == 0 {
            return Ok(());
        }
        let mut file = self.lock.write().unwrap();
        let raw_size = file.seek(SeekFrom::End(0))?;
        let data_size = raw_size.saturating_sub(HEADER_SIZE);
        if n > data_size {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("discard({n}) exceeds payload size ({data_size})"),
            ));
        }
        let new_data_len = data_size - n;
        file.set_len(HEADER_SIZE + new_data_len)?;
        write_committed_len(&mut file, new_data_len)?;
        durable_sync(&file)?;
        Ok(())
    }

    /// Overwrite `data` bytes in place starting at logical `offset`.
    ///
    /// The file size is never changed: if `offset + data.len()` would exceed
    /// the current payload size the call is rejected.  An empty slice is a
    /// valid no-op.
    ///
    /// # Feature flag
    ///
    /// Only available when the `set` Cargo feature is enabled.
    ///
    /// # Durability
    ///
    /// Equivalent to `push`/`pop`: the overwritten bytes are durably synced
    /// before the call returns.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::InvalidInput`] if `offset + data.len()`
    /// exceeds the current payload size, or if the addition overflows `u64`.
    /// Propagates any I/O error from `write_all` or `durable_sync`.
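    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming the `set` feature is enabled and the stack
    /// starts empty:
    ///
    /// ```no_run
    /// use bstack::BStack;
    ///
    /// # fn main() -> std::io::Result<()> {
    /// let stack = BStack::open("data.bin")?;
    /// stack.push(b"hello world")?;
    /// // Overwrite the first 5 bytes in place; the payload size is unchanged.
    /// stack.set(0, b"HELLO")?;
    /// assert_eq!(stack.peek(0)?, b"HELLO world");
    /// # Ok(())
    /// # }
    /// ```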
    #[cfg(feature = "set")]
    pub fn set(&self, offset: u64, data: &[u8]) -> io::Result<()> {
        if data.is_empty() {
            return Ok(());
        }
        let end = offset.checked_add(data.len() as u64).ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "set: offset + len overflows u64",
            )
        })?;
        let mut file = self.lock.write().unwrap();
        let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE);
        if end > data_size {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("set: write end ({end}) exceeds payload size ({data_size})"),
            ));
        }
        file.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
        file.write_all(data)?;
        durable_sync(&file)
    }

    /// Overwrite `n` bytes with zeros in place starting at logical `offset`.
    ///
    /// The file size is never changed: if `offset + n` would exceed the
    /// current payload size the call is rejected.  `n = 0` is a valid no-op.
    ///
    /// # Feature flag
    ///
    /// Only available when the `set` Cargo feature is enabled.
    ///
    /// # Durability
    ///
    /// Equivalent to `push`/`pop`: the overwritten bytes are durably synced
    /// before the call returns.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::InvalidInput`] if `offset + n` exceeds the
    /// current payload size, or if the addition overflows `u64`.  Propagates
    /// any I/O error from `write_all` or `durable_sync`.
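    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming the `set` feature is enabled and the stack
    /// starts empty:
    ///
    /// ```no_run
    /// use bstack::BStack;
    ///
    /// # fn main() -> std::io::Result<()> {
    /// let stack = BStack::open("data.bin")?;
    /// stack.push(b"secret")?;
    /// // Scrub the payload in place; the file size is unchanged.
    /// stack.zero(0, 6)?;
    /// assert_eq!(stack.peek(0)?, [0u8; 6]);
    /// # Ok(())
    /// # }
    /// ```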
    #[cfg(feature = "set")]
    pub fn zero(&self, offset: u64, n: u64) -> io::Result<()> {
        if n == 0 {
            return Ok(());
        }
        let end = offset.checked_add(n).ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "zero: offset + n overflows u64",
            )
        })?;
        let mut file = self.lock.write().unwrap();
        let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE);
        if end > data_size {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("zero: write end ({end}) exceeds payload size ({data_size})"),
            ));
        }
        file.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
        let zeros = vec![0u8; n as usize];
        file.write_all(&zeros)?;
        durable_sync(&file)
    }

    /// Return the current **logical** payload size in bytes (excludes the
    /// 16-byte header).
    ///
    /// Takes the read lock, so it can run concurrently with other `len` calls
    /// but blocks while any write-lock operation is in progress.  The returned
    /// value always reflects a clean operation boundary.
    ///
    /// # Errors
    ///
    /// Propagates any [`io::Error`] from [`File::metadata`].
    pub fn len(&self) -> io::Result<u64> {
        let file = self.lock.read().unwrap();
        Ok(file.metadata()?.len().saturating_sub(HEADER_SIZE))
    }

    /// Return `true` if the stack contains no payload bytes.
    ///
    /// # Errors
    ///
    /// Propagates any [`io::Error`] from [`File::metadata`].
    pub fn is_empty(&self) -> io::Result<bool> {
        Ok(self.len()? == 0)
    }
}

// ---------------------------------------------------------------------------
// io::Write

/// Appends bytes to the stack.
///
/// Each call to [`write`](io::Write::write) is equivalent to [`push`](BStack::push):
/// all bytes are written atomically and durably synced before returning.
/// Calling `write_all` or chaining multiple `write` calls therefore issues
/// one `durable_sync` per call; callers that need to batch many small writes
/// without per-write syncs should accumulate data and call `push` once.
///
/// [`flush`](io::Write::flush) is a no-op because every `write` is already
/// durable.
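///
/// # Examples
///
/// A minimal sketch (the file name is illustrative):
///
/// ```no_run
/// use std::io::Write;
/// use bstack::BStack;
///
/// # fn main() -> std::io::Result<()> {
/// let mut stack = BStack::open("log.bin")?;
/// // Each write is a durable push; flush is a no-op.
/// stack.write_all(b"event: started\n")?;
/// writeln!(stack, "event: stopped")?;
/// # Ok(())
/// # }
/// ```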
impl io::Write for BStack {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.push(buf)?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Shared-reference counterpart of `impl Write for BStack`.
///
/// Because [`push`](BStack::push) takes `&self` (interior mutability via
/// `RwLock`), the `Write` implementation is also available on `&BStack`,
/// mirroring the standard library's `impl Write for &File`.
impl io::Write for &BStack {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.push(buf)?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl fmt::Debug for BStack {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("BStack")
            .field(
                "version",
                &format!("{}.{}.{}", MAGIC[4], MAGIC[5], MAGIC[6]),
            )
            .field("len", &self.len().ok())
            .finish_non_exhaustive()
    }
}

impl Eq for BStack {}

/// Two `BStack` instances are equal iff they are the **same instance** in memory.
///
/// Because [`BStack::open`] acquires an exclusive advisory lock, no two
/// `BStack` values within one process can refer to the same file at the same
/// time.  Pointer identity is therefore the only meaningful equality: a stack
/// is equal to itself and to nothing else.
impl PartialEq for BStack {
    fn eq(&self, other: &Self) -> bool {
        std::ptr::eq(self, other)
    }
}

/// Hashes the instance address, consistent with the pointer-identity [`PartialEq`].
impl Hash for BStack {
    fn hash<H: Hasher>(&self, state: &mut H) {
        (self as *const BStack).hash(state);
    }
}

/// A cursor-based reader over a [`BStack`] payload.
///
/// `BStackReader` implements [`io::Read`] and [`io::Seek`], allowing the
/// stack's payload to be consumed through any interface that expects a
/// readable, seekable byte stream.
///
/// # Construction
///
/// ```no_run
/// use bstack::BStack;
///
/// # fn main() -> std::io::Result<()> {
/// let stack = BStack::open("log.bin")?;
/// stack.push(b"hello world")?;
///
/// // Start reading from the beginning.
/// let mut reader = stack.reader();
///
/// // Or start from an arbitrary offset.
/// let mut mid = stack.reader_at(6);
/// # Ok(())
/// # }
/// ```
///
/// # Concurrency
///
/// `BStackReader` borrows the stack immutably, so multiple readers can coexist
/// and run concurrently with each other and with [`peek`](BStack::peek) /
/// [`get`](BStack::get) calls.  Concurrent [`push`](BStack::push) or
/// [`pop`](BStack::pop) operations are not blocked by an active reader, but
/// reading interleaved with writes may observe different snapshots of the
/// payload across calls; callers are responsible for synchronisation when
/// that matters.
pub struct BStackReader<'a> {
    stack: &'a BStack,
    offset: u64,
}

impl BStack {
    /// Create a [`BStackReader`] positioned at the start of the payload.
    pub fn reader(&self) -> BStackReader<'_> {
        BStackReader {
            stack: self,
            offset: 0,
        }
    }

    /// Create a [`BStackReader`] positioned at `offset` bytes into the payload.
    ///
    /// An `offset` past the current end is allowed; [`read`](io::Read::read)
    /// will return `Ok(0)` until new data is pushed past that point.
    pub fn reader_at(&self, offset: u64) -> BStackReader<'_> {
        BStackReader {
            stack: self,
            offset,
        }
    }
}

impl<'a> BStackReader<'a> {
    /// Return the current logical read offset within the payload.
    pub fn position(&self) -> u64 {
        self.offset
    }
}

impl<'a> From<&'a BStack> for BStackReader<'a> {
    fn from(stack: &'a BStack) -> Self {
        stack.reader()
    }
}

impl<'a> From<BStackReader<'a>> for &'a BStack {
    fn from(val: BStackReader<'a>) -> Self {
        val.stack
    }
}

/// Two readers are equal when they point to the **same `BStack` instance**
/// (pointer identity) and share the same cursor `offset`.
impl<'a> PartialEq for BStackReader<'a> {
    fn eq(&self, other: &Self) -> bool {
        self.stack == other.stack && self.offset == other.offset
    }
}

impl<'a> Eq for BStackReader<'a> {}

/// Hashes `(BStack pointer, offset)`, consistent with [`PartialEq`].
impl<'a> Hash for BStackReader<'a> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.stack.hash(state);
        self.offset.hash(state);
    }
}

impl<'a> PartialOrd for BStackReader<'a> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

/// Ordered by `BStack` instance address, then by cursor `offset`.
///
/// The address component groups all readers over the same stack together,
/// and within that group the natural read order (smaller offset first) applies.
/// This ordering is consistent with the pointer-identity [`PartialEq`].
impl<'a> Ord for BStackReader<'a> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let self_ptr = self.stack as *const BStack as usize;
        let other_ptr = other.stack as *const BStack as usize;
        self_ptr
            .cmp(&other_ptr)
            .then(self.offset.cmp(&other.offset))
    }
}

impl<'a> io::Read for BStackReader<'a> {
    /// Read bytes from the current position into `buf`.
    ///
    /// Returns the number of bytes read, which may be less than `buf.len()` if
    /// the end of the payload is reached.  Returns `Ok(0)` at EOF.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        let data_size = self.stack.len()?;
        if self.offset >= data_size {
            return Ok(0);
        }
        let available = (data_size - self.offset) as usize;
        let n = buf.len().min(available);
        self.stack.get_into(self.offset, &mut buf[..n])?;
        self.offset += n as u64;
        Ok(n)
    }
}

impl<'a> io::Seek for BStackReader<'a> {
    /// Move the read cursor.
    ///
    /// [`SeekFrom::Start`] and [`SeekFrom::Current`] with a non-negative delta
    /// may advance the cursor past the current end of the payload; subsequent
    /// [`read`](io::Read::read) calls will return `Ok(0)` until the payload
    /// grows past that point.  Seeking before the start of the payload returns
    /// [`io::ErrorKind::InvalidInput`].
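    ///
    /// # Examples
    ///
    /// A minimal sketch (the file name and contents are illustrative):
    ///
    /// ```no_run
    /// use std::io::{Read, Seek, SeekFrom};
    /// use bstack::BStack;
    ///
    /// # fn main() -> std::io::Result<()> {
    /// let stack = BStack::open("data.bin")?;
    /// stack.push(b"hello world")?;
    /// let mut reader = stack.reader();
    /// // Position the cursor 5 bytes before the end of the payload.
    /// reader.seek(SeekFrom::End(-5))?;
    /// let mut tail = String::new();
    /// reader.read_to_string(&mut tail)?;
    /// assert_eq!(tail, "world");
    /// # Ok(())
    /// # }
    /// ```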
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        let data_size = self.stack.len()? as i128;
        let new_offset = match pos {
            SeekFrom::Start(n) => n as i128,
            SeekFrom::End(n) => data_size + n as i128,
            SeekFrom::Current(n) => self.offset as i128 + n as i128,
        };
        if new_offset < 0 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "seek before beginning of payload",
            ));
        }
        self.offset = new_offset as u64;
        Ok(self.offset)
    }
}