1use objects::{
3 object::{State, Tree},
4 store::ObjectStore,
5};
6
7use crate::{ObjectData, ObjectId, ObjectRequest, ObjectType, ProtocolError, Result};
8
9pub const MAX_RECEIVED_REDACTIONS_BLOB_SIZE: u64 = 64 * 1024 * 1024;
17
18pub const MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE: u64 = 64 * 1024 * 1024;
25
26const PULL_DECODE_ENVELOPE_HEADROOM: u64 = 1024 * 1024;
37
38const fn max_u64(a: u64, b: u64) -> u64 {
39 if a > b { a } else { b }
40}
41
42pub const MAX_PULL_DECODE_MESSAGE_SIZE: usize = (max_u64(
59 MAX_RECEIVED_REDACTIONS_BLOB_SIZE,
60 MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE,
61) + PULL_DECODE_ENVELOPE_HEADROOM) as usize;
62
63pub fn check_received_transfer_blob_size(
72 blob_len: usize,
73 max_bytes: u64,
74 kind: &str,
75) -> Result<()> {
76 let len = u64::try_from(blob_len).map_err(|_| {
77 ProtocolError::InvalidState(format!("{kind} blob length does not fit in u64"))
78 })?;
79 if len > max_bytes {
80 return Err(ProtocolError::InvalidState(format!(
81 "{kind} blob exceeds receive size limit: {len} bytes (max {max_bytes})"
82 )));
83 }
84 Ok(())
85}
86
87#[allow(dead_code)]
88pub fn chunk_count(object_size: usize, chunk_size: usize) -> usize {
89 if object_size == 0 || chunk_size == 0 {
90 return 0;
91 }
92 object_size.div_ceil(chunk_size)
93}
94
95#[allow(dead_code)]
96pub fn chunk_bounds(
97 object_size: usize,
98 chunk_size: usize,
99 chunk_index: usize,
100) -> Option<(usize, usize)> {
101 if chunk_size == 0 {
102 return None;
103 }
104
105 let start = chunk_index.checked_mul(chunk_size)?;
106 if start >= object_size {
107 return None;
108 }
109 let end = (start + chunk_size).min(object_size);
110 Some((start, end - start))
111}
112
113#[allow(dead_code)]
114pub fn chunk_offset(chunk_index: usize, chunk_size: usize) -> Option<usize> {
115 chunk_index.checked_mul(chunk_size)
116}
117
118pub fn load_requested_object(store: &impl ObjectStore, req: &ObjectRequest) -> Result<ObjectData> {
119 let (obj_type, data) = match &req.id {
126 ObjectId::Hash(hash) => {
127 if let Some(blob) = store.get_blob(hash)? {
128 (ObjectType::Blob, blob.content().to_vec())
129 } else if let Some(tree) = store.get_tree(hash)? {
130 (ObjectType::Tree, rmp_serde::to_vec_named(&tree)?)
131 } else {
132 return Err(ProtocolError::ObjectNotFound(hash.to_hex()));
133 }
134 }
135 ObjectId::ChangeId(change_id) => {
136 let state = store
137 .get_state(change_id)?
138 .ok_or_else(|| ProtocolError::ObjectNotFound(change_id.to_string()))?;
139 (ObjectType::State, rmp_serde::to_vec_named(&state)?)
140 }
141 };
142
143 Ok(ObjectData {
144 id: req.id.clone(),
145 obj_type,
146 data,
147 is_delta: false,
148 })
149}
150
151pub fn load_object_data(
152 store: &impl ObjectStore,
153 id: &ObjectId,
154 obj_type: ObjectType,
155) -> Result<ObjectData> {
156 let data = match (id, obj_type) {
157 (ObjectId::Hash(hash), ObjectType::Blob) => store
158 .get_blob(hash)?
159 .ok_or_else(|| ProtocolError::ObjectNotFound(hash.to_hex()))?
160 .content()
161 .to_vec(),
162 (ObjectId::Hash(hash), ObjectType::Tree) => {
163 let tree = store
164 .get_tree(hash)?
165 .ok_or_else(|| ProtocolError::ObjectNotFound(hash.to_hex()))?;
166 rmp_serde::to_vec_named(&tree)?
167 }
168 (ObjectId::ChangeId(change_id), ObjectType::State) => {
169 let state = store
170 .get_state(change_id)?
171 .ok_or_else(|| ProtocolError::ObjectNotFound(change_id.to_string()))?;
172 rmp_serde::to_vec_named(&state)?
173 }
174 (ObjectId::Hash(hash), ObjectType::Redaction) => store
175 .get_redactions_bytes_for_blob(hash)?
176 .ok_or_else(|| ProtocolError::ObjectNotFound(hash.to_hex()))?,
177 (ObjectId::ChangeId(change_id), ObjectType::StateVisibility) => store
178 .get_state_visibility_bytes_for_state(change_id)?
179 .ok_or_else(|| ProtocolError::ObjectNotFound(change_id.to_string_full()))?,
180 _ => {
181 return Err(ProtocolError::InvalidState(
182 "object id/type mismatch".to_string(),
183 ));
184 }
185 };
186
187 Ok(ObjectData {
188 id: id.clone(),
189 obj_type,
190 data,
191 is_delta: false,
192 })
193}
194
195pub fn store_received_object(store: &impl ObjectStore, data: &ObjectData) -> Result<()> {
196 match (&data.id, data.obj_type) {
197 (ObjectId::Hash(hash), ObjectType::Blob) => {
198 store.put_blob_bytes_with_hash(&data.data, *hash)?;
199 }
200 (ObjectId::Hash(hash), ObjectType::Tree) => {
201 let tree: Tree = rmp_serde::from_slice(&data.data)?;
202 tree.validate().map_err(|error| {
203 ProtocolError::InvalidState(format!("invalid tree object: {error}"))
204 })?;
205 if &tree.hash() != hash {
206 return Err(ProtocolError::InvalidState(
207 "tree hash mismatch".to_string(),
208 ));
209 }
210 store.put_tree_serialized(&data.data, *hash)?;
211 }
212 (ObjectId::ChangeId(change_id), ObjectType::State) => {
213 let state: State = rmp_serde::from_slice(&data.data)?;
214 if state.change_id != *change_id {
215 return Err(ProtocolError::InvalidState(format!(
216 "ChangeId mismatch: expected {}, got {}",
217 change_id, state.change_id
218 )));
219 }
220 store.put_state_serialized(&data.data, *change_id)?;
221 }
222 (_, ObjectType::Redaction) => {
223 return Err(ProtocolError::InvalidState(
228 "Redaction objects must be persisted via Repository::accept_wire_redactions, \
229 not store_received_object — signature verification is required"
230 .to_string(),
231 ));
232 }
233 (_, ObjectType::StateVisibility) => {
234 return Err(ProtocolError::InvalidState(
238 "StateVisibility objects must be persisted via Repository::accept_wire_state_visibility, \
239 not store_received_object — sidecar validation is required"
240 .to_string(),
241 ));
242 }
243 _ => {
244 return Err(ProtocolError::InvalidState(
245 "object id/type mismatch".to_string(),
246 ));
247 }
248 }
249
250 Ok(())
251}
252
253#[cfg(test)]
254mod tests {
255 use objects::{
256 object::{Attribution, Blob, ContentHash, Principal, State, Tree, TreeEntry},
257 store::{FsStore, ObjectStore},
258 };
259 use tempfile::TempDir;
260
261 use super::*;
262
263 fn create_test_store() -> (TempDir, FsStore) {
264 let temp = TempDir::new().unwrap();
265 let store = FsStore::new(temp.path().join(".heddle"));
266 store.init().unwrap();
267 (temp, store)
268 }
269
270 fn test_attribution() -> Attribution {
271 Attribution::human(Principal::new("Wire Tester", "wire@example.com"))
272 }
273
274 #[test]
275 fn primary_objects_roundtrip_through_wire_data() {
276 let (_source_temp, source) = create_test_store();
277 let (_dest_temp, dest) = create_test_store();
278
279 let blob = Blob::from("wire transfer blob\n");
280 let blob_hash = source.put_blob(&blob).unwrap();
281 let tree = Tree::from_entries(vec![TreeEntry::file("lib.rs", blob_hash, false).unwrap()]);
282 let tree_hash = source.put_tree(&tree).unwrap();
283 let state = State::new(tree_hash, Vec::new(), test_attribution())
284 .with_intent("exercise wire transfer");
285 source.put_state(&state).unwrap();
286
287 let blob_data = load_requested_object(
288 &source,
289 &ObjectRequest {
290 id: ObjectId::Hash(blob_hash),
291 have_base: None,
292 },
293 )
294 .unwrap();
295 assert_eq!(blob_data.obj_type, ObjectType::Blob);
296 assert_eq!(blob_data.data, blob.content());
297 store_received_object(&dest, &blob_data).unwrap();
298 assert_eq!(
299 dest.get_blob(&blob_hash).unwrap().unwrap().content(),
300 blob.content()
301 );
302
303 let tree_data = load_requested_object(
304 &source,
305 &ObjectRequest {
306 id: ObjectId::Hash(tree_hash),
307 have_base: None,
308 },
309 )
310 .unwrap();
311 assert_eq!(tree_data.obj_type, ObjectType::Tree);
312 assert_eq!(
313 rmp_serde::from_slice::<Tree>(&tree_data.data).unwrap(),
314 tree
315 );
316 store_received_object(&dest, &tree_data).unwrap();
317 assert_eq!(dest.get_tree(&tree_hash).unwrap().unwrap(), tree);
318
319 let state_data = load_requested_object(
320 &source,
321 &ObjectRequest {
322 id: ObjectId::ChangeId(state.change_id),
323 have_base: None,
324 },
325 )
326 .unwrap();
327 assert_eq!(state_data.obj_type, ObjectType::State);
328 assert_eq!(
329 rmp_serde::from_slice::<State>(&state_data.data).unwrap(),
330 state
331 );
332 store_received_object(&dest, &state_data).unwrap();
333 assert_eq!(
334 dest.get_state(&state.change_id).unwrap().unwrap().change_id,
335 state.change_id
336 );
337 }
338
339 #[test]
340 fn load_object_data_reports_missing_and_id_type_mismatch_errors() {
341 let (_temp, store) = create_test_store();
342 let missing_hash = ContentHash::from_bytes([7; 32]);
343 let missing_state = objects::object::ChangeId::from_bytes([9; 16]);
344
345 let missing = load_requested_object(
346 &store,
347 &ObjectRequest {
348 id: ObjectId::Hash(missing_hash),
349 have_base: None,
350 },
351 )
352 .unwrap_err();
353 assert!(
354 matches!(missing, ProtocolError::ObjectNotFound(id) if id == missing_hash.to_hex())
355 );
356
357 let missing = load_requested_object(
358 &store,
359 &ObjectRequest {
360 id: ObjectId::ChangeId(missing_state),
361 have_base: None,
362 },
363 )
364 .unwrap_err();
365 assert!(
366 matches!(missing, ProtocolError::ObjectNotFound(id) if id == missing_state.to_string())
367 );
368
369 let mismatch =
370 load_object_data(&store, &ObjectId::Hash(missing_hash), ObjectType::State).unwrap_err();
371 assert!(
372 matches!(mismatch, ProtocolError::InvalidState(message) if message == "object id/type mismatch")
373 );
374
375 let mismatch =
376 load_object_data(&store, &ObjectId::ChangeId(missing_state), ObjectType::Blob)
377 .unwrap_err();
378 assert!(
379 matches!(mismatch, ProtocolError::InvalidState(message) if message == "object id/type mismatch")
380 );
381 }
382
383 #[test]
384 fn store_received_object_rejects_mismatched_object_identity() {
385 let (_temp, store) = create_test_store();
386 let blob = Blob::from("tree leaf");
387 let blob_hash = store.put_blob(&blob).unwrap();
388 let tree = Tree::from_entries(vec![TreeEntry::file("leaf.txt", blob_hash, false).unwrap()]);
389 let tree_bytes = rmp_serde::to_vec_named(&tree).unwrap();
390 let wrong_hash = ContentHash::from_bytes([4; 32]);
391
392 let error = store_received_object(
393 &store,
394 &ObjectData {
395 id: ObjectId::Hash(wrong_hash),
396 obj_type: ObjectType::Tree,
397 data: tree_bytes,
398 is_delta: false,
399 },
400 )
401 .unwrap_err();
402 assert!(
403 matches!(error, ProtocolError::InvalidState(message) if message == "tree hash mismatch")
404 );
405
406 let state = State::new(tree.hash(), Vec::new(), test_attribution());
407 let wrong_state_id = objects::object::ChangeId::from_bytes([5; 16]);
408 let error = store_received_object(
409 &store,
410 &ObjectData {
411 id: ObjectId::ChangeId(wrong_state_id),
412 obj_type: ObjectType::State,
413 data: rmp_serde::to_vec_named(&state).unwrap(),
414 is_delta: false,
415 },
416 )
417 .unwrap_err();
418 assert!(
419 matches!(error, ProtocolError::InvalidState(message) if message.contains("ChangeId mismatch"))
420 );
421 }
422
423 #[test]
424 fn store_received_object_rejects_raw_sidecar_objects() {
425 let (_temp, store) = create_test_store();
426 let blob_hash = ContentHash::from_bytes([1; 32]);
427 let state_id = objects::object::ChangeId::from_bytes([2; 16]);
428
429 let redaction_error = store_received_object(
430 &store,
431 &ObjectData {
432 id: ObjectId::Hash(blob_hash),
433 obj_type: ObjectType::Redaction,
434 data: b"unsigned redaction bytes".to_vec(),
435 is_delta: false,
436 },
437 )
438 .unwrap_err();
439 assert!(
440 matches!(redaction_error, ProtocolError::InvalidState(message) if message.contains("signature verification is required"))
441 );
442
443 let visibility_error = store_received_object(
444 &store,
445 &ObjectData {
446 id: ObjectId::ChangeId(state_id),
447 obj_type: ObjectType::StateVisibility,
448 data: b"raw visibility bytes".to_vec(),
449 is_delta: false,
450 },
451 )
452 .unwrap_err();
453 assert!(
454 matches!(visibility_error, ProtocolError::InvalidState(message) if message.contains("sidecar validation is required"))
455 );
456 }
457
458 #[test]
459 fn test_chunk_count_rounds_up() {
460 assert_eq!(chunk_count(0, 64), 0);
461 assert_eq!(chunk_count(1, 64), 1);
462 assert_eq!(chunk_count(64, 64), 1);
463 assert_eq!(chunk_count(65, 64), 2);
464 }
465
466 #[test]
467 fn test_chunk_bounds_returns_ranges() {
468 assert_eq!(chunk_bounds(100, 32, 0), Some((0, 32)));
469 assert_eq!(chunk_bounds(100, 32, 2), Some((64, 32)));
470 assert_eq!(chunk_bounds(100, 32, 3), Some((96, 4)));
471 assert_eq!(chunk_bounds(100, 32, 4), None);
472 assert_eq!(chunk_bounds(100, 0, 0), None);
473 }
474
475 #[test]
476 fn test_chunk_offset_returns_position() {
477 assert_eq!(chunk_offset(0, 64), Some(0));
478 assert_eq!(chunk_offset(3, 64), Some(192));
479 assert_eq!(chunk_offset(usize::MAX, 2), None);
480 }
481
482 #[test]
483 fn received_transfer_blob_at_limit_is_accepted() {
484 check_received_transfer_blob_size(8, 8, "redactions").unwrap();
485 }
486
487 #[test]
488 fn received_transfer_blob_over_limit_is_rejected() {
489 let error = check_received_transfer_blob_size(9, 8, "redactions").unwrap_err();
490 let message = error.to_string();
491 assert!(
492 message.contains("redactions blob exceeds receive size limit"),
493 "unexpected error: {message}"
494 );
495 assert!(
496 message.contains("9 bytes (max 8)"),
497 "unexpected error: {message}"
498 );
499 }
500
501 #[test]
502 fn received_transfer_blob_caps_are_enforced_against_production_limits() {
503 check_received_transfer_blob_size(
504 MAX_RECEIVED_REDACTIONS_BLOB_SIZE as usize,
505 MAX_RECEIVED_REDACTIONS_BLOB_SIZE,
506 "redactions",
507 )
508 .unwrap();
509 check_received_transfer_blob_size(
510 MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE as usize,
511 MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE,
512 "state-visibility",
513 )
514 .unwrap();
515 }
516}