Skip to main content

wire/
object_transfer.rs

1// SPDX-License-Identifier: Apache-2.0
2use objects::{
3    object::{State, Tree},
4    store::ObjectStore,
5};
6
7use crate::{ObjectData, ObjectId, ObjectRequest, ObjectType, ProtocolError, Result};
8
/// Per-blob ceiling on a redaction sidecar received over the pull stream.
///
/// A redaction sidecar is a signed list of ranges for one blob, so it is
/// orders of magnitude smaller than the payload it describes. Capping it at
/// 64 MiB bounds the server-controlled receive buffer on the pull stream —
/// the same unbounded-allocation OOM class that #366 closed for the native
/// pack/index buffers — while still leaving generous headroom for any
/// legitimate record.
pub const MAX_RECEIVED_REDACTIONS_BLOB_SIZE: u64 = 64 << 20;
17
/// Per-state ceiling on a state-visibility sidecar received over the pull
/// stream.
///
/// These sidecars carry per-state tier records rather than object payloads.
/// The 64 MiB cap applies the same receive-side bound to this second
/// server-controlled pull-stream buffer.
pub const MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE: u64 = 64 << 20;
25
/// Slack (1 MiB) added above the largest legitimate sidecar blob when sizing
/// the pull-stream gRPC decode limit.
///
/// It accounts for the protobuf fields wrapping a max-size sidecar blob inside
/// a `PullMessage`: the oneof tag, the `blob_hash`/`state_id` string, and the
/// transfer checkpoint. None of those come close to a MiB.
///
/// The value is intentionally tight rather than generously rounded. The decode
/// limit is enforced per *message*, so whatever headroom is granted here is
/// exactly the slop by which a message may exceed the precise per-blob cap.
/// Keeping it small holds the worst-case attacker-forced allocation to within
/// ~1 MiB of the 64 MiB blob cap; inside that residual window the exact
/// per-blob cap is still enforced by the post-decode
/// `check_received_transfer_blob_size` defense-in-depth check.
const PULL_DECODE_ENVELOPE_HEADROOM: u64 = 1 << 20;
37
/// Returns the larger of `a` and `b` (`b` when they are equal).
///
/// Written as a `const fn` so it can be evaluated inside constant
/// initializers.
const fn max_u64(a: u64, b: u64) -> u64 {
    if b >= a {
        b
    } else {
        a
    }
}
41
42/// Inbound gRPC decode limit for the pull stream (tonic's
43/// `max_decoding_message_size`).
44///
45/// This is the *load-bearing* bound on the single-shot, server-controlled
46/// sidecar allocation. tonic refuses to decode an inbound `PullMessage` larger
47/// than this, so an oversized `redactions_blob` / `state_visibility_blob` is
48/// rejected at the decode boundary *before* its `Vec<u8>` is materialized.
49/// [`check_received_transfer_blob_size`] is retained as a cheap post-decode
50/// defense-in-depth check, but the allocation itself is bounded here.
51///
52/// Sized to the largest legitimate single message — a sidecar transfer carrying
53/// a max-size blob ([`MAX_RECEIVED_REDACTIONS_BLOB_SIZE`] /
54/// [`MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE`], 64 MiB) — plus
55/// [`PULL_DECODE_ENVELOPE_HEADROOM`]. Native pack chunks share this stream but
56/// are bounded far below this by the negotiated chunk size, so they are
57/// unaffected.
58pub const MAX_PULL_DECODE_MESSAGE_SIZE: usize = (max_u64(
59    MAX_RECEIVED_REDACTIONS_BLOB_SIZE,
60    MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE,
61) + PULL_DECODE_ENVELOPE_HEADROOM) as usize;
62
63/// Reject a received per-object transfer sidecar blob whose length exceeds
64/// `max_bytes`, before it is handed to the repository accept path.
65///
66/// Sidecar blobs (redaction, state-visibility) arrive as single
67/// server-controlled buffers on the pull stream. This is the single-shot
68/// analogue of [`crate::receive_pack_chunk`]'s running-total check: it bounds
69/// the in-memory allocation a hostile or buggy server can drive on the receive
70/// side. `kind` names the blob in the error (e.g. `"redactions"`).
71pub fn check_received_transfer_blob_size(
72    blob_len: usize,
73    max_bytes: u64,
74    kind: &str,
75) -> Result<()> {
76    let len = u64::try_from(blob_len).map_err(|_| {
77        ProtocolError::InvalidState(format!("{kind} blob length does not fit in u64"))
78    })?;
79    if len > max_bytes {
80        return Err(ProtocolError::InvalidState(format!(
81            "{kind} blob exceeds receive size limit: {len} bytes (max {max_bytes})"
82        )));
83    }
84    Ok(())
85}
86
/// Number of `chunk_size`-byte chunks needed to cover `object_size` bytes,
/// rounding up for a trailing partial chunk.
///
/// Returns `0` when either `object_size` or `chunk_size` is zero.
#[allow(dead_code)]
pub fn chunk_count(object_size: usize, chunk_size: usize) -> usize {
    match (object_size, chunk_size) {
        // Nothing to transfer, or no valid chunking: report zero chunks
        // (this also keeps `div_ceil` away from a zero divisor).
        (0, _) | (_, 0) => 0,
        (size, chunk) => size.div_ceil(chunk),
    }
}
94
/// Locate chunk number `chunk_index` inside an object of `object_size` bytes
/// that is split into `chunk_size`-byte chunks.
///
/// Returns `Some((offset, length))` — the byte offset at which the chunk
/// starts and the number of bytes it holds. Note that the second element is a
/// length, not an end offset. The final chunk may be shorter than
/// `chunk_size`.
///
/// Returns `None` when:
/// - `chunk_size` is zero,
/// - `chunk_index * chunk_size` overflows `usize`, or
/// - the chunk would start at or beyond `object_size` (which includes every
///   index for an empty object).
#[allow(dead_code)]
pub fn chunk_bounds(
    object_size: usize,
    chunk_size: usize,
    chunk_index: usize,
) -> Option<(usize, usize)> {
    if chunk_size == 0 {
        return None;
    }

    let start = chunk_index.checked_mul(chunk_size)?;
    // Bytes left from `start` to the end of the object. `checked_sub` yields
    // `None` when the chunk starts past the end; the filter also rejects a
    // chunk that would start exactly at the end (zero bytes left).
    let remaining = object_size.checked_sub(start).filter(|&left| left > 0)?;

    // Derive the length from `remaining` rather than computing
    // `start + chunk_size`, which can overflow `usize` for large chunk sizes
    // even though `start` itself is in range.
    Some((start, remaining.min(chunk_size)))
}
112
/// Byte offset at which chunk `chunk_index` begins when every chunk is
/// `chunk_size` bytes long.
///
/// Returns `None` if `chunk_index * chunk_size` overflows `usize`.
#[allow(dead_code)]
pub fn chunk_offset(chunk_index: usize, chunk_size: usize) -> Option<usize> {
    match chunk_index.overflowing_mul(chunk_size) {
        (offset, false) => Some(offset),
        (_, true) => None,
    }
}
117
118pub fn load_requested_object(store: &impl ObjectStore, req: &ObjectRequest) -> Result<ObjectData> {
119    // Note on sidecar objects: redactions and state visibility are keyed by
120    // ids that also identify primary objects. `load_requested_object`
121    // resolves blob-vs-tree or state by id shape/probe; it cannot
122    // disambiguate a sidecar request by ObjectId alone. Callers that need to
123    // fetch a sidecar must use `load_object_data` with an explicit object
124    // type.
125    let (obj_type, data) = match &req.id {
126        ObjectId::Hash(hash) => {
127            if let Some(blob) = store.get_blob(hash)? {
128                (ObjectType::Blob, blob.content().to_vec())
129            } else if let Some(tree) = store.get_tree(hash)? {
130                (ObjectType::Tree, rmp_serde::to_vec_named(&tree)?)
131            } else {
132                return Err(ProtocolError::ObjectNotFound(hash.to_hex()));
133            }
134        }
135        ObjectId::ChangeId(change_id) => {
136            let state = store
137                .get_state(change_id)?
138                .ok_or_else(|| ProtocolError::ObjectNotFound(change_id.to_string()))?;
139            (ObjectType::State, rmp_serde::to_vec_named(&state)?)
140        }
141    };
142
143    Ok(ObjectData {
144        id: req.id.clone(),
145        obj_type,
146        data,
147        is_delta: false,
148    })
149}
150
151pub fn load_object_data(
152    store: &impl ObjectStore,
153    id: &ObjectId,
154    obj_type: ObjectType,
155) -> Result<ObjectData> {
156    let data = match (id, obj_type) {
157        (ObjectId::Hash(hash), ObjectType::Blob) => store
158            .get_blob(hash)?
159            .ok_or_else(|| ProtocolError::ObjectNotFound(hash.to_hex()))?
160            .content()
161            .to_vec(),
162        (ObjectId::Hash(hash), ObjectType::Tree) => {
163            let tree = store
164                .get_tree(hash)?
165                .ok_or_else(|| ProtocolError::ObjectNotFound(hash.to_hex()))?;
166            rmp_serde::to_vec_named(&tree)?
167        }
168        (ObjectId::ChangeId(change_id), ObjectType::State) => {
169            let state = store
170                .get_state(change_id)?
171                .ok_or_else(|| ProtocolError::ObjectNotFound(change_id.to_string()))?;
172            rmp_serde::to_vec_named(&state)?
173        }
174        (ObjectId::Hash(hash), ObjectType::Redaction) => store
175            .get_redactions_bytes_for_blob(hash)?
176            .ok_or_else(|| ProtocolError::ObjectNotFound(hash.to_hex()))?,
177        (ObjectId::ChangeId(change_id), ObjectType::StateVisibility) => store
178            .get_state_visibility_bytes_for_state(change_id)?
179            .ok_or_else(|| ProtocolError::ObjectNotFound(change_id.to_string_full()))?,
180        _ => {
181            return Err(ProtocolError::InvalidState(
182                "object id/type mismatch".to_string(),
183            ));
184        }
185    };
186
187    Ok(ObjectData {
188        id: id.clone(),
189        obj_type,
190        data,
191        is_delta: false,
192    })
193}
194
195pub fn store_received_object(store: &impl ObjectStore, data: &ObjectData) -> Result<()> {
196    match (&data.id, data.obj_type) {
197        (ObjectId::Hash(hash), ObjectType::Blob) => {
198            store.put_blob_bytes_with_hash(&data.data, *hash)?;
199        }
200        (ObjectId::Hash(hash), ObjectType::Tree) => {
201            let tree: Tree = rmp_serde::from_slice(&data.data)?;
202            tree.validate().map_err(|error| {
203                ProtocolError::InvalidState(format!("invalid tree object: {error}"))
204            })?;
205            if &tree.hash() != hash {
206                return Err(ProtocolError::InvalidState(
207                    "tree hash mismatch".to_string(),
208                ));
209            }
210            store.put_tree_serialized(&data.data, *hash)?;
211        }
212        (ObjectId::ChangeId(change_id), ObjectType::State) => {
213            let state: State = rmp_serde::from_slice(&data.data)?;
214            if state.change_id != *change_id {
215                return Err(ProtocolError::InvalidState(format!(
216                    "ChangeId mismatch: expected {}, got {}",
217                    change_id, state.change_id
218                )));
219            }
220            store.put_state_serialized(&data.data, *change_id)?;
221        }
222        (_, ObjectType::Redaction) => {
223            // Redactions ship signed and need verification before any
224            // bytes hit the sidecar. Refuse here so callers route via
225            // `Repository::accept_wire_redactions` instead of silently
226            // landing an unverified record.
227            return Err(ProtocolError::InvalidState(
228                "Redaction objects must be persisted via Repository::accept_wire_redactions, \
229                 not store_received_object — signature verification is required"
230                    .to_string(),
231            ));
232        }
233        (_, ObjectType::StateVisibility) => {
234            // State visibility must be validated and normalized at the
235            // Repository boundary (`put_state_visibility` enforces
236            // public-by-absence). Refuse raw sidecar writes here.
237            return Err(ProtocolError::InvalidState(
238                "StateVisibility objects must be persisted via Repository::accept_wire_state_visibility, \
239                 not store_received_object — sidecar validation is required"
240                    .to_string(),
241            ));
242        }
243        _ => {
244            return Err(ProtocolError::InvalidState(
245                "object id/type mismatch".to_string(),
246            ));
247        }
248    }
249
250    Ok(())
251}
252
#[cfg(test)]
mod tests {
    use objects::{
        object::{Attribution, Blob, ContentHash, Principal, State, Tree, TreeEntry},
        store::{FsStore, ObjectStore},
    };
    use tempfile::TempDir;

    use super::*;

    /// Create an initialized on-disk store inside a fresh temp directory.
    ///
    /// The `TempDir` is returned alongside the store so the caller keeps it
    /// alive for as long as the store is in use.
    fn create_test_store() -> (TempDir, FsStore) {
        let temp = TempDir::new().unwrap();
        let store = FsStore::new(temp.path().join(".heddle"));
        store.init().unwrap();
        (temp, store)
    }

    /// Attribution used for every state built by these tests.
    fn test_attribution() -> Attribution {
        Attribution::human(Principal::new("Wire Tester", "wire@example.com"))
    }

    /// A blob, a tree, and a state loaded from one store can each be stored
    /// into another store and read back.
    #[test]
    fn primary_objects_roundtrip_through_wire_data() {
        let (_source_temp, source) = create_test_store();
        let (_dest_temp, dest) = create_test_store();

        let blob = Blob::from("wire transfer blob\n");
        let blob_hash = source.put_blob(&blob).unwrap();
        let tree = Tree::from_entries(vec![TreeEntry::file("lib.rs", blob_hash, false).unwrap()]);
        let tree_hash = source.put_tree(&tree).unwrap();
        let state = State::new(tree_hash, Vec::new(), test_attribution())
            .with_intent("exercise wire transfer");
        source.put_state(&state).unwrap();

        let blob_data = load_requested_object(
            &source,
            &ObjectRequest {
                id: ObjectId::Hash(blob_hash),
                have_base: None,
            },
        )
        .unwrap();
        assert_eq!(blob_data.obj_type, ObjectType::Blob);
        assert_eq!(blob_data.data, blob.content());
        store_received_object(&dest, &blob_data).unwrap();
        assert_eq!(
            dest.get_blob(&blob_hash).unwrap().unwrap().content(),
            blob.content()
        );

        let tree_data = load_requested_object(
            &source,
            &ObjectRequest {
                id: ObjectId::Hash(tree_hash),
                have_base: None,
            },
        )
        .unwrap();
        assert_eq!(tree_data.obj_type, ObjectType::Tree);
        assert_eq!(
            rmp_serde::from_slice::<Tree>(&tree_data.data).unwrap(),
            tree
        );
        store_received_object(&dest, &tree_data).unwrap();
        assert_eq!(dest.get_tree(&tree_hash).unwrap().unwrap(), tree);

        let state_data = load_requested_object(
            &source,
            &ObjectRequest {
                id: ObjectId::ChangeId(state.change_id),
                have_base: None,
            },
        )
        .unwrap();
        assert_eq!(state_data.obj_type, ObjectType::State);
        assert_eq!(
            rmp_serde::from_slice::<State>(&state_data.data).unwrap(),
            state
        );
        store_received_object(&dest, &state_data).unwrap();
        assert_eq!(
            dest.get_state(&state.change_id).unwrap().unwrap().change_id,
            state.change_id
        );
    }

    /// Missing objects surface as `ObjectNotFound`; impossible id/type
    /// pairings surface as the `"object id/type mismatch"` error.
    #[test]
    fn load_object_data_reports_missing_and_id_type_mismatch_errors() {
        let (_temp, store) = create_test_store();
        let missing_hash = ContentHash::from_bytes([7; 32]);
        let missing_state = objects::object::ChangeId::from_bytes([9; 16]);

        let missing = load_requested_object(
            &store,
            &ObjectRequest {
                id: ObjectId::Hash(missing_hash),
                have_base: None,
            },
        )
        .unwrap_err();
        assert!(
            matches!(missing, ProtocolError::ObjectNotFound(id) if id == missing_hash.to_hex())
        );

        let missing = load_requested_object(
            &store,
            &ObjectRequest {
                id: ObjectId::ChangeId(missing_state),
                have_base: None,
            },
        )
        .unwrap_err();
        assert!(
            matches!(missing, ProtocolError::ObjectNotFound(id) if id == missing_state.to_string())
        );

        let mismatch =
            load_object_data(&store, &ObjectId::Hash(missing_hash), ObjectType::State).unwrap_err();
        assert!(
            matches!(mismatch, ProtocolError::InvalidState(message) if message == "object id/type mismatch")
        );

        let mismatch =
            load_object_data(&store, &ObjectId::ChangeId(missing_state), ObjectType::Blob)
                .unwrap_err();
        assert!(
            matches!(mismatch, ProtocolError::InvalidState(message) if message == "object id/type mismatch")
        );
    }

    /// A tree or state whose payload does not match the advertised id is
    /// rejected.
    #[test]
    fn store_received_object_rejects_mismatched_object_identity() {
        let (_temp, store) = create_test_store();
        let blob = Blob::from("tree leaf");
        let blob_hash = store.put_blob(&blob).unwrap();
        let tree = Tree::from_entries(vec![TreeEntry::file("leaf.txt", blob_hash, false).unwrap()]);
        let tree_bytes = rmp_serde::to_vec_named(&tree).unwrap();
        let wrong_hash = ContentHash::from_bytes([4; 32]);

        let error = store_received_object(
            &store,
            &ObjectData {
                id: ObjectId::Hash(wrong_hash),
                obj_type: ObjectType::Tree,
                data: tree_bytes,
                is_delta: false,
            },
        )
        .unwrap_err();
        assert!(
            matches!(error, ProtocolError::InvalidState(message) if message == "tree hash mismatch")
        );

        let state = State::new(tree.hash(), Vec::new(), test_attribution());
        let wrong_state_id = objects::object::ChangeId::from_bytes([5; 16]);
        let error = store_received_object(
            &store,
            &ObjectData {
                id: ObjectId::ChangeId(wrong_state_id),
                obj_type: ObjectType::State,
                data: rmp_serde::to_vec_named(&state).unwrap(),
                is_delta: false,
            },
        )
        .unwrap_err();
        assert!(
            matches!(error, ProtocolError::InvalidState(message) if message.contains("ChangeId mismatch"))
        );
    }

    /// Redaction and state-visibility sidecars are never written through
    /// `store_received_object`.
    #[test]
    fn store_received_object_rejects_raw_sidecar_objects() {
        let (_temp, store) = create_test_store();
        let blob_hash = ContentHash::from_bytes([1; 32]);
        let state_id = objects::object::ChangeId::from_bytes([2; 16]);

        let redaction_error = store_received_object(
            &store,
            &ObjectData {
                id: ObjectId::Hash(blob_hash),
                obj_type: ObjectType::Redaction,
                data: b"unsigned redaction bytes".to_vec(),
                is_delta: false,
            },
        )
        .unwrap_err();
        assert!(
            matches!(redaction_error, ProtocolError::InvalidState(message) if message.contains("signature verification is required"))
        );

        let visibility_error = store_received_object(
            &store,
            &ObjectData {
                id: ObjectId::ChangeId(state_id),
                obj_type: ObjectType::StateVisibility,
                data: b"raw visibility bytes".to_vec(),
                is_delta: false,
            },
        )
        .unwrap_err();
        assert!(
            matches!(visibility_error, ProtocolError::InvalidState(message) if message.contains("sidecar validation is required"))
        );
    }

    #[test]
    fn test_chunk_count_rounds_up() {
        assert_eq!(chunk_count(0, 64), 0);
        assert_eq!(chunk_count(1, 64), 1);
        assert_eq!(chunk_count(64, 64), 1);
        assert_eq!(chunk_count(65, 64), 2);
        // A zero chunk size is reported as zero chunks rather than dividing
        // by zero.
        assert_eq!(chunk_count(64, 0), 0);
    }

    #[test]
    fn test_chunk_bounds_returns_ranges() {
        assert_eq!(chunk_bounds(100, 32, 0), Some((0, 32)));
        assert_eq!(chunk_bounds(100, 32, 2), Some((64, 32)));
        assert_eq!(chunk_bounds(100, 32, 3), Some((96, 4)));
        assert_eq!(chunk_bounds(100, 32, 4), None);
        assert_eq!(chunk_bounds(100, 0, 0), None);
    }

    #[test]
    fn test_chunk_offset_returns_position() {
        assert_eq!(chunk_offset(0, 64), Some(0));
        assert_eq!(chunk_offset(3, 64), Some(192));
        assert_eq!(chunk_offset(usize::MAX, 2), None);
    }

    #[test]
    fn received_transfer_blob_at_limit_is_accepted() {
        check_received_transfer_blob_size(8, 8, "redactions").unwrap();
    }

    #[test]
    fn received_transfer_blob_over_limit_is_rejected() {
        let error = check_received_transfer_blob_size(9, 8, "redactions").unwrap_err();
        let message = error.to_string();
        assert!(
            message.contains("redactions blob exceeds receive size limit"),
            "unexpected error: {message}"
        );
        assert!(
            message.contains("9 bytes (max 8)"),
            "unexpected error: {message}"
        );
    }

    /// Both production caps accept a blob exactly at the cap and reject one
    /// byte more.
    #[test]
    fn received_transfer_blob_caps_are_enforced_against_production_limits() {
        let redactions_cap = usize::try_from(MAX_RECEIVED_REDACTIONS_BLOB_SIZE)
            .expect("redactions cap fits in usize");
        let visibility_cap = usize::try_from(MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE)
            .expect("state-visibility cap fits in usize");

        check_received_transfer_blob_size(
            redactions_cap,
            MAX_RECEIVED_REDACTIONS_BLOB_SIZE,
            "redactions",
        )
        .unwrap();
        check_received_transfer_blob_size(
            visibility_cap,
            MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE,
            "state-visibility",
        )
        .unwrap();

        // Enforcement proper: one byte over either cap must be refused.
        let error = check_received_transfer_blob_size(
            redactions_cap + 1,
            MAX_RECEIVED_REDACTIONS_BLOB_SIZE,
            "redactions",
        )
        .unwrap_err();
        assert!(
            error
                .to_string()
                .contains("redactions blob exceeds receive size limit"),
            "unexpected error: {error}"
        );

        let error = check_received_transfer_blob_size(
            visibility_cap + 1,
            MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE,
            "state-visibility",
        )
        .unwrap_err();
        assert!(
            error
                .to_string()
                .contains("state-visibility blob exceeds receive size limit"),
            "unexpected error: {error}"
        );
    }
}