objects/store/fs/
fs_io.rs1#![deny(clippy::cast_possible_truncation)]
3
4use std::{
7 collections::BTreeSet,
8 fs::File,
9 io::Read,
10 path::{Path, PathBuf},
11 sync::Mutex,
12};
13
14use bytes::Bytes;
15
16use crate::{
17 error::HeddleError,
18 fs_atomic::{enrich_fs_error, enrich_rename_error, sync_directory},
19 store::{Result, atomic::temp_path},
20};
21
/// Files at least this large (256 KiB) are memory-mapped instead of being
/// read into a heap buffer.
const MMAP_THRESHOLD_BYTES: u64 = 256 << 10;
23
24pub(super) enum FileBytes {
25 Vec(Vec<u8>),
26 Mmap(memmap2::Mmap),
27}
28
29impl FileBytes {
30 pub(super) fn as_slice(&self) -> &[u8] {
31 match self {
32 FileBytes::Vec(data) => data,
33 FileBytes::Mmap(data) => data,
34 }
35 }
36
37 pub(super) fn into_vec(self) -> Vec<u8> {
38 match self {
39 FileBytes::Vec(data) => data,
40 FileBytes::Mmap(data) => data.to_vec(),
41 }
42 }
43}
44
45#[derive(Clone, Copy, Debug, Eq, PartialEq)]
46pub(super) enum AtomicWriteMode {
47 Durable,
48 BatchDirectorySync,
49 NoSync,
59}
60
61pub(super) fn write_atomic(
62 path: &Path,
63 data: &[u8],
64 mode: AtomicWriteMode,
65 pending_directory_syncs: Option<&Mutex<BTreeSet<PathBuf>>>,
66) -> Result<()> {
67 let parent = path
68 .parent()
69 .ok_or_else(|| std::io::Error::other("invalid atomic write path"))?;
70 std::fs::create_dir_all(parent)
71 .map_err(|e| HeddleError::Io(enrich_fs_error(parent, "creating", e)))?;
72
73 let temp_path = temp_path(path);
74 enum Op {
82 Write,
83 Rename,
84 SyncDir,
85 }
86 let mut failing_op = Op::Write;
87 let write_result: std::io::Result<()> = (|| {
88 let mut opts = std::fs::OpenOptions::new();
97 opts.write(true).create_new(true);
98 #[cfg(unix)]
99 {
100 use std::os::unix::fs::OpenOptionsExt;
101 opts.mode(0o644);
102 }
103 let mut file = opts.open(&temp_path)?;
104 use std::io::Write as _;
105 file.write_all(data)?;
106 match mode {
107 AtomicWriteMode::Durable => file.sync_all()?,
112 AtomicWriteMode::BatchDirectorySync => file.sync_data()?,
124 AtomicWriteMode::NoSync => {}
128 }
129 failing_op = Op::Rename;
130 std::fs::rename(&temp_path, path)?;
131 failing_op = Op::SyncDir;
132 match mode {
133 AtomicWriteMode::Durable => sync_directory(parent)?,
134 AtomicWriteMode::BatchDirectorySync => {
135 if let Some(pending) = pending_directory_syncs {
136 let mut dirs = pending.lock().map_err(|_| {
137 std::io::Error::other("failed to acquire pending directory sync lock")
138 })?;
139 dirs.insert(parent.to_path_buf());
140 }
141 }
142 AtomicWriteMode::NoSync => {}
143 }
144 Ok(())
145 })();
146
147 if let Err(err) = write_result {
148 let _ = std::fs::remove_file(&temp_path);
149 let wrapped = match failing_op {
150 Op::Write => enrich_fs_error(path, "writing", err),
151 Op::Rename => enrich_rename_error(&temp_path, path, err),
152 Op::SyncDir => enrich_fs_error(parent, "syncing", err),
153 };
154 return Err(HeddleError::Io(wrapped));
155 }
156
157 Ok(())
158}
159
160pub(super) fn read_file_header(path: &Path, header_len: usize) -> Result<Option<(Vec<u8>, u64)>> {
169 let mut file = match File::open(path) {
170 Ok(file) => file,
171 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
172 Err(e) => return Err(e.into()),
173 };
174
175 let metadata = file.metadata()?;
176 let len = metadata.len();
177 let to_read = if len > header_len as u64 {
178 header_len
179 } else {
180 checked_file_len_to_usize(len)?
181 };
182 let mut header = vec![0u8; to_read];
183 if to_read > 0 {
184 use std::io::Read as _;
185 file.read_exact(&mut header)?;
186 }
187 Ok(Some((header, len)))
188}
189
190pub fn read_file_bytes_for_pack(path: &Path) -> Result<Bytes> {
197 let file = File::open(path)?;
198 let len = file.metadata()?.len();
199 if len == 0 {
200 return Ok(Bytes::new());
201 }
202 if len >= MMAP_THRESHOLD_BYTES {
203 let mmap = unsafe { memmap2::MmapOptions::new().map(&file)? };
204 if mmap.len() != checked_file_len_to_usize(len)? {
205 return Err(HeddleError::InvalidObject(
206 "pack file size changed during memory mapping".to_string(),
207 ));
208 }
209 return Ok(Bytes::from_owner(mmap));
210 }
211 let mut data = Vec::with_capacity(checked_file_len_to_usize(len)?);
212 let mut reader = file;
213 reader.read_to_end(&mut data)?;
214 Ok(Bytes::from(data))
215}
216
217pub(super) fn read_file_bytes(path: &Path) -> Result<Option<FileBytes>> {
218 let file = match File::open(path) {
219 Ok(file) => file,
220 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
221 Err(e) => return Err(e.into()),
222 };
223
224 let metadata = file.metadata()?;
225 let len = metadata.len();
226 if len == 0 {
227 return Ok(Some(FileBytes::Vec(vec![])));
228 }
229 if len >= MMAP_THRESHOLD_BYTES {
230 let mmap = unsafe { memmap2::MmapOptions::new().map(&file)? };
231 if mmap.len() != checked_file_len_to_usize(len)? {
232 return Err(crate::store::HeddleError::InvalidObject(
233 "file size changed during memory mapping".to_string(),
234 ));
235 }
236 return Ok(Some(FileBytes::Mmap(mmap)));
237 }
238
239 let mut data = Vec::with_capacity(checked_file_len_to_usize(len)?);
240 let mut reader = file;
241 reader.read_to_end(&mut data)?;
242 Ok(Some(FileBytes::Vec(data)))
243}
244
245fn checked_file_len_to_usize(len: u64) -> Result<usize> {
246 usize::try_from(len).map_err(|_| {
247 HeddleError::InvalidObject(format!("file length {len} exceeds platform limits"))
248 })
249}
250
251pub(super) fn list_hashes_from_dir(
253 dir: &std::path::Path,
254) -> Result<Vec<crate::object::ContentHash>> {
255 use std::fs;
256
257 use tracing::debug;
258
259 if !dir.exists() {
260 return Ok(Vec::new());
261 }
262
263 let mut hashes = Vec::new();
264 for entry in fs::read_dir(dir)? {
265 let entry = entry?;
266 let path = entry.path();
267 if path.is_dir() {
268 let prefix = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
269 if prefix.len() == 2 {
270 for sub_entry in fs::read_dir(&path)? {
271 let sub_entry = sub_entry?;
272 let sub_path = sub_entry.path();
273 if let Some(name) = sub_path.file_name().and_then(|n| n.to_str()) {
274 let full_hash = format!("{}{}", prefix, name);
275 if let Ok(hash) = crate::object::ContentHash::from_hex(&full_hash) {
276 hashes.push(hash);
277 }
278 }
279 }
280 }
281 }
282 }
283 debug!(count = hashes.len(), "Listed hashes");
284 Ok(hashes)
285}