objects/store/fs/
fs_io.rs1#![deny(clippy::cast_possible_truncation)]
3
4use std::{
7 collections::BTreeSet,
8 fs::File,
9 io::Read,
10 path::{Path, PathBuf},
11 sync::Mutex,
12};
13
14use bytes::Bytes;
15
16use crate::{
17 error::HeddleError,
18 fs_atomic::{enrich_fs_error, enrich_rename_error, sync_directory, temp_path},
19 store::Result,
20};
21
/// Size cut-off, in bytes, between buffered reads and memory mapping.
///
/// `read_file_bytes` and `read_file_bytes_for_pack` memory-map files whose
/// length is at least this value and read smaller files into a heap buffer.
const MMAP_THRESHOLD_BYTES: u64 = 256 * 1024;
23
24pub(super) enum FileBytes {
25 Vec(Vec<u8>),
26 Mmap(memmap2::Mmap),
27}
28
29impl FileBytes {
30 pub(super) fn as_slice(&self) -> &[u8] {
31 match self {
32 FileBytes::Vec(data) => data,
33 FileBytes::Mmap(data) => data,
34 }
35 }
36}
37
38#[derive(Clone, Copy, Debug, Eq, PartialEq)]
39pub(super) enum AtomicWriteMode {
40 Durable,
41 BatchDirectorySync,
42 NoSync,
52}
53
54pub(super) fn write_atomic(
55 path: &Path,
56 data: &[u8],
57 mode: AtomicWriteMode,
58 pending_directory_syncs: Option<&Mutex<BTreeSet<PathBuf>>>,
59) -> Result<()> {
60 let parent = path
61 .parent()
62 .ok_or_else(|| std::io::Error::other("invalid atomic write path"))?;
63 std::fs::create_dir_all(parent)
64 .map_err(|e| HeddleError::Io(enrich_fs_error(parent, "creating", e)))?;
65
66 let temp_path = temp_path(path);
67 enum Op {
75 Write,
76 Rename,
77 SyncDir,
78 }
79 let mut failing_op = Op::Write;
80 let write_result: std::io::Result<()> = (|| {
81 let mut opts = std::fs::OpenOptions::new();
90 opts.write(true).create_new(true);
91 #[cfg(unix)]
92 {
93 use std::os::unix::fs::OpenOptionsExt;
94 opts.mode(0o644);
95 }
96 let mut file = opts.open(&temp_path)?;
97 use std::io::Write as _;
98 file.write_all(data)?;
99 match mode {
100 AtomicWriteMode::Durable => file.sync_all()?,
105 AtomicWriteMode::BatchDirectorySync => file.sync_data()?,
117 AtomicWriteMode::NoSync => {}
121 }
122 failing_op = Op::Rename;
123 std::fs::rename(&temp_path, path)?;
124 failing_op = Op::SyncDir;
125 match mode {
126 AtomicWriteMode::Durable => sync_directory(parent)?,
127 AtomicWriteMode::BatchDirectorySync => {
128 if let Some(pending) = pending_directory_syncs {
129 let mut dirs = pending.lock().map_err(|_| {
130 std::io::Error::other("failed to acquire pending directory sync lock")
131 })?;
132 dirs.insert(parent.to_path_buf());
133 }
134 }
135 AtomicWriteMode::NoSync => {}
136 }
137 Ok(())
138 })();
139
140 if let Err(err) = write_result {
141 let _ = std::fs::remove_file(&temp_path);
142 let wrapped = match failing_op {
143 Op::Write => enrich_fs_error(path, "writing", err),
144 Op::Rename => enrich_rename_error(&temp_path, path, err),
145 Op::SyncDir => enrich_fs_error(parent, "syncing", err),
146 };
147 return Err(HeddleError::Io(wrapped));
148 }
149
150 Ok(())
151}
152
153pub(super) fn read_file_header(path: &Path, header_len: usize) -> Result<Option<(Vec<u8>, u64)>> {
162 let mut file = match File::open(path) {
163 Ok(file) => file,
164 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
165 Err(e) => return Err(e.into()),
166 };
167
168 let metadata = file.metadata()?;
169 let len = metadata.len();
170 let to_read = if len > header_len as u64 {
171 header_len
172 } else {
173 checked_file_len_to_usize(len)?
174 };
175 let mut header = vec![0u8; to_read];
176 if to_read > 0 {
177 use std::io::Read as _;
178 file.read_exact(&mut header)?;
179 }
180 Ok(Some((header, len)))
181}
182
183pub fn read_file_bytes_for_pack(path: &Path) -> Result<Bytes> {
190 let file = File::open(path)?;
191 let len = file.metadata()?.len();
192 if len == 0 {
193 return Ok(Bytes::new());
194 }
195 if len >= MMAP_THRESHOLD_BYTES {
196 let mmap = unsafe { memmap2::MmapOptions::new().map(&file)? };
197 if mmap.len() != checked_file_len_to_usize(len)? {
198 return Err(HeddleError::InvalidObject(
199 "pack file size changed during memory mapping".to_string(),
200 ));
201 }
202 return Ok(Bytes::from_owner(mmap));
203 }
204 let mut data = Vec::with_capacity(checked_file_len_to_usize(len)?);
205 let mut reader = file;
206 reader.read_to_end(&mut data)?;
207 Ok(Bytes::from(data))
208}
209
210pub(super) fn read_file_bytes(path: &Path) -> Result<Option<FileBytes>> {
211 let file = match File::open(path) {
212 Ok(file) => file,
213 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
214 Err(e) => return Err(e.into()),
215 };
216
217 let metadata = file.metadata()?;
218 let len = metadata.len();
219 if len == 0 {
220 return Ok(Some(FileBytes::Vec(vec![])));
221 }
222 if len >= MMAP_THRESHOLD_BYTES {
223 let mmap = unsafe { memmap2::MmapOptions::new().map(&file)? };
224 if mmap.len() != checked_file_len_to_usize(len)? {
225 return Err(crate::store::HeddleError::InvalidObject(
226 "file size changed during memory mapping".to_string(),
227 ));
228 }
229 return Ok(Some(FileBytes::Mmap(mmap)));
230 }
231
232 let mut data = Vec::with_capacity(checked_file_len_to_usize(len)?);
233 let mut reader = file;
234 reader.read_to_end(&mut data)?;
235 Ok(Some(FileBytes::Vec(data)))
236}
237
238fn checked_file_len_to_usize(len: u64) -> Result<usize> {
239 usize::try_from(len).map_err(|_| {
240 HeddleError::InvalidObject(format!("file length {len} exceeds platform limits"))
241 })
242}
243
244pub(super) fn list_hashes_from_dir(
246 dir: &std::path::Path,
247) -> Result<Vec<crate::object::ContentHash>> {
248 use std::fs;
249
250 use tracing::debug;
251
252 if !dir.exists() {
253 return Ok(Vec::new());
254 }
255
256 let mut hashes = Vec::new();
257 for entry in fs::read_dir(dir)? {
258 let entry = entry?;
259 let path = entry.path();
260 if path.is_dir() {
261 let prefix = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
262 if prefix.len() == 2 {
263 for sub_entry in fs::read_dir(&path)? {
264 let sub_entry = sub_entry?;
265 let sub_path = sub_entry.path();
266 if let Some(name) = sub_path.file_name().and_then(|n| n.to_str()) {
267 let full_hash = format!("{}{}", prefix, name);
268 if let Ok(hash) = crate::object::ContentHash::from_hex(&full_hash) {
269 hashes.push(hash);
270 }
271 }
272 }
273 }
274 }
275 }
276 debug!(count = hashes.len(), "Listed hashes");
277 Ok(hashes)
278}