1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)
use crate::{
Slice,
fs::{Fs, FsFile},
};
use std::{io::Write, path::Path};
pub const MAGIC_BYTES: [u8; 4] = [b'L', b'S', b'M', 3];
pub const TABLES_FOLDER: &str = "tables";
pub const BLOBS_FOLDER: &str = "blobs";
pub const CURRENT_VERSION_FILE: &str = "current";
/// Reads bytes from a file at the given offset without changing the cursor.
///
/// Uses [`FsFile::read_at`] (equivalent to `pread(2)`) so multiple threads
/// can call this concurrently on the same file handle.
pub fn read_exact(file: &dyn FsFile, offset: u64, size: usize) -> std::io::Result<Slice> {
// SAFETY: This slice builder starts uninitialized, but we know its length
//
// We use FsFile::read_at which gives us the number of bytes read.
// If that number does not match the slice length, the function errors,
// so the (partially) uninitialized buffer is discarded.
//
// Additionally, generally, block loads furthermore do a checksum check which
// would likely catch the buffer being wrong somehow.
#[expect(unsafe_code, reason = "see safety")]
let mut builder = unsafe { Slice::builder_unzeroed(size) };
// Single call is correct: FsFile::read_at has fill-or-EOF semantics —
// implementations handle EINTR/short-read retry internally.
let bytes_read = file.read_at(&mut builder, offset)?;
if bytes_read != size {
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
format!(
"read_exact({bytes_read}) at {offset} did not read enough bytes {size}; file has length {}",
file.metadata()?.len
),
));
}
Ok(builder.freeze().into())
}
/// Atomically rewrites a file via the [`Fs`] trait.
///
/// Writes `content` to a temporary file in the same directory, fsyncs it,
/// then renames over `path`. This ensures readers never see a partial write.
pub fn rewrite_atomic(path: &Path, content: &[u8], fs: &dyn Fs) -> std::io::Result<()> {
use crate::fs::FsOpenOptions;
use std::sync::atomic::{AtomicU64, Ordering};
static TEMP_SEQ: AtomicU64 = AtomicU64::new(0);
#[expect(
clippy::expect_used,
reason = "every file should have a parent directory"
)]
let folder = path.parent().expect("should have a parent");
let pid = std::process::id();
// Retry with incrementing seq on AlreadyExists — handles leftover temp
// files from a previous crash (PID can be reused, especially in containers).
let tmp_path = loop {
let seq = TEMP_SEQ.fetch_add(1, Ordering::Relaxed);
let candidate = folder.join(format!(".tmp_{pid}_{seq}"));
match fs.open(
&candidate,
&FsOpenOptions::new().write(true).create_new(true),
) {
Ok(mut file) => {
let write_result = file
.write_all(content)
.and_then(|()| file.flush())
.and_then(|()| FsFile::sync_all(&*file));
if let Err(e) = write_result {
drop(file);
let _ = fs.remove_file(&candidate);
return Err(e);
}
break candidate;
}
// Leftover temp file from a previous crash — retry with next seq.
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {}
Err(e) => return Err(e),
}
};
// std::fs::rename overwrites existing destinations on all platforms
// (Rust uses MoveFileExW with MOVEFILE_REPLACE_EXISTING on Windows).
if let Err(e) = fs.rename(&tmp_path, path) {
let _ = fs.remove_file(&tmp_path);
return Err(e);
}
fsync_directory(folder, fs)?;
Ok(())
}
/// Delegates directory sync to the backend.
///
/// On Windows, `StdFs::sync_directory` already returns `Ok(())` (directory
/// fsync is unsupported), but non-`StdFs` backends (e.g., `MemFs`) may use
/// this call for path validation. Always delegate rather than short-circuiting.
pub fn fsync_directory(path: &Path, fs: &dyn Fs) -> std::io::Result<()> {
fs.sync_directory(path)
}
#[cfg(test)]
#[allow(
clippy::unwrap_used,
clippy::indexing_slicing,
clippy::useless_vec,
reason = "test code"
)]
mod tests {
use super::*;
use crate::fs::StdFs;
use std::fs::File;
use std::io::Write;
use test_log::test;
#[test]
fn read_exact_short_read_returns_error() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let path = dir.path().join("short.bin");
{
let mut f = File::create(&path)?;
f.write_all(b"hello")?; // 5 bytes
}
let file = File::open(&path)?;
// Request 10 bytes from a 5-byte file → short read → UnexpectedEof
let err = read_exact(&file, 0, 10).unwrap_err();
assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
Ok(())
}
#[test]
fn atomic_rewrite() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let path = dir.path().join("test.txt");
{
let mut file = File::create(&path)?;
write!(file, "asdasdasdasdasd")?;
}
rewrite_atomic(&path, b"newcontent", &StdFs)?;
let content = std::fs::read_to_string(&path)?;
assert_eq!("newcontent", content);
Ok(())
}
/// Verifies that `StdFs::rename` atomically replaces an existing
/// destination file — the contract required by `rewrite_atomic`.
#[test]
fn std_fs_rename_replaces_existing_file() -> crate::Result<()> {
use crate::fs::{Fs, FsOpenOptions};
let dir = tempfile::tempdir()?;
let src = dir.path().join("src.txt");
let dst = dir.path().join("dst.txt");
// Create both files via Fs trait.
let opts = FsOpenOptions::new().write(true).create(true);
let mut f = StdFs.open(&src, &opts)?;
f.write_all(b"new")?;
drop(f);
let mut f = StdFs.open(&dst, &opts)?;
f.write_all(b"old")?;
drop(f);
StdFs.rename(&src, &dst)?;
// dst now has src content, src is gone.
let content = std::fs::read_to_string(&dst)?;
assert_eq!("new", content);
assert!(!src.exists());
Ok(())
}
}