use crate::config::{project_id_from_path, StoreLayout};
use crate::error::{ChkpttError, Result};
use crate::index::{FileEntry, FileIndex};
use crate::ops::io_order::sort_scanned_refs_for_locality;
use crate::ops::lock::ProjectLock;
use crate::ops::progress::{emit, ProgressCallback, ProgressEvent};
use crate::scanner::ScannedFile;
use crate::store::blob::{hex_to_bytes, read_or_mmap, read_path_bytes};
use crate::store::catalog::{BlobLocation, CatalogSnapshot, ManifestEntry, MetadataCatalog};
use crate::store::pack::PackWriter;
use crate::store::snapshot::{Snapshot, SnapshotStats};
use crate::store::tree::{EntryType, TreeEntry, TreeStore};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{mpsc, Arc, Mutex};

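/// Options for a single `save` operation: an optional snapshot message,
/// whether dependency directories are included in the scan, and a progress
/// callback.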
#[derive(Default)]
pub struct SaveOptions {
    pub message: Option<String>,
    pub include_deps: bool,
    pub progress: ProgressCallback,
}

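/// Outcome of a successful save: the new snapshot's id and its stats.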
#[derive(Debug)]
pub struct SaveResult {
    pub snapshot_id: String,
    pub stats: SnapshotStats,
}

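/// A file whose blob hash is already known, ready to be placed into the tree.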
struct ProcessedFile {
    relative_path: String,
    blob_hash_bytes: [u8; 16],
    size: u64,
    mode: u32,
    entry_type: EntryType,
}

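/// A file after hashing: `compressed` holds the LZ4-compressed content only
/// when this save was the first to see the hash, otherwise `None`.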
struct PreparedFile {
    relative_path: String,
    blob_hash_bytes: [u8; 16],
    compressed: Option<Vec<u8>>,
    size: u64,
    mode: u32,
    mtime_secs: i64,
    mtime_nanos: i64,
    inode: Option<u64>,
    entry_type: EntryType,
}

struct NewBlobRecord {
    blob_hash: [u8; 16],
    size: u64,
}

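/// Identifies an inode across the workspace so hardlinked paths can share a
/// single prepared blob.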
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct HardlinkKey {
    device: u64,
    inode: u64,
}

#[derive(Debug, Clone)]
struct HardlinkPrepared {
    blob_hash_bytes: [u8; 16],
}

/// Number of independently locked shards in `ShardedSeenHashes`.
const SEEN_HASH_SHARDS: usize = 64;
/// Capacity of the bounded channel between hashing workers and the consumer.
const PREPARED_FILE_PIPELINE_SLOTS: usize = 64;

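/// A hash set split across `SEEN_HASH_SHARDS` mutexes so worker threads can
/// deduplicate blob hashes with little lock contention.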
struct ShardedSeenHashes {
    shards: Vec<Mutex<HashSet<[u8; 16]>>>,
}

impl ShardedSeenHashes {
    fn new(expected_entries: usize) -> Self {
        let shard_count = SEEN_HASH_SHARDS.max(1);
        let per_shard_capacity = expected_entries.div_ceil(shard_count).max(1);
        let mut shards = Vec::with_capacity(shard_count);
        for _ in 0..shard_count {
            shards.push(Mutex::new(HashSet::with_capacity(per_shard_capacity)));
        }
        Self { shards }
    }

    /// Returns `true` if the hash was not present before (first sighting).
    #[inline]
    fn insert(&self, hash: [u8; 16]) -> bool {
        // The first two hash bytes pick the shard, spreading contention
        // across the independent mutexes.
        let shard_index = ((hash[0] as usize) << 8 | hash[1] as usize) % self.shards.len();
        let mut shard = self.shards[shard_index].lock().unwrap();
        shard.insert(hash)
    }
}

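/// Creates a snapshot of the workspace.
///
/// The pipeline: acquire the project lock, scan the workspace, reuse index
/// entries whose metadata is unchanged, hash and compress the rest in
/// parallel, append new blobs to a pack, build the directory tree, and record
/// the snapshot (with or without a fresh manifest) in the catalog before
/// updating the file index.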
pub fn save(workspace_root: &Path, options: SaveOptions) -> Result<SaveResult> {
    let project_id = project_id_from_path(workspace_root);

    let layout = StoreLayout::new(&project_id);
    layout.ensure_dirs()?;

    // Hold the project lock for the duration of the save so concurrent
    // operations cannot interleave with catalog and index updates.
    let _lock = ProjectLock::acquire(&layout.locks_dir())?;
    let catalog = MetadataCatalog::open(layout.catalog_path())?;

    let scanned_files =
        crate::scanner::scan_workspace_with_options(workspace_root, None, options.include_deps)?;
    emit(
        &options.progress,
        ProgressEvent::ScanComplete {
            file_count: scanned_files.len() as u64,
        },
    );

    let mut index = FileIndex::open(layout.index_path())?;
    let cached_entries = index.entries();

    let packs_dir = layout.packs_dir();
    let known_blob_hashes = catalog.all_blob_hashes()?;
    let mut pack_writer = PackWriter::new(&packs_dir)?;

    let mut processed_files = Vec::with_capacity(scanned_files.len());
    let mut files_to_prepare = Vec::new();
    let mut updated_entries = Vec::new();
    let mut blob_locations_to_record = Vec::new();
    let mut new_blob_records = Vec::new();
    let mut total_bytes: u64 = 0;
    let mut current_paths =
        (!cached_entries.is_empty()).then(|| HashSet::with_capacity(scanned_files.len()));

    // Partition scanned files: entries whose metadata matches the index are
    // reused as-is; everything else must be read, hashed, and compressed.
    for scanned in &scanned_files {
        if let Some(paths) = current_paths.as_mut() {
            paths.insert(scanned.relative_path.as_str());
        }

        if let Some(processed) =
            cached_processed_file(scanned, cached_entries.get(&scanned.relative_path))
        {
            total_bytes += processed.size;
            processed_files.push(processed);
        } else {
            files_to_prepare.push(scanned);
        }
    }

    // Index entries whose paths no longer exist in the workspace.
    let removed_paths = current_paths
        .map(|paths| {
            cached_entries
                .keys()
                .filter(|path| !paths.contains(path.as_str()))
                .cloned()
                .collect::<Vec<_>>()
        })
        .unwrap_or_default();

    let mut new_objects: u64 = 0;

    let seen_hashes = Arc::new(ShardedSeenHashes::new(files_to_prepare.len()));

    sort_scanned_refs_for_locality(&mut files_to_prepare);

    let total_to_process = files_to_prepare.len() as u64;
    emit(
        &options.progress,
        ProgressEvent::ProcessStart {
            total: total_to_process,
        },
    );
    let progress_counter = AtomicU64::new(0);
    process_prepared_files_streaming(
        &files_to_prepare,
        &seen_hashes,
        &options.progress,
        &progress_counter,
        total_to_process,
        |prepared| {
            let PreparedFile {
                relative_path,
                blob_hash_bytes,
                compressed,
                size,
                mode,
                mtime_secs,
                mtime_nanos,
                inode,
                entry_type,
            } = prepared;

            total_bytes += size;
            // Only blobs that are new to both this save and the store get
            // appended to the pack.
            if let Some(compressed) = compressed {
                let exists_in_store = known_blob_hashes.contains(&blob_hash_bytes);
                if !exists_in_store {
                    pack_writer.add_pre_compressed_bytes(blob_hash_bytes, compressed)?;
                    new_blob_records.push(NewBlobRecord {
                        blob_hash: blob_hash_bytes,
                        size,
                    });
                    new_objects += 1;
                }
            }

            updated_entries.push(FileEntry {
                path: relative_path.clone(),
                blob_hash: blob_hash_bytes,
                size,
                mtime_secs,
                mtime_nanos,
                inode,
                mode,
            });
            processed_files.push(ProcessedFile {
                relative_path,
                blob_hash_bytes,
                size,
                mode,
                entry_type,
            });
            Ok(())
        },
    )?;
    // Finish the pack (if anything was written) and record where each new
    // blob lives so later reads can locate it.
    let new_pack_hash = if !pack_writer.is_empty() {
        Some(pack_writer.finish()?)
    } else {
        None
    };
    if let Some(pack_hash) = new_pack_hash {
        for blob in &new_blob_records {
            blob_locations_to_record.push((
                blob.blob_hash,
                BlobLocation {
                    pack_hash: Some(pack_hash.clone()),
                    size: blob.size,
                },
            ));
        }
    }
    catalog.bulk_upsert_blob_locations(&blob_locations_to_record)?;
    emit(&options.progress, ProgressEvent::PackComplete);

    let latest_catalog_snapshot = catalog.latest_snapshot()?;
    let no_manifest_changes =
        new_objects == 0 && removed_paths.is_empty() && updated_entries.is_empty();

    // When nothing changed, reuse the previous snapshot's root tree instead
    // of rebuilding it from the full file list.
    let root_tree_hash = if no_manifest_changes {
        if let Some(ref snapshot) = latest_catalog_snapshot {
            root_tree_hash_for_snapshot(&catalog, snapshot, &TreeStore::new(layout.trees_dir()))?
        } else {
            let tree_store = TreeStore::new(layout.trees_dir());
            let hex = build_tree(&processed_files, &tree_store)?;
            hex_to_bytes(&hex)?
        }
    } else {
        let tree_store = TreeStore::new(layout.trees_dir());
        let hex = build_tree(&processed_files, &tree_store)?;
        hex_to_bytes(&hex)?
    };

    let parent_snapshot_id = latest_catalog_snapshot
        .as_ref()
        .map(|snapshot| snapshot.id.clone());

    let stats = SnapshotStats {
        total_files: scanned_files.len() as u64,
        total_bytes,
        new_objects,
    };

    let snapshot = Snapshot::new(
        options.message,
        root_tree_hash,
        parent_snapshot_id,
        stats.clone(),
    );

    let snapshot_id = snapshot.id.clone();
    let catalog_snapshot = CatalogSnapshot {
        id: snapshot.id.clone(),
        created_at: snapshot.created_at,
        message: snapshot.message.clone(),
        parent_snapshot_id: snapshot.parent_snapshot_id.clone(),
        manifest_snapshot_id: None,
        root_tree_hash: Some(root_tree_hash),
        stats: stats.clone(),
    };

    if no_manifest_changes {
        // Point the new snapshot at the manifest of the snapshot that last
        // materialized one, so identical saves share a single manifest.
        let manifest_snapshot_id = latest_catalog_snapshot
            .as_ref()
            .map(|snapshot| {
                snapshot
                    .manifest_snapshot_id
                    .as_deref()
                    .unwrap_or(snapshot.id.as_str())
            })
            .unwrap_or(snapshot.id.as_str());
        catalog.insert_snapshot_metadata_only(&catalog_snapshot, manifest_snapshot_id)?;
    } else {
        let mut manifest: Vec<ManifestEntry> = processed_files
            .iter()
            .map(|processed| ManifestEntry {
                path: processed.relative_path.clone(),
                blob_hash: processed.blob_hash_bytes,
                size: processed.size,
                mode: processed.mode,
            })
            .collect();
        manifest.sort_unstable_by(|left, right| left.path.cmp(&right.path));
        catalog.insert_snapshot(&catalog_snapshot, &manifest)?;
    }

    index.apply_changes(&removed_paths, &updated_entries)?;

    Ok(SaveResult { snapshot_id, stats })
}

#[cfg_attr(not(test), allow(dead_code))]
fn store_has_external_objects(objects_dir: &Path, packs_dir: &Path) -> Result<bool> {
    // objects_dir is currently unused; only pack files are consulted.
    let _ = objects_dir;
    store_has_pack_objects(packs_dir)
}

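/// Returns whether the packs directory contains at least one `pack-*.dat` file.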
fn store_has_pack_objects(packs_dir: &Path) -> Result<bool> {
    let entries = match std::fs::read_dir(packs_dir) {
        Ok(entries) => entries,
        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(false),
        Err(error) => return Err(error.into()),
    };

    for entry in entries {
        let entry = entry?;
        if !entry.file_type()?.is_file() {
            continue;
        }

        let name = entry.file_name();
        let name = name.to_string_lossy();
        if name.starts_with("pack-") && name.ends_with(".dat") {
            return Ok(true);
        }
    }

    Ok(false)
}

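/// Returns the root tree hash for `snapshot`, rebuilding the tree from the
/// snapshot's manifest when the catalog row has no stored `root_tree_hash`.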
fn root_tree_hash_for_snapshot(
    catalog: &MetadataCatalog,
    snapshot: &CatalogSnapshot,
    tree_store: &TreeStore,
) -> Result<[u8; 16]> {
    if let Some(root_tree_hash) = snapshot.root_tree_hash {
        return Ok(root_tree_hash);
    }

    let manifest = catalog.snapshot_manifest(&snapshot.id)?;
    if manifest.is_empty() && snapshot.stats.total_files > 0 {
        return Err(ChkpttError::StoreCorrupted(format!(
            "snapshot '{}' is missing both manifest entries and root_tree_hash",
            snapshot.id
        )));
    }
    let root_tree_hex = build_tree(
        &manifest
            .into_iter()
            .map(|entry| ProcessedFile {
                relative_path: entry.path,
                blob_hash_bytes: entry.blob_hash,
                size: entry.size,
                mode: entry.mode,
                // Mask the file-type bits (0o170000); 0o120000 marks a symlink.
                entry_type: if entry.mode & 0o170000 == 0o120000 {
                    EntryType::Symlink
                } else {
                    EntryType::File
                },
            })
            .collect::<Vec<_>>(),
        tree_store,
    )?;
    hex_to_bytes(&root_tree_hex)
}

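/// Reuses the cached index entry as a `ProcessedFile` when mtime, size,
/// inode, and mode all match the scanned metadata; returns `None` otherwise.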
fn cached_processed_file(
    scanned: &ScannedFile,
    cached: Option<&FileEntry>,
) -> Option<ProcessedFile> {
    let cached = cached?;
    if cached.mtime_secs == scanned.mtime_secs
        && cached.mtime_nanos == scanned.mtime_nanos
        && cached.size == scanned.size
        && cached.inode == scanned.inode
        && cached.mode == scanned.mode
    {
        return Some(ProcessedFile {
            relative_path: scanned.relative_path.clone(),
            blob_hash_bytes: cached.blob_hash,
            size: scanned.size,
            mode: scanned.mode,
            entry_type: if scanned.is_symlink {
                EntryType::Symlink
            } else {
                EntryType::File
            },
        });
    }
    None
}

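/// Reads and hashes one scanned file, compressing its content only when the
/// hash has not yet been seen during this save.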
fn prepare_file(scanned: &ScannedFile, seen_hashes: &ShardedSeenHashes) -> Result<PreparedFile> {
    let (blob_hash_bytes, compressed) = if scanned.is_symlink {
        let content = read_path_bytes(&scanned.absolute_path, true)?;
        let blob_hash_bytes = xxhash_rust::xxh3::xxh3_128(&content).to_le_bytes();
        // Compress only the first occurrence of a hash; duplicates reuse the
        // blob written for the first occurrence.
        let is_new = seen_hashes.insert(blob_hash_bytes);
        let compressed = if is_new {
            Some(compress_with_worker_context(&content))
        } else {
            None
        };
        (blob_hash_bytes, compressed)
    } else {
        let content = read_or_mmap(&scanned.absolute_path)?;
        let blob_hash_bytes = xxhash_rust::xxh3::xxh3_128(content.as_ref()).to_le_bytes();
        let is_new = seen_hashes.insert(blob_hash_bytes);
        let compressed = if is_new {
            Some(compress_with_worker_context(content.as_ref()))
        } else {
            None
        };
        (blob_hash_bytes, compressed)
    };

    Ok(PreparedFile {
        relative_path: scanned.relative_path.clone(),
        blob_hash_bytes,
        compressed,
        size: scanned.size,
        mode: scanned.mode,
        mtime_secs: scanned.mtime_secs,
        mtime_nanos: scanned.mtime_nanos,
        inode: scanned.inode,
        entry_type: if scanned.is_symlink {
            EntryType::Symlink
        } else {
            EntryType::File
        },
    })
}

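/// Returns the (device, inode) key for regular files; symlinks and files
/// without device or inode metadata yield `None`.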
fn hardlink_key(scanned: &ScannedFile) -> Option<HardlinkKey> {
    if scanned.is_symlink {
        return None;
    }

    Some(HardlinkKey {
        device: scanned.device?,
        inode: scanned.inode?,
    })
}

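/// Like `prepare_file`, but consults a per-worker cache keyed by
/// (device, inode) so each hardlinked file is read and hashed only once.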
fn prepare_file_with_hardlink_cache(
    scanned: &ScannedFile,
    seen_hashes: &ShardedSeenHashes,
    hardlinks: &mut HashMap<HardlinkKey, HardlinkPrepared>,
) -> Result<PreparedFile> {
    if let Some(key) = hardlink_key(scanned) {
        // An alias of an already-prepared inode reuses the cached hash and
        // skips both the read and the compression.
        if let Some(cached) = hardlinks.get(&key) {
            return Ok(PreparedFile {
                relative_path: scanned.relative_path.clone(),
                blob_hash_bytes: cached.blob_hash_bytes,
                compressed: None,
                size: scanned.size,
                mode: scanned.mode,
                mtime_secs: scanned.mtime_secs,
                mtime_nanos: scanned.mtime_nanos,
                inode: scanned.inode,
                entry_type: if scanned.is_symlink {
                    EntryType::Symlink
                } else {
                    EntryType::File
                },
            });
        }

        let prepared = prepare_file(scanned, seen_hashes)?;
        hardlinks.insert(
            key,
            HardlinkPrepared {
                blob_hash_bytes: prepared.blob_hash_bytes,
            },
        );
        return Ok(prepared);
    }

    prepare_file(scanned, seen_hashes)
}

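/// Splits the sorted file list into roughly even chunks, one per worker,
/// shifting chunk boundaries so files that share a hardlink key stay in the
/// same chunk.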
fn split_scanned_refs_preserving_hardlinks<'a>(
    scanned_files: &'a [&'a ScannedFile],
    worker_count: usize,
) -> Vec<&'a [&'a ScannedFile]> {
    if scanned_files.is_empty() {
        return Vec::new();
    }

    let target_chunk_size = scanned_files.len().div_ceil(worker_count.max(1));
    let mut chunks = Vec::with_capacity(worker_count.max(1));
    let mut start = 0usize;

    while start < scanned_files.len() {
        let mut end = (start + target_chunk_size).min(scanned_files.len());
        // Extend the chunk so a run of files sharing one hardlink key never
        // straddles a boundary; each worker keeps a private hardlink cache.
        while end < scanned_files.len()
            && hardlink_key(scanned_files[end - 1]) == hardlink_key(scanned_files[end])
            && hardlink_key(scanned_files[end]).is_some()
        {
            end += 1;
        }
        chunks.push(&scanned_files[start..end]);
        start = end;
    }

    chunks
}

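/// Compresses `content` into an LZ4 frame.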
fn compress_with_worker_context(content: &[u8]) -> Vec<u8> {
    use lz4_flex::frame::FrameEncoder;
    let mut encoder = FrameEncoder::new(Vec::new());
    // Writing into a Vec-backed encoder cannot fail, so the unwraps are safe.
    std::io::Write::write_all(&mut encoder, content).unwrap();
    encoder.finish().unwrap()
}

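/// Hashes and compresses `scanned_files`, invoking `on_prepared` on the
/// calling thread for each result. Uses a scoped worker pool and a bounded
/// channel when more than one CPU is available; an error from a worker or
/// from `on_prepared` aborts the run.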
fn process_prepared_files_streaming<F>(
    scanned_files: &[&ScannedFile],
    seen_hashes: &Arc<ShardedSeenHashes>,
    progress: &ProgressCallback,
    progress_counter: &AtomicU64,
    total_to_process: u64,
    mut on_prepared: F,
) -> Result<()>
where
    F: FnMut(PreparedFile) -> Result<()>,
{
    if scanned_files.is_empty() {
        return Ok(());
    }

    let worker_count = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1)
        .min(scanned_files.len());

    // Single-threaded fallback: no channel and no extra threads.
    if worker_count <= 1 {
        let mut hardlinks = HashMap::new();
        for scanned in scanned_files {
            let prepared = prepare_file_with_hardlink_cache(scanned, seen_hashes, &mut hardlinks)?;
            let completed = progress_counter.fetch_add(1, Ordering::Relaxed) + 1;
            emit(
                progress,
                ProgressEvent::ProcessFile {
                    completed,
                    total: total_to_process,
                },
            );
            on_prepared(prepared)?;
        }
        return Ok(());
    }

    let chunks = split_scanned_refs_preserving_hardlinks(scanned_files, worker_count);

    std::thread::scope(|scope| -> Result<()> {
        // The bounded channel provides backpressure: workers stall once the
        // consumer falls PREPARED_FILE_PIPELINE_SLOTS items behind.
        let (sender, receiver) = mpsc::sync_channel::<Result<PreparedFile>>(
            PREPARED_FILE_PIPELINE_SLOTS.max(worker_count),
        );

        let handles: Vec<_> = chunks
            .into_iter()
            .map(|chunk| {
                let sender = sender.clone();
                scope.spawn(move || {
                    let mut hardlinks = HashMap::new();

                    for scanned in chunk {
                        let result =
                            prepare_file_with_hardlink_cache(scanned, seen_hashes, &mut hardlinks);
                        let completed = progress_counter.fetch_add(1, Ordering::Relaxed) + 1;
                        emit(
                            progress,
                            ProgressEvent::ProcessFile {
                                completed,
                                total: total_to_process,
                            },
                        );
                        if sender.send(result).is_err() {
                            // The consumer hung up after an error; stop early.
                            return;
                        }
                    }
                })
            })
            .collect();
        drop(sender);

        // Drain results on this thread so `on_prepared` stays single-threaded.
        let mut consumer_result = Ok(());
        loop {
            match receiver.recv() {
                Ok(Ok(prepared)) => {
                    if let Err(error) = on_prepared(prepared) {
                        consumer_result = Err(error);
                        break;
                    }
                }
                Ok(Err(error)) => {
                    consumer_result = Err(error);
                    break;
                }
                Err(_) => break,
            }
        }
        drop(receiver);

        for handle in handles {
            handle.join().unwrap();
        }

        consumer_result
    })
}

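/// Encodes one tree object per directory, bottom-up, writes the deduplicated
/// trees as a pack, and returns the root tree hash as hex.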
fn build_tree(processed_files: &[ProcessedFile], tree_store: &TreeStore) -> Result<String> {
    // Group files by parent directory and collect every directory that must
    // exist in the tree, including intermediate ones.
    let mut dir_files: BTreeMap<String, Vec<&ProcessedFile>> = BTreeMap::new();
    let mut all_dirs: BTreeSet<String> = BTreeSet::new();
    let mut child_dirs: BTreeMap<String, Vec<String>> = BTreeMap::new();
    all_dirs.insert(String::new());

    for pf in processed_files {
        let parent = match pf.relative_path.rfind('/') {
            Some(pos) => &pf.relative_path[..pos],
            None => "",
        };
        dir_files.entry(parent.to_string()).or_default().push(pf);
        register_directory_hierarchy(parent, &mut all_dirs, &mut child_dirs);
    }

    // Process deepest directories first so every child tree hash exists
    // before its parent directory is encoded.
    let mut dir_list: Vec<String> = all_dirs.into_iter().collect();
    dir_list.sort_unstable_by(|a, b| {
        let depth_a = if a.is_empty() {
            0
        } else {
            a.matches('/').count() + 1
        };
        let depth_b = if b.is_empty() {
            0
        } else {
            b.matches('/').count() + 1
        };
        depth_b.cmp(&depth_a).then_with(|| a.cmp(b))
    });

    let mut dir_hashes: BTreeMap<String, String> = BTreeMap::new();
    let mut pack_entries: Vec<(String, Vec<u8>)> = Vec::with_capacity(dir_list.len());
    let mut known_hashes: HashSet<[u8; 16]> = HashSet::with_capacity(dir_list.len());

    for dir in &dir_list {
        let file_count = dir_files.get(dir).map(|files| files.len()).unwrap_or(0);
        let child_count = child_dirs
            .get(dir)
            .map(|children| children.len())
            .unwrap_or(0);
        let mut entries: Vec<TreeEntry> = Vec::with_capacity(file_count + child_count);

        if let Some(files) = dir_files.get(dir) {
            for pf in files {
                let name = match pf.relative_path.rfind('/') {
                    Some(pos) => &pf.relative_path[pos + 1..],
                    None => &pf.relative_path,
                };
                entries.push(TreeEntry {
                    name: name.to_string(),
                    entry_type: pf.entry_type,
                    hash: pf.blob_hash_bytes,
                    size: pf.size,
                    mode: pf.mode,
                });
            }
        }

        if let Some(children) = child_dirs.get(dir) {
            for sub_dir in children {
                let sub_hash = dir_hashes.get(sub_dir).ok_or_else(|| {
                    ChkpttError::Other(format!("Missing tree hash for directory '{}'", sub_dir))
                })?;
                let sub_name = match sub_dir.rfind('/') {
                    Some(pos) => &sub_dir[pos + 1..],
                    None => sub_dir.as_str(),
                };
                entries.push(TreeEntry {
                    name: sub_name.to_string(),
                    entry_type: EntryType::Dir,
                    hash: hex_to_bytes(sub_hash)?,
                    size: 0,
                    mode: 0o040755,
                });
            }
        }

        entries.sort_unstable_by(|a, b| a.name.cmp(&b.name));
        let encoded = bitcode::encode(&entries);
        let hash_bytes = xxhash_rust::xxh3::xxh3_128(&encoded).to_le_bytes();
        let hash_hex = crate::store::blob::bytes_to_hex(&hash_bytes);

        dir_hashes.insert(dir.clone(), hash_hex.clone());
        // Identical subtrees encode to identical bytes; write each only once.
        if known_hashes.insert(hash_bytes) {
            pack_entries.push((hash_hex, encoded));
        }
    }

    tree_store.write_pack(&pack_entries)?;

    dir_hashes
        .get("")
        .cloned()
        .ok_or_else(|| ChkpttError::Other("Failed to build root tree".into()))
}

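/// Inserts `dir` and every missing ancestor into `all_dirs`, recording each
/// newly seen directory under its parent in `child_dirs`.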
fn register_directory_hierarchy(
    dir: &str,
    all_dirs: &mut BTreeSet<String>,
    child_dirs: &mut BTreeMap<String, Vec<String>>,
) {
    if dir.is_empty() {
        return;
    }

    if all_dirs.contains(dir) {
        return;
    }

    // Collect the byte offset at which each ancestor prefix of `dir` ends.
    let mut segments_end = Vec::new();
    for (i, ch) in dir.char_indices() {
        if ch == '/' {
            segments_end.push(i);
        }
    }
    segments_end.push(dir.len());

    // Walk root-to-leaf, registering each prefix under its parent exactly once.
    let mut parent = String::new();
    for &end in &segments_end {
        let current = &dir[..end];
        if all_dirs.insert(current.to_string()) {
            child_dirs
                .entry(parent)
                .or_default()
                .push(current.to_string());
        }
        parent = current.to_string();
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use tempfile::TempDir;

    #[test]
    fn test_hex_to_bytes() {
        let hex = "a3b2c1d4e5f60718293a4b5c6d7e8f90";
        let bytes = hex_to_bytes(hex).unwrap();
        assert_eq!(bytes[0], 0xa3);
        assert_eq!(bytes[1], 0xb2);
        assert_eq!(bytes[15], 0x90);
    }

    #[test]
    fn test_hex_to_bytes_invalid_length() {
        assert!(hex_to_bytes("abc").is_err());
    }

    #[test]
    fn test_store_has_external_objects_false_for_empty_store() {
        let dir = TempDir::new().unwrap();
        let objects_dir = dir.path().join("objects");
        let packs_dir = dir.path().join("packs");
        std::fs::create_dir_all(&objects_dir).unwrap();
        std::fs::create_dir_all(&packs_dir).unwrap();

        assert!(!store_has_external_objects(&objects_dir, &packs_dir).unwrap());
    }

    #[test]
    fn test_store_has_external_objects_true_for_pack_objects() {
        let dir = TempDir::new().unwrap();
        let objects_dir = dir.path().join("objects");
        let packs_dir = dir.path().join("packs");
        std::fs::create_dir_all(&packs_dir).unwrap();
        std::fs::write(packs_dir.join("pack-demo.dat"), b"pack").unwrap();

        assert!(store_has_external_objects(&objects_dir, &packs_dir).unwrap());
    }

    #[test]
    fn test_compress_with_worker_context_roundtrip() {
        let content = b"compression-context-roundtrip-data";

        let compressed = compress_with_worker_context(content);
        let decompressed = {
            use lz4_flex::frame::FrameDecoder;
            let mut decoder = FrameDecoder::new(&compressed[..]);
            let mut buf = Vec::new();
            std::io::Read::read_to_end(&mut decoder, &mut buf).unwrap();
            buf
        };

        assert_eq!(decompressed, content);
    }

    #[test]
    fn test_sharded_seen_hashes_dedups_single_thread() {
        let seen = ShardedSeenHashes::new(8);
        let hash = [7u8; 16];

        assert!(seen.insert(hash));
        assert!(!seen.insert(hash));
    }

    #[test]
    fn test_sharded_seen_hashes_dedups_multi_thread() {
        let seen = Arc::new(ShardedSeenHashes::new(1024));
        let unique_inserts = Arc::new(AtomicUsize::new(0));
        let duplicate_hash = [9u8; 16];

        std::thread::scope(|scope| {
            for i in 0..8u8 {
                let seen = Arc::clone(&seen);
                let unique_inserts = Arc::clone(&unique_inserts);
                scope.spawn(move || {
                    // Every thread races on the shared hash; exactly one wins.
                    if seen.insert(duplicate_hash) {
                        unique_inserts.fetch_add(1, Ordering::Relaxed);
                    }
                    let mut unique_hash = [0u8; 16];
                    unique_hash[0] = i;
                    if seen.insert(unique_hash) {
                        unique_inserts.fetch_add(1, Ordering::Relaxed);
                    }
                });
            }
        });

        // One shared insert plus eight per-thread unique inserts.
        assert_eq!(unique_inserts.load(Ordering::Relaxed), 9);
    }

    #[test]
    fn test_split_scanned_refs_preserves_hardlink_groups() {
        let f1 = scanned("a.txt", Some(1));
        let f2 = scanned("b.txt", Some(1));
        let f3 = scanned("c.txt", Some(2));
        let f4 = scanned("d.txt", Some(3));
        let f5 = scanned("e.txt", Some(3));
        let refs = vec![&f1, &f2, &f3, &f4, &f5];

        let chunks = split_scanned_refs_preserving_hardlinks(&refs, 2);
        let paths: Vec<Vec<&str>> = chunks
            .into_iter()
            .map(|chunk| {
                chunk
                    .iter()
                    .map(|file| file.relative_path.as_str())
                    .collect::<Vec<_>>()
            })
            .collect();

        assert_eq!(
            paths,
            vec![vec!["a.txt", "b.txt", "c.txt"], vec!["d.txt", "e.txt"]]
        );
    }

    #[cfg(unix)]
    #[test]
    fn test_prepare_file_with_hardlink_cache_reuses_existing_read() {
        let dir = TempDir::new().unwrap();
        let original = dir.path().join("original.txt");
        let alias = dir.path().join("alias.txt");
        fs::write(&original, "same-content").unwrap();
        fs::hard_link(&original, &alias).unwrap();

        let original_scanned = scanned_from_path("original.txt", &original);
        let alias_scanned = scanned_from_path("alias.txt", &alias);
        let seen_hashes = ShardedSeenHashes::new(2);
        let mut hardlinks = HashMap::new();

        let first =
            prepare_file_with_hardlink_cache(&original_scanned, &seen_hashes, &mut hardlinks)
                .unwrap();
        let second =
            prepare_file_with_hardlink_cache(&alias_scanned, &seen_hashes, &mut hardlinks).unwrap();

        assert!(first.compressed.is_some());
        assert!(second.compressed.is_none());
        assert_eq!(first.blob_hash_bytes, second.blob_hash_bytes);
    }

    fn scanned(relative_path: &str, inode: Option<u64>) -> ScannedFile {
        ScannedFile {
            relative_path: relative_path.to_string(),
            absolute_path: std::path::PathBuf::from(relative_path),
            size: 1,
            mtime_secs: 1,
            mtime_nanos: 1,
            device: Some(1),
            inode,
            mode: 0o100644,
            is_symlink: false,
        }
    }

    #[cfg(unix)]
    fn scanned_from_path(relative_path: &str, path: &Path) -> ScannedFile {
        use std::os::unix::fs::MetadataExt;

        let metadata = fs::metadata(path).unwrap();
        ScannedFile {
            relative_path: relative_path.to_string(),
            absolute_path: path.to_path_buf(),
            size: metadata.len(),
            mtime_secs: metadata.mtime(),
            mtime_nanos: metadata.mtime_nsec(),
            device: Some(metadata.dev()),
            inode: Some(metadata.ino()),
            mode: metadata.mode(),
            is_symlink: metadata.file_type().is_symlink(),
        }
    }
}