Skip to main content

bstack/
lib.rs

1//! A persistent, fsync-durable binary stack backed by a single file.
2//!
3//! # Overview
4//!
5//! [`BStack`] treats a file as a flat byte buffer that grows and shrinks from
6//! the tail.  Every mutating operation — [`push`](BStack::push),
7//! [`extend`](BStack::extend), [`pop`](BStack::pop), [`discard`](BStack::discard), (with the `set`
8//! feature) [`set`](BStack::set) and [`zero`](BStack::zero), (with the `atomic` feature)
9//! [`replace`](BStack::replace), and (with both `set` and `atomic`)
10//! [`process`](BStack::process) — calls a *durable sync* before returning,
11//! so the data survives a process crash or an unclean system shutdown.
12//! Read-only operations — [`peek`](BStack::peek),
13//! [`peek_into`](BStack::peek_into), [`get`](BStack::get), and
14//! [`get_into`](BStack::get_into) — never modify the file and on Unix and
15//! Windows can run concurrently with each other.
16//! [`pop_into`](BStack::pop_into) is the buffer-passing counterpart of `pop`,
17//! carrying the same durability and atomicity guarantees.
18//! [`discard`](BStack::discard) is like `pop` but discards the removed bytes
19//! without reading or returning them, avoiding any allocation or copy.
20//!
21//! The crate depends on **`libc`** (Unix) and **`windows-sys`** (Windows) for
22//! platform-specific syscalls, and uses **no `unsafe` code beyond the required
23//! FFI calls**.
24//!
25//! # File format
26//!
27//! Every file begins with a fixed 16-byte header:
28//!
29//! ```text
30//! ┌────────────────────────┬──────────────┬──────────────┐
31//! │      header (16 B)     │  payload 0   │  payload 1   │  ...
32//! │  magic[8] | clen[8 LE] │              │              │
33//! └────────────────────────┴──────────────┴──────────────┘
34//! ^                        ^              ^              ^
35//! file offset 0         offset 16      16+n0          EOF
36//! ```
37//!
38//! * **`magic`** — 8 bytes: `BSTK` + major(1 B) + minor(1 B) + patch(1 B) + reserved(1 B).
39//!   This version writes `BSTK\x00\x01\x0a\x00` (0.1.10).  [`open`](BStack::open)
40//!   accepts any file whose first 6 bytes match `BSTK\x00\x01` (any 0.1.x) and
41//!   rejects anything with a different major or minor.
42//! * **`clen`** — little-endian `u64` recording the *committed* payload length.
43//!   It is updated atomically with each [`push`](BStack::push) or
44//!   [`pop`](BStack::pop) and is used for crash recovery on the next
45//!   [`open`](BStack::open).
46//!
47//! All user-visible offsets are **logical** (0-based from the start of the
48//! payload region, i.e. from file byte 16).
49//!
50//! # Crash recovery
51//!
52//! On [`open`](BStack::open), the header's committed length is compared against
53//! the actual file size:
54//!
55//! | Condition | Cause | Recovery |
56//! |-----------|-------|----------|
57//! | `file_size − 16 > clen` | partial tail write (push crashed before header update) | truncate to `16 + clen` |
58//! | `file_size − 16 < clen` | partial truncation (pop crashed before header update) | set `clen = file_size − 16` |
59//!
60//! After recovery a `durable_sync` ensures the repaired state is on stable
61//! storage before any caller can observe or modify the file.
62//!
63//! # Durability
64//!
65//! | Operation | Syscall sequence |
66//! |-----------|-----------------|
67//! | `push` | `lseek(END)` → `write(data)` → `lseek(8)` → `write(clen)` → `durable_sync` |
68//! | `extend` | `lseek(END)` → `set_len(new_end)` → `lseek(8)` → `write(clen)` → `durable_sync` |
69//! | `pop`, `pop_into` | `lseek` → `read` → `ftruncate` → `lseek(8)` → `write(clen)` → `durable_sync` |
70//! | `discard` | `ftruncate` → `lseek(8)` → `write(clen)` → `durable_sync` |
71//! | `set` *(feature)* | `lseek(offset)` → `write(data)` → `durable_sync` |
72//! | `zero` *(feature)* | `lseek(offset)` → `write(zeros)` → `durable_sync` |
73//! | `atrunc` *(feature: atomic, net extension)* | `set_len(new_end)` → `lseek(tail)` → `write(buf)` → `durable_sync` → `lseek(8)` → `write(clen)` |
74//! | `atrunc` *(feature: atomic, net truncation)* | `lseek(tail)` → `write(buf)` → `set_len(new_end)` → `durable_sync` → `lseek(8)` → `write(clen)` |
75//! | `splice`, `splice_into` *(feature: atomic)* | `lseek(tail)` → `read(n)` → *(then as `atrunc`)* |
76//! | `try_extend` *(feature: atomic)* | `lseek(END)` — conditional `push` sequence if size matches |
77//! | `try_discard` *(feature: atomic)* | `lseek(END)` — conditional `discard` sequence if size matches |
78//! | `swap`, `swap_into` *(features: set+atomic)* | `lseek(offset)` → `read` → `lseek(offset)` → `write(buf)` → `durable_sync` |
79//! | `cas` *(features: set+atomic)* | `lseek(offset)` → `read` → compare — conditional `lseek(offset)` → `write(new)` → `durable_sync` |
80//! | `process` *(features: set+atomic)* | `lseek(start)` → `read(end−start)` → *(callback)* → `lseek(start)` → `write(buf)` → `durable_sync` |
81//! | `replace` *(feature: atomic)* | `lseek(tail)` → `read(n)` → *(callback)* → *(then as `atrunc`)* |
82//! | `peek`, `peek_into`, `get`, `get_into` | `pread(2)` on Unix; `ReadFile`+`OVERLAPPED` on Windows; `lseek` → `read` elsewhere (no sync — read-only) |
83//!
84//! **`durable_sync` on macOS** issues `fcntl(F_FULLFSYNC)`, which flushes the
85//! drive's hardware write cache.  Plain `fdatasync` is not sufficient on macOS
86//! because the kernel may acknowledge it before the drive controller has
87//! committed the data.  If `F_FULLFSYNC` is not supported by the device the
88//! implementation falls back to `sync_data` (`fdatasync`).
89//!
90//! **`durable_sync` on other Unix** calls `sync_data` (`fdatasync`), which is
91//! sufficient on Linux and BSD.
92//!
93//! **`durable_sync` on Windows** calls `sync_data`, which maps to
94//! `FlushFileBuffers`.  This flushes the kernel write-back cache and waits for
95//! the drive to acknowledge, providing equivalent durability to `fdatasync`.
96//!
97//! # Multi-process safety
98//!
99//! On Unix, [`open`](BStack::open) acquires an **exclusive advisory `flock`**
100//! on the file (`LOCK_EX | LOCK_NB`).  If another process already holds the
101//! lock, `open` returns immediately with [`io::ErrorKind::WouldBlock`] rather
102//! than blocking indefinitely.  The lock is released automatically when the
103//! [`BStack`] is dropped (the underlying file descriptor is closed).
104//!
105//! On Windows, [`open`](BStack::open) acquires an **exclusive `LockFileEx`**
106//! lock (`LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY`) covering the
107//! entire file range.  If another process already holds the lock, `open`
108//! returns immediately with [`io::ErrorKind::WouldBlock`]
109//! (`ERROR_LOCK_VIOLATION`).  The lock is released when the [`BStack`] is
110//! dropped (the underlying file handle is closed).
111//!
112//! > **Note:** Both `flock` (Unix) and `LockFileEx` (Windows) are advisory
113//! > and per-process.  They prevent well-behaved concurrent opens across
114//! > processes but do not protect against processes that bypass the lock or
115//! > against raw writes to the file.
116//!
117//! # Correct usage
118//!
119//! bstack files must only be opened through this crate or a compatible
120//! implementation that understands the file format, the header protocol, and
121//! the locking semantics.  Reading or writing the underlying file with raw
122//! tools or syscalls while a [`BStack`] instance is live — or manually editing
123//! the header fields — can silently corrupt the committed-length sentinel or
124//! bypass the advisory lock.
125//!
126//! **The authors make no guarantees about the behaviour of this crate —
127//! including freedom from data loss or logical corruption — when the file has
128//! been accessed outside of this crate's controlled interface.**
129//!
130//! # Thread safety
131//!
132//! `BStack` wraps the file in a [`std::sync::RwLock`].
133//!
134//! | Operation | Lock (Unix / Windows) | Lock (other) |
135//! |-----------|-----------------------|--------------|
136//! | `push`, `extend`, `pop`, `pop_into`, `discard` | write | write |
137//! | `set`, `zero` *(feature)* | write | write |
138//! | `atrunc`, `splice`, `splice_into`, `try_extend` *(feature: atomic)* | write | write |
139//! | `try_discard(s, n > 0)` *(feature: atomic)* | write | write |
140//! | `try_discard(s, 0)` *(feature: atomic)* | **read** | **read** |
141//! | `swap`, `swap_into`, `cas` *(features: set+atomic)* | write | write |
142//! | `process` *(features: set+atomic)* | write | write |
143//! | `replace` *(feature: atomic)* | write | write |
144//! | `peek`, `peek_into`, `get`, `get_into` | **read** (or **none** for ranges entirely within the locked region) | write |
145//! | `len` | read | read |
146//!
147//! On Unix and Windows, `peek`, `peek_into`, `get`, and `get_into` use a
148//! cursor-safe positional read (`pread(2)` on Unix; `ReadFile` with
149//! `OVERLAPPED` on Windows) that does not modify the file-position cursor.
150//! This allows multiple concurrent calls to any of these methods to run in
151//! parallel while any ongoing `push`, `pop`, or `pop_into` still serialises
152//! all writers via the write lock.  When a read range lies entirely within
153//! the [locked region](#locked-region-lock_up_to), the rwlock is bypassed
154//! altogether — see that section for the concurrency model.
155//!
156//! On other platforms a seek is required, so `peek`, `peek_into`, `get`, and
157//! `get_into` fall back to the write lock and all reads serialise.
158//!
159//! # Locked region (`lock_up_to`)
160//!
161//! [`BStack`] maintains an in-memory **monotonically growing partition
162//! boundary** named the *locked region*.  Bytes in `[0, locked_len())` are
163//! declared permanently immutable for the lifetime of the open file.
164//!
165//! The locked length starts at `0` on every [`open`](BStack::open) and is
166//! **not persisted to disk** — the file format is unchanged.  Callers extend
167//! the boundary by calling [`lock_up_to`](BStack::lock_up_to) (or open and
168//! lock in one step with [`open_locked_up_to`](BStack::open_locked_up_to)).
169//! It can only grow; attempts to shrink it return
170//! [`io::ErrorKind::InvalidInput`].
171//!
172//! ## Effects
173//!
174//! * **Lock-free reads on Unix and Windows.**  When [`get`](BStack::get),
175//!   [`get_into`](BStack::get_into), or [`peek_into`](BStack::peek_into) are
176//!   called with a range that lies entirely within the locked region, the
177//!   rwlock is bypassed and the read is served directly by `pread(2)`
178//!   (Unix) or `ReadFile` + `OVERLAPPED` (Windows).  The `fstat` size check
179//!   is skipped too — the locked length is a sufficient upper bound.
180//!
181//! * **Write protection.**  [`set`](BStack::set), [`zero`](BStack::zero),
182//!   [`swap`](BStack::swap), [`swap_into`](BStack::swap_into),
183//!   [`cas`](BStack::cas), and [`process`](BStack::process) return
184//!   [`io::ErrorKind::InvalidInput`] when their target range overlaps the
185//!   locked region.  [`atrunc`](BStack::atrunc), [`splice`](BStack::splice),
186//!   [`splice_into`](BStack::splice_into), and [`replace`](BStack::replace)
187//!   return the same error when the operation would modify bytes inside it.
188//!
189//! * **Shrink protection.**  [`pop`](BStack::pop),
190//!   [`pop_into`](BStack::pop_into), [`discard`](BStack::discard), and
191//!   [`try_discard`](BStack::try_discard) return
192//!   [`io::ErrorKind::InvalidInput`] when they would shrink the payload
193//!   below the locked length.
194//!
195//! Callers that never invoke `lock_up_to` see no behavioural change — every
196//! read and write path adds only a single uncontended `AtomicU64::load` and
197//! a comparison.
198//!
199//! ## Concurrency model
200//!
201//! `lock_up_to(n)` acquires the exclusive write lock before publishing the
202//! new boundary with a `Release` store.  Lock-free readers `Acquire`-load
203//! `locked` before each call.  Two consequences follow:
204//!
205//! * A stale load is always safe.  If a reader sees an older (smaller)
206//!   `locked` value, it falls through to the rwlock path; if it sees a
207//!   newer value, the entire range it now reads is by definition immutable.
208//!
209//! * Locked-region checks on writers are evaluated **under the write lock**,
210//!   so they cannot race against a concurrent `lock_up_to` extending the
211//!   boundary across the write target.
212//!
213//! ## Typical use
214//!
215//! ```no_run
216//! use bstack::BStack;
217//!
218//! # fn main() -> std::io::Result<()> {
219//! // A fixed 64-byte metadata block at the head of the file, read by many
220//! // threads but never modified after first write.
221//! let stack = BStack::open_locked_up_to("meta.bin", 64)?;
222//! assert_eq!(stack.locked_len(), 64);
223//!
224//! // Reads of the metadata bypass the rwlock on Unix and Windows.
225//! let header = stack.get(0, 64)?;
226//! # let _ = header;
227//! # Ok(())
228//! # }
229//! ```
230//!
231//! On platforms other than Unix and Windows the boundary still enforces
232//! immutability through the rwlock path; only the lock-free read fast path
233//! is platform-gated.
234//!
235//! # Standard I/O adapters
236//!
237//! ## Writing
238//!
239//! `BStack` implements [`std::io::Write`] (and so does `&BStack`, mirroring
240//! [`std::io::Write` for `&File`]).  Each call to `write` is forwarded to
241//! [`push`](BStack::push), so every write is atomically appended and durably
242//! synced before returning.  `flush` is a no-op.
243//!
244//! ```no_run
245//! use std::io::Write;
246//! use bstack::BStack;
247//!
248//! # fn main() -> std::io::Result<()> {
249//! let mut stack = BStack::open("log.bin")?;
250//! stack.write_all(b"hello")?;
251//! stack.write_all(b"world")?;
252//! # Ok(())
253//! # }
254//! ```
255//!
256//! ## Reading
257//!
258//! [`BStackReader`] wraps a `&BStack` with a cursor and implements
259//! [`std::io::Read`] and [`std::io::Seek`].  Use [`BStack::reader`] or
260//! [`BStack::reader_at`] to construct one.
261//!
262//! ```no_run
263//! use std::io::{Read, Seek, SeekFrom};
264//! use bstack::BStack;
265//!
266//! # fn main() -> std::io::Result<()> {
267//! let stack = BStack::open("log.bin")?;
268//! stack.push(b"hello world")?;
269//!
270//! let mut reader = stack.reader();
271//! let mut buf = [0u8; 5];
272//! reader.read_exact(&mut buf)?;  // b"hello"
273//! reader.seek(SeekFrom::Start(6))?;
274//! reader.read_exact(&mut buf)?;  // b"world"
275//! # Ok(())
276//! # }
277//! ```
278//!
279//! # Trait implementations
280//!
281//! ## `BStack`
282//!
283//! | Trait | Semantics |
284//! |-------|-----------|
285//! | `Debug` | Shows `version` (semver string from the magic header, e.g. `"0.1.6"`) and `len` (`Option<u64>`, `None` on I/O failure). |
286//! | `PartialEq` / `Eq` | **Pointer identity.** Two values are equal iff they are the same instance. No two distinct `BStack` values in one process can refer to the same file. |
287//! | `Hash` | Hashes the instance address — consistent with pointer-identity `PartialEq`. |
288//!
289//! ## `BStackReader`
290//!
291//! | Trait | Semantics |
292//! |-------|-----------|
293//! | `PartialEq` / `Eq` | Equal when both the `BStack` pointer (identity) and the cursor `offset` match. |
294//! | `Hash` | Hashes `(BStack pointer, offset)` — consistent with `PartialEq`. |
295//! | `PartialOrd` / `Ord` | Ordered by `BStack` instance address, then by cursor `offset`. Groups all readers over the same stack and within that group orders by position. |
296//!
297//! # Feature flags
298//!
299//! | Feature | Description |
300//! |---------|-------------|
301//! | `set`   | Enables [`BStack::set`] and [`BStack::zero`] — in-place overwrite of existing payload bytes (or with zeros) without changing the file size. |
302//! | `alloc` | Enables [`BStackAllocator`], [`BStackBulkAllocator`], [`BStackSlice`], [`BStackSliceReader`], and [`LinearBStackAllocator`] — region-based allocation over a `BStack` payload. |
303//! | `atomic` | Enables [`BStack::atrunc`], [`BStack::splice`], [`BStack::splice_into`], [`BStack::try_extend`], [`BStack::try_discard`], and [`BStack::replace`] — compound read-modify-write operations that hold the write lock across what would otherwise be separate calls. Combined with `set`, also enables [`BStack::swap`], [`BStack::swap_into`], [`BStack::cas`], and [`BStack::process`]. |
304//!
305//! Enable with:
306//!
307//! ```toml
308//! [dependencies]
309//! bstack = { version = "0.1", features = ["set"] }
310//! # or
311//! bstack = { version = "0.1", features = ["alloc"] }
312//! # or both
313//! bstack = { version = "0.1", features = ["alloc", "set"] }
314//! ```
315//!
316//! # Allocator (`alloc` feature)
317//!
318//! The `alloc` feature adds a region-management layer on top of [`BStack`].
319//!
320//! ## Key types
321//!
322//! * [`BStackAllocator`] — trait for types that own a [`BStack`] and manage
323//!   contiguous byte regions within its payload.  Requires `stack()`,
324//!   `into_stack()`, `alloc()`, and `realloc()`; provides a default no-op
325//!   `dealloc()` and delegation helpers `len()` / `is_empty()`.
326//!
327//! * [`BStackBulkAllocator`] — extension trait for [`BStackAllocator`] that
328//!   adds atomic bulk operations.  Both methods are required with no default; on error
329//!   the backing store is left unchanged unless a crash occur.
330//!
331//! * [`BStackSlice`]`<'a, A>` — lightweight `Copy` handle (allocator reference +
332//!   offset + length) to a contiguous region.  Exposes `read`, `read_into`,
333//!   `read_range_into`, `subslice`, `subslice_range`, `reader`, `reader_at`,
334//!   and (with the `set` feature) `write`, `write_range`, `zero`, `zero_range`.
335//!
336//! * [`BStackSliceReader`]`<'a, A>` — cursor-based reader over a
337//!   [`BStackSlice`], implementing [`io::Read`] and [`io::Seek`] in the
338//!   slice's coordinate space.
339//!
340//! * [`LinearBStackAllocator`] — reference bump allocator that appends regions
341//!   sequentially.  `realloc` is O(1) for the tail allocation and returns
342//!   `Unsupported` for non-tail slices.  `dealloc` reclaims the tail via
343//!   [`BStack::discard`]; non-tail deallocations are a no-op.  Every operation
344//!   maps to exactly one [`BStack`] call and is crash-safe by inheritance.
345//!   Implements [`BStackAllocator`] and [`BStackBulkAllocator`].
346//!
347//! * [`FirstFitBStackAllocator`] — Experimental: a persistent first-fit free-list allocator
348//!   that reuses freed regions to prevent unbounded file growth.  Requires both
349//!   `alloc` and `set` features.
350//!
351//! * [`GhostTreeBstackAllocator`] — A pure-AVL general-purpose allocator with
352//!   zero-overhead live allocations.  Free blocks store their AVL node inline,
353//!   and the tree is keyed on `(size, address)` for best-fit allocation.
354//!   Provides O(log n) allocation and deallocation with crash recovery through
355//!   tree rebalancing on mount.
356//!
357//! ## Lifetime model
358//!
359//! `BStackSlice<'a, A>` borrows the **allocator** for `'a`, not the
360//! [`BStack`] directly.  As a result the borrow checker statically prevents
361//! calling [`BStackAllocator::into_stack`] — which consumes the allocator by
362//! value — while any slice is still in scope.
363//!
364//! ## Quick example
365//!
366//! ```skip
367//! use bstack::{BStack, BStackAllocator, LinearBStackAllocator};
368//!
369//! # fn main() -> std::io::Result<()> {
370//! let alloc = LinearBStackAllocator::new(BStack::open("data.bstack")?);
371//!
372//! let slice = alloc.alloc(128)?;          // reserve 128 zero bytes
373//! let data  = slice.read()?;              // read them back
374//! alloc.dealloc(slice)?;                  // release (tail, so O(1))
375//!
376//! let stack = alloc.into_stack();         // reclaim the BStack
377//! # Ok(())
378//! # }
379//! ```
380//!
381//! # Examples
382//!
383//! ```no_run
384//! use bstack::BStack;
385//!
386//! # fn main() -> std::io::Result<()> {
387//! let stack = BStack::open("log.bin")?;
388//!
389//! // push returns the logical byte offset where the payload starts.
390//! let off0 = stack.push(b"hello")?;  // 0
391//! let off1 = stack.push(b"world")?;  // 5
392//!
393//! assert_eq!(stack.len()?, 10);
394//!
395//! // peek reads from a logical offset to the end without removing anything.
396//! assert_eq!(stack.peek(off1)?, b"world");
397//!
398//! // get reads an arbitrary half-open logical byte range.
399//! assert_eq!(stack.get(3, 8)?, b"lowor");
400//!
401//! // pop removes bytes from the tail and returns them.
402//! assert_eq!(stack.pop(5)?, b"world");
403//! assert_eq!(stack.len()?, 5);
404//! # Ok(())
405//! # }
406//! ```
407
408#[cfg(all(test, feature = "alloc", feature = "set"))]
409mod alloc_fuzz_tests;
410mod test;
411
412#[cfg(feature = "alloc")]
413mod alloc;
414#[cfg(feature = "alloc")]
415pub use alloc::{
416    BStackAllocator, BStackBulkAllocator, BStackSlice, BStackSliceAllocator, BStackSliceReader,
417    LinearBStackAllocator, ManualAllocator,
418};
419#[cfg(all(feature = "alloc", feature = "set"))]
420pub use alloc::{BStackSliceWriter, FirstFitBStackAllocator, GhostTreeBstackAllocator};
421
422#[cfg(all(feature = "guarded", feature = "atomic"))]
423pub use alloc::{BStackAtomicGuardedSlice, BStackAtomicGuardedSliceSubview};
424#[cfg(feature = "guarded")]
425pub use alloc::{BStackGuardedSlice, BStackGuardedSliceSubview};
426
427use std::fmt;
428use std::fs::{File, OpenOptions};
429use std::hash::{Hash, Hasher};
430use std::io::{self, Read, Seek, SeekFrom, Write};
431use std::path::Path;
432use std::sync::RwLock;
433use std::sync::atomic::{AtomicU64, Ordering};
434
435#[cfg(unix)]
436use std::os::unix::fs::FileExt;
437#[cfg(unix)]
438use std::os::unix::io::AsRawFd;
439#[cfg(unix)]
440use std::os::unix::io::RawFd;
441
442#[cfg(windows)]
443use std::os::windows::fs::FileExt as WindowsFileExt;
444#[cfg(windows)]
445use std::os::windows::io::AsRawHandle;
446#[cfg(windows)]
447use windows_sys::Win32::Foundation::HANDLE;
448#[cfg(windows)]
449use windows_sys::Win32::Storage::FileSystem::{
450    LOCKFILE_EXCLUSIVE_LOCK, LOCKFILE_FAIL_IMMEDIATELY, LockFileEx, ReadFile,
451};
452#[cfg(windows)]
453use windows_sys::Win32::System::IO::OVERLAPPED;
454
455/// Full magic for files written by this version (`BSTK` + major 0 + minor 1 + patch 10 + 0).
456const MAGIC: [u8; 8] = *b"BSTK\x00\x01\x0a\x00";
457
458/// Compatibility prefix checked on open: `BSTK` + major 0 + minor 1.
459/// Any file whose first 6 bytes match is considered a compatible 0.1.x file.
460const MAGIC_PREFIX: [u8; 6] = *b"BSTK\x00\x01";
461
462/// Bytes occupied by the file header (magic[8] + committed_len[8]).
463const HEADER_SIZE: u64 = 16;
464
465/// Flush all in-flight writes to stable storage.
466///
467/// On macOS this uses `F_FULLFSYNC` to flush the drive's hardware write cache,
468/// which `fdatasync` alone does not guarantee.  Falls back to `sync_data` if
469/// `F_FULLFSYNC` returns an error (e.g. the device doesn't support it).
470/// On all other platforms this delegates to `sync_data` (`fdatasync`).
471fn durable_sync(file: &File) -> io::Result<()> {
472    #[cfg(target_os = "macos")]
473    {
474        let ret = unsafe { libc::fcntl(file.as_raw_fd(), libc::F_FULLFSYNC) };
475        if ret != -1 {
476            return Ok(());
477        }
478        // Device does not support F_FULLFSYNC; fall back to fdatasync.
479    }
480    file.sync_data()
481}
482
483/// Acquire an exclusive, non-blocking advisory flock on `file`.
484///
485/// Returns `Err(WouldBlock)` if another process already holds the lock.
486#[cfg(unix)]
487fn flock_exclusive(file: &File) -> io::Result<()> {
488    let ret = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) };
489    if ret == 0 {
490        Ok(())
491    } else {
492        Err(io::Error::last_os_error())
493    }
494}
495
496/// Acquire an exclusive, non-blocking `LockFileEx` lock on `file`.
497///
498/// Locks the entire file range (offset 0, length `u64::MAX`).
499/// Returns `Err(WouldBlock)` if another process already holds the lock
500/// (`ERROR_LOCK_VIOLATION` maps to `io::ErrorKind::WouldBlock` in Rust).
501#[cfg(windows)]
502fn lock_file_exclusive(file: &File) -> io::Result<()> {
503    let handle = file.as_raw_handle() as windows_sys::Win32::Foundation::HANDLE;
504    // OVERLAPPED is required by LockFileEx even for synchronous handles.
505    // Offset fields (0, 0) anchor the lock at byte 0 of the file.
506    let mut overlapped: OVERLAPPED = unsafe { std::mem::zeroed() };
507    let ret = unsafe {
508        LockFileEx(
509            handle,
510            LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY,
511            0,        // reserved, must be zero
512            u32::MAX, // nNumberOfBytesToLockLow  ─┐ lock entire
513            u32::MAX, // nNumberOfBytesToLockHigh ─┘ file space
514            &mut overlapped,
515        )
516    };
517    if ret != 0 {
518        Ok(())
519    } else {
520        Err(io::Error::last_os_error())
521    }
522}
523
524/// Write the 16-byte header into a brand-new (empty) file.
525fn init_header(file: &mut File) -> io::Result<()> {
526    file.seek(SeekFrom::Start(0))?;
527    file.write_all(&MAGIC)?;
528    file.write_all(&0u64.to_le_bytes())
529}
530
531/// Overwrite the committed-length field at file offset 8.
532fn write_committed_len(file: &mut File, len: u64) -> io::Result<()> {
533    file.seek(SeekFrom::Start(8))?;
534    file.write_all(&len.to_le_bytes())
535}
536
537/// Read `len` bytes from absolute file position `offset` without modifying
538/// the file-position cursor, so the caller only needs a shared (read) lock.
539///
540/// On Unix this uses `pread(2)` via `read_exact_at`.
541/// On Windows this uses `ReadFile` with an `OVERLAPPED` offset (via
542/// `seek_read`), which is also cursor-safe on synchronous handles.
543#[cfg(unix)]
544fn pread_exact(file: &File, offset: u64, len: usize) -> io::Result<Vec<u8>> {
545    let mut buf = vec![0u8; len];
546    file.read_exact_at(&mut buf, offset)?;
547    Ok(buf)
548}
549
550/// Windows counterpart of `pread_exact` — see the shared doc comment above.
551#[cfg(windows)]
552fn pread_exact(file: &File, offset: u64, len: usize) -> io::Result<Vec<u8>> {
553    let mut buf = vec![0u8; len];
554    let mut filled = 0usize;
555    while filled < len {
556        let n = file.seek_read(&mut buf[filled..], offset + filled as u64)?;
557        if n == 0 {
558            return Err(io::Error::new(
559                io::ErrorKind::UnexpectedEof,
560                "pread_exact: unexpected EOF",
561            ));
562        }
563        filled += n;
564    }
565    Ok(buf)
566}
567
568/// Fill `buf` from absolute file position `offset` without modifying the
569/// file-position cursor.  Unix uses `pread(2)` via `read_exact_at`;
570/// Windows uses `ReadFile` with an `OVERLAPPED` offset via `seek_read`.
571#[cfg(unix)]
572fn pread_exact_into(file: &File, offset: u64, buf: &mut [u8]) -> io::Result<()> {
573    file.read_exact_at(buf, offset)
574}
575
576/// Windows counterpart of `pread_exact_into`.
577#[cfg(windows)]
578fn pread_exact_into(file: &File, offset: u64, buf: &mut [u8]) -> io::Result<()> {
579    let len = buf.len();
580    let mut filled = 0usize;
581    while filled < len {
582        let n = file.seek_read(&mut buf[filled..], offset + filled as u64)?;
583        if n == 0 {
584            return Err(io::Error::new(
585                io::ErrorKind::UnexpectedEof,
586                "pread_exact_into: unexpected EOF",
587            ));
588        }
589        filled += n;
590    }
591    Ok(())
592}
593
594/// Lock-free positional read using a raw file descriptor (Unix).
595///
596/// Calls `pread(2)` directly, bypassing the `RwLock<File>`.  Safe only when
597/// the target range is within the immutable locked region, ensuring no
598/// concurrent writer can touch those bytes.
599#[cfg(unix)]
600fn pread_exact_raw(fd: RawFd, offset: u64, buf: &mut [u8]) -> io::Result<()> {
601    let mut filled = 0usize;
602    while filled < buf.len() {
603        let n = unsafe {
604            libc::pread(
605                fd,
606                buf[filled..].as_mut_ptr() as *mut libc::c_void,
607                buf.len() - filled,
608                (offset + filled as u64) as libc::off_t,
609            )
610        };
611        if n < 0 {
612            return Err(io::Error::last_os_error());
613        }
614        if n == 0 {
615            return Err(io::Error::new(
616                io::ErrorKind::UnexpectedEof,
617                "locked pread: unexpected EOF",
618            ));
619        }
620        filled += n as usize;
621    }
622    Ok(())
623}
624
625/// Lock-free positional read using a raw Windows HANDLE.
626///
627/// Calls `ReadFile` with an `OVERLAPPED` offset, bypassing the `RwLock<File>`.
628/// Safe under the same invariant as `pread_exact_raw`.
629#[cfg(windows)]
630fn pread_exact_raw_handle(handle: isize, offset: u64, buf: &mut [u8]) -> io::Result<()> {
631    let handle = handle as HANDLE;
632    let mut filled = 0usize;
633    let len = buf.len();
634    while filled < len {
635        let current_offset = offset + filled as u64;
636        let mut overlapped: OVERLAPPED = unsafe { std::mem::zeroed() };
637        // SAFETY: the Anonymous/Anonymous path exists in the windows-sys OVERLAPPED layout.
638        overlapped.Anonymous.Anonymous.Offset = current_offset as u32;
639        overlapped.Anonymous.Anonymous.OffsetHigh = (current_offset >> 32) as u32;
640        let mut bytes_read: u32 = 0;
641        let ret = unsafe {
642            ReadFile(
643                handle,
644                buf[filled..].as_mut_ptr(),
645                (len - filled) as u32,
646                &mut bytes_read,
647                &mut overlapped,
648            )
649        };
650        if ret == 0 {
651            return Err(io::Error::last_os_error());
652        }
653        if bytes_read == 0 {
654            return Err(io::Error::new(
655                io::ErrorKind::UnexpectedEof,
656                "locked ReadFile: unexpected EOF",
657            ));
658        }
659        filled += bytes_read as usize;
660    }
661    Ok(())
662}
663
664/// Read and validate the header; return the committed payload length.
665fn read_header(file: &mut File) -> io::Result<u64> {
666    file.seek(SeekFrom::Start(0))?;
667    let mut hdr = [0u8; 16];
668    file.read_exact(&mut hdr)?;
669    if hdr[0..6] != MAGIC_PREFIX {
670        return Err(io::Error::new(
671            io::ErrorKind::InvalidData,
672            "bstack: bad magic number — not a bstack file or incompatible version",
673        ));
674    }
675    Ok(u64::from_le_bytes(hdr[8..16].try_into().unwrap()))
676}
677
678// ---------------------------------------------------------------------------
679
680/// A persistent, fsync-durable binary stack backed by a single file.
681///
682/// See the [crate-level documentation](crate) for the file format, durability
683/// guarantees, crash recovery, multi-process safety, and thread-safety model.
684pub struct BStack {
685    lock: RwLock<File>,
686    /// Monotonically growing partition boundary.  Bytes in `[0, locked)` are
687    /// immutable and can be read without the rwlock on supported platforms.
688    /// Not persisted — resets to 0 on every open.
689    locked: AtomicU64,
690    /// Copy of the raw file descriptor used for lock-free positional reads
691    /// on the locked region.  The `File` inside `lock` retains ownership and
692    /// will close the descriptor when `BStack` is dropped.
693    #[cfg(unix)]
694    fd: RawFd,
695    /// Copy of the Windows HANDLE stored as `isize` so the field is
696    /// `Send + Sync`.  Same lifetime guarantee as `fd` above.
697    #[cfg(windows)]
698    handle: isize,
699}
700
701// `BStack` is auto-`Send + Sync` on every platform: all fields
702// (`RwLock<File>`, `AtomicU64`, and the `RawFd` / `isize` handle) already
703// implement both traits.  The lock-free `pread` / `ReadFile`+`OVERLAPPED`
704// paths are cursor-independent and safe to call from any thread, and the raw
705// fd / handle remains valid for as long as `BStack` owns the `File`.
706
707impl BStack {
708    /// Open or create a stack file at `path`.
709    ///
710    /// On a **new** file the 16-byte header is written and durably synced
711    /// before returning.
712    ///
713    /// On an **existing** file the header is validated and, if a previous crash
714    /// left the file in an inconsistent state, the file is repaired and durably
715    /// synced before returning (see *Crash recovery* in the crate docs).
716    ///
717    /// On Unix an **exclusive advisory `flock`** is acquired; if another
718    /// process already holds the lock this function returns immediately with
719    /// [`io::ErrorKind::WouldBlock`].
720    ///
721    /// # Errors
722    ///
723    /// * [`io::ErrorKind::WouldBlock`] — another process holds the exclusive
724    ///   lock (Unix only).
725    /// * [`io::ErrorKind::InvalidData`] — the file exists but its header magic
726    ///   is wrong (not a bstack file, or created by an incompatible version),
727    ///   or the file is too short to contain a valid header.
728    /// * Any [`io::Error`] from [`OpenOptions::open`], `read`, `write`, or
729    ///   `durable_sync`.
730    pub fn open(path: impl AsRef<Path>) -> io::Result<Self> {
731        let mut file = OpenOptions::new()
732            .read(true)
733            .write(true)
734            .create(true)
735            .truncate(false)
736            .open(path)?;
737
738        #[cfg(unix)]
739        flock_exclusive(&file)?;
740
741        #[cfg(windows)]
742        lock_file_exclusive(&file)?;
743
744        let raw_size = file.metadata()?.len();
745
746        if raw_size == 0 {
747            init_header(&mut file)?;
748            durable_sync(&file)?;
749        } else if raw_size < HEADER_SIZE {
750            return Err(io::Error::new(
751                io::ErrorKind::InvalidData,
752                format!(
753                    "bstack: file is {raw_size} bytes — too small to contain the 16-byte header"
754                ),
755            ));
756        } else {
757            let committed_len = read_header(&mut file)?;
758            let actual_data_len = raw_size - HEADER_SIZE;
759            if actual_data_len != committed_len {
760                // Recover: use whichever length is smaller (the committed
761                // value is the last successfully synced boundary).
762                let correct_len = committed_len.min(actual_data_len);
763                file.set_len(HEADER_SIZE + correct_len)?;
764                write_committed_len(&mut file, correct_len)?;
765                durable_sync(&file)?;
766            }
767        }
768
769        #[cfg(unix)]
770        let fd = file.as_raw_fd();
771        #[cfg(windows)]
772        let handle = file.as_raw_handle() as isize;
773
774        Ok(BStack {
775            #[cfg(unix)]
776            fd,
777            #[cfg(windows)]
778            handle,
779            lock: RwLock::new(file),
780            locked: AtomicU64::new(0),
781        })
782    }
783
784    /// Append `data` to the end of the file.
785    ///
786    /// Returns the **logical** byte offset at which `data` begins — i.e. the
787    /// payload size immediately before the write.  An empty slice is valid; it
788    /// writes nothing and returns the current end offset.
789    ///
790    /// # Atomicity
791    ///
792    /// Either the full payload is written, the header committed-length is
793    /// updated, and the whole thing is durably synced, or the file is
794    /// left unchanged (best-effort rollback via `ftruncate` + header reset).
795    ///
796    /// # Errors
797    ///
798    /// Returns any [`io::Error`] from `write_all`, `durable_sync`, or the
799    /// fallback `set_len`.
800    pub fn push(&self, data: impl AsRef<[u8]>) -> io::Result<u64> {
801        let data = data.as_ref();
802        let mut file = self.lock.write().unwrap();
803        let file_end = file.seek(SeekFrom::End(0))?;
804        let logical_offset = file_end - HEADER_SIZE;
805
806        if data.is_empty() {
807            return Ok(logical_offset);
808        }
809
810        if let Err(e) = file.write_all(data) {
811            let _ = file.set_len(file_end);
812            return Err(e);
813        }
814
815        let new_len = logical_offset + data.len() as u64;
816        if let Err(e) = write_committed_len(&mut file, new_len).and_then(|_| durable_sync(&file)) {
817            // Roll back: truncate data and reset header.
818            let _ = file.set_len(file_end);
819            let _ = write_committed_len(&mut file, logical_offset);
820            return Err(e);
821        }
822
823        Ok(logical_offset)
824    }
825
826    /// Append `n` zero bytes to the end of the file.
827    ///
828    /// Returns the **logical** byte offset at which the zeros begin — i.e. the
829    /// payload size immediately before the write.  `n = 0` is valid; it writes
830    /// nothing and returns the current end offset.
831    ///
832    /// # Atomicity
833    ///
834    /// Either the file is extended, the header committed-length is updated,
835    /// and the whole thing is durably synced, or the file is left unchanged
836    /// (best-effort rollback via `ftruncate` + header reset).
837    ///
838    /// # Errors
839    ///
840    /// Returns any [`io::Error`] from `set_len`, `durable_sync`, or the
841    /// fallback `set_len`.
842    pub fn extend(&self, n: u64) -> io::Result<u64> {
843        let mut file = self.lock.write().unwrap();
844        let file_end = file.seek(SeekFrom::End(0))?;
845        let logical_offset = file_end - HEADER_SIZE;
846
847        if n == 0 {
848            return Ok(logical_offset);
849        }
850
851        let new_file_end = file_end + n;
852        file.set_len(new_file_end)?;
853
854        let new_len = logical_offset + n;
855        if let Err(e) = write_committed_len(&mut file, new_len).and_then(|_| durable_sync(&file)) {
856            // Roll back: truncate and reset header.
857            let _ = file.set_len(file_end);
858            let _ = write_committed_len(&mut file, logical_offset);
859            return Err(e);
860        }
861
862        Ok(logical_offset)
863    }
864
865    /// Remove and return the last `n` bytes of the file.
866    ///
867    /// `n = 0` is valid: no bytes are removed and an empty `Vec` is returned.
868    /// `n` may span across multiple previous [`push`](Self::push) boundaries.
869    ///
870    /// # Atomicity
871    ///
872    /// The bytes are read before the file is truncated.  The committed-length
873    /// in the header is updated and durably synced after the truncation.
874    ///
875    /// # Errors
876    ///
877    /// Returns [`io::ErrorKind::InvalidInput`] if `n` exceeds the current
878    /// payload size.  Also propagates any I/O error from `read_exact`,
879    /// `set_len`, `write_all`, or `durable_sync`.
880    pub fn pop(&self, n: u64) -> io::Result<Vec<u8>> {
881        let mut file = self.lock.write().unwrap();
882        let raw_size = file.seek(SeekFrom::End(0))?;
883        let data_size = raw_size - HEADER_SIZE;
884        if n > data_size {
885            return Err(io::Error::new(
886                io::ErrorKind::InvalidInput,
887                format!("pop({n}) exceeds payload size ({data_size})"),
888            ));
889        }
890        let new_data_len = data_size - n;
891        let locked = self.locked.load(Ordering::Acquire);
892        if new_data_len < locked {
893            return Err(io::Error::new(
894                io::ErrorKind::InvalidInput,
895                format!("pop({n}) would shrink payload below locked length ({locked})"),
896            ));
897        }
898        file.seek(SeekFrom::Start(HEADER_SIZE + new_data_len))?;
899        let mut buf = vec![0u8; n as usize];
900        file.read_exact(&mut buf)?;
901        file.set_len(HEADER_SIZE + new_data_len)?;
902        write_committed_len(&mut file, new_data_len)?;
903        durable_sync(&file)?;
904        Ok(buf)
905    }
906
907    /// Return a copy of every payload byte from `offset` to the end of the
908    /// file.
909    ///
910    /// `offset` is a **logical** offset (as returned by [`push`](Self::push)).
911    /// `offset == len()` is valid and returns an empty `Vec`.  The file is not
912    /// modified.
913    ///
914    /// # Concurrency
915    ///
916    /// On Unix and Windows this uses a cursor-safe positional read (`pread(2)`
917    /// on Unix; `ReadFile`+`OVERLAPPED` on Windows), so the method takes only
918    /// the **read lock**, allowing multiple concurrent `peek` and `get` calls
919    /// to run in parallel.
920    ///
921    /// On other platforms a seek is required; the method falls back to the
922    /// write lock and concurrent reads serialise.
923    ///
924    /// # Errors
925    ///
926    /// Returns [`io::ErrorKind::InvalidInput`] if `offset` exceeds the current
927    /// payload size.
928    pub fn peek(&self, offset: u64) -> io::Result<Vec<u8>> {
929        #[cfg(any(unix, windows))]
930        {
931            let file = self.lock.read().unwrap();
932            let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE);
933            if offset > data_size {
934                return Err(io::Error::new(
935                    io::ErrorKind::InvalidInput,
936                    format!("peek offset ({offset}) exceeds payload size ({data_size})"),
937                ));
938            }
939            pread_exact(&file, HEADER_SIZE + offset, (data_size - offset) as usize)
940        }
941        #[cfg(not(any(unix, windows)))]
942        {
943            let mut file = self.lock.write().unwrap();
944            let raw_size = file.seek(SeekFrom::End(0))?;
945            let data_size = raw_size.saturating_sub(HEADER_SIZE);
946            if offset > data_size {
947                return Err(io::Error::new(
948                    io::ErrorKind::InvalidInput,
949                    format!("peek offset ({offset}) exceeds payload size ({data_size})"),
950                ));
951            }
952            file.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
953            let mut buf = vec![0u8; (data_size - offset) as usize];
954            file.read_exact(&mut buf)?;
955            Ok(buf)
956        }
957    }
958
959    /// Return a copy of the bytes in the half-open logical range `[start, end)`.
960    ///
961    /// `start == end` is valid and returns an empty `Vec`.  The file is not
962    /// modified.
963    ///
964    /// # Concurrency
965    ///
966    /// Same as [`peek`](Self::peek): on Unix and Windows the read lock is
967    /// taken and concurrent `get`/`peek`/`len` calls may run in parallel.  On
968    /// other platforms the write lock is taken and reads serialise.
969    ///
970    /// # Errors
971    ///
972    /// Returns [`io::ErrorKind::InvalidInput`] if `end < start` or if `end`
973    /// exceeds the current payload size.
974    pub fn get(&self, start: u64, end: u64) -> io::Result<Vec<u8>> {
975        if end < start {
976            return Err(io::Error::new(
977                io::ErrorKind::InvalidInput,
978                format!("get: end ({end}) < start ({start})"),
979            ));
980        }
981        // Fast-path: if the range lies entirely within the locked region,
982        // skip the rwlock — locked bytes are immutable so no lock is needed.
983        #[cfg(unix)]
984        {
985            let locked = self.locked.load(Ordering::Acquire);
986            if end <= locked {
987                let mut buf = vec![0u8; (end - start) as usize];
988                pread_exact_raw(self.fd, HEADER_SIZE + start, &mut buf)?;
989                return Ok(buf);
990            }
991        }
992        #[cfg(windows)]
993        {
994            let locked = self.locked.load(Ordering::Acquire);
995            if end <= locked {
996                let mut buf = vec![0u8; (end - start) as usize];
997                pread_exact_raw_handle(self.handle, HEADER_SIZE + start, &mut buf)?;
998                return Ok(buf);
999            }
1000        }
1001        #[cfg(any(unix, windows))]
1002        {
1003            let file = self.lock.read().unwrap();
1004            let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE);
1005            if end > data_size {
1006                return Err(io::Error::new(
1007                    io::ErrorKind::InvalidInput,
1008                    format!("get: end ({end}) exceeds payload size ({data_size})"),
1009                ));
1010            }
1011            pread_exact(&file, HEADER_SIZE + start, (end - start) as usize)
1012        }
1013        #[cfg(not(any(unix, windows)))]
1014        {
1015            let mut file = self.lock.write().unwrap();
1016            let raw_size = file.seek(SeekFrom::End(0))?;
1017            let data_size = raw_size.saturating_sub(HEADER_SIZE);
1018            if end > data_size {
1019                return Err(io::Error::new(
1020                    io::ErrorKind::InvalidInput,
1021                    format!("get: end ({end}) exceeds payload size ({data_size})"),
1022                ));
1023            }
1024            file.seek(SeekFrom::Start(HEADER_SIZE + start))?;
1025            let mut buf = vec![0u8; (end - start) as usize];
1026            file.read_exact(&mut buf)?;
1027            Ok(buf)
1028        }
1029    }
1030
1031    /// Fill `buf` with bytes from logical `offset` to `offset + buf.len()`.
1032    ///
1033    /// Reads exactly `buf.len()` bytes from `offset` into the caller-supplied
1034    /// buffer.  An empty buffer is a valid no-op.  The file is not modified.
1035    ///
1036    /// Use this instead of [`peek`](Self::peek) when the destination buffer is
1037    /// already allocated and you want to avoid the extra heap allocation.
1038    ///
1039    /// # Concurrency
1040    ///
1041    /// Same as [`peek`](Self::peek): on Unix and Windows only the read lock is
1042    /// taken; on other platforms the write lock serialises all reads.
1043    ///
1044    /// # Errors
1045    ///
1046    /// Returns [`io::ErrorKind::InvalidInput`] if `offset + buf.len()` overflows
1047    /// `u64` or exceeds the current payload size.
1048    pub fn peek_into(&self, offset: u64, buf: &mut [u8]) -> io::Result<()> {
1049        if buf.is_empty() {
1050            return Ok(());
1051        }
1052        let len = buf.len() as u64;
1053        let end = offset.checked_add(len).ok_or_else(|| {
1054            io::Error::new(
1055                io::ErrorKind::InvalidInput,
1056                "peek_into: offset + len overflows u64",
1057            )
1058        })?;
1059        // Fast-path: locked region is immutable, skip the rwlock.
1060        #[cfg(unix)]
1061        {
1062            let locked = self.locked.load(Ordering::Acquire);
1063            if end <= locked {
1064                return pread_exact_raw(self.fd, HEADER_SIZE + offset, buf);
1065            }
1066        }
1067        #[cfg(windows)]
1068        {
1069            let locked = self.locked.load(Ordering::Acquire);
1070            if end <= locked {
1071                return pread_exact_raw_handle(self.handle, HEADER_SIZE + offset, buf);
1072            }
1073        }
1074        #[cfg(any(unix, windows))]
1075        {
1076            let file = self.lock.read().unwrap();
1077            let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE);
1078            if end > data_size {
1079                return Err(io::Error::new(
1080                    io::ErrorKind::InvalidInput,
1081                    format!(
1082                        "peek_into: range [{offset}, {end}) exceeds payload size ({data_size})"
1083                    ),
1084                ));
1085            }
1086            pread_exact_into(&file, HEADER_SIZE + offset, buf)
1087        }
1088        #[cfg(not(any(unix, windows)))]
1089        {
1090            let mut file = self.lock.write().unwrap();
1091            let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE);
1092            if end > data_size {
1093                return Err(io::Error::new(
1094                    io::ErrorKind::InvalidInput,
1095                    format!(
1096                        "peek_into: range [{offset}, {end}) exceeds payload size ({data_size})"
1097                    ),
1098                ));
1099            }
1100            file.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
1101            file.read_exact(buf)
1102        }
1103    }
1104
1105    /// Fill `buf` with bytes from the half-open logical range
1106    /// `[start, start + buf.len())`.
1107    ///
1108    /// An empty buffer is a valid no-op.  The file is not modified.
1109    ///
1110    /// Use this instead of [`get`](Self::get) when the destination buffer is
1111    /// already allocated and you want to avoid the extra heap allocation.
1112    ///
1113    /// # Concurrency
1114    ///
1115    /// Same as [`get`](Self::get): on Unix and Windows only the read lock is
1116    /// taken; on other platforms the write lock serialises all reads.
1117    ///
1118    /// # Errors
1119    ///
1120    /// Returns [`io::ErrorKind::InvalidInput`] if `start + buf.len()` overflows
1121    /// `u64` or exceeds the current payload size.
1122    pub fn get_into(&self, start: u64, buf: &mut [u8]) -> io::Result<()> {
1123        if buf.is_empty() {
1124            return Ok(());
1125        }
1126        let len = buf.len() as u64;
1127        let end = start.checked_add(len).ok_or_else(|| {
1128            io::Error::new(
1129                io::ErrorKind::InvalidInput,
1130                "get_into: start + len overflows u64",
1131            )
1132        })?;
1133        // Fast-path: locked region is immutable, skip the rwlock.
1134        #[cfg(unix)]
1135        {
1136            let locked = self.locked.load(Ordering::Acquire);
1137            if end <= locked {
1138                return pread_exact_raw(self.fd, HEADER_SIZE + start, buf);
1139            }
1140        }
1141        #[cfg(windows)]
1142        {
1143            let locked = self.locked.load(Ordering::Acquire);
1144            if end <= locked {
1145                return pread_exact_raw_handle(self.handle, HEADER_SIZE + start, buf);
1146            }
1147        }
1148        #[cfg(any(unix, windows))]
1149        {
1150            let file = self.lock.read().unwrap();
1151            let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE);
1152            if end > data_size {
1153                return Err(io::Error::new(
1154                    io::ErrorKind::InvalidInput,
1155                    format!("get_into: end ({end}) exceeds payload size ({data_size})"),
1156                ));
1157            }
1158            pread_exact_into(&file, HEADER_SIZE + start, buf)
1159        }
1160        #[cfg(not(any(unix, windows)))]
1161        {
1162            let mut file = self.lock.write().unwrap();
1163            let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE);
1164            if end > data_size {
1165                return Err(io::Error::new(
1166                    io::ErrorKind::InvalidInput,
1167                    format!("get_into: end ({end}) exceeds payload size ({data_size})"),
1168                ));
1169            }
1170            file.seek(SeekFrom::Start(HEADER_SIZE + start))?;
1171            file.read_exact(buf)
1172        }
1173    }
1174
1175    /// Remove the last `buf.len()` bytes from the file and write them into `buf`.
1176    ///
1177    /// An empty buffer is a valid no-op: no bytes are removed.
1178    ///
1179    /// Use this instead of [`pop`](Self::pop) when the destination buffer is
1180    /// already allocated and you want to avoid the extra heap allocation.
1181    ///
1182    /// # Atomicity
1183    ///
1184    /// Same guarantees as [`pop`](Self::pop).
1185    ///
1186    /// # Errors
1187    ///
1188    /// Returns [`io::ErrorKind::InvalidInput`] if `buf.len()` exceeds the
1189    /// current payload size.  Also propagates any I/O error from `read_exact`,
1190    /// `set_len`, `write_all`, or `durable_sync`.
1191    pub fn pop_into(&self, buf: &mut [u8]) -> io::Result<()> {
1192        if buf.is_empty() {
1193            return Ok(());
1194        }
1195        let n = buf.len() as u64;
1196        let mut file = self.lock.write().unwrap();
1197        let raw_size = file.seek(SeekFrom::End(0))?;
1198        let data_size = raw_size - HEADER_SIZE;
1199        if n > data_size {
1200            return Err(io::Error::new(
1201                io::ErrorKind::InvalidInput,
1202                format!("pop_into({n}) exceeds payload size ({data_size})"),
1203            ));
1204        }
1205        let new_data_len = data_size - n;
1206        let locked = self.locked.load(Ordering::Acquire);
1207        if new_data_len < locked {
1208            return Err(io::Error::new(
1209                io::ErrorKind::InvalidInput,
1210                format!("pop_into({n}) would shrink payload below locked length ({locked})"),
1211            ));
1212        }
1213        file.seek(SeekFrom::Start(HEADER_SIZE + new_data_len))?;
1214        file.read_exact(buf)?;
1215        file.set_len(HEADER_SIZE + new_data_len)?;
1216        write_committed_len(&mut file, new_data_len)?;
1217        durable_sync(&file)?;
1218        Ok(())
1219    }
1220
1221    /// Remove (discard) the last `n` bytes from the file without returning them.
1222    ///
1223    /// Equivalent to [`pop`](Self::pop) but avoids allocating a buffer for the
1224    /// removed bytes.  `n = 0` is valid and is a no-op.
1225    ///
1226    /// # Atomicity
1227    ///
1228    /// Same guarantees as [`pop`](Self::pop).
1229    ///
1230    /// # Errors
1231    ///
1232    /// Returns [`io::ErrorKind::InvalidInput`] if `n` exceeds the current
1233    /// payload size.  Also propagates any I/O error from `set_len`,
1234    /// `write_all`, or `durable_sync`.
1235    pub fn discard(&self, n: u64) -> io::Result<()> {
1236        if n == 0 {
1237            return Ok(());
1238        }
1239        let mut file = self.lock.write().unwrap();
1240        let raw_size = file.seek(SeekFrom::End(0))?;
1241        let data_size = raw_size - HEADER_SIZE;
1242        if n > data_size {
1243            return Err(io::Error::new(
1244                io::ErrorKind::InvalidInput,
1245                format!("discard({n}) exceeds payload size ({data_size})"),
1246            ));
1247        }
1248        let new_data_len = data_size - n;
1249        let locked = self.locked.load(Ordering::Acquire);
1250        if new_data_len < locked {
1251            return Err(io::Error::new(
1252                io::ErrorKind::InvalidInput,
1253                format!("discard({n}) would shrink payload below locked length ({locked})"),
1254            ));
1255        }
1256        file.set_len(HEADER_SIZE + new_data_len)?;
1257        write_committed_len(&mut file, new_data_len)?;
1258        durable_sync(&file)?;
1259        Ok(())
1260    }
1261
1262    /// Overwrite `data` bytes in place starting at logical `offset`.
1263    ///
1264    /// The file size is never changed: if `offset + data.len()` would exceed
1265    /// the current payload size the call is rejected.  An empty slice is a
1266    /// valid no-op.
1267    ///
1268    /// # Feature flag
1269    ///
1270    /// Only available when the `set` Cargo feature is enabled.
1271    ///
1272    /// # Durability
1273    ///
1274    /// Equivalent to `push`/`pop`: the overwritten bytes are durably synced
1275    /// before the call returns.
1276    ///
1277    /// # Errors
1278    ///
1279    /// Returns [`io::ErrorKind::InvalidInput`] if `offset + data.len()`
1280    /// exceeds the current payload size, or if the addition overflows `u64`.
1281    /// Propagates any I/O error from `write_all` or `durable_sync`.
1282    #[cfg(feature = "set")]
1283    pub fn set(&self, offset: u64, data: impl AsRef<[u8]>) -> io::Result<()> {
1284        let data = data.as_ref();
1285        if data.is_empty() {
1286            return Ok(());
1287        }
1288        let end = offset.checked_add(data.len() as u64).ok_or_else(|| {
1289            io::Error::new(
1290                io::ErrorKind::InvalidInput,
1291                "set: offset + len overflows u64",
1292            )
1293        })?;
1294        let mut file = self.lock.write().unwrap();
1295        // Load `locked` under the write lock — otherwise a concurrent
1296        // `lock_up_to` could extend the locked region between our check and
1297        // our write, letting us mutate a now-immutable byte.
1298        let locked = self.locked.load(Ordering::Acquire);
1299        if offset < locked {
1300            return Err(io::Error::new(
1301                io::ErrorKind::InvalidInput,
1302                format!("set: range [{offset}, {end}) overlaps locked region [0, {locked})"),
1303            ));
1304        }
1305        let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE);
1306        if end > data_size {
1307            return Err(io::Error::new(
1308                io::ErrorKind::InvalidInput,
1309                format!("set: write end ({end}) exceeds payload size ({data_size})"),
1310            ));
1311        }
1312        file.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
1313        file.write_all(data)?;
1314        durable_sync(&file)
1315    }
1316
1317    /// Overwrite `n` bytes with zeros in place starting at logical `offset`.
1318    ///
1319    /// The file size is never changed: if `offset + n` would exceed
1320    /// the current payload size the call is rejected.  `n = 0` is a
1321    /// valid no-op.
1322    ///
1323    /// # Feature flag
1324    ///
1325    /// Only available when the `set` Cargo feature is enabled.
1326    ///
1327    /// # Durability
1328    ///
1329    /// Equivalent to `push`/`pop`: the overwritten bytes are durably synced
1330    /// before the call returns.
1331    ///
1332    /// # Errors
1333    ///
1334    /// Returns [`io::ErrorKind::InvalidInput`] if `offset + n`
1335    /// exceeds the current payload size, or if the addition overflows `u64`.
1336    /// Propagates any I/O error from `write_all` or `durable_sync`.
1337    #[cfg(feature = "set")]
1338    pub fn zero(&self, offset: u64, n: u64) -> io::Result<()> {
1339        if n == 0 {
1340            return Ok(());
1341        }
1342        let end = offset.checked_add(n).ok_or_else(|| {
1343            io::Error::new(
1344                io::ErrorKind::InvalidInput,
1345                "zero: offset + n overflows u64",
1346            )
1347        })?;
1348        let mut file = self.lock.write().unwrap();
1349        // Load `locked` under the write lock (see `set` for rationale).
1350        let locked = self.locked.load(Ordering::Acquire);
1351        if offset < locked {
1352            return Err(io::Error::new(
1353                io::ErrorKind::InvalidInput,
1354                format!("zero: range [{offset}, {end}) overlaps locked region [0, {locked})"),
1355            ));
1356        }
1357        let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE);
1358        if end > data_size {
1359            return Err(io::Error::new(
1360                io::ErrorKind::InvalidInput,
1361                format!("zero: write end ({end}) exceeds payload size ({data_size})"),
1362            ));
1363        }
1364        file.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
1365        let zeros = vec![0u8; n as usize];
1366        file.write_all(&zeros)?;
1367        durable_sync(&file)
1368    }
1369}
1370
1371// ---------------------------------------------------------------------------
1372// Atomic compound operations
1373
1374#[cfg(feature = "atomic")]
1375impl BStack {
1376    /// Cut `n` bytes off the tail then append `buf` as a single atomic operation.
1377    ///
1378    /// The operation ordering is chosen based on the net size change to maximise
1379    /// crash-recovery safety (see *Durability* in the crate docs):
1380    ///
1381    /// * **Net extension** (`buf.len() > n`): the file is extended first, `buf`
1382    ///   is written into the freed tail region plus the new space, then a
1383    ///   `durable_sync` commits the data before the header committed-length is
1384    ///   updated.  On crash before the header update, recovery truncates back to
1385    ///   the original committed length — a clean rollback.
1386    ///
1387    /// * **Net truncation or same size** (`buf.len() ≤ n`): `buf` is written
1388    ///   into the tail first, then the file is truncated, then `durable_sync`
1389    ///   commits the result before the header is updated.  On crash after
1390    ///   truncation, recovery sets the committed length to the (smaller) file
1391    ///   size, committing the final state.
1392    ///
1393    /// `n = 0` with an empty `buf` is a valid no-op.
1394    ///
1395    /// # Feature flag
1396    ///
1397    /// Only available when the `atomic` Cargo feature is enabled.
1398    ///
1399    /// # Errors
1400    ///
1401    /// Returns [`io::ErrorKind::InvalidInput`] if `n` exceeds the current
1402    /// payload size.  Propagates any I/O error from `set_len`, `write_all`,
1403    /// or `durable_sync`.
1404    #[cfg(feature = "atomic")]
1405    pub fn atrunc(&self, n: u64, buf: impl AsRef<[u8]>) -> io::Result<()> {
1406        let buf = buf.as_ref();
1407        let buf_len = buf.len() as u64;
1408        if n == 0 && buf_len == 0 {
1409            return Ok(());
1410        }
1411        let mut file = self.lock.write().unwrap();
1412        let file_end = file.seek(SeekFrom::End(0))?;
1413        let data_size = file_end - HEADER_SIZE;
1414        if n > data_size {
1415            return Err(io::Error::new(
1416                io::ErrorKind::InvalidInput,
1417                format!("atrunc: n ({n}) exceeds payload size ({data_size})"),
1418            ));
1419        }
1420        let locked = self.locked.load(Ordering::Acquire);
1421        let new_tail_start = data_size - n;
1422        if new_tail_start < locked {
1423            return Err(io::Error::new(
1424                io::ErrorKind::InvalidInput,
1425                format!("atrunc: operation would modify locked region [0, {locked})"),
1426            ));
1427        }
1428        let tail_offset = HEADER_SIZE + new_tail_start;
1429        let final_data_len = new_tail_start + buf_len;
1430
1431        if buf_len > n {
1432            // Net extension: extend first so data is never lost, then write buf,
1433            // sync the data, then commit the new length.
1434            let new_file_end = HEADER_SIZE + final_data_len;
1435            file.set_len(new_file_end)?;
1436            file.seek(SeekFrom::Start(tail_offset))?;
1437            if let Err(e) = file.write_all(buf) {
1438                let _ = file.set_len(file_end);
1439                return Err(e);
1440            }
1441            if let Err(e) = durable_sync(&file) {
1442                let _ = file.set_len(file_end);
1443                return Err(e);
1444            }
1445            write_committed_len(&mut file, final_data_len)?;
1446        } else {
1447            // Net truncation or same size: write buf into the old tail first,
1448            // truncate, sync, then commit the new length.
1449            if !buf.is_empty() {
1450                file.seek(SeekFrom::Start(tail_offset))?;
1451                file.write_all(buf)?;
1452            }
1453            file.set_len(HEADER_SIZE + final_data_len)?;
1454            durable_sync(&file)?;
1455            write_committed_len(&mut file, final_data_len)?;
1456        }
1457        Ok(())
1458    }
1459
1460    /// Pop `n` bytes off the tail then append `buf`, returning the removed bytes.
1461    ///
1462    /// The bytes are read before any mutation, so they are always available in
1463    /// the returned `Vec` even if the subsequent write fails.  The same
1464    /// ordering strategy as [`atrunc`](Self::atrunc) is used.
1465    ///
1466    /// `n = 0` with an empty `buf` is a valid no-op and returns an empty `Vec`.
1467    ///
1468    /// # Feature flag
1469    ///
1470    /// Only available when the `atomic` Cargo feature is enabled.
1471    ///
1472    /// # Errors
1473    ///
1474    /// Returns [`io::ErrorKind::InvalidInput`] if `n` exceeds the current
1475    /// payload size.  Propagates any I/O error from `read_exact`, `set_len`,
1476    /// `write_all`, or `durable_sync`.
1477    #[cfg(feature = "atomic")]
1478    pub fn splice(&self, n: u64, buf: impl AsRef<[u8]>) -> io::Result<Vec<u8>> {
1479        let buf = buf.as_ref();
1480        let buf_len = buf.len() as u64;
1481        if n == 0 && buf_len == 0 {
1482            return Ok(Vec::new());
1483        }
1484        let mut file = self.lock.write().unwrap();
1485        let file_end = file.seek(SeekFrom::End(0))?;
1486        let data_size = file_end - HEADER_SIZE;
1487        if n > data_size {
1488            return Err(io::Error::new(
1489                io::ErrorKind::InvalidInput,
1490                format!("splice: n ({n}) exceeds payload size ({data_size})"),
1491            ));
1492        }
1493        let locked = self.locked.load(Ordering::Acquire);
1494        let new_tail_start = data_size - n;
1495        if new_tail_start < locked {
1496            return Err(io::Error::new(
1497                io::ErrorKind::InvalidInput,
1498                format!("splice: operation would modify locked region [0, {locked})"),
1499            ));
1500        }
1501        let tail_offset = HEADER_SIZE + new_tail_start;
1502        let final_data_len = new_tail_start + buf_len;
1503
1504        // Read the bytes to remove before any mutation.
1505        file.seek(SeekFrom::Start(tail_offset))?;
1506        let mut removed = vec![0u8; n as usize];
1507        file.read_exact(&mut removed)?;
1508
1509        if buf_len > n {
1510            // Net extension: extend first, write buf, sync, commit.
1511            let new_file_end = HEADER_SIZE + final_data_len;
1512            file.set_len(new_file_end)?;
1513            file.seek(SeekFrom::Start(tail_offset))?;
1514            if let Err(e) = file.write_all(buf) {
1515                let _ = file.set_len(file_end);
1516                return Err(e);
1517            }
1518            if let Err(e) = durable_sync(&file) {
1519                let _ = file.set_len(file_end);
1520                return Err(e);
1521            }
1522            write_committed_len(&mut file, final_data_len)?;
1523        } else {
1524            // Net truncation or same size: write buf, truncate, sync, commit.
1525            if !buf.is_empty() {
1526                file.seek(SeekFrom::Start(tail_offset))?;
1527                file.write_all(buf)?;
1528            }
1529            file.set_len(HEADER_SIZE + final_data_len)?;
1530            durable_sync(&file)?;
1531            write_committed_len(&mut file, final_data_len)?;
1532        }
1533
1534        Ok(removed)
1535    }
1536
1537    /// Pop `old.len()` bytes off the tail into `old`, then append `new`.
1538    ///
1539    /// Buffer-reuse counterpart of [`splice`](Self::splice): avoids allocating
1540    /// a `Vec` for the removed bytes by writing them into the caller-supplied
1541    /// `old` slice.  The same ordering strategy as [`atrunc`](Self::atrunc) is
1542    /// used for the write/truncation side.
1543    ///
1544    /// An empty `old` with an empty `new` is a valid no-op.
1545    ///
1546    /// # Feature flag
1547    ///
1548    /// Only available when the `atomic` Cargo feature is enabled.
1549    ///
1550    /// # Errors
1551    ///
1552    /// Returns [`io::ErrorKind::InvalidInput`] if `old.len()` exceeds the
1553    /// current payload size.  Propagates any I/O error from `read_exact`,
1554    /// `set_len`, `write_all`, or `durable_sync`.
1555    #[cfg(feature = "atomic")]
1556    pub fn splice_into(&self, old: &mut [u8], new: impl AsRef<[u8]>) -> io::Result<()> {
1557        let new = new.as_ref();
1558        let n = old.len() as u64;
1559        let new_len = new.len() as u64;
1560        if n == 0 && new_len == 0 {
1561            return Ok(());
1562        }
1563        let mut file = self.lock.write().unwrap();
1564        let file_end = file.seek(SeekFrom::End(0))?;
1565        let data_size = file_end - HEADER_SIZE;
1566        if n > data_size {
1567            return Err(io::Error::new(
1568                io::ErrorKind::InvalidInput,
1569                format!("splice_into: n ({n}) exceeds payload size ({data_size})"),
1570            ));
1571        }
1572        let locked = self.locked.load(Ordering::Acquire);
1573        let new_tail_start = data_size - n;
1574        if new_tail_start < locked {
1575            return Err(io::Error::new(
1576                io::ErrorKind::InvalidInput,
1577                format!("splice_into: operation would modify locked region [0, {locked})"),
1578            ));
1579        }
1580        let tail_offset = HEADER_SIZE + new_tail_start;
1581        let final_data_len = new_tail_start + new_len;
1582
1583        // Read the bytes to remove before any mutation.
1584        file.seek(SeekFrom::Start(tail_offset))?;
1585        file.read_exact(old)?;
1586
1587        if new_len > n {
1588            // Net extension: extend first, write new, sync, commit.
1589            let new_file_end = HEADER_SIZE + final_data_len;
1590            file.set_len(new_file_end)?;
1591            file.seek(SeekFrom::Start(tail_offset))?;
1592            if let Err(e) = file.write_all(new) {
1593                let _ = file.set_len(file_end);
1594                return Err(e);
1595            }
1596            if let Err(e) = durable_sync(&file) {
1597                let _ = file.set_len(file_end);
1598                return Err(e);
1599            }
1600            write_committed_len(&mut file, final_data_len)?;
1601        } else {
1602            // Net truncation or same size: write new, truncate, sync, commit.
1603            if !new.is_empty() {
1604                file.seek(SeekFrom::Start(tail_offset))?;
1605                file.write_all(new)?;
1606            }
1607            file.set_len(HEADER_SIZE + final_data_len)?;
1608            durable_sync(&file)?;
1609            write_committed_len(&mut file, final_data_len)?;
1610        }
1611        Ok(())
1612    }
1613
1614    /// Append `buf` only if the current logical payload size equals `s`.
1615    ///
1616    /// Returns `Ok(true)` if the size matched and `buf` was appended (or `buf`
1617    /// is empty and no I/O was needed).  Returns `Ok(false)` without modifying
1618    /// the file if the size does not match.
1619    ///
1620    /// # Feature flag
1621    ///
1622    /// Only available when the `atomic` Cargo feature is enabled.
1623    ///
1624    /// # Errors
1625    ///
1626    /// Propagates any I/O error from `write_all`, `write_committed_len`, or
1627    /// `durable_sync`.
1628    #[cfg(feature = "atomic")]
1629    pub fn try_extend(&self, s: u64, buf: impl AsRef<[u8]>) -> io::Result<bool> {
1630        let buf = buf.as_ref();
1631        let mut file = self.lock.write().unwrap();
1632        let file_end = file.seek(SeekFrom::End(0))?;
1633        let data_size = file_end - HEADER_SIZE;
1634        if data_size != s {
1635            return Ok(false);
1636        }
1637        if buf.is_empty() {
1638            return Ok(true);
1639        }
1640        if let Err(e) = file.write_all(buf) {
1641            let _ = file.set_len(file_end);
1642            return Err(e);
1643        }
1644        let new_len = data_size + buf.len() as u64;
1645        if let Err(e) = write_committed_len(&mut file, new_len).and_then(|_| durable_sync(&file)) {
1646            let _ = file.set_len(file_end);
1647            let _ = write_committed_len(&mut file, data_size);
1648            return Err(e);
1649        }
1650        Ok(true)
1651    }
1652
1653    /// Discard `n` bytes only if the current logical payload size equals `s`.
1654    ///
1655    /// Returns `Ok(true)` if the size matched and `n` bytes were removed (or
1656    /// `n = 0` and the size check passed without I/O).  Returns `Ok(false)`
1657    /// without modifying the file if the size does not match.
1658    ///
1659    /// When `n = 0` only the read lock is taken (no file mutation occurs).
1660    ///
1661    /// # Feature flag
1662    ///
1663    /// Only available when the `atomic` Cargo feature is enabled.
1664    ///
1665    /// # Errors
1666    ///
1667    /// Returns [`io::ErrorKind::InvalidInput`] if `n` exceeds the current
1668    /// payload size.  Propagates any I/O error from `set_len`,
1669    /// `write_committed_len`, or `durable_sync`.
1670    #[cfg(feature = "atomic")]
1671    pub fn try_discard(&self, s: u64, n: u64) -> io::Result<bool> {
1672        if n == 0 {
1673            let file = self.lock.read().unwrap();
1674            let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE);
1675            return Ok(data_size == s);
1676        }
1677        let mut file = self.lock.write().unwrap();
1678        let raw_size = file.seek(SeekFrom::End(0))?;
1679        let data_size = raw_size - HEADER_SIZE;
1680        if data_size != s {
1681            return Ok(false);
1682        }
1683        if n > data_size {
1684            return Err(io::Error::new(
1685                io::ErrorKind::InvalidInput,
1686                format!("try_discard: n ({n}) exceeds payload size ({data_size})"),
1687            ));
1688        }
1689        let new_data_len = data_size - n;
1690        let locked = self.locked.load(Ordering::Acquire);
1691        if new_data_len < locked {
1692            return Err(io::Error::new(
1693                io::ErrorKind::InvalidInput,
1694                format!("try_discard: would shrink payload below locked length ({locked})"),
1695            ));
1696        }
1697        file.set_len(HEADER_SIZE + new_data_len)?;
1698        write_committed_len(&mut file, new_data_len)?;
1699        durable_sync(&file)?;
1700        Ok(true)
1701    }
1702
1703    /// Pop `n` bytes off the tail, pass them read-only to a callback that
1704    /// returns the new tail bytes, then write the new tail.
1705    ///
1706    /// The read, callback invocation, and write all happen under the same write
1707    /// lock, so no other thread can observe the state between the pop and the
1708    /// push.  The callback may return a [`Vec<u8>`] of any length — the file
1709    /// will grow or shrink accordingly using the same crash-safe ordering
1710    /// strategy as [`atrunc`](Self::atrunc).
1711    ///
1712    /// `n = 0` is valid: the callback receives an empty slice and whatever it
1713    /// returns is appended.
1714    ///
1715    /// # Feature flag
1716    ///
1717    /// Only available when the `atomic` Cargo feature is enabled.
1718    ///
1719    /// # Errors
1720    ///
1721    /// Returns [`io::ErrorKind::InvalidInput`] if `n` exceeds the current
1722    /// payload size.  Propagates any I/O error from `read_exact`, `set_len`,
1723    /// `write_all`, or `durable_sync`.
1724    #[cfg(feature = "atomic")]
1725    pub fn replace<F>(&self, n: u64, f: F) -> io::Result<()>
1726    where
1727        F: FnOnce(&[u8]) -> Vec<u8>,
1728    {
1729        let mut file = self.lock.write().unwrap();
1730        let file_end = file.seek(SeekFrom::End(0))?;
1731        let data_size = file_end - HEADER_SIZE;
1732        if n > data_size {
1733            return Err(io::Error::new(
1734                io::ErrorKind::InvalidInput,
1735                format!("replace: n ({n}) exceeds payload size ({data_size})"),
1736            ));
1737        }
1738        let locked = self.locked.load(Ordering::Acquire);
1739        let new_tail_start = data_size - n;
1740        if new_tail_start < locked {
1741            return Err(io::Error::new(
1742                io::ErrorKind::InvalidInput,
1743                format!("replace: operation would modify locked region [0, {locked})"),
1744            ));
1745        }
1746        let tail_offset = HEADER_SIZE + new_tail_start;
1747        file.seek(SeekFrom::Start(tail_offset))?;
1748        let mut old_tail = vec![0u8; n as usize];
1749        file.read_exact(&mut old_tail)?;
1750        let new_tail = f(&old_tail);
1751        let new_tail_len = new_tail.len() as u64;
1752        let final_data_len = new_tail_start + new_tail_len;
1753
1754        if new_tail_len > n {
1755            // Net extension: extend first, write new tail, sync, commit.
1756            let new_file_end = HEADER_SIZE + final_data_len;
1757            file.set_len(new_file_end)?;
1758            file.seek(SeekFrom::Start(tail_offset))?;
1759            if let Err(e) = file.write_all(&new_tail) {
1760                let _ = file.set_len(file_end);
1761                return Err(e);
1762            }
1763            if let Err(e) = durable_sync(&file) {
1764                let _ = file.set_len(file_end);
1765                return Err(e);
1766            }
1767            write_committed_len(&mut file, final_data_len)?;
1768        } else {
1769            // Net truncation or same size: write new tail, truncate, sync, commit.
1770            if !new_tail.is_empty() {
1771                file.seek(SeekFrom::Start(tail_offset))?;
1772                file.write_all(&new_tail)?;
1773            }
1774            file.set_len(HEADER_SIZE + final_data_len)?;
1775            durable_sync(&file)?;
1776            write_committed_len(&mut file, final_data_len)?;
1777        }
1778        Ok(())
1779    }
1780}
1781
1782#[cfg(all(feature = "set", feature = "atomic"))]
1783impl BStack {
1784    /// Atomically read `buf.len()` bytes at `offset` and overwrite them with
1785    /// `buf`, returning the old contents.
1786    ///
1787    /// Both the read and the write happen under the same write lock, so no
1788    /// other thread can observe either the pre-swap or mid-swap state.  The
1789    /// file size is never changed.
1790    ///
1791    /// An empty `buf` is a valid no-op and returns an empty `Vec`.
1792    ///
1793    /// # Feature flags
1794    ///
1795    /// Only available when both the `set` and `atomic` Cargo features are
1796    /// enabled.
1797    ///
1798    /// # Errors
1799    ///
1800    /// Returns [`io::ErrorKind::InvalidInput`] if `offset + buf.len()`
1801    /// overflows `u64` or exceeds the current payload size.  Propagates any
1802    /// I/O error from `read_exact`, `write_all`, or `durable_sync`.
1803    #[cfg(all(feature = "set", feature = "atomic"))]
1804    pub fn swap(&self, offset: u64, buf: impl AsRef<[u8]>) -> io::Result<Vec<u8>> {
1805        let buf = buf.as_ref();
1806        if buf.is_empty() {
1807            return Ok(Vec::new());
1808        }
1809        let end = offset.checked_add(buf.len() as u64).ok_or_else(|| {
1810            io::Error::new(
1811                io::ErrorKind::InvalidInput,
1812                "swap: offset + len overflows u64",
1813            )
1814        })?;
1815        let mut file = self.lock.write().unwrap();
1816        // Load `locked` under the write lock (see `set` for rationale).
1817        let locked = self.locked.load(Ordering::Acquire);
1818        if offset < locked {
1819            return Err(io::Error::new(
1820                io::ErrorKind::InvalidInput,
1821                format!("swap: range [{offset}, {end}) overlaps locked region [0, {locked})"),
1822            ));
1823        }
1824        let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE);
1825        if end > data_size {
1826            return Err(io::Error::new(
1827                io::ErrorKind::InvalidInput,
1828                format!("swap: range [{offset}, {end}) exceeds payload size ({data_size})"),
1829            ));
1830        }
1831        file.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
1832        let mut old = vec![0u8; buf.len()];
1833        file.read_exact(&mut old)?;
1834        file.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
1835        file.write_all(buf)?;
1836        durable_sync(&file)?;
1837        Ok(old)
1838    }
1839
1840    /// Atomically read `buf.len()` bytes at `offset` into `buf` while writing
1841    /// the original contents of `buf` into that position.
1842    ///
1843    /// On return, `buf` contains the bytes that were previously at `offset`,
1844    /// and the file contains what `buf` held on entry.  Buffer-reuse
1845    /// counterpart of [`swap`](Self::swap).
1846    ///
1847    /// An empty `buf` is a valid no-op.
1848    ///
1849    /// # Feature flags
1850    ///
1851    /// Only available when both the `set` and `atomic` Cargo features are
1852    /// enabled.
1853    ///
1854    /// # Errors
1855    ///
1856    /// Returns [`io::ErrorKind::InvalidInput`] if `offset + buf.len()`
1857    /// overflows `u64` or exceeds the current payload size.  Propagates any
1858    /// I/O error from `read_exact`, `write_all`, or `durable_sync`.
1859    #[cfg(all(feature = "set", feature = "atomic"))]
1860    pub fn swap_into(&self, offset: u64, buf: &mut [u8]) -> io::Result<()> {
1861        if buf.is_empty() {
1862            return Ok(());
1863        }
1864        let end = offset.checked_add(buf.len() as u64).ok_or_else(|| {
1865            io::Error::new(
1866                io::ErrorKind::InvalidInput,
1867                "swap_into: offset + len overflows u64",
1868            )
1869        })?;
1870        let mut file = self.lock.write().unwrap();
1871        // Load `locked` under the write lock (see `set` for rationale).
1872        let locked = self.locked.load(Ordering::Acquire);
1873        if offset < locked {
1874            return Err(io::Error::new(
1875                io::ErrorKind::InvalidInput,
1876                format!("swap_into: range [{offset}, {end}) overlaps locked region [0, {locked})"),
1877            ));
1878        }
1879        let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE);
1880        if end > data_size {
1881            return Err(io::Error::new(
1882                io::ErrorKind::InvalidInput,
1883                format!("swap_into: range [{offset}, {end}) exceeds payload size ({data_size})"),
1884            ));
1885        }
1886        file.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
1887        let mut tmp = vec![0u8; buf.len()];
1888        file.read_exact(&mut tmp)?;
1889        file.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
1890        file.write_all(buf)?;
1891        durable_sync(&file)?;
1892        buf.copy_from_slice(&tmp);
1893        Ok(())
1894    }
1895
1896    /// Compare-and-exchange: read `old.len()` bytes at `offset` and, if they
1897    /// equal `old`, overwrite them with `new`.
1898    ///
1899    /// Returns `Ok(true)` if the comparison succeeded and the exchange was
1900    /// performed.  Returns `Ok(false)` without modifying the file if
1901    /// `old.len() != new.len()` or if the current bytes do not match `old`.
1902    ///
1903    /// Both the compare and the exchange happen under the same write lock.
1904    ///
1905    /// # Feature flags
1906    ///
1907    /// Only available when both the `set` and `atomic` Cargo features are
1908    /// enabled.
1909    ///
1910    /// # Errors
1911    ///
1912    /// Returns [`io::ErrorKind::InvalidInput`] if `offset + old.len()`
1913    /// overflows `u64` or exceeds the current payload size.  Propagates any
1914    /// I/O error from `read_exact`, `write_all`, or `durable_sync`.
1915    #[cfg(all(feature = "set", feature = "atomic"))]
1916    pub fn cas(
1917        &self,
1918        offset: u64,
1919        old: impl AsRef<[u8]>,
1920        new: impl AsRef<[u8]>,
1921    ) -> io::Result<bool> {
1922        let old = old.as_ref();
1923        let new = new.as_ref();
1924        if old.len() != new.len() {
1925            return Ok(false);
1926        }
1927        if old.is_empty() {
1928            return Ok(true);
1929        }
1930        let end = offset.checked_add(old.len() as u64).ok_or_else(|| {
1931            io::Error::new(
1932                io::ErrorKind::InvalidInput,
1933                "cas: offset + len overflows u64",
1934            )
1935        })?;
1936        let mut file = self.lock.write().unwrap();
1937        // Load `locked` under the write lock (see `set` for rationale).
1938        let locked = self.locked.load(Ordering::Acquire);
1939        if offset < locked {
1940            return Err(io::Error::new(
1941                io::ErrorKind::InvalidInput,
1942                format!("cas: range [{offset}, {end}) overlaps locked region [0, {locked})"),
1943            ));
1944        }
1945        let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE);
1946        if end > data_size {
1947            return Err(io::Error::new(
1948                io::ErrorKind::InvalidInput,
1949                format!("cas: range [{offset}, {end}) exceeds payload size ({data_size})"),
1950            ));
1951        }
1952        file.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
1953        let mut current = vec![0u8; old.len()];
1954        file.read_exact(&mut current)?;
1955        if current != old {
1956            return Ok(false);
1957        }
1958        file.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
1959        file.write_all(new)?;
1960        durable_sync(&file)?;
1961        Ok(true)
1962    }
1963
1964    /// Read bytes in the half-open logical range `[start, end)`, pass them to
1965    /// a callback that may mutate them in place, then write the modified bytes
1966    /// back.
1967    ///
1968    /// The read, callback invocation, and write all happen under the same write
1969    /// lock, so no other thread can observe an intermediate state.  The file
1970    /// size is never changed.
1971    ///
1972    /// `start == end` is a valid no-op: `f` is called with an empty slice and
1973    /// no I/O is performed beyond the initial size check.
1974    ///
1975    /// # Feature flags
1976    ///
1977    /// Only available when both the `set` and `atomic` Cargo features are
1978    /// enabled.
1979    ///
1980    /// # Errors
1981    ///
1982    /// Returns [`io::ErrorKind::InvalidInput`] if `end < start` or if `end`
1983    /// exceeds the current payload size.  Propagates any I/O error from
1984    /// `read_exact`, `write_all`, or `durable_sync`.
1985    #[cfg(all(feature = "set", feature = "atomic"))]
1986    pub fn process<F>(&self, start: u64, end: u64, f: F) -> io::Result<()>
1987    where
1988        F: FnOnce(&mut [u8]),
1989    {
1990        if end < start {
1991            return Err(io::Error::new(
1992                io::ErrorKind::InvalidInput,
1993                format!("process: end ({end}) < start ({start})"),
1994            ));
1995        }
1996        let n = end - start;
1997        let mut file = self.lock.write().unwrap();
1998        let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE);
1999        if end > data_size {
2000            return Err(io::Error::new(
2001                io::ErrorKind::InvalidInput,
2002                format!("process: end ({end}) exceeds payload size ({data_size})"),
2003            ));
2004        }
2005        let locked = self.locked.load(Ordering::Acquire);
2006        if start < locked {
2007            return Err(io::Error::new(
2008                io::ErrorKind::InvalidInput,
2009                format!("process: range [{start}, {end}) overlaps locked region [0, {locked})"),
2010            ));
2011        }
2012        let mut buf = vec![0u8; n as usize];
2013        if n > 0 {
2014            file.seek(SeekFrom::Start(HEADER_SIZE + start))?;
2015            file.read_exact(&mut buf)?;
2016        }
2017        f(&mut buf);
2018        if n > 0 {
2019            file.seek(SeekFrom::Start(HEADER_SIZE + start))?;
2020            file.write_all(&buf)?;
2021            durable_sync(&file)?;
2022        }
2023        Ok(())
2024    }
2025}
2026
2027// ---------------------------------------------------------------------------
2028
2029impl BStack {
2030    /// Return the current **logical** payload size in bytes (excludes the
2031    /// 16-byte header).
2032    ///
2033    /// Takes the read lock, so it can run concurrently with other `len` calls
2034    /// but blocks while any write-lock operation is in progress.  The returned
2035    /// value always reflects a clean operation boundary.
2036    ///
2037    /// # Errors
2038    ///
2039    /// Propagates any [`io::Error`] from [`File::metadata`].
2040    pub fn len(&self) -> io::Result<u64> {
2041        let file = self.lock.read().unwrap();
2042        Ok(file.metadata()?.len().saturating_sub(HEADER_SIZE))
2043    }
2044
2045    /// Return `true` if the stack contains no payload bytes.
2046    ///
2047    /// # Errors
2048    ///
2049    /// Propagates any [`io::Error`] from [`File::metadata`].
2050    pub fn is_empty(&self) -> io::Result<bool> {
2051        Ok(self.len()? == 0)
2052    }
2053
2054    /// Returns the current locked length.  `0` means no bytes are locked.
2055    ///
2056    /// The locked region is `[0, locked_len())`.  All bytes within this range
2057    /// are permanently immutable: writes and shrink operations that would
2058    /// touch them return [`io::ErrorKind::InvalidInput`], and reads to ranges
2059    /// entirely within it skip the rwlock on Unix and Windows.
2060    pub fn locked_len(&self) -> u64 {
2061        self.locked.load(Ordering::Acquire)
2062    }
2063
2064    /// Extend the locked region to cover `[0, n)`.
2065    ///
2066    /// `n` must be ≥ the current locked length and ≤ the current payload
2067    /// length.  After this call, reads to `[0, n)` are lock-free on Unix and
2068    /// Windows, and all write and shrink operations that would touch `[0, n)`
2069    /// return [`io::ErrorKind::InvalidInput`].
2070    ///
2071    /// Acquires the exclusive write lock to ensure all in-flight writes to
2072    /// `[0, n)` have completed before the region is declared immutable.
2073    ///
2074    /// # Errors
2075    ///
2076    /// Returns [`io::ErrorKind::InvalidInput`] if `n` is less than the current
2077    /// locked length (partition can only grow) or if `n` exceeds the current
2078    /// payload length.
2079    pub fn lock_up_to(&self, n: u64) -> io::Result<()> {
2080        // Acquire the write lock to serialise against any in-flight writers.
2081        let file = self.lock.write().unwrap();
2082        let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE);
2083        let current_locked = self.locked.load(Ordering::Relaxed);
2084        if n < current_locked {
2085            return Err(io::Error::new(
2086                io::ErrorKind::InvalidInput,
2087                format!(
2088                    "lock_up_to: n ({n}) is less than the current locked length ({current_locked})"
2089                ),
2090            ));
2091        }
2092        if n > data_size {
2093            return Err(io::Error::new(
2094                io::ErrorKind::InvalidInput,
2095                format!("lock_up_to: n ({n}) exceeds payload size ({data_size})"),
2096            ));
2097        }
2098        // Release store: all writes completed under the write lock above are
2099        // visible to any thread that subsequently loads `locked` with Acquire.
2100        self.locked.store(n, Ordering::Release);
2101        drop(file);
2102        Ok(())
2103    }
2104
2105    /// Open a `BStack` and immediately lock the first `n` bytes.
2106    ///
2107    /// Equivalent to [`BStack::open`] followed by [`lock_up_to`](Self::lock_up_to),
2108    /// but expressed as a single call for the common pattern where the locked
2109    /// region is known ahead of time (e.g. a fixed-size metadata block whose
2110    /// size is a compile-time or configuration constant).
2111    ///
2112    /// # Errors
2113    ///
2114    /// Propagates all errors from [`open`](Self::open).  Returns
2115    /// [`io::ErrorKind::InvalidInput`] if `n` exceeds the payload length of
2116    /// the opened file.
2117    pub fn open_locked_up_to(path: impl AsRef<Path>, n: u64) -> io::Result<Self> {
2118        let stack = Self::open(path)?;
2119        stack.lock_up_to(n)?;
2120        Ok(stack)
2121    }
2122}
2123
2124// ---------------------------------------------------------------------------
2125// io::Write
2126
2127/// Appends bytes to the stack.
2128///
2129/// Each call to [`write`](io::Write::write) is equivalent to [`push`](BStack::push):
2130/// all bytes are written atomically and durably synced before returning.
2131/// Calling `write_all` or chaining multiple `write` calls therefore issues
2132/// one `durable_sync` per call — callers that need to batch many small writes
2133/// without per-write syncs should accumulate data and call `push` directly.
2134///
2135/// [`flush`](io::Write::flush) is a no-op because every `write` is already
2136/// durable.
2137impl io::Write for BStack {
2138    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
2139        self.push(buf)?;
2140        Ok(buf.len())
2141    }
2142
2143    fn flush(&mut self) -> io::Result<()> {
2144        Ok(())
2145    }
2146}
2147
2148/// Shared-reference counterpart of `impl Write for BStack`.
2149///
2150/// Because [`push`](BStack::push) takes `&self` (interior mutability via
2151/// `RwLock`), the `Write` implementation is also available on `&BStack`,
2152/// mirroring the standard library's `impl Write for &File`.
2153impl io::Write for &BStack {
2154    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
2155        self.push(buf)?;
2156        Ok(buf.len())
2157    }
2158
2159    fn flush(&mut self) -> io::Result<()> {
2160        Ok(())
2161    }
2162}
2163
2164impl fmt::Debug for BStack {
2165    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2166        f.debug_struct("BStack")
2167            .field(
2168                "version",
2169                &format!("{}.{}.{}", MAGIC[4], MAGIC[5], MAGIC[6]),
2170            )
2171            .field("len", &self.len().ok())
2172            .finish_non_exhaustive()
2173    }
2174}
2175
2176impl Eq for BStack {}
2177
2178/// Two `BStack` instances are equal iff they are the **same instance** in memory.
2179///
2180/// Because [`BStack::open`] acquires an exclusive advisory lock, no two
2181/// `BStack` values within one process can refer to the same file at the same
2182/// time.  Pointer identity is therefore the only meaningful equality: a stack
2183/// is equal to itself and to nothing else.
2184impl PartialEq for BStack {
2185    fn eq(&self, other: &Self) -> bool {
2186        std::ptr::eq(self, other)
2187    }
2188}
2189
2190/// Hashes the instance address, consistent with the pointer-identity [`PartialEq`].
2191impl Hash for BStack {
2192    fn hash<H: Hasher>(&self, state: &mut H) {
2193        (self as *const BStack).hash(state);
2194    }
2195}
2196
2197/// A cursor-based reader over a [`BStack`] payload.
2198///
2199/// `BStackReader` implements [`io::Read`] and [`io::Seek`], allowing the
2200/// stack's payload to be consumed through any interface that expects a
2201/// readable, seekable byte stream.
2202///
2203/// # Construction
2204///
2205/// ```no_run
2206/// use bstack::BStack;
2207///
2208/// # fn main() -> std::io::Result<()> {
2209/// let stack = BStack::open("log.bin")?;
2210/// stack.push(b"hello world")?;
2211///
2212/// // Start reading from the beginning.
2213/// let mut reader = stack.reader();
2214///
2215/// // Or start from an arbitrary offset.
2216/// let mut mid = stack.reader_at(6);
2217/// # Ok(())
2218/// # }
2219/// ```
2220///
2221/// # Concurrency
2222///
2223/// `BStackReader` borrows the stack immutably, so multiple readers can coexist
2224/// and run concurrently with each other and with [`peek`](BStack::peek) /
2225/// [`get`](BStack::get) calls.  Concurrent [`push`](BStack::push) or
2226/// [`pop`](BStack::pop) operations are not blocked by an active reader, but
2227/// reading interleaved with writes may observe different snapshots of the
2228/// payload across calls — callers are responsible for synchronisation when
2229/// that matters.
2230pub struct BStackReader<'a> {
2231    stack: &'a BStack,
2232    offset: u64,
2233}
2234
2235impl BStack {
2236    /// Create a [`BStackReader`] positioned at the start of the payload.
2237    pub fn reader(&self) -> BStackReader<'_> {
2238        BStackReader {
2239            stack: self,
2240            offset: 0,
2241        }
2242    }
2243
2244    /// Create a [`BStackReader`] positioned at `offset` bytes into the payload.
2245    ///
2246    /// Seeking past the current end is allowed; [`read`](io::Read::read) will
2247    /// return `Ok(0)` until new data is pushed past that point.
2248    pub fn reader_at(&self, offset: u64) -> BStackReader<'_> {
2249        BStackReader {
2250            stack: self,
2251            offset,
2252        }
2253    }
2254}
2255
2256impl<'a> BStackReader<'a> {
2257    /// Return the current logical read offset within the payload.
2258    pub fn position(&self) -> u64 {
2259        self.offset
2260    }
2261}
2262
2263impl<'a> From<&'a BStack> for BStackReader<'a> {
2264    fn from(stack: &'a BStack) -> Self {
2265        stack.reader()
2266    }
2267}
2268
2269impl<'a> From<BStackReader<'a>> for &'a BStack {
2270    fn from(val: BStackReader<'a>) -> Self {
2271        val.stack
2272    }
2273}
2274
2275/// Two readers are equal when they point to the **same `BStack` instance**
2276/// (pointer identity) and share the same cursor `offset`.
2277impl<'a> PartialEq for BStackReader<'a> {
2278    fn eq(&self, other: &Self) -> bool {
2279        self.stack == other.stack && self.offset == other.offset
2280    }
2281}
2282
2283impl<'a> Eq for BStackReader<'a> {}
2284
2285/// Hashes `(BStack pointer, offset)`, consistent with [`PartialEq`].
2286impl<'a> Hash for BStackReader<'a> {
2287    fn hash<H: Hasher>(&self, state: &mut H) {
2288        self.stack.hash(state);
2289        self.offset.hash(state);
2290    }
2291}
2292
2293impl<'a> PartialOrd for BStackReader<'a> {
2294    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
2295        Some(self.cmp(other))
2296    }
2297}
2298
2299/// Ordered by `BStack` instance address, then by cursor `offset`.
2300///
2301/// The address component groups all readers over the same stack together,
2302/// and within that group the natural read order (smaller offset first) applies.
2303/// This ordering is consistent with the pointer-identity [`PartialEq`].
2304impl<'a> Ord for BStackReader<'a> {
2305    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
2306        let self_ptr = self.stack as *const BStack as usize;
2307        let other_ptr = other.stack as *const BStack as usize;
2308        self_ptr
2309            .cmp(&other_ptr)
2310            .then(self.offset.cmp(&other.offset))
2311    }
2312}
2313
2314impl<'a> io::Read for BStackReader<'a> {
2315    /// Read bytes from the current position into `buf`.
2316    ///
2317    /// Returns the number of bytes read, which may be less than `buf.len()` if
2318    /// the end of the payload is reached.  Returns `Ok(0)` at EOF.
2319    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
2320        if buf.is_empty() {
2321            return Ok(0);
2322        }
2323        let data_size = self.stack.len()?;
2324        if self.offset >= data_size {
2325            return Ok(0);
2326        }
2327        let available = (data_size - self.offset) as usize;
2328        let n = buf.len().min(available);
2329        self.stack.get_into(self.offset, &mut buf[..n])?;
2330        self.offset += n as u64;
2331        Ok(n)
2332    }
2333}
2334
2335impl<'a> io::Seek for BStackReader<'a> {
2336    /// Move the read cursor.
2337    ///
2338    /// [`SeekFrom::Start`] and [`SeekFrom::Current`] with a non-negative delta
2339    /// may advance the cursor past the current end of the payload; subsequent
2340    /// [`read`](io::Read::read) calls will return `Ok(0)` until the payload
2341    /// grows past that point.  Seeking before the start of the payload returns
2342    /// [`io::ErrorKind::InvalidInput`].
2343    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
2344        let data_size = self.stack.len()? as i128;
2345        let new_offset = match pos {
2346            SeekFrom::Start(n) => n as i128,
2347            SeekFrom::End(n) => data_size + n as i128,
2348            SeekFrom::Current(n) => self.offset as i128 + n as i128,
2349        };
2350        if new_offset < 0 {
2351            return Err(io::Error::new(
2352                io::ErrorKind::InvalidInput,
2353                "seek before beginning of payload",
2354            ));
2355        }
2356        self.offset = new_offset as u64;
2357        Ok(self.offset)
2358    }
2359}