Skip to main content

objects/store/fs/
fs_io.rs

1// SPDX-License-Identifier: Apache-2.0
2#![deny(clippy::cast_possible_truncation)]
3
4//! IO helpers for FsStore.
5
6use std::{
7    collections::BTreeSet,
8    fs::File,
9    io::Read,
10    path::{Path, PathBuf},
11    sync::Mutex,
12};
13
14use bytes::Bytes;
15
16use crate::{
17    error::HeddleError,
18    fs_atomic::{enrich_fs_error, enrich_rename_error, sync_directory},
19    store::{Result, atomic::temp_path},
20};
21
/// Size cutoff in bytes (256 KiB) used by the readers in this file:
/// a file whose length is at least this value is memory-mapped, while
/// a shorter file is read into a heap buffer.
const MMAP_THRESHOLD_BYTES: u64 = 256 * 1024;
23
24pub(super) enum FileBytes {
25    Vec(Vec<u8>),
26    Mmap(memmap2::Mmap),
27}
28
29impl FileBytes {
30    pub(super) fn as_slice(&self) -> &[u8] {
31        match self {
32            FileBytes::Vec(data) => data,
33            FileBytes::Mmap(data) => data,
34        }
35    }
36
37}
38
39#[derive(Clone, Copy, Debug, Eq, PartialEq)]
40pub(super) enum AtomicWriteMode {
41    Durable,
42    BatchDirectorySync,
43    /// No fsync at all. Caller asserts the file is a recoverable
44    /// cache mirror — the authoritative copy lives elsewhere
45    /// (typically a pack) and re-derivation on read is correct.
46    /// On macOS APFS, `sync_data` alone is ~5 ms per call
47    /// (`F_FULLFSYNC`-class cost); skipping it cuts cache-write
48    /// throughput from ~200 writes/s to ~5500 writes/s. The price
49    /// is that a torn write after a crash could leave the cached
50    /// file with garbage bytes — readers must guard with a hash
51    /// check before trusting the content.
52    NoSync,
53}
54
55pub(super) fn write_atomic(
56    path: &Path,
57    data: &[u8],
58    mode: AtomicWriteMode,
59    pending_directory_syncs: Option<&Mutex<BTreeSet<PathBuf>>>,
60) -> Result<()> {
61    let parent = path
62        .parent()
63        .ok_or_else(|| std::io::Error::other("invalid atomic write path"))?;
64    std::fs::create_dir_all(parent)
65        .map_err(|e| HeddleError::Io(enrich_fs_error(parent, "creating", e)))?;
66
67    let temp_path = temp_path(path);
68    // Tag each fallible op with the verb that should appear in the
69    // user-facing message if it trips. We compute the wrapped error at
70    // the boundary so an EXDEV from `rename` gets the src+dst-aware
71    // message via `enrich_rename_error`, while a write into the temp
72    // file gets the "writing" verb against the destination path. The
73    // deferred-directory-sync lock-poison case is non-IO and gets a
74    // synthetic `io::Error::other`.
75    enum Op {
76        Write,
77        Rename,
78        SyncDir,
79    }
80    let mut failing_op = Op::Write;
81    let write_result: std::io::Result<()> = (|| {
82        // Open with explicit mode 0o644 instead of relying on the
83        // process umask. This makes loose objects byte-and-mode
84        // deterministic: clonefile on macOS preserves source mode,
85        // so a worktree materialised from a loose blob inherits
86        // 0o644 *without* an extra chmod. `repository_materialization`
87        // skips `set_file_mode` on non-executable files because of
88        // this contract — see `materialize_blob`'s comment near the
89        // `set_file_mode(dest, true)` call.
90        let mut opts = std::fs::OpenOptions::new();
91        opts.write(true).create_new(true);
92        #[cfg(unix)]
93        {
94            use std::os::unix::fs::OpenOptionsExt;
95            opts.mode(0o644);
96        }
97        let mut file = opts.open(&temp_path)?;
98        use std::io::Write as _;
99        file.write_all(data)?;
100        match mode {
101            // `Durable` is the strongest mode: data + metadata fsync
102            // before rename, then directory fsync after — so the file
103            // is fully on disk and discoverable through the parent
104            // directory before this returns.
105            AtomicWriteMode::Durable => file.sync_all()?,
106            // `BatchDirectorySync` keeps per-file content durability
107            // (so a crash mid-batch can't leave a renamed-but-empty
108            // file behind) but defers parent-directory fsyncs to
109            // `flush_snapshot_write_batch`. The trees + state file
110            // written during a snapshot rely on this mode for
111            // durability of their *contents*; the deferred dir fsync
112            // is what makes the rename observable to a fresh process.
113            // Without `sync_data` here, a crash after rename + before
114            // flush could leave a file that "exists" in the directory
115            // but whose data blocks weren't flushed — exactly the
116            // ACID violation we want to avoid for state/tree writes.
117            AtomicWriteMode::BatchDirectorySync => file.sync_data()?,
118            // Cache-mirror writes: no fsync. Caller guards reads
119            // with a hash check, so torn-write corruption is
120            // recoverable (re-promote from the authoritative copy).
121            AtomicWriteMode::NoSync => {}
122        }
123        failing_op = Op::Rename;
124        std::fs::rename(&temp_path, path)?;
125        failing_op = Op::SyncDir;
126        match mode {
127            AtomicWriteMode::Durable => sync_directory(parent)?,
128            AtomicWriteMode::BatchDirectorySync => {
129                if let Some(pending) = pending_directory_syncs {
130                    let mut dirs = pending.lock().map_err(|_| {
131                        std::io::Error::other("failed to acquire pending directory sync lock")
132                    })?;
133                    dirs.insert(parent.to_path_buf());
134                }
135            }
136            AtomicWriteMode::NoSync => {}
137        }
138        Ok(())
139    })();
140
141    if let Err(err) = write_result {
142        let _ = std::fs::remove_file(&temp_path);
143        let wrapped = match failing_op {
144            Op::Write => enrich_fs_error(path, "writing", err),
145            Op::Rename => enrich_rename_error(&temp_path, path, err),
146            Op::SyncDir => enrich_fs_error(parent, "syncing", err),
147        };
148        return Err(HeddleError::Io(wrapped));
149    }
150
151    Ok(())
152}
153
154/// Read the file's header (up to `header_len` bytes) and report its
155/// total on-disk size, without loading the body. Returns `Ok(None)`
156/// when the file is missing.
157///
158/// Used by [`crate::store::ObjectStore::blob_size`] on `FsStore` to
159/// avoid pulling whole blobs through `get_blob` just to learn their
160/// uncompressed size — the size is recorded in the compression header
161/// for compressed blobs, and equals the file length for raw blobs.
162pub(super) fn read_file_header(path: &Path, header_len: usize) -> Result<Option<(Vec<u8>, u64)>> {
163    let mut file = match File::open(path) {
164        Ok(file) => file,
165        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
166        Err(e) => return Err(e.into()),
167    };
168
169    let metadata = file.metadata()?;
170    let len = metadata.len();
171    let to_read = if len > header_len as u64 {
172        header_len
173    } else {
174        checked_file_len_to_usize(len)?
175    };
176    let mut header = vec![0u8; to_read];
177    if to_read > 0 {
178        use std::io::Read as _;
179        file.read_exact(&mut header)?;
180    }
181    Ok(Some((header, len)))
182}
183
184/// Read a pack file as zero-copy [`Bytes`]. For packs that clear the
185/// mmap threshold, the underlying memory is the mmap'd region —
186/// every `Bytes::slice` into it is a zero-copy view. Smaller packs
187/// fall back to a heap read wrapped in `Bytes`. Public because the
188/// pack reader lives in a sibling module and needs to bypass the
189/// `pub(super)` gate on `read_file_bytes`.
190pub fn read_file_bytes_for_pack(path: &Path) -> Result<Bytes> {
191    let file = File::open(path)?;
192    let len = file.metadata()?.len();
193    if len == 0 {
194        return Ok(Bytes::new());
195    }
196    if len >= MMAP_THRESHOLD_BYTES {
197        let mmap = unsafe { memmap2::MmapOptions::new().map(&file)? };
198        if mmap.len() != checked_file_len_to_usize(len)? {
199            return Err(HeddleError::InvalidObject(
200                "pack file size changed during memory mapping".to_string(),
201            ));
202        }
203        return Ok(Bytes::from_owner(mmap));
204    }
205    let mut data = Vec::with_capacity(checked_file_len_to_usize(len)?);
206    let mut reader = file;
207    reader.read_to_end(&mut data)?;
208    Ok(Bytes::from(data))
209}
210
211pub(super) fn read_file_bytes(path: &Path) -> Result<Option<FileBytes>> {
212    let file = match File::open(path) {
213        Ok(file) => file,
214        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
215        Err(e) => return Err(e.into()),
216    };
217
218    let metadata = file.metadata()?;
219    let len = metadata.len();
220    if len == 0 {
221        return Ok(Some(FileBytes::Vec(vec![])));
222    }
223    if len >= MMAP_THRESHOLD_BYTES {
224        let mmap = unsafe { memmap2::MmapOptions::new().map(&file)? };
225        if mmap.len() != checked_file_len_to_usize(len)? {
226            return Err(crate::store::HeddleError::InvalidObject(
227                "file size changed during memory mapping".to_string(),
228            ));
229        }
230        return Ok(Some(FileBytes::Mmap(mmap)));
231    }
232
233    let mut data = Vec::with_capacity(checked_file_len_to_usize(len)?);
234    let mut reader = file;
235    reader.read_to_end(&mut data)?;
236    Ok(Some(FileBytes::Vec(data)))
237}
238
239fn checked_file_len_to_usize(len: u64) -> Result<usize> {
240    usize::try_from(len).map_err(|_| {
241        HeddleError::InvalidObject(format!("file length {len} exceeds platform limits"))
242    })
243}
244
245/// List all content hashes from a sharded directory structure (aa/bbcc... → aabbcc...).
246pub(super) fn list_hashes_from_dir(
247    dir: &std::path::Path,
248) -> Result<Vec<crate::object::ContentHash>> {
249    use std::fs;
250
251    use tracing::debug;
252
253    if !dir.exists() {
254        return Ok(Vec::new());
255    }
256
257    let mut hashes = Vec::new();
258    for entry in fs::read_dir(dir)? {
259        let entry = entry?;
260        let path = entry.path();
261        if path.is_dir() {
262            let prefix = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
263            if prefix.len() == 2 {
264                for sub_entry in fs::read_dir(&path)? {
265                    let sub_entry = sub_entry?;
266                    let sub_path = sub_entry.path();
267                    if let Some(name) = sub_path.file_name().and_then(|n| n.to_str()) {
268                        let full_hash = format!("{}{}", prefix, name);
269                        if let Ok(hash) = crate::object::ContentHash::from_hex(&full_hash) {
270                            hashes.push(hash);
271                        }
272                    }
273                }
274            }
275        }
276    }
277    debug!(count = hashes.len(), "Listed hashes");
278    Ok(hashes)
279}