1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
// SPDX-License-Identifier: Apache-2.0
//! Pack and prune operations for FsStore.
use std::fs;
use super::{
FsStore,
fs_io::list_hashes_from_dir,
fs_paths::{blobs_dir, hash_path, packs_dir, trees_dir},
};
use crate::{
object::ContentHash,
store::{
HeddleError, ObjectStore, Result,
pack::{ObjectType as PackObjectType, PackBuilder},
},
};
impl FsStore {
    /// Bulk-install many blobs as a single packfile. Two fsyncs total
    /// (one for `.pack`, one for `.idx`) regardless of blob count —
    /// vs. N×fsync if each blob were written loose. Used by the
    /// snapshot hot path; called at the end of the tree walk with
    /// every new blob accumulated in memory.
    ///
    /// Skips blobs already in the store (whether loose or packed) so
    /// re-snapshotting an unchanged worktree doesn't churn the pack
    /// directory. With every blob already known, this is a no-op.
    /// Repeated hashes within `blobs` (e.g. identical files at two
    /// paths) are packed only once.
    pub(super) fn put_blobs_packed_impl(&self, blobs: Vec<(ContentHash, Vec<u8>)>) -> Result<()> {
        if blobs.is_empty() {
            return Ok(());
        }
        // Snapshot-time pack: skip the sliding-window delta search.
        // It's a CPU win on similar-content files (the GC packer
        // benefits) but for a single snapshot the inputs are
        // unrelated content (random binaries, small text, etc.) and
        // every pair-wise delta estimate runs across the full
        // payloads — for 16×4MB blobs that's tens of seconds of
        // hashing for ~zero compression benefit. GC's
        // `pack_objects_impl` keeps the full delta search; this
        // path only optimizes durability + write throughput.
        let mut compression = self.compression;
        compression.max_delta_size = 0;
        let mut builder = PackBuilder::new(compression);
        let mut seen: std::collections::HashSet<ContentHash> =
            std::collections::HashSet::with_capacity(blobs.len());
        let mut staged: Vec<(ContentHash, Vec<u8>)> = Vec::with_capacity(blobs.len());
        for (hash, data) in blobs {
            // `has_blob` can't see blobs staged earlier in this batch
            // (nothing is on disk yet), so dedupe within the batch
            // first; otherwise a repeated hash would be compressed and
            // written into the pack twice.
            if !seen.insert(hash) || ObjectStore::has_blob(self, &hash)? {
                continue;
            }
            staged.push((hash, data.clone()));
            builder.add(hash, PackObjectType::Blob, data);
        }
        if staged.is_empty() {
            return Ok(());
        }
        let (pack_data, index_data, _stats) = builder.build()?;
        // `install_pack_files` deliberately leaves the recent-object
        // caches alone. For our locally-built pack we know exactly
        // what we just installed, so we populate `recent_blobs` with
        // the staged contents immediately afterwards. Without this
        // the snapshot hot path takes a cache miss on every blob it
        // just wrote, and `seed_large_repository` style benchmarks
        // that snapshot-many-times-in-a-loop end up re-reading every
        // parent state from disk between iterations.
        self.install_pack_files(&pack_data, &index_data)?;
        // Best-effort: a poisoned cache lock just means we skip warming.
        if let Ok(mut cache) = self.recent_blobs.write() {
            for (hash, data) in staged {
                cache.insert(hash, crate::object::Blob::from_slice(&data));
            }
        }
        Ok(())
    }

    /// Pack every loose blob and tree into one new packfile (full
    /// delta search, using the store's configured compression).
    ///
    /// Returns `(objects packed, bytes saved by compression)`. The
    /// saved figure is clamped at zero, because compression can
    /// expand incompressible data. `_aggressive` is currently
    /// ignored. Loose files are *not* removed here; that is
    /// `prune_loose_objects_impl`'s job.
    pub(super) fn pack_objects_impl(&self, _aggressive: bool) -> Result<(u64, u64)> {
        let blobs = list_hashes_from_dir(&blobs_dir(&self.root))?;
        let trees = list_hashes_from_dir(&trees_dir(&self.root))?;
        if blobs.is_empty() && trees.is_empty() {
            return Ok((0, 0));
        }
        let mut builder = PackBuilder::new(self.compression);
        let mut added = 0usize;
        for hash in &blobs {
            if let Some(blob) = ObjectStore::get_blob(self, hash)? {
                builder.add(*hash, PackObjectType::Blob, blob.content().to_vec());
                added += 1;
            }
        }
        for hash in &trees {
            if let Some(tree) = ObjectStore::get_tree(self, hash)? {
                let data = rmp_serde::to_vec(&tree)?;
                builder.add(*hash, PackObjectType::Tree, data);
                added += 1;
            }
        }
        if added == 0 {
            // Every listed object disappeared before we could read it;
            // don't install an empty pack.
            return Ok((0, 0));
        }
        let (pack_data, index_data, stats) = builder.build()?;
        self.install_pack_files(&pack_data, &index_data)?;
        // GC packs *replace* loose objects (followed by
        // `prune_loose_objects`). Bust the recent-objects caches so
        // a subsequent get_* doesn't return a stale `Blob`/`Tree`
        // pointing at a path we're about to delete. The snapshot hot
        // path doesn't go through here — it calls
        // `install_pack_files` directly via `put_blobs_packed_impl`,
        // which keeps its caches warm.
        self.clear_recent_object_caches();
        // Plain subtraction would underflow (panic in debug, wrap in
        // release) when the pack is larger than its inputs.
        let saved = stats
            .total_uncompressed
            .saturating_sub(stats.total_compressed);
        Ok((stats.object_count, saved))
    }

    /// Write an in-memory pack and its index into the pack directory
    /// under the pack's BLAKE3 content hash, then reload the pack
    /// manager.
    pub(super) fn install_pack_files(&self, pack_data: &[u8], index_data: &[u8]) -> Result<()> {
        let packs = packs_dir(&self.root);
        fs::create_dir_all(&packs)?;
        let (pack_path, index_path) = pack_file_paths(&packs, &blake3::hash(pack_data));
        self.write_pack_atomic(&pack_path, pack_data)?;
        self.write_pack_atomic(&index_path, index_data)?;
        // Pack manager picks up the new files. We do *not* clear the
        // recent-object caches here — every caller that follows this
        // with a destructive prune is responsible for clearing them
        // explicitly. Snapshot installs rely on cache stickiness to
        // keep tight snapshot loops fast (see
        // `put_blobs_packed_impl`).
        self.reload_packs()?;
        Ok(())
    }

    /// Move a pack and its index already on disk into the store's
    /// pack directory, computing the pack's content-hash by streaming
    /// the file (constant memory regardless of pack size). Pairs with
    /// `StreamingPackBuilder`: pack data, the index, *and* this
    /// installation step never load the full pack or index into
    /// memory.
    ///
    /// Both source files are `rename(2)`'d into place, unlike
    /// `install_pack_files`, which writes through `write_pack_atomic`.
    /// If a rename fails (e.g. EXDEV, cross-device), the file is
    /// copied to a temporary sibling of the destination and then
    /// renamed. A partially copied file is therefore never visible
    /// under the final pack name. Clears the recent-object caches and
    /// reloads the pack manager on success.
    pub(super) fn install_pack_files_streaming(
        &self,
        src_pack_path: &std::path::Path,
        src_index_path: &std::path::Path,
    ) -> Result<()> {
        let packs = packs_dir(&self.root);
        fs::create_dir_all(&packs)?;
        let pack_hash = hash_file_streaming(src_pack_path)?;
        let (pack_path, index_path) = pack_file_paths(&packs, &pack_hash);
        // Pack first, then index, mirroring `install_pack_files`.
        move_into_place(src_pack_path, &pack_path)?;
        move_into_place(src_index_path, &index_path)?;
        self.clear_recent_object_caches();
        self.reload_packs()?;
        Ok(())
    }

    /// Delete loose blob and tree files whose hash the pack manager
    /// can already serve.
    ///
    /// Returns `(files removed, bytes freed)`. Files that disappear
    /// between listing and deletion (e.g. a concurrent prune) are
    /// skipped silently; any other I/O error aborts the prune.
    pub(super) fn prune_loose_objects_impl(&self) -> Result<(u64, u64)> {
        let blob_root = blobs_dir(&self.root);
        let tree_root = trees_dir(&self.root);
        let blobs = list_hashes_from_dir(&blob_root)?;
        let trees = list_hashes_from_dir(&tree_root)?;
        let pack_manager = self
            .pack_manager()
            .read()
            .map_err(|_| HeddleError::Config("Failed to acquire pack manager lock".to_string()))?;
        let mut removed = 0u64;
        let mut bytes_freed = 0u64;
        for (dir, hashes) in [(&blob_root, &blobs), (&tree_root, &trees)] {
            for hash in hashes {
                // Only delete what is recoverable from a pack.
                if pack_manager.get_hashed_object(hash)?.is_none() {
                    continue;
                }
                if let Some(len) = remove_loose_file(&hash_path(dir, hash))? {
                    bytes_freed += len;
                    removed += 1;
                }
            }
        }
        Ok((removed, bytes_freed))
    }
}

/// Build the `<hex>.pack` / `<hex>.idx` paths inside `packs` for a
/// pack whose content hash is `pack_hash`.
fn pack_file_paths(
    packs: &std::path::Path,
    pack_hash: &blake3::Hash,
) -> (std::path::PathBuf, std::path::PathBuf) {
    let name = pack_hash.to_hex();
    (
        packs.join(format!("{}.pack", name)),
        packs.join(format!("{}.idx", name)),
    )
}

/// BLAKE3-hash a file in 64 KiB chunks so memory stays constant
/// regardless of file size. Retries reads interrupted by a signal.
fn hash_file_streaming(path: &std::path::Path) -> Result<blake3::Hash> {
    use std::io::Read;
    let mut hasher = blake3::Hasher::new();
    let mut file = fs::File::open(path)?;
    let mut buf = vec![0u8; 64 * 1024];
    loop {
        match file.read(&mut buf) {
            Ok(0) => break,
            Ok(n) => {
                hasher.update(&buf[..n]);
            }
            // `File::read` surfaces EINTR rather than retrying it.
            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(HeddleError::from(e)),
        }
    }
    Ok(hasher.finalize())
}

/// Move `src` to `dst`. `rename` is atomic when both paths are on the
/// same filesystem; heddle presumably stages under the store root, so
/// that should be the common case (NOTE(review): confirm).
///
/// On any other rename failure (typically EXDEV), `src` is copied to a
/// temporary sibling of `dst` and renamed over it, so readers never
/// observe a half-written file at `dst`. The temp file is removed if
/// the copy or rename fails. Removing `src` afterwards is best-effort.
fn move_into_place(src: &std::path::Path, dst: &std::path::Path) -> Result<()> {
    match fs::rename(src, dst) {
        Ok(()) => Ok(()),
        Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
            // Same content already installed (the name is the content
            // hash); drop the staging copy so it doesn't accumulate.
            let _ = fs::remove_file(src);
            Ok(())
        }
        Err(_) => {
            let mut tmp_name = dst
                .file_name()
                .map(|n| n.to_os_string())
                .unwrap_or_default();
            tmp_name.push(format!(".tmp-{}", std::process::id()));
            let tmp = dst.with_file_name(tmp_name);
            if let Err(e) = fs::copy(src, &tmp).and_then(|_| fs::rename(&tmp, dst)) {
                let _ = fs::remove_file(&tmp);
                return Err(HeddleError::from(e));
            }
            let _ = fs::remove_file(src);
            Ok(())
        }
    }
}

/// Delete a loose object file. Returns its size, or `None` if it was
/// already gone at either the metadata or removal step.
fn remove_loose_file(path: &std::path::Path) -> Result<Option<u64>> {
    let len = match fs::metadata(path) {
        Ok(metadata) => metadata.len(),
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
        Err(e) => return Err(HeddleError::from(e)),
    };
    match fs::remove_file(path) {
        Ok(()) => Ok(Some(len)),
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
        Err(e) => Err(HeddleError::from(e)),
    }
}