1use objects::{
3 object::{State, Tree},
4 store::ObjectStore,
5};
6
7use crate::{ObjectData, ObjectId, ObjectRequest, ObjectType, ProtocolError, Result};
8
/// Size cap, in bytes, for a received redactions blob: 64 MiB.
pub const MAX_RECEIVED_REDACTIONS_BLOB_SIZE: u64 = 64 << 20;

/// Size cap, in bytes, for a received state-visibility blob: 64 MiB.
pub const MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE: u64 = 64 << 20;

/// Extra bytes (1 MiB) added on top of the largest blob cap when deriving
/// `MAX_PULL_DECODE_MESSAGE_SIZE`.
// NOTE(review): presumably this covers the message framing around the blob — confirm.
const PULL_DECODE_ENVELOPE_HEADROOM: u64 = 1 << 20;
37
/// Returns the larger of `a` and `b`.
///
/// Written by hand so it can be called while evaluating constants.
const fn max_u64(a: u64, b: u64) -> u64 {
    if b >= a {
        b
    } else {
        a
    }
}
41
42pub const MAX_PULL_DECODE_MESSAGE_SIZE: usize = (max_u64(
59 MAX_RECEIVED_REDACTIONS_BLOB_SIZE,
60 MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE,
61) + PULL_DECODE_ENVELOPE_HEADROOM) as usize;
62
63pub fn check_received_transfer_blob_size(
72 blob_len: usize,
73 max_bytes: u64,
74 kind: &str,
75) -> Result<()> {
76 let len = u64::try_from(blob_len).map_err(|_| {
77 ProtocolError::InvalidState(format!("{kind} blob length does not fit in u64"))
78 })?;
79 if len > max_bytes {
80 return Err(ProtocolError::InvalidState(format!(
81 "{kind} blob exceeds receive size limit: {len} bytes (max {max_bytes})"
82 )));
83 }
84 Ok(())
85}
86
/// Number of `chunk_size`-byte chunks needed to cover `object_size` bytes, rounding up.
///
/// Returns 0 when either argument is 0.
#[allow(dead_code)]
pub fn chunk_count(object_size: usize, chunk_size: usize) -> usize {
    match chunk_size {
        // Guard the division: a zero chunk size yields no chunks.
        0 => 0,
        // An empty object needs no special case: `0.div_ceil(size)` is already 0.
        size => object_size.div_ceil(size),
    }
}
94
/// Locates chunk `chunk_index` inside an object of `object_size` bytes that is split
/// into `chunk_size`-byte chunks.
///
/// Returns `Some((start, len))`, where `start` is the chunk's byte offset and `len` its
/// length. Note that the second element is a length, not an end offset. The final chunk
/// may be shorter than `chunk_size`.
///
/// Returns `None` when `chunk_size` is 0, when `chunk_index * chunk_size` overflows
/// `usize`, or when the chunk would start at or past the end of the object.
#[allow(dead_code)]
pub fn chunk_bounds(
    object_size: usize,
    chunk_size: usize,
    chunk_index: usize,
) -> Option<(usize, usize)> {
    if chunk_size == 0 {
        return None;
    }

    let start = chunk_index.checked_mul(chunk_size)?;
    // Bytes left from `start` to the end of the object. `checked_sub` rejects a start
    // past the end and the filter rejects a start exactly at the end.
    let remaining = object_size.checked_sub(start).filter(|&left| left > 0)?;
    // Clamp via the remainder rather than computing `start + chunk_size`, which can
    // overflow `usize` even when `start` itself is in range.
    Some((start, chunk_size.min(remaining)))
}
112
/// Byte offset at which chunk `chunk_index` starts, i.e. `chunk_index * chunk_size`.
///
/// Returns `None` when the product overflows `usize`. A `chunk_size` of 0 is not
/// special-cased and yields `Some(0)`.
#[allow(dead_code)]
pub fn chunk_offset(chunk_index: usize, chunk_size: usize) -> Option<usize> {
    let offset = chunk_index.checked_mul(chunk_size)?;
    Some(offset)
}
117
118pub fn load_requested_object(store: &impl ObjectStore, req: &ObjectRequest) -> Result<ObjectData> {
119 let (obj_type, data) = match &req.id {
126 ObjectId::Hash(hash) => {
127 if let Some(blob) = store.get_blob(hash)? {
128 (ObjectType::Blob, blob.content().to_vec())
129 } else if let Some(tree) = store.get_tree(hash)? {
130 (ObjectType::Tree, rmp_serde::to_vec_named(&tree)?)
131 } else {
132 return Err(ProtocolError::ObjectNotFound(hash.to_hex()));
133 }
134 }
135 ObjectId::ChangeId(change_id) => {
136 let state = store
137 .get_state(change_id)?
138 .ok_or_else(|| ProtocolError::ObjectNotFound(change_id.to_string()))?;
139 (ObjectType::State, rmp_serde::to_vec_named(&state)?)
140 }
141 };
142
143 Ok(ObjectData {
144 id: req.id.clone(),
145 obj_type,
146 data,
147 is_delta: false,
148 })
149}
150
151pub fn load_object_data(
152 store: &impl ObjectStore,
153 id: &ObjectId,
154 obj_type: ObjectType,
155) -> Result<ObjectData> {
156 let data = match (id, obj_type) {
157 (ObjectId::Hash(hash), ObjectType::Blob) => store
158 .get_blob(hash)?
159 .ok_or_else(|| ProtocolError::ObjectNotFound(hash.to_hex()))?
160 .content()
161 .to_vec(),
162 (ObjectId::Hash(hash), ObjectType::Tree) => {
163 let tree = store
164 .get_tree(hash)?
165 .ok_or_else(|| ProtocolError::ObjectNotFound(hash.to_hex()))?;
166 rmp_serde::to_vec_named(&tree)?
167 }
168 (ObjectId::ChangeId(change_id), ObjectType::State) => {
169 let state = store
170 .get_state(change_id)?
171 .ok_or_else(|| ProtocolError::ObjectNotFound(change_id.to_string()))?;
172 rmp_serde::to_vec_named(&state)?
173 }
174 (ObjectId::Hash(hash), ObjectType::Redaction) => store
175 .get_redactions_bytes_for_blob(hash)?
176 .ok_or_else(|| ProtocolError::ObjectNotFound(hash.to_hex()))?,
177 (ObjectId::ChangeId(change_id), ObjectType::StateVisibility) => store
178 .get_state_visibility_bytes_for_state(change_id)?
179 .ok_or_else(|| ProtocolError::ObjectNotFound(change_id.to_string_full()))?,
180 _ => {
181 return Err(ProtocolError::InvalidState(
182 "object id/type mismatch".to_string(),
183 ));
184 }
185 };
186
187 Ok(ObjectData {
188 id: id.clone(),
189 obj_type,
190 data,
191 is_delta: false,
192 })
193}
194
195pub fn store_received_object(store: &impl ObjectStore, data: &ObjectData) -> Result<()> {
196 match (&data.id, data.obj_type) {
197 (ObjectId::Hash(hash), ObjectType::Blob) => {
198 store.put_blob_bytes_with_hash(&data.data, *hash)?;
199 }
200 (ObjectId::Hash(hash), ObjectType::Tree) => {
201 let tree: Tree = rmp_serde::from_slice(&data.data)?;
202 tree.validate().map_err(|error| {
203 ProtocolError::InvalidState(format!("invalid tree object: {error}"))
204 })?;
205 if &tree.hash() != hash {
206 return Err(ProtocolError::InvalidState(
207 "tree hash mismatch".to_string(),
208 ));
209 }
210 store.put_tree_serialized(&data.data, *hash)?;
211 }
212 (ObjectId::ChangeId(change_id), ObjectType::State) => {
213 let state: State = rmp_serde::from_slice(&data.data)?;
214 if state.change_id != *change_id {
215 return Err(ProtocolError::InvalidState(format!(
216 "ChangeId mismatch: expected {}, got {}",
217 change_id, state.change_id
218 )));
219 }
220 store.put_state_serialized(&data.data, *change_id)?;
221 }
222 (_, ObjectType::Redaction) => {
223 return Err(ProtocolError::InvalidState(
228 "Redaction objects must be persisted via Repository::accept_wire_redactions, \
229 not store_received_object — signature verification is required"
230 .to_string(),
231 ));
232 }
233 (_, ObjectType::StateVisibility) => {
234 return Err(ProtocolError::InvalidState(
238 "StateVisibility objects must be persisted via Repository::accept_wire_state_visibility, \
239 not store_received_object — sidecar validation is required"
240 .to_string(),
241 ));
242 }
243 _ => {
244 return Err(ProtocolError::InvalidState(
245 "object id/type mismatch".to_string(),
246 ));
247 }
248 }
249
250 Ok(())
251}
252
#[cfg(test)]
mod tests {
    use super::*;

    /// Chunk counts round up, and an empty object needs no chunks.
    #[test]
    fn test_chunk_count_rounds_up() {
        // (object_size, expected count) at a fixed chunk size of 64.
        for (object_size, expected) in [(0, 0), (1, 1), (64, 1), (65, 2)] {
            assert_eq!(chunk_count(object_size, 64), expected);
        }
    }

    /// Bounds are `(start, len)`; the last chunk is short and out-of-range is `None`.
    #[test]
    fn test_chunk_bounds_returns_ranges() {
        // ((object_size, chunk_size, chunk_index), expected)
        let cases = [
            ((100, 32, 0), Some((0, 32))),
            ((100, 32, 2), Some((64, 32))),
            ((100, 32, 3), Some((96, 4))),
            ((100, 32, 4), None),
            ((100, 0, 0), None),
        ];
        for ((object_size, chunk_size, chunk_index), expected) in cases {
            assert_eq!(chunk_bounds(object_size, chunk_size, chunk_index), expected);
        }
    }

    /// Offsets are the plain product, with overflow reported as `None`.
    #[test]
    fn test_chunk_offset_returns_position() {
        // ((chunk_index, chunk_size), expected)
        let cases = [
            ((0, 64), Some(0)),
            ((3, 64), Some(192)),
            ((usize::MAX, 2), None),
        ];
        for ((chunk_index, chunk_size), expected) in cases {
            assert_eq!(chunk_offset(chunk_index, chunk_size), expected);
        }
    }

    /// The cap is inclusive.
    #[test]
    fn received_transfer_blob_at_limit_is_accepted() {
        let outcome = check_received_transfer_blob_size(8, 8, "redactions");
        outcome.unwrap();
    }

    /// One byte over the cap is refused, and the message names the kind and both sizes.
    #[test]
    fn received_transfer_blob_over_limit_is_rejected() {
        let Err(error) = check_received_transfer_blob_size(9, 8, "redactions") else {
            panic!("a blob over the limit must be rejected");
        };
        let message = error.to_string();
        for expected in [
            "redactions blob exceeds receive size limit",
            "9 bytes (max 8)",
        ] {
            assert!(message.contains(expected), "unexpected error: {message}");
        }
    }

    /// A blob exactly at each production cap passes the check.
    #[test]
    fn received_transfer_blob_caps_are_enforced_against_production_limits() {
        for (limit, kind) in [
            (MAX_RECEIVED_REDACTIONS_BLOB_SIZE, "redactions"),
            (MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE, "state-visibility"),
        ] {
            check_received_transfer_blob_size(limit as usize, limit, kind).unwrap();
        }
    }
}