objects/store/fs/
fs_io.rs1#![deny(clippy::cast_possible_truncation)]
3
4use std::{
7 collections::BTreeSet,
8 fs::File,
9 io::Read,
10 path::{Path, PathBuf},
11 sync::Mutex,
12};
13
14use bytes::Bytes;
15
16use crate::{
17 error::HeddleError,
18 fs_atomic::{enrich_fs_error, enrich_rename_error, sync_directory},
19 store::{Result, atomic::temp_path},
20};
21
/// Size cutoff, in bytes (256 KiB), used by the whole-file readers in this module:
/// files at or above this length are memory-mapped, smaller ones are read into a `Vec`.
const MMAP_THRESHOLD_BYTES: u64 = 256 * 1024;
23
24pub(super) enum FileBytes {
25 Vec(Vec<u8>),
26 Mmap(memmap2::Mmap),
27}
28
29impl FileBytes {
30 pub(super) fn as_slice(&self) -> &[u8] {
31 match self {
32 FileBytes::Vec(data) => data,
33 FileBytes::Mmap(data) => data,
34 }
35 }
36
37}
38
39#[derive(Clone, Copy, Debug, Eq, PartialEq)]
40pub(super) enum AtomicWriteMode {
41 Durable,
42 BatchDirectorySync,
43 NoSync,
53}
54
55pub(super) fn write_atomic(
56 path: &Path,
57 data: &[u8],
58 mode: AtomicWriteMode,
59 pending_directory_syncs: Option<&Mutex<BTreeSet<PathBuf>>>,
60) -> Result<()> {
61 let parent = path
62 .parent()
63 .ok_or_else(|| std::io::Error::other("invalid atomic write path"))?;
64 std::fs::create_dir_all(parent)
65 .map_err(|e| HeddleError::Io(enrich_fs_error(parent, "creating", e)))?;
66
67 let temp_path = temp_path(path);
68 enum Op {
76 Write,
77 Rename,
78 SyncDir,
79 }
80 let mut failing_op = Op::Write;
81 let write_result: std::io::Result<()> = (|| {
82 let mut opts = std::fs::OpenOptions::new();
91 opts.write(true).create_new(true);
92 #[cfg(unix)]
93 {
94 use std::os::unix::fs::OpenOptionsExt;
95 opts.mode(0o644);
96 }
97 let mut file = opts.open(&temp_path)?;
98 use std::io::Write as _;
99 file.write_all(data)?;
100 match mode {
101 AtomicWriteMode::Durable => file.sync_all()?,
106 AtomicWriteMode::BatchDirectorySync => file.sync_data()?,
118 AtomicWriteMode::NoSync => {}
122 }
123 failing_op = Op::Rename;
124 std::fs::rename(&temp_path, path)?;
125 failing_op = Op::SyncDir;
126 match mode {
127 AtomicWriteMode::Durable => sync_directory(parent)?,
128 AtomicWriteMode::BatchDirectorySync => {
129 if let Some(pending) = pending_directory_syncs {
130 let mut dirs = pending.lock().map_err(|_| {
131 std::io::Error::other("failed to acquire pending directory sync lock")
132 })?;
133 dirs.insert(parent.to_path_buf());
134 }
135 }
136 AtomicWriteMode::NoSync => {}
137 }
138 Ok(())
139 })();
140
141 if let Err(err) = write_result {
142 let _ = std::fs::remove_file(&temp_path);
143 let wrapped = match failing_op {
144 Op::Write => enrich_fs_error(path, "writing", err),
145 Op::Rename => enrich_rename_error(&temp_path, path, err),
146 Op::SyncDir => enrich_fs_error(parent, "syncing", err),
147 };
148 return Err(HeddleError::Io(wrapped));
149 }
150
151 Ok(())
152}
153
154pub(super) fn read_file_header(path: &Path, header_len: usize) -> Result<Option<(Vec<u8>, u64)>> {
163 let mut file = match File::open(path) {
164 Ok(file) => file,
165 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
166 Err(e) => return Err(e.into()),
167 };
168
169 let metadata = file.metadata()?;
170 let len = metadata.len();
171 let to_read = if len > header_len as u64 {
172 header_len
173 } else {
174 checked_file_len_to_usize(len)?
175 };
176 let mut header = vec![0u8; to_read];
177 if to_read > 0 {
178 use std::io::Read as _;
179 file.read_exact(&mut header)?;
180 }
181 Ok(Some((header, len)))
182}
183
184pub fn read_file_bytes_for_pack(path: &Path) -> Result<Bytes> {
191 let file = File::open(path)?;
192 let len = file.metadata()?.len();
193 if len == 0 {
194 return Ok(Bytes::new());
195 }
196 if len >= MMAP_THRESHOLD_BYTES {
197 let mmap = unsafe { memmap2::MmapOptions::new().map(&file)? };
198 if mmap.len() != checked_file_len_to_usize(len)? {
199 return Err(HeddleError::InvalidObject(
200 "pack file size changed during memory mapping".to_string(),
201 ));
202 }
203 return Ok(Bytes::from_owner(mmap));
204 }
205 let mut data = Vec::with_capacity(checked_file_len_to_usize(len)?);
206 let mut reader = file;
207 reader.read_to_end(&mut data)?;
208 Ok(Bytes::from(data))
209}
210
211pub(super) fn read_file_bytes(path: &Path) -> Result<Option<FileBytes>> {
212 let file = match File::open(path) {
213 Ok(file) => file,
214 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
215 Err(e) => return Err(e.into()),
216 };
217
218 let metadata = file.metadata()?;
219 let len = metadata.len();
220 if len == 0 {
221 return Ok(Some(FileBytes::Vec(vec![])));
222 }
223 if len >= MMAP_THRESHOLD_BYTES {
224 let mmap = unsafe { memmap2::MmapOptions::new().map(&file)? };
225 if mmap.len() != checked_file_len_to_usize(len)? {
226 return Err(crate::store::HeddleError::InvalidObject(
227 "file size changed during memory mapping".to_string(),
228 ));
229 }
230 return Ok(Some(FileBytes::Mmap(mmap)));
231 }
232
233 let mut data = Vec::with_capacity(checked_file_len_to_usize(len)?);
234 let mut reader = file;
235 reader.read_to_end(&mut data)?;
236 Ok(Some(FileBytes::Vec(data)))
237}
238
239fn checked_file_len_to_usize(len: u64) -> Result<usize> {
240 usize::try_from(len).map_err(|_| {
241 HeddleError::InvalidObject(format!("file length {len} exceeds platform limits"))
242 })
243}
244
245pub(super) fn list_hashes_from_dir(
247 dir: &std::path::Path,
248) -> Result<Vec<crate::object::ContentHash>> {
249 use std::fs;
250
251 use tracing::debug;
252
253 if !dir.exists() {
254 return Ok(Vec::new());
255 }
256
257 let mut hashes = Vec::new();
258 for entry in fs::read_dir(dir)? {
259 let entry = entry?;
260 let path = entry.path();
261 if path.is_dir() {
262 let prefix = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
263 if prefix.len() == 2 {
264 for sub_entry in fs::read_dir(&path)? {
265 let sub_entry = sub_entry?;
266 let sub_path = sub_entry.path();
267 if let Some(name) = sub_path.file_name().and_then(|n| n.to_str()) {
268 let full_hash = format!("{}{}", prefix, name);
269 if let Ok(hash) = crate::object::ContentHash::from_hex(&full_hash) {
270 hashes.push(hash);
271 }
272 }
273 }
274 }
275 }
276 }
277 debug!(count = hashes.len(), "Listed hashes");
278 Ok(hashes)
279}