Skip to main content

wire/
object_transfer.rs

1// SPDX-License-Identifier: Apache-2.0
2use objects::{
3    object::{State, Tree},
4    store::ObjectStore,
5};
6
7use crate::{ObjectData, ObjectId, ObjectRequest, ObjectType, ProtocolError, Result};
8
/// Per-blob upper bound, in bytes, on a redaction sidecar accepted from the
/// pull stream.
///
/// A redaction sidecar is a signed range list for a single blob, so it is
/// orders of magnitude smaller than the blob payload it describes. Capping it
/// at 64 MiB bounds the server-controlled receive buffer on the pull stream —
/// the same unbounded-allocation OOM class that #366 closed for the native
/// pack/index buffers — while still leaving generous headroom for any
/// legitimate record.
pub const MAX_RECEIVED_REDACTIONS_BLOB_SIZE: u64 = 64 << 20;
17
/// Per-state upper bound, in bytes, on a state-visibility sidecar accepted
/// from the pull stream.
///
/// A state-visibility sidecar holds per-state tier records rather than object
/// payloads. It is the second server-controlled buffer on the pull stream and
/// is held to the same 64 MiB receive-side cap as redaction sidecars.
pub const MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE: u64 = 64 << 20;
25
/// Slack, in bytes, added above the largest legitimate sidecar blob when
/// sizing the pull-stream gRPC decode limit.
///
/// It accounts for the protobuf fields wrapping a max-size sidecar blob inside
/// a `PullMessage`: the oneof tag, the `blob_hash`/`state_id` string, and the
/// transfer checkpoint. None of those come close to a MiB.
///
/// The value is intentionally tight rather than generously rounded. The decode
/// limit is a per-*message* bound, so whatever slop exists above the exact
/// per-blob cap is exactly this headroom; keeping it small holds the
/// worst-case attacker-forced allocation to within ~1 MiB of the 64 MiB blob
/// cap. The exact per-blob cap inside that residual window is enforced by the
/// post-decode `check_received_transfer_blob_size` defense-in-depth check.
const PULL_DECODE_ENVELOPE_HEADROOM: u64 = 1 << 20;
37
/// Returns the larger of `a` and `b`.
///
/// Declared `const fn` so it can be evaluated inside constant initializers
/// (it is used to size `MAX_PULL_DECODE_MESSAGE_SIZE`).
const fn max_u64(a: u64, b: u64) -> u64 {
    if b < a {
        return a;
    }
    b
}
41
42/// Inbound gRPC decode limit for the pull stream (tonic's
43/// `max_decoding_message_size`).
44///
45/// This is the *load-bearing* bound on the single-shot, server-controlled
46/// sidecar allocation. tonic refuses to decode an inbound `PullMessage` larger
47/// than this, so an oversized `redactions_blob` / `state_visibility_blob` is
48/// rejected at the decode boundary *before* its `Vec<u8>` is materialized.
49/// [`check_received_transfer_blob_size`] is retained as a cheap post-decode
50/// defense-in-depth check, but the allocation itself is bounded here.
51///
52/// Sized to the largest legitimate single message — a sidecar transfer carrying
53/// a max-size blob ([`MAX_RECEIVED_REDACTIONS_BLOB_SIZE`] /
54/// [`MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE`], 64 MiB) — plus
55/// [`PULL_DECODE_ENVELOPE_HEADROOM`]. Native pack chunks share this stream but
56/// are bounded far below this by the negotiated chunk size, so they are
57/// unaffected.
58pub const MAX_PULL_DECODE_MESSAGE_SIZE: usize = (max_u64(
59    MAX_RECEIVED_REDACTIONS_BLOB_SIZE,
60    MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE,
61) + PULL_DECODE_ENVELOPE_HEADROOM) as usize;
62
63/// Reject a received per-object transfer sidecar blob whose length exceeds
64/// `max_bytes`, before it is handed to the repository accept path.
65///
66/// Sidecar blobs (redaction, state-visibility) arrive as single
67/// server-controlled buffers on the pull stream. This is the single-shot
68/// analogue of [`crate::receive_pack_chunk`]'s running-total check: it bounds
69/// the in-memory allocation a hostile or buggy server can drive on the receive
70/// side. `kind` names the blob in the error (e.g. `"redactions"`).
71pub fn check_received_transfer_blob_size(
72    blob_len: usize,
73    max_bytes: u64,
74    kind: &str,
75) -> Result<()> {
76    let len = u64::try_from(blob_len).map_err(|_| {
77        ProtocolError::InvalidState(format!("{kind} blob length does not fit in u64"))
78    })?;
79    if len > max_bytes {
80        return Err(ProtocolError::InvalidState(format!(
81            "{kind} blob exceeds receive size limit: {len} bytes (max {max_bytes})"
82        )));
83    }
84    Ok(())
85}
86
/// Number of `chunk_size`-byte chunks needed to cover `object_size` bytes,
/// rounding up so a trailing partial chunk is counted.
///
/// Returns `0` when either argument is `0`; in particular a zero
/// `chunk_size` yields `0` rather than dividing by zero.
// NOTE(review): no caller is visible in this file; confirm the allow is still needed.
#[allow(dead_code)]
pub fn chunk_count(object_size: usize, chunk_size: usize) -> usize {
    match (object_size, chunk_size) {
        (0, _) | (_, 0) => 0,
        (total, per_chunk) => total.div_ceil(per_chunk),
    }
}
94
/// Locate chunk number `chunk_index` inside an object of `object_size` bytes
/// that is split into `chunk_size`-byte chunks.
///
/// Returns `Some((offset, len))`, where `offset` is the chunk's starting byte
/// and `len` is its length. `len` equals `chunk_size` for every chunk except
/// possibly the last, which is shortened to end at `object_size`.
///
/// Returns `None` when `chunk_size` is `0`, when `chunk_index * chunk_size`
/// overflows `usize`, or when the chunk would start at or past the end of the
/// object (which includes every index of an empty object).
// NOTE(review): no caller is visible in this file; confirm the allow is still needed.
#[allow(dead_code)]
pub fn chunk_bounds(
    object_size: usize,
    chunk_size: usize,
    chunk_index: usize,
) -> Option<(usize, usize)> {
    if chunk_size == 0 {
        return None;
    }

    let start = chunk_index.checked_mul(chunk_size)?;
    if start >= object_size {
        return None;
    }

    // Clamp the length against the bytes that remain instead of computing
    // `start + chunk_size`: that sum can overflow `usize` for large chunk
    // sizes, whereas `object_size - start` cannot underflow after the guard
    // above.
    let len = chunk_size.min(object_size - start);
    Some((start, len))
}
112
/// Byte offset at which chunk `chunk_index` begins when every chunk is
/// `chunk_size` bytes long.
///
/// Returns `None` if the product does not fit in `usize`.
// NOTE(review): no caller is visible in this file; confirm the allow is still needed.
#[allow(dead_code)]
pub fn chunk_offset(chunk_index: usize, chunk_size: usize) -> Option<usize> {
    let offset = chunk_size.checked_mul(chunk_index)?;
    Some(offset)
}
117
118pub fn load_requested_object(store: &impl ObjectStore, req: &ObjectRequest) -> Result<ObjectData> {
119    // Note on sidecar objects: redactions and state visibility are keyed by
120    // ids that also identify primary objects. `load_requested_object`
121    // resolves blob-vs-tree or state by id shape/probe; it cannot
122    // disambiguate a sidecar request by ObjectId alone. Callers that need to
123    // fetch a sidecar must use `load_object_data` with an explicit object
124    // type.
125    let (obj_type, data) = match &req.id {
126        ObjectId::Hash(hash) => {
127            if let Some(blob) = store.get_blob(hash)? {
128                (ObjectType::Blob, blob.content().to_vec())
129            } else if let Some(tree) = store.get_tree(hash)? {
130                (ObjectType::Tree, rmp_serde::to_vec_named(&tree)?)
131            } else {
132                return Err(ProtocolError::ObjectNotFound(hash.to_hex()));
133            }
134        }
135        ObjectId::ChangeId(change_id) => {
136            let state = store
137                .get_state(change_id)?
138                .ok_or_else(|| ProtocolError::ObjectNotFound(change_id.to_string()))?;
139            (ObjectType::State, rmp_serde::to_vec_named(&state)?)
140        }
141    };
142
143    Ok(ObjectData {
144        id: req.id.clone(),
145        obj_type,
146        data,
147        is_delta: false,
148    })
149}
150
151pub fn load_object_data(
152    store: &impl ObjectStore,
153    id: &ObjectId,
154    obj_type: ObjectType,
155) -> Result<ObjectData> {
156    let data = match (id, obj_type) {
157        (ObjectId::Hash(hash), ObjectType::Blob) => store
158            .get_blob(hash)?
159            .ok_or_else(|| ProtocolError::ObjectNotFound(hash.to_hex()))?
160            .content()
161            .to_vec(),
162        (ObjectId::Hash(hash), ObjectType::Tree) => {
163            let tree = store
164                .get_tree(hash)?
165                .ok_or_else(|| ProtocolError::ObjectNotFound(hash.to_hex()))?;
166            rmp_serde::to_vec_named(&tree)?
167        }
168        (ObjectId::ChangeId(change_id), ObjectType::State) => {
169            let state = store
170                .get_state(change_id)?
171                .ok_or_else(|| ProtocolError::ObjectNotFound(change_id.to_string()))?;
172            rmp_serde::to_vec_named(&state)?
173        }
174        (ObjectId::Hash(hash), ObjectType::Redaction) => store
175            .get_redactions_bytes_for_blob(hash)?
176            .ok_or_else(|| ProtocolError::ObjectNotFound(hash.to_hex()))?,
177        (ObjectId::ChangeId(change_id), ObjectType::StateVisibility) => store
178            .get_state_visibility_bytes_for_state(change_id)?
179            .ok_or_else(|| ProtocolError::ObjectNotFound(change_id.to_string_full()))?,
180        _ => {
181            return Err(ProtocolError::InvalidState(
182                "object id/type mismatch".to_string(),
183            ));
184        }
185    };
186
187    Ok(ObjectData {
188        id: id.clone(),
189        obj_type,
190        data,
191        is_delta: false,
192    })
193}
194
195pub fn store_received_object(store: &impl ObjectStore, data: &ObjectData) -> Result<()> {
196    match (&data.id, data.obj_type) {
197        (ObjectId::Hash(hash), ObjectType::Blob) => {
198            store.put_blob_bytes_with_hash(&data.data, *hash)?;
199        }
200        (ObjectId::Hash(hash), ObjectType::Tree) => {
201            let tree: Tree = rmp_serde::from_slice(&data.data)?;
202            tree.validate().map_err(|error| {
203                ProtocolError::InvalidState(format!("invalid tree object: {error}"))
204            })?;
205            if &tree.hash() != hash {
206                return Err(ProtocolError::InvalidState(
207                    "tree hash mismatch".to_string(),
208                ));
209            }
210            store.put_tree_serialized(&data.data, *hash)?;
211        }
212        (ObjectId::ChangeId(change_id), ObjectType::State) => {
213            let state: State = rmp_serde::from_slice(&data.data)?;
214            if state.change_id != *change_id {
215                return Err(ProtocolError::InvalidState(format!(
216                    "ChangeId mismatch: expected {}, got {}",
217                    change_id, state.change_id
218                )));
219            }
220            store.put_state_serialized(&data.data, *change_id)?;
221        }
222        (_, ObjectType::Redaction) => {
223            // Redactions ship signed and need verification before any
224            // bytes hit the sidecar. Refuse here so callers route via
225            // `Repository::accept_wire_redactions` instead of silently
226            // landing an unverified record.
227            return Err(ProtocolError::InvalidState(
228                "Redaction objects must be persisted via Repository::accept_wire_redactions, \
229                 not store_received_object — signature verification is required"
230                    .to_string(),
231            ));
232        }
233        (_, ObjectType::StateVisibility) => {
234            // State visibility must be validated and normalized at the
235            // Repository boundary (`put_state_visibility` enforces
236            // public-by-absence). Refuse raw sidecar writes here.
237            return Err(ProtocolError::InvalidState(
238                "StateVisibility objects must be persisted via Repository::accept_wire_state_visibility, \
239                 not store_received_object — sidecar validation is required"
240                    .to_string(),
241            ));
242        }
243        _ => {
244            return Err(ProtocolError::InvalidState(
245                "object id/type mismatch".to_string(),
246            ));
247        }
248    }
249
250    Ok(())
251}
252
#[cfg(test)]
mod tests {
    //! Unit tests for the chunk arithmetic helpers and the receive-side
    //! sidecar size limits.

    use super::*;

    /// A trailing partial chunk counts as a full chunk; zero sizes yield zero.
    #[test]
    fn test_chunk_count_rounds_up() {
        assert_eq!(chunk_count(0, 64), 0);
        assert_eq!(chunk_count(1, 64), 1);
        assert_eq!(chunk_count(64, 64), 1);
        assert_eq!(chunk_count(65, 64), 2);
        assert_eq!(chunk_count(10, 0), 0);
    }

    /// Bounds are `(offset, len)`; the last chunk is shortened and
    /// out-of-range indices or a zero chunk size give `None`.
    #[test]
    fn test_chunk_bounds_returns_ranges() {
        assert_eq!(chunk_bounds(100, 32, 0), Some((0, 32)));
        assert_eq!(chunk_bounds(100, 32, 2), Some((64, 32)));
        assert_eq!(chunk_bounds(100, 32, 3), Some((96, 4)));
        assert_eq!(chunk_bounds(100, 32, 4), None);
        assert_eq!(chunk_bounds(100, 0, 0), None);
    }

    /// Offsets are `index * size`, with overflow reported as `None`.
    #[test]
    fn test_chunk_offset_returns_position() {
        assert_eq!(chunk_offset(0, 64), Some(0));
        assert_eq!(chunk_offset(3, 64), Some(192));
        assert_eq!(chunk_offset(usize::MAX, 2), None);
    }

    /// The limit is inclusive.
    #[test]
    fn received_transfer_blob_at_limit_is_accepted() {
        check_received_transfer_blob_size(8, 8, "redactions").unwrap();
    }

    /// One byte over the limit is rejected with a message naming the blob
    /// kind, the received length, and the limit.
    #[test]
    fn received_transfer_blob_over_limit_is_rejected() {
        let error = check_received_transfer_blob_size(9, 8, "redactions").unwrap_err();
        let message = error.to_string();
        assert!(
            message.contains("redactions blob exceeds receive size limit"),
            "unexpected error: {message}"
        );
        assert!(
            message.contains("9 bytes (max 8)"),
            "unexpected error: {message}"
        );
    }

    /// Both production caps accept a blob of exactly the cap and reject a
    /// blob one byte larger.
    #[test]
    fn received_transfer_blob_caps_are_enforced_against_production_limits() {
        let caps = [
            (MAX_RECEIVED_REDACTIONS_BLOB_SIZE, "redactions"),
            (MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE, "state-visibility"),
        ];

        for (cap, kind) in caps {
            let at_cap = usize::try_from(cap).expect("production cap fits in usize");
            check_received_transfer_blob_size(at_cap, cap, kind).unwrap();

            let error = check_received_transfer_blob_size(at_cap + 1, cap, kind).unwrap_err();
            let message = error.to_string();
            assert!(
                message.contains("blob exceeds receive size limit"),
                "unexpected error for {kind}: {message}"
            );
        }
    }

    /// The gRPC decode limit must be able to carry a max-size sidecar blob,
    /// otherwise legitimate transfers would be refused at the decode boundary.
    #[test]
    fn pull_decode_limit_admits_max_size_sidecar_blob() {
        let limit = u64::try_from(MAX_PULL_DECODE_MESSAGE_SIZE).expect("limit fits in u64");
        assert!(limit > MAX_RECEIVED_REDACTIONS_BLOB_SIZE);
        assert!(limit > MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE);
    }
}
315}