1use std::{
3 fs::{self, File, OpenOptions},
4 io::{Read, Write},
5 path::{Path, PathBuf},
6 time::{SystemTime, UNIX_EPOCH},
7};
8
9use objects::store::{
10 CompressionConfig, ObjectStore,
11 pack::{PackBuilder, PackObjectId, StreamingPackBuilder},
12};
13
14use crate::{
15 ObjectData, ObjectId, ObjectInfo, ObjectType, ProtocolError, Result, load_object_data,
16};
17
18pub const MAX_RECEIVED_PACK_SIZE: u64 = 2 * 1024 * 1024 * 1024;
29
30pub const MAX_RECEIVED_PACK_INDEX_SIZE: u64 = 256 * 1024 * 1024;
36
37pub const MAX_RECEIVED_GIT_PACK_SIZE: u64 = 2 * 1024 * 1024 * 1024;
44
45#[derive(Debug, Clone)]
46pub struct NativePackBundle {
47 pub pack_data: Vec<u8>,
48 pub index_data: Vec<u8>,
49}
50
51#[derive(Debug)]
52pub struct NativePackFileBundle {
53 dir: PathBuf,
54 pub pack_path: PathBuf,
55 pub index_path: PathBuf,
56 pub pack_len: u64,
57 pub index_len: u64,
58}
59
60impl Drop for NativePackFileBundle {
61 fn drop(&mut self) {
62 let _ = fs::remove_dir_all(&self.dir);
63 }
64}
65
66#[derive(Debug)]
67pub struct PackFileChunkReader {
68 file: File,
69 total_len: u64,
70 chunk_size: usize,
71 offset: u64,
72 chunk_index: u32,
73}
74
75pub type NativePackFileChunk = (u64, u32, Vec<u8>, bool);
76
77impl PackFileChunkReader {
78 pub fn open(path: &Path, chunk_size: usize) -> Result<Self> {
79 let file = File::open(path)?;
80 let total_len = file.metadata()?.len();
81 Ok(Self {
82 file,
83 total_len,
84 chunk_size: chunk_size.max(1),
85 offset: 0,
86 chunk_index: 0,
87 })
88 }
89
90 pub fn next_chunk(&mut self) -> Result<Option<NativePackFileChunk>> {
91 if self.offset >= self.total_len {
92 return Ok(None);
93 }
94 let remaining = self.total_len - self.offset;
95 let len = remaining.min(self.chunk_size as u64);
96 let len = usize::try_from(len).map_err(|_| {
97 ProtocolError::InvalidState("native pack file chunk length exceeds usize".to_string())
98 })?;
99 let mut data = vec![0u8; len];
100 self.file.read_exact(&mut data)?;
101
102 let offset = self.offset;
103 let chunk_index = self.chunk_index;
104 self.offset = self.offset.checked_add(len as u64).ok_or_else(|| {
105 ProtocolError::InvalidState("native pack file chunk offset overflow".to_string())
106 })?;
107 self.chunk_index = self.chunk_index.checked_add(1).ok_or_else(|| {
108 ProtocolError::InvalidState("native pack file chunk index overflow".to_string())
109 })?;
110 Ok(Some((
111 offset,
112 chunk_index,
113 data,
114 self.offset == self.total_len,
115 )))
116 }
117}
118
119#[derive(Debug)]
120pub struct GrowingPackChunkReader {
121 file: File,
122 chunk_size: usize,
123 offset: u64,
124 chunk_index: u32,
125}
126
127impl GrowingPackChunkReader {
128 pub fn open(path: &Path, chunk_size: usize) -> Result<Self> {
129 Ok(Self {
130 file: File::open(path)?,
131 chunk_size: chunk_size.max(1),
132 offset: 0,
133 chunk_index: 0,
134 })
135 }
136
137 pub fn next_available_chunk(
138 &mut self,
139 final_stream: bool,
140 ) -> Result<Option<NativePackFileChunk>> {
141 let total_len = self.file.metadata()?.len();
142 if self.offset >= total_len {
143 return Ok(None);
144 }
145 let available = total_len - self.offset;
146 if !final_stream && available < self.chunk_size as u64 {
147 return Ok(None);
148 }
149
150 let len = available.min(self.chunk_size as u64);
151 let len = usize::try_from(len).map_err(|_| {
152 ProtocolError::InvalidState(
153 "growing native pack chunk length exceeds usize".to_string(),
154 )
155 })?;
156 let mut data = vec![0u8; len];
157 self.file.read_exact(&mut data)?;
158
159 let offset = self.offset;
160 let chunk_index = self.chunk_index;
161 self.offset = self.offset.checked_add(len as u64).ok_or_else(|| {
162 ProtocolError::InvalidState("growing native pack chunk offset overflow".to_string())
163 })?;
164 self.chunk_index = self.chunk_index.checked_add(1).ok_or_else(|| {
165 ProtocolError::InvalidState("growing native pack chunk index overflow".to_string())
166 })?;
167 Ok(Some((
168 offset,
169 chunk_index,
170 data,
171 final_stream && self.offset == total_len,
172 )))
173 }
174}
175
176pub struct NativePackStreamingWriter {
177 dir: Option<PathBuf>,
178 pack_path: PathBuf,
179 index_path: PathBuf,
180 builder: Option<StreamingPackBuilder<File>>,
181}
182
183impl NativePackStreamingWriter {
184 pub fn new_in(root: &Path, object_count: u64) -> Result<Self> {
185 let base = root.join("transfer-spool");
186 fs::create_dir_all(&base)?;
187 let dir = unique_spool_dir(&base)?;
188 let pack_path = dir.join("pack");
189 let index_path = dir.join("idx");
190 let bucket_dir = dir.join("buckets");
191 let pack_file = OpenOptions::new()
192 .read(true)
193 .write(true)
194 .create_new(true)
195 .open(&pack_path)?;
196 let builder = StreamingPackBuilder::new_with_object_count(
197 pack_file,
198 index_path.clone(),
199 sync_pack_compression(),
200 bucket_dir,
201 object_count,
202 )
203 .map_err(ProtocolError::from)?;
204
205 Ok(Self {
206 dir: Some(dir),
207 pack_path,
208 index_path,
209 builder: Some(builder),
210 })
211 }
212
213 pub fn pack_path(&self) -> &Path {
214 &self.pack_path
215 }
216
217 pub fn index_path(&self) -> &Path {
218 &self.index_path
219 }
220
221 pub fn add_object_data(&mut self, object: ObjectData) -> Result<()> {
222 if !is_native_packable_object_type(object.obj_type) {
223 return Err(ProtocolError::InvalidState(format!(
224 "{:?} sidecar records cannot be packed into the content-addressed object pack",
225 object.obj_type
226 )));
227 }
228 let builder = self.builder.as_mut().ok_or_else(|| {
229 ProtocolError::InvalidState("native pack streaming writer is finalized".to_string())
230 })?;
231 let pack_id = to_pack_object_id(&object.id);
232 builder
233 .add_id(pack_id, object.obj_type.pack_object_type()?, object.data)
234 .map_err(ProtocolError::from)
235 }
236
237 pub fn flush_pack(&mut self) -> Result<()> {
238 let builder = self.builder.as_mut().ok_or_else(|| {
239 ProtocolError::InvalidState("native pack streaming writer is finalized".to_string())
240 })?;
241 builder.flush_pack().map_err(ProtocolError::from)
242 }
243
244 pub fn finish(mut self) -> Result<NativePackFileBundle> {
245 let builder = self.builder.take().ok_or_else(|| {
246 ProtocolError::InvalidState("native pack streaming writer is finalized".to_string())
247 })?;
248 let (file, _) = builder.finalize().map_err(ProtocolError::from)?;
249 file.sync_all()?;
250 drop(file);
251 let pack_len = fs::metadata(&self.pack_path)?.len();
252 let index_len = fs::metadata(&self.index_path)?.len();
253 let dir = self.dir.take().ok_or_else(|| {
254 ProtocolError::InvalidState("native pack streaming writer lost spool dir".to_string())
255 })?;
256 Ok(NativePackFileBundle {
257 dir,
258 pack_path: self.pack_path.clone(),
259 index_path: self.index_path.clone(),
260 pack_len,
261 index_len,
262 })
263 }
264}
265
266impl Drop for NativePackStreamingWriter {
267 fn drop(&mut self) {
268 if let Some(dir) = self.dir.take() {
269 let _ = fs::remove_dir_all(dir);
270 }
271 }
272}
273
274#[derive(Debug, Default, Clone)]
275pub struct PackChunkState {
276 pub pack_data: Vec<u8>,
277 pub index_data: Vec<u8>,
278 pack_progress: (u64, u32),
279 index_progress: (u64, u32),
280 pack_complete: bool,
281 index_complete: bool,
282}
283
284impl PackChunkState {
285 pub fn is_complete(&self) -> bool {
286 self.pack_complete && self.index_complete
287 }
288}
289
290#[derive(Debug, Default, Clone)]
291pub struct GitPackChunkState {
292 transfer_id: Option<String>,
293 pack_size: Option<u64>,
294 next_offset: u64,
295 next_chunk_index: u32,
296 pack_data: Vec<u8>,
297}
298
299impl GitPackChunkState {
300 pub fn is_idle(&self) -> bool {
301 self.transfer_id.is_none()
302 && self.pack_size.is_none()
303 && self.next_offset == 0
304 && self.next_chunk_index == 0
305 && self.pack_data.is_empty()
306 }
307
308 pub fn ensure_idle(&self) -> Result<()> {
309 if self.is_idle() {
310 Ok(())
311 } else {
312 Err(ProtocolError::InvalidState(
313 "Git pack transfer ended before final chunk".to_string(),
314 ))
315 }
316 }
317
318 pub fn receive_chunk(
319 &mut self,
320 transfer_id: &str,
321 offset: u64,
322 chunk_index: u32,
323 is_final_chunk: bool,
324 pack_size: u64,
325 data: &[u8],
326 ) -> Result<Option<Vec<u8>>> {
327 if transfer_id.is_empty() {
328 return Err(ProtocolError::InvalidState(
329 "Git pack transfer_id is required".to_string(),
330 ));
331 }
332 if pack_size > MAX_RECEIVED_GIT_PACK_SIZE {
333 return Err(ProtocolError::InvalidState(format!(
334 "Git pack exceeds maximum transfer size of {MAX_RECEIVED_GIT_PACK_SIZE} bytes"
335 )));
336 }
337 if data.is_empty() {
338 return Err(ProtocolError::InvalidState(
339 "Git pack chunk must not be empty".to_string(),
340 ));
341 }
342 match self.transfer_id.as_ref() {
343 Some(current) if current != transfer_id => {
344 return Err(ProtocolError::InvalidState(format!(
345 "Git pack transfer id changed from {current:?} to {transfer_id:?}"
346 )));
347 }
348 Some(_) => {}
349 None => {
350 self.transfer_id = Some(transfer_id.to_string());
351 self.pack_size = Some(pack_size);
352 }
353 }
354 if self.pack_size != Some(pack_size) {
355 return Err(ProtocolError::InvalidState(
356 "Git pack size changed during transfer".to_string(),
357 ));
358 }
359 if offset != self.next_offset {
360 return Err(ProtocolError::InvalidState(format!(
361 "Git pack offset mismatch: expected {}, got {}",
362 self.next_offset, offset
363 )));
364 }
365 if chunk_index != self.next_chunk_index {
366 return Err(ProtocolError::InvalidState(format!(
367 "Git pack chunk index mismatch: expected {}, got {}",
368 self.next_chunk_index, chunk_index
369 )));
370 }
371 let chunk_len = u64::try_from(data.len()).map_err(|_| {
372 ProtocolError::InvalidState("Git pack chunk length exceeds u64".to_string())
373 })?;
374 let next_offset = self
375 .next_offset
376 .checked_add(chunk_len)
377 .ok_or_else(|| ProtocolError::InvalidState("Git pack offset overflow".to_string()))?;
378 if next_offset > pack_size {
379 return Err(ProtocolError::InvalidState(
380 "Git pack chunk exceeds declared pack size".to_string(),
381 ));
382 }
383 self.pack_data.extend_from_slice(data);
384 self.next_offset = next_offset;
385 self.next_chunk_index = self.next_chunk_index.checked_add(1).ok_or_else(|| {
386 ProtocolError::InvalidState("Git pack chunk index overflow".to_string())
387 })?;
388 if is_final_chunk {
389 if self.next_offset != pack_size {
390 return Err(ProtocolError::InvalidState(format!(
391 "Git pack final size mismatch: declared {}, received {}",
392 pack_size, self.next_offset
393 )));
394 }
395 let pack_data = std::mem::take(&mut self.pack_data);
396 self.transfer_id = None;
397 self.pack_size = None;
398 self.next_offset = 0;
399 self.next_chunk_index = 0;
400 return Ok(Some(pack_data));
401 }
402 if self.next_offset == pack_size {
403 return Err(ProtocolError::InvalidState(
404 "Git pack reached declared size without final chunk marker".to_string(),
405 ));
406 }
407 Ok(None)
408 }
409}
410
411#[derive(Debug)]
412pub struct PackChunkSpool {
413 dir: PathBuf,
414 pack: PackStreamSpool,
415 index: PackStreamSpool,
416}
417
418impl PackChunkSpool {
419 pub fn new_in(root: &Path) -> Result<Self> {
420 let base = root.join("transfer-spool");
421 fs::create_dir_all(&base)?;
422 let dir = unique_spool_dir(&base)?;
423 let pack = PackStreamSpool::new(dir.join("pack"))?;
424 let index = PackStreamSpool::new(dir.join("idx"))?;
425 Ok(Self { dir, pack, index })
426 }
427
428 pub fn is_complete(&self) -> bool {
429 self.pack.complete && self.index.complete
430 }
431
432 #[allow(clippy::too_many_arguments)]
433 pub fn receive_chunk(
434 &mut self,
435 is_index: bool,
436 resume_offset: u64,
437 chunk_index: u32,
438 is_complete: bool,
439 data: &[u8],
440 is_final_chunk: bool,
441 ) -> Result<()> {
442 let max_bytes = if is_index {
443 MAX_RECEIVED_PACK_INDEX_SIZE
444 } else {
445 MAX_RECEIVED_PACK_SIZE
446 };
447 let stream = if is_index {
448 &mut self.index
449 } else {
450 &mut self.pack
451 };
452 receive_pack_chunk_to_spool(
453 stream,
454 is_index,
455 resume_offset,
456 chunk_index,
457 is_complete,
458 data,
459 is_final_chunk,
460 max_bytes,
461 )
462 }
463
464 pub fn install_into(&mut self, store: &impl ObjectStore) -> Result<Vec<PackObjectId>> {
465 if !self.is_complete() {
466 return Err(ProtocolError::InvalidState(
467 "native pack spool is incomplete".to_string(),
468 ));
469 }
470 self.pack.close()?;
471 self.index.close()?;
472 store
473 .install_pack_streaming(&self.pack.path, &self.index.path)
474 .map_err(ProtocolError::from)
475 }
476}
477
478impl Drop for PackChunkSpool {
479 fn drop(&mut self) {
480 let _ = fs::remove_dir_all(&self.dir);
481 }
482}
483
484#[derive(Debug)]
485struct PackStreamSpool {
486 path: PathBuf,
487 file: Option<File>,
488 progress: (u64, u32),
489 complete: bool,
490}
491
492impl PackStreamSpool {
493 fn new(path: PathBuf) -> Result<Self> {
494 let file = File::create(&path)?;
495 Ok(Self {
496 path,
497 file: Some(file),
498 progress: (0, 0),
499 complete: false,
500 })
501 }
502
503 fn write_all(&mut self, data: &[u8]) -> Result<()> {
504 let Some(file) = self.file.as_mut() else {
505 return Err(ProtocolError::InvalidState(
506 "native pack spool stream is already closed".to_string(),
507 ));
508 };
509 file.write_all(data)?;
510 Ok(())
511 }
512
513 fn close(&mut self) -> Result<()> {
514 if let Some(mut file) = self.file.take() {
515 file.flush()?;
516 file.sync_all()?;
517 }
518 Ok(())
519 }
520}
521
522pub fn native_pack_excluded_object_types() -> &'static [ObjectType] {
523 &[ObjectType::Redaction, ObjectType::StateVisibility]
524}
525
526pub fn is_native_packable_object_type(obj_type: ObjectType) -> bool {
527 obj_type.packable()
528}
529
530pub fn build_native_pack(
531 store: &impl ObjectStore,
532 objects: &[ObjectInfo],
533) -> Result<NativePackBundle> {
534 let mut builder = PackBuilder::new(sync_pack_compression());
535
536 for info in objects {
537 if !is_native_packable_object_type(info.obj_type) {
543 continue;
544 }
545 let object = load_object_data(store, &info.id, info.obj_type)?;
546 let pack_id = to_pack_object_id(&object.id);
547 builder.add_id(pack_id, object.obj_type.pack_object_type()?, object.data);
548 }
549
550 let (pack_data, index_data, _) = builder.build()?;
551 Ok(NativePackBundle {
552 pack_data,
553 index_data,
554 })
555}
556
557fn sync_pack_compression() -> CompressionConfig {
558 CompressionConfig {
559 level: 1,
560 min_size: 1024,
561 max_delta_size: 0,
562 ..CompressionConfig::default()
563 }
564}
565
566pub fn install_received_pack(
567 store: &impl ObjectStore,
568 pack_data: &[u8],
569 index_data: &[u8],
570) -> Result<Vec<PackObjectId>> {
571 store
572 .install_pack(pack_data, index_data)
573 .map_err(ProtocolError::from)
574}
575
576pub fn next_pack_chunk(
577 data: &[u8],
578 chunk_size: usize,
579 chunk_index: usize,
580) -> Option<(usize, Vec<u8>, bool)> {
581 let (start, len) = crate::chunk_bounds(data.len(), chunk_size.max(1), chunk_index)?;
582 let is_final = start + len == data.len();
583 Some((start, data[start..start + len].to_vec(), is_final))
584}
585
586pub fn receive_pack_chunk(
587 state: &mut PackChunkState,
588 is_index: bool,
589 resume_offset: u64,
590 chunk_index: u32,
591 is_complete: bool,
592 data: &[u8],
593 is_final_chunk: bool,
594) -> Result<()> {
595 let max_bytes = if is_index {
596 MAX_RECEIVED_PACK_INDEX_SIZE
597 } else {
598 MAX_RECEIVED_PACK_SIZE
599 };
600 receive_pack_chunk_with_limit(
601 state,
602 is_index,
603 resume_offset,
604 chunk_index,
605 is_complete,
606 data,
607 is_final_chunk,
608 max_bytes,
609 )
610}
611
612#[allow(clippy::too_many_arguments)]
613fn receive_pack_chunk_with_limit(
614 state: &mut PackChunkState,
615 is_index: bool,
616 resume_offset: u64,
617 chunk_index: u32,
618 is_complete: bool,
619 data: &[u8],
620 is_final_chunk: bool,
621 max_bytes: u64,
622) -> Result<()> {
623 let (buffer, progress, complete) = if is_index {
624 (
625 &mut state.index_data,
626 &mut state.index_progress,
627 &mut state.index_complete,
628 )
629 } else {
630 (
631 &mut state.pack_data,
632 &mut state.pack_progress,
633 &mut state.pack_complete,
634 )
635 };
636
637 let next_progress = validate_pack_chunk(
638 *progress,
639 is_index,
640 resume_offset,
641 chunk_index,
642 data,
643 max_bytes,
644 )?;
645
646 buffer.extend_from_slice(data);
647 *progress = next_progress;
648 if is_final_chunk || is_complete {
649 *complete = true;
650 }
651 Ok(())
652}
653
654#[allow(clippy::too_many_arguments)]
655fn receive_pack_chunk_to_spool(
656 stream: &mut PackStreamSpool,
657 is_index: bool,
658 resume_offset: u64,
659 chunk_index: u32,
660 is_complete: bool,
661 data: &[u8],
662 is_final_chunk: bool,
663 max_bytes: u64,
664) -> Result<()> {
665 let next_progress = validate_pack_chunk(
666 stream.progress,
667 is_index,
668 resume_offset,
669 chunk_index,
670 data,
671 max_bytes,
672 )?;
673 stream.write_all(data)?;
674 stream.progress = next_progress;
675 if is_final_chunk || is_complete {
676 stream.complete = true;
677 }
678 Ok(())
679}
680
681fn validate_pack_chunk(
682 progress: (u64, u32),
683 is_index: bool,
684 resume_offset: u64,
685 chunk_index: u32,
686 data: &[u8],
687 max_bytes: u64,
688) -> Result<(u64, u32)> {
689 if resume_offset != progress.0 {
690 return Err(ProtocolError::InvalidState(format!(
691 "native pack chunk resume offset mismatch: expected {}, got {}",
692 progress.0, resume_offset
693 )));
694 }
695 if chunk_index != progress.1 {
696 return Err(ProtocolError::InvalidState(format!(
697 "native pack chunk index mismatch: expected {}, got {}",
698 progress.1, chunk_index
699 )));
700 }
701
702 let data_len = u64::try_from(data.len()).map_err(|_| {
703 ProtocolError::InvalidState("native pack chunk length does not fit in u64".to_string())
704 })?;
705 let next_offset = progress.0.checked_add(data_len).ok_or_else(|| {
706 ProtocolError::InvalidState("native pack chunk offset overflow".to_string())
707 })?;
708 if next_offset > max_bytes {
709 let stream_name = if is_index { "index" } else { "body" };
710 return Err(ProtocolError::InvalidState(format!(
711 "native pack {stream_name} exceeds receive size limit: {next_offset} bytes (max {max_bytes})"
712 )));
713 }
714 let next_chunk = progress.1.checked_add(1).ok_or_else(|| {
715 ProtocolError::InvalidState("native pack chunk index overflow".to_string())
716 })?;
717
718 Ok((next_offset, next_chunk))
719}
720
721fn unique_spool_dir(base: &Path) -> Result<PathBuf> {
722 let stamp = SystemTime::now()
723 .duration_since(UNIX_EPOCH)
724 .map_err(|err| {
725 ProtocolError::InvalidState(format!("system clock before UNIX epoch: {err}"))
726 })?
727 .as_nanos();
728 for attempt in 0..100u32 {
729 let dir = base.join(format!("pack-{}-{stamp}-{attempt}", std::process::id()));
730 match fs::create_dir(&dir) {
731 Ok(()) => return Ok(dir),
732 Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => continue,
733 Err(err) => return Err(ProtocolError::Io(err)),
734 }
735 }
736 Err(ProtocolError::InvalidState(
737 "failed to allocate native pack spool directory".to_string(),
738 ))
739}
740
741fn to_pack_object_id(id: &ObjectId) -> PackObjectId {
742 match id {
743 ObjectId::Hash(hash) => PackObjectId::Hash(*hash),
744 ObjectId::ChangeId(change_id) => PackObjectId::ChangeId(*change_id),
745 }
746}
747
748#[cfg(test)]
749mod tests {
750 use objects::{
751 object::Blob,
752 store::{FsStore, ObjectStore, pack::PackObjectId},
753 };
754 use tempfile::TempDir;
755
756 use super::{
757 GitPackChunkState, GrowingPackChunkReader, MAX_RECEIVED_PACK_SIZE,
758 NativePackStreamingWriter, ObjectData, ObjectId, ObjectInfo, ObjectType, PackChunkSpool,
759 PackChunkState, PackFileChunkReader, build_native_pack, install_received_pack,
760 next_pack_chunk, receive_pack_chunk, receive_pack_chunk_with_limit,
761 };
762
763 fn create_test_store() -> (TempDir, FsStore) {
764 let temp = TempDir::new().unwrap();
765 let store = FsStore::new(temp.path().join(".heddle"));
766 store.init().unwrap();
767 (temp, store)
768 }
769
770 #[test]
771 fn receive_pack_chunk_rejects_cumulative_size_over_limit_before_buffering() {
772 let mut state = PackChunkState::default();
773
774 receive_pack_chunk_with_limit(&mut state, false, 0, 0, false, b"abcd", false, 8).unwrap();
775 receive_pack_chunk_with_limit(&mut state, false, 4, 1, false, b"efgh", false, 8).unwrap();
776
777 let error = receive_pack_chunk_with_limit(&mut state, false, 8, 2, false, b"i", false, 8)
778 .unwrap_err();
779
780 assert_eq!(state.pack_data, b"abcdefgh");
781 assert!(
782 error
783 .to_string()
784 .contains("native pack body exceeds receive size limit")
785 );
786 assert!(error.to_string().contains("9 bytes (max 8)"));
787 }
788
789 #[test]
790 fn receive_pack_chunk_checks_production_limit_before_extending_buffer() {
791 let mut state = PackChunkState {
792 pack_progress: (MAX_RECEIVED_PACK_SIZE - 1, 0),
793 ..PackChunkState::default()
794 };
795
796 let error = receive_pack_chunk(
797 &mut state,
798 false,
799 MAX_RECEIVED_PACK_SIZE - 1,
800 0,
801 false,
802 b"xx",
803 false,
804 )
805 .unwrap_err();
806
807 assert!(state.pack_data.is_empty());
808 assert!(
809 error
810 .to_string()
811 .contains("native pack body exceeds receive size limit")
812 );
813 }
814
815 #[test]
816 fn receive_pack_chunk_rejects_resume_offset_mismatch_before_buffering() {
817 let mut state = PackChunkState::default();
818
819 let error =
820 receive_pack_chunk(&mut state, false, 1, 0, false, b"late chunk", false).unwrap_err();
821
822 assert!(state.pack_data.is_empty());
823 assert!(
824 error
825 .to_string()
826 .contains("native pack chunk resume offset mismatch: expected 0, got 1")
827 );
828 }
829
830 #[test]
831 fn receive_pack_chunk_rejects_chunk_index_mismatch_before_buffering() {
832 let mut state = PackChunkState::default();
833
834 receive_pack_chunk(&mut state, false, 0, 0, false, b"abc", false).unwrap();
835 let error = receive_pack_chunk(&mut state, false, 3, 2, false, b"def", false).unwrap_err();
836
837 assert_eq!(state.pack_data, b"abc");
838 assert!(
839 error
840 .to_string()
841 .contains("native pack chunk index mismatch: expected 1, got 2")
842 );
843 }
844
845 #[test]
846 fn git_pack_chunk_state_requires_ordered_chunks_and_final_size() {
847 let mut state = GitPackChunkState::default();
848
849 assert!(
850 state
851 .receive_chunk("git-pack:test", 0, 0, false, 8, b"abcd")
852 .unwrap()
853 .is_none()
854 );
855 let error = state
856 .receive_chunk("git-pack:test", 4, 2, true, 8, b"efgh")
857 .unwrap_err();
858
859 assert!(
860 error
861 .to_string()
862 .contains("Git pack chunk index mismatch: expected 1, got 2")
863 );
864 assert!(state.ensure_idle().is_err());
865
866 let mut state = GitPackChunkState::default();
867 state
868 .receive_chunk("git-pack:test", 0, 0, false, 8, b"abcd")
869 .unwrap();
870 let complete = state
871 .receive_chunk("git-pack:test", 4, 1, true, 8, b"efgh")
872 .unwrap()
873 .unwrap();
874
875 assert_eq!(complete, b"abcdefgh");
876 assert!(state.ensure_idle().is_ok());
877 }
878
879 #[test]
880 fn receive_pack_chunk_accepts_completion_flags_for_pack_and_index() {
881 let mut state = PackChunkState::default();
882
883 receive_pack_chunk(&mut state, false, 0, 0, true, b"pack-body", false).unwrap();
884 assert!(!state.is_complete());
885 receive_pack_chunk(&mut state, true, 0, 0, false, b"pack-index", true).unwrap();
886
887 assert!(state.is_complete());
888 assert_eq!(state.pack_data, b"pack-body");
889 assert_eq!(state.index_data, b"pack-index");
890 }
891
892 #[test]
893 fn normal_size_native_pack_receives_and_installs() {
894 let (_source_temp, source_store) = create_test_store();
895 let (_dest_temp, dest_store) = create_test_store();
896 let blob = Blob::from("native pack receive regression");
897 let hash = source_store.put_blob(&blob).unwrap();
898 let bundle = build_native_pack(
899 &source_store,
900 &[ObjectInfo {
901 id: ObjectId::Hash(hash),
902 obj_type: ObjectType::Blob,
903 size: blob.size() as u64,
904 delta_base: None,
905 }],
906 )
907 .unwrap();
908
909 let mut state = PackChunkState::default();
910 let mut chunk_index = 0usize;
911 while let Some((start, data, is_final)) = next_pack_chunk(&bundle.pack_data, 7, chunk_index)
912 {
913 receive_pack_chunk(
914 &mut state,
915 false,
916 start as u64,
917 chunk_index as u32,
918 is_final,
919 &data,
920 is_final,
921 )
922 .unwrap();
923 chunk_index += 1;
924 }
925
926 let mut index_chunk = 0usize;
927 while let Some((start, data, is_final)) =
928 next_pack_chunk(&bundle.index_data, 5, index_chunk)
929 {
930 receive_pack_chunk(
931 &mut state,
932 true,
933 start as u64,
934 index_chunk as u32,
935 is_final,
936 &data,
937 is_final,
938 )
939 .unwrap();
940 index_chunk += 1;
941 }
942
943 assert!(state.is_complete());
944 assert_eq!(state.pack_data, bundle.pack_data);
945 assert_eq!(state.index_data, bundle.index_data);
946
947 let installed_ids =
948 install_received_pack(&dest_store, &state.pack_data, &state.index_data).unwrap();
949
950 assert_eq!(installed_ids, vec![PackObjectId::Hash(hash)]);
951 let installed_blob = dest_store.get_blob(&hash).unwrap().unwrap();
952 assert_eq!(installed_blob.content(), blob.content());
953 }
954
955 #[test]
956 fn normal_size_native_pack_spools_and_installs() {
957 let (_source_temp, source_store) = create_test_store();
958 let (dest_temp, dest_store) = create_test_store();
959 let blob = Blob::from("native pack spooled receive regression");
960 let hash = source_store.put_blob(&blob).unwrap();
961 let bundle = build_native_pack(
962 &source_store,
963 &[ObjectInfo {
964 id: ObjectId::Hash(hash),
965 obj_type: ObjectType::Blob,
966 size: blob.size() as u64,
967 delta_base: None,
968 }],
969 )
970 .unwrap();
971
972 let mut spool = PackChunkSpool::new_in(dest_temp.path()).unwrap();
973 let mut chunk_index = 0usize;
974 while let Some((start, data, is_final)) = next_pack_chunk(&bundle.pack_data, 7, chunk_index)
975 {
976 spool
977 .receive_chunk(
978 false,
979 start as u64,
980 chunk_index as u32,
981 is_final,
982 &data,
983 is_final,
984 )
985 .unwrap();
986 chunk_index += 1;
987 }
988
989 let mut index_chunk = 0usize;
990 while let Some((start, data, is_final)) =
991 next_pack_chunk(&bundle.index_data, 5, index_chunk)
992 {
993 spool
994 .receive_chunk(
995 true,
996 start as u64,
997 index_chunk as u32,
998 is_final,
999 &data,
1000 is_final,
1001 )
1002 .unwrap();
1003 index_chunk += 1;
1004 }
1005
1006 assert!(spool.is_complete());
1007 let installed_ids = spool.install_into(&dest_store).unwrap();
1008
1009 assert_eq!(installed_ids, vec![PackObjectId::Hash(hash)]);
1010 let installed_blob = dest_store.get_blob(&hash).unwrap().unwrap();
1011 assert_eq!(installed_blob.content(), blob.content());
1012 }
1013
1014 #[test]
1015 fn native_pack_streaming_writer_drains_growing_pack_and_installs() {
1016 let (source_temp, source_store) = create_test_store();
1017 let (dest_temp, dest_store) = create_test_store();
1018 let blob = Blob::from("native pack growing stream regression");
1019 let hash = source_store.put_blob(&blob).unwrap();
1020 let large_blob = Blob::from_slice(&vec![b'z'; 4096]);
1021 let large_hash = source_store.put_blob(&large_blob).unwrap();
1022
1023 let mut writer = NativePackStreamingWriter::new_in(source_temp.path(), 2).unwrap();
1024 let mut pack_reader = GrowingPackChunkReader::open(writer.pack_path(), 31).unwrap();
1025 let mut spool = PackChunkSpool::new_in(dest_temp.path()).unwrap();
1026 let mut saw_interleaved_pack_chunk = false;
1027
1028 for (id, obj_type, data) in [
1029 (
1030 ObjectId::Hash(hash),
1031 ObjectType::Blob,
1032 blob.content().to_vec(),
1033 ),
1034 (
1035 ObjectId::Hash(large_hash),
1036 ObjectType::Blob,
1037 large_blob.content().to_vec(),
1038 ),
1039 ] {
1040 writer
1041 .add_object_data(ObjectData {
1042 id,
1043 obj_type,
1044 data,
1045 is_delta: false,
1046 })
1047 .unwrap();
1048 writer.flush_pack().unwrap();
1049 while let Some((offset, chunk_index, data, is_final)) =
1050 pack_reader.next_available_chunk(false).unwrap()
1051 {
1052 assert!(
1053 !is_final,
1054 "pre-final growing pack drain must not mark chunks final"
1055 );
1056 saw_interleaved_pack_chunk = true;
1057 spool
1058 .receive_chunk(false, offset, chunk_index, false, &data, false)
1059 .unwrap();
1060 }
1061 }
1062
1063 let bundle = writer.finish().unwrap();
1064 let mut saw_final_pack_chunk = false;
1065 while let Some((offset, chunk_index, data, is_final)) =
1066 pack_reader.next_available_chunk(true).unwrap()
1067 {
1068 saw_final_pack_chunk |= is_final;
1069 spool
1070 .receive_chunk(false, offset, chunk_index, is_final, &data, is_final)
1071 .unwrap();
1072 }
1073
1074 let mut index_reader = PackFileChunkReader::open(&bundle.index_path, 17).unwrap();
1075 while let Some((offset, chunk_index, data, is_final)) = index_reader.next_chunk().unwrap() {
1076 spool
1077 .receive_chunk(true, offset, chunk_index, is_final, &data, is_final)
1078 .unwrap();
1079 }
1080
1081 assert!(
1082 saw_interleaved_pack_chunk,
1083 "expected at least one pack chunk before finalize"
1084 );
1085 assert!(
1086 saw_final_pack_chunk,
1087 "expected final pack chunk after finish"
1088 );
1089 assert!(spool.is_complete());
1090 let mut installed_ids = spool.install_into(&dest_store).unwrap();
1091 let mut expected_ids = vec![PackObjectId::Hash(hash), PackObjectId::Hash(large_hash)];
1092 installed_ids.sort();
1093 expected_ids.sort();
1094
1095 assert_eq!(installed_ids, expected_ids);
1096 let installed_blob = dest_store.get_blob(&hash).unwrap().unwrap();
1097 assert_eq!(installed_blob.content(), blob.content());
1098 let installed_large_blob = dest_store.get_blob(&large_hash).unwrap().unwrap();
1099 assert_eq!(installed_large_blob.content(), large_blob.content());
1100 }
1101}