1use alloc::vec;
2use alloc::vec::Vec;
3
4use super::advertisement::ResourceAdvertisement;
5use super::parts::{extract_metadata, map_hash};
6use super::proof::{build_proof_data, compute_expected_proof, compute_resource_hash};
7use super::types::*;
8use super::window::WindowState;
9use crate::buffer::types::Compressor;
10use crate::constants::*;
11
12pub struct ResourceReceiver {
17 pub status: ResourceStatus,
19 pub resource_hash: Vec<u8>,
21 pub random_hash: Vec<u8>,
23 pub original_hash: Vec<u8>,
25 pub flags: AdvFlags,
27 pub transfer_size: u64,
29 pub data_size: u64,
31 pub total_parts: usize,
33 parts: Vec<Option<Vec<u8>>>,
35 hashmap: Vec<Option<[u8; RESOURCE_MAPHASH_LEN]>>,
37 hashmap_height: usize,
39 pub waiting_for_hmu: bool,
41 pub received_count: usize,
43 pub outstanding_parts: usize,
45 consecutive_completed_height: isize,
47 sdu: usize,
49 link_rtt: f64,
51 pub retries_left: usize,
53 max_retries: usize,
55 pub rtt: Option<f64>,
57 part_timeout_factor: f64,
59 pub last_activity: f64,
61 pub req_sent: f64,
63 req_sent_bytes: usize,
65 req_resp: Option<f64>,
67 rtt_rxd_bytes: usize,
69 rtt_rxd_bytes_at_part_req: usize,
71 req_resp_rtt_rate: f64,
73 req_data_rtt_rate: f64,
75 pub eifr: Option<f64>,
77 previous_eifr: Option<f64>,
79 pub segment_index: u64,
81 pub total_segments: u64,
83 pub has_metadata: bool,
85 pub request_id: Option<Vec<u8>>,
87 pub window: WindowState,
89 pub advertisement_packet: Vec<u8>,
91 pub max_decompressed_size: usize,
93}
94
95impl ResourceReceiver {
96 pub fn from_advertisement(
98 adv_data: &[u8],
99 sdu: usize,
100 link_rtt: f64,
101 now: f64,
102 previous_window: Option<usize>,
103 previous_eifr: Option<f64>,
104 ) -> Result<Self, ResourceError> {
105 let adv = ResourceAdvertisement::unpack(adv_data)?;
106
107 if adv.resource_hash.len() != 32 {
109 return Err(ResourceError::InvalidAdvertisement);
110 }
111
112 let total_parts = adv.num_parts as usize;
113 let parts_vec: Vec<Option<Vec<u8>>> = vec![None; total_parts];
114 let mut hashmap_vec: Vec<Option<[u8; RESOURCE_MAPHASH_LEN]>> = vec![None; total_parts];
115
116 let initial_hashes = adv.hashmap.len() / RESOURCE_MAPHASH_LEN;
118 let mut hashmap_height = 0;
119 for (i, slot) in hashmap_vec.iter_mut().enumerate().take(initial_hashes) {
120 if i < total_parts {
121 let start = i * RESOURCE_MAPHASH_LEN;
122 let end = start + RESOURCE_MAPHASH_LEN;
123 let mut h = [0u8; RESOURCE_MAPHASH_LEN];
124 h.copy_from_slice(&adv.hashmap[start..end]);
125 *slot = Some(h);
126 hashmap_height += 1;
127 }
128 }
129
130 let mut window_state = WindowState::new();
131 if let Some(prev_w) = previous_window {
132 window_state.restore(prev_w);
133 }
134
135 Ok(ResourceReceiver {
136 status: ResourceStatus::None,
137 resource_hash: adv.resource_hash,
138 random_hash: adv.random_hash,
139 original_hash: adv.original_hash,
140 flags: adv.flags,
141 transfer_size: adv.transfer_size,
142 data_size: adv.data_size,
143 total_parts,
144 parts: parts_vec,
145 hashmap: hashmap_vec,
146 hashmap_height,
147 waiting_for_hmu: false,
148 received_count: 0,
149 outstanding_parts: 0,
150 consecutive_completed_height: -1,
151 sdu,
152 link_rtt,
153 retries_left: RESOURCE_MAX_RETRIES,
154 max_retries: RESOURCE_MAX_RETRIES,
155 rtt: None,
156 part_timeout_factor: RESOURCE_PART_TIMEOUT_FACTOR,
157 last_activity: now,
158 req_sent: 0.0,
159 req_sent_bytes: 0,
160 req_resp: None,
161 rtt_rxd_bytes: 0,
162 rtt_rxd_bytes_at_part_req: 0,
163 req_resp_rtt_rate: 0.0,
164 req_data_rtt_rate: 0.0,
165 eifr: None,
166 previous_eifr,
167 segment_index: adv.segment_index,
168 total_segments: adv.total_segments,
169 has_metadata: adv.flags.has_metadata,
170 request_id: adv.request_id,
171 window: window_state,
172 advertisement_packet: adv_data.to_vec(),
173 max_decompressed_size: RESOURCE_AUTO_COMPRESS_MAX_SIZE,
174 })
175 }
176
177 pub fn accept(&mut self, now: f64) -> Vec<ResourceAction> {
179 self.status = ResourceStatus::Transferring;
180 self.last_activity = now;
181 self.request_next(now)
182 }
183
184 pub fn reject(&mut self) -> Vec<ResourceAction> {
186 self.status = ResourceStatus::Rejected;
187 vec![ResourceAction::SendCancelReceiver(
188 self.resource_hash.clone(),
189 )]
190 }
191
192 fn corrupt_actions(&mut self, error: ResourceError) -> Vec<ResourceAction> {
193 self.status = ResourceStatus::Corrupt;
194 vec![
195 ResourceAction::SendCancelReceiver(self.resource_hash.clone()),
196 ResourceAction::Failed(error),
197 ResourceAction::TeardownLink,
198 ]
199 }
200
201 pub fn receive_part(&mut self, part_data: &[u8], now: f64) -> Vec<ResourceAction> {
203 if self.status == ResourceStatus::Failed {
204 return vec![];
205 }
206
207 self.last_activity = now;
208 self.retries_left = self.max_retries;
209
210 if self.req_resp.is_none() {
212 self.req_resp = Some(now);
213 let rtt = now - self.req_sent;
214 self.part_timeout_factor = RESOURCE_PART_TIMEOUT_FACTOR_AFTER_RTT;
215
216 if self.rtt.is_none() {
217 self.rtt = Some(rtt);
218 } else if let Some(current_rtt) = self.rtt {
219 if rtt < current_rtt {
220 self.rtt = Some(f64::max(current_rtt - current_rtt * 0.05, rtt));
221 } else {
222 self.rtt = Some(f64::min(current_rtt + current_rtt * 0.05, rtt));
223 }
224 }
225
226 if rtt > 0.0 {
227 let req_resp_cost = part_data.len() + self.req_sent_bytes;
228 self.req_resp_rtt_rate = req_resp_cost as f64 / rtt;
229 self.window.update_req_resp_rate(self.req_resp_rtt_rate);
230 }
231 }
232
233 self.status = ResourceStatus::Transferring;
234
235 let part_hash = map_hash(part_data, &self.random_hash);
237
238 let consecutive_idx = if self.consecutive_completed_height >= 0 {
240 self.consecutive_completed_height as usize
241 } else {
242 0
243 };
244
245 let mut matched = false;
246 let search_end = core::cmp::min(consecutive_idx + self.window.window, self.total_parts);
247 for i in consecutive_idx..search_end {
248 if let Some(ref h) = self.hashmap[i] {
249 if *h == part_hash {
250 if self.parts[i].is_none() {
251 self.parts[i] = Some(part_data.to_vec());
252 self.rtt_rxd_bytes += part_data.len();
253 self.received_count += 1;
254 self.outstanding_parts = self.outstanding_parts.saturating_sub(1);
255
256 if i as isize == self.consecutive_completed_height + 1 {
258 self.consecutive_completed_height = i as isize;
259 }
260
261 let mut cp = (self.consecutive_completed_height + 1) as usize;
263 while cp < self.total_parts && self.parts[cp].is_some() {
264 self.consecutive_completed_height = cp as isize;
265 cp += 1;
266 }
267
268 matched = true;
269 }
270 break;
271 }
272 }
273 }
274
275 let mut actions = Vec::new();
276
277 if self.received_count == self.total_parts {
279 actions.push(ResourceAction::ProgressUpdate {
280 received: self.received_count,
281 total: self.total_parts,
282 });
283 return actions;
285 }
286
287 if matched {
288 actions.push(ResourceAction::ProgressUpdate {
289 received: self.received_count,
290 total: self.total_parts,
291 });
292 }
293
294 if self.outstanding_parts == 0 && self.received_count < self.total_parts {
296 self.window.on_window_complete();
298
299 if self.req_sent > 0.0 {
301 let rtt = now - self.req_sent;
302 let req_transferred = self.rtt_rxd_bytes - self.rtt_rxd_bytes_at_part_req;
303 if rtt > 0.0 {
304 self.req_data_rtt_rate = req_transferred as f64 / rtt;
305 self.rtt_rxd_bytes_at_part_req = self.rtt_rxd_bytes;
306 self.window.update_data_rate(self.req_data_rtt_rate);
307 }
308 }
309
310 let next_actions = self.request_next(now);
311 actions.extend(next_actions);
312 }
313
314 actions
315 }
316
317 pub fn handle_hashmap_update(&mut self, hmu_data: &[u8], now: f64) -> Vec<ResourceAction> {
321 if self.status == ResourceStatus::Failed {
322 return vec![];
323 }
324
325 self.last_activity = now;
326 self.retries_left = self.max_retries;
327
328 if hmu_data.len() <= 32 {
329 return vec![];
330 }
331
332 let payload = &hmu_data[32..];
333 let (value, _) = match crate::msgpack::unpack(payload) {
334 Ok(v) => v,
335 Err(_) => return vec![],
336 };
337
338 let arr = match value.as_array() {
339 Some(a) if a.len() >= 2 => a,
340 _ => return vec![],
341 };
342
343 let segment = match arr[0].as_uint() {
344 Some(s) => s as usize,
345 None => return vec![],
346 };
347
348 let hashmap_bytes = match arr[1].as_bin() {
349 Some(b) => b,
350 None => return vec![],
351 };
352
353 let seg_len = RESOURCE_HASHMAP_MAX_LEN;
355 let num_hashes = hashmap_bytes.len() / RESOURCE_MAPHASH_LEN;
356 for i in 0..num_hashes {
357 let idx = i + segment * seg_len;
358 if idx < self.total_parts {
359 let start = i * RESOURCE_MAPHASH_LEN;
360 let end = start + RESOURCE_MAPHASH_LEN;
361 if self.hashmap[idx].is_none() {
362 self.hashmap_height += 1;
363 }
364 let mut h = [0u8; RESOURCE_MAPHASH_LEN];
365 h.copy_from_slice(&hashmap_bytes[start..end]);
366 self.hashmap[idx] = Some(h);
367 }
368 }
369
370 self.waiting_for_hmu = false;
371 self.request_next(now)
372 }
373
374 pub fn request_next(&mut self, now: f64) -> Vec<ResourceAction> {
376 if self.status == ResourceStatus::Failed || self.waiting_for_hmu {
377 return vec![];
378 }
379
380 self.outstanding_parts = 0;
381 let mut hashmap_exhausted = RESOURCE_HASHMAP_IS_NOT_EXHAUSTED;
382 let mut requested_hashes = Vec::new();
383
384 let pn_start = (self.consecutive_completed_height + 1) as usize;
385 let search_end = core::cmp::min(pn_start + self.window.window, self.total_parts);
386 let mut i = 0;
387
388 for pn in pn_start..search_end {
389 if self.parts[pn].is_none() {
390 match self.hashmap[pn] {
391 Some(ref h) => {
392 requested_hashes.extend_from_slice(h);
393 self.outstanding_parts += 1;
394 i += 1;
395 }
396 None => {
397 hashmap_exhausted = RESOURCE_HASHMAP_IS_EXHAUSTED;
398 }
399 }
400 }
401 if i >= self.window.window || hashmap_exhausted == RESOURCE_HASHMAP_IS_EXHAUSTED {
402 break;
403 }
404 }
405
406 let mut request_data = Vec::new();
407 request_data.push(hashmap_exhausted);
408 if hashmap_exhausted == RESOURCE_HASHMAP_IS_EXHAUSTED {
409 if self.hashmap_height > 0 {
411 if let Some(ref last_hash) = self.hashmap[self.hashmap_height - 1] {
412 request_data.extend_from_slice(last_hash);
413 } else {
414 request_data.extend_from_slice(&[0u8; RESOURCE_MAPHASH_LEN]);
415 }
416 } else {
417 request_data.extend_from_slice(&[0u8; RESOURCE_MAPHASH_LEN]);
418 }
419 self.waiting_for_hmu = true;
420 }
421
422 request_data.extend_from_slice(&self.resource_hash);
423 request_data.extend_from_slice(&requested_hashes);
424
425 self.last_activity = now;
426 self.req_sent = now;
427 self.req_sent_bytes = request_data.len();
428 self.req_resp = None;
429
430 vec![ResourceAction::SendRequest(request_data)]
431 }
432
433 #[allow(clippy::type_complexity)]
435 pub fn assemble(
436 &mut self,
437 decrypt_fn: &dyn Fn(&[u8]) -> Result<Vec<u8>, ()>,
438 compressor: &dyn Compressor,
439 ) -> Vec<ResourceAction> {
440 if self.received_count != self.total_parts {
441 return vec![ResourceAction::Failed(ResourceError::InvalidState)];
442 }
443
444 self.status = ResourceStatus::Assembling;
445
446 let mut stream = Vec::new();
448 for part in &self.parts {
449 match part {
450 Some(data) => stream.extend_from_slice(data),
451 None => {
452 self.status = ResourceStatus::Failed;
453 return vec![ResourceAction::Failed(ResourceError::InvalidState)];
454 }
455 }
456 }
457
458 let decrypted = if self.flags.encrypted {
460 match decrypt_fn(&stream) {
461 Ok(d) => d,
462 Err(_) => {
463 self.status = ResourceStatus::Failed;
464 return vec![ResourceAction::Failed(ResourceError::DecryptionFailed)];
465 }
466 }
467 } else {
468 stream
469 };
470
471 if decrypted.len() < RESOURCE_RANDOM_HASH_SIZE {
473 return self.corrupt_actions(ResourceError::InvalidPart);
474 }
475 let data_after_random = &decrypted[RESOURCE_RANDOM_HASH_SIZE..];
476
477 let decompressed = if self.flags.compressed {
479 match compressor.decompress_bounded(data_after_random, self.max_decompressed_size) {
480 Ok(d) => d,
481 Err(crate::buffer::types::DecompressError::TooLarge) => {
482 return self.corrupt_actions(ResourceError::TooLarge);
483 }
484 Err(crate::buffer::types::DecompressError::InvalidData) => {
485 return self.corrupt_actions(ResourceError::DecompressionFailed);
486 }
487 }
488 } else {
489 data_after_random.to_vec()
490 };
491
492 let calculated_hash = compute_resource_hash(&decompressed, &self.random_hash);
494 if calculated_hash.as_slice() != self.resource_hash.as_slice() {
495 return self.corrupt_actions(ResourceError::HashMismatch);
496 }
497
498 let expected_proof = compute_expected_proof(&decompressed, &calculated_hash);
500 let proof_data = build_proof_data(&calculated_hash, &expected_proof);
501
502 let (data, metadata) = if self.has_metadata && self.segment_index == 1 {
504 match extract_metadata(&decompressed) {
505 Some((meta, rest)) => (rest, Some(meta)),
506 None => return self.corrupt_actions(ResourceError::InvalidPart),
507 }
508 } else {
509 (decompressed, None)
510 };
511
512 self.status = ResourceStatus::Complete;
513
514 vec![
515 ResourceAction::SendProof(proof_data),
516 ResourceAction::DataReceived { data, metadata },
517 ResourceAction::Completed,
518 ]
519 }
520
521 pub fn handle_cancel(&mut self) -> Vec<ResourceAction> {
523 if self.status < ResourceStatus::Complete {
524 self.status = ResourceStatus::Failed;
525 return vec![ResourceAction::Failed(ResourceError::Rejected)];
526 }
527 vec![]
528 }
529
530 #[allow(clippy::type_complexity)]
532 pub fn tick(
533 &mut self,
534 now: f64,
535 decrypt_fn: &dyn Fn(&[u8]) -> Result<Vec<u8>, ()>,
536 compressor: &dyn Compressor,
537 ) -> Vec<ResourceAction> {
538 if self.status >= ResourceStatus::Assembling {
539 return vec![];
540 }
541
542 if self.status == ResourceStatus::Transferring {
543 if self.received_count == self.total_parts {
545 return self.assemble(decrypt_fn, compressor);
546 }
547
548 let eifr = self.compute_eifr();
550 let retries_used = self.max_retries - self.retries_left;
551 let extra_wait = retries_used as f64 * RESOURCE_PER_RETRY_DELAY;
552 let expected_hmu_wait =
553 if eifr > 0.0 && (self.waiting_for_hmu || self.outstanding_parts == 0) {
554 (self.sdu as f64 * 8.0 * RESOURCE_HMU_WAIT_FACTOR) / eifr
555 } else {
556 0.0
557 };
558 let expected_tof = if self.outstanding_parts > 0 && eifr > 0.0 {
559 (self.outstanding_parts as f64 * self.sdu as f64 * 8.0) / eifr
560 } else if eifr > 0.0 {
561 (3.0 * self.sdu as f64) / eifr
562 } else {
563 10.0 };
565
566 let sleep_time = self.last_activity
567 + self.part_timeout_factor * expected_tof
568 + expected_hmu_wait
569 + RESOURCE_RETRY_GRACE_TIME
570 + extra_wait;
571
572 if now > sleep_time {
573 if self.retries_left > 0 {
574 self.window.on_timeout();
576 self.retries_left -= 1;
577 self.waiting_for_hmu = false;
578 return self.request_next(now);
579 } else {
580 self.status = ResourceStatus::Failed;
581 return vec![ResourceAction::Failed(ResourceError::MaxRetriesExceeded)];
582 }
583 }
584 }
585
586 vec![]
587 }
588
589 fn compute_eifr(&mut self) -> f64 {
591 let eifr = if self.req_data_rtt_rate > 0.0 {
592 self.req_data_rtt_rate * 8.0
593 } else if let Some(prev) = self.previous_eifr {
594 prev
595 } else {
596 let rtt = self.rtt.unwrap_or(self.link_rtt);
598 if rtt > 0.0 {
599 (self.sdu as f64 * 8.0) / rtt
600 } else {
601 10000.0
602 }
603 };
604 self.eifr = Some(eifr);
605 eifr
606 }
607
608 pub fn progress(&self) -> (usize, usize) {
610 (self.received_count, self.total_parts)
611 }
612
613 pub fn get_transfer_state(&self) -> (usize, Option<f64>) {
615 (self.window.window, self.eifr)
616 }
617}
618
619#[cfg(test)]
620mod tests {
621 use super::*;
622 use crate::buffer::types::{Compressor, DecompressError, NoopCompressor};
623 use crate::resource::sender::ResourceSender;
624
625 fn identity_encrypt(data: &[u8]) -> Vec<u8> {
626 data.to_vec()
627 }
628
629 fn identity_decrypt(data: &[u8]) -> Result<Vec<u8>, ()> {
630 Ok(data.to_vec())
631 }
632
633 struct ExpandingCompressor;
634
635 impl Compressor for ExpandingCompressor {
636 fn compress(&self, data: &[u8]) -> Option<Vec<u8>> {
637 Some(data[..data.len() / 2].to_vec())
638 }
639
640 fn decompress(&self, data: &[u8]) -> Option<Vec<u8>> {
641 self.decompress_bounded(data, usize::MAX).ok()
642 }
643
644 fn decompress_bounded(
645 &self,
646 data: &[u8],
647 max_output_size: usize,
648 ) -> Result<Vec<u8>, DecompressError> {
649 let mut out = data.to_vec();
650 out.extend_from_slice(data);
651 if out.len() > max_output_size {
652 return Err(DecompressError::TooLarge);
653 }
654 Ok(out)
655 }
656 }
657
658 fn base_timeout(receiver: &ResourceReceiver, eifr: f64) -> f64 {
659 let expected_tof = if receiver.outstanding_parts > 0 {
660 (receiver.outstanding_parts as f64 * receiver.sdu as f64 * 8.0) / eifr
661 } else {
662 (3.0 * receiver.sdu as f64) / eifr
663 };
664
665 receiver.last_activity
666 + receiver.part_timeout_factor * expected_tof
667 + RESOURCE_RETRY_GRACE_TIME
668 }
669
670 fn hmu_timeout(receiver: &ResourceReceiver, eifr: f64) -> f64 {
671 let expected_hmu_wait = (receiver.sdu as f64 * 8.0 * RESOURCE_HMU_WAIT_FACTOR) / eifr;
672 base_timeout(receiver, eifr) + expected_hmu_wait
673 }
674
675 fn make_sender_receiver() -> (ResourceSender, ResourceReceiver) {
676 let mut rng = rns_crypto::FixedRng::new(&[0x42; 64]);
677 let data = b"Hello, Resource Transfer!";
678
679 let sender = ResourceSender::new(
680 data,
681 None,
682 RESOURCE_SDU,
683 &identity_encrypt,
684 &NoopCompressor,
685 &mut rng,
686 1000.0,
687 false,
688 false,
689 None,
690 1,
691 1,
692 None,
693 0.5,
694 6.0,
695 )
696 .unwrap();
697
698 let adv_data = sender.get_advertisement(0);
699 let receiver =
700 ResourceReceiver::from_advertisement(&adv_data, RESOURCE_SDU, 0.5, 1000.0, None, None)
701 .unwrap();
702
703 (sender, receiver)
704 }
705
706 #[test]
707 fn test_from_advertisement() {
708 let (sender, receiver) = make_sender_receiver();
709 assert_eq!(receiver.total_parts, sender.total_parts());
710 assert_eq!(receiver.transfer_size, sender.transfer_size as u64);
711 assert_eq!(receiver.resource_hash, sender.resource_hash.to_vec());
712 assert!(!receiver.advertisement_packet.is_empty());
713 assert_eq!(
714 receiver.max_decompressed_size,
715 RESOURCE_AUTO_COMPRESS_MAX_SIZE
716 );
717 }
718
719 #[test]
720 fn test_accept() {
721 let (_, mut receiver) = make_sender_receiver();
722 let actions = receiver.accept(1000.0);
723 assert_eq!(receiver.status, ResourceStatus::Transferring);
724 assert!(!actions.is_empty());
725 assert!(actions
726 .iter()
727 .any(|a| matches!(a, ResourceAction::SendRequest(_))));
728 }
729
730 #[test]
731 fn test_reject() {
732 let (_, mut receiver) = make_sender_receiver();
733 let actions = receiver.reject();
734 assert_eq!(receiver.status, ResourceStatus::Rejected);
735 assert!(actions
736 .iter()
737 .any(|a| matches!(a, ResourceAction::SendCancelReceiver(_))));
738 }
739
740 #[test]
741 fn test_receive_part_stores() {
742 let (mut sender, mut receiver) = make_sender_receiver();
743 receiver.accept(1000.0);
744
745 let mut request = Vec::new();
748 request.push(RESOURCE_HASHMAP_IS_NOT_EXHAUSTED);
749 request.extend_from_slice(&sender.resource_hash);
750 request.extend_from_slice(&sender.part_hashes[0]);
751
752 let send_actions = sender.handle_request(&request, 1001.0);
753 let part_data = send_actions
754 .iter()
755 .find_map(|a| match a {
756 ResourceAction::SendPart(d) => Some(d.clone()),
757 _ => None,
758 })
759 .unwrap();
760
761 receiver.req_sent = 1000.5;
763 let _actions = receiver.receive_part(&part_data, 1001.0);
764 assert_eq!(receiver.received_count, 1);
765 }
766
767 #[test]
768 fn test_consecutive_completed_height() {
769 let (sender, mut receiver) = make_sender_receiver();
770 receiver.accept(1000.0);
771
772 if sender.total_parts() > 1 {
774 assert_eq!(receiver.consecutive_completed_height, -1);
776 }
777 }
778
779 #[test]
780 fn test_handle_cancel() {
781 let (_, mut receiver) = make_sender_receiver();
782 receiver.accept(1000.0);
783 let _actions = receiver.handle_cancel();
784 assert_eq!(receiver.status, ResourceStatus::Failed);
785 }
786
787 #[test]
788 fn test_assemble_compressed_resource_rejects_oversized_decompression() {
789 let data = b"oversized!";
790 let mut rng = rns_crypto::FixedRng::new(&[0x93; 64]);
791
792 let mut sender = ResourceSender::new(
793 data,
794 None,
795 RESOURCE_SDU,
796 &identity_encrypt,
797 &ExpandingCompressor,
798 &mut rng,
799 1000.0,
800 true,
801 false,
802 None,
803 1,
804 1,
805 None,
806 0.5,
807 6.0,
808 )
809 .unwrap();
810
811 let adv = sender.get_advertisement(0);
812 let mut receiver =
813 ResourceReceiver::from_advertisement(&adv, RESOURCE_SDU, 0.5, 1000.0, None, None)
814 .unwrap();
815 receiver.max_decompressed_size = data.len() - 1;
816
817 let request_data = receiver
818 .accept(1001.0)
819 .into_iter()
820 .find_map(|a| match a {
821 ResourceAction::SendRequest(d) => Some(d),
822 _ => None,
823 })
824 .unwrap();
825
826 let send_actions = sender.handle_request(&request_data, 1002.0);
827 receiver.req_sent = 1001.0;
828 for action in &send_actions {
829 if let ResourceAction::SendPart(part_data) = action {
830 receiver.receive_part(part_data, 1003.0);
831 }
832 }
833
834 let assemble_actions = receiver.assemble(&identity_decrypt, &ExpandingCompressor);
835 assert_eq!(receiver.status, ResourceStatus::Corrupt);
836 assert!(assemble_actions
837 .iter()
838 .any(|a| matches!(a, ResourceAction::SendCancelReceiver(_))));
839 assert!(assemble_actions
840 .iter()
841 .any(|a| matches!(a, ResourceAction::Failed(ResourceError::TooLarge))));
842 assert!(assemble_actions
843 .iter()
844 .any(|a| matches!(a, ResourceAction::TeardownLink)));
845 }
846
847 #[test]
848 fn test_full_transfer_small_data() {
849 let data = b"small data";
851 let mut rng = rns_crypto::FixedRng::new(&[0x77; 64]);
852
853 let mut sender = ResourceSender::new(
854 data,
855 None,
856 RESOURCE_SDU,
857 &identity_encrypt,
858 &NoopCompressor,
859 &mut rng,
860 1000.0,
861 false,
862 false,
863 None,
864 1,
865 1,
866 None,
867 0.5,
868 6.0,
869 )
870 .unwrap();
871
872 let adv = sender.get_advertisement(0);
873 let mut receiver =
874 ResourceReceiver::from_advertisement(&adv, RESOURCE_SDU, 0.5, 1000.0, None, None)
875 .unwrap();
876
877 let req_actions = receiver.accept(1001.0);
879 assert_eq!(receiver.status, ResourceStatus::Transferring);
880
881 let request_data = req_actions
883 .iter()
884 .find_map(|a| match a {
885 ResourceAction::SendRequest(d) => Some(d.clone()),
886 _ => None,
887 })
888 .unwrap();
889
890 let send_actions = sender.handle_request(&request_data, 1002.0);
892
893 receiver.req_sent = 1001.0;
895 for action in &send_actions {
896 if let ResourceAction::SendPart(part_data) = action {
897 receiver.receive_part(part_data, 1003.0);
898 }
899 }
900
901 assert_eq!(receiver.received_count, receiver.total_parts);
902
903 let assemble_actions = receiver.assemble(&identity_decrypt, &NoopCompressor);
905
906 let has_proof = assemble_actions
908 .iter()
909 .any(|a| matches!(a, ResourceAction::SendProof(_)));
910 let has_data = assemble_actions
911 .iter()
912 .any(|a| matches!(a, ResourceAction::DataReceived { .. }));
913 let has_complete = assemble_actions
914 .iter()
915 .any(|a| matches!(a, ResourceAction::Completed));
916
917 assert!(has_proof, "Should send proof");
918 assert!(has_data, "Should return data");
919 assert!(has_complete, "Should be completed");
920
921 let received_data = assemble_actions
923 .iter()
924 .find_map(|a| match a {
925 ResourceAction::DataReceived { data, .. } => Some(data.clone()),
926 _ => None,
927 })
928 .unwrap();
929 assert_eq!(received_data, data);
930
931 let proof_data = assemble_actions
933 .iter()
934 .find_map(|a| match a {
935 ResourceAction::SendProof(d) => Some(d.clone()),
936 _ => None,
937 })
938 .unwrap();
939
940 let _proof_actions = sender.handle_proof(&proof_data, 1004.0);
941 assert_eq!(sender.status, ResourceStatus::Complete);
942 }
943
944 #[test]
945 fn test_full_transfer_with_metadata() {
946 let data = b"data with metadata";
947 let metadata = b"some metadata";
948 let mut rng = rns_crypto::FixedRng::new(&[0x88; 64]);
949
950 let mut sender = ResourceSender::new(
951 data,
952 Some(metadata),
953 RESOURCE_SDU,
954 &identity_encrypt,
955 &NoopCompressor,
956 &mut rng,
957 1000.0,
958 false,
959 false,
960 None,
961 1,
962 1,
963 None,
964 0.5,
965 6.0,
966 )
967 .unwrap();
968
969 assert!(sender.flags.has_metadata);
970
971 let adv = sender.get_advertisement(0);
972 let mut receiver =
973 ResourceReceiver::from_advertisement(&adv, RESOURCE_SDU, 0.5, 1000.0, None, None)
974 .unwrap();
975
976 assert!(receiver.has_metadata);
977
978 let req_actions = receiver.accept(1001.0);
980 let request_data = req_actions
981 .iter()
982 .find_map(|a| match a {
983 ResourceAction::SendRequest(d) => Some(d.clone()),
984 _ => None,
985 })
986 .unwrap();
987
988 let send_actions = sender.handle_request(&request_data, 1002.0);
989 receiver.req_sent = 1001.0;
990 for action in &send_actions {
991 if let ResourceAction::SendPart(part_data) = action {
992 receiver.receive_part(part_data, 1003.0);
993 }
994 }
995
996 let assemble_actions = receiver.assemble(&identity_decrypt, &NoopCompressor);
997
998 let (recv_data, recv_meta) = assemble_actions
999 .iter()
1000 .find_map(|a| match a {
1001 ResourceAction::DataReceived { data, metadata } => {
1002 Some((data.clone(), metadata.clone()))
1003 }
1004 _ => None,
1005 })
1006 .unwrap();
1007
1008 assert_eq!(recv_data, data);
1009 assert_eq!(recv_meta.unwrap(), metadata);
1010 }
1011
1012 #[test]
1013 fn test_previous_window_restore() {
1014 let (_, _receiver) = make_sender_receiver();
1015 let adv_data = {
1017 let mut rng = rns_crypto::FixedRng::new(&[0x42; 64]);
1018 let sender = ResourceSender::new(
1019 b"test",
1020 None,
1021 RESOURCE_SDU,
1022 &identity_encrypt,
1023 &NoopCompressor,
1024 &mut rng,
1025 1000.0,
1026 false,
1027 false,
1028 None,
1029 1,
1030 1,
1031 None,
1032 0.5,
1033 6.0,
1034 )
1035 .unwrap();
1036 sender.get_advertisement(0)
1037 };
1038
1039 let receiver = ResourceReceiver::from_advertisement(
1040 &adv_data,
1041 RESOURCE_SDU,
1042 0.5,
1043 1000.0,
1044 Some(8),
1045 Some(50000.0),
1046 )
1047 .unwrap();
1048 assert_eq!(receiver.window.window, 8);
1049 }
1050
1051 #[test]
1052 fn test_tick_timeout_retry() {
1053 let (_, mut receiver) = make_sender_receiver();
1054 receiver.accept(1000.0);
1055 receiver.rtt = Some(0.1);
1056
1057 let actions = receiver.tick(9999.0, &identity_decrypt, &NoopCompressor);
1059 assert!(!actions.is_empty() || receiver.retries_left < RESOURCE_MAX_RETRIES);
1061 }
1062
1063 #[test]
1064 fn test_tick_waiting_for_hmu_gets_extra_timeout() {
1065 let (_, mut receiver) = make_sender_receiver();
1066 receiver.accept(1000.0);
1067 receiver.waiting_for_hmu = true;
1068 receiver.outstanding_parts = 0;
1069 let eifr = 10_000.0;
1070 receiver.previous_eifr = Some(eifr);
1071 receiver.last_activity = 1000.0;
1072
1073 let old_timeout = base_timeout(&receiver, eifr);
1074 let now = old_timeout + 0.01;
1075
1076 let actions = receiver.tick(now, &identity_decrypt, &NoopCompressor);
1077
1078 assert!(actions.is_empty(), "receiver should keep waiting for HMU");
1079 assert_eq!(receiver.retries_left, RESOURCE_MAX_RETRIES);
1080 assert_eq!(receiver.status, ResourceStatus::Transferring);
1081 }
1082
1083 #[test]
1084 fn test_tick_zero_outstanding_parts_gets_extra_timeout_without_hmu_flag() {
1085 let (_, mut receiver) = make_sender_receiver();
1086 receiver.accept(1000.0);
1087 receiver.waiting_for_hmu = false;
1088 receiver.outstanding_parts = 0;
1089 let eifr = 10_000.0;
1090 receiver.previous_eifr = Some(eifr);
1091 receiver.last_activity = 1000.0;
1092
1093 let old_timeout = base_timeout(&receiver, eifr);
1094 let now = old_timeout + 0.01;
1095
1096 let actions = receiver.tick(now, &identity_decrypt, &NoopCompressor);
1097
1098 assert!(
1099 actions.is_empty(),
1100 "receiver should keep waiting for follow-up hashmap data"
1101 );
1102 assert_eq!(receiver.retries_left, RESOURCE_MAX_RETRIES);
1103 assert_eq!(receiver.status, ResourceStatus::Transferring);
1104 }
1105
1106 #[test]
1107 fn test_tick_waiting_for_hmu_retries_after_extended_timeout() {
1108 let (_, mut receiver) = make_sender_receiver();
1109 receiver.accept(1000.0);
1110 receiver.waiting_for_hmu = true;
1111 receiver.outstanding_parts = 0;
1112 let eifr = 10_000.0;
1113 receiver.previous_eifr = Some(eifr);
1114 receiver.last_activity = 1000.0;
1115
1116 let now = hmu_timeout(&receiver, eifr) + 0.01;
1117 let _actions = receiver.tick(now, &identity_decrypt, &NoopCompressor);
1118
1119 assert_eq!(receiver.retries_left, RESOURCE_MAX_RETRIES - 1);
1120 assert!(!receiver.waiting_for_hmu);
1121 }
1122
1123 #[test]
1124 fn test_tick_inflight_parts_do_not_get_hmu_timeout_extension() {
1125 let (_, mut receiver) = make_sender_receiver();
1126 receiver.accept(1000.0);
1127 receiver.waiting_for_hmu = false;
1128 receiver.outstanding_parts = 2;
1129 let eifr = 10_000.0;
1130 receiver.previous_eifr = Some(eifr);
1131 receiver.last_activity = 1000.0;
1132
1133 let now = base_timeout(&receiver, eifr) + 0.01;
1134 let _actions = receiver.tick(now, &identity_decrypt, &NoopCompressor);
1135
1136 assert_eq!(receiver.retries_left, RESOURCE_MAX_RETRIES - 1);
1137 }
1138
1139 #[test]
1140 fn test_tick_max_retries_exceeded() {
1141 let (_, mut receiver) = make_sender_receiver();
1142 receiver.accept(1000.0);
1143 receiver.retries_left = 0;
1144 receiver.rtt = Some(0.001);
1145 receiver.eifr = Some(100000.0);
1146
1147 let _actions = receiver.tick(9999.0, &identity_decrypt, &NoopCompressor);
1148 assert_eq!(receiver.status, ResourceStatus::Failed);
1149 }
1150}