1use std::fs;
63use std::io::{self, BufRead, BufReader, Read, Write};
64use std::path::{Path, PathBuf};
65
66use snapdir_core::manifest::Manifest;
67use snapdir_core::merkle::{snapshot_id, Blake3Hasher, Hasher};
68use snapdir_core::store::{manifest_path, object_path, StoreError};
69
70use crate::file_store::FileStore;
71use crate::stream::StreamStore;
72
/// Version number of the SNAPPACK wire format spoken by this build.
///
/// [`WIRE_MAGIC`] spells the same number, and the reader rejects any stream
/// announcing a different version.
pub const WIRE_VERSION: u32 = 1;

/// Capability names associated with this wire version (presumably advertised
/// during protocol negotiation — confirm at the call sites).
pub const WIRE_CAPS: &[&str] = &["objects-needed", "send-pack", "receive-pack"];

/// First line of every pack stream: `SNAPPACK <WIRE_VERSION>` followed by `\n`.
pub const WIRE_MAGIC: &str = "SNAPPACK 1\n";

/// Upper bound, newline included, on any header line (magic, record header, or
/// `end`). A line with this many bytes or more before its newline is rejected.
pub const MAX_HEADER_BYTES: usize = 128;

/// Largest `manifest` record payload the reader will accept: 64 MiB.
pub const MAX_MANIFEST_BYTES: u64 = 64 << 20;

/// Ceiling (8 MiB) on the up-front buffer reservation made from a
/// peer-declared payload length. Larger payloads are still accepted; their
/// buffer simply grows as bytes actually arrive.
const STAGE_PREALLOC_CAP: u64 = 8 << 20;
100
/// Returns `true` when `s` is exactly 64 bytes, every one of them a lowercase
/// hexadecimal digit (`0`-`9` or `a`-`f`) — the hex spelling of a 32-byte
/// checksum. Uppercase digits and any other byte are rejected.
#[must_use]
pub fn is_hex64(s: &str) -> bool {
    s.len() == 64
        && s
            .bytes()
            .all(|byte| byte.is_ascii_hexdigit() && !byte.is_ascii_uppercase())
}
111
/// Summary of what [`write_pack`] emitted on a successful run.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct PackWriteReport {
    /// Number of `obj` records written: one per entry of the `ids` argument,
    /// duplicates included.
    pub objects_written: u64,
    /// Whether a `manifest` record was written before the `end` trailer.
    pub manifest_written: bool,
}
121
/// Summary of what [`read_pack`] did with a successfully decoded stream.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct PackReadReport {
    /// `obj` records whose payload was staged, hash-verified, and committed to
    /// the sink.
    pub objects_written: u64,
    /// `obj` records whose object the sink already held. The payload was still
    /// read and hash-verified, then discarded.
    pub objects_skipped: u64,
    /// Whether a verified manifest was handed to the sink's `put_manifest`
    /// (this only happens after the `end` trailer was read).
    pub manifest_committed: bool,
}
134
/// Destination for the records decoded by [`read_pack`].
///
/// For each `obj` record, `read_pack` first calls `has_object`. For an object
/// the sink does not have, it then calls `stage_object` and checks the byte
/// count and hash. After that it calls `commit_object` (verified) or
/// `abort_object` (staging, length, or hash failure). `put_manifest` is called
/// at most once, after the `end` trailer.
pub trait PackSink {
    /// Returns whether the destination already holds the object `checksum`.
    ///
    /// When it does, the record's payload is still read and hash-verified, but
    /// nothing is staged or written.
    fn has_object(&mut self, checksum: &str) -> Result<bool, StoreError>;

    /// Consumes the object's payload and holds it pending `commit_object` or
    /// `abort_object`.
    ///
    /// `len` is the length declared in the record header, and `payload` yields
    /// at most that many bytes. Implementations should read `payload` to EOF:
    /// the caller treats a short read by the sink as an error. The payload has
    /// NOT been hash-verified yet when this is called.
    fn stage_object(
        &mut self,
        checksum: &str,
        len: u64,
        payload: &mut dyn Read,
    ) -> Result<(), StoreError>;

    /// Publishes the payload previously staged under `checksum`. Only called
    /// after the payload's hash matched `checksum`.
    fn commit_object(&mut self, checksum: &str) -> Result<(), StoreError>;

    /// Discards any staged payload. Called when staging fails or the staged
    /// payload fails verification. The signature is infallible, so cleanup has
    /// to be best-effort.
    fn abort_object(&mut self, checksum: &str);

    /// Stores a manifest that has already been hash-verified and parsed.
    fn put_manifest(&mut self, id: &str, manifest: &Manifest) -> Result<(), StoreError>;
}
178
/// [`PackSink`] adapter over any [`StreamStore`].
///
/// A staged payload is buffered entirely in memory and handed to
/// `StreamStore::put_object` on commit. Peak memory per object is therefore its
/// full size.
pub struct StreamSink<'a> {
    /// Store that committed objects and manifests are written to.
    store: &'a dyn StreamStore,
    /// The single staged payload, if any: `(checksum, bytes)`.
    staged: Option<(String, Vec<u8>)>,
}
188
189impl<'a> StreamSink<'a> {
190 #[must_use]
192 pub fn new(store: &'a dyn StreamStore) -> Self {
193 Self {
194 store,
195 staged: None,
196 }
197 }
198}
199
200impl PackSink for StreamSink<'_> {
201 fn has_object(&mut self, checksum: &str) -> Result<bool, StoreError> {
202 self.store.has_object(checksum)
203 }
204
205 fn stage_object(
206 &mut self,
207 checksum: &str,
208 len: u64,
209 payload: &mut dyn Read,
210 ) -> Result<(), StoreError> {
211 self.staged = None;
214 let prealloc = usize::try_from(len.min(STAGE_PREALLOC_CAP)).unwrap_or(0);
218 let mut buf = Vec::with_capacity(prealloc);
219 payload.read_to_end(&mut buf)?;
220 self.staged = Some((checksum.to_owned(), buf));
221 Ok(())
222 }
223
224 fn commit_object(&mut self, checksum: &str) -> Result<(), StoreError> {
225 match self.staged.take() {
226 Some((staged_checksum, bytes)) if staged_checksum == checksum => {
227 self.store.put_object(checksum, bytes)
230 }
231 _ => Err(protocol(format!(
232 "internal pack sink error: commit of {checksum} without a matching staged payload"
233 ))),
234 }
235 }
236
237 fn abort_object(&mut self, _checksum: &str) {
238 self.staged = None;
239 }
240
241 fn put_manifest(&mut self, id: &str, manifest: &Manifest) -> Result<(), StoreError> {
242 self.store.put_manifest(id, manifest)
243 }
244}
245
/// [`PackSink`] that writes objects directly into a [`FileStore`] directory.
///
/// A payload is streamed into a temp file beside the object's final path and
/// renamed into place on commit. A staged temp file is removed on abort, and
/// also when the sink is dropped.
pub struct FileSink<'a> {
    /// Store whose root directory objects are written under.
    store: &'a FileStore,
    /// The single staged temp file, if any.
    staged: Option<StagedFile>,
}
259
/// The payload a [`FileSink`] holds between `stage_object` and
/// `commit_object` / `abort_object`.
struct StagedFile {
    /// Object id the payload was staged under; `commit_object` must name the same id.
    checksum: String,
    /// Temp file holding the payload, created beside `target` by `temp_sibling`.
    tmp: PathBuf,
    /// Final path of the object: the store root joined with `object_path(checksum)`.
    target: PathBuf,
}
266
267impl<'a> FileSink<'a> {
268 #[must_use]
270 pub fn new(store: &'a FileStore) -> Self {
271 Self {
272 store,
273 staged: None,
274 }
275 }
276}
277
278impl Drop for FileSink<'_> {
279 fn drop(&mut self) {
282 if let Some(staged) = self.staged.take() {
283 let _ = fs::remove_file(&staged.tmp);
284 }
285 }
286}
287
288impl PackSink for FileSink<'_> {
289 fn has_object(&mut self, checksum: &str) -> Result<bool, StoreError> {
290 StreamStore::has_object(self.store, checksum)
291 }
292
293 fn stage_object(
294 &mut self,
295 checksum: &str,
296 _len: u64,
297 payload: &mut dyn Read,
298 ) -> Result<(), StoreError> {
299 self.abort_object(checksum);
301
302 let target = self.store.root().join(object_path(checksum));
305 if let Some(parent) = target.parent() {
306 fs::create_dir_all(parent)?;
307 }
308 let tmp = temp_sibling(&target);
309 let mut file = fs::File::create(&tmp)?;
310 let copied = io::copy(payload, &mut file);
313 drop(file);
314 if let Err(err) = copied {
315 let _ = fs::remove_file(&tmp);
317 return Err(err.into());
318 }
319 self.staged = Some(StagedFile {
320 checksum: checksum.to_owned(),
321 tmp,
322 target,
323 });
324 Ok(())
325 }
326
327 fn commit_object(&mut self, checksum: &str) -> Result<(), StoreError> {
328 match self.staged.take() {
329 Some(staged) if staged.checksum == checksum => {
330 fs::rename(&staged.tmp, &staged.target)?;
334 Ok(())
335 }
336 other => {
337 if let Some(staged) = other {
338 let _ = fs::remove_file(&staged.tmp);
339 }
340 Err(protocol(format!(
341 "internal pack sink error: commit of {checksum} without a matching staged payload"
342 )))
343 }
344 }
345 }
346
347 fn abort_object(&mut self, _checksum: &str) {
348 if let Some(staged) = self.staged.take() {
349 let _ = fs::remove_file(&staged.tmp);
350 }
351 }
352
353 fn put_manifest(&mut self, id: &str, manifest: &Manifest) -> Result<(), StoreError> {
354 self.store.put_manifest(id, manifest)
357 }
358}
359
360pub fn write_pack(
381 source: &dyn StreamStore,
382 ids: &[String],
383 manifest_id: Option<&str>,
384 mut out: impl Write,
385) -> Result<PackWriteReport, StoreError> {
386 for id in ids {
388 if !is_hex64(id) {
389 return Err(protocol(format!(
390 "invalid object checksum {id:?}: expected 64 lowercase hex characters"
391 )));
392 }
393 }
394 if let Some(id) = manifest_id {
395 if !is_hex64(id) {
396 return Err(protocol(format!(
397 "invalid manifest id {id:?}: expected 64 lowercase hex characters"
398 )));
399 }
400 }
401
402 let hasher = Blake3Hasher::new();
403
404 let manifest_payload: Option<(&str, Vec<u8>)> = match manifest_id {
409 Some(id) => {
410 let manifest = source.get_manifest(id)?;
411 let mut text = manifest.to_string();
412 text.push('\n');
413 let bytes = text.into_bytes();
414 let actual = hasher.hash_hex(&bytes);
415 if actual != id {
416 return Err(StoreError::Integrity {
417 address: manifest_path(id),
418 expected: id.to_owned(),
419 actual,
420 });
421 }
422 Some((id, bytes))
423 }
424 None => None,
425 };
426
427 out.write_all(WIRE_MAGIC.as_bytes())?;
428 let mut report = PackWriteReport::default();
429
430 for id in ids {
431 let bytes = source.get_object(id)?;
435 let actual = hasher.hash_hex(&bytes);
436 if actual != *id {
437 return Err(StoreError::Integrity {
438 address: object_path(id),
439 expected: id.clone(),
440 actual,
441 });
442 }
443 writeln!(out, "obj {id} {}", bytes.len())?;
444 out.write_all(&bytes)?;
445 report.objects_written += 1;
446 }
447
448 if let Some((id, bytes)) = manifest_payload {
449 writeln!(out, "manifest {id} {}", bytes.len())?;
450 out.write_all(&bytes)?;
451 report.manifest_written = true;
452 }
453
454 out.write_all(b"end\n")?;
455 out.flush()?;
456 Ok(report)
457}
458
459pub fn read_pack(input: impl Read, sink: &mut dyn PackSink) -> Result<PackReadReport, StoreError> {
472 let mut input = BufReader::new(input);
473
474 check_magic(&read_header_line(&mut input)?)?;
475
476 let mut report = PackReadReport::default();
477 let mut pending_manifest: Option<(String, Manifest)> = None;
478
479 loop {
480 let line = read_header_line(&mut input)?;
481 if line == "end" {
482 if let Some((id, manifest)) = pending_manifest.take() {
486 sink.put_manifest(&id, &manifest)?;
487 report.manifest_committed = true;
488 }
489 return Ok(report);
490 }
491 if pending_manifest.is_some() {
492 return Err(protocol(format!(
493 "record after the manifest record (only the `end` trailer may follow it): {line:?}"
494 )));
495 }
496 let (kind, checksum, len) = parse_record_header(&line)?;
497 match kind {
498 RecordKind::Obj => read_obj_record(&mut input, sink, &checksum, len, &mut report)?,
499 RecordKind::Manifest => {
500 pending_manifest = Some(read_manifest_record(&mut input, &checksum, len)?);
501 }
502 }
503 }
504}
505
/// Kind of a record header line (anything other than the `end` trailer).
enum RecordKind {
    /// `obj <hex64> <len>`: one content-addressed object payload.
    Obj,
    /// `manifest <hex64> <len>`: the snapshot manifest. Only `end` may follow it.
    Manifest,
}
511
/// Reads one `obj` record payload of `len` bytes from `input` and hands it to `sink`.
///
/// If the sink already has `checksum`, the payload is drained and hashed but
/// not staged; it is counted in `objects_skipped`. Otherwise the payload is
/// staged, and it is committed (counted in `objects_written`) only after all
/// `len` bytes arrived and their BLAKE3 hash equals `checksum`. Every staging,
/// length, or hash failure for an object the sink did not already have calls
/// `sink.abort_object`, so `commit_object` is reached only for a verified payload.
///
/// # Errors
///
/// * A protocol error if the stream ends inside the payload, or if the sink
///   stopped reading before `len` bytes.
/// * `StoreError::Integrity` if the payload's hash differs from `checksum`.
///   This also applies to objects the sink already had.
/// * Any error from the sink or from reading `input`.
fn read_obj_record(
    input: &mut dyn Read,
    sink: &mut dyn PackSink,
    checksum: &str,
    len: u64,
    report: &mut PackReadReport,
) -> Result<(), StoreError> {
    let present = sink.has_object(checksum)?;
    // Hashes every payload byte as it is read and never reads past `len`.
    let mut payload = HashingTake::new(input, len);

    if present {
        // Already stored: still consume (and hash) the bytes so the stream stays
        // aligned on the next header and a corrupt duplicate is detected below.
        payload.drain()?;
    } else if let Err(err) = sink.stage_object(checksum, len, &mut payload) {
        sink.abort_object(checksum);
        return Err(err);
    }

    // A short payload is either a truncated stream (the input hit EOF) or a
    // sink that stopped reading before the declared length.
    if payload.remaining() > 0 {
        if !present {
            sink.abort_object(checksum);
        }
        return Err(if payload.hit_eof() {
            protocol(format!(
                "truncated pack stream: EOF inside the payload of object {checksum} \
                 ({} of {len} bytes missing)",
                payload.remaining()
            ))
        } else {
            protocol(format!(
                "internal pack sink error: sink consumed only {} of {len} payload bytes \
                 for object {checksum}",
                len - payload.remaining()
            ))
        });
    }

    // Content addressing: the bytes must hash to the id they were sent under.
    let actual = payload.finalize_hex();
    if actual != checksum {
        if !present {
            sink.abort_object(checksum);
        }
        return Err(StoreError::Integrity {
            address: object_path(checksum),
            expected: checksum.to_owned(),
            actual,
        });
    }

    if present {
        report.objects_skipped += 1;
    } else {
        sink.commit_object(checksum)?;
        report.objects_written += 1;
    }
    Ok(())
}
574
575fn read_manifest_record(
578 input: &mut dyn Read,
579 id: &str,
580 len: u64,
581) -> Result<(String, Manifest), StoreError> {
582 if len > MAX_MANIFEST_BYTES {
583 return Err(protocol(format!(
584 "manifest record of {len} bytes exceeds the {MAX_MANIFEST_BYTES}-byte cap"
585 )));
586 }
587 let mut payload = HashingTake::new(input, len);
588 let mut buf = Vec::with_capacity(usize::try_from(len.min(STAGE_PREALLOC_CAP)).unwrap_or(0));
591 payload.read_to_end(&mut buf)?;
592 if payload.remaining() > 0 {
593 return Err(protocol(format!(
594 "truncated pack stream: EOF inside the payload of manifest {id} \
595 ({} of {len} bytes missing)",
596 payload.remaining()
597 )));
598 }
599
600 let actual = payload.finalize_hex();
603 if actual != id {
604 return Err(StoreError::Integrity {
605 address: manifest_path(id),
606 expected: id.to_owned(),
607 actual,
608 });
609 }
610
611 let text = String::from_utf8(buf).map_err(|err| StoreError::Backend {
615 message: format!("manifest {id} payload is not valid UTF-8"),
616 source: Some(Box::new(err)),
617 })?;
618 let manifest = Manifest::parse(&text)?;
619 let rendered_id = snapshot_id(&manifest, &Blake3Hasher::new());
620 if rendered_id != id {
621 return Err(StoreError::Integrity {
622 address: manifest_path(id),
623 expected: id.to_owned(),
624 actual: rendered_id,
625 });
626 }
627
628 Ok((id.to_owned(), manifest))
629}
630
631fn protocol(message: impl Into<String>) -> StoreError {
634 StoreError::Backend {
635 message: message.into(),
636 source: None,
637 }
638}
639
640fn read_header_line(input: &mut impl BufRead) -> Result<String, StoreError> {
646 let mut line: Vec<u8> = Vec::with_capacity(32);
647 loop {
648 let mut byte = [0u8; 1];
649 let n = input.read(&mut byte)?;
650 if n == 0 {
651 return Err(protocol(if line.is_empty() {
652 "truncated pack stream: unexpected EOF before the `end` trailer".to_owned()
653 } else {
654 format!(
655 "truncated pack stream: EOF inside a header line (read {:?} so far)",
656 String::from_utf8_lossy(&line)
657 )
658 }));
659 }
660 if byte[0] == b'\n' {
661 break;
662 }
663 line.push(byte[0]);
664 if line.len() >= MAX_HEADER_BYTES {
665 return Err(protocol(format!(
666 "header line exceeds the {MAX_HEADER_BYTES}-byte cap"
667 )));
668 }
669 }
670 String::from_utf8(line).map_err(|err| StoreError::Backend {
671 message: "header line is not valid UTF-8".to_owned(),
672 source: Some(Box::new(err)),
673 })
674}
675
676fn check_magic(line: &str) -> Result<(), StoreError> {
680 let Some(version) = line.strip_prefix("SNAPPACK ") else {
681 return Err(protocol(format!(
682 "bad pack magic {line:?} (expected {:?})",
683 WIRE_MAGIC.trim_end()
684 )));
685 };
686 if version != WIRE_VERSION.to_string() {
687 return Err(protocol(format!(
688 "unsupported pack wire version {version:?}: this build speaks wire={WIRE_VERSION}"
689 )));
690 }
691 Ok(())
692}
693
694fn parse_record_header(line: &str) -> Result<(RecordKind, String, u64), StoreError> {
698 let mut parts = line.split(' ');
699 let kind = match parts.next() {
700 Some("obj") => RecordKind::Obj,
701 Some("manifest") => RecordKind::Manifest,
702 _ => {
703 return Err(protocol(format!(
704 "unknown record header {line:?} (expected `obj`, `manifest`, or `end`)"
705 )));
706 }
707 };
708 let (Some(checksum), Some(len_token), None) = (parts.next(), parts.next(), parts.next()) else {
709 return Err(protocol(format!(
710 "malformed record header {line:?} (expected `<kind> <hex64> <len>`)"
711 )));
712 };
713 if !is_hex64(checksum) {
714 return Err(protocol(format!(
715 "invalid checksum {checksum:?} in record header: expected 64 lowercase hex characters"
716 )));
717 }
718 if len_token.is_empty() || !len_token.bytes().all(|b| b.is_ascii_digit()) {
719 return Err(protocol(format!(
720 "invalid payload length {len_token:?} in record header: expected a decimal u64"
721 )));
722 }
723 let len: u64 = len_token.parse().map_err(|_| {
724 protocol(format!(
725 "payload length {len_token:?} does not fit in a u64"
726 ))
727 })?;
728 Ok((kind, checksum.to_owned(), len))
729}
730
/// `Read` adapter that yields at most a declared number of bytes from an inner
/// reader and feeds every byte it returns into a BLAKE3 hasher.
///
/// This lets a record payload be consumed and checked against its checksum in
/// a single pass, without `HashingTake` itself buffering the payload.
struct HashingTake<'a> {
    /// The underlying pack stream.
    inner: &'a mut dyn Read,
    /// Bytes of the declared payload length not yet returned to the caller.
    remaining: u64,
    /// Set when `inner` returned 0 bytes while payload bytes were still owed.
    hit_eof: bool,
    /// Running hash of every byte returned so far.
    hasher: blake3::Hasher,
}
742
743impl<'a> HashingTake<'a> {
744 fn new(inner: &'a mut dyn Read, len: u64) -> Self {
745 Self {
746 inner,
747 remaining: len,
748 hit_eof: false,
749 hasher: blake3::Hasher::new(),
750 }
751 }
752
753 fn remaining(&self) -> u64 {
755 self.remaining
756 }
757
758 fn hit_eof(&self) -> bool {
761 self.hit_eof
762 }
763
764 fn finalize_hex(&self) -> String {
766 self.hasher.finalize().to_hex().to_string()
767 }
768
769 fn drain(&mut self) -> io::Result<()> {
772 let mut buf = [0u8; 8 * 1024];
773 loop {
774 if self.read(&mut buf)? == 0 {
775 return Ok(());
776 }
777 }
778 }
779}
780
781impl Read for HashingTake<'_> {
782 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
783 if self.remaining == 0 || buf.is_empty() {
784 return Ok(0);
785 }
786 let cap = usize::try_from(self.remaining)
787 .unwrap_or(usize::MAX)
788 .min(buf.len());
789 let n = self.inner.read(&mut buf[..cap])?;
790 if n == 0 {
791 self.hit_eof = true;
792 return Ok(0);
793 }
794 self.hasher.update(&buf[..n]);
795 self.remaining -= n as u64;
796 Ok(n)
797 }
798}
799
/// Returns a fresh temp-file path in the same directory as `target`.
///
/// The name is `<target file name>.<pid>.<seq>.tmp`, where `seq` comes from a
/// process-wide counter. Two calls in one process therefore yield distinct
/// paths. Because the path is a sibling of `target`, a later rename onto
/// `target` does not cross directories. A `target` without a parent yields a
/// bare relative file name.
fn temp_sibling(target: &Path) -> PathBuf {
    use std::sync::atomic::{AtomicU64, Ordering};

    static SEQ: AtomicU64 = AtomicU64::new(0);

    let n = SEQ.fetch_add(1, Ordering::Relaxed);
    let pid = std::process::id();
    let file_name = match target.file_name() {
        Some(name) => name.to_string_lossy().into_owned(),
        None => String::new(),
    };
    let tmp_name = format!("{file_name}.{pid}.{n}.tmp");
    target
        .parent()
        .map_or_else(|| PathBuf::from(&tmp_name), |dir| dir.join(&tmp_name))
}
819
820#[cfg(test)]
821mod tests {
822 use super::*;
823 use snapdir_core::manifest::{ManifestEntry, PathType};
824 use snapdir_core::store::Store;
825 use std::fs;
826
827 struct TempDir {
830 path: PathBuf,
831 }
832
833 impl TempDir {
834 fn new(tag: &str) -> Self {
835 use std::sync::atomic::{AtomicU64, Ordering};
836 static COUNTER: AtomicU64 = AtomicU64::new(0);
837 let n = COUNTER.fetch_add(1, Ordering::Relaxed);
838 let path = std::env::temp_dir().join(format!(
839 "snapdir-pack-test-{}-{tag}-{n}",
840 std::process::id()
841 ));
842 fs::create_dir_all(&path).expect("create temp dir");
843 Self { path }
844 }
845
846 fn path(&self) -> &Path {
847 &self.path
848 }
849 }
850
851 impl Drop for TempDir {
852 fn drop(&mut self) {
853 let _ = fs::remove_dir_all(&self.path);
854 }
855 }
856
857 fn big_payload(len: usize) -> Vec<u8> {
859 (0..len).map(|i| u8::try_from(i % 251).unwrap()).collect()
860 }
861
862 fn seed_store(tag: &str, payloads: &[Vec<u8>]) -> (TempDir, FileStore, Vec<String>) {
865 let dir = TempDir::new(tag);
866 let store = FileStore::from_root(dir.path());
867 let hasher = Blake3Hasher::new();
868 let ids = payloads
869 .iter()
870 .map(|p| {
871 let checksum = hasher.hash_hex(p);
872 store.put_object(&checksum, p.clone()).expect("seed object");
873 checksum
874 })
875 .collect();
876 (dir, store, ids)
877 }
878
879 fn manifest_for(payloads: &[Vec<u8>]) -> (Manifest, String) {
882 let hasher = Blake3Hasher::new();
883 let mut manifest = Manifest::new();
884 manifest.push(ManifestEntry::new(PathType::Directory, "700", "x", 0, "./"));
885 for (i, payload) in payloads.iter().enumerate() {
886 manifest.push(ManifestEntry::new(
887 PathType::File,
888 "600",
889 hasher.hash_hex(payload),
890 u64::try_from(payload.len()).unwrap(),
891 format!("./obj-{i:02}"),
892 ));
893 }
894 let manifest = Manifest::from_entries(manifest.entries().to_vec());
895 let id = snapshot_id(&manifest, &hasher);
896 (manifest, id)
897 }
898
899 fn manifest_bytes(manifest: &Manifest) -> Vec<u8> {
901 let mut text = manifest.to_string();
902 text.push('\n');
903 text.into_bytes()
904 }
905
906 fn files_under(dir: &Path) -> Vec<PathBuf> {
909 let mut files = Vec::new();
910 let Ok(entries) = fs::read_dir(dir) else {
911 return files;
912 };
913 for entry in entries.flatten() {
914 let path = entry.path();
915 if path.is_dir() {
916 files.extend(files_under(&path));
917 } else {
918 files.push(path);
919 }
920 }
921 files
922 }
923
924 fn raw_record(kind: &str, hex: &str, payload: &[u8]) -> Vec<u8> {
926 let mut out = format!("{kind} {hex} {}\n", payload.len()).into_bytes();
927 out.extend_from_slice(payload);
928 out
929 }
930
931 fn raw_stream(records: &[Vec<u8>]) -> Vec<u8> {
933 let mut out = WIRE_MAGIC.as_bytes().to_vec();
934 for record in records {
935 out.extend_from_slice(record);
936 }
937 out.extend_from_slice(b"end\n");
938 out
939 }
940
941 fn hex_of(bytes: &[u8]) -> String {
942 Blake3Hasher::new().hash_hex(bytes)
943 }
944
945 #[test]
948 fn pack_wire_constants_are_consistent() {
949 assert_eq!(WIRE_MAGIC, format!("SNAPPACK {WIRE_VERSION}\n"));
950 assert_eq!(WIRE_VERSION, 1);
951 assert_eq!(WIRE_CAPS, &["objects-needed", "send-pack", "receive-pack"]);
952 }
953
954 #[test]
955 fn pack_is_hex64_validates_shape() {
956 let valid = "0123456789abcdef".repeat(4);
957 assert!(is_hex64(&valid));
958 assert!(!is_hex64(&valid[..63])); assert!(!is_hex64(&format!("{valid}0"))); assert!(!is_hex64(&valid.to_uppercase())); assert!(!is_hex64(&format!("g{}", &valid[1..]))); assert!(!is_hex64("")); }
964
965 #[test]
968 fn pack_roundtrip_file_sink_streams_objects_and_manifest() {
969 let payloads = vec![
971 Vec::new(),
972 b"hello pack\n".to_vec(),
973 big_payload(3 * 1024 * 1024 + 7),
974 ];
975 let (a_dir, a, ids) = seed_store("rt-a", &payloads);
976 let (manifest, man_id) = manifest_for(&payloads);
977 a.put_manifest(&man_id, &manifest).expect("seed manifest");
978
979 let mut pack = Vec::new();
980 let wrote = write_pack(&a, &ids, Some(&man_id), &mut pack).expect("write_pack");
981 assert_eq!(wrote.objects_written, 3);
982 assert!(wrote.manifest_written);
983
984 let b_dir = TempDir::new("rt-b");
985 let b = FileStore::from_root(b_dir.path());
986 let mut sink = FileSink::new(&b);
987 let read = read_pack(pack.as_slice(), &mut sink).expect("read_pack");
988 assert_eq!(read.objects_written, 3);
989 assert_eq!(read.objects_skipped, 0);
990 assert!(read.manifest_committed);
991
992 for (id, payload) in ids.iter().zip(&payloads) {
994 let key = object_path(id);
995 assert_eq!(
996 fs::read(b_dir.path().join(&key)).expect("b object"),
997 *payload,
998 "object {key} byte-equal"
999 );
1000 assert_eq!(
1001 fs::read(a_dir.path().join(&key)).expect("a object"),
1002 fs::read(b_dir.path().join(&key)).expect("b object"),
1003 );
1004 }
1005 let back = b.get_manifest(&man_id).expect("manifest in B");
1007 assert_eq!(back, manifest);
1008 assert_eq!(snapshot_id(&back, &Blake3Hasher::new()), man_id);
1009 assert!(
1011 !files_under(b_dir.path())
1012 .iter()
1013 .any(|p| p.to_string_lossy().ends_with(".tmp")),
1014 "no stray temp files after a clean stream"
1015 );
1016 }
1017
1018 #[test]
1019 fn pack_roundtrip_stream_sink_generic() {
1020 let payloads = vec![b"alpha\n".to_vec(), b"beta\n".to_vec()];
1021 let (_a_dir, a, ids) = seed_store("ss-a", &payloads);
1022 let (manifest, man_id) = manifest_for(&payloads);
1023 a.put_manifest(&man_id, &manifest).expect("seed manifest");
1024
1025 let mut pack = Vec::new();
1026 write_pack(&a, &ids, Some(&man_id), &mut pack).expect("write_pack");
1027
1028 let b_dir = TempDir::new("ss-b");
1029 let b = FileStore::from_root(b_dir.path());
1030 let mut sink = StreamSink::new(&b);
1031 let read = read_pack(pack.as_slice(), &mut sink).expect("read_pack");
1032 assert_eq!(read.objects_written, 2);
1033 assert!(read.manifest_committed);
1034
1035 for (id, payload) in ids.iter().zip(&payloads) {
1036 assert_eq!(b.get_object(id).expect("object"), *payload);
1037 }
1038 assert_eq!(b.get_manifest(&man_id).expect("manifest"), manifest);
1039 }
1040
1041 #[test]
1042 fn pack_empty_stream_roundtrips() {
1043 let payloads: Vec<Vec<u8>> = Vec::new();
1045 let (_a_dir, a, ids) = seed_store("empty-a", &payloads);
1046 let mut pack = Vec::new();
1047 let wrote = write_pack(&a, &ids, None, &mut pack).expect("write_pack");
1048 assert_eq!(wrote, PackWriteReport::default());
1049 assert_eq!(pack, b"SNAPPACK 1\nend\n");
1050
1051 let b_dir = TempDir::new("empty-b");
1052 let b = FileStore::from_root(b_dir.path());
1053 let mut sink = FileSink::new(&b);
1054 let read = read_pack(pack.as_slice(), &mut sink).expect("read_pack");
1055 assert_eq!(read, PackReadReport::default());
1056 }
1057
1058 #[test]
1059 fn pack_manifest_only_stream_completes_interrupted_push() {
1060 let payloads = vec![b"already there\n".to_vec()];
1063 let (_a_dir, a, _ids) = seed_store("mo-a", &payloads);
1064 let (manifest, man_id) = manifest_for(&payloads);
1065 a.put_manifest(&man_id, &manifest).expect("seed manifest");
1066
1067 let mut pack = Vec::new();
1068 let wrote = write_pack(&a, &[], Some(&man_id), &mut pack).expect("write_pack");
1069 assert_eq!(wrote.objects_written, 0);
1070 assert!(wrote.manifest_written);
1071
1072 let b_dir = TempDir::new("mo-b");
1073 let b = FileStore::from_root(b_dir.path());
1074 let mut sink = FileSink::new(&b);
1075 let read = read_pack(pack.as_slice(), &mut sink).expect("read_pack");
1076 assert!(read.manifest_committed);
1077 assert_eq!(b.get_manifest(&man_id).expect("manifest"), manifest);
1078 }
1079
1080 #[test]
1083 fn pack_rejects_oversized_header_line() {
1084 let mut stream = WIRE_MAGIC.as_bytes().to_vec();
1085 stream.extend_from_slice("o".repeat(200).as_bytes());
1086 stream.extend_from_slice(b"\nend\n");
1087 let b_dir = TempDir::new("hdr-cap");
1088 let b = FileStore::from_root(b_dir.path());
1089 let mut sink = FileSink::new(&b);
1090 let err = read_pack(stream.as_slice(), &mut sink).expect_err("must reject");
1091 assert!(err.to_string().contains("128-byte cap"), "got: {err}");
1092 }
1093
1094 #[test]
1095 fn pack_rejects_bad_magic_and_bad_version() {
1096 let b_dir = TempDir::new("magic");
1097 let b = FileStore::from_root(b_dir.path());
1098 for stream in [
1099 &b"SNAPHACK 1\nend\n"[..],
1100 &b"GARBAGE\nend\n"[..],
1101 &b"SNAPPACK 2\nend\n"[..],
1102 &b"SNAPPACK 01\nend\n"[..], &b"SNAPPACK \nend\n"[..],
1104 &b""[..], ] {
1106 let mut sink = FileSink::new(&b);
1107 assert!(
1108 read_pack(stream, &mut sink).is_err(),
1109 "stream {:?} must be rejected",
1110 String::from_utf8_lossy(stream)
1111 );
1112 }
1113 }
1114
1115 #[test]
1116 fn pack_rejects_malformed_checksums_and_lengths() {
1117 let valid = "0123456789abcdef".repeat(4);
1118 let headers = [
1119 format!("obj {} 0", valid.to_uppercase()), format!("obj {} 0", &valid[..63]), format!("obj {valid}0 0"), format!("obj g{} 0", &valid[1..]), format!("obj {valid} 12x"), format!("obj {valid} +5"), format!("obj {valid} 99999999999999999999"), format!("obj {valid}"), format!("obj {valid} 0 extra"), format!("blob {valid} 0"), format!("obj {valid} 0"), ];
1131 let b_dir = TempDir::new("hdr-bad");
1132 let b = FileStore::from_root(b_dir.path());
1133 for header in headers {
1134 let mut stream = WIRE_MAGIC.as_bytes().to_vec();
1135 stream.extend_from_slice(header.as_bytes());
1136 stream.extend_from_slice(b"\nend\n");
1137 let mut sink = FileSink::new(&b);
1138 assert!(
1139 read_pack(stream.as_slice(), &mut sink).is_err(),
1140 "header {header:?} must be rejected"
1141 );
1142 }
1143 }
1144
1145 #[test]
1148 fn pack_mismatched_object_files_nothing_and_leaves_no_temp() {
1149 let claimed = hex_of(b"good bytes");
1152 let evil = b"evil bytes"; let (manifest, man_id) = manifest_for(&[b"good bytes".to_vec()]);
1154 let stream = raw_stream(&[
1155 raw_record("obj", &claimed, evil),
1156 raw_record("manifest", &man_id, &manifest_bytes(&manifest)),
1157 ]);
1158
1159 let b_dir = TempDir::new("sec");
1160 let b = FileStore::from_root(b_dir.path());
1161 let mut sink = FileSink::new(&b);
1162 let err = read_pack(stream.as_slice(), &mut sink).expect_err("must abort");
1163 match err {
1164 StoreError::Integrity {
1165 expected, actual, ..
1166 } => {
1167 assert_eq!(expected, claimed);
1168 assert_eq!(actual, hex_of(evil));
1169 }
1170 other => panic!("expected Integrity, got {other:?}"),
1171 }
1172 drop(sink);
1173
1174 assert!(!StreamStore::has_object(&b, &claimed).unwrap());
1177 assert!(!b_dir.path().join(object_path(&claimed)).exists());
1178 assert!(matches!(
1179 b.get_manifest(&man_id),
1180 Err(StoreError::ManifestNotFound { .. })
1181 ));
1182 assert_eq!(
1183 files_under(b_dir.path()),
1184 Vec::<PathBuf>::new(),
1185 "no file (object, manifest, or stray temp) may survive"
1186 );
1187 }
1188
1189 #[test]
1190 fn pack_mismatch_aborts_even_when_object_already_present() {
1191 let good = b"present object\n".to_vec();
1195 let payloads = vec![good.clone()];
1196 let (b_dir, b, ids) = seed_store("dup-bad", &payloads);
1197
1198 let corrupt = b"PRESENT OBJECT\n"; let later = b"later payload\n";
1200 let stream = raw_stream(&[
1201 raw_record("obj", &ids[0], corrupt),
1202 raw_record("obj", &hex_of(later), later),
1203 ]);
1204
1205 let mut sink = FileSink::new(&b);
1206 let err = read_pack(stream.as_slice(), &mut sink).expect_err("must abort");
1207 assert!(matches!(err, StoreError::Integrity { .. }));
1208 drop(sink);
1209
1210 assert_eq!(b.get_object(&ids[0]).unwrap(), good);
1213 assert!(!StreamStore::has_object(&b, &hex_of(later)).unwrap());
1214 assert!(
1215 !files_under(b_dir.path())
1216 .iter()
1217 .any(|p| p.to_string_lossy().ends_with(".tmp")),
1218 "no stray temp files"
1219 );
1220 }
1221
1222 #[test]
1225 fn pack_truncated_before_end_files_objects_but_never_manifest() {
1226 let payloads = vec![b"one\n".to_vec(), b"two\n".to_vec()];
1230 let (_a_dir, a, ids) = seed_store("trunc-a", &payloads);
1231 let (manifest, man_id) = manifest_for(&payloads);
1232 a.put_manifest(&man_id, &manifest).expect("seed manifest");
1233
1234 let mut pack = Vec::new();
1235 write_pack(&a, &ids, Some(&man_id), &mut pack).expect("write_pack");
1236 assert!(pack.ends_with(b"end\n"));
1237 let cut = &pack[..pack.len() - b"end\n".len()];
1238
1239 let b_dir = TempDir::new("trunc-b");
1240 let b = FileStore::from_root(b_dir.path());
1241 let mut sink = FileSink::new(&b);
1242 let err = read_pack(cut, &mut sink).expect_err("truncation is a hard error");
1243 assert!(err.to_string().contains("truncated"), "got: {err}");
1244 drop(sink);
1245
1246 for (id, payload) in ids.iter().zip(&payloads) {
1248 assert_eq!(b.get_object(id).unwrap(), *payload);
1249 }
1250 assert!(matches!(
1252 b.get_manifest(&man_id),
1253 Err(StoreError::ManifestNotFound { .. })
1254 ));
1255 assert!(
1256 !files_under(b_dir.path())
1257 .iter()
1258 .any(|p| p.to_string_lossy().ends_with(".tmp")),
1259 "no stray temp files"
1260 );
1261 }
1262
1263 #[test]
1264 fn pack_truncated_mid_payload_keeps_earlier_objects_drops_partial() {
1265 let payloads = vec![b"first object\n".to_vec(), big_payload(256 * 1024)];
1266 let (_a_dir, a, ids) = seed_store("midcut-a", &payloads);
1267
1268 let mut pack = Vec::new();
1269 write_pack(&a, &ids, None, &mut pack).expect("write_pack");
1270 let cut = &pack[..pack.len() - 100_000];
1272
1273 let b_dir = TempDir::new("midcut-b");
1274 let b = FileStore::from_root(b_dir.path());
1275 let mut sink = FileSink::new(&b);
1276 let err = read_pack(cut, &mut sink).expect_err("mid-payload truncation");
1277 assert!(err.to_string().contains("truncated"), "got: {err}");
1278 drop(sink);
1279
1280 assert_eq!(b.get_object(&ids[0]).unwrap(), payloads[0]);
1283 assert!(!StreamStore::has_object(&b, &ids[1]).unwrap());
1284 assert!(
1285 !files_under(b_dir.path())
1286 .iter()
1287 .any(|p| p.to_string_lossy().ends_with(".tmp")),
1288 "partial payload temp must be removed"
1289 );
1290 }
1291
1292 #[test]
1295 fn pack_duplicate_object_records_are_idempotent_write_once() {
1296 let payload = b"duplicated payload\n".to_vec();
1297 let checksum = hex_of(&payload);
1298 let stream = raw_stream(&[
1299 raw_record("obj", &checksum, &payload),
1300 raw_record("obj", &checksum, &payload),
1301 ]);
1302
1303 let b_dir = TempDir::new("dup");
1304 let b = FileStore::from_root(b_dir.path());
1305 let mut sink = FileSink::new(&b);
1306 let read = read_pack(stream.as_slice(), &mut sink).expect("duplicates are fine");
1307 assert_eq!(read.objects_written, 1, "write-once");
1308 assert_eq!(
1309 read.objects_skipped, 1,
1310 "second record verified-but-skipped"
1311 );
1312 assert_eq!(b.get_object(&checksum).unwrap(), payload);
1313 }
1314
1315 #[test]
1316 fn pack_preseeded_object_is_skipped_but_verified() {
1317 let payload = b"already in the store\n".to_vec();
1320 let (_b_dir, b, ids) = seed_store("preseed", std::slice::from_ref(&payload));
1321
1322 let stream = raw_stream(&[raw_record("obj", &ids[0], &payload)]);
1323 let mut sink = FileSink::new(&b);
1324 let read = read_pack(stream.as_slice(), &mut sink).expect("read_pack");
1325 assert_eq!(read.objects_written, 0);
1326 assert_eq!(read.objects_skipped, 1);
1327 assert_eq!(b.get_object(&ids[0]).unwrap(), payload);
1328 }
1329
1330 #[test]
1333 fn pack_rejects_record_after_manifest() {
1334 let payload = b"object after manifest\n".to_vec();
1335 let (manifest, man_id) = manifest_for(std::slice::from_ref(&payload));
1336 let stream = raw_stream(&[
1337 raw_record("manifest", &man_id, &manifest_bytes(&manifest)),
1338 raw_record("obj", &hex_of(&payload), &payload),
1339 ]);
1340
1341 let b_dir = TempDir::new("after-man");
1342 let b = FileStore::from_root(b_dir.path());
1343 let mut sink = FileSink::new(&b);
1344 let err = read_pack(stream.as_slice(), &mut sink).expect_err("must reject");
1345 assert!(err.to_string().contains("after the manifest"), "got: {err}");
1346 assert!(matches!(
1348 b.get_manifest(&man_id),
1349 Err(StoreError::ManifestNotFound { .. })
1350 ));
1351 }
1352
1353 #[test]
1354 fn pack_manifest_payload_must_hash_to_claimed_id() {
1355 let (manifest, _real_id) = manifest_for(&[b"whatever\n".to_vec()]);
1357 let wrong_id = hex_of(b"some other bytes");
1358 let stream = raw_stream(&[raw_record(
1359 "manifest",
1360 &wrong_id,
1361 &manifest_bytes(&manifest),
1362 )]);
1363 let b_dir = TempDir::new("man-bad-id");
1364 let b = FileStore::from_root(b_dir.path());
1365 let mut sink = FileSink::new(&b);
1366 let err = read_pack(stream.as_slice(), &mut sink).expect_err("must reject");
1367 assert!(matches!(err, StoreError::Integrity { .. }));
1368 assert!(matches!(
1369 b.get_manifest(&wrong_id),
1370 Err(StoreError::ManifestNotFound { .. })
1371 ));
1372
1373 let garbage = b"not a manifest at all\n".to_vec();
1376 let garbage_id = hex_of(&garbage);
1377 let stream = raw_stream(&[raw_record("manifest", &garbage_id, &garbage)]);
1378 let mut sink = FileSink::new(&b);
1379 assert!(read_pack(stream.as_slice(), &mut sink).is_err());
1380 assert!(matches!(
1381 b.get_manifest(&garbage_id),
1382 Err(StoreError::ManifestNotFound { .. })
1383 ));
1384 }
1385
1386 #[test]
1387 fn pack_rejects_manifest_over_64mib_cap() {
1388 let big_len: u64 = MAX_MANIFEST_BYTES + 1;
1389 let claimed = hex_of(b"irrelevant");
1390 let mut stream = WIRE_MAGIC.as_bytes().to_vec();
1391 stream.extend_from_slice(format!("manifest {claimed} {big_len}\n").as_bytes());
1392 let b_dir = TempDir::new("man-cap");
1394 let b = FileStore::from_root(b_dir.path());
1395 let mut sink = FileSink::new(&b);
1396 let err = read_pack(stream.as_slice(), &mut sink).expect_err("must reject");
1397 assert!(err.to_string().contains("cap"), "got: {err}");
1398 }
1399
1400 #[test]
1403 fn pack_write_missing_object_aborts_before_end() {
1404 let payloads = vec![b"present\n".to_vec()];
1405 let (_a_dir, a, mut ids) = seed_store("wmiss-a", &payloads);
1406 let absent = hex_of(b"never stored");
1408 ids.push(absent.clone());
1409
1410 let mut pack = Vec::new();
1411 let err = write_pack(&a, &ids, None, &mut pack).expect_err("missing object");
1412 assert!(matches!(err, StoreError::ObjectNotFound { .. }));
1413 assert!(!pack.ends_with(b"end\n"));
1415 let b_dir = TempDir::new("wmiss-b");
1416 let b = FileStore::from_root(b_dir.path());
1417 let mut sink = FileSink::new(&b);
1418 assert!(read_pack(pack.as_slice(), &mut sink).is_err());
1419 }
1420
1421 #[test]
1422 fn pack_write_invalid_id_emits_nothing() {
1423 let (_a_dir, a, _ids) = seed_store("winv-a", &[b"x\n".to_vec()]);
1424 let mut pack = Vec::new();
1425 let err = write_pack(&a, &["NOT-HEX".to_owned()], None, &mut pack).expect_err("invalid id");
1426 assert!(
1427 err.to_string().contains("invalid object checksum"),
1428 "got: {err}"
1429 );
1430 assert!(pack.is_empty(), "fail closed: not a single byte written");
1431
1432 let mut pack = Vec::new();
1434 let err = write_pack(&a, &[], Some("zz"), &mut pack).expect_err("invalid manifest id");
1435 assert!(
1436 err.to_string().contains("invalid manifest id"),
1437 "got: {err}"
1438 );
1439 assert!(pack.is_empty());
1440 }
1441
1442 #[test]
1443 fn pack_write_emits_records_in_input_order() {
1444 let payloads = vec![b"bbb\n".to_vec(), b"aaa\n".to_vec(), b"ccc\n".to_vec()];
1445 let (_a_dir, a, ids) = seed_store("order-a", &payloads);
1446 let mut pack = Vec::new();
1447 write_pack(&a, &ids, None, &mut pack).expect("write_pack");
1448 let text = String::from_utf8_lossy(&pack);
1449 let positions: Vec<usize> = ids
1450 .iter()
1451 .map(|id| text.find(id.as_str()).expect("record present"))
1452 .collect();
1453 let mut sorted = positions.clone();
1454 sorted.sort_unstable();
1455 assert_eq!(positions, sorted, "obj records keep input order");
1456 }
1457
1458 #[test]
1461 fn pack_objects_needed_returns_absent_subset_in_input_order() {
1462 let p1 = b"seeded one\n".to_vec();
1463 let p3 = b"seeded three\n".to_vec();
1464 let (_dir, store, seeded) = seed_store("needed", &[p1, p3]);
1465 let absent_a = hex_of(b"absent a");
1466 let absent_b = hex_of(b"absent b");
1467
1468 let list = vec![
1470 seeded[0].clone(),
1471 absent_a.clone(),
1472 seeded[1].clone(),
1473 absent_b.clone(),
1474 ];
1475 let needed = store.objects_needed(&list).expect("objects_needed");
1476 assert_eq!(needed, vec![absent_a.clone(), absent_b.clone()]);
1477
1478 let list = vec![absent_b.clone(), absent_a.clone(), absent_b.clone()];
1481 let needed = store.objects_needed(&list).expect("objects_needed");
1482 assert_eq!(needed, vec![absent_b.clone(), absent_a, absent_b]);
1483
1484 assert_eq!(
1486 store.objects_needed(&seeded).expect("ok"),
1487 Vec::<String>::new()
1488 );
1489 }
1490
1491 #[test]
1492 fn pack_objects_needed_invalid_checksum_fails_closed() {
1493 let (_dir, store, seeded) = seed_store("needed-bad", &[b"x\n".to_vec()]);
1494 let valid_absent = hex_of(b"absent");
1495 for bad in [
1496 "UPPERCASE0000000000000000000000000000000000000000000000000000AA".to_owned(),
1497 "0123456789abcdef".repeat(4)[..63].to_owned(),
1498 format!("{}0", "0123456789abcdef".repeat(4)),
1499 "not hex at all".to_owned(),
1500 String::new(),
1501 ] {
1502 let list = vec![seeded[0].clone(), valid_absent.clone(), bad.clone()];
1505 let err = store.objects_needed(&list).expect_err("must fail closed");
1506 assert!(
1507 err.to_string().contains("invalid object checksum"),
1508 "checksum {bad:?}: got {err}"
1509 );
1510 }
1511 }
1512}