1use alloc::vec;
2use alloc::vec::Vec;
3
4use super::advertisement::ResourceAdvertisement;
5use super::parts::{extract_metadata, map_hash};
6use super::proof::{build_proof_data, compute_expected_proof, compute_resource_hash};
7use super::types::*;
8use super::window::WindowState;
9use crate::buffer::types::Compressor;
10use crate::constants::*;
11
12pub struct ResourceReceiver {
17 pub status: ResourceStatus,
19 pub resource_hash: Vec<u8>,
21 pub random_hash: Vec<u8>,
23 pub original_hash: Vec<u8>,
25 pub flags: AdvFlags,
27 pub transfer_size: u64,
29 pub data_size: u64,
31 pub total_parts: usize,
33 parts: Vec<Option<Vec<u8>>>,
35 hashmap: Vec<Option<[u8; RESOURCE_MAPHASH_LEN]>>,
37 hashmap_height: usize,
39 pub waiting_for_hmu: bool,
41 pub received_count: usize,
43 pub outstanding_parts: usize,
45 consecutive_completed_height: isize,
47 sdu: usize,
49 link_rtt: f64,
51 pub retries_left: usize,
53 max_retries: usize,
55 pub rtt: Option<f64>,
57 part_timeout_factor: f64,
59 pub last_activity: f64,
61 pub req_sent: f64,
63 req_sent_bytes: usize,
65 req_resp: Option<f64>,
67 rtt_rxd_bytes: usize,
69 rtt_rxd_bytes_at_part_req: usize,
71 req_resp_rtt_rate: f64,
73 req_data_rtt_rate: f64,
75 pub eifr: Option<f64>,
77 previous_eifr: Option<f64>,
79 pub segment_index: u64,
81 pub total_segments: u64,
83 pub has_metadata: bool,
85 pub request_id: Option<Vec<u8>>,
87 pub window: WindowState,
89}
90
91impl ResourceReceiver {
92 pub fn from_advertisement(
94 adv_data: &[u8],
95 sdu: usize,
96 link_rtt: f64,
97 now: f64,
98 previous_window: Option<usize>,
99 previous_eifr: Option<f64>,
100 ) -> Result<Self, ResourceError> {
101 let adv = ResourceAdvertisement::unpack(adv_data)?;
102
103 if adv.resource_hash.len() != 32 {
105 return Err(ResourceError::InvalidAdvertisement);
106 }
107
108 let total_parts = adv.num_parts as usize;
109 let parts_vec: Vec<Option<Vec<u8>>> = vec![None; total_parts];
110 let mut hashmap_vec: Vec<Option<[u8; RESOURCE_MAPHASH_LEN]>> = vec![None; total_parts];
111
112 let initial_hashes = adv.hashmap.len() / RESOURCE_MAPHASH_LEN;
114 let mut hashmap_height = 0;
115 for i in 0..initial_hashes {
116 if i < total_parts {
117 let start = i * RESOURCE_MAPHASH_LEN;
118 let end = start + RESOURCE_MAPHASH_LEN;
119 let mut h = [0u8; RESOURCE_MAPHASH_LEN];
120 h.copy_from_slice(&adv.hashmap[start..end]);
121 hashmap_vec[i] = Some(h);
122 hashmap_height += 1;
123 }
124 }
125
126 let mut window_state = WindowState::new();
127 if let Some(prev_w) = previous_window {
128 window_state.restore(prev_w);
129 }
130
131 Ok(ResourceReceiver {
132 status: ResourceStatus::None,
133 resource_hash: adv.resource_hash,
134 random_hash: adv.random_hash,
135 original_hash: adv.original_hash,
136 flags: adv.flags,
137 transfer_size: adv.transfer_size,
138 data_size: adv.data_size,
139 total_parts,
140 parts: parts_vec,
141 hashmap: hashmap_vec,
142 hashmap_height,
143 waiting_for_hmu: false,
144 received_count: 0,
145 outstanding_parts: 0,
146 consecutive_completed_height: -1,
147 sdu,
148 link_rtt,
149 retries_left: RESOURCE_MAX_RETRIES,
150 max_retries: RESOURCE_MAX_RETRIES,
151 rtt: None,
152 part_timeout_factor: RESOURCE_PART_TIMEOUT_FACTOR,
153 last_activity: now,
154 req_sent: 0.0,
155 req_sent_bytes: 0,
156 req_resp: None,
157 rtt_rxd_bytes: 0,
158 rtt_rxd_bytes_at_part_req: 0,
159 req_resp_rtt_rate: 0.0,
160 req_data_rtt_rate: 0.0,
161 eifr: None,
162 previous_eifr,
163 segment_index: adv.segment_index,
164 total_segments: adv.total_segments,
165 has_metadata: adv.flags.has_metadata,
166 request_id: adv.request_id,
167 window: window_state,
168 })
169 }
170
171 pub fn accept(&mut self, now: f64) -> Vec<ResourceAction> {
173 self.status = ResourceStatus::Transferring;
174 self.last_activity = now;
175 self.request_next(now)
176 }
177
178 pub fn reject(&mut self) -> Vec<ResourceAction> {
180 self.status = ResourceStatus::Rejected;
181 vec![ResourceAction::SendCancelReceiver(
182 self.resource_hash.clone(),
183 )]
184 }
185
186 pub fn receive_part(&mut self, part_data: &[u8], now: f64) -> Vec<ResourceAction> {
188 if self.status == ResourceStatus::Failed {
189 return vec![];
190 }
191
192 self.last_activity = now;
193 self.retries_left = self.max_retries;
194
195 if self.req_resp.is_none() {
197 self.req_resp = Some(now);
198 let rtt = now - self.req_sent;
199 self.part_timeout_factor = RESOURCE_PART_TIMEOUT_FACTOR_AFTER_RTT;
200
201 if self.rtt.is_none() {
202 self.rtt = Some(rtt);
203 } else if let Some(current_rtt) = self.rtt {
204 if rtt < current_rtt {
205 self.rtt = Some(f64::max(current_rtt - current_rtt * 0.05, rtt));
206 } else {
207 self.rtt = Some(f64::min(current_rtt + current_rtt * 0.05, rtt));
208 }
209 }
210
211 if rtt > 0.0 {
212 let req_resp_cost = part_data.len() + self.req_sent_bytes;
213 self.req_resp_rtt_rate = req_resp_cost as f64 / rtt;
214 self.window.update_req_resp_rate(self.req_resp_rtt_rate);
215 }
216 }
217
218 self.status = ResourceStatus::Transferring;
219
220 let part_hash = map_hash(part_data, &self.random_hash);
222
223 let consecutive_idx = if self.consecutive_completed_height >= 0 {
225 self.consecutive_completed_height as usize
226 } else {
227 0
228 };
229
230 let mut matched = false;
231 let search_end = core::cmp::min(consecutive_idx + self.window.window, self.total_parts);
232 for i in consecutive_idx..search_end {
233 if let Some(ref h) = self.hashmap[i] {
234 if *h == part_hash {
235 if self.parts[i].is_none() {
236 self.parts[i] = Some(part_data.to_vec());
237 self.rtt_rxd_bytes += part_data.len();
238 self.received_count += 1;
239 self.outstanding_parts = self.outstanding_parts.saturating_sub(1);
240
241 if i as isize == self.consecutive_completed_height + 1 {
243 self.consecutive_completed_height = i as isize;
244 }
245
246 let mut cp = (self.consecutive_completed_height + 1) as usize;
248 while cp < self.total_parts && self.parts[cp].is_some() {
249 self.consecutive_completed_height = cp as isize;
250 cp += 1;
251 }
252
253 matched = true;
254 }
255 break;
256 }
257 }
258 }
259
260 let mut actions = Vec::new();
261
262 if self.received_count == self.total_parts {
264 actions.push(ResourceAction::ProgressUpdate {
265 received: self.received_count,
266 total: self.total_parts,
267 });
268 return actions;
270 }
271
272 if matched {
273 actions.push(ResourceAction::ProgressUpdate {
274 received: self.received_count,
275 total: self.total_parts,
276 });
277 }
278
279 if self.outstanding_parts == 0 && self.received_count < self.total_parts {
281 self.window.on_window_complete();
283
284 if self.req_sent > 0.0 {
286 let rtt = now - self.req_sent;
287 let req_transferred = self.rtt_rxd_bytes - self.rtt_rxd_bytes_at_part_req;
288 if rtt > 0.0 {
289 self.req_data_rtt_rate = req_transferred as f64 / rtt;
290 self.rtt_rxd_bytes_at_part_req = self.rtt_rxd_bytes;
291 self.window.update_data_rate(self.req_data_rtt_rate);
292 }
293 }
294
295 let next_actions = self.request_next(now);
296 actions.extend(next_actions);
297 }
298
299 actions
300 }
301
302 pub fn handle_hashmap_update(&mut self, hmu_data: &[u8], now: f64) -> Vec<ResourceAction> {
306 if self.status == ResourceStatus::Failed {
307 return vec![];
308 }
309
310 self.last_activity = now;
311 self.retries_left = self.max_retries;
312
313 if hmu_data.len() <= 32 {
314 return vec![];
315 }
316
317 let payload = &hmu_data[32..];
318 let (value, _) = match crate::msgpack::unpack(payload) {
319 Ok(v) => v,
320 Err(_) => return vec![],
321 };
322
323 let arr = match value.as_array() {
324 Some(a) if a.len() >= 2 => a,
325 _ => return vec![],
326 };
327
328 let segment = match arr[0].as_uint() {
329 Some(s) => s as usize,
330 None => return vec![],
331 };
332
333 let hashmap_bytes = match arr[1].as_bin() {
334 Some(b) => b,
335 None => return vec![],
336 };
337
338 let seg_len = RESOURCE_HASHMAP_MAX_LEN;
340 let num_hashes = hashmap_bytes.len() / RESOURCE_MAPHASH_LEN;
341 for i in 0..num_hashes {
342 let idx = i + segment * seg_len;
343 if idx < self.total_parts {
344 let start = i * RESOURCE_MAPHASH_LEN;
345 let end = start + RESOURCE_MAPHASH_LEN;
346 if self.hashmap[idx].is_none() {
347 self.hashmap_height += 1;
348 }
349 let mut h = [0u8; RESOURCE_MAPHASH_LEN];
350 h.copy_from_slice(&hashmap_bytes[start..end]);
351 self.hashmap[idx] = Some(h);
352 }
353 }
354
355 self.waiting_for_hmu = false;
356 self.request_next(now)
357 }
358
359 pub fn request_next(&mut self, now: f64) -> Vec<ResourceAction> {
361 if self.status == ResourceStatus::Failed || self.waiting_for_hmu {
362 return vec![];
363 }
364
365 self.outstanding_parts = 0;
366 let mut hashmap_exhausted = RESOURCE_HASHMAP_IS_NOT_EXHAUSTED;
367 let mut requested_hashes = Vec::new();
368
369 let pn_start = (self.consecutive_completed_height + 1) as usize;
370 let search_end = core::cmp::min(pn_start + self.window.window, self.total_parts);
371 let mut i = 0;
372
373 for pn in pn_start..search_end {
374 if self.parts[pn].is_none() {
375 match self.hashmap[pn] {
376 Some(ref h) => {
377 requested_hashes.extend_from_slice(h);
378 self.outstanding_parts += 1;
379 i += 1;
380 }
381 None => {
382 hashmap_exhausted = RESOURCE_HASHMAP_IS_EXHAUSTED;
383 }
384 }
385 }
386 if i >= self.window.window || hashmap_exhausted == RESOURCE_HASHMAP_IS_EXHAUSTED {
387 break;
388 }
389 }
390
391 let mut request_data = Vec::new();
392 request_data.push(hashmap_exhausted);
393 if hashmap_exhausted == RESOURCE_HASHMAP_IS_EXHAUSTED {
394 if self.hashmap_height > 0 {
396 if let Some(ref last_hash) = self.hashmap[self.hashmap_height - 1] {
397 request_data.extend_from_slice(last_hash);
398 } else {
399 request_data.extend_from_slice(&[0u8; RESOURCE_MAPHASH_LEN]);
400 }
401 } else {
402 request_data.extend_from_slice(&[0u8; RESOURCE_MAPHASH_LEN]);
403 }
404 self.waiting_for_hmu = true;
405 }
406
407 request_data.extend_from_slice(&self.resource_hash);
408 request_data.extend_from_slice(&requested_hashes);
409
410 self.last_activity = now;
411 self.req_sent = now;
412 self.req_sent_bytes = request_data.len();
413 self.req_resp = None;
414
415 vec![ResourceAction::SendRequest(request_data)]
416 }
417
418 pub fn assemble(
420 &mut self,
421 decrypt_fn: &dyn Fn(&[u8]) -> Result<Vec<u8>, ()>,
422 compressor: &dyn Compressor,
423 ) -> Vec<ResourceAction> {
424 if self.received_count != self.total_parts {
425 return vec![ResourceAction::Failed(ResourceError::InvalidState)];
426 }
427
428 self.status = ResourceStatus::Assembling;
429
430 let mut stream = Vec::new();
432 for part in &self.parts {
433 match part {
434 Some(data) => stream.extend_from_slice(data),
435 None => {
436 self.status = ResourceStatus::Failed;
437 return vec![ResourceAction::Failed(ResourceError::InvalidState)];
438 }
439 }
440 }
441
442 let decrypted = if self.flags.encrypted {
444 match decrypt_fn(&stream) {
445 Ok(d) => d,
446 Err(_) => {
447 self.status = ResourceStatus::Failed;
448 return vec![ResourceAction::Failed(ResourceError::DecryptionFailed)];
449 }
450 }
451 } else {
452 stream
453 };
454
455 if decrypted.len() < RESOURCE_RANDOM_HASH_SIZE {
457 self.status = ResourceStatus::Corrupt;
458 return vec![ResourceAction::Failed(ResourceError::InvalidPart)];
459 }
460 let data_after_random = &decrypted[RESOURCE_RANDOM_HASH_SIZE..];
461
462 let decompressed = if self.flags.compressed {
464 match compressor.decompress(data_after_random) {
465 Some(d) => d,
466 None => {
467 self.status = ResourceStatus::Corrupt;
468 return vec![ResourceAction::Failed(ResourceError::DecompressionFailed)];
469 }
470 }
471 } else {
472 data_after_random.to_vec()
473 };
474
475 let calculated_hash = compute_resource_hash(&decompressed, &self.random_hash);
477 if calculated_hash.as_slice() != self.resource_hash.as_slice() {
478 self.status = ResourceStatus::Corrupt;
479 return vec![ResourceAction::Failed(ResourceError::HashMismatch)];
480 }
481
482 let expected_proof = compute_expected_proof(&decompressed, &calculated_hash);
484 let proof_data = build_proof_data(&calculated_hash, &expected_proof);
485
486 let (data, metadata) = if self.has_metadata && self.segment_index == 1 {
488 match extract_metadata(&decompressed) {
489 Some((meta, rest)) => (rest, Some(meta)),
490 None => {
491 self.status = ResourceStatus::Corrupt;
492 return vec![ResourceAction::Failed(ResourceError::InvalidPart)];
493 }
494 }
495 } else {
496 (decompressed, None)
497 };
498
499 self.status = ResourceStatus::Complete;
500
501 vec![
502 ResourceAction::SendProof(proof_data),
503 ResourceAction::DataReceived { data, metadata },
504 ResourceAction::Completed,
505 ]
506 }
507
508 pub fn handle_cancel(&mut self) -> Vec<ResourceAction> {
510 if self.status < ResourceStatus::Complete {
511 self.status = ResourceStatus::Failed;
512 return vec![ResourceAction::Failed(ResourceError::Rejected)];
513 }
514 vec![]
515 }
516
517 pub fn tick(
519 &mut self,
520 now: f64,
521 decrypt_fn: &dyn Fn(&[u8]) -> Result<Vec<u8>, ()>,
522 compressor: &dyn Compressor,
523 ) -> Vec<ResourceAction> {
524 if self.status >= ResourceStatus::Assembling {
525 return vec![];
526 }
527
528 if self.status == ResourceStatus::Transferring {
529 if self.received_count == self.total_parts {
531 return self.assemble(decrypt_fn, compressor);
532 }
533
534 let eifr = self.compute_eifr();
536 let retries_used = self.max_retries - self.retries_left;
537 let extra_wait = retries_used as f64 * RESOURCE_PER_RETRY_DELAY;
538 let expected_tof = if self.outstanding_parts > 0 && eifr > 0.0 {
539 (self.outstanding_parts as f64 * self.sdu as f64 * 8.0) / eifr
540 } else {
541 if eifr > 0.0 {
542 (3.0 * self.sdu as f64) / eifr
543 } else {
544 10.0 }
546 };
547
548 let sleep_time = self.last_activity
549 + self.part_timeout_factor * expected_tof
550 + RESOURCE_RETRY_GRACE_TIME
551 + extra_wait;
552
553 if now > sleep_time {
554 if self.retries_left > 0 {
555 self.window.on_timeout();
557 self.retries_left -= 1;
558 self.waiting_for_hmu = false;
559 return self.request_next(now);
560 } else {
561 self.status = ResourceStatus::Failed;
562 return vec![ResourceAction::Failed(ResourceError::MaxRetriesExceeded)];
563 }
564 }
565 }
566
567 vec![]
568 }
569
570 fn compute_eifr(&mut self) -> f64 {
572 let eifr = if self.req_data_rtt_rate > 0.0 {
573 self.req_data_rtt_rate * 8.0
574 } else if let Some(prev) = self.previous_eifr {
575 prev
576 } else {
577 let rtt = self.rtt.unwrap_or(self.link_rtt);
579 if rtt > 0.0 {
580 (self.sdu as f64 * 8.0) / rtt
581 } else {
582 10000.0
583 }
584 };
585 self.eifr = Some(eifr);
586 eifr
587 }
588
589 pub fn progress(&self) -> (usize, usize) {
591 (self.received_count, self.total_parts)
592 }
593
594 pub fn get_transfer_state(&self) -> (usize, Option<f64>) {
596 (self.window.window, self.eifr)
597 }
598}
599
600#[cfg(test)]
601mod tests {
602 use super::*;
603 use crate::buffer::types::NoopCompressor;
604 use crate::resource::sender::ResourceSender;
605
/// Test helper: "encryption" that returns a copy of the input unchanged.
fn identity_encrypt(data: &[u8]) -> Vec<u8> {
    Vec::from(data)
}
609
/// Test helper: "decryption" that always succeeds with a copy of the input.
fn identity_decrypt(data: &[u8]) -> Result<Vec<u8>, ()> {
    let copy = Vec::from(data);
    Ok(copy)
}
613
614 fn make_sender_receiver() -> (ResourceSender, ResourceReceiver) {
615 let mut rng = rns_crypto::FixedRng::new(&[0x42; 64]);
616 let data = b"Hello, Resource Transfer!";
617
618 let sender = ResourceSender::new(
619 data,
620 None,
621 RESOURCE_SDU,
622 &identity_encrypt,
623 &NoopCompressor,
624 &mut rng,
625 1000.0,
626 false,
627 false,
628 None,
629 1,
630 1,
631 None,
632 0.5,
633 6.0,
634 )
635 .unwrap();
636
637 let adv_data = sender.get_advertisement(0);
638 let receiver =
639 ResourceReceiver::from_advertisement(&adv_data, RESOURCE_SDU, 0.5, 1000.0, None, None)
640 .unwrap();
641
642 (sender, receiver)
643 }
644
/// The receiver mirrors the part count, transfer size and resource hash
/// announced by the sender.
#[test]
fn test_from_advertisement() {
    let (sender, receiver) = make_sender_receiver();

    let expected_parts = sender.total_parts();
    let expected_size = sender.transfer_size as u64;
    let expected_hash = sender.resource_hash.to_vec();

    assert_eq!(receiver.total_parts, expected_parts);
    assert_eq!(receiver.transfer_size, expected_size);
    assert_eq!(receiver.resource_hash, expected_hash);
}
652
/// Accepting switches to `Transferring` and emits a part request.
#[test]
fn test_accept() {
    let (_, mut receiver) = make_sender_receiver();
    let actions = receiver.accept(1000.0);

    assert_eq!(receiver.status, ResourceStatus::Transferring);
    assert!(!actions.is_empty());

    let sends_request = actions
        .iter()
        .any(|action| matches!(action, ResourceAction::SendRequest(_)));
    assert!(sends_request);
}
663
/// Rejecting switches to `Rejected` and emits a receiver-side cancel.
#[test]
fn test_reject() {
    let (_, mut receiver) = make_sender_receiver();
    let actions = receiver.reject();

    assert_eq!(receiver.status, ResourceStatus::Rejected);

    let sends_cancel = actions
        .iter()
        .any(|action| matches!(action, ResourceAction::SendCancelReceiver(_)));
    assert!(sends_cancel);
}
673
/// A part delivered by the sender is stored and counted by the receiver.
#[test]
fn test_receive_part_stores() {
    let (mut sender, mut receiver) = make_sender_receiver();
    receiver.accept(1000.0);

    // Hand-built request for part 0: flag, resource hash, map hash.
    let mut request = vec![RESOURCE_HASHMAP_IS_NOT_EXHAUSTED];
    request.extend_from_slice(&sender.resource_hash);
    request.extend_from_slice(&sender.part_hashes[0]);

    let send_actions = sender.handle_request(&request, 1001.0);
    let part_data = send_actions
        .iter()
        .filter_map(|action| {
            if let ResourceAction::SendPart(bytes) = action {
                Some(bytes.clone())
            } else {
                None
            }
        })
        .next()
        .unwrap();

    receiver.req_sent = 1000.5;
    let _actions = receiver.receive_part(&part_data, 1001.0);
    assert_eq!(receiver.received_count, 1);
}
700
/// The consecutive-completed height starts at -1 and moves to 0 once part 0
/// has been stored.
///
/// The initial-value assertion is unconditional: it holds for any number of
/// parts, whereas guarding it on `total_parts() > 1` made the test vacuous
/// for the single-part fixture.
#[test]
fn test_consecutive_completed_height() {
    let (mut sender, mut receiver) = make_sender_receiver();
    receiver.accept(1000.0);

    // Nothing received yet.
    assert_eq!(receiver.consecutive_completed_height, -1);

    // Request and deliver part 0.
    let mut request = vec![RESOURCE_HASHMAP_IS_NOT_EXHAUSTED];
    request.extend_from_slice(&sender.resource_hash);
    request.extend_from_slice(&sender.part_hashes[0]);

    let send_actions = sender.handle_request(&request, 1001.0);
    let part_data = send_actions
        .iter()
        .find_map(|a| match a {
            ResourceAction::SendPart(d) => Some(d.clone()),
            _ => None,
        })
        .unwrap();

    receiver.req_sent = 1000.5;
    receiver.receive_part(&part_data, 1001.0);

    assert_eq!(receiver.received_count, 1);
    assert_eq!(receiver.consecutive_completed_height, 0);
}
712
/// Cancelling an in-progress transfer marks the receiver as failed.
#[test]
fn test_handle_cancel() {
    let (_, mut receiver) = make_sender_receiver();
    receiver.accept(1000.0);

    let _cancel_actions = receiver.handle_cancel();

    assert_eq!(receiver.status, ResourceStatus::Failed);
}
720
/// End-to-end transfer of a small payload: advertise, request, deliver the
/// parts, assemble, and let the sender validate the proof.
#[test]
fn test_full_transfer_small_data() {
    let data = b"small data";
    let mut rng = rns_crypto::FixedRng::new(&[0x77; 64]);

    let mut sender = ResourceSender::new(
        data,
        None,
        RESOURCE_SDU,
        &identity_encrypt,
        &NoopCompressor,
        &mut rng,
        1000.0,
        false,
        false,
        None,
        1,
        1,
        None,
        0.5,
        6.0,
    )
    .unwrap();

    let advertisement = sender.get_advertisement(0);
    let mut receiver = ResourceReceiver::from_advertisement(
        &advertisement,
        RESOURCE_SDU,
        0.5,
        1000.0,
        None,
        None,
    )
    .unwrap();

    // Receiver accepts and produces its first request.
    let req_actions = receiver.accept(1001.0);
    assert_eq!(receiver.status, ResourceStatus::Transferring);

    let request_data = req_actions
        .iter()
        .filter_map(|action| {
            if let ResourceAction::SendRequest(bytes) = action {
                Some(bytes.clone())
            } else {
                None
            }
        })
        .next()
        .unwrap();

    // Sender answers the request; every part is fed to the receiver.
    let send_actions = sender.handle_request(&request_data, 1002.0);

    receiver.req_sent = 1001.0;
    let delivered_parts = send_actions.iter().filter_map(|action| match action {
        ResourceAction::SendPart(part) => Some(part),
        _ => None,
    });
    for part in delivered_parts {
        receiver.receive_part(part, 1003.0);
    }

    assert_eq!(receiver.received_count, receiver.total_parts);

    let assemble_actions = receiver.assemble(&identity_decrypt, &NoopCompressor);

    assert!(
        assemble_actions
            .iter()
            .any(|action| matches!(action, ResourceAction::SendProof(_))),
        "Should send proof"
    );
    assert!(
        assemble_actions
            .iter()
            .any(|action| matches!(action, ResourceAction::DataReceived { .. })),
        "Should return data"
    );
    assert!(
        assemble_actions
            .iter()
            .any(|action| matches!(action, ResourceAction::Completed)),
        "Should be completed"
    );

    // The delivered data equals what the sender was given.
    let received_data = assemble_actions
        .iter()
        .filter_map(|action| {
            if let ResourceAction::DataReceived { data, .. } = action {
                Some(data.clone())
            } else {
                None
            }
        })
        .next()
        .unwrap();
    assert_eq!(received_data, data);

    // The proof produced by the receiver completes the sender.
    let proof_data = assemble_actions
        .iter()
        .filter_map(|action| {
            if let ResourceAction::SendProof(bytes) = action {
                Some(bytes.clone())
            } else {
                None
            }
        })
        .next()
        .unwrap();

    let _proof_actions = sender.handle_proof(&proof_data, 1004.0);
    assert_eq!(sender.status, ResourceStatus::Complete);
}
817
/// End-to-end transfer of a payload with metadata: both the data and the
/// metadata come out of assembly unchanged.
#[test]
fn test_full_transfer_with_metadata() {
    let data = b"data with metadata";
    let metadata = b"some metadata";
    let mut rng = rns_crypto::FixedRng::new(&[0x88; 64]);

    let mut sender = ResourceSender::new(
        data,
        Some(metadata),
        RESOURCE_SDU,
        &identity_encrypt,
        &NoopCompressor,
        &mut rng,
        1000.0,
        false,
        false,
        None,
        1,
        1,
        None,
        0.5,
        6.0,
    )
    .unwrap();

    assert!(sender.flags.has_metadata);

    let advertisement = sender.get_advertisement(0);
    let mut receiver = ResourceReceiver::from_advertisement(
        &advertisement,
        RESOURCE_SDU,
        0.5,
        1000.0,
        None,
        None,
    )
    .unwrap();

    assert!(receiver.has_metadata);

    let req_actions = receiver.accept(1001.0);
    let request_data = req_actions
        .iter()
        .filter_map(|action| {
            if let ResourceAction::SendRequest(bytes) = action {
                Some(bytes.clone())
            } else {
                None
            }
        })
        .next()
        .unwrap();

    let send_actions = sender.handle_request(&request_data, 1002.0);
    receiver.req_sent = 1001.0;
    let delivered_parts = send_actions.iter().filter_map(|action| match action {
        ResourceAction::SendPart(part) => Some(part),
        _ => None,
    });
    for part in delivered_parts {
        receiver.receive_part(part, 1003.0);
    }

    let assemble_actions = receiver.assemble(&identity_decrypt, &NoopCompressor);

    let (recv_data, recv_meta) = assemble_actions
        .iter()
        .filter_map(|action| {
            if let ResourceAction::DataReceived { data, metadata } = action {
                Some((data.clone(), metadata.clone()))
            } else {
                None
            }
        })
        .next()
        .unwrap();

    assert_eq!(recv_data, data);
    assert_eq!(recv_meta.unwrap(), metadata);
}
885
/// A receiver created with a previous window size and a previous rate
/// estimate starts from those values.
#[test]
fn test_previous_window_restore() {
    // Only the advertisement is needed; the sender is dropped right away.
    let adv_data = {
        let mut rng = rns_crypto::FixedRng::new(&[0x42; 64]);
        let sender = ResourceSender::new(
            b"test",
            None,
            RESOURCE_SDU,
            &identity_encrypt,
            &NoopCompressor,
            &mut rng,
            1000.0,
            false,
            false,
            None,
            1,
            1,
            None,
            0.5,
            6.0,
        )
        .unwrap();
        sender.get_advertisement(0)
    };

    let receiver = ResourceReceiver::from_advertisement(
        &adv_data,
        RESOURCE_SDU,
        0.5,
        1000.0,
        Some(8),
        Some(50000.0),
    )
    .unwrap();

    assert_eq!(receiver.window.window, 8);
    // The supplied rate estimate is kept as the fallback for `compute_eifr`.
    assert_eq!(receiver.previous_eifr, Some(50000.0));
}
924
/// Ticking far past the deadline either yields actions or consumes a retry.
#[test]
fn test_tick_timeout_retry() {
    let (_, mut receiver) = make_sender_receiver();
    receiver.accept(1000.0);
    receiver.rtt = Some(0.1);

    let actions = receiver.tick(9999.0, &identity_decrypt, &NoopCompressor);

    let produced_actions = !actions.is_empty();
    let retry_consumed = receiver.retries_left < RESOURCE_MAX_RETRIES;
    assert!(produced_actions || retry_consumed);
}
936
/// With no retries left, a timed-out tick fails the transfer.
#[test]
fn test_tick_max_retries_exceeded() {
    let (_, mut receiver) = make_sender_receiver();
    receiver.accept(1000.0);

    // Exhaust the retry budget and give the receiver fast-link estimates.
    receiver.retries_left = 0;
    receiver.rtt = Some(0.001);
    receiver.eifr = Some(100000.0);

    let _timeout_actions = receiver.tick(9999.0, &identity_decrypt, &NoopCompressor);

    assert_eq!(receiver.status, ResourceStatus::Failed);
}
948}