Skip to main content

wire/
native_pack.rs

1// SPDX-License-Identifier: Apache-2.0
2use std::{
3    fs::{self, File, OpenOptions},
4    io::{Read, Write},
5    path::{Path, PathBuf},
6    time::{SystemTime, UNIX_EPOCH},
7};
8
9use objects::store::{
10    CompressionConfig, ObjectStore,
11    pack::{PackBuilder, PackObjectId, StreamingPackBuilder},
12};
13
14use crate::{
15    ObjectData, ObjectId, ObjectInfo, ObjectType, ProtocolError, Result, load_object_data,
16};
17
18/// Maximum hosted native-pack body accepted by the receive primitive.
19///
20/// Native sync packs are produced from bounded state-closure wants and
21/// each decoded pack object is separately capped at 1 GiB in the pack
22/// reader. A 2 GiB compressed pack is materially above normal hosted
23/// sync use while still preventing an untrusted server from growing the
24/// in-memory receive buffer without limit. The receive path can now move
25/// to temp-file spooling plus `install_pack_streaming` — that install API
26/// reports the installed ids the receiver needs, so only the spooling of
27/// the receive buffer itself remains.
28pub const MAX_RECEIVED_PACK_SIZE: u64 = 2 * 1024 * 1024 * 1024;
29
/// Upper bound on the native-pack index a receiver will accept.
///
/// Index size grows with the number of objects, not with their payload
/// size, so 256 MiB accommodates millions of entries while still capping the
/// second sender-controlled buffer.
pub const MAX_RECEIVED_PACK_INDEX_SIZE: u64 = 256 << 20;
36
/// Upper bound on a raw Git pack received over the Git-overlay lane.
///
/// Git-shaped data travels as raw Git packs, streamed in bounded chunks on
/// both ends. The declared pack size still comes from the wire and is
/// untrusted, so it is rejected above this ceiling before any buffering or
/// spooling starts.
pub const MAX_RECEIVED_GIT_PACK_SIZE: u64 = 2 << 30;
44
/// An in-memory native pack together with its index, as produced by
/// `build_native_pack`.
#[derive(Clone, Debug)]
pub struct NativePackBundle {
    /// Encoded pack body bytes.
    pub pack_data: Vec<u8>,
    /// Encoded pack index bytes.
    pub index_data: Vec<u8>,
}
50
/// A finished native pack and index spooled to disk, as returned by
/// `NativePackStreamingWriter::finish`.
///
/// The bundle owns its spool directory: dropping it deletes that directory
/// together with the pack and index files inside it.
#[derive(Debug)]
pub struct NativePackFileBundle {
    // Spool directory that is removed when the bundle is dropped.
    dir: PathBuf,
    /// Path of the spooled pack file.
    pub pack_path: PathBuf,
    /// Path of the spooled pack index file.
    pub index_path: PathBuf,
    /// Length of the pack file in bytes.
    pub pack_len: u64,
    /// Length of the index file in bytes.
    pub index_len: u64,
}

impl Drop for NativePackFileBundle {
    fn drop(&mut self) {
        // Best-effort cleanup: there is nobody to report a failure to here.
        let _best_effort = fs::remove_dir_all(&self.dir);
    }
}
65
66#[derive(Debug)]
67pub struct PackFileChunkReader {
68    file: File,
69    total_len: u64,
70    chunk_size: usize,
71    offset: u64,
72    chunk_index: u32,
73}
74
75pub type NativePackFileChunk = (u64, u32, Vec<u8>, bool);
76
77impl PackFileChunkReader {
78    pub fn open(path: &Path, chunk_size: usize) -> Result<Self> {
79        let file = File::open(path)?;
80        let total_len = file.metadata()?.len();
81        Ok(Self {
82            file,
83            total_len,
84            chunk_size: chunk_size.max(1),
85            offset: 0,
86            chunk_index: 0,
87        })
88    }
89
90    pub fn next_chunk(&mut self) -> Result<Option<NativePackFileChunk>> {
91        if self.offset >= self.total_len {
92            return Ok(None);
93        }
94        let remaining = self.total_len - self.offset;
95        let len = remaining.min(self.chunk_size as u64);
96        let len = usize::try_from(len).map_err(|_| {
97            ProtocolError::InvalidState("native pack file chunk length exceeds usize".to_string())
98        })?;
99        let mut data = vec![0u8; len];
100        self.file.read_exact(&mut data)?;
101
102        let offset = self.offset;
103        let chunk_index = self.chunk_index;
104        self.offset = self.offset.checked_add(len as u64).ok_or_else(|| {
105            ProtocolError::InvalidState("native pack file chunk offset overflow".to_string())
106        })?;
107        self.chunk_index = self.chunk_index.checked_add(1).ok_or_else(|| {
108            ProtocolError::InvalidState("native pack file chunk index overflow".to_string())
109        })?;
110        Ok(Some((
111            offset,
112            chunk_index,
113            data,
114            self.offset == self.total_len,
115        )))
116    }
117}
118
119#[derive(Debug)]
120pub struct GrowingPackChunkReader {
121    file: File,
122    chunk_size: usize,
123    offset: u64,
124    chunk_index: u32,
125}
126
127impl GrowingPackChunkReader {
128    pub fn open(path: &Path, chunk_size: usize) -> Result<Self> {
129        Ok(Self {
130            file: File::open(path)?,
131            chunk_size: chunk_size.max(1),
132            offset: 0,
133            chunk_index: 0,
134        })
135    }
136
137    pub fn next_available_chunk(
138        &mut self,
139        final_stream: bool,
140    ) -> Result<Option<NativePackFileChunk>> {
141        let total_len = self.file.metadata()?.len();
142        if self.offset >= total_len {
143            return Ok(None);
144        }
145        let available = total_len - self.offset;
146        if !final_stream && available < self.chunk_size as u64 {
147            return Ok(None);
148        }
149
150        let len = available.min(self.chunk_size as u64);
151        let len = usize::try_from(len).map_err(|_| {
152            ProtocolError::InvalidState(
153                "growing native pack chunk length exceeds usize".to_string(),
154            )
155        })?;
156        let mut data = vec![0u8; len];
157        self.file.read_exact(&mut data)?;
158
159        let offset = self.offset;
160        let chunk_index = self.chunk_index;
161        self.offset = self.offset.checked_add(len as u64).ok_or_else(|| {
162            ProtocolError::InvalidState("growing native pack chunk offset overflow".to_string())
163        })?;
164        self.chunk_index = self.chunk_index.checked_add(1).ok_or_else(|| {
165            ProtocolError::InvalidState("growing native pack chunk index overflow".to_string())
166        })?;
167        Ok(Some((
168            offset,
169            chunk_index,
170            data,
171            final_stream && self.offset == total_len,
172        )))
173    }
174}
175
176pub struct NativePackStreamingWriter {
177    dir: Option<PathBuf>,
178    pack_path: PathBuf,
179    index_path: PathBuf,
180    builder: Option<StreamingPackBuilder<File>>,
181}
182
183impl NativePackStreamingWriter {
184    pub fn new_in(root: &Path, object_count: u64) -> Result<Self> {
185        let base = root.join("transfer-spool");
186        fs::create_dir_all(&base)?;
187        let dir = unique_spool_dir(&base)?;
188        let pack_path = dir.join("pack");
189        let index_path = dir.join("idx");
190        let bucket_dir = dir.join("buckets");
191        let pack_file = OpenOptions::new()
192            .read(true)
193            .write(true)
194            .create_new(true)
195            .open(&pack_path)?;
196        let builder = StreamingPackBuilder::new_with_object_count(
197            pack_file,
198            index_path.clone(),
199            sync_pack_compression(),
200            bucket_dir,
201            object_count,
202        )
203        .map_err(ProtocolError::from)?;
204
205        Ok(Self {
206            dir: Some(dir),
207            pack_path,
208            index_path,
209            builder: Some(builder),
210        })
211    }
212
213    pub fn pack_path(&self) -> &Path {
214        &self.pack_path
215    }
216
217    pub fn index_path(&self) -> &Path {
218        &self.index_path
219    }
220
221    pub fn add_object_data(&mut self, object: ObjectData) -> Result<()> {
222        if !is_native_packable_object_type(object.obj_type) {
223            return Err(ProtocolError::InvalidState(format!(
224                "{:?} sidecar records cannot be packed into the content-addressed object pack",
225                object.obj_type
226            )));
227        }
228        let builder = self.builder.as_mut().ok_or_else(|| {
229            ProtocolError::InvalidState("native pack streaming writer is finalized".to_string())
230        })?;
231        let pack_id = to_pack_object_id(&object.id);
232        builder
233            .add_id(pack_id, object.obj_type.pack_object_type()?, object.data)
234            .map_err(ProtocolError::from)
235    }
236
237    pub fn flush_pack(&mut self) -> Result<()> {
238        let builder = self.builder.as_mut().ok_or_else(|| {
239            ProtocolError::InvalidState("native pack streaming writer is finalized".to_string())
240        })?;
241        builder.flush_pack().map_err(ProtocolError::from)
242    }
243
244    pub fn finish(mut self) -> Result<NativePackFileBundle> {
245        let builder = self.builder.take().ok_or_else(|| {
246            ProtocolError::InvalidState("native pack streaming writer is finalized".to_string())
247        })?;
248        let (file, _) = builder.finalize().map_err(ProtocolError::from)?;
249        file.sync_all()?;
250        drop(file);
251        let pack_len = fs::metadata(&self.pack_path)?.len();
252        let index_len = fs::metadata(&self.index_path)?.len();
253        let dir = self.dir.take().ok_or_else(|| {
254            ProtocolError::InvalidState("native pack streaming writer lost spool dir".to_string())
255        })?;
256        Ok(NativePackFileBundle {
257            dir,
258            pack_path: self.pack_path.clone(),
259            index_path: self.index_path.clone(),
260            pack_len,
261            index_len,
262        })
263    }
264}
265
266impl Drop for NativePackStreamingWriter {
267    fn drop(&mut self) {
268        if let Some(dir) = self.dir.take() {
269            let _ = fs::remove_dir_all(dir);
270        }
271    }
272}
273
/// In-memory reassembly buffers for a chunked native-pack transfer.
///
/// The pack body and the index arrive as two independent chunk streams. Each
/// stream tracks its own `(next offset, next chunk index)` progress and its
/// own completion flag; see `receive_pack_chunk`.
#[derive(Debug, Default, Clone)]
pub struct PackChunkState {
    /// Pack body bytes received so far.
    pub pack_data: Vec<u8>,
    /// Pack index bytes received so far.
    pub index_data: Vec<u8>,
    pack_progress: (u64, u32),
    index_progress: (u64, u32),
    pack_complete: bool,
    index_complete: bool,
}

impl PackChunkState {
    /// Returns `true` once both the pack body stream and the index stream
    /// have been marked complete.
    pub fn is_complete(&self) -> bool {
        matches!((self.pack_complete, self.index_complete), (true, true))
    }
}
289
/// Reassembly state for one chunked Git-pack transfer at a time.
///
/// Chunks must arrive in order (matching byte offset and chunk index) and must
/// carry the same transfer id and declared pack size as the first chunk. The
/// declared size is capped at [`MAX_RECEIVED_GIT_PACK_SIZE`]. Bytes are
/// buffered in memory until the final chunk arrives; the buffer is then
/// returned and the state resets to idle.
#[derive(Debug, Default, Clone)]
pub struct GitPackChunkState {
    // Transfer id pinned by the first accepted chunk; `None` while idle.
    transfer_id: Option<String>,
    // Declared total pack size pinned by the first accepted chunk.
    pack_size: Option<u64>,
    // Byte offset the next chunk must start at.
    next_offset: u64,
    // Chunk index the next chunk must carry.
    next_chunk_index: u32,
    // Bytes received so far for the current transfer.
    pack_data: Vec<u8>,
}

impl GitPackChunkState {
    /// Returns `true` when no transfer is in progress, i.e. every field is at
    /// its default value.
    pub fn is_idle(&self) -> bool {
        self.transfer_id.is_none()
            && self.pack_size.is_none()
            && self.next_offset == 0
            && self.next_chunk_index == 0
            && self.pack_data.is_empty()
    }

    /// Errors if a transfer was started but never completed with a final
    /// chunk.
    ///
    /// NOTE(review): `receive_chunk` does not reset the state when it rejects
    /// a chunk after the transfer id has been recorded, so once that happens
    /// this keeps failing until the state is replaced.
    pub fn ensure_idle(&self) -> Result<()> {
        if self.is_idle() {
            Ok(())
        } else {
            Err(ProtocolError::InvalidState(
                "Git pack transfer ended before final chunk".to_string(),
            ))
        }
    }

    /// Accepts one chunk of a Git-pack transfer.
    ///
    /// Returns `Ok(Some(pack))` with the complete pack bytes when
    /// `is_final_chunk` is set and exactly `pack_size` bytes have been
    /// received. Returns `Ok(None)` while more chunks are expected.
    ///
    /// # Errors
    ///
    /// Returns `ProtocolError::InvalidState` when any of these hold:
    /// - the transfer id is empty or differs from the current transfer;
    /// - the declared size exceeds the limit or changes mid-transfer;
    /// - the chunk is empty or out of order;
    /// - the chunk overruns the declared size;
    /// - a final chunk does not end exactly at the declared size;
    /// - the declared size is reached without the final-chunk marker.
    pub fn receive_chunk(
        &mut self,
        transfer_id: &str,
        offset: u64,
        chunk_index: u32,
        is_final_chunk: bool,
        pack_size: u64,
        data: &[u8],
    ) -> Result<Option<Vec<u8>>> {
        if transfer_id.is_empty() {
            return Err(ProtocolError::InvalidState(
                "Git pack transfer_id is required".to_string(),
            ));
        }
        // Reject oversized declarations before any state is touched.
        if pack_size > MAX_RECEIVED_GIT_PACK_SIZE {
            return Err(ProtocolError::InvalidState(format!(
                "Git pack exceeds maximum transfer size of {MAX_RECEIVED_GIT_PACK_SIZE} bytes"
            )));
        }
        if data.is_empty() {
            return Err(ProtocolError::InvalidState(
                "Git pack chunk must not be empty".to_string(),
            ));
        }
        // The first chunk of a transfer pins its id and declared size.
        match self.transfer_id.as_ref() {
            Some(current) if current != transfer_id => {
                return Err(ProtocolError::InvalidState(format!(
                    "Git pack transfer id changed from {current:?} to {transfer_id:?}"
                )));
            }
            Some(_) => {}
            None => {
                self.transfer_id = Some(transfer_id.to_string());
                self.pack_size = Some(pack_size);
            }
        }
        if self.pack_size != Some(pack_size) {
            return Err(ProtocolError::InvalidState(
                "Git pack size changed during transfer".to_string(),
            ));
        }
        // Chunks must be strictly sequential in both byte offset and index.
        if offset != self.next_offset {
            return Err(ProtocolError::InvalidState(format!(
                "Git pack offset mismatch: expected {}, got {}",
                self.next_offset, offset
            )));
        }
        if chunk_index != self.next_chunk_index {
            return Err(ProtocolError::InvalidState(format!(
                "Git pack chunk index mismatch: expected {}, got {}",
                self.next_chunk_index, chunk_index
            )));
        }
        let chunk_len = u64::try_from(data.len()).map_err(|_| {
            ProtocolError::InvalidState("Git pack chunk length exceeds u64".to_string())
        })?;
        let next_offset = self
            .next_offset
            .checked_add(chunk_len)
            .ok_or_else(|| ProtocolError::InvalidState("Git pack offset overflow".to_string()))?;
        // Never buffer past the declared size, which is itself capped above.
        if next_offset > pack_size {
            return Err(ProtocolError::InvalidState(
                "Git pack chunk exceeds declared pack size".to_string(),
            ));
        }
        self.pack_data.extend_from_slice(data);
        self.next_offset = next_offset;
        self.next_chunk_index = self.next_chunk_index.checked_add(1).ok_or_else(|| {
            ProtocolError::InvalidState("Git pack chunk index overflow".to_string())
        })?;
        if is_final_chunk {
            if self.next_offset != pack_size {
                return Err(ProtocolError::InvalidState(format!(
                    "Git pack final size mismatch: declared {}, received {}",
                    pack_size, self.next_offset
                )));
            }
            // Hand the assembled pack back and reset to idle for the next transfer.
            let pack_data = std::mem::take(&mut self.pack_data);
            self.transfer_id = None;
            self.pack_size = None;
            self.next_offset = 0;
            self.next_chunk_index = 0;
            return Ok(Some(pack_data));
        }
        // A non-final chunk that fills the declared size means the final marker is missing.
        if self.next_offset == pack_size {
            return Err(ProtocolError::InvalidState(
                "Git pack reached declared size without final chunk marker".to_string(),
            ));
        }
        Ok(None)
    }
}
410
411#[derive(Debug)]
412pub struct PackChunkSpool {
413    dir: PathBuf,
414    pack: PackStreamSpool,
415    index: PackStreamSpool,
416}
417
418impl PackChunkSpool {
419    pub fn new_in(root: &Path) -> Result<Self> {
420        let base = root.join("transfer-spool");
421        fs::create_dir_all(&base)?;
422        let dir = unique_spool_dir(&base)?;
423        let pack = PackStreamSpool::new(dir.join("pack"))?;
424        let index = PackStreamSpool::new(dir.join("idx"))?;
425        Ok(Self { dir, pack, index })
426    }
427
428    pub fn is_complete(&self) -> bool {
429        self.pack.complete && self.index.complete
430    }
431
432    #[allow(clippy::too_many_arguments)]
433    pub fn receive_chunk(
434        &mut self,
435        is_index: bool,
436        resume_offset: u64,
437        chunk_index: u32,
438        is_complete: bool,
439        data: &[u8],
440        is_final_chunk: bool,
441    ) -> Result<()> {
442        let max_bytes = if is_index {
443            MAX_RECEIVED_PACK_INDEX_SIZE
444        } else {
445            MAX_RECEIVED_PACK_SIZE
446        };
447        let stream = if is_index {
448            &mut self.index
449        } else {
450            &mut self.pack
451        };
452        receive_pack_chunk_to_spool(
453            stream,
454            is_index,
455            resume_offset,
456            chunk_index,
457            is_complete,
458            data,
459            is_final_chunk,
460            max_bytes,
461        )
462    }
463
464    pub fn install_into(&mut self, store: &impl ObjectStore) -> Result<Vec<PackObjectId>> {
465        if !self.is_complete() {
466            return Err(ProtocolError::InvalidState(
467                "native pack spool is incomplete".to_string(),
468            ));
469        }
470        self.pack.close()?;
471        self.index.close()?;
472        store
473            .install_pack_streaming(&self.pack.path, &self.index.path)
474            .map_err(ProtocolError::from)
475    }
476}
477
478impl Drop for PackChunkSpool {
479    fn drop(&mut self) {
480        let _ = fs::remove_dir_all(&self.dir);
481    }
482}
483
484#[derive(Debug)]
485struct PackStreamSpool {
486    path: PathBuf,
487    file: Option<File>,
488    progress: (u64, u32),
489    complete: bool,
490}
491
492impl PackStreamSpool {
493    fn new(path: PathBuf) -> Result<Self> {
494        let file = File::create(&path)?;
495        Ok(Self {
496            path,
497            file: Some(file),
498            progress: (0, 0),
499            complete: false,
500        })
501    }
502
503    fn write_all(&mut self, data: &[u8]) -> Result<()> {
504        let Some(file) = self.file.as_mut() else {
505            return Err(ProtocolError::InvalidState(
506                "native pack spool stream is already closed".to_string(),
507            ));
508        };
509        file.write_all(data)?;
510        Ok(())
511    }
512
513    fn close(&mut self) -> Result<()> {
514        if let Some(mut file) = self.file.take() {
515            file.flush()?;
516            file.sync_all()?;
517        }
518        Ok(())
519    }
520}
521
522pub fn native_pack_excluded_object_types() -> &'static [ObjectType] {
523    &[ObjectType::Redaction, ObjectType::StateVisibility]
524}
525
/// Returns whether objects of `obj_type` may be written into the
/// content-addressed native pack; delegates to `ObjectType::packable`.
///
/// `build_native_pack` skips non-packable types, and
/// `NativePackStreamingWriter::add_object_data` rejects them.
pub fn is_native_packable_object_type(obj_type: ObjectType) -> bool {
    obj_type.packable()
}
529
530pub fn build_native_pack(
531    store: &impl ObjectStore,
532    objects: &[ObjectInfo],
533) -> Result<NativePackBundle> {
534    let mut builder = PackBuilder::new(sync_pack_compression());
535
536    for info in objects {
537        // Sidecar records (redaction + state-visibility) live outside
538        // `.heddle/objects/` so GC cannot touch them, and must not be
539        // folded into the content-addressed pack. They ship via the
540        // per-object transfer path instead; callers split them out before
541        // packing.
542        if !is_native_packable_object_type(info.obj_type) {
543            continue;
544        }
545        let object = load_object_data(store, &info.id, info.obj_type)?;
546        let pack_id = to_pack_object_id(&object.id);
547        builder.add_id(pack_id, object.obj_type.pack_object_type()?, object.data);
548    }
549
550    let (pack_data, index_data, _) = builder.build()?;
551    Ok(NativePackBundle {
552        pack_data,
553        index_data,
554    })
555}
556
557fn sync_pack_compression() -> CompressionConfig {
558    CompressionConfig {
559        level: 1,
560        min_size: 1024,
561        max_delta_size: 0,
562        ..CompressionConfig::default()
563    }
564}
565
566pub fn install_received_pack(
567    store: &impl ObjectStore,
568    pack_data: &[u8],
569    index_data: &[u8],
570) -> Result<Vec<PackObjectId>> {
571    store
572        .install_pack(pack_data, index_data)
573        .map_err(ProtocolError::from)
574}
575
576pub fn next_pack_chunk(
577    data: &[u8],
578    chunk_size: usize,
579    chunk_index: usize,
580) -> Option<(usize, Vec<u8>, bool)> {
581    let (start, len) = crate::chunk_bounds(data.len(), chunk_size.max(1), chunk_index)?;
582    let is_final = start + len == data.len();
583    Some((start, data[start..start + len].to_vec(), is_final))
584}
585
586pub fn receive_pack_chunk(
587    state: &mut PackChunkState,
588    is_index: bool,
589    resume_offset: u64,
590    chunk_index: u32,
591    is_complete: bool,
592    data: &[u8],
593    is_final_chunk: bool,
594) -> Result<()> {
595    let max_bytes = if is_index {
596        MAX_RECEIVED_PACK_INDEX_SIZE
597    } else {
598        MAX_RECEIVED_PACK_SIZE
599    };
600    receive_pack_chunk_with_limit(
601        state,
602        is_index,
603        resume_offset,
604        chunk_index,
605        is_complete,
606        data,
607        is_final_chunk,
608        max_bytes,
609    )
610}
611
612#[allow(clippy::too_many_arguments)]
613fn receive_pack_chunk_with_limit(
614    state: &mut PackChunkState,
615    is_index: bool,
616    resume_offset: u64,
617    chunk_index: u32,
618    is_complete: bool,
619    data: &[u8],
620    is_final_chunk: bool,
621    max_bytes: u64,
622) -> Result<()> {
623    let (buffer, progress, complete) = if is_index {
624        (
625            &mut state.index_data,
626            &mut state.index_progress,
627            &mut state.index_complete,
628        )
629    } else {
630        (
631            &mut state.pack_data,
632            &mut state.pack_progress,
633            &mut state.pack_complete,
634        )
635    };
636
637    let next_progress = validate_pack_chunk(
638        *progress,
639        is_index,
640        resume_offset,
641        chunk_index,
642        data,
643        max_bytes,
644    )?;
645
646    buffer.extend_from_slice(data);
647    *progress = next_progress;
648    if is_final_chunk || is_complete {
649        *complete = true;
650    }
651    Ok(())
652}
653
654#[allow(clippy::too_many_arguments)]
655fn receive_pack_chunk_to_spool(
656    stream: &mut PackStreamSpool,
657    is_index: bool,
658    resume_offset: u64,
659    chunk_index: u32,
660    is_complete: bool,
661    data: &[u8],
662    is_final_chunk: bool,
663    max_bytes: u64,
664) -> Result<()> {
665    let next_progress = validate_pack_chunk(
666        stream.progress,
667        is_index,
668        resume_offset,
669        chunk_index,
670        data,
671        max_bytes,
672    )?;
673    stream.write_all(data)?;
674    stream.progress = next_progress;
675    if is_final_chunk || is_complete {
676        stream.complete = true;
677    }
678    Ok(())
679}
680
681fn validate_pack_chunk(
682    progress: (u64, u32),
683    is_index: bool,
684    resume_offset: u64,
685    chunk_index: u32,
686    data: &[u8],
687    max_bytes: u64,
688) -> Result<(u64, u32)> {
689    if resume_offset != progress.0 {
690        return Err(ProtocolError::InvalidState(format!(
691            "native pack chunk resume offset mismatch: expected {}, got {}",
692            progress.0, resume_offset
693        )));
694    }
695    if chunk_index != progress.1 {
696        return Err(ProtocolError::InvalidState(format!(
697            "native pack chunk index mismatch: expected {}, got {}",
698            progress.1, chunk_index
699        )));
700    }
701
702    let data_len = u64::try_from(data.len()).map_err(|_| {
703        ProtocolError::InvalidState("native pack chunk length does not fit in u64".to_string())
704    })?;
705    let next_offset = progress.0.checked_add(data_len).ok_or_else(|| {
706        ProtocolError::InvalidState("native pack chunk offset overflow".to_string())
707    })?;
708    if next_offset > max_bytes {
709        let stream_name = if is_index { "index" } else { "body" };
710        return Err(ProtocolError::InvalidState(format!(
711            "native pack {stream_name} exceeds receive size limit: {next_offset} bytes (max {max_bytes})"
712        )));
713    }
714    let next_chunk = progress.1.checked_add(1).ok_or_else(|| {
715        ProtocolError::InvalidState("native pack chunk index overflow".to_string())
716    })?;
717
718    Ok((next_offset, next_chunk))
719}
720
721fn unique_spool_dir(base: &Path) -> Result<PathBuf> {
722    let stamp = SystemTime::now()
723        .duration_since(UNIX_EPOCH)
724        .map_err(|err| {
725            ProtocolError::InvalidState(format!("system clock before UNIX epoch: {err}"))
726        })?
727        .as_nanos();
728    for attempt in 0..100u32 {
729        let dir = base.join(format!("pack-{}-{stamp}-{attempt}", std::process::id()));
730        match fs::create_dir(&dir) {
731            Ok(()) => return Ok(dir),
732            Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => continue,
733            Err(err) => return Err(ProtocolError::Io(err)),
734        }
735    }
736    Err(ProtocolError::InvalidState(
737        "failed to allocate native pack spool directory".to_string(),
738    ))
739}
740
741fn to_pack_object_id(id: &ObjectId) -> PackObjectId {
742    match id {
743        ObjectId::Hash(hash) => PackObjectId::Hash(*hash),
744        ObjectId::ChangeId(change_id) => PackObjectId::ChangeId(*change_id),
745    }
746}
747
748#[cfg(test)]
749mod tests {
750    use objects::{
751        object::Blob,
752        store::{FsStore, ObjectStore, pack::PackObjectId},
753    };
754    use tempfile::TempDir;
755
756    use super::{
757        GitPackChunkState, GrowingPackChunkReader, MAX_RECEIVED_PACK_SIZE,
758        NativePackStreamingWriter, ObjectData, ObjectId, ObjectInfo, ObjectType, PackChunkSpool,
759        PackChunkState, PackFileChunkReader, build_native_pack, install_received_pack,
760        next_pack_chunk, receive_pack_chunk, receive_pack_chunk_with_limit,
761    };
762
763    fn create_test_store() -> (TempDir, FsStore) {
764        let temp = TempDir::new().unwrap();
765        let store = FsStore::new(temp.path().join(".heddle"));
766        store.init().unwrap();
767        (temp, store)
768    }
769
770    #[test]
771    fn receive_pack_chunk_rejects_cumulative_size_over_limit_before_buffering() {
772        let mut state = PackChunkState::default();
773
774        receive_pack_chunk_with_limit(&mut state, false, 0, 0, false, b"abcd", false, 8).unwrap();
775        receive_pack_chunk_with_limit(&mut state, false, 4, 1, false, b"efgh", false, 8).unwrap();
776
777        let error = receive_pack_chunk_with_limit(&mut state, false, 8, 2, false, b"i", false, 8)
778            .unwrap_err();
779
780        assert_eq!(state.pack_data, b"abcdefgh");
781        assert!(
782            error
783                .to_string()
784                .contains("native pack body exceeds receive size limit")
785        );
786        assert!(error.to_string().contains("9 bytes (max 8)"));
787    }
788
789    #[test]
790    fn receive_pack_chunk_checks_production_limit_before_extending_buffer() {
791        let mut state = PackChunkState {
792            pack_progress: (MAX_RECEIVED_PACK_SIZE - 1, 0),
793            ..PackChunkState::default()
794        };
795
796        let error = receive_pack_chunk(
797            &mut state,
798            false,
799            MAX_RECEIVED_PACK_SIZE - 1,
800            0,
801            false,
802            b"xx",
803            false,
804        )
805        .unwrap_err();
806
807        assert!(state.pack_data.is_empty());
808        assert!(
809            error
810                .to_string()
811                .contains("native pack body exceeds receive size limit")
812        );
813    }
814
815    #[test]
816    fn receive_pack_chunk_rejects_resume_offset_mismatch_before_buffering() {
817        let mut state = PackChunkState::default();
818
819        let error =
820            receive_pack_chunk(&mut state, false, 1, 0, false, b"late chunk", false).unwrap_err();
821
822        assert!(state.pack_data.is_empty());
823        assert!(
824            error
825                .to_string()
826                .contains("native pack chunk resume offset mismatch: expected 0, got 1")
827        );
828    }
829
830    #[test]
831    fn receive_pack_chunk_rejects_chunk_index_mismatch_before_buffering() {
832        let mut state = PackChunkState::default();
833
834        receive_pack_chunk(&mut state, false, 0, 0, false, b"abc", false).unwrap();
835        let error = receive_pack_chunk(&mut state, false, 3, 2, false, b"def", false).unwrap_err();
836
837        assert_eq!(state.pack_data, b"abc");
838        assert!(
839            error
840                .to_string()
841                .contains("native pack chunk index mismatch: expected 1, got 2")
842        );
843    }
844
845    #[test]
846    fn git_pack_chunk_state_requires_ordered_chunks_and_final_size() {
847        let mut state = GitPackChunkState::default();
848
849        assert!(
850            state
851                .receive_chunk("git-pack:test", 0, 0, false, 8, b"abcd")
852                .unwrap()
853                .is_none()
854        );
855        let error = state
856            .receive_chunk("git-pack:test", 4, 2, true, 8, b"efgh")
857            .unwrap_err();
858
859        assert!(
860            error
861                .to_string()
862                .contains("Git pack chunk index mismatch: expected 1, got 2")
863        );
864        assert!(state.ensure_idle().is_err());
865
866        let mut state = GitPackChunkState::default();
867        state
868            .receive_chunk("git-pack:test", 0, 0, false, 8, b"abcd")
869            .unwrap();
870        let complete = state
871            .receive_chunk("git-pack:test", 4, 1, true, 8, b"efgh")
872            .unwrap()
873            .unwrap();
874
875        assert_eq!(complete, b"abcdefgh");
876        assert!(state.ensure_idle().is_ok());
877    }
878
879    #[test]
880    fn receive_pack_chunk_accepts_completion_flags_for_pack_and_index() {
881        let mut state = PackChunkState::default();
882
883        receive_pack_chunk(&mut state, false, 0, 0, true, b"pack-body", false).unwrap();
884        assert!(!state.is_complete());
885        receive_pack_chunk(&mut state, true, 0, 0, false, b"pack-index", true).unwrap();
886
887        assert!(state.is_complete());
888        assert_eq!(state.pack_data, b"pack-body");
889        assert_eq!(state.index_data, b"pack-index");
890    }
891
892    #[test]
893    fn normal_size_native_pack_receives_and_installs() {
894        let (_source_temp, source_store) = create_test_store();
895        let (_dest_temp, dest_store) = create_test_store();
896        let blob = Blob::from("native pack receive regression");
897        let hash = source_store.put_blob(&blob).unwrap();
898        let bundle = build_native_pack(
899            &source_store,
900            &[ObjectInfo {
901                id: ObjectId::Hash(hash),
902                obj_type: ObjectType::Blob,
903                size: blob.size() as u64,
904                delta_base: None,
905            }],
906        )
907        .unwrap();
908
909        let mut state = PackChunkState::default();
910        let mut chunk_index = 0usize;
911        while let Some((start, data, is_final)) = next_pack_chunk(&bundle.pack_data, 7, chunk_index)
912        {
913            receive_pack_chunk(
914                &mut state,
915                false,
916                start as u64,
917                chunk_index as u32,
918                is_final,
919                &data,
920                is_final,
921            )
922            .unwrap();
923            chunk_index += 1;
924        }
925
926        let mut index_chunk = 0usize;
927        while let Some((start, data, is_final)) =
928            next_pack_chunk(&bundle.index_data, 5, index_chunk)
929        {
930            receive_pack_chunk(
931                &mut state,
932                true,
933                start as u64,
934                index_chunk as u32,
935                is_final,
936                &data,
937                is_final,
938            )
939            .unwrap();
940            index_chunk += 1;
941        }
942
943        assert!(state.is_complete());
944        assert_eq!(state.pack_data, bundle.pack_data);
945        assert_eq!(state.index_data, bundle.index_data);
946
947        let installed_ids =
948            install_received_pack(&dest_store, &state.pack_data, &state.index_data).unwrap();
949
950        assert_eq!(installed_ids, vec![PackObjectId::Hash(hash)]);
951        let installed_blob = dest_store.get_blob(&hash).unwrap().unwrap();
952        assert_eq!(installed_blob.content(), blob.content());
953    }
954
955    #[test]
956    fn normal_size_native_pack_spools_and_installs() {
957        let (_source_temp, source_store) = create_test_store();
958        let (dest_temp, dest_store) = create_test_store();
959        let blob = Blob::from("native pack spooled receive regression");
960        let hash = source_store.put_blob(&blob).unwrap();
961        let bundle = build_native_pack(
962            &source_store,
963            &[ObjectInfo {
964                id: ObjectId::Hash(hash),
965                obj_type: ObjectType::Blob,
966                size: blob.size() as u64,
967                delta_base: None,
968            }],
969        )
970        .unwrap();
971
972        let mut spool = PackChunkSpool::new_in(dest_temp.path()).unwrap();
973        let mut chunk_index = 0usize;
974        while let Some((start, data, is_final)) = next_pack_chunk(&bundle.pack_data, 7, chunk_index)
975        {
976            spool
977                .receive_chunk(
978                    false,
979                    start as u64,
980                    chunk_index as u32,
981                    is_final,
982                    &data,
983                    is_final,
984                )
985                .unwrap();
986            chunk_index += 1;
987        }
988
989        let mut index_chunk = 0usize;
990        while let Some((start, data, is_final)) =
991            next_pack_chunk(&bundle.index_data, 5, index_chunk)
992        {
993            spool
994                .receive_chunk(
995                    true,
996                    start as u64,
997                    index_chunk as u32,
998                    is_final,
999                    &data,
1000                    is_final,
1001                )
1002                .unwrap();
1003            index_chunk += 1;
1004        }
1005
1006        assert!(spool.is_complete());
1007        let installed_ids = spool.install_into(&dest_store).unwrap();
1008
1009        assert_eq!(installed_ids, vec![PackObjectId::Hash(hash)]);
1010        let installed_blob = dest_store.get_blob(&hash).unwrap().unwrap();
1011        assert_eq!(installed_blob.content(), blob.content());
1012    }
1013
    /// End-to-end streaming transfer test.
    ///
    /// A `NativePackStreamingWriter` appends objects to a pack file while a
    /// `GrowingPackChunkReader` tails that same file and forwards chunks into a
    /// `PackChunkSpool`, before the writer is finished. Asserts that:
    /// 1. at least one pack chunk is drained before `finish()`, and none of
    ///    those pre-finish chunks is marked final;
    /// 2. a final pack chunk is observed after `finish()`;
    /// 3. the spooled pack plus index install both blobs into the destination.
    #[test]
    fn native_pack_streaming_writer_drains_growing_pack_and_installs() {
        let (source_temp, source_store) = create_test_store();
        let (dest_temp, dest_store) = create_test_store();
        let blob = Blob::from("native pack growing stream regression");
        let hash = source_store.put_blob(&blob).unwrap();
        // Second object: 4096 copies of the byte `z`.
        // NOTE(review): a run of identical bytes compresses very well, so this may
        // add little to the on-disk pack size — confirm it still forces growth.
        let large_blob = Blob::from_slice(&vec![b'z'; 4096]);
        let large_hash = source_store.put_blob(&large_blob).unwrap();

        // The writer's pack file lives under the source temp dir, and the reader
        // opens that same path via `writer.pack_path()`.
        // NOTE(review): `2` (writer) and `31` (reader) are tuning arguments whose
        // meaning is defined by the constructors — presumably `31` is the reader's
        // chunk size; confirm.
        let mut writer = NativePackStreamingWriter::new_in(source_temp.path(), 2).unwrap();
        let mut pack_reader = GrowingPackChunkReader::open(writer.pack_path(), 31).unwrap();
        let mut spool = PackChunkSpool::new_in(dest_temp.path()).unwrap();
        let mut saw_interleaved_pack_chunk = false;

        // For each object: append it, flush the pack file, then drain whatever
        // chunks the reader can currently see with `next_available_chunk(false)`.
        // The drain happens between writes on purpose: that is what exercises
        // reading a pack file that is still growing.
        for (id, obj_type, data) in [
            (
                ObjectId::Hash(hash),
                ObjectType::Blob,
                blob.content().to_vec(),
            ),
            (
                ObjectId::Hash(large_hash),
                ObjectType::Blob,
                large_blob.content().to_vec(),
            ),
        ] {
            writer
                .add_object_data(ObjectData {
                    id,
                    obj_type,
                    data,
                    is_delta: false,
                })
                .unwrap();
            writer.flush_pack().unwrap();
            while let Some((offset, chunk_index, data, is_final)) =
                pack_reader.next_available_chunk(false).unwrap()
            {
                assert!(
                    !is_final,
                    "pre-final growing pack drain must not mark chunks final"
                );
                saw_interleaved_pack_chunk = true;
                // Pre-finish chunks are forwarded as non-final pack (not index) data.
                spool
                    .receive_chunk(false, offset, chunk_index, false, &data, false)
                    .unwrap();
            }
        }

        // Finish the pack, then drain the remainder with `next_available_chunk(true)`.
        // The test requires that one of these post-finish chunks is flagged final.
        let bundle = writer.finish().unwrap();
        let mut saw_final_pack_chunk = false;
        while let Some((offset, chunk_index, data, is_final)) =
            pack_reader.next_available_chunk(true).unwrap()
        {
            saw_final_pack_chunk |= is_final;
            spool
                .receive_chunk(false, offset, chunk_index, is_final, &data, is_final)
                .unwrap();
        }

        // The index path comes from the finished bundle. It is sent as index data
        // (is_index = true) via a plain file chunk reader opened with argument `17`
        // (presumably the chunk size — confirm).
        let mut index_reader = PackFileChunkReader::open(&bundle.index_path, 17).unwrap();
        while let Some((offset, chunk_index, data, is_final)) = index_reader.next_chunk().unwrap() {
            spool
                .receive_chunk(true, offset, chunk_index, is_final, &data, is_final)
                .unwrap();
        }

        assert!(
            saw_interleaved_pack_chunk,
            "expected at least one pack chunk before finalize"
        );
        assert!(
            saw_final_pack_chunk,
            "expected final pack chunk after finish"
        );
        assert!(spool.is_complete());
        let mut installed_ids = spool.install_into(&dest_store).unwrap();
        let mut expected_ids = vec![PackObjectId::Hash(hash), PackObjectId::Hash(large_hash)];
        // Sort both sides so the comparison does not depend on the order in which
        // `install_into` reports ids.
        installed_ids.sort();
        expected_ids.sort();

        assert_eq!(installed_ids, expected_ids);
        let installed_blob = dest_store.get_blob(&hash).unwrap().unwrap();
        assert_eq!(installed_blob.content(), blob.content());
        let installed_large_blob = dest_store.get_blob(&large_hash).unwrap().unwrap();
        assert_eq!(installed_large_blob.content(), large_blob.content());
    }
1101}