Skip to main content

wire/
native_pack.rs

1// SPDX-License-Identifier: Apache-2.0
2use objects::store::{
3    CompressionConfig, ObjectStore,
4    pack::{ObjectType as PackObjectType, PackBuilder, PackObjectId},
5};
6
7use crate::{ObjectId, ObjectInfo, ObjectType, ProtocolError, Result, load_object_data};
8
/// Maximum size, in bytes, of a hosted native-pack body accepted by the
/// receive primitive (2 GiB).
///
/// Native sync packs are produced from bounded state-closure wants, and the
/// pack reader separately caps each decoded pack object at 1 GiB. A 2 GiB
/// compressed pack is well above normal hosted sync use, yet the cap still
/// stops an untrusted server from growing the in-memory receive buffer
/// without limit.
///
/// Follow-up: the receive path could spool chunks to a temporary file and
/// install them with `install_pack_streaming`. That install API already
/// reports the installed ids the receiver needs, so only spooling the receive
/// buffer itself remains to be implemented.
pub const MAX_RECEIVED_PACK_SIZE: u64 = 2 << 30;

/// Maximum size, in bytes, of a hosted native-pack index accepted by the
/// receive primitive (256 MiB).
///
/// Pack indexes grow with object count rather than object payload size.
/// 256 MiB leaves room for millions of entries while still bounding the
/// second in-memory buffer that the remote sender controls.
pub const MAX_RECEIVED_PACK_INDEX_SIZE: u64 = 256 << 20;
27
/// A fully built native pack: the pack body together with its index.
#[derive(Debug, Clone)]
pub struct NativePackBundle {
    /// Pack body bytes.
    pub pack_data: Vec<u8>,
    /// Index bytes produced alongside `pack_data`.
    pub index_data: Vec<u8>,
}

/// Receive-side reassembly state for a native pack that arrives as two
/// independently chunked streams: the pack body and the pack index.
#[derive(Debug, Default, Clone)]
pub struct PackChunkState {
    /// Pack body bytes accepted so far, in arrival order.
    pub pack_data: Vec<u8>,
    /// Pack index bytes accepted so far, in arrival order.
    pub index_data: Vec<u8>,
    /// Body stream progress as `(bytes accepted, chunks accepted)`.
    pack_progress: (u64, u32),
    /// Index stream progress as `(bytes accepted, chunks accepted)`.
    index_progress: (u64, u32),
    /// Set once a body chunk arrives flagged as final or complete.
    pack_complete: bool,
    /// Set once an index chunk arrives flagged as final or complete.
    index_complete: bool,
}

impl PackChunkState {
    /// Returns `true` once both the pack body stream and the pack index
    /// stream have been marked complete.
    #[must_use]
    pub fn is_complete(&self) -> bool {
        self.pack_complete && self.index_complete
    }
}
49
50pub fn native_pack_excluded_object_types() -> &'static [ObjectType] {
51    &[ObjectType::Redaction, ObjectType::StateVisibility]
52}
53
54pub fn is_native_packable_object_type(obj_type: ObjectType) -> bool {
55    !native_pack_excluded_object_types().contains(&obj_type)
56}
57
58pub fn build_native_pack(
59    store: &impl ObjectStore,
60    objects: &[ObjectInfo],
61) -> Result<NativePackBundle> {
62    let mut builder = PackBuilder::new(sync_pack_compression());
63
64    for info in objects {
65        // Sidecar records (redaction + state-visibility) live outside
66        // `.heddle/objects/` so GC cannot touch them, and must not be
67        // folded into the content-addressed pack. They ship via the
68        // per-object transfer path instead; callers split them out before
69        // packing.
70        if !is_native_packable_object_type(info.obj_type) {
71            continue;
72        }
73        let object = load_object_data(store, &info.id, info.obj_type)?;
74        let pack_id = to_pack_object_id(&object.id);
75        builder.add_id(pack_id, to_pack_object_type(object.obj_type)?, object.data);
76    }
77
78    let (pack_data, index_data, _) = builder.build()?;
79    Ok(NativePackBundle {
80        pack_data,
81        index_data,
82    })
83}
84
85fn sync_pack_compression() -> CompressionConfig {
86    CompressionConfig {
87        level: 1,
88        min_size: 1024,
89        max_delta_size: 0,
90        ..CompressionConfig::default()
91    }
92}
93
94pub fn install_received_pack(
95    store: &impl ObjectStore,
96    pack_data: &[u8],
97    index_data: &[u8],
98) -> Result<Vec<PackObjectId>> {
99    store
100        .install_pack(pack_data, index_data)
101        .map_err(ProtocolError::from)
102}
103
104pub fn next_pack_chunk(
105    data: &[u8],
106    chunk_size: usize,
107    chunk_index: usize,
108) -> Option<(usize, Vec<u8>, bool)> {
109    let (start, len) = crate::chunk_bounds(data.len(), chunk_size.max(1), chunk_index)?;
110    let is_final = start + len == data.len();
111    Some((start, data[start..start + len].to_vec(), is_final))
112}
113
114pub fn receive_pack_chunk(
115    state: &mut PackChunkState,
116    is_index: bool,
117    resume_offset: u64,
118    chunk_index: u32,
119    is_complete: bool,
120    data: &[u8],
121    is_final_chunk: bool,
122) -> Result<()> {
123    let max_bytes = if is_index {
124        MAX_RECEIVED_PACK_INDEX_SIZE
125    } else {
126        MAX_RECEIVED_PACK_SIZE
127    };
128    receive_pack_chunk_with_limit(
129        state,
130        is_index,
131        resume_offset,
132        chunk_index,
133        is_complete,
134        data,
135        is_final_chunk,
136        max_bytes,
137    )
138}
139
140#[allow(clippy::too_many_arguments)]
141fn receive_pack_chunk_with_limit(
142    state: &mut PackChunkState,
143    is_index: bool,
144    resume_offset: u64,
145    chunk_index: u32,
146    is_complete: bool,
147    data: &[u8],
148    is_final_chunk: bool,
149    max_bytes: u64,
150) -> Result<()> {
151    let (buffer, progress, complete) = if is_index {
152        (
153            &mut state.index_data,
154            &mut state.index_progress,
155            &mut state.index_complete,
156        )
157    } else {
158        (
159            &mut state.pack_data,
160            &mut state.pack_progress,
161            &mut state.pack_complete,
162        )
163    };
164
165    if resume_offset != progress.0 {
166        return Err(ProtocolError::InvalidState(format!(
167            "native pack chunk resume offset mismatch: expected {}, got {}",
168            progress.0, resume_offset
169        )));
170    }
171    if chunk_index != progress.1 {
172        return Err(ProtocolError::InvalidState(format!(
173            "native pack chunk index mismatch: expected {}, got {}",
174            progress.1, chunk_index
175        )));
176    }
177
178    let data_len = u64::try_from(data.len()).map_err(|_| {
179        ProtocolError::InvalidState("native pack chunk length does not fit in u64".to_string())
180    })?;
181    let next_offset = progress.0.checked_add(data_len).ok_or_else(|| {
182        ProtocolError::InvalidState("native pack chunk offset overflow".to_string())
183    })?;
184    if next_offset > max_bytes {
185        let stream_name = if is_index { "index" } else { "body" };
186        return Err(ProtocolError::InvalidState(format!(
187            "native pack {stream_name} exceeds receive size limit: {next_offset} bytes (max {max_bytes})"
188        )));
189    }
190
191    buffer.extend_from_slice(data);
192    *progress = (next_offset, progress.1 + 1);
193    if is_final_chunk || is_complete {
194        *complete = true;
195    }
196    Ok(())
197}
198
199fn to_pack_object_id(id: &ObjectId) -> PackObjectId {
200    match id {
201        ObjectId::Hash(hash) => PackObjectId::Hash(*hash),
202        ObjectId::ChangeId(change_id) => PackObjectId::ChangeId(*change_id),
203    }
204}
205
206fn to_pack_object_type(obj_type: ObjectType) -> Result<PackObjectType> {
207    match obj_type {
208        ObjectType::Blob => Ok(PackObjectType::Blob),
209        ObjectType::Tree => Ok(PackObjectType::Tree),
210        ObjectType::State => Ok(PackObjectType::State),
211        ObjectType::Action => Ok(PackObjectType::Action),
212        ObjectType::Redaction => Err(ProtocolError::InvalidState(
213            "Redaction sidecar records cannot be packed into the content-addressed object pack"
214                .to_string(),
215        )),
216        ObjectType::StateVisibility => Err(ProtocolError::InvalidState(
217            "StateVisibility sidecar records cannot be packed into the content-addressed object pack"
218                .to_string(),
219        )),
220    }
221}
222
#[cfg(test)]
mod tests {
    use objects::{
        object::Blob,
        store::{FsStore, ObjectStore, pack::PackObjectId},
    };
    use tempfile::TempDir;

    use super::{
        MAX_RECEIVED_PACK_INDEX_SIZE, MAX_RECEIVED_PACK_SIZE, ObjectId, ObjectInfo, ObjectType,
        PackChunkState, build_native_pack, install_received_pack, next_pack_chunk,
        receive_pack_chunk, receive_pack_chunk_with_limit,
    };

    /// Creates an initialised `FsStore` in a fresh temporary directory. The
    /// `TempDir` is returned so the caller keeps the directory alive.
    fn create_test_store() -> (TempDir, FsStore) {
        let temp = TempDir::new().unwrap();
        let store = FsStore::new(temp.path().join(".heddle"));
        store.init().unwrap();
        (temp, store)
    }

    /// Streams `data` into `state` in `chunk_size` slices, in order, exactly
    /// as a sender driving `next_pack_chunk` would; panics on any rejection.
    fn receive_in_chunks(
        state: &mut PackChunkState,
        is_index: bool,
        data: &[u8],
        chunk_size: usize,
    ) {
        let mut chunk_index = 0usize;
        while let Some((start, chunk, is_final)) = next_pack_chunk(data, chunk_size, chunk_index) {
            receive_pack_chunk(
                state,
                is_index,
                u64::try_from(start).unwrap(),
                u32::try_from(chunk_index).unwrap(),
                is_final,
                &chunk,
                is_final,
            )
            .unwrap();
            chunk_index += 1;
        }
    }

    #[test]
    fn receive_pack_chunk_rejects_cumulative_size_over_limit_before_buffering() {
        let mut state = PackChunkState::default();

        receive_pack_chunk_with_limit(&mut state, false, 0, 0, false, b"abcd", false, 8).unwrap();
        receive_pack_chunk_with_limit(&mut state, false, 4, 1, false, b"efgh", false, 8).unwrap();

        let error = receive_pack_chunk_with_limit(&mut state, false, 8, 2, false, b"i", false, 8)
            .unwrap_err();

        assert_eq!(state.pack_data, b"abcdefgh");
        assert!(
            error
                .to_string()
                .contains("native pack body exceeds receive size limit")
        );
        assert!(error.to_string().contains("9 bytes (max 8)"));
    }

    #[test]
    fn receive_pack_chunk_checks_production_limit_before_extending_buffer() {
        let mut state = PackChunkState {
            pack_progress: (MAX_RECEIVED_PACK_SIZE - 1, 0),
            ..PackChunkState::default()
        };

        let error = receive_pack_chunk(
            &mut state,
            false,
            MAX_RECEIVED_PACK_SIZE - 1,
            0,
            false,
            b"xx",
            false,
        )
        .unwrap_err();

        assert!(state.pack_data.is_empty());
        assert!(
            error
                .to_string()
                .contains("native pack body exceeds receive size limit")
        );
    }

    #[test]
    fn receive_pack_chunk_applies_index_limit_to_index_stream() {
        let mut state = PackChunkState {
            index_progress: (MAX_RECEIVED_PACK_INDEX_SIZE - 1, 0),
            ..PackChunkState::default()
        };

        let error = receive_pack_chunk(
            &mut state,
            true,
            MAX_RECEIVED_PACK_INDEX_SIZE - 1,
            0,
            false,
            b"xx",
            false,
        )
        .unwrap_err();

        assert!(state.index_data.is_empty());
        assert!(state.pack_data.is_empty());
        assert!(
            error
                .to_string()
                .contains("native pack index exceeds receive size limit")
        );
    }

    #[test]
    fn receive_pack_chunk_rejects_out_of_order_chunks_without_buffering() {
        let mut state = PackChunkState::default();
        receive_pack_chunk(&mut state, false, 0, 0, false, b"abcd", false).unwrap();

        let offset_error =
            receive_pack_chunk(&mut state, false, 3, 1, false, b"efgh", false).unwrap_err();
        assert!(
            offset_error
                .to_string()
                .contains("native pack chunk resume offset mismatch: expected 4, got 3")
        );

        let index_error =
            receive_pack_chunk(&mut state, false, 4, 2, false, b"efgh", false).unwrap_err();
        assert!(
            index_error
                .to_string()
                .contains("native pack chunk index mismatch: expected 1, got 2")
        );

        assert_eq!(state.pack_data, b"abcd");
        assert!(!state.is_complete());
    }

    #[test]
    fn normal_size_native_pack_receives_and_installs() {
        let (_source_temp, source_store) = create_test_store();
        let (_dest_temp, dest_store) = create_test_store();
        let blob = Blob::from("native pack receive regression");
        let hash = source_store.put_blob(&blob).unwrap();
        let bundle = build_native_pack(
            &source_store,
            &[ObjectInfo {
                id: ObjectId::Hash(hash),
                obj_type: ObjectType::Blob,
                size: blob.size() as u64,
                delta_base: None,
            }],
        )
        .unwrap();

        let mut state = PackChunkState::default();
        receive_in_chunks(&mut state, false, &bundle.pack_data, 7);
        receive_in_chunks(&mut state, true, &bundle.index_data, 5);

        assert!(state.is_complete());
        assert_eq!(state.pack_data, bundle.pack_data);
        assert_eq!(state.index_data, bundle.index_data);

        let installed_ids =
            install_received_pack(&dest_store, &state.pack_data, &state.index_data).unwrap();

        assert_eq!(installed_ids, vec![PackObjectId::Hash(hash)]);
        let installed_blob = dest_store.get_blob(&hash).unwrap().unwrap();
        assert_eq!(installed_blob.content(), blob.content());
    }
}