bstack/lib.rs
1//! A persistent, fsync-durable binary stack backed by a single file.
2//!
3//! # Overview
4//!
5//! [`BStack`] treats a file as a flat byte buffer that grows and shrinks from
6//! the tail. Every mutating operation — [`push`](BStack::push),
7//! [`extend`](BStack::extend), [`pop`](BStack::pop), [`discard`](BStack::discard), (with the `set`
8//! feature) [`set`](BStack::set) and [`zero`](BStack::zero), (with the `atomic` feature)
9//! [`replace`](BStack::replace), and (with both `set` and `atomic`)
10//! [`process`](BStack::process) — calls a *durable sync* before returning,
11//! so the data survives a process crash or an unclean system shutdown.
12//! Read-only operations — [`peek`](BStack::peek),
13//! [`peek_into`](BStack::peek_into), [`get`](BStack::get), and
14//! [`get_into`](BStack::get_into) — never modify the file and on Unix and
15//! Windows can run concurrently with each other.
16//! [`pop_into`](BStack::pop_into) is the buffer-passing counterpart of `pop`,
17//! carrying the same durability and atomicity guarantees.
18//! [`discard`](BStack::discard) is like `pop` but discards the removed bytes
19//! without reading or returning them, avoiding any allocation or copy.
20//!
21//! The crate depends on **`libc`** (Unix) and **`windows-sys`** (Windows) for
22//! platform-specific syscalls, and uses **no `unsafe` code beyond the required
23//! FFI calls**.
24//!
25//! # File format
26//!
27//! Every file begins with a fixed 16-byte header:
28//!
29//! ```text
//! ┌────────────────────────┬──────────────┬──────────────┐
//! │     header (16 B)      │  payload 0   │  payload 1   │ ...
//! │ magic[8] | clen[8 LE]  │              │              │
//! └────────────────────────┴──────────────┴──────────────┘
//! ^                        ^              ^              ^
//! file offset 0            offset 16      16+n0          EOF
36//! ```
37//!
38//! * **`magic`** — 8 bytes: `BSTK` + major(1 B) + minor(1 B) + patch(1 B) + reserved(1 B).
39//! This version writes `BSTK\x00\x01\x0a\x00` (0.1.10). [`open`](BStack::open)
40//! accepts any file whose first 6 bytes match `BSTK\x00\x01` (any 0.1.x) and
41//! rejects anything with a different major or minor.
42//! * **`clen`** — little-endian `u64` recording the *committed* payload length.
43//! It is updated atomically with each [`push`](BStack::push) or
44//! [`pop`](BStack::pop) and is used for crash recovery on the next
45//! [`open`](BStack::open).
46//!
47//! All user-visible offsets are **logical** (0-based from the start of the
48//! payload region, i.e. from file byte 16).
49//!
50//! # Crash recovery
51//!
52//! On [`open`](BStack::open), the header's committed length is compared against
53//! the actual file size:
54//!
55//! | Condition | Cause | Recovery |
56//! |-----------|-------|----------|
57//! | `file_size − 16 > clen` | partial tail write (push crashed before header update) | truncate to `16 + clen` |
58//! | `file_size − 16 < clen` | partial truncation (pop crashed before header update) | set `clen = file_size − 16` |
59//!
60//! After recovery a `durable_sync` ensures the repaired state is on stable
61//! storage before any caller can observe or modify the file.
62//!
63//! # Durability
64//!
65//! | Operation | Syscall sequence |
66//! |-----------|-----------------|
67//! | `push` | `lseek(END)` → `write(data)` → `lseek(8)` → `write(clen)` → `durable_sync` |
68//! | `extend` | `lseek(END)` → `set_len(new_end)` → `lseek(8)` → `write(clen)` → `durable_sync` |
69//! | `pop`, `pop_into` | `lseek` → `read` → `ftruncate` → `lseek(8)` → `write(clen)` → `durable_sync` |
70//! | `discard` | `ftruncate` → `lseek(8)` → `write(clen)` → `durable_sync` |
71//! | `set` *(feature)* | `lseek(offset)` → `write(data)` → `durable_sync` |
72//! | `zero` *(feature)* | `lseek(offset)` → `write(zeros)` → `durable_sync` |
73//! | `atrunc` *(feature: atomic, net extension)* | `set_len(new_end)` → `lseek(tail)` → `write(buf)` → `durable_sync` → `lseek(8)` → `write(clen)` |
74//! | `atrunc` *(feature: atomic, net truncation)* | `lseek(tail)` → `write(buf)` → `set_len(new_end)` → `durable_sync` → `lseek(8)` → `write(clen)` |
75//! | `splice`, `splice_into` *(feature: atomic)* | `lseek(tail)` → `read(n)` → *(then as `atrunc`)* |
76//! | `try_extend` *(feature: atomic)* | `lseek(END)` — conditional `push` sequence if size matches |
77//! | `try_discard` *(feature: atomic)* | `lseek(END)` — conditional `discard` sequence if size matches |
78//! | `swap`, `swap_into` *(features: set+atomic)* | `lseek(offset)` → `read` → `lseek(offset)` → `write(buf)` → `durable_sync` |
79//! | `cas` *(features: set+atomic)* | `lseek(offset)` → `read` → compare — conditional `lseek(offset)` → `write(new)` → `durable_sync` |
80//! | `process` *(features: set+atomic)* | `lseek(start)` → `read(end−start)` → *(callback)* → `lseek(start)` → `write(buf)` → `durable_sync` |
81//! | `replace` *(feature: atomic)* | `lseek(tail)` → `read(n)` → *(callback)* → *(then as `atrunc`)* |
82//! | `peek`, `peek_into`, `get`, `get_into` | `pread(2)` on Unix; `ReadFile`+`OVERLAPPED` on Windows; `lseek` → `read` elsewhere (no sync — read-only) |
83//!
84//! **`durable_sync` on macOS** issues `fcntl(F_FULLFSYNC)`, which flushes the
85//! drive's hardware write cache. Plain `fdatasync` is not sufficient on macOS
86//! because the kernel may acknowledge it before the drive controller has
87//! committed the data. If `F_FULLFSYNC` is not supported by the device the
88//! implementation falls back to `sync_data` (`fdatasync`).
89//!
90//! **`durable_sync` on other Unix** calls `sync_data` (`fdatasync`), which is
91//! sufficient on Linux and BSD.
92//!
93//! **`durable_sync` on Windows** calls `sync_data`, which maps to
94//! `FlushFileBuffers`. This flushes the kernel write-back cache and waits for
95//! the drive to acknowledge, providing equivalent durability to `fdatasync`.
96//!
97//! # Multi-process safety
98//!
99//! On Unix, [`open`](BStack::open) acquires an **exclusive advisory `flock`**
100//! on the file (`LOCK_EX | LOCK_NB`). If another process already holds the
101//! lock, `open` returns immediately with [`io::ErrorKind::WouldBlock`] rather
102//! than blocking indefinitely. The lock is released automatically when the
103//! [`BStack`] is dropped (the underlying file descriptor is closed).
104//!
105//! On Windows, [`open`](BStack::open) acquires an **exclusive `LockFileEx`**
106//! lock (`LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY`) covering the
107//! entire file range. If another process already holds the lock, `open`
108//! returns immediately with [`io::ErrorKind::WouldBlock`]
109//! (`ERROR_LOCK_VIOLATION`). The lock is released when the [`BStack`] is
110//! dropped (the underlying file handle is closed).
111//!
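//! > **Note:** `flock` (Unix) is advisory: it only excludes other `flock`
//! > callers and does not stop processes that ignore the lock or write to the
//! > file directly. `LockFileEx` (Windows) locks are enforced by the OS against
//! > reads and writes through other handles while the [`BStack`] is open, but
//! > they are released on drop and offer no protection afterwards. Neither
//! > mechanism is a substitute for the usage rules below.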
116//!
117//! # Correct usage
118//!
119//! bstack files must only be opened through this crate or a compatible
120//! implementation that understands the file format, the header protocol, and
121//! the locking semantics. Reading or writing the underlying file with raw
122//! tools or syscalls while a [`BStack`] instance is live — or manually editing
123//! the header fields — can silently corrupt the committed-length sentinel or
124//! bypass the advisory lock.
125//!
126//! **The authors make no guarantees about the behaviour of this crate —
127//! including freedom from data loss or logical corruption — when the file has
128//! been accessed outside of this crate's controlled interface.**
129//!
130//! # Thread safety
131//!
132//! `BStack` wraps the file in a [`std::sync::RwLock`].
133//!
134//! | Operation | Lock (Unix / Windows) | Lock (other) |
135//! |-----------|-----------------------|--------------|
136//! | `push`, `extend`, `pop`, `pop_into`, `discard` | write | write |
137//! | `set`, `zero` *(feature)* | write | write |
138//! | `atrunc`, `splice`, `splice_into`, `try_extend` *(feature: atomic)* | write | write |
139//! | `try_discard(s, n > 0)` *(feature: atomic)* | write | write |
140//! | `try_discard(s, 0)` *(feature: atomic)* | **read** | **read** |
141//! | `swap`, `swap_into`, `cas` *(features: set+atomic)* | write | write |
142//! | `process` *(features: set+atomic)* | write | write |
143//! | `replace` *(feature: atomic)* | write | write |
144//! | `peek`, `peek_into`, `get`, `get_into` | **read** (or **none** for ranges entirely within the locked region) | write |
145//! | `len` | read | read |
146//!
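//! On Unix and Windows, `peek`, `peek_into`, `get`, and `get_into` use a
//! positional read (`pread(2)` on Unix; `ReadFile` with `OVERLAPPED` on
//! Windows) that takes an explicit offset instead of relying on the shared
//! file-position cursor. `pread(2)` leaves the cursor untouched; on Windows the
//! call also moves the cursor to the end of the bytes read, which is harmless
//! under the read lock because every writer seeks before it writes.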
150//! This allows multiple concurrent calls to any of these methods to run in
151//! parallel while any ongoing `push`, `pop`, or `pop_into` still serialises
152//! all writers via the write lock. When a read range lies entirely within
153//! the [locked region](#locked-region-lock_up_to), the rwlock is bypassed
154//! altogether — see that section for the concurrency model.
155//!
156//! On other platforms a seek is required, so `peek`, `peek_into`, `get`, and
157//! `get_into` fall back to the write lock and all reads serialise.
158//!
159//! # Locked region (`lock_up_to`)
160//!
161//! [`BStack`] maintains an in-memory **monotonically growing partition
162//! boundary** named the *locked region*. Bytes in `[0, locked_len())` are
163//! declared permanently immutable for the lifetime of the open file.
164//!
165//! The locked length starts at `0` on every [`open`](BStack::open) and is
166//! **not persisted to disk** — the file format is unchanged. Callers extend
167//! the boundary by calling [`lock_up_to`](BStack::lock_up_to) (or open and
168//! lock in one step with [`open_locked_up_to`](BStack::open_locked_up_to)).
169//! It can only grow; attempts to shrink it return
170//! [`io::ErrorKind::InvalidInput`].
171//!
172//! ## Effects
173//!
174//! * **Lock-free reads on Unix and Windows.** When [`get`](BStack::get),
175//! [`get_into`](BStack::get_into), or [`peek_into`](BStack::peek_into) are
176//! called with a range that lies entirely within the locked region, the
177//! rwlock is bypassed and the read is served directly by `pread(2)`
178//! (Unix) or `ReadFile` + `OVERLAPPED` (Windows). The `fstat` size check
179//! is skipped too — the locked length is a sufficient upper bound.
180//!
181//! * **Write protection.** [`set`](BStack::set), [`zero`](BStack::zero),
182//! [`swap`](BStack::swap), [`swap_into`](BStack::swap_into),
183//! [`cas`](BStack::cas), and [`process`](BStack::process) return
184//! [`io::ErrorKind::InvalidInput`] when their target range overlaps the
185//! locked region. [`atrunc`](BStack::atrunc), [`splice`](BStack::splice),
186//! [`splice_into`](BStack::splice_into), and [`replace`](BStack::replace)
187//! return the same error when the operation would modify bytes inside it.
188//!
189//! * **Shrink protection.** [`pop`](BStack::pop),
190//! [`pop_into`](BStack::pop_into), [`discard`](BStack::discard), and
191//! [`try_discard`](BStack::try_discard) return
192//! [`io::ErrorKind::InvalidInput`] when they would shrink the payload
193//! below the locked length.
194//!
195//! Callers that never invoke `lock_up_to` see no behavioural change — every
196//! read and write path adds only a single uncontended `AtomicU64::load` and
197//! a comparison.
198//!
199//! ## Concurrency model
200//!
201//! `lock_up_to(n)` acquires the exclusive write lock before publishing the
202//! new boundary with a `Release` store. Lock-free readers `Acquire`-load
203//! `locked` before each call. Two consequences follow:
204//!
205//! * A stale load is always safe. If a reader sees an older (smaller)
206//! `locked` value, it falls through to the rwlock path; if it sees a
207//! newer value, the entire range it now reads is by definition immutable.
208//!
209//! * Locked-region checks on writers are evaluated **under the write lock**,
210//! so they cannot race against a concurrent `lock_up_to` extending the
211//! boundary across the write target.
212//!
213//! ## Typical use
214//!
215//! ```no_run
216//! use bstack::BStack;
217//!
218//! # fn main() -> std::io::Result<()> {
219//! // A fixed 64-byte metadata block at the head of the file, read by many
220//! // threads but never modified after first write.
221//! let stack = BStack::open_locked_up_to("meta.bin", 64)?;
222//! assert_eq!(stack.locked_len(), 64);
223//!
224//! // Reads of the metadata bypass the rwlock on Unix and Windows.
225//! let header = stack.get(0, 64)?;
226//! # let _ = header;
227//! # Ok(())
228//! # }
229//! ```
230//!
231//! On platforms other than Unix and Windows the boundary still enforces
232//! immutability through the rwlock path; only the lock-free read fast path
233//! is platform-gated.
234//!
235//! # Standard I/O adapters
236//!
237//! ## Writing
238//!
//! `BStack` implements [`std::io::Write`] (and so does `&BStack`, mirroring
//! the standard library's `impl Write for &File`). Each call to `write` is forwarded to
241//! [`push`](BStack::push), so every write is atomically appended and durably
242//! synced before returning. `flush` is a no-op.
243//!
244//! ```no_run
245//! use std::io::Write;
246//! use bstack::BStack;
247//!
248//! # fn main() -> std::io::Result<()> {
249//! let mut stack = BStack::open("log.bin")?;
250//! stack.write_all(b"hello")?;
251//! stack.write_all(b"world")?;
252//! # Ok(())
253//! # }
254//! ```
255//!
256//! ## Reading
257//!
258//! [`BStackReader`] wraps a `&BStack` with a cursor and implements
259//! [`std::io::Read`] and [`std::io::Seek`]. Use [`BStack::reader`] or
260//! [`BStack::reader_at`] to construct one.
261//!
262//! ```no_run
263//! use std::io::{Read, Seek, SeekFrom};
264//! use bstack::BStack;
265//!
266//! # fn main() -> std::io::Result<()> {
267//! let stack = BStack::open("log.bin")?;
268//! stack.push(b"hello world")?;
269//!
270//! let mut reader = stack.reader();
271//! let mut buf = [0u8; 5];
272//! reader.read_exact(&mut buf)?; // b"hello"
273//! reader.seek(SeekFrom::Start(6))?;
274//! reader.read_exact(&mut buf)?; // b"world"
275//! # Ok(())
276//! # }
277//! ```
278//!
279//! # Trait implementations
280//!
281//! ## `BStack`
282//!
283//! | Trait | Semantics |
284//! |-------|-----------|
//! | `Debug` | Shows `version` (semver string from the magic header, e.g. `"0.1.10"`) and `len` (`Option<u64>`, `None` on I/O failure). |
286//! | `PartialEq` / `Eq` | **Pointer identity.** Two values are equal iff they are the same instance. No two distinct `BStack` values in one process can refer to the same file. |
287//! | `Hash` | Hashes the instance address — consistent with pointer-identity `PartialEq`. |
288//!
289//! ## `BStackReader`
290//!
291//! | Trait | Semantics |
292//! |-------|-----------|
293//! | `PartialEq` / `Eq` | Equal when both the `BStack` pointer (identity) and the cursor `offset` match. |
294//! | `Hash` | Hashes `(BStack pointer, offset)` — consistent with `PartialEq`. |
295//! | `PartialOrd` / `Ord` | Ordered by `BStack` instance address, then by cursor `offset`. Groups all readers over the same stack and within that group orders by position. |
296//!
297//! # Feature flags
298//!
299//! | Feature | Description |
300//! |---------|-------------|
301//! | `set` | Enables [`BStack::set`] and [`BStack::zero`] — in-place overwrite of existing payload bytes (or with zeros) without changing the file size. |
302//! | `alloc` | Enables [`BStackAllocator`], [`BStackBulkAllocator`], [`BStackSlice`], [`BStackSliceReader`], and [`LinearBStackAllocator`] — region-based allocation over a `BStack` payload. |
303//! | `atomic` | Enables [`BStack::atrunc`], [`BStack::splice`], [`BStack::splice_into`], [`BStack::try_extend`], [`BStack::try_discard`], and [`BStack::replace`] — compound read-modify-write operations that hold the write lock across what would otherwise be separate calls. Combined with `set`, also enables [`BStack::swap`], [`BStack::swap_into`], [`BStack::cas`], and [`BStack::process`]. |
304//!
305//! Enable with:
306//!
307//! ```toml
308//! [dependencies]
309//! bstack = { version = "0.1", features = ["set"] }
310//! # or
311//! bstack = { version = "0.1", features = ["alloc"] }
312//! # or both
313//! bstack = { version = "0.1", features = ["alloc", "set"] }
314//! ```
315//!
316//! # Allocator (`alloc` feature)
317//!
318//! The `alloc` feature adds a region-management layer on top of [`BStack`].
319//!
320//! ## Key types
321//!
322//! * [`BStackAllocator`] — trait for types that own a [`BStack`] and manage
323//! contiguous byte regions within its payload. Requires `stack()`,
324//! `into_stack()`, `alloc()`, and `realloc()`; provides a default no-op
325//! `dealloc()` and delegation helpers `len()` / `is_empty()`.
326//!
327//! * [`BStackBulkAllocator`] — extension trait for [`BStackAllocator`] that
328//! adds atomic bulk operations. Both methods are required with no default; on error
//!   the backing store is left unchanged unless a crash occurs.
330//!
331//! * [`BStackSlice`]`<'a, A>` — lightweight `Copy` handle (allocator reference +
332//! offset + length) to a contiguous region. Exposes `read`, `read_into`,
333//! `read_range_into`, `subslice`, `subslice_range`, `reader`, `reader_at`,
334//! and (with the `set` feature) `write`, `write_range`, `zero`, `zero_range`.
335//!
336//! * [`BStackSliceReader`]`<'a, A>` — cursor-based reader over a
337//! [`BStackSlice`], implementing [`io::Read`] and [`io::Seek`] in the
338//! slice's coordinate space.
339//!
340//! * [`LinearBStackAllocator`] — reference bump allocator that appends regions
341//! sequentially. `realloc` is O(1) for the tail allocation and returns
342//! `Unsupported` for non-tail slices. `dealloc` reclaims the tail via
343//! [`BStack::discard`]; non-tail deallocations are a no-op. Every operation
344//! maps to exactly one [`BStack`] call and is crash-safe by inheritance.
345//! Implements [`BStackAllocator`] and [`BStackBulkAllocator`].
346//!
347//! * [`FirstFitBStackAllocator`] — Experimental: a persistent first-fit free-list allocator
348//! that reuses freed regions to prevent unbounded file growth. Requires both
349//! `alloc` and `set` features.
350//!
351//! * [`GhostTreeBstackAllocator`] — A pure-AVL general-purpose allocator with
352//! zero-overhead live allocations. Free blocks store their AVL node inline,
353//! and the tree is keyed on `(size, address)` for best-fit allocation.
354//! Provides O(log n) allocation and deallocation with crash recovery through
//!   tree rebalancing on mount. Requires both `alloc` and `set` features.
356//!
357//! ## Lifetime model
358//!
359//! `BStackSlice<'a, A>` borrows the **allocator** for `'a`, not the
360//! [`BStack`] directly. As a result the borrow checker statically prevents
361//! calling [`BStackAllocator::into_stack`] — which consumes the allocator by
362//! value — while any slice is still in scope.
363//!
364//! ## Quick example
365//!
//! ```ignore
367//! use bstack::{BStack, BStackAllocator, LinearBStackAllocator};
368//!
369//! # fn main() -> std::io::Result<()> {
370//! let alloc = LinearBStackAllocator::new(BStack::open("data.bstack")?);
371//!
372//! let slice = alloc.alloc(128)?; // reserve 128 zero bytes
373//! let data = slice.read()?; // read them back
374//! alloc.dealloc(slice)?; // release (tail, so O(1))
375//!
376//! let stack = alloc.into_stack(); // reclaim the BStack
377//! # Ok(())
378//! # }
379//! ```
380//!
381//! # Examples
382//!
383//! ```no_run
384//! use bstack::BStack;
385//!
386//! # fn main() -> std::io::Result<()> {
387//! let stack = BStack::open("log.bin")?;
388//!
389//! // push returns the logical byte offset where the payload starts.
390//! let off0 = stack.push(b"hello")?; // 0
391//! let off1 = stack.push(b"world")?; // 5
392//!
393//! assert_eq!(stack.len()?, 10);
394//!
395//! // peek reads from a logical offset to the end without removing anything.
396//! assert_eq!(stack.peek(off1)?, b"world");
397//!
398//! // get reads an arbitrary half-open logical byte range.
399//! assert_eq!(stack.get(3, 8)?, b"lowor");
400//!
401//! // pop removes bytes from the tail and returns them.
402//! assert_eq!(stack.pop(5)?, b"world");
403//! assert_eq!(stack.len()?, 5);
404//! # Ok(())
405//! # }
406//! ```
407
408#[cfg(all(test, feature = "alloc", feature = "set"))]
409mod alloc_fuzz_tests;
410mod test;
411
412#[cfg(feature = "alloc")]
413mod alloc;
414#[cfg(feature = "alloc")]
415pub use alloc::{
416 BStackAllocator, BStackBulkAllocator, BStackSlice, BStackSliceAllocator, BStackSliceReader,
417 LinearBStackAllocator, ManualAllocator,
418};
419#[cfg(all(feature = "alloc", feature = "set"))]
420pub use alloc::{BStackSliceWriter, FirstFitBStackAllocator, GhostTreeBstackAllocator};
421
422#[cfg(all(feature = "guarded", feature = "atomic"))]
423pub use alloc::{BStackAtomicGuardedSlice, BStackAtomicGuardedSliceSubview};
424#[cfg(feature = "guarded")]
425pub use alloc::{BStackGuardedSlice, BStackGuardedSliceSubview};
426
427use std::fmt;
428use std::fs::{File, OpenOptions};
429use std::hash::{Hash, Hasher};
430use std::io::{self, Read, Seek, SeekFrom, Write};
431use std::path::Path;
432use std::sync::RwLock;
433use std::sync::atomic::{AtomicU64, Ordering};
434
435#[cfg(unix)]
436use std::os::unix::fs::FileExt;
437#[cfg(unix)]
438use std::os::unix::io::AsRawFd;
439#[cfg(unix)]
440use std::os::unix::io::RawFd;
441
442#[cfg(windows)]
443use std::os::windows::fs::FileExt as WindowsFileExt;
444#[cfg(windows)]
445use std::os::windows::io::AsRawHandle;
446#[cfg(windows)]
447use windows_sys::Win32::Foundation::HANDLE;
448#[cfg(windows)]
449use windows_sys::Win32::Storage::FileSystem::{
450 LOCKFILE_EXCLUSIVE_LOCK, LOCKFILE_FAIL_IMMEDIATELY, LockFileEx, ReadFile,
451};
452#[cfg(windows)]
453use windows_sys::Win32::System::IO::OVERLAPPED;
454
/// Magic bytes written by this version of the crate: the ASCII tag `BSTK`,
/// then the format version 0.1.10 as major / minor / patch bytes, then one
/// reserved zero byte.
const MAGIC: [u8; 8] = [b'B', b'S', b'T', b'K', 0x00, 0x01, 0x0a, 0x00];

/// Leading bytes compared when a file is opened: `BSTK` + major 0 + minor 1.
/// A file whose first 6 bytes equal this is accepted as a compatible 0.1.x
/// file, whatever its patch number.
const MAGIC_PREFIX: [u8; 6] = [b'B', b'S', b'T', b'K', 0x00, 0x01];

/// Size in bytes of the file header: the 8-byte magic followed by the
/// 8-byte little-endian committed payload length.
const HEADER_SIZE: u64 = (MAGIC.len() + std::mem::size_of::<u64>()) as u64;
464
/// Flush all in-flight writes of `file` to stable storage.
///
/// On macOS this first issues `fcntl(F_FULLFSYNC)`, which also asks the drive
/// to flush its hardware write cache — something `fdatasync` alone does not
/// guarantee there. A call interrupted by a signal (`EINTR`) is retried. If
/// the file system reports that `F_FULLFSYNC` is unsupported (`ENOTSUP`,
/// `EOPNOTSUPP`, `EINVAL`, or `ENOTTY`), this falls back to
/// [`File::sync_data`]. Any other failure (for example `EIO`) is returned to
/// the caller rather than masked by the fallback: a follow-up `fdatasync` may
/// report success even though the earlier flush lost data.
///
/// On every other platform this delegates to [`File::sync_data`]
/// (`fdatasync` on Unix, `FlushFileBuffers` on Windows).
///
/// # Errors
///
/// Returns the OS error reported by the underlying sync call.
fn durable_sync(file: &File) -> io::Result<()> {
    #[cfg(target_os = "macos")]
    {
        loop {
            // SAFETY: `file` owns an open descriptor for the duration of this
            // call, and `F_FULLFSYNC` takes no pointer arguments.
            let ret = unsafe { libc::fcntl(file.as_raw_fd(), libc::F_FULLFSYNC) };
            if ret != -1 {
                return Ok(());
            }
            let err = io::Error::last_os_error();
            match err.raw_os_error() {
                // Interrupted by a signal: try the full flush again.
                Some(libc::EINTR) => continue,
                // Device / file system does not support F_FULLFSYNC:
                // fall back to fdatasync below.
                Some(libc::ENOTSUP | libc::EOPNOTSUPP | libc::EINVAL | libc::ENOTTY) => break,
                // A real I/O failure must not be hidden by the fallback.
                _ => return Err(err),
            }
        }
    }
    file.sync_data()
}
482
483/// Acquire an exclusive, non-blocking advisory flock on `file`.
484///
485/// Returns `Err(WouldBlock)` if another process already holds the lock.
486#[cfg(unix)]
487fn flock_exclusive(file: &File) -> io::Result<()> {
488 let ret = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) };
489 if ret == 0 {
490 Ok(())
491 } else {
492 Err(io::Error::last_os_error())
493 }
494}
495
/// Acquire an exclusive, non-blocking `LockFileEx` lock on `file`.
///
/// The lock starts at offset 0 and spans the maximum length (`u32::MAX` for
/// both the low and high length words), so it covers the entire file space.
///
/// # Errors
///
/// * [`io::ErrorKind::WouldBlock`] — another holder already has a conflicting
///   lock (`ERROR_LOCK_VIOLATION`). That raw code is mapped explicitly so the
///   kind matches the crate documentation however `std` classifies it; the
///   original OS error is kept as the error's source.
/// * Any other OS error reported by `LockFileEx`.
#[cfg(windows)]
fn lock_file_exclusive(file: &File) -> io::Result<()> {
    let handle = file.as_raw_handle() as windows_sys::Win32::Foundation::HANDLE;
    // OVERLAPPED is required by LockFileEx even for synchronous handles.
    // Zeroed offset fields (0, 0) anchor the lock at byte 0 of the file.
    // SAFETY: OVERLAPPED is a plain C struct for which all-zero bytes are valid.
    let mut overlapped: OVERLAPPED = unsafe { std::mem::zeroed() };
    // SAFETY: `handle` belongs to the live `file`, and `overlapped` outlives
    // the (synchronous) call.
    let ret = unsafe {
        LockFileEx(
            handle,
            LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY,
            0,        // reserved, must be zero
            u32::MAX, // nNumberOfBytesToLockLow  ─┐ lock entire
            u32::MAX, // nNumberOfBytesToLockHigh ─┘ file space
            &mut overlapped,
        )
    };
    if ret != 0 {
        return Ok(());
    }
    let err = io::Error::last_os_error();
    let lock_violation = windows_sys::Win32::Foundation::ERROR_LOCK_VIOLATION as i32;
    if err.kind() != io::ErrorKind::WouldBlock && err.raw_os_error() == Some(lock_violation) {
        // Guarantee the documented `WouldBlock` contract for `open`.
        return Err(io::Error::new(io::ErrorKind::WouldBlock, err));
    }
    Err(err)
}
523
524/// Write the 16-byte header into a brand-new (empty) file.
525fn init_header(file: &mut File) -> io::Result<()> {
526 file.seek(SeekFrom::Start(0))?;
527 file.write_all(&MAGIC)?;
528 file.write_all(&0u64.to_le_bytes())
529}
530
/// Store `len` as the committed payload length in header bytes `8..16`,
/// encoded little-endian.
///
/// Leaves the cursor at offset 16 and does not sync.
fn write_committed_len(file: &mut File, len: u64) -> io::Result<()> {
    let encoded = len.to_le_bytes();
    file.seek(SeekFrom::Start(8))
        .and_then(|_| file.write_all(&encoded))
}
536
/// Read exactly `len` bytes from absolute file position `offset` into a new
/// buffer without using or moving the file-position cursor, so callers only
/// need the shared (read) side of the `RwLock`.
///
/// On Unix this is `pread(2)` via [`FileExt::read_exact_at`], which keeps
/// reading after short or interrupted reads. The Windows counterpart uses
/// `seek_read`, which also takes an explicit offset but *does* move the
/// cursor; see its own documentation.
///
/// # Errors
///
/// [`io::ErrorKind::UnexpectedEof`] if the file ends before `len` bytes are
/// read, or any other I/O error from the read.
#[cfg(unix)]
fn pread_exact(file: &File, offset: u64, len: usize) -> io::Result<Vec<u8>> {
    let mut buf = vec![0u8; len];
    file.read_exact_at(&mut buf, offset)?;
    Ok(buf)
}
549
/// Windows counterpart of the Unix `pread_exact`: read exactly `len` bytes
/// starting at absolute file position `offset` into a new buffer.
///
/// Uses `FileExt::seek_read`, which takes an explicit offset but, unlike
/// `pread(2)`, also moves the handle's file cursor to the end of the bytes
/// read. Calling it under the shared side of the `RwLock` is still fine,
/// because every writer holds the exclusive side and seeks explicitly before
/// it writes. Short reads are repeated until the buffer is full.
///
/// # Errors
///
/// [`io::ErrorKind::UnexpectedEof`] if the file ends before `len` bytes are
/// read, or any error returned by `seek_read`.
#[cfg(windows)]
fn pread_exact(file: &File, offset: u64, len: usize) -> io::Result<Vec<u8>> {
    let mut buf = vec![0u8; len];
    let mut filled = 0usize;
    while filled < len {
        let n = file.seek_read(&mut buf[filled..], offset + filled as u64)?;
        if n == 0 {
            // `seek_read` reports end of file as a zero-length read.
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "pread_exact: unexpected EOF",
            ));
        }
        filled += n;
    }
    Ok(buf)
}
567
/// Fill `buf` from absolute file position `offset` without using or moving
/// the file-position cursor.
///
/// On Unix this is `pread(2)` via [`FileExt::read_exact_at`], which keeps
/// reading after short or interrupted reads. The Windows counterpart uses
/// `seek_read`, which takes an explicit offset but does move the cursor.
///
/// # Errors
///
/// [`io::ErrorKind::UnexpectedEof`] if the file ends before `buf` is full, or
/// any other I/O error from the read.
#[cfg(unix)]
fn pread_exact_into(file: &File, offset: u64, buf: &mut [u8]) -> io::Result<()> {
    file.read_exact_at(buf, offset)
}
575
/// Windows counterpart of the Unix `pread_exact_into`: fill `buf` from
/// absolute file position `offset`.
///
/// Uses `FileExt::seek_read`, which takes an explicit offset but also moves
/// the handle's file cursor to the end of the bytes read. Under the shared
/// side of the `RwLock` this is harmless, because every writer holds the
/// exclusive side and seeks explicitly before it writes. Short reads are
/// repeated until `buf` is full.
///
/// # Errors
///
/// [`io::ErrorKind::UnexpectedEof`] if the file ends before `buf` is full, or
/// any error returned by `seek_read`.
#[cfg(windows)]
fn pread_exact_into(file: &File, offset: u64, buf: &mut [u8]) -> io::Result<()> {
    let len = buf.len();
    let mut filled = 0usize;
    while filled < len {
        let n = file.seek_read(&mut buf[filled..], offset + filled as u64)?;
        if n == 0 {
            // `seek_read` reports end of file as a zero-length read.
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "pread_exact_into: unexpected EOF",
            ));
        }
        filled += n;
    }
    Ok(())
}
593
594/// Lock-free positional read using a raw file descriptor (Unix).
595///
596/// Calls `pread(2)` directly, bypassing the `RwLock<File>`. Safe only when
597/// the target range is within the immutable locked region, ensuring no
598/// concurrent writer can touch those bytes.
599#[cfg(unix)]
600fn pread_exact_raw(fd: RawFd, offset: u64, buf: &mut [u8]) -> io::Result<()> {
601 let mut filled = 0usize;
602 while filled < buf.len() {
603 let n = unsafe {
604 libc::pread(
605 fd,
606 buf[filled..].as_mut_ptr() as *mut libc::c_void,
607 buf.len() - filled,
608 (offset + filled as u64) as libc::off_t,
609 )
610 };
611 if n < 0 {
612 return Err(io::Error::last_os_error());
613 }
614 if n == 0 {
615 return Err(io::Error::new(
616 io::ErrorKind::UnexpectedEof,
617 "locked pread: unexpected EOF",
618 ));
619 }
620 filled += n as usize;
621 }
622 Ok(())
623}
624
/// Lock-free positional read using a raw Windows HANDLE.
///
/// Fills `buf` by calling `ReadFile` with an `OVERLAPPED` offset, bypassing the
/// `RwLock<File>`. Safe under the same invariant as `pread_exact_raw`: the
/// range must lie inside the immutable locked region, and the handle must stay
/// open for the duration of the call.
///
/// `ReadFile` takes a 32-bit length, so each call requests at most
/// `u32::MAX` bytes and larger buffers are filled in several calls.
///
/// NOTE(review): on a handle opened without `FILE_FLAG_OVERLAPPED`, `ReadFile`
/// with an `OVERLAPPED` offset still updates the handle's file pointer to the
/// end of the bytes read. Because this path does not take the `RwLock`, it can
/// run while a writer is between its `seek` and its `write`. Confirm that
/// cursor-based writers cannot be redirected by this (e.g. by switching them
/// to positional writes).
///
/// # Errors
///
/// [`io::ErrorKind::UnexpectedEof`] if the file ends before `buf` is full
/// (either a zero-byte read or `ERROR_HANDLE_EOF`), or the OS error reported
/// by `ReadFile`.
#[cfg(windows)]
fn pread_exact_raw_handle(handle: isize, offset: u64, buf: &mut [u8]) -> io::Result<()> {
    let handle = handle as HANDLE;
    let mut filled = 0usize;
    let len = buf.len();
    while filled < len {
        let current_offset = offset + filled as u64;
        // Clamp to the 32-bit length ReadFile accepts. A plain `as u32` would
        // wrap: a remaining length that is a multiple of 4 GiB would request
        // 0 bytes and be misreported as EOF.
        let chunk = (len - filled).min(u32::MAX as usize) as u32;
        // SAFETY: OVERLAPPED is a plain C struct for which all-zero bytes are valid.
        let mut overlapped: OVERLAPPED = unsafe { std::mem::zeroed() };
        // SAFETY: the Anonymous/Anonymous path exists in the windows-sys OVERLAPPED layout.
        overlapped.Anonymous.Anonymous.Offset = current_offset as u32;
        overlapped.Anonymous.Anonymous.OffsetHigh = (current_offset >> 32) as u32;
        let mut bytes_read: u32 = 0;
        // SAFETY: `buf[filled..]` is writable for at least `chunk` bytes, and
        // `bytes_read` / `overlapped` outlive this synchronous call.
        let ret = unsafe {
            ReadFile(
                handle,
                buf[filled..].as_mut_ptr(),
                chunk,
                &mut bytes_read,
                &mut overlapped,
            )
        };
        if ret == 0 {
            let err = io::Error::last_os_error();
            // A positional read at end of file can fail with ERROR_HANDLE_EOF;
            // report it the same way as the zero-byte case below.
            let handle_eof = windows_sys::Win32::Foundation::ERROR_HANDLE_EOF as i32;
            if err.raw_os_error() == Some(handle_eof) {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "locked ReadFile: unexpected EOF",
                ));
            }
            return Err(err);
        }
        if bytes_read == 0 {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "locked ReadFile: unexpected EOF",
            ));
        }
        filled += bytes_read as usize;
    }
    Ok(())
}
663
664/// Read and validate the header; return the committed payload length.
665fn read_header(file: &mut File) -> io::Result<u64> {
666 file.seek(SeekFrom::Start(0))?;
667 let mut hdr = [0u8; 16];
668 file.read_exact(&mut hdr)?;
669 if hdr[0..6] != MAGIC_PREFIX {
670 return Err(io::Error::new(
671 io::ErrorKind::InvalidData,
672 "bstack: bad magic number — not a bstack file or incompatible version",
673 ));
674 }
675 Ok(u64::from_le_bytes(hdr[8..16].try_into().unwrap()))
676}
677
678// ---------------------------------------------------------------------------
679
/// A persistent, fsync-durable binary stack backed by a single file.
///
/// See the [crate-level documentation](crate) for the file format, durability
/// guarantees, crash recovery, multi-process safety, and thread-safety model.
pub struct BStack {
    /// The backing file. All access goes through this lock except the
    /// lock-free reads of the locked region, which use `fd` / `handle`
    /// directly on Unix / Windows.
    lock: RwLock<File>,
    /// Monotonically growing partition boundary. Bytes in `[0, locked)` are
    /// immutable and can be read without the rwlock on supported platforms.
    /// Not persisted — resets to 0 on every open.
    locked: AtomicU64,
    /// Copy of the raw file descriptor used for lock-free positional reads
    /// on the locked region. The `File` inside `lock` retains ownership and
    /// will close the descriptor when `BStack` is dropped; this copy must
    /// never be closed or used after that.
    #[cfg(unix)]
    fd: RawFd,
    /// Copy of the Windows HANDLE stored as `isize` so the field is
    /// `Send + Sync`. Same lifetime guarantee as `fd` above.
    /// NOTE(review): positional `ReadFile` through this handle still moves the
    /// handle's shared file pointer (synchronous-handle semantics); the
    /// lock-free read path should be checked against cursor-based writers.
    #[cfg(windows)]
    handle: isize,
}
700
701// `BStack` is auto-`Send + Sync` on every platform: all fields
702// (`RwLock<File>`, `AtomicU64`, and the `RawFd` / `isize` handle) already
703// implement both traits. The lock-free `pread` / `ReadFile`+`OVERLAPPED`
704// paths are cursor-independent and safe to call from any thread, and the raw
705// fd / handle remains valid for as long as `BStack` owns the `File`.
706
707impl BStack {
708 /// Open or create a stack file at `path`.
709 ///
710 /// On a **new** file the 16-byte header is written and durably synced
711 /// before returning.
712 ///
713 /// On an **existing** file the header is validated and, if a previous crash
714 /// left the file in an inconsistent state, the file is repaired and durably
715 /// synced before returning (see *Crash recovery* in the crate docs).
716 ///
717 /// On Unix an **exclusive advisory `flock`** is acquired; if another
718 /// process already holds the lock this function returns immediately with
719 /// [`io::ErrorKind::WouldBlock`].
720 ///
721 /// # Errors
722 ///
723 /// * [`io::ErrorKind::WouldBlock`] — another process holds the exclusive
724 /// lock (Unix only).
725 /// * [`io::ErrorKind::InvalidData`] — the file exists but its header magic
726 /// is wrong (not a bstack file, or created by an incompatible version),
727 /// or the file is too short to contain a valid header.
728 /// * Any [`io::Error`] from [`OpenOptions::open`], `read`, `write`, or
729 /// `durable_sync`.
730 pub fn open(path: impl AsRef<Path>) -> io::Result<Self> {
731 let mut file = OpenOptions::new()
732 .read(true)
733 .write(true)
734 .create(true)
735 .truncate(false)
736 .open(path)?;
737
738 #[cfg(unix)]
739 flock_exclusive(&file)?;
740
741 #[cfg(windows)]
742 lock_file_exclusive(&file)?;
743
744 let raw_size = file.metadata()?.len();
745
746 if raw_size == 0 {
747 init_header(&mut file)?;
748 durable_sync(&file)?;
749 } else if raw_size < HEADER_SIZE {
750 return Err(io::Error::new(
751 io::ErrorKind::InvalidData,
752 format!(
753 "bstack: file is {raw_size} bytes — too small to contain the 16-byte header"
754 ),
755 ));
756 } else {
757 let committed_len = read_header(&mut file)?;
758 let actual_data_len = raw_size - HEADER_SIZE;
759 if actual_data_len != committed_len {
760 // Recover: use whichever length is smaller (the committed
761 // value is the last successfully synced boundary).
762 let correct_len = committed_len.min(actual_data_len);
763 file.set_len(HEADER_SIZE + correct_len)?;
764 write_committed_len(&mut file, correct_len)?;
765 durable_sync(&file)?;
766 }
767 }
768
769 #[cfg(unix)]
770 let fd = file.as_raw_fd();
771 #[cfg(windows)]
772 let handle = file.as_raw_handle() as isize;
773
774 Ok(BStack {
775 #[cfg(unix)]
776 fd,
777 #[cfg(windows)]
778 handle,
779 lock: RwLock::new(file),
780 locked: AtomicU64::new(0),
781 })
782 }
783
784 /// Append `data` to the end of the file.
785 ///
786 /// Returns the **logical** byte offset at which `data` begins — i.e. the
787 /// payload size immediately before the write. An empty slice is valid; it
788 /// writes nothing and returns the current end offset.
789 ///
790 /// # Atomicity
791 ///
792 /// Either the full payload is written, the header committed-length is
793 /// updated, and the whole thing is durably synced, or the file is
794 /// left unchanged (best-effort rollback via `ftruncate` + header reset).
795 ///
796 /// # Errors
797 ///
798 /// Returns any [`io::Error`] from `write_all`, `durable_sync`, or the
799 /// fallback `set_len`.
800 pub fn push(&self, data: impl AsRef<[u8]>) -> io::Result<u64> {
801 let data = data.as_ref();
802 let mut file = self.lock.write().unwrap();
803 let file_end = file.seek(SeekFrom::End(0))?;
804 let logical_offset = file_end - HEADER_SIZE;
805
806 if data.is_empty() {
807 return Ok(logical_offset);
808 }
809
810 if let Err(e) = file.write_all(data) {
811 let _ = file.set_len(file_end);
812 return Err(e);
813 }
814
815 let new_len = logical_offset + data.len() as u64;
816 if let Err(e) = write_committed_len(&mut file, new_len).and_then(|_| durable_sync(&file)) {
817 // Roll back: truncate data and reset header.
818 let _ = file.set_len(file_end);
819 let _ = write_committed_len(&mut file, logical_offset);
820 return Err(e);
821 }
822
823 Ok(logical_offset)
824 }
825
826 /// Append `n` zero bytes to the end of the file.
827 ///
828 /// Returns the **logical** byte offset at which the zeros begin — i.e. the
829 /// payload size immediately before the write. `n = 0` is valid; it writes
830 /// nothing and returns the current end offset.
831 ///
832 /// # Atomicity
833 ///
834 /// Either the file is extended, the header committed-length is updated,
835 /// and the whole thing is durably synced, or the file is left unchanged
836 /// (best-effort rollback via `ftruncate` + header reset).
837 ///
838 /// # Errors
839 ///
840 /// Returns any [`io::Error`] from `set_len`, `durable_sync`, or the
841 /// fallback `set_len`.
842 pub fn extend(&self, n: u64) -> io::Result<u64> {
843 let mut file = self.lock.write().unwrap();
844 let file_end = file.seek(SeekFrom::End(0))?;
845 let logical_offset = file_end - HEADER_SIZE;
846
847 if n == 0 {
848 return Ok(logical_offset);
849 }
850
851 let new_file_end = file_end + n;
852 file.set_len(new_file_end)?;
853
854 let new_len = logical_offset + n;
855 if let Err(e) = write_committed_len(&mut file, new_len).and_then(|_| durable_sync(&file)) {
856 // Roll back: truncate and reset header.
857 let _ = file.set_len(file_end);
858 let _ = write_committed_len(&mut file, logical_offset);
859 return Err(e);
860 }
861
862 Ok(logical_offset)
863 }
864
865 /// Remove and return the last `n` bytes of the file.
866 ///
867 /// `n = 0` is valid: no bytes are removed and an empty `Vec` is returned.
868 /// `n` may span across multiple previous [`push`](Self::push) boundaries.
869 ///
870 /// # Atomicity
871 ///
872 /// The bytes are read before the file is truncated. The committed-length
873 /// in the header is updated and durably synced after the truncation.
874 ///
875 /// # Errors
876 ///
877 /// Returns [`io::ErrorKind::InvalidInput`] if `n` exceeds the current
878 /// payload size. Also propagates any I/O error from `read_exact`,
879 /// `set_len`, `write_all`, or `durable_sync`.
880 pub fn pop(&self, n: u64) -> io::Result<Vec<u8>> {
881 let mut file = self.lock.write().unwrap();
882 let raw_size = file.seek(SeekFrom::End(0))?;
883 let data_size = raw_size - HEADER_SIZE;
884 if n > data_size {
885 return Err(io::Error::new(
886 io::ErrorKind::InvalidInput,
887 format!("pop({n}) exceeds payload size ({data_size})"),
888 ));
889 }
890 let new_data_len = data_size - n;
891 let locked = self.locked.load(Ordering::Acquire);
892 if new_data_len < locked {
893 return Err(io::Error::new(
894 io::ErrorKind::InvalidInput,
895 format!("pop({n}) would shrink payload below locked length ({locked})"),
896 ));
897 }
898 file.seek(SeekFrom::Start(HEADER_SIZE + new_data_len))?;
899 let mut buf = vec![0u8; n as usize];
900 file.read_exact(&mut buf)?;
901 file.set_len(HEADER_SIZE + new_data_len)?;
902 write_committed_len(&mut file, new_data_len)?;
903 durable_sync(&file)?;
904 Ok(buf)
905 }
906
907 /// Return a copy of every payload byte from `offset` to the end of the
908 /// file.
909 ///
910 /// `offset` is a **logical** offset (as returned by [`push`](Self::push)).
911 /// `offset == len()` is valid and returns an empty `Vec`. The file is not
912 /// modified.
913 ///
914 /// # Concurrency
915 ///
916 /// On Unix and Windows this uses a cursor-safe positional read (`pread(2)`
917 /// on Unix; `ReadFile`+`OVERLAPPED` on Windows), so the method takes only
918 /// the **read lock**, allowing multiple concurrent `peek` and `get` calls
919 /// to run in parallel.
920 ///
921 /// On other platforms a seek is required; the method falls back to the
922 /// write lock and concurrent reads serialise.
923 ///
924 /// # Errors
925 ///
926 /// Returns [`io::ErrorKind::InvalidInput`] if `offset` exceeds the current
927 /// payload size.
928 pub fn peek(&self, offset: u64) -> io::Result<Vec<u8>> {
929 #[cfg(any(unix, windows))]
930 {
931 let file = self.lock.read().unwrap();
932 let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE);
933 if offset > data_size {
934 return Err(io::Error::new(
935 io::ErrorKind::InvalidInput,
936 format!("peek offset ({offset}) exceeds payload size ({data_size})"),
937 ));
938 }
939 pread_exact(&file, HEADER_SIZE + offset, (data_size - offset) as usize)
940 }
941 #[cfg(not(any(unix, windows)))]
942 {
943 let mut file = self.lock.write().unwrap();
944 let raw_size = file.seek(SeekFrom::End(0))?;
945 let data_size = raw_size.saturating_sub(HEADER_SIZE);
946 if offset > data_size {
947 return Err(io::Error::new(
948 io::ErrorKind::InvalidInput,
949 format!("peek offset ({offset}) exceeds payload size ({data_size})"),
950 ));
951 }
952 file.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
953 let mut buf = vec![0u8; (data_size - offset) as usize];
954 file.read_exact(&mut buf)?;
955 Ok(buf)
956 }
957 }
958
959 /// Return a copy of the bytes in the half-open logical range `[start, end)`.
960 ///
961 /// `start == end` is valid and returns an empty `Vec`. The file is not
962 /// modified.
963 ///
964 /// # Concurrency
965 ///
966 /// Same as [`peek`](Self::peek): on Unix and Windows the read lock is
967 /// taken and concurrent `get`/`peek`/`len` calls may run in parallel. On
968 /// other platforms the write lock is taken and reads serialise.
969 ///
970 /// # Errors
971 ///
972 /// Returns [`io::ErrorKind::InvalidInput`] if `end < start` or if `end`
973 /// exceeds the current payload size.
974 pub fn get(&self, start: u64, end: u64) -> io::Result<Vec<u8>> {
975 if end < start {
976 return Err(io::Error::new(
977 io::ErrorKind::InvalidInput,
978 format!("get: end ({end}) < start ({start})"),
979 ));
980 }
981 // Fast-path: if the range lies entirely within the locked region,
982 // skip the rwlock — locked bytes are immutable so no lock is needed.
983 #[cfg(unix)]
984 {
985 let locked = self.locked.load(Ordering::Acquire);
986 if end <= locked {
987 let mut buf = vec![0u8; (end - start) as usize];
988 pread_exact_raw(self.fd, HEADER_SIZE + start, &mut buf)?;
989 return Ok(buf);
990 }
991 }
992 #[cfg(windows)]
993 {
994 let locked = self.locked.load(Ordering::Acquire);
995 if end <= locked {
996 let mut buf = vec![0u8; (end - start) as usize];
997 pread_exact_raw_handle(self.handle, HEADER_SIZE + start, &mut buf)?;
998 return Ok(buf);
999 }
1000 }
1001 #[cfg(any(unix, windows))]
1002 {
1003 let file = self.lock.read().unwrap();
1004 let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE);
1005 if end > data_size {
1006 return Err(io::Error::new(
1007 io::ErrorKind::InvalidInput,
1008 format!("get: end ({end}) exceeds payload size ({data_size})"),
1009 ));
1010 }
1011 pread_exact(&file, HEADER_SIZE + start, (end - start) as usize)
1012 }
1013 #[cfg(not(any(unix, windows)))]
1014 {
1015 let mut file = self.lock.write().unwrap();
1016 let raw_size = file.seek(SeekFrom::End(0))?;
1017 let data_size = raw_size.saturating_sub(HEADER_SIZE);
1018 if end > data_size {
1019 return Err(io::Error::new(
1020 io::ErrorKind::InvalidInput,
1021 format!("get: end ({end}) exceeds payload size ({data_size})"),
1022 ));
1023 }
1024 file.seek(SeekFrom::Start(HEADER_SIZE + start))?;
1025 let mut buf = vec![0u8; (end - start) as usize];
1026 file.read_exact(&mut buf)?;
1027 Ok(buf)
1028 }
1029 }
1030
1031 /// Fill `buf` with bytes from logical `offset` to `offset + buf.len()`.
1032 ///
1033 /// Reads exactly `buf.len()` bytes from `offset` into the caller-supplied
1034 /// buffer. An empty buffer is a valid no-op. The file is not modified.
1035 ///
1036 /// Use this instead of [`peek`](Self::peek) when the destination buffer is
1037 /// already allocated and you want to avoid the extra heap allocation.
1038 ///
1039 /// # Concurrency
1040 ///
1041 /// Same as [`peek`](Self::peek): on Unix and Windows only the read lock is
1042 /// taken; on other platforms the write lock serialises all reads.
1043 ///
1044 /// # Errors
1045 ///
1046 /// Returns [`io::ErrorKind::InvalidInput`] if `offset + buf.len()` overflows
1047 /// `u64` or exceeds the current payload size.
1048 pub fn peek_into(&self, offset: u64, buf: &mut [u8]) -> io::Result<()> {
1049 if buf.is_empty() {
1050 return Ok(());
1051 }
1052 let len = buf.len() as u64;
1053 let end = offset.checked_add(len).ok_or_else(|| {
1054 io::Error::new(
1055 io::ErrorKind::InvalidInput,
1056 "peek_into: offset + len overflows u64",
1057 )
1058 })?;
1059 // Fast-path: locked region is immutable, skip the rwlock.
1060 #[cfg(unix)]
1061 {
1062 let locked = self.locked.load(Ordering::Acquire);
1063 if end <= locked {
1064 return pread_exact_raw(self.fd, HEADER_SIZE + offset, buf);
1065 }
1066 }
1067 #[cfg(windows)]
1068 {
1069 let locked = self.locked.load(Ordering::Acquire);
1070 if end <= locked {
1071 return pread_exact_raw_handle(self.handle, HEADER_SIZE + offset, buf);
1072 }
1073 }
1074 #[cfg(any(unix, windows))]
1075 {
1076 let file = self.lock.read().unwrap();
1077 let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE);
1078 if end > data_size {
1079 return Err(io::Error::new(
1080 io::ErrorKind::InvalidInput,
1081 format!(
1082 "peek_into: range [{offset}, {end}) exceeds payload size ({data_size})"
1083 ),
1084 ));
1085 }
1086 pread_exact_into(&file, HEADER_SIZE + offset, buf)
1087 }
1088 #[cfg(not(any(unix, windows)))]
1089 {
1090 let mut file = self.lock.write().unwrap();
1091 let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE);
1092 if end > data_size {
1093 return Err(io::Error::new(
1094 io::ErrorKind::InvalidInput,
1095 format!(
1096 "peek_into: range [{offset}, {end}) exceeds payload size ({data_size})"
1097 ),
1098 ));
1099 }
1100 file.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
1101 file.read_exact(buf)
1102 }
1103 }
1104
1105 /// Fill `buf` with bytes from the half-open logical range
1106 /// `[start, start + buf.len())`.
1107 ///
1108 /// An empty buffer is a valid no-op. The file is not modified.
1109 ///
1110 /// Use this instead of [`get`](Self::get) when the destination buffer is
1111 /// already allocated and you want to avoid the extra heap allocation.
1112 ///
1113 /// # Concurrency
1114 ///
1115 /// Same as [`get`](Self::get): on Unix and Windows only the read lock is
1116 /// taken; on other platforms the write lock serialises all reads.
1117 ///
1118 /// # Errors
1119 ///
1120 /// Returns [`io::ErrorKind::InvalidInput`] if `start + buf.len()` overflows
1121 /// `u64` or exceeds the current payload size.
1122 pub fn get_into(&self, start: u64, buf: &mut [u8]) -> io::Result<()> {
1123 if buf.is_empty() {
1124 return Ok(());
1125 }
1126 let len = buf.len() as u64;
1127 let end = start.checked_add(len).ok_or_else(|| {
1128 io::Error::new(
1129 io::ErrorKind::InvalidInput,
1130 "get_into: start + len overflows u64",
1131 )
1132 })?;
1133 // Fast-path: locked region is immutable, skip the rwlock.
1134 #[cfg(unix)]
1135 {
1136 let locked = self.locked.load(Ordering::Acquire);
1137 if end <= locked {
1138 return pread_exact_raw(self.fd, HEADER_SIZE + start, buf);
1139 }
1140 }
1141 #[cfg(windows)]
1142 {
1143 let locked = self.locked.load(Ordering::Acquire);
1144 if end <= locked {
1145 return pread_exact_raw_handle(self.handle, HEADER_SIZE + start, buf);
1146 }
1147 }
1148 #[cfg(any(unix, windows))]
1149 {
1150 let file = self.lock.read().unwrap();
1151 let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE);
1152 if end > data_size {
1153 return Err(io::Error::new(
1154 io::ErrorKind::InvalidInput,
1155 format!("get_into: end ({end}) exceeds payload size ({data_size})"),
1156 ));
1157 }
1158 pread_exact_into(&file, HEADER_SIZE + start, buf)
1159 }
1160 #[cfg(not(any(unix, windows)))]
1161 {
1162 let mut file = self.lock.write().unwrap();
1163 let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE);
1164 if end > data_size {
1165 return Err(io::Error::new(
1166 io::ErrorKind::InvalidInput,
1167 format!("get_into: end ({end}) exceeds payload size ({data_size})"),
1168 ));
1169 }
1170 file.seek(SeekFrom::Start(HEADER_SIZE + start))?;
1171 file.read_exact(buf)
1172 }
1173 }
1174
1175 /// Remove the last `buf.len()` bytes from the file and write them into `buf`.
1176 ///
1177 /// An empty buffer is a valid no-op: no bytes are removed.
1178 ///
1179 /// Use this instead of [`pop`](Self::pop) when the destination buffer is
1180 /// already allocated and you want to avoid the extra heap allocation.
1181 ///
1182 /// # Atomicity
1183 ///
1184 /// Same guarantees as [`pop`](Self::pop).
1185 ///
1186 /// # Errors
1187 ///
1188 /// Returns [`io::ErrorKind::InvalidInput`] if `buf.len()` exceeds the
1189 /// current payload size. Also propagates any I/O error from `read_exact`,
1190 /// `set_len`, `write_all`, or `durable_sync`.
1191 pub fn pop_into(&self, buf: &mut [u8]) -> io::Result<()> {
1192 if buf.is_empty() {
1193 return Ok(());
1194 }
1195 let n = buf.len() as u64;
1196 let mut file = self.lock.write().unwrap();
1197 let raw_size = file.seek(SeekFrom::End(0))?;
1198 let data_size = raw_size - HEADER_SIZE;
1199 if n > data_size {
1200 return Err(io::Error::new(
1201 io::ErrorKind::InvalidInput,
1202 format!("pop_into({n}) exceeds payload size ({data_size})"),
1203 ));
1204 }
1205 let new_data_len = data_size - n;
1206 let locked = self.locked.load(Ordering::Acquire);
1207 if new_data_len < locked {
1208 return Err(io::Error::new(
1209 io::ErrorKind::InvalidInput,
1210 format!("pop_into({n}) would shrink payload below locked length ({locked})"),
1211 ));
1212 }
1213 file.seek(SeekFrom::Start(HEADER_SIZE + new_data_len))?;
1214 file.read_exact(buf)?;
1215 file.set_len(HEADER_SIZE + new_data_len)?;
1216 write_committed_len(&mut file, new_data_len)?;
1217 durable_sync(&file)?;
1218 Ok(())
1219 }
1220
1221 /// Remove (discard) the last `n` bytes from the file without returning them.
1222 ///
1223 /// Equivalent to [`pop`](Self::pop) but avoids allocating a buffer for the
1224 /// removed bytes. `n = 0` is valid and is a no-op.
1225 ///
1226 /// # Atomicity
1227 ///
1228 /// Same guarantees as [`pop`](Self::pop).
1229 ///
1230 /// # Errors
1231 ///
1232 /// Returns [`io::ErrorKind::InvalidInput`] if `n` exceeds the current
1233 /// payload size. Also propagates any I/O error from `set_len`,
1234 /// `write_all`, or `durable_sync`.
1235 pub fn discard(&self, n: u64) -> io::Result<()> {
1236 if n == 0 {
1237 return Ok(());
1238 }
1239 let mut file = self.lock.write().unwrap();
1240 let raw_size = file.seek(SeekFrom::End(0))?;
1241 let data_size = raw_size - HEADER_SIZE;
1242 if n > data_size {
1243 return Err(io::Error::new(
1244 io::ErrorKind::InvalidInput,
1245 format!("discard({n}) exceeds payload size ({data_size})"),
1246 ));
1247 }
1248 let new_data_len = data_size - n;
1249 let locked = self.locked.load(Ordering::Acquire);
1250 if new_data_len < locked {
1251 return Err(io::Error::new(
1252 io::ErrorKind::InvalidInput,
1253 format!("discard({n}) would shrink payload below locked length ({locked})"),
1254 ));
1255 }
1256 file.set_len(HEADER_SIZE + new_data_len)?;
1257 write_committed_len(&mut file, new_data_len)?;
1258 durable_sync(&file)?;
1259 Ok(())
1260 }
1261
1262 /// Overwrite `data` bytes in place starting at logical `offset`.
1263 ///
1264 /// The file size is never changed: if `offset + data.len()` would exceed
1265 /// the current payload size the call is rejected. An empty slice is a
1266 /// valid no-op.
1267 ///
1268 /// # Feature flag
1269 ///
1270 /// Only available when the `set` Cargo feature is enabled.
1271 ///
1272 /// # Durability
1273 ///
1274 /// Equivalent to `push`/`pop`: the overwritten bytes are durably synced
1275 /// before the call returns.
1276 ///
1277 /// # Errors
1278 ///
1279 /// Returns [`io::ErrorKind::InvalidInput`] if `offset + data.len()`
1280 /// exceeds the current payload size, or if the addition overflows `u64`.
1281 /// Propagates any I/O error from `write_all` or `durable_sync`.
1282 #[cfg(feature = "set")]
1283 pub fn set(&self, offset: u64, data: impl AsRef<[u8]>) -> io::Result<()> {
1284 let data = data.as_ref();
1285 if data.is_empty() {
1286 return Ok(());
1287 }
1288 let end = offset.checked_add(data.len() as u64).ok_or_else(|| {
1289 io::Error::new(
1290 io::ErrorKind::InvalidInput,
1291 "set: offset + len overflows u64",
1292 )
1293 })?;
1294 let mut file = self.lock.write().unwrap();
1295 // Load `locked` under the write lock — otherwise a concurrent
1296 // `lock_up_to` could extend the locked region between our check and
1297 // our write, letting us mutate a now-immutable byte.
1298 let locked = self.locked.load(Ordering::Acquire);
1299 if offset < locked {
1300 return Err(io::Error::new(
1301 io::ErrorKind::InvalidInput,
1302 format!("set: range [{offset}, {end}) overlaps locked region [0, {locked})"),
1303 ));
1304 }
1305 let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE);
1306 if end > data_size {
1307 return Err(io::Error::new(
1308 io::ErrorKind::InvalidInput,
1309 format!("set: write end ({end}) exceeds payload size ({data_size})"),
1310 ));
1311 }
1312 file.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
1313 file.write_all(data)?;
1314 durable_sync(&file)
1315 }
1316
1317 /// Overwrite `n` bytes with zeros in place starting at logical `offset`.
1318 ///
1319 /// The file size is never changed: if `offset + n` would exceed
1320 /// the current payload size the call is rejected. `n = 0` is a
1321 /// valid no-op.
1322 ///
1323 /// # Feature flag
1324 ///
1325 /// Only available when the `set` Cargo feature is enabled.
1326 ///
1327 /// # Durability
1328 ///
1329 /// Equivalent to `push`/`pop`: the overwritten bytes are durably synced
1330 /// before the call returns.
1331 ///
1332 /// # Errors
1333 ///
1334 /// Returns [`io::ErrorKind::InvalidInput`] if `offset + n`
1335 /// exceeds the current payload size, or if the addition overflows `u64`.
1336 /// Propagates any I/O error from `write_all` or `durable_sync`.
1337 #[cfg(feature = "set")]
1338 pub fn zero(&self, offset: u64, n: u64) -> io::Result<()> {
1339 if n == 0 {
1340 return Ok(());
1341 }
1342 let end = offset.checked_add(n).ok_or_else(|| {
1343 io::Error::new(
1344 io::ErrorKind::InvalidInput,
1345 "zero: offset + n overflows u64",
1346 )
1347 })?;
1348 let mut file = self.lock.write().unwrap();
1349 // Load `locked` under the write lock (see `set` for rationale).
1350 let locked = self.locked.load(Ordering::Acquire);
1351 if offset < locked {
1352 return Err(io::Error::new(
1353 io::ErrorKind::InvalidInput,
1354 format!("zero: range [{offset}, {end}) overlaps locked region [0, {locked})"),
1355 ));
1356 }
1357 let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE);
1358 if end > data_size {
1359 return Err(io::Error::new(
1360 io::ErrorKind::InvalidInput,
1361 format!("zero: write end ({end}) exceeds payload size ({data_size})"),
1362 ));
1363 }
1364 file.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
1365 let zeros = vec![0u8; n as usize];
1366 file.write_all(&zeros)?;
1367 durable_sync(&file)
1368 }
1369}
1370
1371// ---------------------------------------------------------------------------
1372// Atomic compound operations
1373
1374#[cfg(feature = "atomic")]
1375impl BStack {
1376 /// Cut `n` bytes off the tail then append `buf` as a single atomic operation.
1377 ///
1378 /// The operation ordering is chosen based on the net size change to maximise
1379 /// crash-recovery safety (see *Durability* in the crate docs):
1380 ///
1381 /// * **Net extension** (`buf.len() > n`): the file is extended first, `buf`
1382 /// is written into the freed tail region plus the new space, then a
1383 /// `durable_sync` commits the data before the header committed-length is
1384 /// updated. On crash before the header update, recovery truncates back to
1385 /// the original committed length — a clean rollback.
1386 ///
1387 /// * **Net truncation or same size** (`buf.len() ≤ n`): `buf` is written
1388 /// into the tail first, then the file is truncated, then `durable_sync`
1389 /// commits the result before the header is updated. On crash after
1390 /// truncation, recovery sets the committed length to the (smaller) file
1391 /// size, committing the final state.
1392 ///
1393 /// `n = 0` with an empty `buf` is a valid no-op.
1394 ///
1395 /// # Feature flag
1396 ///
1397 /// Only available when the `atomic` Cargo feature is enabled.
1398 ///
1399 /// # Errors
1400 ///
1401 /// Returns [`io::ErrorKind::InvalidInput`] if `n` exceeds the current
1402 /// payload size. Propagates any I/O error from `set_len`, `write_all`,
1403 /// or `durable_sync`.
1404 #[cfg(feature = "atomic")]
1405 pub fn atrunc(&self, n: u64, buf: impl AsRef<[u8]>) -> io::Result<()> {
1406 let buf = buf.as_ref();
1407 let buf_len = buf.len() as u64;
1408 if n == 0 && buf_len == 0 {
1409 return Ok(());
1410 }
1411 let mut file = self.lock.write().unwrap();
1412 let file_end = file.seek(SeekFrom::End(0))?;
1413 let data_size = file_end - HEADER_SIZE;
1414 if n > data_size {
1415 return Err(io::Error::new(
1416 io::ErrorKind::InvalidInput,
1417 format!("atrunc: n ({n}) exceeds payload size ({data_size})"),
1418 ));
1419 }
1420 let locked = self.locked.load(Ordering::Acquire);
1421 let new_tail_start = data_size - n;
1422 if new_tail_start < locked {
1423 return Err(io::Error::new(
1424 io::ErrorKind::InvalidInput,
1425 format!("atrunc: operation would modify locked region [0, {locked})"),
1426 ));
1427 }
1428 let tail_offset = HEADER_SIZE + new_tail_start;
1429 let final_data_len = new_tail_start + buf_len;
1430
1431 if buf_len > n {
1432 // Net extension: extend first so data is never lost, then write buf,
1433 // sync the data, then commit the new length.
1434 let new_file_end = HEADER_SIZE + final_data_len;
1435 file.set_len(new_file_end)?;
1436 file.seek(SeekFrom::Start(tail_offset))?;
1437 if let Err(e) = file.write_all(buf) {
1438 let _ = file.set_len(file_end);
1439 return Err(e);
1440 }
1441 if let Err(e) = durable_sync(&file) {
1442 let _ = file.set_len(file_end);
1443 return Err(e);
1444 }
1445 write_committed_len(&mut file, final_data_len)?;
1446 } else {
1447 // Net truncation or same size: write buf into the old tail first,
1448 // truncate, sync, then commit the new length.
1449 if !buf.is_empty() {
1450 file.seek(SeekFrom::Start(tail_offset))?;
1451 file.write_all(buf)?;
1452 }
1453 file.set_len(HEADER_SIZE + final_data_len)?;
1454 durable_sync(&file)?;
1455 write_committed_len(&mut file, final_data_len)?;
1456 }
1457 Ok(())
1458 }
1459
1460 /// Pop `n` bytes off the tail then append `buf`, returning the removed bytes.
1461 ///
1462 /// The bytes are read before any mutation, so they are always available in
1463 /// the returned `Vec` even if the subsequent write fails. The same
1464 /// ordering strategy as [`atrunc`](Self::atrunc) is used.
1465 ///
1466 /// `n = 0` with an empty `buf` is a valid no-op and returns an empty `Vec`.
1467 ///
1468 /// # Feature flag
1469 ///
1470 /// Only available when the `atomic` Cargo feature is enabled.
1471 ///
1472 /// # Errors
1473 ///
1474 /// Returns [`io::ErrorKind::InvalidInput`] if `n` exceeds the current
1475 /// payload size. Propagates any I/O error from `read_exact`, `set_len`,
1476 /// `write_all`, or `durable_sync`.
1477 #[cfg(feature = "atomic")]
1478 pub fn splice(&self, n: u64, buf: impl AsRef<[u8]>) -> io::Result<Vec<u8>> {
1479 let buf = buf.as_ref();
1480 let buf_len = buf.len() as u64;
1481 if n == 0 && buf_len == 0 {
1482 return Ok(Vec::new());
1483 }
1484 let mut file = self.lock.write().unwrap();
1485 let file_end = file.seek(SeekFrom::End(0))?;
1486 let data_size = file_end - HEADER_SIZE;
1487 if n > data_size {
1488 return Err(io::Error::new(
1489 io::ErrorKind::InvalidInput,
1490 format!("splice: n ({n}) exceeds payload size ({data_size})"),
1491 ));
1492 }
1493 let locked = self.locked.load(Ordering::Acquire);
1494 let new_tail_start = data_size - n;
1495 if new_tail_start < locked {
1496 return Err(io::Error::new(
1497 io::ErrorKind::InvalidInput,
1498 format!("splice: operation would modify locked region [0, {locked})"),
1499 ));
1500 }
1501 let tail_offset = HEADER_SIZE + new_tail_start;
1502 let final_data_len = new_tail_start + buf_len;
1503
1504 // Read the bytes to remove before any mutation.
1505 file.seek(SeekFrom::Start(tail_offset))?;
1506 let mut removed = vec![0u8; n as usize];
1507 file.read_exact(&mut removed)?;
1508
1509 if buf_len > n {
1510 // Net extension: extend first, write buf, sync, commit.
1511 let new_file_end = HEADER_SIZE + final_data_len;
1512 file.set_len(new_file_end)?;
1513 file.seek(SeekFrom::Start(tail_offset))?;
1514 if let Err(e) = file.write_all(buf) {
1515 let _ = file.set_len(file_end);
1516 return Err(e);
1517 }
1518 if let Err(e) = durable_sync(&file) {
1519 let _ = file.set_len(file_end);
1520 return Err(e);
1521 }
1522 write_committed_len(&mut file, final_data_len)?;
1523 } else {
1524 // Net truncation or same size: write buf, truncate, sync, commit.
1525 if !buf.is_empty() {
1526 file.seek(SeekFrom::Start(tail_offset))?;
1527 file.write_all(buf)?;
1528 }
1529 file.set_len(HEADER_SIZE + final_data_len)?;
1530 durable_sync(&file)?;
1531 write_committed_len(&mut file, final_data_len)?;
1532 }
1533
1534 Ok(removed)
1535 }
1536
1537 /// Pop `old.len()` bytes off the tail into `old`, then append `new`.
1538 ///
1539 /// Buffer-reuse counterpart of [`splice`](Self::splice): avoids allocating
1540 /// a `Vec` for the removed bytes by writing them into the caller-supplied
1541 /// `old` slice. The same ordering strategy as [`atrunc`](Self::atrunc) is
1542 /// used for the write/truncation side.
1543 ///
1544 /// An empty `old` with an empty `new` is a valid no-op.
1545 ///
1546 /// # Feature flag
1547 ///
1548 /// Only available when the `atomic` Cargo feature is enabled.
1549 ///
1550 /// # Errors
1551 ///
1552 /// Returns [`io::ErrorKind::InvalidInput`] if `old.len()` exceeds the
1553 /// current payload size. Propagates any I/O error from `read_exact`,
1554 /// `set_len`, `write_all`, or `durable_sync`.
1555 #[cfg(feature = "atomic")]
1556 pub fn splice_into(&self, old: &mut [u8], new: impl AsRef<[u8]>) -> io::Result<()> {
1557 let new = new.as_ref();
1558 let n = old.len() as u64;
1559 let new_len = new.len() as u64;
1560 if n == 0 && new_len == 0 {
1561 return Ok(());
1562 }
1563 let mut file = self.lock.write().unwrap();
1564 let file_end = file.seek(SeekFrom::End(0))?;
1565 let data_size = file_end - HEADER_SIZE;
1566 if n > data_size {
1567 return Err(io::Error::new(
1568 io::ErrorKind::InvalidInput,
1569 format!("splice_into: n ({n}) exceeds payload size ({data_size})"),
1570 ));
1571 }
1572 let locked = self.locked.load(Ordering::Acquire);
1573 let new_tail_start = data_size - n;
1574 if new_tail_start < locked {
1575 return Err(io::Error::new(
1576 io::ErrorKind::InvalidInput,
1577 format!("splice_into: operation would modify locked region [0, {locked})"),
1578 ));
1579 }
1580 let tail_offset = HEADER_SIZE + new_tail_start;
1581 let final_data_len = new_tail_start + new_len;
1582
1583 // Read the bytes to remove before any mutation.
1584 file.seek(SeekFrom::Start(tail_offset))?;
1585 file.read_exact(old)?;
1586
1587 if new_len > n {
1588 // Net extension: extend first, write new, sync, commit.
1589 let new_file_end = HEADER_SIZE + final_data_len;
1590 file.set_len(new_file_end)?;
1591 file.seek(SeekFrom::Start(tail_offset))?;
1592 if let Err(e) = file.write_all(new) {
1593 let _ = file.set_len(file_end);
1594 return Err(e);
1595 }
1596 if let Err(e) = durable_sync(&file) {
1597 let _ = file.set_len(file_end);
1598 return Err(e);
1599 }
1600 write_committed_len(&mut file, final_data_len)?;
1601 } else {
1602 // Net truncation or same size: write new, truncate, sync, commit.
1603 if !new.is_empty() {
1604 file.seek(SeekFrom::Start(tail_offset))?;
1605 file.write_all(new)?;
1606 }
1607 file.set_len(HEADER_SIZE + final_data_len)?;
1608 durable_sync(&file)?;
1609 write_committed_len(&mut file, final_data_len)?;
1610 }
1611 Ok(())
1612 }
1613
1614 /// Append `buf` only if the current logical payload size equals `s`.
1615 ///
1616 /// Returns `Ok(true)` if the size matched and `buf` was appended (or `buf`
1617 /// is empty and no I/O was needed). Returns `Ok(false)` without modifying
1618 /// the file if the size does not match.
1619 ///
1620 /// # Feature flag
1621 ///
1622 /// Only available when the `atomic` Cargo feature is enabled.
1623 ///
1624 /// # Errors
1625 ///
1626 /// Propagates any I/O error from `write_all`, `write_committed_len`, or
1627 /// `durable_sync`.
1628 #[cfg(feature = "atomic")]
1629 pub fn try_extend(&self, s: u64, buf: impl AsRef<[u8]>) -> io::Result<bool> {
1630 let buf = buf.as_ref();
1631 let mut file = self.lock.write().unwrap();
1632 let file_end = file.seek(SeekFrom::End(0))?;
1633 let data_size = file_end - HEADER_SIZE;
1634 if data_size != s {
1635 return Ok(false);
1636 }
1637 if buf.is_empty() {
1638 return Ok(true);
1639 }
1640 if let Err(e) = file.write_all(buf) {
1641 let _ = file.set_len(file_end);
1642 return Err(e);
1643 }
1644 let new_len = data_size + buf.len() as u64;
1645 if let Err(e) = write_committed_len(&mut file, new_len).and_then(|_| durable_sync(&file)) {
1646 let _ = file.set_len(file_end);
1647 let _ = write_committed_len(&mut file, data_size);
1648 return Err(e);
1649 }
1650 Ok(true)
1651 }
1652
1653 /// Discard `n` bytes only if the current logical payload size equals `s`.
1654 ///
1655 /// Returns `Ok(true)` if the size matched and `n` bytes were removed (or
1656 /// `n = 0` and the size check passed without I/O). Returns `Ok(false)`
1657 /// without modifying the file if the size does not match.
1658 ///
1659 /// When `n = 0` only the read lock is taken (no file mutation occurs).
1660 ///
1661 /// # Feature flag
1662 ///
1663 /// Only available when the `atomic` Cargo feature is enabled.
1664 ///
1665 /// # Errors
1666 ///
1667 /// Returns [`io::ErrorKind::InvalidInput`] if `n` exceeds the current
1668 /// payload size. Propagates any I/O error from `set_len`,
1669 /// `write_committed_len`, or `durable_sync`.
1670 #[cfg(feature = "atomic")]
1671 pub fn try_discard(&self, s: u64, n: u64) -> io::Result<bool> {
1672 if n == 0 {
1673 let file = self.lock.read().unwrap();
1674 let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE);
1675 return Ok(data_size == s);
1676 }
1677 let mut file = self.lock.write().unwrap();
1678 let raw_size = file.seek(SeekFrom::End(0))?;
1679 let data_size = raw_size - HEADER_SIZE;
1680 if data_size != s {
1681 return Ok(false);
1682 }
1683 if n > data_size {
1684 return Err(io::Error::new(
1685 io::ErrorKind::InvalidInput,
1686 format!("try_discard: n ({n}) exceeds payload size ({data_size})"),
1687 ));
1688 }
1689 let new_data_len = data_size - n;
1690 let locked = self.locked.load(Ordering::Acquire);
1691 if new_data_len < locked {
1692 return Err(io::Error::new(
1693 io::ErrorKind::InvalidInput,
1694 format!("try_discard: would shrink payload below locked length ({locked})"),
1695 ));
1696 }
1697 file.set_len(HEADER_SIZE + new_data_len)?;
1698 write_committed_len(&mut file, new_data_len)?;
1699 durable_sync(&file)?;
1700 Ok(true)
1701 }
1702
1703 /// Pop `n` bytes off the tail, pass them read-only to a callback that
1704 /// returns the new tail bytes, then write the new tail.
1705 ///
1706 /// The read, callback invocation, and write all happen under the same write
1707 /// lock, so no other thread can observe the state between the pop and the
1708 /// push. The callback may return a [`Vec<u8>`] of any length — the file
1709 /// will grow or shrink accordingly using the same crash-safe ordering
1710 /// strategy as [`atrunc`](Self::atrunc).
1711 ///
1712 /// `n = 0` is valid: the callback receives an empty slice and whatever it
1713 /// returns is appended.
1714 ///
1715 /// # Feature flag
1716 ///
1717 /// Only available when the `atomic` Cargo feature is enabled.
1718 ///
1719 /// # Errors
1720 ///
1721 /// Returns [`io::ErrorKind::InvalidInput`] if `n` exceeds the current
1722 /// payload size. Propagates any I/O error from `read_exact`, `set_len`,
1723 /// `write_all`, or `durable_sync`.
1724 #[cfg(feature = "atomic")]
1725 pub fn replace<F>(&self, n: u64, f: F) -> io::Result<()>
1726 where
1727 F: FnOnce(&[u8]) -> Vec<u8>,
1728 {
1729 let mut file = self.lock.write().unwrap();
1730 let file_end = file.seek(SeekFrom::End(0))?;
1731 let data_size = file_end - HEADER_SIZE;
1732 if n > data_size {
1733 return Err(io::Error::new(
1734 io::ErrorKind::InvalidInput,
1735 format!("replace: n ({n}) exceeds payload size ({data_size})"),
1736 ));
1737 }
1738 let locked = self.locked.load(Ordering::Acquire);
1739 let new_tail_start = data_size - n;
1740 if new_tail_start < locked {
1741 return Err(io::Error::new(
1742 io::ErrorKind::InvalidInput,
1743 format!("replace: operation would modify locked region [0, {locked})"),
1744 ));
1745 }
1746 let tail_offset = HEADER_SIZE + new_tail_start;
1747 file.seek(SeekFrom::Start(tail_offset))?;
1748 let mut old_tail = vec![0u8; n as usize];
1749 file.read_exact(&mut old_tail)?;
1750 let new_tail = f(&old_tail);
1751 let new_tail_len = new_tail.len() as u64;
1752 let final_data_len = new_tail_start + new_tail_len;
1753
1754 if new_tail_len > n {
1755 // Net extension: extend first, write new tail, sync, commit.
1756 let new_file_end = HEADER_SIZE + final_data_len;
1757 file.set_len(new_file_end)?;
1758 file.seek(SeekFrom::Start(tail_offset))?;
1759 if let Err(e) = file.write_all(&new_tail) {
1760 let _ = file.set_len(file_end);
1761 return Err(e);
1762 }
1763 if let Err(e) = durable_sync(&file) {
1764 let _ = file.set_len(file_end);
1765 return Err(e);
1766 }
1767 write_committed_len(&mut file, final_data_len)?;
1768 } else {
1769 // Net truncation or same size: write new tail, truncate, sync, commit.
1770 if !new_tail.is_empty() {
1771 file.seek(SeekFrom::Start(tail_offset))?;
1772 file.write_all(&new_tail)?;
1773 }
1774 file.set_len(HEADER_SIZE + final_data_len)?;
1775 durable_sync(&file)?;
1776 write_committed_len(&mut file, final_data_len)?;
1777 }
1778 Ok(())
1779 }
1780}
1781
#[cfg(all(feature = "set", feature = "atomic"))]
impl BStack {
    /// Atomically read `buf.len()` bytes at `offset` and overwrite them with
    /// `buf`, returning the old contents.
    ///
    /// The read and the write happen under the same write lock, so no other
    /// thread can observe the range between the two steps. The file size is
    /// never changed.
    ///
    /// An empty `buf` is a valid no-op. It returns an empty `Vec` immediately,
    /// without validating `offset` or taking the lock.
    ///
    /// # Feature flags
    ///
    /// Only available when both the `set` and `atomic` Cargo features are
    /// enabled.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::InvalidInput`] in three cases:
    ///
    /// * `offset + buf.len()` overflows `u64`.
    /// * `offset` lies inside the locked region `[0, locked_len())`.
    /// * The range extends past the current payload size.
    ///
    /// Propagates any I/O error from `seek`, `read_exact`, `write_all`, or
    /// `durable_sync`.
    #[cfg(all(feature = "set", feature = "atomic"))]
    pub fn swap(&self, offset: u64, buf: impl AsRef<[u8]>) -> io::Result<Vec<u8>> {
        let buf = buf.as_ref();
        if buf.is_empty() {
            return Ok(Vec::new());
        }
        let end = offset.checked_add(buf.len() as u64).ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "swap: offset + len overflows u64",
            )
        })?;
        let mut file = self.lock.write().unwrap();
        // Load `locked` under the write lock (see `set` for rationale).
        let locked = self.locked.load(Ordering::Acquire);
        if offset < locked {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("swap: range [{offset}, {end}) overlaps locked region [0, {locked})"),
            ));
        }
        let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE);
        if end > data_size {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("swap: range [{offset}, {end}) exceeds payload size ({data_size})"),
            ));
        }
        file.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
        let mut old = vec![0u8; buf.len()];
        file.read_exact(&mut old)?;
        file.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
        file.write_all(buf)?;
        durable_sync(&file)?;
        Ok(old)
    }

    /// Atomically exchange the contents of `buf` with the `buf.len()` bytes
    /// stored at `offset`.
    ///
    /// On return, `buf` holds the bytes that were previously at `offset`, and
    /// the file holds what `buf` contained on entry. This is the buffer-reuse
    /// counterpart of [`swap`](Self::swap).
    ///
    /// An empty `buf` is a valid no-op, returning immediately without
    /// validating `offset` or taking the lock.
    ///
    /// # Feature flags
    ///
    /// Only available when both the `set` and `atomic` Cargo features are
    /// enabled.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::InvalidInput`] in three cases:
    ///
    /// * `offset + buf.len()` overflows `u64`.
    /// * `offset` lies inside the locked region `[0, locked_len())`.
    /// * The range extends past the current payload size.
    ///
    /// Propagates any I/O error from `seek`, `read_exact`, `write_all`, or
    /// `durable_sync`. On error, `buf` is left unchanged.
    #[cfg(all(feature = "set", feature = "atomic"))]
    pub fn swap_into(&self, offset: u64, buf: &mut [u8]) -> io::Result<()> {
        if buf.is_empty() {
            return Ok(());
        }
        let end = offset.checked_add(buf.len() as u64).ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "swap_into: offset + len overflows u64",
            )
        })?;
        let mut file = self.lock.write().unwrap();
        // Load `locked` under the write lock (see `set` for rationale).
        let locked = self.locked.load(Ordering::Acquire);
        if offset < locked {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("swap_into: range [{offset}, {end}) overlaps locked region [0, {locked})"),
            ));
        }
        let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE);
        if end > data_size {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("swap_into: range [{offset}, {end}) exceeds payload size ({data_size})"),
            ));
        }
        file.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
        let mut tmp = vec![0u8; buf.len()];
        file.read_exact(&mut tmp)?;
        file.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
        file.write_all(buf)?;
        durable_sync(&file)?;
        // Only hand the old bytes back once the new ones are durable.
        buf.copy_from_slice(&tmp);
        Ok(())
    }

    /// Compare-and-exchange: read `old.len()` bytes at `offset` and, if they
    /// equal `old`, overwrite them with `new`.
    ///
    /// The method returns without error in three cases:
    ///
    /// * `Ok(true)`: the comparison succeeded and the exchange was performed.
    /// * `Ok(true)`, with no validation and no I/O: `old` and `new` are both
    ///   empty.
    /// * `Ok(false)`, with the file unmodified: `old.len() != new.len()`, or
    ///   the current bytes do not match `old`.
    ///
    /// The compare and the exchange happen under the same write lock.
    ///
    /// # Feature flags
    ///
    /// Only available when both the `set` and `atomic` Cargo features are
    /// enabled.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::InvalidInput`] in three cases:
    ///
    /// * `offset + old.len()` overflows `u64`.
    /// * `offset` lies inside the locked region `[0, locked_len())`.
    /// * The range extends past the current payload size.
    ///
    /// Propagates any I/O error from `seek`, `read_exact`, `write_all`, or
    /// `durable_sync`.
    #[cfg(all(feature = "set", feature = "atomic"))]
    pub fn cas(
        &self,
        offset: u64,
        old: impl AsRef<[u8]>,
        new: impl AsRef<[u8]>,
    ) -> io::Result<bool> {
        let old = old.as_ref();
        let new = new.as_ref();
        if old.len() != new.len() {
            return Ok(false);
        }
        if old.is_empty() {
            return Ok(true);
        }
        let end = offset.checked_add(old.len() as u64).ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "cas: offset + len overflows u64",
            )
        })?;
        let mut file = self.lock.write().unwrap();
        // Load `locked` under the write lock (see `set` for rationale).
        let locked = self.locked.load(Ordering::Acquire);
        if offset < locked {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("cas: range [{offset}, {end}) overlaps locked region [0, {locked})"),
            ));
        }
        let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE);
        if end > data_size {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("cas: range [{offset}, {end}) exceeds payload size ({data_size})"),
            ));
        }
        file.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
        let mut current = vec![0u8; old.len()];
        file.read_exact(&mut current)?;
        if current != old {
            return Ok(false);
        }
        file.seek(SeekFrom::Start(HEADER_SIZE + offset))?;
        file.write_all(new)?;
        durable_sync(&file)?;
        Ok(true)
    }

    /// Read bytes in the half-open logical range `[start, end)`, pass them to
    /// a callback that may mutate them in place, then write the modified
    /// bytes back.
    ///
    /// The read, the callback invocation, and the write all happen under the
    /// same write lock, so no other thread can observe an intermediate state.
    /// The file size is never changed.
    ///
    /// `start == end` is a valid no-op. `f` is called with an empty slice, and
    /// no I/O is performed beyond the initial size check. Because an empty
    /// range touches no bytes, it is never rejected for lying inside the
    /// locked region.
    ///
    /// # Feature flags
    ///
    /// Only available when both the `set` and `atomic` Cargo features are
    /// enabled.
    ///
    /// # Panics
    ///
    /// `f` runs while the internal write lock is held. If `f` panics, the lock
    /// is poisoned, and later operations on this `BStack` panic when they
    /// unwrap it. `f` must not call back into this `BStack`: doing so would
    /// try to re-acquire a lock already held by the current thread, which may
    /// deadlock or panic.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::InvalidInput`] in four cases:
    ///
    /// * `end < start`.
    /// * `end - start` does not fit in `usize`.
    /// * `end` exceeds the current payload size.
    /// * A non-empty range starts inside the locked region
    ///   `[0, locked_len())`.
    ///
    /// Propagates any I/O error from `seek`, `read_exact`, `write_all`, or
    /// `durable_sync`.
    #[cfg(all(feature = "set", feature = "atomic"))]
    pub fn process<F>(&self, start: u64, end: u64, f: F) -> io::Result<()>
    where
        F: FnOnce(&mut [u8]),
    {
        if end < start {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("process: end ({end}) < start ({start})"),
            ));
        }
        let n = end - start;
        // On 32-bit targets an `as usize` cast would silently hand `f` (and
        // write back) only a truncated prefix of the requested range.
        let len = usize::try_from(n).map_err(|_| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("process: range length ({n}) does not fit in usize"),
            )
        })?;
        let mut file = self.lock.write().unwrap();
        let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE);
        if end > data_size {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("process: end ({end}) exceeds payload size ({data_size})"),
            ));
        }
        let locked = self.locked.load(Ordering::Acquire);
        // An empty range touches no bytes, so it cannot overlap the locked region.
        if n > 0 && start < locked {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("process: range [{start}, {end}) overlaps locked region [0, {locked})"),
            ));
        }
        let mut buf = vec![0u8; len];
        if n > 0 {
            file.seek(SeekFrom::Start(HEADER_SIZE + start))?;
            file.read_exact(&mut buf)?;
        }
        f(&mut buf);
        if n > 0 {
            file.seek(SeekFrom::Start(HEADER_SIZE + start))?;
            file.write_all(&buf)?;
            durable_sync(&file)?;
        }
        Ok(())
    }
}
2026
2027// ---------------------------------------------------------------------------
2028
2029impl BStack {
2030 /// Return the current **logical** payload size in bytes (excludes the
2031 /// 16-byte header).
2032 ///
2033 /// Takes the read lock, so it can run concurrently with other `len` calls
2034 /// but blocks while any write-lock operation is in progress. The returned
2035 /// value always reflects a clean operation boundary.
2036 ///
2037 /// # Errors
2038 ///
2039 /// Propagates any [`io::Error`] from [`File::metadata`].
2040 pub fn len(&self) -> io::Result<u64> {
2041 let file = self.lock.read().unwrap();
2042 Ok(file.metadata()?.len().saturating_sub(HEADER_SIZE))
2043 }
2044
2045 /// Return `true` if the stack contains no payload bytes.
2046 ///
2047 /// # Errors
2048 ///
2049 /// Propagates any [`io::Error`] from [`File::metadata`].
2050 pub fn is_empty(&self) -> io::Result<bool> {
2051 Ok(self.len()? == 0)
2052 }
2053
2054 /// Returns the current locked length. `0` means no bytes are locked.
2055 ///
2056 /// The locked region is `[0, locked_len())`. All bytes within this range
2057 /// are permanently immutable: writes and shrink operations that would
2058 /// touch them return [`io::ErrorKind::InvalidInput`], and reads to ranges
2059 /// entirely within it skip the rwlock on Unix and Windows.
2060 pub fn locked_len(&self) -> u64 {
2061 self.locked.load(Ordering::Acquire)
2062 }
2063
2064 /// Extend the locked region to cover `[0, n)`.
2065 ///
2066 /// `n` must be ≥ the current locked length and ≤ the current payload
2067 /// length. After this call, reads to `[0, n)` are lock-free on Unix and
2068 /// Windows, and all write and shrink operations that would touch `[0, n)`
2069 /// return [`io::ErrorKind::InvalidInput`].
2070 ///
2071 /// Acquires the exclusive write lock to ensure all in-flight writes to
2072 /// `[0, n)` have completed before the region is declared immutable.
2073 ///
2074 /// # Errors
2075 ///
2076 /// Returns [`io::ErrorKind::InvalidInput`] if `n` is less than the current
2077 /// locked length (partition can only grow) or if `n` exceeds the current
2078 /// payload length.
2079 pub fn lock_up_to(&self, n: u64) -> io::Result<()> {
2080 // Acquire the write lock to serialise against any in-flight writers.
2081 let file = self.lock.write().unwrap();
2082 let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE);
2083 let current_locked = self.locked.load(Ordering::Relaxed);
2084 if n < current_locked {
2085 return Err(io::Error::new(
2086 io::ErrorKind::InvalidInput,
2087 format!(
2088 "lock_up_to: n ({n}) is less than the current locked length ({current_locked})"
2089 ),
2090 ));
2091 }
2092 if n > data_size {
2093 return Err(io::Error::new(
2094 io::ErrorKind::InvalidInput,
2095 format!("lock_up_to: n ({n}) exceeds payload size ({data_size})"),
2096 ));
2097 }
2098 // Release store: all writes completed under the write lock above are
2099 // visible to any thread that subsequently loads `locked` with Acquire.
2100 self.locked.store(n, Ordering::Release);
2101 drop(file);
2102 Ok(())
2103 }
2104
2105 /// Open a `BStack` and immediately lock the first `n` bytes.
2106 ///
2107 /// Equivalent to [`BStack::open`] followed by [`lock_up_to`](Self::lock_up_to),
2108 /// but expressed as a single call for the common pattern where the locked
2109 /// region is known ahead of time (e.g. a fixed-size metadata block whose
2110 /// size is a compile-time or configuration constant).
2111 ///
2112 /// # Errors
2113 ///
2114 /// Propagates all errors from [`open`](Self::open). Returns
2115 /// [`io::ErrorKind::InvalidInput`] if `n` exceeds the payload length of
2116 /// the opened file.
2117 pub fn open_locked_up_to(path: impl AsRef<Path>, n: u64) -> io::Result<Self> {
2118 let stack = Self::open(path)?;
2119 stack.lock_up_to(n)?;
2120 Ok(stack)
2121 }
2122}
2123
2124// ---------------------------------------------------------------------------
2125// io::Write
2126
2127/// Appends bytes to the stack.
2128///
2129/// Each call to [`write`](io::Write::write) is equivalent to [`push`](BStack::push):
2130/// all bytes are written atomically and durably synced before returning.
2131/// Calling `write_all` or chaining multiple `write` calls therefore issues
2132/// one `durable_sync` per call — callers that need to batch many small writes
2133/// without per-write syncs should accumulate data and call `push` directly.
2134///
2135/// [`flush`](io::Write::flush) is a no-op because every `write` is already
2136/// durable.
2137impl io::Write for BStack {
2138 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
2139 self.push(buf)?;
2140 Ok(buf.len())
2141 }
2142
2143 fn flush(&mut self) -> io::Result<()> {
2144 Ok(())
2145 }
2146}
2147
2148/// Shared-reference counterpart of `impl Write for BStack`.
2149///
2150/// Because [`push`](BStack::push) takes `&self` (interior mutability via
2151/// `RwLock`), the `Write` implementation is also available on `&BStack`,
2152/// mirroring the standard library's `impl Write for &File`.
2153impl io::Write for &BStack {
2154 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
2155 self.push(buf)?;
2156 Ok(buf.len())
2157 }
2158
2159 fn flush(&mut self) -> io::Result<()> {
2160 Ok(())
2161 }
2162}
2163
2164impl fmt::Debug for BStack {
2165 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2166 f.debug_struct("BStack")
2167 .field(
2168 "version",
2169 &format!("{}.{}.{}", MAGIC[4], MAGIC[5], MAGIC[6]),
2170 )
2171 .field("len", &self.len().ok())
2172 .finish_non_exhaustive()
2173 }
2174}
2175
2176impl Eq for BStack {}
2177
2178/// Two `BStack` instances are equal iff they are the **same instance** in memory.
2179///
2180/// Because [`BStack::open`] acquires an exclusive advisory lock, no two
2181/// `BStack` values within one process can refer to the same file at the same
2182/// time. Pointer identity is therefore the only meaningful equality: a stack
2183/// is equal to itself and to nothing else.
2184impl PartialEq for BStack {
2185 fn eq(&self, other: &Self) -> bool {
2186 std::ptr::eq(self, other)
2187 }
2188}
2189
2190/// Hashes the instance address, consistent with the pointer-identity [`PartialEq`].
2191impl Hash for BStack {
2192 fn hash<H: Hasher>(&self, state: &mut H) {
2193 (self as *const BStack).hash(state);
2194 }
2195}
2196
2197/// A cursor-based reader over a [`BStack`] payload.
2198///
2199/// `BStackReader` implements [`io::Read`] and [`io::Seek`], allowing the
2200/// stack's payload to be consumed through any interface that expects a
2201/// readable, seekable byte stream.
2202///
2203/// # Construction
2204///
2205/// ```no_run
2206/// use bstack::BStack;
2207///
2208/// # fn main() -> std::io::Result<()> {
2209/// let stack = BStack::open("log.bin")?;
2210/// stack.push(b"hello world")?;
2211///
2212/// // Start reading from the beginning.
2213/// let mut reader = stack.reader();
2214///
2215/// // Or start from an arbitrary offset.
2216/// let mut mid = stack.reader_at(6);
2217/// # Ok(())
2218/// # }
2219/// ```
2220///
2221/// # Concurrency
2222///
2223/// `BStackReader` borrows the stack immutably, so multiple readers can coexist
2224/// and run concurrently with each other and with [`peek`](BStack::peek) /
2225/// [`get`](BStack::get) calls. Concurrent [`push`](BStack::push) or
2226/// [`pop`](BStack::pop) operations are not blocked by an active reader, but
2227/// reading interleaved with writes may observe different snapshots of the
2228/// payload across calls — callers are responsible for synchronisation when
2229/// that matters.
2230pub struct BStackReader<'a> {
2231 stack: &'a BStack,
2232 offset: u64,
2233}
2234
2235impl BStack {
2236 /// Create a [`BStackReader`] positioned at the start of the payload.
2237 pub fn reader(&self) -> BStackReader<'_> {
2238 BStackReader {
2239 stack: self,
2240 offset: 0,
2241 }
2242 }
2243
2244 /// Create a [`BStackReader`] positioned at `offset` bytes into the payload.
2245 ///
2246 /// Seeking past the current end is allowed; [`read`](io::Read::read) will
2247 /// return `Ok(0)` until new data is pushed past that point.
2248 pub fn reader_at(&self, offset: u64) -> BStackReader<'_> {
2249 BStackReader {
2250 stack: self,
2251 offset,
2252 }
2253 }
2254}
2255
2256impl<'a> BStackReader<'a> {
2257 /// Return the current logical read offset within the payload.
2258 pub fn position(&self) -> u64 {
2259 self.offset
2260 }
2261}
2262
2263impl<'a> From<&'a BStack> for BStackReader<'a> {
2264 fn from(stack: &'a BStack) -> Self {
2265 stack.reader()
2266 }
2267}
2268
2269impl<'a> From<BStackReader<'a>> for &'a BStack {
2270 fn from(val: BStackReader<'a>) -> Self {
2271 val.stack
2272 }
2273}
2274
2275/// Two readers are equal when they point to the **same `BStack` instance**
2276/// (pointer identity) and share the same cursor `offset`.
2277impl<'a> PartialEq for BStackReader<'a> {
2278 fn eq(&self, other: &Self) -> bool {
2279 self.stack == other.stack && self.offset == other.offset
2280 }
2281}
2282
2283impl<'a> Eq for BStackReader<'a> {}
2284
2285/// Hashes `(BStack pointer, offset)`, consistent with [`PartialEq`].
2286impl<'a> Hash for BStackReader<'a> {
2287 fn hash<H: Hasher>(&self, state: &mut H) {
2288 self.stack.hash(state);
2289 self.offset.hash(state);
2290 }
2291}
2292
2293impl<'a> PartialOrd for BStackReader<'a> {
2294 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
2295 Some(self.cmp(other))
2296 }
2297}
2298
2299/// Ordered by `BStack` instance address, then by cursor `offset`.
2300///
2301/// The address component groups all readers over the same stack together,
2302/// and within that group the natural read order (smaller offset first) applies.
2303/// This ordering is consistent with the pointer-identity [`PartialEq`].
2304impl<'a> Ord for BStackReader<'a> {
2305 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
2306 let self_ptr = self.stack as *const BStack as usize;
2307 let other_ptr = other.stack as *const BStack as usize;
2308 self_ptr
2309 .cmp(&other_ptr)
2310 .then(self.offset.cmp(&other.offset))
2311 }
2312}
2313
2314impl<'a> io::Read for BStackReader<'a> {
2315 /// Read bytes from the current position into `buf`.
2316 ///
2317 /// Returns the number of bytes read, which may be less than `buf.len()` if
2318 /// the end of the payload is reached. Returns `Ok(0)` at EOF.
2319 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
2320 if buf.is_empty() {
2321 return Ok(0);
2322 }
2323 let data_size = self.stack.len()?;
2324 if self.offset >= data_size {
2325 return Ok(0);
2326 }
2327 let available = (data_size - self.offset) as usize;
2328 let n = buf.len().min(available);
2329 self.stack.get_into(self.offset, &mut buf[..n])?;
2330 self.offset += n as u64;
2331 Ok(n)
2332 }
2333}
2334
2335impl<'a> io::Seek for BStackReader<'a> {
2336 /// Move the read cursor.
2337 ///
2338 /// [`SeekFrom::Start`] and [`SeekFrom::Current`] with a non-negative delta
2339 /// may advance the cursor past the current end of the payload; subsequent
2340 /// [`read`](io::Read::read) calls will return `Ok(0)` until the payload
2341 /// grows past that point. Seeking before the start of the payload returns
2342 /// [`io::ErrorKind::InvalidInput`].
2343 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
2344 let data_size = self.stack.len()? as i128;
2345 let new_offset = match pos {
2346 SeekFrom::Start(n) => n as i128,
2347 SeekFrom::End(n) => data_size + n as i128,
2348 SeekFrom::Current(n) => self.offset as i128 + n as i128,
2349 };
2350 if new_offset < 0 {
2351 return Err(io::Error::new(
2352 io::ErrorKind::InvalidInput,
2353 "seek before beginning of payload",
2354 ));
2355 }
2356 self.offset = new_offset as u64;
2357 Ok(self.offset)
2358 }
2359}