Skip to main content

objects/store/fs/
fs_io.rs

1// SPDX-License-Identifier: Apache-2.0
2#![deny(clippy::cast_possible_truncation)]
3
4//! IO helpers for FsStore.
5
6use std::{
7    collections::BTreeSet,
8    fs::File,
9    io::Read,
10    path::{Path, PathBuf},
11    sync::Mutex,
12};
13
14use bytes::Bytes;
15
16use crate::{
17    error::HeddleError,
18    fs_atomic::{enrich_fs_error, enrich_rename_error, sync_directory},
19    store::{Result, atomic::temp_path},
20};
21
/// Files at least this many bytes long (256 KiB) are memory-mapped by the
/// readers below; smaller files are copied into a heap buffer instead.
const MMAP_THRESHOLD_BYTES: u64 = 256 << 10;
23
24pub(super) enum FileBytes {
25    Vec(Vec<u8>),
26    Mmap(memmap2::Mmap),
27}
28
29impl FileBytes {
30    pub(super) fn as_slice(&self) -> &[u8] {
31        match self {
32            FileBytes::Vec(data) => data,
33            FileBytes::Mmap(data) => data,
34        }
35    }
36
37    pub(super) fn into_vec(self) -> Vec<u8> {
38        match self {
39            FileBytes::Vec(data) => data,
40            FileBytes::Mmap(data) => data.to_vec(),
41        }
42    }
43}
44
45#[derive(Clone, Copy, Debug, Eq, PartialEq)]
46pub(super) enum AtomicWriteMode {
47    Durable,
48    BatchDirectorySync,
49    /// No fsync at all. Caller asserts the file is a recoverable
50    /// cache mirror — the authoritative copy lives elsewhere
51    /// (typically a pack) and re-derivation on read is correct.
52    /// On macOS APFS, `sync_data` alone is ~5 ms per call
53    /// (`F_FULLFSYNC`-class cost); skipping it cuts cache-write
54    /// throughput from ~200 writes/s to ~5500 writes/s. The price
55    /// is that a torn write after a crash could leave the cached
56    /// file with garbage bytes — readers must guard with a hash
57    /// check before trusting the content.
58    NoSync,
59}
60
61pub(super) fn write_atomic(
62    path: &Path,
63    data: &[u8],
64    mode: AtomicWriteMode,
65    pending_directory_syncs: Option<&Mutex<BTreeSet<PathBuf>>>,
66) -> Result<()> {
67    let parent = path
68        .parent()
69        .ok_or_else(|| std::io::Error::other("invalid atomic write path"))?;
70    std::fs::create_dir_all(parent)
71        .map_err(|e| HeddleError::Io(enrich_fs_error(parent, "creating", e)))?;
72
73    let temp_path = temp_path(path);
74    // Tag each fallible op with the verb that should appear in the
75    // user-facing message if it trips. We compute the wrapped error at
76    // the boundary so an EXDEV from `rename` gets the src+dst-aware
77    // message via `enrich_rename_error`, while a write into the temp
78    // file gets the "writing" verb against the destination path. The
79    // deferred-directory-sync lock-poison case is non-IO and gets a
80    // synthetic `io::Error::other`.
81    enum Op {
82        Write,
83        Rename,
84        SyncDir,
85    }
86    let mut failing_op = Op::Write;
87    let write_result: std::io::Result<()> = (|| {
88        // Open with explicit mode 0o644 instead of relying on the
89        // process umask. This makes loose objects byte-and-mode
90        // deterministic: clonefile on macOS preserves source mode,
91        // so a worktree materialised from a loose blob inherits
92        // 0o644 *without* an extra chmod. `repository_materialization`
93        // skips `set_file_mode` on non-executable files because of
94        // this contract — see `materialize_blob`'s comment near the
95        // `set_file_mode(dest, true)` call.
96        let mut opts = std::fs::OpenOptions::new();
97        opts.write(true).create_new(true);
98        #[cfg(unix)]
99        {
100            use std::os::unix::fs::OpenOptionsExt;
101            opts.mode(0o644);
102        }
103        let mut file = opts.open(&temp_path)?;
104        use std::io::Write as _;
105        file.write_all(data)?;
106        match mode {
107            // `Durable` is the strongest mode: data + metadata fsync
108            // before rename, then directory fsync after — so the file
109            // is fully on disk and discoverable through the parent
110            // directory before this returns.
111            AtomicWriteMode::Durable => file.sync_all()?,
112            // `BatchDirectorySync` keeps per-file content durability
113            // (so a crash mid-batch can't leave a renamed-but-empty
114            // file behind) but defers parent-directory fsyncs to
115            // `flush_snapshot_write_batch`. The trees + state file
116            // written during a snapshot rely on this mode for
117            // durability of their *contents*; the deferred dir fsync
118            // is what makes the rename observable to a fresh process.
119            // Without `sync_data` here, a crash after rename + before
120            // flush could leave a file that "exists" in the directory
121            // but whose data blocks weren't flushed — exactly the
122            // ACID violation we want to avoid for state/tree writes.
123            AtomicWriteMode::BatchDirectorySync => file.sync_data()?,
124            // Cache-mirror writes: no fsync. Caller guards reads
125            // with a hash check, so torn-write corruption is
126            // recoverable (re-promote from the authoritative copy).
127            AtomicWriteMode::NoSync => {}
128        }
129        failing_op = Op::Rename;
130        std::fs::rename(&temp_path, path)?;
131        failing_op = Op::SyncDir;
132        match mode {
133            AtomicWriteMode::Durable => sync_directory(parent)?,
134            AtomicWriteMode::BatchDirectorySync => {
135                if let Some(pending) = pending_directory_syncs {
136                    let mut dirs = pending.lock().map_err(|_| {
137                        std::io::Error::other("failed to acquire pending directory sync lock")
138                    })?;
139                    dirs.insert(parent.to_path_buf());
140                }
141            }
142            AtomicWriteMode::NoSync => {}
143        }
144        Ok(())
145    })();
146
147    if let Err(err) = write_result {
148        let _ = std::fs::remove_file(&temp_path);
149        let wrapped = match failing_op {
150            Op::Write => enrich_fs_error(path, "writing", err),
151            Op::Rename => enrich_rename_error(&temp_path, path, err),
152            Op::SyncDir => enrich_fs_error(parent, "syncing", err),
153        };
154        return Err(HeddleError::Io(wrapped));
155    }
156
157    Ok(())
158}
159
160/// Read the file's header (up to `header_len` bytes) and report its
161/// total on-disk size, without loading the body. Returns `Ok(None)`
162/// when the file is missing.
163///
164/// Used by [`crate::store::ObjectStore::blob_size`] on `FsStore` to
165/// avoid pulling whole blobs through `get_blob` just to learn their
166/// uncompressed size — the size is recorded in the compression header
167/// for compressed blobs, and equals the file length for raw blobs.
168pub(super) fn read_file_header(path: &Path, header_len: usize) -> Result<Option<(Vec<u8>, u64)>> {
169    let mut file = match File::open(path) {
170        Ok(file) => file,
171        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
172        Err(e) => return Err(e.into()),
173    };
174
175    let metadata = file.metadata()?;
176    let len = metadata.len();
177    let to_read = if len > header_len as u64 {
178        header_len
179    } else {
180        checked_file_len_to_usize(len)?
181    };
182    let mut header = vec![0u8; to_read];
183    if to_read > 0 {
184        use std::io::Read as _;
185        file.read_exact(&mut header)?;
186    }
187    Ok(Some((header, len)))
188}
189
190/// Read a pack file as zero-copy [`Bytes`]. For packs that clear the
191/// mmap threshold, the underlying memory is the mmap'd region —
192/// every `Bytes::slice` into it is a zero-copy view. Smaller packs
193/// fall back to a heap read wrapped in `Bytes`. Public because the
194/// pack reader lives in a sibling module and needs to bypass the
195/// `pub(super)` gate on `read_file_bytes`.
196pub fn read_file_bytes_for_pack(path: &Path) -> Result<Bytes> {
197    let file = File::open(path)?;
198    let len = file.metadata()?.len();
199    if len == 0 {
200        return Ok(Bytes::new());
201    }
202    if len >= MMAP_THRESHOLD_BYTES {
203        let mmap = unsafe { memmap2::MmapOptions::new().map(&file)? };
204        if mmap.len() != checked_file_len_to_usize(len)? {
205            return Err(HeddleError::InvalidObject(
206                "pack file size changed during memory mapping".to_string(),
207            ));
208        }
209        return Ok(Bytes::from_owner(mmap));
210    }
211    let mut data = Vec::with_capacity(checked_file_len_to_usize(len)?);
212    let mut reader = file;
213    reader.read_to_end(&mut data)?;
214    Ok(Bytes::from(data))
215}
216
217pub(super) fn read_file_bytes(path: &Path) -> Result<Option<FileBytes>> {
218    let file = match File::open(path) {
219        Ok(file) => file,
220        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
221        Err(e) => return Err(e.into()),
222    };
223
224    let metadata = file.metadata()?;
225    let len = metadata.len();
226    if len == 0 {
227        return Ok(Some(FileBytes::Vec(vec![])));
228    }
229    if len >= MMAP_THRESHOLD_BYTES {
230        let mmap = unsafe { memmap2::MmapOptions::new().map(&file)? };
231        if mmap.len() != checked_file_len_to_usize(len)? {
232            return Err(crate::store::HeddleError::InvalidObject(
233                "file size changed during memory mapping".to_string(),
234            ));
235        }
236        return Ok(Some(FileBytes::Mmap(mmap)));
237    }
238
239    let mut data = Vec::with_capacity(checked_file_len_to_usize(len)?);
240    let mut reader = file;
241    reader.read_to_end(&mut data)?;
242    Ok(Some(FileBytes::Vec(data)))
243}
244
245fn checked_file_len_to_usize(len: u64) -> Result<usize> {
246    usize::try_from(len).map_err(|_| {
247        HeddleError::InvalidObject(format!("file length {len} exceeds platform limits"))
248    })
249}
250
251/// List all content hashes from a sharded directory structure (aa/bbcc... → aabbcc...).
252pub(super) fn list_hashes_from_dir(
253    dir: &std::path::Path,
254) -> Result<Vec<crate::object::ContentHash>> {
255    use std::fs;
256
257    use tracing::debug;
258
259    if !dir.exists() {
260        return Ok(Vec::new());
261    }
262
263    let mut hashes = Vec::new();
264    for entry in fs::read_dir(dir)? {
265        let entry = entry?;
266        let path = entry.path();
267        if path.is_dir() {
268            let prefix = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
269            if prefix.len() == 2 {
270                for sub_entry in fs::read_dir(&path)? {
271                    let sub_entry = sub_entry?;
272                    let sub_path = sub_entry.path();
273                    if let Some(name) = sub_path.file_name().and_then(|n| n.to_str()) {
274                        let full_hash = format!("{}{}", prefix, name);
275                        if let Ok(hash) = crate::object::ContentHash::from_hex(&full_hash) {
276                            hashes.push(hash);
277                        }
278                    }
279                }
280            }
281        }
282    }
283    debug!(count = hashes.len(), "Listed hashes");
284    Ok(hashes)
285}