use std::fs::{File, OpenOptions};
use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::path::Path;
use crate::io::locking::{self, FileLocking, LockMode};
/// Default buffer size for buffered I/O (64 KiB).
const DEFAULT_BUF_SIZE: usize = 64 * 1024;
/// Wraps `std::fs::File` with buffered, positioned I/O convenience methods.
///
/// Write operations go through a `BufWriter` to merge small writes.
/// Read operations go through a `BufReader` to reduce syscall overhead.
/// The buffers are flushed automatically when switching between
/// read and write operations.
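///
/// # Examples
///
/// A sketch of the intended write-then-read flow. The import path is
/// assumed, so the block is not compiled as a doctest:
///
/// ```ignore
/// use std::path::Path;
///
/// let mut handle = FileHandle::create(Path::new("scratch.bin"))?;
/// handle.write_at(0, b"superblock")?;   // buffered by the BufWriter
/// let sb = handle.read_at(0, 10)?;      // flushes, then switches to reads
/// assert_eq!(sb, b"superblock".to_vec());
/// handle.sync_all()?;                   // data + metadata to disk
/// # Ok::<(), std::io::Error>(())
/// ```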
pub struct FileHandle {
    /// Raw file used as a fallback by `take_file` when `mode` is
    /// `Transitioning` and no buffer owns the file.
    file: Option<File>,
    /// Current buffering mode.
    mode: Mode,
    /// Active locking policy. Consulted when releasing the lock for SWMR.
/// When the policy is [`FileLocking::Disabled`], no lock was taken.
lock_policy: FileLocking,
/// True if a lock is currently held on the underlying file.
lock_held: bool,
}
enum Mode {
/// Read/write capable, currently buffering writes.
Writer(BufWriter<File>),
/// Read/write capable, currently buffering reads.
Reader(BufReader<File>),
/// Read-only file.
ReadOnly(BufReader<File>),
/// Transitional state while swapping buffers.
Transitioning,
}
impl FileHandle {
/// Create a new file with the env-var-derived locking policy.
pub fn create(path: &Path) -> std::io::Result<Self> {
Self::create_with_locking(path, FileLocking::from_env_or(FileLocking::default()))
}
/// Create a new file (truncating if it already exists) opened for
/// read/write access, with an explicit locking policy.
///
/// The lock is acquired *before* the file is truncated, so a lock
/// conflict on an existing file does not destroy its contents.
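    ///
    /// A minimal sketch with an explicit policy (import paths assumed;
    /// not compiled as a doctest):
    ///
    /// ```ignore
    /// use std::path::Path;
    ///
    /// // `FileLocking::Disabled` skips OS locks entirely, e.g. on file
    /// // systems where locking is unreliable.
    /// let handle = FileHandle::create_with_locking(Path::new("new.bin"), FileLocking::Disabled)?;
    /// assert!(!handle.lock_held());
    /// # Ok::<(), std::io::Error>(())
    /// ```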
pub fn create_with_locking(path: &Path, policy: FileLocking) -> std::io::Result<Self> {
// Open without O_TRUNC first so that we can validate the lock
// before destroying any existing data.
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(path)?;
let lock_held = locking::try_acquire(&file, LockMode::Exclusive, policy)?;
// Now that the lock is held (or skipped per policy), truncate.
file.set_len(0)?;
Ok(Self {
file: None,
mode: Mode::Writer(BufWriter::with_capacity(DEFAULT_BUF_SIZE, file)),
lock_policy: policy,
lock_held,
})
}
/// Open an existing file for read-only access with the env-var-derived
/// locking policy.
pub fn open_read(path: &Path) -> std::io::Result<Self> {
Self::open_read_with_locking(path, FileLocking::from_env_or(FileLocking::default()))
}
/// Open an existing file for read-only access with an explicit locking
/// policy. A shared lock is taken so multiple readers can coexist.
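    ///
    /// Shared locks let several readers attach to the same file at once
    /// (sketch; paths assumed, not compiled as a doctest):
    ///
    /// ```ignore
    /// use std::path::Path;
    ///
    /// let first = FileHandle::open_read(Path::new("data.h5"))?;
    /// let second = FileHandle::open_read(Path::new("data.h5"))?; // also succeeds
    /// # Ok::<(), std::io::Error>(())
    /// ```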
pub fn open_read_with_locking(path: &Path, policy: FileLocking) -> std::io::Result<Self> {
let file = OpenOptions::new().read(true).open(path)?;
let lock_held = locking::try_acquire(&file, LockMode::Shared, policy)?;
Ok(Self {
file: None,
mode: Mode::ReadOnly(BufReader::with_capacity(DEFAULT_BUF_SIZE, file)),
lock_policy: policy,
lock_held,
})
}
/// Open an existing file for read/write access with the env-var-derived
/// locking policy.
pub fn open_readwrite(path: &Path) -> std::io::Result<Self> {
Self::open_readwrite_with_locking(path, FileLocking::from_env_or(FileLocking::default()))
}
/// Open an existing file for read/write access with an explicit locking
/// policy. An exclusive lock is taken.
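    ///
    /// A sketch of reopening an existing file and appending through
    /// positioned writes (paths assumed; not compiled as a doctest):
    ///
    /// ```ignore
    /// use std::path::Path;
    ///
    /// let mut handle = FileHandle::open_readwrite(Path::new("data.h5"))?;
    /// let end = handle.file_size()?;
    /// handle.write_at(end, b"new block")?;   // extends the file
    /// handle.sync_data()?;
    /// # Ok::<(), std::io::Error>(())
    /// ```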
pub fn open_readwrite_with_locking(path: &Path, policy: FileLocking) -> std::io::Result<Self> {
let file = OpenOptions::new().read(true).write(true).open(path)?;
let lock_held = locking::try_acquire(&file, LockMode::Exclusive, policy)?;
Ok(Self {
file: None,
mode: Mode::Writer(BufWriter::with_capacity(DEFAULT_BUF_SIZE, file)),
lock_policy: policy,
lock_held,
})
}
/// Locking policy this handle was opened with.
pub fn lock_policy(&self) -> FileLocking {
self.lock_policy
}
/// Whether a lock is currently held on this handle.
pub fn lock_held(&self) -> bool {
self.lock_held
}
/// Release the OS-level lock so concurrent SWMR readers (and other
/// openers) can attach. No-op if the policy is
/// [`FileLocking::Disabled`] or no lock is held.
///
/// We don't try to *downgrade* the exclusive lock to shared here:
/// Windows' `LockFileEx` is a mandatory range lock, and an
/// `unlock` followed by `try_lock_shared` on the same handle leaves
/// the file in a state where subsequent `WriteFile` calls through
/// that handle can fail with `ERROR_LOCK_VIOLATION`. Instead we
/// release the lock entirely — matching the HDF5 C library, which
/// also doesn't enforce reader/writer separation purely through
/// OS locks during SWMR streaming.
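    ///
    /// A sketch of the SWMR-style hand-off this enables (paths assumed;
    /// not compiled as a doctest):
    ///
    /// ```ignore
    /// use std::path::Path;
    ///
    /// let mut writer = FileHandle::create(Path::new("stream.h5"))?;
    /// writer.write_at(0, b"signature")?;
    /// writer.sync_all()?;
    /// writer.release_lock()?;   // flushes, then drops the OS lock
    /// // From here on, readers can attach while the writer keeps writing.
    /// let reader = FileHandle::open_read(Path::new("stream.h5"))?;
    /// # Ok::<(), std::io::Error>(())
    /// ```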
pub fn release_lock(&mut self) -> std::io::Result<()> {
if !self.lock_held || matches!(self.lock_policy, FileLocking::Disabled) {
return Ok(());
}
// Flush any pending writes so they hit disk before the lock
// window opens.
self.flush_buffers()?;
locking::release(self.get_file_ref())?;
self.lock_held = false;
Ok(())
}
/// Extract the raw `File` from the current mode, flushing if needed.
fn take_file(&mut self) -> std::io::Result<File> {
let old = std::mem::replace(&mut self.mode, Mode::Transitioning);
match old {
Mode::Writer(w) => w.into_inner().map_err(|e| e.into_error()),
Mode::Reader(r) => Ok(r.into_inner()),
Mode::ReadOnly(r) => Ok(r.into_inner()),
Mode::Transitioning => {
// Use stashed file if available
self.file
.take()
.ok_or_else(|| std::io::Error::other("no file available"))
}
}
}
/// Ensure we are in writer mode.
fn ensure_writer(&mut self) -> std::io::Result<()> {
match &self.mode {
Mode::Writer(_) => return Ok(()),
Mode::ReadOnly(_) => {
return Err(std::io::Error::new(
std::io::ErrorKind::PermissionDenied,
"file opened read-only",
));
}
_ => {}
}
let file = self.take_file()?;
self.mode = Mode::Writer(BufWriter::with_capacity(DEFAULT_BUF_SIZE, file));
Ok(())
}
/// Ensure we are in reader mode.
fn ensure_reader(&mut self) -> std::io::Result<()> {
match &self.mode {
Mode::Reader(_) | Mode::ReadOnly(_) => return Ok(()),
_ => {}
}
let file = self.take_file()?;
self.mode = Mode::Reader(BufReader::with_capacity(DEFAULT_BUF_SIZE, file));
Ok(())
}
/// Write `data` at the given byte offset.
    pub fn write_at(&mut self, offset: u64, data: &[u8]) -> std::io::Result<()> {
        self.ensure_writer()?;
        match &mut self.mode {
            Mode::Writer(w) => {
                // `BufWriter::seek` flushes any buffered bytes before
                // repositioning, so earlier writes land at their original
                // offsets.
                w.seek(SeekFrom::Start(offset))?;
                w.write_all(data)?;
                Ok(())
            }
            _ => unreachable!(),
        }
    }
/// Read exactly `len` bytes starting at the given byte offset.
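    ///
    /// The requested range is validated against the file size before the
    /// buffer is allocated (sketch; paths assumed, not compiled as a
    /// doctest):
    ///
    /// ```ignore
    /// use std::path::Path;
    ///
    /// let mut handle = FileHandle::open_read(Path::new("data.h5"))?;
    /// let bytes = handle.read_at(0, 8)?;        // file is assumed to hold >= 8 bytes
    /// let err = handle.read_at(u64::MAX, 16);   // offset + len overflows: rejected
    /// assert_eq!(err.unwrap_err().kind(), std::io::ErrorKind::UnexpectedEof);
    /// # Ok::<(), std::io::Error>(())
    /// ```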
pub fn read_at(&mut self, offset: u64, len: usize) -> std::io::Result<Vec<u8>> {
self.ensure_reader()?;
match &mut self.mode {
Mode::Reader(r) | Mode::ReadOnly(r) => {
                // `offset`/`len` are often file-derived; reject a request
                // that would extend past the end of the file before
                // allocating, so a corrupt size field cannot drive an
                // unbounded allocation.
let file_len = r.get_ref().metadata()?.len();
let end = offset.checked_add(len as u64);
if end.is_none_or(|e| e > file_len) {
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
format!("read past end: offset={offset} len={len} file_size={file_len}"),
));
}
r.seek(SeekFrom::Start(offset))?;
let mut buf = vec![0u8; len];
r.read_exact(&mut buf)?;
Ok(buf)
}
_ => unreachable!(),
}
}
/// Read up to `max_len` bytes starting at the given byte offset.
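    ///
    /// Useful for tail reads where the exact remaining length is unknown
    /// (sketch; paths assumed, not compiled as a doctest):
    ///
    /// ```ignore
    /// use std::path::Path;
    ///
    /// let mut handle = FileHandle::open_read(Path::new("data.h5"))?;
    /// let size = handle.file_size()?;
    /// let tail = handle.read_at_most(size.saturating_sub(512), 512)?; // clamped to what exists
    /// assert!(tail.len() <= 512);
    /// # Ok::<(), std::io::Error>(())
    /// ```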
pub fn read_at_most(&mut self, offset: u64, max_len: usize) -> std::io::Result<Vec<u8>> {
self.ensure_reader()?;
match &mut self.mode {
Mode::Reader(r) | Mode::ReadOnly(r) => {
// Clamp the allocation to what the file can actually hold.
let file_len = r.get_ref().metadata()?.len();
let avail = file_len.saturating_sub(offset);
let max_len = (max_len as u64).min(avail) as usize;
r.seek(SeekFrom::Start(offset))?;
let mut buf = vec![0u8; max_len];
let mut total = 0;
loop {
match r.read(&mut buf[total..]) {
Ok(0) => break,
Ok(n) => total += n,
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
}
}
buf.truncate(total);
Ok(buf)
}
_ => unreachable!(),
}
}
/// Flush file data (not necessarily metadata) to disk.
pub fn sync_data(&mut self) -> std::io::Result<()> {
self.flush_buffers()?;
self.get_file_ref().sync_data()
}
/// Flush both file data and metadata to disk.
pub fn sync_all(&mut self) -> std::io::Result<()> {
self.flush_buffers()?;
self.get_file_ref().sync_all()
}
    /// Return the current file size by seeking to the end. Any buffered
    /// writes are flushed first so they are reflected in the result.
pub fn file_size(&mut self) -> std::io::Result<u64> {
self.ensure_reader()?;
match &mut self.mode {
Mode::Reader(r) | Mode::ReadOnly(r) => {
let pos = r.seek(SeekFrom::End(0))?;
Ok(pos)
}
_ => unreachable!(),
}
}
/// Flush any pending buffered writes.
fn flush_buffers(&mut self) -> std::io::Result<()> {
if let Mode::Writer(w) = &mut self.mode {
w.flush()?;
}
Ok(())
}
    /// Get a reference to the underlying `File` for sync and lock operations.
fn get_file_ref(&self) -> &File {
match &self.mode {
Mode::Writer(w) => w.get_ref(),
Mode::Reader(r) | Mode::ReadOnly(r) => r.get_ref(),
Mode::Transitioning => unreachable!("sync during transition"),
}
}
}
/// A memory-mapped read-only file handle for zero-copy reads.
///
/// Available when the `mmap` feature is enabled.
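///
/// # Examples
///
/// A sketch of zero-copy reads (requires the `mmap` feature; import path
/// assumed, so the block is not compiled as a doctest):
///
/// ```ignore
/// use std::path::Path;
///
/// let mapped = MmapFileHandle::open(Path::new("data.h5"))?;
/// let superblock: &[u8] = mapped.read_at(0, 8)?;   // borrows from the mapping
/// assert_eq!(superblock.len(), 8);
/// # Ok::<(), std::io::Error>(())
/// ```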
#[cfg(feature = "mmap")]
pub struct MmapFileHandle {
mmap: memmap2::Mmap,
/// Keep the underlying file alive so the OS lock survives for the
/// lifetime of this handle. (The mmap itself doesn't pin the fd.)
_file: File,
}
#[cfg(feature = "mmap")]
impl MmapFileHandle {
/// Open a file with memory mapping for read-only access, using the
/// env-var-derived locking policy.
pub fn open(path: &Path) -> std::io::Result<Self> {
Self::open_with_locking(path, FileLocking::from_env_or(FileLocking::default()))
}
/// Open a file with memory mapping with an explicit locking policy.
/// A shared lock is taken (mmap is read-only) so the handle blocks
/// concurrent writers as long as it lives.
pub fn open_with_locking(path: &Path, policy: FileLocking) -> std::io::Result<Self> {
let file = File::open(path)?;
// Take the shared lock BEFORE mmapping so the mmap doesn't
// capture a snapshot of a file that's being concurrently
// modified.
let _ = locking::try_acquire(&file, LockMode::Shared, policy)?;
let mmap = unsafe { memmap2::Mmap::map(&file)? };
Ok(Self { mmap, _file: file })
}
/// Return the total size of the mapped file.
pub fn len(&self) -> usize {
self.mmap.len()
}
/// Return whether the file is empty.
pub fn is_empty(&self) -> bool {
self.mmap.is_empty()
}
/// Read exactly `len` bytes at `offset`. Zero-copy: returns a slice.
pub fn read_at(&self, offset: u64, len: usize) -> std::io::Result<&[u8]> {
// `offset`/`len` are file-derived; compute the end in u64 and reject
// overflow so a hostile value cannot wrap past the bounds check.
let end = offset
.checked_add(len as u64)
.filter(|&e| e <= self.mmap.len() as u64);
match end {
Some(end) => Ok(&self.mmap[offset as usize..end as usize]),
None => Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
format!(
"mmap read past end: offset={} len={} file_size={}",
offset,
len,
self.mmap.len()
),
)),
}
}
/// Read up to `max_len` bytes at `offset`. Returns a slice.
pub fn read_at_most(&self, offset: u64, max_len: usize) -> &[u8] {
if offset >= self.mmap.len() as u64 {
return &[];
}
let start = offset as usize;
let end = (start as u64)
.saturating_add(max_len as u64)
.min(self.mmap.len() as u64) as usize;
&self.mmap[start..end]
}
}
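// A small, self-contained sketch of the write/read round trip described in
// the `FileHandle` docs. It is illustrative rather than exhaustive: the
// temp-file name and the choice of `FileLocking::Disabled` (to stay
// independent of the environment's locking policy) are assumptions of the
// sketch, not requirements of the API.
#[cfg(test)]
mod file_handle_sketch_tests {
    use super::*;

    #[test]
    fn positioned_write_then_read_round_trip() -> std::io::Result<()> {
        let path = std::env::temp_dir()
            .join(format!("file_handle_sketch_{}.bin", std::process::id()));
        let mut handle = FileHandle::create_with_locking(&path, FileLocking::Disabled)?;
        // Positioned writes are buffered...
        handle.write_at(0, b"hello ")?;
        handle.write_at(6, b"world")?;
        // ...and flushed automatically when the handle switches to reading.
        assert_eq!(handle.read_at(0, 11)?, b"hello world".to_vec());
        assert_eq!(handle.file_size()?, 11);
        // Reads past the end are rejected before any allocation happens.
        assert!(handle.read_at(4, 100).is_err());
        drop(handle);
        std::fs::remove_file(&path)?;
        Ok(())
    }
}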