1use alloc::vec;
2use alloc::vec::Vec;
3
4use crate::constants::*;
5use crate::buffer::types::Compressor;
6use super::types::*;
7use super::advertisement::ResourceAdvertisement;
8use super::parts::{map_hash, extract_metadata};
9use super::proof::{compute_resource_hash, compute_expected_proof, build_proof_data};
10use super::window::WindowState;
11
12pub struct ResourceReceiver {
17 pub status: ResourceStatus,
19 pub resource_hash: Vec<u8>,
21 pub random_hash: Vec<u8>,
23 pub original_hash: Vec<u8>,
25 pub flags: AdvFlags,
27 pub transfer_size: u64,
29 pub data_size: u64,
31 pub total_parts: usize,
33 parts: Vec<Option<Vec<u8>>>,
35 hashmap: Vec<Option<[u8; RESOURCE_MAPHASH_LEN]>>,
37 hashmap_height: usize,
39 pub waiting_for_hmu: bool,
41 pub received_count: usize,
43 pub outstanding_parts: usize,
45 consecutive_completed_height: isize,
47 sdu: usize,
49 link_rtt: f64,
51 pub retries_left: usize,
53 max_retries: usize,
55 pub rtt: Option<f64>,
57 part_timeout_factor: f64,
59 pub last_activity: f64,
61 pub req_sent: f64,
63 req_sent_bytes: usize,
65 req_resp: Option<f64>,
67 rtt_rxd_bytes: usize,
69 rtt_rxd_bytes_at_part_req: usize,
71 req_resp_rtt_rate: f64,
73 req_data_rtt_rate: f64,
75 pub eifr: Option<f64>,
77 previous_eifr: Option<f64>,
79 pub segment_index: u64,
81 pub total_segments: u64,
83 pub has_metadata: bool,
85 pub request_id: Option<Vec<u8>>,
87 pub window: WindowState,
89}
90
91impl ResourceReceiver {
92 pub fn from_advertisement(
94 adv_data: &[u8],
95 sdu: usize,
96 link_rtt: f64,
97 now: f64,
98 previous_window: Option<usize>,
99 previous_eifr: Option<f64>,
100 ) -> Result<Self, ResourceError> {
101 let adv = ResourceAdvertisement::unpack(adv_data)?;
102
103 if adv.resource_hash.len() != 32 {
105 return Err(ResourceError::InvalidAdvertisement);
106 }
107
108 let total_parts = adv.num_parts as usize;
109 let parts_vec: Vec<Option<Vec<u8>>> = vec![None; total_parts];
110 let mut hashmap_vec: Vec<Option<[u8; RESOURCE_MAPHASH_LEN]>> = vec![None; total_parts];
111
112 let initial_hashes = adv.hashmap.len() / RESOURCE_MAPHASH_LEN;
114 let mut hashmap_height = 0;
115 for i in 0..initial_hashes {
116 if i < total_parts {
117 let start = i * RESOURCE_MAPHASH_LEN;
118 let end = start + RESOURCE_MAPHASH_LEN;
119 let mut h = [0u8; RESOURCE_MAPHASH_LEN];
120 h.copy_from_slice(&adv.hashmap[start..end]);
121 hashmap_vec[i] = Some(h);
122 hashmap_height += 1;
123 }
124 }
125
126 let mut window_state = WindowState::new();
127 if let Some(prev_w) = previous_window {
128 window_state.restore(prev_w);
129 }
130
131 Ok(ResourceReceiver {
132 status: ResourceStatus::None,
133 resource_hash: adv.resource_hash,
134 random_hash: adv.random_hash,
135 original_hash: adv.original_hash,
136 flags: adv.flags,
137 transfer_size: adv.transfer_size,
138 data_size: adv.data_size,
139 total_parts,
140 parts: parts_vec,
141 hashmap: hashmap_vec,
142 hashmap_height,
143 waiting_for_hmu: false,
144 received_count: 0,
145 outstanding_parts: 0,
146 consecutive_completed_height: -1,
147 sdu,
148 link_rtt,
149 retries_left: RESOURCE_MAX_RETRIES,
150 max_retries: RESOURCE_MAX_RETRIES,
151 rtt: None,
152 part_timeout_factor: RESOURCE_PART_TIMEOUT_FACTOR,
153 last_activity: now,
154 req_sent: 0.0,
155 req_sent_bytes: 0,
156 req_resp: None,
157 rtt_rxd_bytes: 0,
158 rtt_rxd_bytes_at_part_req: 0,
159 req_resp_rtt_rate: 0.0,
160 req_data_rtt_rate: 0.0,
161 eifr: None,
162 previous_eifr,
163 segment_index: adv.segment_index,
164 total_segments: adv.total_segments,
165 has_metadata: adv.flags.has_metadata,
166 request_id: adv.request_id,
167 window: window_state,
168 })
169 }
170
171 pub fn accept(&mut self, now: f64) -> Vec<ResourceAction> {
173 self.status = ResourceStatus::Transferring;
174 self.last_activity = now;
175 self.request_next(now)
176 }
177
178 pub fn reject(&mut self) -> Vec<ResourceAction> {
180 self.status = ResourceStatus::Rejected;
181 vec![ResourceAction::SendCancelReceiver(self.resource_hash.clone())]
182 }
183
184 pub fn receive_part(&mut self, part_data: &[u8], now: f64) -> Vec<ResourceAction> {
186 if self.status == ResourceStatus::Failed {
187 return vec![];
188 }
189
190 self.last_activity = now;
191 self.retries_left = self.max_retries;
192
193 if self.req_resp.is_none() {
195 self.req_resp = Some(now);
196 let rtt = now - self.req_sent;
197 self.part_timeout_factor = RESOURCE_PART_TIMEOUT_FACTOR_AFTER_RTT;
198
199 if self.rtt.is_none() {
200 self.rtt = Some(rtt);
201 } else if let Some(current_rtt) = self.rtt {
202 if rtt < current_rtt {
203 self.rtt = Some(f64::max(current_rtt - current_rtt * 0.05, rtt));
204 } else {
205 self.rtt = Some(f64::min(current_rtt + current_rtt * 0.05, rtt));
206 }
207 }
208
209 if rtt > 0.0 {
210 let req_resp_cost = part_data.len() + self.req_sent_bytes;
211 self.req_resp_rtt_rate = req_resp_cost as f64 / rtt;
212 self.window.update_req_resp_rate(self.req_resp_rtt_rate);
213 }
214 }
215
216 self.status = ResourceStatus::Transferring;
217
218 let part_hash = map_hash(part_data, &self.random_hash);
220
221 let consecutive_idx = if self.consecutive_completed_height >= 0 {
223 self.consecutive_completed_height as usize
224 } else {
225 0
226 };
227
228 let mut matched = false;
229 let search_end = core::cmp::min(consecutive_idx + self.window.window, self.total_parts);
230 for i in consecutive_idx..search_end {
231 if let Some(ref h) = self.hashmap[i] {
232 if *h == part_hash {
233 if self.parts[i].is_none() {
234 self.parts[i] = Some(part_data.to_vec());
235 self.rtt_rxd_bytes += part_data.len();
236 self.received_count += 1;
237 self.outstanding_parts = self.outstanding_parts.saturating_sub(1);
238
239 if i as isize == self.consecutive_completed_height + 1 {
241 self.consecutive_completed_height = i as isize;
242 }
243
244 let mut cp = (self.consecutive_completed_height + 1) as usize;
246 while cp < self.total_parts && self.parts[cp].is_some() {
247 self.consecutive_completed_height = cp as isize;
248 cp += 1;
249 }
250
251 matched = true;
252 }
253 break;
254 }
255 }
256 }
257
258 let mut actions = Vec::new();
259
260 if self.received_count == self.total_parts {
262 actions.push(ResourceAction::ProgressUpdate {
263 received: self.received_count,
264 total: self.total_parts,
265 });
266 return actions;
268 }
269
270 if matched {
271 actions.push(ResourceAction::ProgressUpdate {
272 received: self.received_count,
273 total: self.total_parts,
274 });
275 }
276
277 if self.outstanding_parts == 0 && self.received_count < self.total_parts {
279 self.window.on_window_complete();
281
282 if self.req_sent > 0.0 {
284 let rtt = now - self.req_sent;
285 let req_transferred = self.rtt_rxd_bytes - self.rtt_rxd_bytes_at_part_req;
286 if rtt > 0.0 {
287 self.req_data_rtt_rate = req_transferred as f64 / rtt;
288 self.rtt_rxd_bytes_at_part_req = self.rtt_rxd_bytes;
289 self.window.update_data_rate(self.req_data_rtt_rate);
290 }
291 }
292
293 let next_actions = self.request_next(now);
294 actions.extend(next_actions);
295 }
296
297 actions
298 }
299
300 pub fn handle_hashmap_update(&mut self, hmu_data: &[u8], now: f64) -> Vec<ResourceAction> {
304 if self.status == ResourceStatus::Failed {
305 return vec![];
306 }
307
308 self.last_activity = now;
309 self.retries_left = self.max_retries;
310
311 if hmu_data.len() <= 32 {
312 return vec![];
313 }
314
315 let payload = &hmu_data[32..];
316 let (value, _) = match crate::msgpack::unpack(payload) {
317 Ok(v) => v,
318 Err(_) => return vec![],
319 };
320
321 let arr = match value.as_array() {
322 Some(a) if a.len() >= 2 => a,
323 _ => return vec![],
324 };
325
326 let segment = match arr[0].as_uint() {
327 Some(s) => s as usize,
328 None => return vec![],
329 };
330
331 let hashmap_bytes = match arr[1].as_bin() {
332 Some(b) => b,
333 None => return vec![],
334 };
335
336 let seg_len = RESOURCE_HASHMAP_MAX_LEN;
338 let num_hashes = hashmap_bytes.len() / RESOURCE_MAPHASH_LEN;
339 for i in 0..num_hashes {
340 let idx = i + segment * seg_len;
341 if idx < self.total_parts {
342 let start = i * RESOURCE_MAPHASH_LEN;
343 let end = start + RESOURCE_MAPHASH_LEN;
344 if self.hashmap[idx].is_none() {
345 self.hashmap_height += 1;
346 }
347 let mut h = [0u8; RESOURCE_MAPHASH_LEN];
348 h.copy_from_slice(&hashmap_bytes[start..end]);
349 self.hashmap[idx] = Some(h);
350 }
351 }
352
353 self.waiting_for_hmu = false;
354 self.request_next(now)
355 }
356
357 pub fn request_next(&mut self, now: f64) -> Vec<ResourceAction> {
359 if self.status == ResourceStatus::Failed || self.waiting_for_hmu {
360 return vec![];
361 }
362
363 self.outstanding_parts = 0;
364 let mut hashmap_exhausted = RESOURCE_HASHMAP_IS_NOT_EXHAUSTED;
365 let mut requested_hashes = Vec::new();
366
367 let pn_start = (self.consecutive_completed_height + 1) as usize;
368 let search_end = core::cmp::min(pn_start + self.window.window, self.total_parts);
369 let mut i = 0;
370
371 for pn in pn_start..search_end {
372 if self.parts[pn].is_none() {
373 match self.hashmap[pn] {
374 Some(ref h) => {
375 requested_hashes.extend_from_slice(h);
376 self.outstanding_parts += 1;
377 i += 1;
378 }
379 None => {
380 hashmap_exhausted = RESOURCE_HASHMAP_IS_EXHAUSTED;
381 }
382 }
383 }
384 if i >= self.window.window || hashmap_exhausted == RESOURCE_HASHMAP_IS_EXHAUSTED {
385 break;
386 }
387 }
388
389 let mut request_data = Vec::new();
390 request_data.push(hashmap_exhausted);
391 if hashmap_exhausted == RESOURCE_HASHMAP_IS_EXHAUSTED {
392 if self.hashmap_height > 0 {
394 if let Some(ref last_hash) = self.hashmap[self.hashmap_height - 1] {
395 request_data.extend_from_slice(last_hash);
396 } else {
397 request_data.extend_from_slice(&[0u8; RESOURCE_MAPHASH_LEN]);
398 }
399 } else {
400 request_data.extend_from_slice(&[0u8; RESOURCE_MAPHASH_LEN]);
401 }
402 self.waiting_for_hmu = true;
403 }
404
405 request_data.extend_from_slice(&self.resource_hash);
406 request_data.extend_from_slice(&requested_hashes);
407
408 self.last_activity = now;
409 self.req_sent = now;
410 self.req_sent_bytes = request_data.len();
411 self.req_resp = None;
412
413 vec![ResourceAction::SendRequest(request_data)]
414 }
415
416 pub fn assemble(
418 &mut self,
419 decrypt_fn: &dyn Fn(&[u8]) -> Result<Vec<u8>, ()>,
420 compressor: &dyn Compressor,
421 ) -> Vec<ResourceAction> {
422 if self.received_count != self.total_parts {
423 return vec![ResourceAction::Failed(ResourceError::InvalidState)];
424 }
425
426 self.status = ResourceStatus::Assembling;
427
428 let mut stream = Vec::new();
430 for part in &self.parts {
431 match part {
432 Some(data) => stream.extend_from_slice(data),
433 None => {
434 self.status = ResourceStatus::Failed;
435 return vec![ResourceAction::Failed(ResourceError::InvalidState)];
436 }
437 }
438 }
439
440 let decrypted = if self.flags.encrypted {
442 match decrypt_fn(&stream) {
443 Ok(d) => d,
444 Err(_) => {
445 self.status = ResourceStatus::Failed;
446 return vec![ResourceAction::Failed(ResourceError::DecryptionFailed)];
447 }
448 }
449 } else {
450 stream
451 };
452
453 if decrypted.len() < RESOURCE_RANDOM_HASH_SIZE {
455 self.status = ResourceStatus::Corrupt;
456 return vec![ResourceAction::Failed(ResourceError::InvalidPart)];
457 }
458 let data_after_random = &decrypted[RESOURCE_RANDOM_HASH_SIZE..];
459
460 let decompressed = if self.flags.compressed {
462 match compressor.decompress(data_after_random) {
463 Some(d) => d,
464 None => {
465 self.status = ResourceStatus::Corrupt;
466 return vec![ResourceAction::Failed(ResourceError::DecompressionFailed)];
467 }
468 }
469 } else {
470 data_after_random.to_vec()
471 };
472
473 let calculated_hash = compute_resource_hash(&decompressed, &self.random_hash);
475 if calculated_hash.as_slice() != self.resource_hash.as_slice() {
476 self.status = ResourceStatus::Corrupt;
477 return vec![ResourceAction::Failed(ResourceError::HashMismatch)];
478 }
479
480 let expected_proof = compute_expected_proof(&decompressed, &calculated_hash);
482 let proof_data = build_proof_data(&calculated_hash, &expected_proof);
483
484 let (data, metadata) = if self.has_metadata && self.segment_index == 1 {
486 match extract_metadata(&decompressed) {
487 Some((meta, rest)) => (rest, Some(meta)),
488 None => {
489 self.status = ResourceStatus::Corrupt;
490 return vec![ResourceAction::Failed(ResourceError::InvalidPart)];
491 }
492 }
493 } else {
494 (decompressed, None)
495 };
496
497 self.status = ResourceStatus::Complete;
498
499 vec![
500 ResourceAction::SendProof(proof_data),
501 ResourceAction::DataReceived { data, metadata },
502 ResourceAction::Completed,
503 ]
504 }
505
506 pub fn handle_cancel(&mut self) -> Vec<ResourceAction> {
508 if self.status < ResourceStatus::Complete {
509 self.status = ResourceStatus::Failed;
510 return vec![ResourceAction::Failed(ResourceError::Rejected)];
511 }
512 vec![]
513 }
514
515 pub fn tick(
517 &mut self,
518 now: f64,
519 decrypt_fn: &dyn Fn(&[u8]) -> Result<Vec<u8>, ()>,
520 compressor: &dyn Compressor,
521 ) -> Vec<ResourceAction> {
522 if self.status >= ResourceStatus::Assembling {
523 return vec![];
524 }
525
526 if self.status == ResourceStatus::Transferring {
527 if self.received_count == self.total_parts {
529 return self.assemble(decrypt_fn, compressor);
530 }
531
532 let eifr = self.compute_eifr();
534 let retries_used = self.max_retries - self.retries_left;
535 let extra_wait = retries_used as f64 * RESOURCE_PER_RETRY_DELAY;
536 let expected_tof = if self.outstanding_parts > 0 && eifr > 0.0 {
537 (self.outstanding_parts as f64 * self.sdu as f64 * 8.0) / eifr
538 } else {
539 if eifr > 0.0 {
540 (3.0 * self.sdu as f64) / eifr
541 } else {
542 10.0 }
544 };
545
546 let sleep_time = self.last_activity
547 + self.part_timeout_factor * expected_tof
548 + RESOURCE_RETRY_GRACE_TIME
549 + extra_wait;
550
551 if now > sleep_time {
552 if self.retries_left > 0 {
553 self.window.on_timeout();
555 self.retries_left -= 1;
556 self.waiting_for_hmu = false;
557 return self.request_next(now);
558 } else {
559 self.status = ResourceStatus::Failed;
560 return vec![ResourceAction::Failed(ResourceError::MaxRetriesExceeded)];
561 }
562 }
563 }
564
565 vec![]
566 }
567
568 fn compute_eifr(&mut self) -> f64 {
570 let eifr = if self.req_data_rtt_rate > 0.0 {
571 self.req_data_rtt_rate * 8.0
572 } else if let Some(prev) = self.previous_eifr {
573 prev
574 } else {
575 let rtt = self.rtt.unwrap_or(self.link_rtt);
577 if rtt > 0.0 {
578 (self.sdu as f64 * 8.0) / rtt
579 } else {
580 10000.0
581 }
582 };
583 self.eifr = Some(eifr);
584 eifr
585 }
586
587 pub fn progress(&self) -> (usize, usize) {
589 (self.received_count, self.total_parts)
590 }
591
592 pub fn get_transfer_state(&self) -> (usize, Option<f64>) {
594 (self.window.window, self.eifr)
595 }
596}
597
598#[cfg(test)]
599mod tests {
600 use super::*;
601 use crate::buffer::types::NoopCompressor;
602 use crate::resource::sender::ResourceSender;
603
/// Test stand-in for encryption: returns the input bytes unchanged.
fn identity_encrypt(data: &[u8]) -> Vec<u8> {
    Vec::from(data)
}
607
/// Test stand-in for decryption: always succeeds with the input bytes unchanged.
fn identity_decrypt(data: &[u8]) -> Result<Vec<u8>, ()> {
    let copy = Vec::from(data);
    Ok(copy)
}
611
612 fn make_sender_receiver() -> (ResourceSender, ResourceReceiver) {
613 let mut rng = rns_crypto::FixedRng::new(&[0x42; 64]);
614 let data = b"Hello, Resource Transfer!";
615
616 let sender = ResourceSender::new(
617 data,
618 None,
619 RESOURCE_SDU,
620 &identity_encrypt,
621 &NoopCompressor,
622 &mut rng,
623 1000.0,
624 false,
625 false,
626 None,
627 1,
628 1,
629 None,
630 0.5,
631 6.0,
632 )
633 .unwrap();
634
635 let adv_data = sender.get_advertisement(0);
636 let receiver = ResourceReceiver::from_advertisement(
637 &adv_data,
638 RESOURCE_SDU,
639 0.5,
640 1000.0,
641 None,
642 None,
643 )
644 .unwrap();
645
646 (sender, receiver)
647 }
648
/// The receiver mirrors the part count, transfer size and resource hash
/// announced by the sender.
#[test]
fn test_from_advertisement() {
    let (sender, receiver) = make_sender_receiver();

    let expected_parts = sender.total_parts();
    let expected_size = sender.transfer_size as u64;
    let expected_hash = sender.resource_hash.to_vec();

    assert_eq!(receiver.total_parts, expected_parts);
    assert_eq!(receiver.transfer_size, expected_size);
    assert_eq!(receiver.resource_hash, expected_hash);
}
656
/// Accepting moves the receiver to `Transferring` and emits a request.
#[test]
fn test_accept() {
    let (_, mut receiver) = make_sender_receiver();

    let actions = receiver.accept(1000.0);

    assert_eq!(receiver.status, ResourceStatus::Transferring);
    assert!(!actions.is_empty());
    let sent_request = actions
        .iter()
        .any(|action| matches!(action, ResourceAction::SendRequest(_)));
    assert!(sent_request);
}
665
/// Rejecting sets the `Rejected` status and emits a receiver-side cancel.
#[test]
fn test_reject() {
    let (_, mut receiver) = make_sender_receiver();

    let actions = receiver.reject();

    assert_eq!(receiver.status, ResourceStatus::Rejected);
    let sent_cancel = actions
        .iter()
        .any(|action| matches!(action, ResourceAction::SendCancelReceiver(_)));
    assert!(sent_cancel);
}
673
/// A part produced by the sender for a hand-built request is stored by
/// the receiver.
#[test]
fn test_receive_part_stores() {
    let (mut sender, mut receiver) = make_sender_receiver();
    receiver.accept(1000.0);

    // Hand-built request for the first part: flag, resource hash, map hash.
    let mut request = vec![RESOURCE_HASHMAP_IS_NOT_EXHAUSTED];
    request.extend_from_slice(&sender.resource_hash);
    request.extend_from_slice(&sender.part_hashes[0]);

    let send_actions = sender.handle_request(&request, 1001.0);
    let part_data = send_actions
        .iter()
        .filter_map(|action| match action {
            ResourceAction::SendPart(bytes) => Some(bytes.clone()),
            _ => None,
        })
        .next()
        .unwrap();

    receiver.req_sent = 1000.5;
    let _actions = receiver.receive_part(&part_data, 1001.0);

    assert_eq!(receiver.received_count, 1);
}
700
/// Before any part has arrived the consecutive completed height is `-1`.
///
/// The check no longer depends on the fixture's part count: accepting a
/// transfer never stores a part, so the height is `-1` for any resource.
#[test]
fn test_consecutive_completed_height() {
    let (_, mut receiver) = make_sender_receiver();
    receiver.accept(1000.0);

    assert_eq!(receiver.consecutive_completed_height, -1);
}
712
/// Cancelling an accepted, unfinished transfer marks it as failed.
#[test]
fn test_handle_cancel() {
    let (_, mut receiver) = make_sender_receiver();
    receiver.accept(1000.0);

    let _ = receiver.handle_cancel();

    assert_eq!(receiver.status, ResourceStatus::Failed);
}
720
/// End-to-end transfer of a small payload: advertise, request, deliver
/// parts, assemble, and have the sender accept the resulting proof.
#[test]
fn test_full_transfer_small_data() {
    let data = b"small data";
    let mut rng = rns_crypto::FixedRng::new(&[0x77; 64]);

    let mut sender = ResourceSender::new(
        data,
        None,
        RESOURCE_SDU,
        &identity_encrypt,
        &NoopCompressor,
        &mut rng,
        1000.0,
        false,
        false,
        None,
        1,
        1,
        None,
        0.5,
        6.0,
    )
    .unwrap();

    let adv = sender.get_advertisement(0);
    let mut receiver =
        ResourceReceiver::from_advertisement(&adv, RESOURCE_SDU, 0.5, 1000.0, None, None).unwrap();

    // Receiver accepts and emits its first request.
    let req_actions = receiver.accept(1001.0);
    assert_eq!(receiver.status, ResourceStatus::Transferring);

    let request_data = req_actions
        .iter()
        .filter_map(|action| match action {
            ResourceAction::SendRequest(bytes) => Some(bytes.clone()),
            _ => None,
        })
        .next()
        .unwrap();

    // Sender answers the request; every part is fed to the receiver.
    let send_actions = sender.handle_request(&request_data, 1002.0);

    receiver.req_sent = 1001.0;
    let sent_parts = send_actions.iter().filter_map(|action| match action {
        ResourceAction::SendPart(part) => Some(part),
        _ => None,
    });
    for part in sent_parts {
        receiver.receive_part(part, 1003.0);
    }

    assert_eq!(receiver.received_count, receiver.total_parts);

    let assemble_actions = receiver.assemble(&identity_decrypt, &NoopCompressor);

    // Collect what assembly produced in a single pass.
    let mut proof = None;
    let mut payload = None;
    let mut has_complete = false;
    for action in &assemble_actions {
        match action {
            ResourceAction::SendProof(bytes) => {
                if proof.is_none() {
                    proof = Some(bytes.clone());
                }
            }
            ResourceAction::DataReceived { data, .. } => {
                if payload.is_none() {
                    payload = Some(data.clone());
                }
            }
            ResourceAction::Completed => has_complete = true,
            _ => {}
        }
    }

    assert!(proof.is_some(), "Should send proof");
    assert!(payload.is_some(), "Should return data");
    assert!(has_complete, "Should be completed");

    let received_data = payload.unwrap();
    assert_eq!(received_data, data);

    // The sender accepts the proof and completes its side.
    let proof_data = proof.unwrap();
    let _proof_actions = sender.handle_proof(&proof_data, 1004.0);
    assert_eq!(sender.status, ResourceStatus::Complete);
}
817
/// End-to-end transfer of a payload with metadata: both the data and the
/// metadata come back unchanged from assembly.
#[test]
fn test_full_transfer_with_metadata() {
    let data = b"data with metadata";
    let metadata = b"some metadata";
    let mut rng = rns_crypto::FixedRng::new(&[0x88; 64]);

    let mut sender = ResourceSender::new(
        data,
        Some(metadata),
        RESOURCE_SDU,
        &identity_encrypt,
        &NoopCompressor,
        &mut rng,
        1000.0,
        false,
        false,
        None,
        1,
        1,
        None,
        0.5,
        6.0,
    )
    .unwrap();
    assert!(sender.flags.has_metadata);

    let adv = sender.get_advertisement(0);
    let mut receiver =
        ResourceReceiver::from_advertisement(&adv, RESOURCE_SDU, 0.5, 1000.0, None, None).unwrap();
    assert!(receiver.has_metadata);

    let req_actions = receiver.accept(1001.0);
    let request_data = req_actions
        .iter()
        .filter_map(|action| match action {
            ResourceAction::SendRequest(bytes) => Some(bytes.clone()),
            _ => None,
        })
        .next()
        .unwrap();

    let send_actions = sender.handle_request(&request_data, 1002.0);
    receiver.req_sent = 1001.0;
    let sent_parts = send_actions.iter().filter_map(|action| match action {
        ResourceAction::SendPart(part) => Some(part),
        _ => None,
    });
    for part in sent_parts {
        receiver.receive_part(part, 1003.0);
    }

    let assemble_actions = receiver.assemble(&identity_decrypt, &NoopCompressor);

    let (recv_data, recv_meta) = assemble_actions
        .iter()
        .filter_map(|action| match action {
            ResourceAction::DataReceived { data, metadata } => {
                Some((data.clone(), metadata.clone()))
            }
            _ => None,
        })
        .next()
        .unwrap();

    assert_eq!(recv_data, data);
    assert_eq!(recv_meta.unwrap(), metadata);
}
889
/// A window size and rate estimate carried over from an earlier transfer
/// are both taken up by the new receiver.
#[test]
fn test_previous_window_restore() {
    let adv_data = {
        let mut rng = rns_crypto::FixedRng::new(&[0x42; 64]);
        let sender = ResourceSender::new(
            b"test",
            None,
            RESOURCE_SDU,
            &identity_encrypt,
            &NoopCompressor,
            &mut rng,
            1000.0,
            false,
            false,
            None,
            1,
            1,
            None,
            0.5,
            6.0,
        )
        .unwrap();
        sender.get_advertisement(0)
    };

    let receiver = ResourceReceiver::from_advertisement(
        &adv_data,
        RESOURCE_SDU,
        0.5,
        1000.0,
        Some(8),
        Some(50000.0),
    )
    .unwrap();

    assert_eq!(receiver.window.window, 8);
    // The previous estimate is stored as a fallback; no estimate has been
    // computed yet at this point.
    assert_eq!(receiver.previous_eifr, Some(50000.0));
    assert_eq!(receiver.eifr, None);
}
928
/// A tick far past the deadline either produces actions or consumes a retry.
#[test]
fn test_tick_timeout_retry() {
    let (_, mut receiver) = make_sender_receiver();
    receiver.accept(1000.0);
    receiver.rtt = Some(0.1);

    let actions = receiver.tick(9999.0, &identity_decrypt, &NoopCompressor);

    let produced_actions = !actions.is_empty();
    let retry_consumed = receiver.retries_left < RESOURCE_MAX_RETRIES;
    assert!(produced_actions || retry_consumed);
}
940
/// With no retries left, a tick far past the deadline fails the transfer.
#[test]
fn test_tick_max_retries_exceeded() {
    let (_, mut receiver) = make_sender_receiver();
    receiver.accept(1000.0);

    receiver.rtt = Some(0.001);
    receiver.eifr = Some(100000.0);
    receiver.retries_left = 0;

    let _ = receiver.tick(9999.0, &identity_decrypt, &NoopCompressor);

    assert_eq!(receiver.status, ResourceStatus::Failed);
}
952}