1use objects::store::{
3 CompressionConfig, ObjectStore,
4 pack::{ObjectType as PackObjectType, PackBuilder, PackObjectId},
5};
6
7use crate::{ObjectId, ObjectInfo, ObjectType, ProtocolError, Result, load_object_data};
8
/// Upper bound, in bytes, on a native pack body accepted from a peer (2 GiB).
///
/// `receive_pack_chunk` enforces this cumulatively across body chunks and
/// checks it before any bytes of the offending chunk are buffered.
pub const MAX_RECEIVED_PACK_SIZE: u64 = 2 << 30;

/// Upper bound, in bytes, on a native pack index accepted from a peer (256 MiB).
///
/// Applied by `receive_pack_chunk` to the index stream.
pub const MAX_RECEIVED_PACK_INDEX_SIZE: u64 = 256 << 20;
27
/// Serialized output of [`build_native_pack`]: a pack body plus its index.
#[derive(Debug, Clone)]
pub struct NativePackBundle {
    /// Pack body bytes, as returned by `PackBuilder::build`.
    pub pack_data: Vec<u8>,
    /// Pack index bytes, as returned by `PackBuilder::build`.
    pub index_data: Vec<u8>,
}
33
/// Reassembly state for a native pack received as two independent chunked
/// streams: the pack body and its index.
///
/// For each stream it tracks the bytes buffered so far, the next expected
/// `(byte offset, chunk index)` pair, and whether the final chunk has arrived.
#[derive(Debug, Default, Clone)]
pub struct PackChunkState {
    /// Pack body bytes received so far.
    pub pack_data: Vec<u8>,
    /// Pack index bytes received so far.
    pub index_data: Vec<u8>,
    /// Next expected `(resume offset, chunk index)` for the body stream.
    pack_progress: (u64, u32),
    /// Next expected `(resume offset, chunk index)` for the index stream.
    index_progress: (u64, u32),
    /// Set once the body stream has been marked complete.
    pack_complete: bool,
    /// Set once the index stream has been marked complete.
    index_complete: bool,
}

impl PackChunkState {
    /// Returns `true` only when both the body and the index streams are complete.
    pub fn is_complete(&self) -> bool {
        let Self {
            pack_complete,
            index_complete,
            ..
        } = self;
        *pack_complete && *index_complete
    }
}
49
50pub fn native_pack_excluded_object_types() -> &'static [ObjectType] {
51 &[ObjectType::Redaction, ObjectType::StateVisibility]
52}
53
54pub fn is_native_packable_object_type(obj_type: ObjectType) -> bool {
55 !native_pack_excluded_object_types().contains(&obj_type)
56}
57
58pub fn build_native_pack(
59 store: &impl ObjectStore,
60 objects: &[ObjectInfo],
61) -> Result<NativePackBundle> {
62 let mut builder = PackBuilder::new(sync_pack_compression());
63
64 for info in objects {
65 if !is_native_packable_object_type(info.obj_type) {
71 continue;
72 }
73 let object = load_object_data(store, &info.id, info.obj_type)?;
74 let pack_id = to_pack_object_id(&object.id);
75 builder.add_id(pack_id, to_pack_object_type(object.obj_type)?, object.data);
76 }
77
78 let (pack_data, index_data, _) = builder.build()?;
79 Ok(NativePackBundle {
80 pack_data,
81 index_data,
82 })
83}
84
85fn sync_pack_compression() -> CompressionConfig {
86 CompressionConfig {
87 level: 1,
88 min_size: 1024,
89 max_delta_size: 0,
90 ..CompressionConfig::default()
91 }
92}
93
94pub fn install_received_pack(
95 store: &impl ObjectStore,
96 pack_data: &[u8],
97 index_data: &[u8],
98) -> Result<Vec<PackObjectId>> {
99 store
100 .install_pack(pack_data, index_data)
101 .map_err(ProtocolError::from)
102}
103
104pub fn next_pack_chunk(
105 data: &[u8],
106 chunk_size: usize,
107 chunk_index: usize,
108) -> Option<(usize, Vec<u8>, bool)> {
109 let (start, len) = crate::chunk_bounds(data.len(), chunk_size.max(1), chunk_index)?;
110 let is_final = start + len == data.len();
111 Some((start, data[start..start + len].to_vec(), is_final))
112}
113
114pub fn receive_pack_chunk(
115 state: &mut PackChunkState,
116 is_index: bool,
117 resume_offset: u64,
118 chunk_index: u32,
119 is_complete: bool,
120 data: &[u8],
121 is_final_chunk: bool,
122) -> Result<()> {
123 let max_bytes = if is_index {
124 MAX_RECEIVED_PACK_INDEX_SIZE
125 } else {
126 MAX_RECEIVED_PACK_SIZE
127 };
128 receive_pack_chunk_with_limit(
129 state,
130 is_index,
131 resume_offset,
132 chunk_index,
133 is_complete,
134 data,
135 is_final_chunk,
136 max_bytes,
137 )
138}
139
140#[allow(clippy::too_many_arguments)]
141fn receive_pack_chunk_with_limit(
142 state: &mut PackChunkState,
143 is_index: bool,
144 resume_offset: u64,
145 chunk_index: u32,
146 is_complete: bool,
147 data: &[u8],
148 is_final_chunk: bool,
149 max_bytes: u64,
150) -> Result<()> {
151 let (buffer, progress, complete) = if is_index {
152 (
153 &mut state.index_data,
154 &mut state.index_progress,
155 &mut state.index_complete,
156 )
157 } else {
158 (
159 &mut state.pack_data,
160 &mut state.pack_progress,
161 &mut state.pack_complete,
162 )
163 };
164
165 if resume_offset != progress.0 {
166 return Err(ProtocolError::InvalidState(format!(
167 "native pack chunk resume offset mismatch: expected {}, got {}",
168 progress.0, resume_offset
169 )));
170 }
171 if chunk_index != progress.1 {
172 return Err(ProtocolError::InvalidState(format!(
173 "native pack chunk index mismatch: expected {}, got {}",
174 progress.1, chunk_index
175 )));
176 }
177
178 let data_len = u64::try_from(data.len()).map_err(|_| {
179 ProtocolError::InvalidState("native pack chunk length does not fit in u64".to_string())
180 })?;
181 let next_offset = progress.0.checked_add(data_len).ok_or_else(|| {
182 ProtocolError::InvalidState("native pack chunk offset overflow".to_string())
183 })?;
184 if next_offset > max_bytes {
185 let stream_name = if is_index { "index" } else { "body" };
186 return Err(ProtocolError::InvalidState(format!(
187 "native pack {stream_name} exceeds receive size limit: {next_offset} bytes (max {max_bytes})"
188 )));
189 }
190
191 buffer.extend_from_slice(data);
192 *progress = (next_offset, progress.1 + 1);
193 if is_final_chunk || is_complete {
194 *complete = true;
195 }
196 Ok(())
197}
198
199fn to_pack_object_id(id: &ObjectId) -> PackObjectId {
200 match id {
201 ObjectId::Hash(hash) => PackObjectId::Hash(*hash),
202 ObjectId::ChangeId(change_id) => PackObjectId::ChangeId(*change_id),
203 }
204}
205
206fn to_pack_object_type(obj_type: ObjectType) -> Result<PackObjectType> {
207 match obj_type {
208 ObjectType::Blob => Ok(PackObjectType::Blob),
209 ObjectType::Tree => Ok(PackObjectType::Tree),
210 ObjectType::State => Ok(PackObjectType::State),
211 ObjectType::Action => Ok(PackObjectType::Action),
212 ObjectType::Redaction => Err(ProtocolError::InvalidState(
213 "Redaction sidecar records cannot be packed into the content-addressed object pack"
214 .to_string(),
215 )),
216 ObjectType::StateVisibility => Err(ProtocolError::InvalidState(
217 "StateVisibility sidecar records cannot be packed into the content-addressed object pack"
218 .to_string(),
219 )),
220 }
221}
222
// Tests cover two things:
// - size-limit enforcement in the chunked receive path;
// - a full build -> chunk -> receive -> install round trip between two
//   `FsStore`s.
#[cfg(test)]
mod tests {
    use objects::{
        object::Blob,
        store::{FsStore, ObjectStore, pack::PackObjectId},
    };
    use tempfile::TempDir;

    use super::{
        MAX_RECEIVED_PACK_SIZE, ObjectId, ObjectInfo, ObjectType, PackChunkState,
        build_native_pack, install_received_pack, next_pack_chunk, receive_pack_chunk,
        receive_pack_chunk_with_limit,
    };

    /// Creates and initialises an `FsStore` rooted at `.heddle` inside a fresh
    /// temporary directory. The `TempDir` is returned so callers can keep it
    /// bound for as long as the store is in use.
    fn create_test_store() -> (TempDir, FsStore) {
        let temp = TempDir::new().unwrap();
        let store = FsStore::new(temp.path().join(".heddle"));
        store.init().unwrap();
        (temp, store)
    }

    // Two 4-byte chunks exactly fill an 8-byte limit. The third 1-byte chunk
    // must be rejected and must not be appended to the buffer.
    #[test]
    fn receive_pack_chunk_rejects_cumulative_size_over_limit_before_buffering() {
        let mut state = PackChunkState::default();

        receive_pack_chunk_with_limit(&mut state, false, 0, 0, false, b"abcd", false, 8).unwrap();
        receive_pack_chunk_with_limit(&mut state, false, 4, 1, false, b"efgh", false, 8).unwrap();

        let error = receive_pack_chunk_with_limit(&mut state, false, 8, 2, false, b"i", false, 8)
            .unwrap_err();

        assert_eq!(state.pack_data, b"abcdefgh");
        assert!(
            error
                .to_string()
                .contains("native pack body exceeds receive size limit")
        );
        assert!(error.to_string().contains("9 bytes (max 8)"));
    }

    // Seeds the body stream's expected offset one byte below the production
    // limit without allocating that much data. A 2-byte chunk must then be
    // rejected before the buffer is touched.
    #[test]
    fn receive_pack_chunk_checks_production_limit_before_extending_buffer() {
        let mut state = PackChunkState {
            pack_progress: (MAX_RECEIVED_PACK_SIZE - 1, 0),
            ..PackChunkState::default()
        };

        let error = receive_pack_chunk(
            &mut state,
            false,
            MAX_RECEIVED_PACK_SIZE - 1,
            0,
            false,
            b"xx",
            false,
        )
        .unwrap_err();

        assert!(state.pack_data.is_empty());
        assert!(
            error
                .to_string()
                .contains("native pack body exceeds receive size limit")
        );
    }

    // End-to-end round trip:
    // 1. Pack one blob from the source store.
    // 2. Stream the body in 7-byte chunks and the index in 5-byte chunks
    //    through `receive_pack_chunk`.
    // 3. Install the result into the destination store and read the blob back.
    #[test]
    fn normal_size_native_pack_receives_and_installs() {
        let (_source_temp, source_store) = create_test_store();
        let (_dest_temp, dest_store) = create_test_store();
        let blob = Blob::from("native pack receive regression");
        let hash = source_store.put_blob(&blob).unwrap();
        let bundle = build_native_pack(
            &source_store,
            &[ObjectInfo {
                id: ObjectId::Hash(hash),
                obj_type: ObjectType::Blob,
                size: blob.size() as u64,
                delta_base: None,
            }],
        )
        .unwrap();

        // Pack body stream (`is_index = false`).
        let mut state = PackChunkState::default();
        let mut chunk_index = 0usize;
        while let Some((start, data, is_final)) = next_pack_chunk(&bundle.pack_data, 7, chunk_index)
        {
            receive_pack_chunk(
                &mut state,
                false,
                start as u64,
                chunk_index as u32,
                is_final,
                &data,
                is_final,
            )
            .unwrap();
            chunk_index += 1;
        }

        // Pack index stream (`is_index = true`).
        let mut index_chunk = 0usize;
        while let Some((start, data, is_final)) =
            next_pack_chunk(&bundle.index_data, 5, index_chunk)
        {
            receive_pack_chunk(
                &mut state,
                true,
                start as u64,
                index_chunk as u32,
                is_final,
                &data,
                is_final,
            )
            .unwrap();
            index_chunk += 1;
        }

        assert!(state.is_complete());
        assert_eq!(state.pack_data, bundle.pack_data);
        assert_eq!(state.index_data, bundle.index_data);

        let installed_ids =
            install_received_pack(&dest_store, &state.pack_data, &state.index_data).unwrap();

        assert_eq!(installed_ids, vec![PackObjectId::Hash(hash)]);
        let installed_blob = dest_store.get_blob(&hash).unwrap().unwrap();
        assert_eq!(installed_blob.content(), blob.content());
    }
}