Skip to main content

objects/store/fs/
fs_io.rs

1// SPDX-License-Identifier: Apache-2.0
2#![deny(clippy::cast_possible_truncation)]
3
4//! IO helpers for FsStore.
5
6use std::{
7    collections::BTreeSet,
8    fs::File,
9    io::Read,
10    path::{Path, PathBuf},
11    sync::Mutex,
12};
13
14use bytes::Bytes;
15
16use crate::{
17    error::HeddleError,
18    fs_atomic::{enrich_fs_error, enrich_rename_error, sync_directory, temp_path},
19    store::Result,
20};
21
/// Files at least this large (256 KiB) are memory-mapped by the read
/// helpers in this module; smaller files are read onto the heap.
const MMAP_THRESHOLD_BYTES: u64 = 256 << 10;
23
24pub(super) enum FileBytes {
25    Vec(Vec<u8>),
26    Mmap(memmap2::Mmap),
27}
28
29impl FileBytes {
30    pub(super) fn as_slice(&self) -> &[u8] {
31        match self {
32            FileBytes::Vec(data) => data,
33            FileBytes::Mmap(data) => data,
34        }
35    }
36}
37
38#[derive(Clone, Copy, Debug, Eq, PartialEq)]
39pub(super) enum AtomicWriteMode {
40    Durable,
41    BatchDirectorySync,
42    /// No fsync at all. Caller asserts the file is a recoverable
43    /// cache mirror — the authoritative copy lives elsewhere
44    /// (typically a pack) and re-derivation on read is correct.
45    /// On macOS APFS, `sync_data` alone is ~5 ms per call
46    /// (`F_FULLFSYNC`-class cost); skipping it cuts cache-write
47    /// throughput from ~200 writes/s to ~5500 writes/s. The price
48    /// is that a torn write after a crash could leave the cached
49    /// file with garbage bytes — readers must guard with a hash
50    /// check before trusting the content.
51    NoSync,
52}
53
54pub(super) fn write_atomic(
55    path: &Path,
56    data: &[u8],
57    mode: AtomicWriteMode,
58    pending_directory_syncs: Option<&Mutex<BTreeSet<PathBuf>>>,
59) -> Result<()> {
60    let parent = path
61        .parent()
62        .ok_or_else(|| std::io::Error::other("invalid atomic write path"))?;
63    std::fs::create_dir_all(parent)
64        .map_err(|e| HeddleError::Io(enrich_fs_error(parent, "creating", e)))?;
65
66    let temp_path = temp_path(path);
67    // Tag each fallible op with the verb that should appear in the
68    // user-facing message if it trips. We compute the wrapped error at
69    // the boundary so an EXDEV from `rename` gets the src+dst-aware
70    // message via `enrich_rename_error`, while a write into the temp
71    // file gets the "writing" verb against the destination path. The
72    // deferred-directory-sync lock-poison case is non-IO and gets a
73    // synthetic `io::Error::other`.
74    enum Op {
75        Write,
76        Rename,
77        SyncDir,
78    }
79    let mut failing_op = Op::Write;
80    let write_result: std::io::Result<()> = (|| {
81        // Open with explicit mode 0o644 instead of relying on the
82        // process umask. This makes loose objects byte-and-mode
83        // deterministic: clonefile on macOS preserves source mode,
84        // so a worktree materialised from a loose blob inherits
85        // 0o644 *without* an extra chmod. `repository_materialization`
86        // skips `set_file_mode` on non-executable files because of
87        // this contract — see `materialize_blob`'s comment near the
88        // `set_file_mode(dest, true)` call.
89        let mut opts = std::fs::OpenOptions::new();
90        opts.write(true).create_new(true);
91        #[cfg(unix)]
92        {
93            use std::os::unix::fs::OpenOptionsExt;
94            opts.mode(0o644);
95        }
96        let mut file = opts.open(&temp_path)?;
97        use std::io::Write as _;
98        file.write_all(data)?;
99        match mode {
100            // `Durable` is the strongest mode: data + metadata fsync
101            // before rename, then directory fsync after — so the file
102            // is fully on disk and discoverable through the parent
103            // directory before this returns.
104            AtomicWriteMode::Durable => file.sync_all()?,
105            // `BatchDirectorySync` keeps per-file content durability
106            // (so a crash mid-batch can't leave a renamed-but-empty
107            // file behind) but defers parent-directory fsyncs to
108            // `flush_snapshot_write_batch`. The trees + state file
109            // written during a snapshot rely on this mode for
110            // durability of their *contents*; the deferred dir fsync
111            // is what makes the rename observable to a fresh process.
112            // Without `sync_data` here, a crash after rename + before
113            // flush could leave a file that "exists" in the directory
114            // but whose data blocks weren't flushed — exactly the
115            // ACID violation we want to avoid for state/tree writes.
116            AtomicWriteMode::BatchDirectorySync => file.sync_data()?,
117            // Cache-mirror writes: no fsync. Caller guards reads
118            // with a hash check, so torn-write corruption is
119            // recoverable (re-promote from the authoritative copy).
120            AtomicWriteMode::NoSync => {}
121        }
122        failing_op = Op::Rename;
123        std::fs::rename(&temp_path, path)?;
124        failing_op = Op::SyncDir;
125        match mode {
126            AtomicWriteMode::Durable => sync_directory(parent)?,
127            AtomicWriteMode::BatchDirectorySync => {
128                if let Some(pending) = pending_directory_syncs {
129                    let mut dirs = pending.lock().map_err(|_| {
130                        std::io::Error::other("failed to acquire pending directory sync lock")
131                    })?;
132                    dirs.insert(parent.to_path_buf());
133                }
134            }
135            AtomicWriteMode::NoSync => {}
136        }
137        Ok(())
138    })();
139
140    if let Err(err) = write_result {
141        let _ = std::fs::remove_file(&temp_path);
142        let wrapped = match failing_op {
143            Op::Write => enrich_fs_error(path, "writing", err),
144            Op::Rename => enrich_rename_error(&temp_path, path, err),
145            Op::SyncDir => enrich_fs_error(parent, "syncing", err),
146        };
147        return Err(HeddleError::Io(wrapped));
148    }
149
150    Ok(())
151}
152
153/// Read the file's header (up to `header_len` bytes) and report its
154/// total on-disk size, without loading the body. Returns `Ok(None)`
155/// when the file is missing.
156///
157/// Used by [`crate::store::ObjectStore::blob_size`] on `FsStore` to
158/// avoid pulling whole blobs through `get_blob` just to learn their
159/// uncompressed size — the size is recorded in the compression header
160/// for compressed blobs, and equals the file length for raw blobs.
161pub(super) fn read_file_header(path: &Path, header_len: usize) -> Result<Option<(Vec<u8>, u64)>> {
162    let mut file = match File::open(path) {
163        Ok(file) => file,
164        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
165        Err(e) => return Err(e.into()),
166    };
167
168    let metadata = file.metadata()?;
169    let len = metadata.len();
170    let to_read = if len > header_len as u64 {
171        header_len
172    } else {
173        checked_file_len_to_usize(len)?
174    };
175    let mut header = vec![0u8; to_read];
176    if to_read > 0 {
177        use std::io::Read as _;
178        file.read_exact(&mut header)?;
179    }
180    Ok(Some((header, len)))
181}
182
183/// Read a pack file as zero-copy [`Bytes`]. For packs that clear the
184/// mmap threshold, the underlying memory is the mmap'd region —
185/// every `Bytes::slice` into it is a zero-copy view. Smaller packs
186/// fall back to a heap read wrapped in `Bytes`. Public because the
187/// pack reader lives in a sibling module and needs to bypass the
188/// `pub(super)` gate on `read_file_bytes`.
189pub fn read_file_bytes_for_pack(path: &Path) -> Result<Bytes> {
190    let file = File::open(path)?;
191    let len = file.metadata()?.len();
192    if len == 0 {
193        return Ok(Bytes::new());
194    }
195    if len >= MMAP_THRESHOLD_BYTES {
196        let mmap = unsafe { memmap2::MmapOptions::new().map(&file)? };
197        if mmap.len() != checked_file_len_to_usize(len)? {
198            return Err(HeddleError::InvalidObject(
199                "pack file size changed during memory mapping".to_string(),
200            ));
201        }
202        return Ok(Bytes::from_owner(mmap));
203    }
204    let mut data = Vec::with_capacity(checked_file_len_to_usize(len)?);
205    let mut reader = file;
206    reader.read_to_end(&mut data)?;
207    Ok(Bytes::from(data))
208}
209
210pub(super) fn read_file_bytes(path: &Path) -> Result<Option<FileBytes>> {
211    let file = match File::open(path) {
212        Ok(file) => file,
213        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
214        Err(e) => return Err(e.into()),
215    };
216
217    let metadata = file.metadata()?;
218    let len = metadata.len();
219    if len == 0 {
220        return Ok(Some(FileBytes::Vec(vec![])));
221    }
222    if len >= MMAP_THRESHOLD_BYTES {
223        let mmap = unsafe { memmap2::MmapOptions::new().map(&file)? };
224        if mmap.len() != checked_file_len_to_usize(len)? {
225            return Err(crate::store::HeddleError::InvalidObject(
226                "file size changed during memory mapping".to_string(),
227            ));
228        }
229        return Ok(Some(FileBytes::Mmap(mmap)));
230    }
231
232    let mut data = Vec::with_capacity(checked_file_len_to_usize(len)?);
233    let mut reader = file;
234    reader.read_to_end(&mut data)?;
235    Ok(Some(FileBytes::Vec(data)))
236}
237
238fn checked_file_len_to_usize(len: u64) -> Result<usize> {
239    usize::try_from(len).map_err(|_| {
240        HeddleError::InvalidObject(format!("file length {len} exceeds platform limits"))
241    })
242}
243
244/// List all content hashes from a sharded directory structure (aa/bbcc... → aabbcc...).
245pub(super) fn list_hashes_from_dir(
246    dir: &std::path::Path,
247) -> Result<Vec<crate::object::ContentHash>> {
248    use std::fs;
249
250    use tracing::debug;
251
252    if !dir.exists() {
253        return Ok(Vec::new());
254    }
255
256    let mut hashes = Vec::new();
257    for entry in fs::read_dir(dir)? {
258        let entry = entry?;
259        let path = entry.path();
260        if path.is_dir() {
261            let prefix = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
262            if prefix.len() == 2 {
263                for sub_entry in fs::read_dir(&path)? {
264                    let sub_entry = sub_entry?;
265                    let sub_path = sub_entry.path();
266                    if let Some(name) = sub_path.file_name().and_then(|n| n.to_str()) {
267                        let full_hash = format!("{}{}", prefix, name);
268                        if let Ok(hash) = crate::object::ContentHash::from_hex(&full_hash) {
269                            hashes.push(hash);
270                        }
271                    }
272                }
273            }
274        }
275    }
276    debug!(count = hashes.len(), "Listed hashes");
277    Ok(hashes)
278}