1use alloc::vec;
2use alloc::vec::Vec;
3
4use super::advertisement::ResourceAdvertisement;
5use super::parts::{extract_metadata, map_hash};
6use super::proof::{build_proof_data, compute_expected_proof, compute_resource_hash};
7use super::types::*;
8use super::window::WindowState;
9use crate::buffer::types::Compressor;
10use crate::constants::*;
11
/// Receiving-side state for one resource transfer.
///
/// Built from an advertisement by `from_advertisement` and then driven by `accept`,
/// `receive_part`, `handle_hashmap_update` and `tick`, each of which returns the
/// `ResourceAction`s the caller is expected to carry out.
pub struct ResourceReceiver {
    /// Current lifecycle status; starts as `ResourceStatus::None`.
    pub status: ResourceStatus,
    /// Resource hash from the advertisement (validated to be 32 bytes long).
    pub resource_hash: Vec<u8>,
    /// Random hash from the advertisement; passed to `map_hash` and `compute_resource_hash`.
    pub random_hash: Vec<u8>,
    /// Original hash from the advertisement (validated to be 32 bytes long).
    pub original_hash: Vec<u8>,
    /// Advertisement flags; `encrypted`, `compressed` and `has_metadata` are read here.
    pub flags: AdvFlags,
    /// Transfer size copied from the advertisement.
    pub transfer_size: u64,
    /// Data size copied from the advertisement.
    pub data_size: u64,
    /// Number of parts advertised; length of both `parts` and `hashmap`.
    pub total_parts: usize,
    /// Received part payloads by part index; `None` until the part has arrived.
    parts: Vec<Option<Vec<u8>>>,
    /// Known map hash per part index; `None` until supplied by the advertisement or an update.
    hashmap: Vec<Option<[u8; RESOURCE_MAPHASH_LEN]>>,
    /// Number of `hashmap` slots that have been filled so far.
    hashmap_height: usize,
    /// Set when a request reported the hashmap as exhausted; cleared by a hashmap update or a
    /// timeout retry.
    pub waiting_for_hmu: bool,
    /// Number of distinct parts stored in `parts`.
    pub received_count: usize,
    /// Parts named in the latest request that have not arrived yet.
    pub outstanding_parts: usize,
    /// Highest index such that it and every earlier part are present; `-1` when none are.
    consecutive_completed_height: isize,
    /// SDU size supplied at construction; used for the part-count limit and timing estimates.
    sdu: usize,
    /// Link round-trip time supplied at construction; fallback when `rtt` is `None`.
    link_rtt: f64,
    /// Retries remaining before `tick` fails the transfer; reset whenever data arrives.
    pub retries_left: usize,
    /// Value `retries_left` is reset to.
    max_retries: usize,
    /// Round-trip estimate derived from request/first-response timing.
    pub rtt: Option<f64>,
    /// Multiplier applied to the expected time of flight in `tick`.
    part_timeout_factor: f64,
    /// Time of the last request, part or hashmap update (same clock as the `now` arguments).
    pub last_activity: f64,
    /// Time the latest request was issued; `0.0` before the first request.
    pub req_sent: f64,
    /// Byte length of the latest request payload.
    req_sent_bytes: usize,
    /// Arrival time of the first part answering the latest request, if any.
    req_resp: Option<f64>,
    /// Running total of received part bytes.
    rtt_rxd_bytes: usize,
    /// Value of `rtt_rxd_bytes` at the previous data-rate measurement.
    rtt_rxd_bytes_at_part_req: usize,
    /// Rate computed from request bytes plus first response part over the measured RTT.
    req_resp_rtt_rate: f64,
    /// Rate computed from the bytes received between a request and completion of its window.
    req_data_rtt_rate: f64,
    /// Most recent value produced by `compute_eifr`.
    pub eifr: Option<f64>,
    /// Rate estimate supplied at construction; used until a data rate has been measured.
    previous_eifr: Option<f64>,
    /// Segment index from the advertisement.
    pub segment_index: u64,
    /// Total segment count from the advertisement.
    pub total_segments: u64,
    /// Copy of `flags.has_metadata` taken at construction.
    pub has_metadata: bool,
    /// Request id from the advertisement, if present.
    pub request_id: Option<Vec<u8>>,
    /// Request window state.
    pub window: WindowState,
    /// Copy of the raw advertisement bytes this receiver was built from.
    pub advertisement_packet: Vec<u8>,
    /// Upper bound handed to `decompress_bounded` during assembly; defaults to
    /// `RESOURCE_AUTO_COMPRESS_MAX_SIZE`.
    pub max_decompressed_size: usize,
}
94
95impl ResourceReceiver {
96 pub fn from_advertisement(
98 adv_data: &[u8],
99 sdu: usize,
100 link_rtt: f64,
101 now: f64,
102 previous_window: Option<usize>,
103 previous_eifr: Option<f64>,
104 ) -> Result<Self, ResourceError> {
105 let adv = ResourceAdvertisement::unpack(adv_data)?;
106
107 if adv.resource_hash.len() != 32 {
109 return Err(ResourceError::InvalidAdvertisement);
110 }
111 if adv.random_hash.len() != RESOURCE_RANDOM_HASH_SIZE || adv.original_hash.len() != 32 {
112 return Err(ResourceError::InvalidAdvertisement);
113 }
114 if adv.transfer_size > RESOURCE_AUTO_COMPRESS_MAX_SIZE as u64
115 || adv.data_size > RESOURCE_AUTO_COMPRESS_MAX_SIZE as u64
116 {
117 return Err(ResourceError::TooLarge);
118 }
119
120 let total_parts = usize::try_from(adv.num_parts).map_err(|_| ResourceError::TooLarge)?;
121 if total_parts == 0 {
122 return Err(ResourceError::InvalidAdvertisement);
123 }
124 let max_parts = RESOURCE_AUTO_COMPRESS_MAX_SIZE
125 .div_ceil(sdu.max(1))
126 .saturating_add(RESOURCE_COLLISION_GUARD_SIZE);
127 if total_parts > max_parts {
128 return Err(ResourceError::TooLarge);
129 }
130 let parts_vec: Vec<Option<Vec<u8>>> = vec![None; total_parts];
131 let mut hashmap_vec: Vec<Option<[u8; RESOURCE_MAPHASH_LEN]>> = vec![None; total_parts];
132
133 let initial_hashes = adv.hashmap.len() / RESOURCE_MAPHASH_LEN;
135 let mut hashmap_height = 0;
136 for (i, slot) in hashmap_vec.iter_mut().enumerate().take(initial_hashes) {
137 if i < total_parts {
138 let start = i * RESOURCE_MAPHASH_LEN;
139 let end = start + RESOURCE_MAPHASH_LEN;
140 let mut h = [0u8; RESOURCE_MAPHASH_LEN];
141 h.copy_from_slice(&adv.hashmap[start..end]);
142 *slot = Some(h);
143 hashmap_height += 1;
144 }
145 }
146
147 let mut window_state = WindowState::new();
148 if let Some(prev_w) = previous_window {
149 window_state.restore(prev_w);
150 }
151
152 Ok(ResourceReceiver {
153 status: ResourceStatus::None,
154 resource_hash: adv.resource_hash,
155 random_hash: adv.random_hash,
156 original_hash: adv.original_hash,
157 flags: adv.flags,
158 transfer_size: adv.transfer_size,
159 data_size: adv.data_size,
160 total_parts,
161 parts: parts_vec,
162 hashmap: hashmap_vec,
163 hashmap_height,
164 waiting_for_hmu: false,
165 received_count: 0,
166 outstanding_parts: 0,
167 consecutive_completed_height: -1,
168 sdu,
169 link_rtt,
170 retries_left: RESOURCE_MAX_RETRIES,
171 max_retries: RESOURCE_MAX_RETRIES,
172 rtt: None,
173 part_timeout_factor: RESOURCE_PART_TIMEOUT_FACTOR,
174 last_activity: now,
175 req_sent: 0.0,
176 req_sent_bytes: 0,
177 req_resp: None,
178 rtt_rxd_bytes: 0,
179 rtt_rxd_bytes_at_part_req: 0,
180 req_resp_rtt_rate: 0.0,
181 req_data_rtt_rate: 0.0,
182 eifr: None,
183 previous_eifr,
184 segment_index: adv.segment_index,
185 total_segments: adv.total_segments,
186 has_metadata: adv.flags.has_metadata,
187 request_id: adv.request_id,
188 window: window_state,
189 advertisement_packet: adv_data.to_vec(),
190 max_decompressed_size: RESOURCE_AUTO_COMPRESS_MAX_SIZE,
191 })
192 }
193
194 pub fn accept(&mut self, now: f64) -> Vec<ResourceAction> {
196 self.status = ResourceStatus::Transferring;
197 self.last_activity = now;
198 self.request_next(now)
199 }
200
201 pub fn reject(&mut self) -> Vec<ResourceAction> {
203 self.status = ResourceStatus::Rejected;
204 vec![ResourceAction::SendCancelReceiver(
205 self.resource_hash.clone(),
206 )]
207 }
208
209 fn corrupt_actions(&mut self, error: ResourceError) -> Vec<ResourceAction> {
210 self.status = ResourceStatus::Corrupt;
211 vec![
212 ResourceAction::SendCancelReceiver(self.resource_hash.clone()),
213 ResourceAction::Failed(error),
214 ResourceAction::TeardownLink,
215 ]
216 }
217
218 pub fn receive_part(&mut self, part_data: &[u8], now: f64) -> Vec<ResourceAction> {
220 if self.status == ResourceStatus::Failed {
221 return vec![];
222 }
223
224 self.last_activity = now;
225 self.retries_left = self.max_retries;
226
227 if self.req_resp.is_none() {
229 self.req_resp = Some(now);
230 let rtt = now - self.req_sent;
231 self.part_timeout_factor = RESOURCE_PART_TIMEOUT_FACTOR_AFTER_RTT;
232
233 if self.rtt.is_none() {
234 self.rtt = Some(rtt);
235 } else if let Some(current_rtt) = self.rtt {
236 if rtt < current_rtt {
237 self.rtt = Some(f64::max(current_rtt - current_rtt * 0.05, rtt));
238 } else {
239 self.rtt = Some(f64::min(current_rtt + current_rtt * 0.05, rtt));
240 }
241 }
242
243 if rtt > 0.0 {
244 let req_resp_cost = part_data.len() + self.req_sent_bytes;
245 self.req_resp_rtt_rate = req_resp_cost as f64 / rtt;
246 self.window.update_req_resp_rate(self.req_resp_rtt_rate);
247 }
248 }
249
250 self.status = ResourceStatus::Transferring;
251
252 let part_hash = map_hash(part_data, &self.random_hash);
254
255 let consecutive_idx = if self.consecutive_completed_height >= 0 {
257 self.consecutive_completed_height as usize
258 } else {
259 0
260 };
261
262 let mut matched = false;
263 let search_end = core::cmp::min(consecutive_idx + self.window.window, self.total_parts);
264 for i in consecutive_idx..search_end {
265 if let Some(ref h) = self.hashmap[i] {
266 if *h == part_hash {
267 if self.parts[i].is_none() {
268 self.parts[i] = Some(part_data.to_vec());
269 self.rtt_rxd_bytes += part_data.len();
270 self.received_count += 1;
271 self.outstanding_parts = self.outstanding_parts.saturating_sub(1);
272
273 if i as isize == self.consecutive_completed_height + 1 {
275 self.consecutive_completed_height = i as isize;
276 }
277
278 let mut cp = (self.consecutive_completed_height + 1) as usize;
280 while cp < self.total_parts && self.parts[cp].is_some() {
281 self.consecutive_completed_height = cp as isize;
282 cp += 1;
283 }
284
285 matched = true;
286 }
287 break;
288 }
289 }
290 }
291
292 let mut actions = Vec::new();
293
294 if self.received_count == self.total_parts {
296 actions.push(ResourceAction::ProgressUpdate {
297 received: self.received_count,
298 total: self.total_parts,
299 });
300 return actions;
302 }
303
304 if matched {
305 actions.push(ResourceAction::ProgressUpdate {
306 received: self.received_count,
307 total: self.total_parts,
308 });
309 }
310
311 if self.outstanding_parts == 0 && self.received_count < self.total_parts {
313 self.window.on_window_complete();
315
316 if self.req_sent > 0.0 {
318 let rtt = now - self.req_sent;
319 let req_transferred = self.rtt_rxd_bytes - self.rtt_rxd_bytes_at_part_req;
320 if rtt > 0.0 {
321 self.req_data_rtt_rate = req_transferred as f64 / rtt;
322 self.rtt_rxd_bytes_at_part_req = self.rtt_rxd_bytes;
323 self.window.update_data_rate(self.req_data_rtt_rate);
324 }
325 }
326
327 let next_actions = self.request_next(now);
328 actions.extend(next_actions);
329 }
330
331 actions
332 }
333
334 pub fn handle_hashmap_update(&mut self, hmu_data: &[u8], now: f64) -> Vec<ResourceAction> {
338 if self.status == ResourceStatus::Failed {
339 return vec![];
340 }
341
342 self.last_activity = now;
343 self.retries_left = self.max_retries;
344
345 if hmu_data.len() <= 32 {
346 return vec![];
347 }
348
349 let payload = &hmu_data[32..];
350 let (value, _) = match crate::msgpack::unpack(payload) {
351 Ok(v) => v,
352 Err(_) => return vec![],
353 };
354
355 let arr = match value.as_array() {
356 Some(a) if a.len() >= 2 => a,
357 _ => return vec![],
358 };
359
360 let segment = match arr[0].as_uint() {
361 Some(s) => s as usize,
362 None => return vec![],
363 };
364
365 let hashmap_bytes = match arr[1].as_bin() {
366 Some(b) => b,
367 None => return vec![],
368 };
369
370 let seg_len = RESOURCE_HASHMAP_MAX_LEN;
372 let num_hashes = hashmap_bytes.len() / RESOURCE_MAPHASH_LEN;
373 for i in 0..num_hashes {
374 let idx = i + segment * seg_len;
375 if idx < self.total_parts {
376 let start = i * RESOURCE_MAPHASH_LEN;
377 let end = start + RESOURCE_MAPHASH_LEN;
378 if self.hashmap[idx].is_none() {
379 self.hashmap_height += 1;
380 }
381 let mut h = [0u8; RESOURCE_MAPHASH_LEN];
382 h.copy_from_slice(&hashmap_bytes[start..end]);
383 self.hashmap[idx] = Some(h);
384 }
385 }
386
387 self.waiting_for_hmu = false;
388 self.request_next(now)
389 }
390
391 pub fn request_next(&mut self, now: f64) -> Vec<ResourceAction> {
393 if self.status == ResourceStatus::Failed || self.waiting_for_hmu {
394 return vec![];
395 }
396
397 self.outstanding_parts = 0;
398 let mut hashmap_exhausted = RESOURCE_HASHMAP_IS_NOT_EXHAUSTED;
399 let mut requested_hashes = Vec::new();
400
401 let pn_start = (self.consecutive_completed_height + 1) as usize;
402 let search_end = core::cmp::min(pn_start + self.window.window, self.total_parts);
403 let mut i = 0;
404
405 for pn in pn_start..search_end {
406 if self.parts[pn].is_none() {
407 match self.hashmap[pn] {
408 Some(ref h) => {
409 requested_hashes.extend_from_slice(h);
410 self.outstanding_parts += 1;
411 i += 1;
412 }
413 None => {
414 hashmap_exhausted = RESOURCE_HASHMAP_IS_EXHAUSTED;
415 }
416 }
417 }
418 if i >= self.window.window || hashmap_exhausted == RESOURCE_HASHMAP_IS_EXHAUSTED {
419 break;
420 }
421 }
422
423 let mut request_data = Vec::new();
424 request_data.push(hashmap_exhausted);
425 if hashmap_exhausted == RESOURCE_HASHMAP_IS_EXHAUSTED {
426 if self.hashmap_height > 0 {
428 if let Some(ref last_hash) = self.hashmap[self.hashmap_height - 1] {
429 request_data.extend_from_slice(last_hash);
430 } else {
431 request_data.extend_from_slice(&[0u8; RESOURCE_MAPHASH_LEN]);
432 }
433 } else {
434 request_data.extend_from_slice(&[0u8; RESOURCE_MAPHASH_LEN]);
435 }
436 self.waiting_for_hmu = true;
437 }
438
439 request_data.extend_from_slice(&self.resource_hash);
440 request_data.extend_from_slice(&requested_hashes);
441
442 self.last_activity = now;
443 self.req_sent = now;
444 self.req_sent_bytes = request_data.len();
445 self.req_resp = None;
446
447 vec![ResourceAction::SendRequest(request_data)]
448 }
449
450 #[allow(clippy::type_complexity)]
452 pub fn assemble(
453 &mut self,
454 decrypt_fn: &dyn Fn(&[u8]) -> Result<Vec<u8>, ()>,
455 compressor: &dyn Compressor,
456 ) -> Vec<ResourceAction> {
457 if self.received_count != self.total_parts {
458 return vec![ResourceAction::Failed(ResourceError::InvalidState)];
459 }
460
461 self.status = ResourceStatus::Assembling;
462
463 let mut stream = Vec::new();
465 for part in &self.parts {
466 match part {
467 Some(data) => stream.extend_from_slice(data),
468 None => {
469 self.status = ResourceStatus::Failed;
470 return vec![ResourceAction::Failed(ResourceError::InvalidState)];
471 }
472 }
473 }
474
475 let decrypted = if self.flags.encrypted {
477 match decrypt_fn(&stream) {
478 Ok(d) => d,
479 Err(_) => {
480 self.status = ResourceStatus::Failed;
481 return vec![ResourceAction::Failed(ResourceError::DecryptionFailed)];
482 }
483 }
484 } else {
485 stream
486 };
487
488 if decrypted.len() < RESOURCE_RANDOM_HASH_SIZE {
490 return self.corrupt_actions(ResourceError::InvalidPart);
491 }
492 let data_after_random = &decrypted[RESOURCE_RANDOM_HASH_SIZE..];
493
494 let decompressed = if self.flags.compressed {
496 match compressor.decompress_bounded(data_after_random, self.max_decompressed_size) {
497 Ok(d) => d,
498 Err(crate::buffer::types::DecompressError::TooLarge) => {
499 return self.corrupt_actions(ResourceError::TooLarge);
500 }
501 Err(crate::buffer::types::DecompressError::InvalidData) => {
502 return self.corrupt_actions(ResourceError::DecompressionFailed);
503 }
504 }
505 } else {
506 data_after_random.to_vec()
507 };
508
509 let calculated_hash = compute_resource_hash(&decompressed, &self.random_hash);
511 if calculated_hash.as_slice() != self.resource_hash.as_slice() {
512 return self.corrupt_actions(ResourceError::HashMismatch);
513 }
514
515 let expected_proof = compute_expected_proof(&decompressed, &calculated_hash);
517 let proof_data = build_proof_data(&calculated_hash, &expected_proof);
518
519 let (data, metadata) = if self.has_metadata && self.segment_index == 1 {
521 match extract_metadata(&decompressed) {
522 Some((meta, rest)) => (rest, Some(meta)),
523 None => return self.corrupt_actions(ResourceError::InvalidPart),
524 }
525 } else {
526 (decompressed, None)
527 };
528
529 self.status = ResourceStatus::Complete;
530
531 vec![
532 ResourceAction::SendProof(proof_data),
533 ResourceAction::DataReceived { data, metadata },
534 ResourceAction::Completed,
535 ]
536 }
537
538 pub fn handle_cancel(&mut self) -> Vec<ResourceAction> {
540 if self.status < ResourceStatus::Complete {
541 self.status = ResourceStatus::Failed;
542 return vec![ResourceAction::Failed(ResourceError::Rejected)];
543 }
544 vec![]
545 }
546
547 #[allow(clippy::type_complexity)]
549 pub fn tick(
550 &mut self,
551 now: f64,
552 decrypt_fn: &dyn Fn(&[u8]) -> Result<Vec<u8>, ()>,
553 compressor: &dyn Compressor,
554 ) -> Vec<ResourceAction> {
555 if self.status >= ResourceStatus::Assembling {
556 return vec![];
557 }
558
559 if self.status == ResourceStatus::Transferring {
560 if self.received_count == self.total_parts {
562 return self.assemble(decrypt_fn, compressor);
563 }
564
565 let eifr = self.compute_eifr();
567 let retries_used = self.max_retries - self.retries_left;
568 let extra_wait = retries_used as f64 * RESOURCE_PER_RETRY_DELAY;
569 let expected_hmu_wait =
570 if eifr > 0.0 && (self.waiting_for_hmu || self.outstanding_parts == 0) {
571 (self.sdu as f64 * 8.0 * RESOURCE_HMU_WAIT_FACTOR) / eifr
572 } else {
573 0.0
574 };
575 let expected_tof = if self.outstanding_parts > 0 && eifr > 0.0 {
576 (self.outstanding_parts as f64 * self.sdu as f64 * 8.0) / eifr
577 } else if eifr > 0.0 {
578 (3.0 * self.sdu as f64) / eifr
579 } else {
580 10.0 };
582
583 let sleep_time = self.last_activity
584 + self.part_timeout_factor * expected_tof
585 + expected_hmu_wait
586 + RESOURCE_RETRY_GRACE_TIME
587 + extra_wait;
588
589 if now > sleep_time {
590 if self.retries_left > 0 {
591 self.window.on_timeout();
593 self.retries_left -= 1;
594 self.waiting_for_hmu = false;
595 return self.request_next(now);
596 } else {
597 self.status = ResourceStatus::Failed;
598 return vec![ResourceAction::Failed(ResourceError::MaxRetriesExceeded)];
599 }
600 }
601 }
602
603 vec![]
604 }
605
606 fn compute_eifr(&mut self) -> f64 {
608 let eifr = if self.req_data_rtt_rate > 0.0 {
609 self.req_data_rtt_rate * 8.0
610 } else if let Some(prev) = self.previous_eifr {
611 prev
612 } else {
613 let rtt = self.rtt.unwrap_or(self.link_rtt);
615 if rtt > 0.0 {
616 (self.sdu as f64 * 8.0) / rtt
617 } else {
618 10000.0
619 }
620 };
621 self.eifr = Some(eifr);
622 eifr
623 }
624
625 pub fn progress(&self) -> (usize, usize) {
627 (self.received_count, self.total_parts)
628 }
629
630 pub fn get_transfer_state(&self) -> (usize, Option<f64>) {
632 (self.window.window, self.eifr)
633 }
634}
635
636#[cfg(test)]
637mod tests {
638 use super::*;
639 use crate::buffer::types::{Compressor, DecompressError, NoopCompressor};
640 use crate::resource::advertisement::ResourceAdvertisement;
641 use crate::resource::sender::ResourceSender;
642
643 fn identity_encrypt(data: &[u8]) -> Vec<u8> {
644 data.to_vec()
645 }
646
647 fn identity_decrypt(data: &[u8]) -> Result<Vec<u8>, ()> {
648 Ok(data.to_vec())
649 }
650
651 struct ExpandingCompressor;
652
653 impl Compressor for ExpandingCompressor {
654 fn compress(&self, data: &[u8]) -> Option<Vec<u8>> {
655 Some(data[..data.len() / 2].to_vec())
656 }
657
658 fn decompress(&self, data: &[u8]) -> Option<Vec<u8>> {
659 self.decompress_bounded(data, usize::MAX).ok()
660 }
661
662 fn decompress_bounded(
663 &self,
664 data: &[u8],
665 max_output_size: usize,
666 ) -> Result<Vec<u8>, DecompressError> {
667 let mut out = data.to_vec();
668 out.extend_from_slice(data);
669 if out.len() > max_output_size {
670 return Err(DecompressError::TooLarge);
671 }
672 Ok(out)
673 }
674 }
675
676 fn base_timeout(receiver: &ResourceReceiver, eifr: f64) -> f64 {
677 let expected_tof = if receiver.outstanding_parts > 0 {
678 (receiver.outstanding_parts as f64 * receiver.sdu as f64 * 8.0) / eifr
679 } else {
680 (3.0 * receiver.sdu as f64) / eifr
681 };
682
683 receiver.last_activity
684 + receiver.part_timeout_factor * expected_tof
685 + RESOURCE_RETRY_GRACE_TIME
686 }
687
688 fn hmu_timeout(receiver: &ResourceReceiver, eifr: f64) -> f64 {
689 let expected_hmu_wait = (receiver.sdu as f64 * 8.0 * RESOURCE_HMU_WAIT_FACTOR) / eifr;
690 base_timeout(receiver, eifr) + expected_hmu_wait
691 }
692
693 fn make_sender_receiver() -> (ResourceSender, ResourceReceiver) {
694 let mut rng = rns_crypto::FixedRng::new(&[0x42; 64]);
695 let data = b"Hello, Resource Transfer!";
696
697 let sender = ResourceSender::new(
698 data,
699 None,
700 RESOURCE_SDU,
701 &identity_encrypt,
702 &NoopCompressor,
703 &mut rng,
704 1000.0,
705 false,
706 false,
707 None,
708 1,
709 1,
710 None,
711 0.5,
712 6.0,
713 )
714 .unwrap();
715
716 let adv_data = sender.get_advertisement(0);
717 let receiver =
718 ResourceReceiver::from_advertisement(&adv_data, RESOURCE_SDU, 0.5, 1000.0, None, None)
719 .unwrap();
720
721 (sender, receiver)
722 }
723
724 #[test]
725 fn test_from_advertisement() {
726 let (sender, receiver) = make_sender_receiver();
727 assert_eq!(receiver.total_parts, sender.total_parts());
728 assert_eq!(receiver.transfer_size, sender.transfer_size as u64);
729 assert_eq!(receiver.resource_hash, sender.resource_hash.to_vec());
730 assert!(!receiver.advertisement_packet.is_empty());
731 assert_eq!(
732 receiver.max_decompressed_size,
733 RESOURCE_AUTO_COMPRESS_MAX_SIZE
734 );
735 }
736
737 #[test]
738 fn test_from_advertisement_rejects_huge_part_count() {
739 let adv = ResourceAdvertisement {
740 transfer_size: 1024,
741 data_size: 1024,
742 num_parts: u64::MAX,
743 resource_hash: vec![0x11; 32],
744 random_hash: vec![0x22; RESOURCE_RANDOM_HASH_SIZE],
745 original_hash: vec![0x33; 32],
746 hashmap: vec![0x44; RESOURCE_MAPHASH_LEN],
747 flags: AdvFlags {
748 encrypted: false,
749 compressed: false,
750 split: false,
751 is_request: false,
752 is_response: false,
753 has_metadata: false,
754 },
755 segment_index: 1,
756 total_segments: 1,
757 request_id: None,
758 };
759
760 let err = match ResourceReceiver::from_advertisement(
761 &adv.pack(0),
762 RESOURCE_SDU,
763 0.5,
764 1000.0,
765 None,
766 None,
767 ) {
768 Ok(_) => panic!("huge advertisement should be rejected"),
769 Err(err) => err,
770 };
771
772 assert_eq!(err, ResourceError::TooLarge);
773 }
774
775 #[test]
776 fn test_from_advertisement_rejects_oversized_transfer() {
777 let adv = ResourceAdvertisement {
778 transfer_size: RESOURCE_AUTO_COMPRESS_MAX_SIZE as u64 + 1,
779 data_size: 1024,
780 num_parts: 1,
781 resource_hash: vec![0x11; 32],
782 random_hash: vec![0x22; RESOURCE_RANDOM_HASH_SIZE],
783 original_hash: vec![0x33; 32],
784 hashmap: vec![0x44; RESOURCE_MAPHASH_LEN],
785 flags: AdvFlags {
786 encrypted: false,
787 compressed: false,
788 split: false,
789 is_request: false,
790 is_response: false,
791 has_metadata: false,
792 },
793 segment_index: 1,
794 total_segments: 1,
795 request_id: None,
796 };
797
798 let err = match ResourceReceiver::from_advertisement(
799 &adv.pack(0),
800 RESOURCE_SDU,
801 0.5,
802 1000.0,
803 None,
804 None,
805 ) {
806 Ok(_) => panic!("oversized advertisement should be rejected"),
807 Err(err) => err,
808 };
809
810 assert_eq!(err, ResourceError::TooLarge);
811 }
812
813 #[test]
814 fn test_accept() {
815 let (_, mut receiver) = make_sender_receiver();
816 let actions = receiver.accept(1000.0);
817 assert_eq!(receiver.status, ResourceStatus::Transferring);
818 assert!(!actions.is_empty());
819 assert!(actions
820 .iter()
821 .any(|a| matches!(a, ResourceAction::SendRequest(_))));
822 }
823
824 #[test]
825 fn test_reject() {
826 let (_, mut receiver) = make_sender_receiver();
827 let actions = receiver.reject();
828 assert_eq!(receiver.status, ResourceStatus::Rejected);
829 assert!(actions
830 .iter()
831 .any(|a| matches!(a, ResourceAction::SendCancelReceiver(_))));
832 }
833
834 #[test]
835 fn test_receive_part_stores() {
836 let (mut sender, mut receiver) = make_sender_receiver();
837 receiver.accept(1000.0);
838
839 let mut request = Vec::new();
842 request.push(RESOURCE_HASHMAP_IS_NOT_EXHAUSTED);
843 request.extend_from_slice(&sender.resource_hash);
844 request.extend_from_slice(&sender.part_hashes[0]);
845
846 let send_actions = sender.handle_request(&request, 1001.0);
847 let part_data = send_actions
848 .iter()
849 .find_map(|a| match a {
850 ResourceAction::SendPart(d) => Some(d.clone()),
851 _ => None,
852 })
853 .unwrap();
854
855 receiver.req_sent = 1000.5;
857 let _actions = receiver.receive_part(&part_data, 1001.0);
858 assert_eq!(receiver.received_count, 1);
859 }
860
861 #[test]
862 fn test_consecutive_completed_height() {
863 let (sender, mut receiver) = make_sender_receiver();
864 receiver.accept(1000.0);
865
866 if sender.total_parts() > 1 {
868 assert_eq!(receiver.consecutive_completed_height, -1);
870 }
871 }
872
873 #[test]
874 fn test_handle_cancel() {
875 let (_, mut receiver) = make_sender_receiver();
876 receiver.accept(1000.0);
877 let _actions = receiver.handle_cancel();
878 assert_eq!(receiver.status, ResourceStatus::Failed);
879 }
880
881 #[test]
882 fn test_assemble_compressed_resource_rejects_oversized_decompression() {
883 let data = b"oversized!";
884 let mut rng = rns_crypto::FixedRng::new(&[0x93; 64]);
885
886 let mut sender = ResourceSender::new(
887 data,
888 None,
889 RESOURCE_SDU,
890 &identity_encrypt,
891 &ExpandingCompressor,
892 &mut rng,
893 1000.0,
894 true,
895 false,
896 None,
897 1,
898 1,
899 None,
900 0.5,
901 6.0,
902 )
903 .unwrap();
904
905 let adv = sender.get_advertisement(0);
906 let mut receiver =
907 ResourceReceiver::from_advertisement(&adv, RESOURCE_SDU, 0.5, 1000.0, None, None)
908 .unwrap();
909 receiver.max_decompressed_size = data.len() - 1;
910
911 let request_data = receiver
912 .accept(1001.0)
913 .into_iter()
914 .find_map(|a| match a {
915 ResourceAction::SendRequest(d) => Some(d),
916 _ => None,
917 })
918 .unwrap();
919
920 let send_actions = sender.handle_request(&request_data, 1002.0);
921 receiver.req_sent = 1001.0;
922 for action in &send_actions {
923 if let ResourceAction::SendPart(part_data) = action {
924 receiver.receive_part(part_data, 1003.0);
925 }
926 }
927
928 let assemble_actions = receiver.assemble(&identity_decrypt, &ExpandingCompressor);
929 assert_eq!(receiver.status, ResourceStatus::Corrupt);
930 assert!(assemble_actions
931 .iter()
932 .any(|a| matches!(a, ResourceAction::SendCancelReceiver(_))));
933 assert!(assemble_actions
934 .iter()
935 .any(|a| matches!(a, ResourceAction::Failed(ResourceError::TooLarge))));
936 assert!(assemble_actions
937 .iter()
938 .any(|a| matches!(a, ResourceAction::TeardownLink)));
939 }
940
941 #[test]
942 fn test_full_transfer_small_data() {
943 let data = b"small data";
945 let mut rng = rns_crypto::FixedRng::new(&[0x77; 64]);
946
947 let mut sender = ResourceSender::new(
948 data,
949 None,
950 RESOURCE_SDU,
951 &identity_encrypt,
952 &NoopCompressor,
953 &mut rng,
954 1000.0,
955 false,
956 false,
957 None,
958 1,
959 1,
960 None,
961 0.5,
962 6.0,
963 )
964 .unwrap();
965
966 let adv = sender.get_advertisement(0);
967 let mut receiver =
968 ResourceReceiver::from_advertisement(&adv, RESOURCE_SDU, 0.5, 1000.0, None, None)
969 .unwrap();
970
971 let req_actions = receiver.accept(1001.0);
973 assert_eq!(receiver.status, ResourceStatus::Transferring);
974
975 let request_data = req_actions
977 .iter()
978 .find_map(|a| match a {
979 ResourceAction::SendRequest(d) => Some(d.clone()),
980 _ => None,
981 })
982 .unwrap();
983
984 let send_actions = sender.handle_request(&request_data, 1002.0);
986
987 receiver.req_sent = 1001.0;
989 for action in &send_actions {
990 if let ResourceAction::SendPart(part_data) = action {
991 receiver.receive_part(part_data, 1003.0);
992 }
993 }
994
995 assert_eq!(receiver.received_count, receiver.total_parts);
996
997 let assemble_actions = receiver.assemble(&identity_decrypt, &NoopCompressor);
999
1000 let has_proof = assemble_actions
1002 .iter()
1003 .any(|a| matches!(a, ResourceAction::SendProof(_)));
1004 let has_data = assemble_actions
1005 .iter()
1006 .any(|a| matches!(a, ResourceAction::DataReceived { .. }));
1007 let has_complete = assemble_actions
1008 .iter()
1009 .any(|a| matches!(a, ResourceAction::Completed));
1010
1011 assert!(has_proof, "Should send proof");
1012 assert!(has_data, "Should return data");
1013 assert!(has_complete, "Should be completed");
1014
1015 let received_data = assemble_actions
1017 .iter()
1018 .find_map(|a| match a {
1019 ResourceAction::DataReceived { data, .. } => Some(data.clone()),
1020 _ => None,
1021 })
1022 .unwrap();
1023 assert_eq!(received_data, data);
1024
1025 let proof_data = assemble_actions
1027 .iter()
1028 .find_map(|a| match a {
1029 ResourceAction::SendProof(d) => Some(d.clone()),
1030 _ => None,
1031 })
1032 .unwrap();
1033
1034 let _proof_actions = sender.handle_proof(&proof_data, 1004.0);
1035 assert_eq!(sender.status, ResourceStatus::Complete);
1036 }
1037
1038 #[test]
1039 fn test_full_transfer_with_metadata() {
1040 let data = b"data with metadata";
1041 let metadata = b"some metadata";
1042 let mut rng = rns_crypto::FixedRng::new(&[0x88; 64]);
1043
1044 let mut sender = ResourceSender::new(
1045 data,
1046 Some(metadata),
1047 RESOURCE_SDU,
1048 &identity_encrypt,
1049 &NoopCompressor,
1050 &mut rng,
1051 1000.0,
1052 false,
1053 false,
1054 None,
1055 1,
1056 1,
1057 None,
1058 0.5,
1059 6.0,
1060 )
1061 .unwrap();
1062
1063 assert!(sender.flags.has_metadata);
1064
1065 let adv = sender.get_advertisement(0);
1066 let mut receiver =
1067 ResourceReceiver::from_advertisement(&adv, RESOURCE_SDU, 0.5, 1000.0, None, None)
1068 .unwrap();
1069
1070 assert!(receiver.has_metadata);
1071
1072 let req_actions = receiver.accept(1001.0);
1074 let request_data = req_actions
1075 .iter()
1076 .find_map(|a| match a {
1077 ResourceAction::SendRequest(d) => Some(d.clone()),
1078 _ => None,
1079 })
1080 .unwrap();
1081
1082 let send_actions = sender.handle_request(&request_data, 1002.0);
1083 receiver.req_sent = 1001.0;
1084 for action in &send_actions {
1085 if let ResourceAction::SendPart(part_data) = action {
1086 receiver.receive_part(part_data, 1003.0);
1087 }
1088 }
1089
1090 let assemble_actions = receiver.assemble(&identity_decrypt, &NoopCompressor);
1091
1092 let (recv_data, recv_meta) = assemble_actions
1093 .iter()
1094 .find_map(|a| match a {
1095 ResourceAction::DataReceived { data, metadata } => {
1096 Some((data.clone(), metadata.clone()))
1097 }
1098 _ => None,
1099 })
1100 .unwrap();
1101
1102 assert_eq!(recv_data, data);
1103 assert_eq!(recv_meta.unwrap(), metadata);
1104 }
1105
1106 #[test]
1107 fn test_previous_window_restore() {
1108 let (_, _receiver) = make_sender_receiver();
1109 let adv_data = {
1111 let mut rng = rns_crypto::FixedRng::new(&[0x42; 64]);
1112 let sender = ResourceSender::new(
1113 b"test",
1114 None,
1115 RESOURCE_SDU,
1116 &identity_encrypt,
1117 &NoopCompressor,
1118 &mut rng,
1119 1000.0,
1120 false,
1121 false,
1122 None,
1123 1,
1124 1,
1125 None,
1126 0.5,
1127 6.0,
1128 )
1129 .unwrap();
1130 sender.get_advertisement(0)
1131 };
1132
1133 let receiver = ResourceReceiver::from_advertisement(
1134 &adv_data,
1135 RESOURCE_SDU,
1136 0.5,
1137 1000.0,
1138 Some(8),
1139 Some(50000.0),
1140 )
1141 .unwrap();
1142 assert_eq!(receiver.window.window, 8);
1143 }
1144
1145 #[test]
1146 fn test_tick_timeout_retry() {
1147 let (_, mut receiver) = make_sender_receiver();
1148 receiver.accept(1000.0);
1149 receiver.rtt = Some(0.1);
1150
1151 let actions = receiver.tick(9999.0, &identity_decrypt, &NoopCompressor);
1153 assert!(!actions.is_empty() || receiver.retries_left < RESOURCE_MAX_RETRIES);
1155 }
1156
1157 #[test]
1158 fn test_tick_waiting_for_hmu_gets_extra_timeout() {
1159 let (_, mut receiver) = make_sender_receiver();
1160 receiver.accept(1000.0);
1161 receiver.waiting_for_hmu = true;
1162 receiver.outstanding_parts = 0;
1163 let eifr = 10_000.0;
1164 receiver.previous_eifr = Some(eifr);
1165 receiver.last_activity = 1000.0;
1166
1167 let old_timeout = base_timeout(&receiver, eifr);
1168 let now = old_timeout + 0.01;
1169
1170 let actions = receiver.tick(now, &identity_decrypt, &NoopCompressor);
1171
1172 assert!(actions.is_empty(), "receiver should keep waiting for HMU");
1173 assert_eq!(receiver.retries_left, RESOURCE_MAX_RETRIES);
1174 assert_eq!(receiver.status, ResourceStatus::Transferring);
1175 }
1176
1177 #[test]
1178 fn test_tick_zero_outstanding_parts_gets_extra_timeout_without_hmu_flag() {
1179 let (_, mut receiver) = make_sender_receiver();
1180 receiver.accept(1000.0);
1181 receiver.waiting_for_hmu = false;
1182 receiver.outstanding_parts = 0;
1183 let eifr = 10_000.0;
1184 receiver.previous_eifr = Some(eifr);
1185 receiver.last_activity = 1000.0;
1186
1187 let old_timeout = base_timeout(&receiver, eifr);
1188 let now = old_timeout + 0.01;
1189
1190 let actions = receiver.tick(now, &identity_decrypt, &NoopCompressor);
1191
1192 assert!(
1193 actions.is_empty(),
1194 "receiver should keep waiting for follow-up hashmap data"
1195 );
1196 assert_eq!(receiver.retries_left, RESOURCE_MAX_RETRIES);
1197 assert_eq!(receiver.status, ResourceStatus::Transferring);
1198 }
1199
1200 #[test]
1201 fn test_tick_waiting_for_hmu_retries_after_extended_timeout() {
1202 let (_, mut receiver) = make_sender_receiver();
1203 receiver.accept(1000.0);
1204 receiver.waiting_for_hmu = true;
1205 receiver.outstanding_parts = 0;
1206 let eifr = 10_000.0;
1207 receiver.previous_eifr = Some(eifr);
1208 receiver.last_activity = 1000.0;
1209
1210 let now = hmu_timeout(&receiver, eifr) + 0.01;
1211 let _actions = receiver.tick(now, &identity_decrypt, &NoopCompressor);
1212
1213 assert_eq!(receiver.retries_left, RESOURCE_MAX_RETRIES - 1);
1214 assert!(!receiver.waiting_for_hmu);
1215 }
1216
1217 #[test]
1218 fn test_tick_inflight_parts_do_not_get_hmu_timeout_extension() {
1219 let (_, mut receiver) = make_sender_receiver();
1220 receiver.accept(1000.0);
1221 receiver.waiting_for_hmu = false;
1222 receiver.outstanding_parts = 2;
1223 let eifr = 10_000.0;
1224 receiver.previous_eifr = Some(eifr);
1225 receiver.last_activity = 1000.0;
1226
1227 let now = base_timeout(&receiver, eifr) + 0.01;
1228 let _actions = receiver.tick(now, &identity_decrypt, &NoopCompressor);
1229
1230 assert_eq!(receiver.retries_left, RESOURCE_MAX_RETRIES - 1);
1231 }
1232
1233 #[test]
1234 fn test_tick_max_retries_exceeded() {
1235 let (_, mut receiver) = make_sender_receiver();
1236 receiver.accept(1000.0);
1237 receiver.retries_left = 0;
1238 receiver.rtt = Some(0.001);
1239 receiver.eifr = Some(100000.0);
1240
1241 let _actions = receiver.tick(9999.0, &identity_decrypt, &NoopCompressor);
1242 assert_eq!(receiver.status, ResourceStatus::Failed);
1243 }
1244}