Skip to main content

rns_core/resource/
receiver.rs

1use alloc::vec;
2use alloc::vec::Vec;
3
4use crate::constants::*;
5use crate::buffer::types::Compressor;
6use super::types::*;
7use super::advertisement::ResourceAdvertisement;
8use super::parts::{map_hash, extract_metadata};
9use super::proof::{compute_resource_hash, compute_expected_proof, build_proof_data};
10use super::window::WindowState;
11
12/// Resource receiver state machine.
13///
14/// Unpacks advertisements, requests parts, receives parts, assembles data.
15/// Returns `Vec<ResourceAction>` — no I/O, no callbacks.
16pub struct ResourceReceiver {
17    /// Current status
18    pub status: ResourceStatus,
19    /// Resource hash (from advertisement, 32 bytes)
20    pub resource_hash: Vec<u8>,
21    /// Random hash (from advertisement)
22    pub random_hash: Vec<u8>,
23    /// Original hash
24    pub original_hash: Vec<u8>,
25    /// Flags
26    pub flags: AdvFlags,
27    /// Transfer size (encrypted)
28    pub transfer_size: u64,
29    /// Total uncompressed data size
30    pub data_size: u64,
31    /// Total parts
32    pub total_parts: usize,
33    /// Received parts data (None = not yet received)
34    parts: Vec<Option<Vec<u8>>>,
35    /// Hashmap: part index -> map_hash (None if not yet known)
36    hashmap: Vec<Option<[u8; RESOURCE_MAPHASH_LEN]>>,
37    /// Number of hashmap entries populated
38    hashmap_height: usize,
39    /// Whether we're waiting for a hashmap update
40    pub waiting_for_hmu: bool,
41    /// Number of parts received
42    pub received_count: usize,
43    /// Outstanding parts in current window request
44    pub outstanding_parts: usize,
45    /// Consecutive completed height (-1 means none)
46    consecutive_completed_height: isize,
47    /// SDU size
48    sdu: usize,
49    /// Link RTT estimate (from link establishment)
50    link_rtt: f64,
51    /// Retries left
52    pub retries_left: usize,
53    /// Max retries
54    max_retries: usize,
55    /// RTT estimate
56    pub rtt: Option<f64>,
57    /// Part timeout factor
58    part_timeout_factor: f64,
59    /// Last activity timestamp
60    pub last_activity: f64,
61    /// Request sent timestamp
62    pub req_sent: f64,
63    /// Request sent bytes
64    req_sent_bytes: usize,
65    /// Request response timestamp
66    req_resp: Option<f64>,
67    /// RTT received bytes
68    rtt_rxd_bytes: usize,
69    /// RTT received bytes at part request
70    rtt_rxd_bytes_at_part_req: usize,
71    /// Request response RTT rate
72    req_resp_rtt_rate: f64,
73    /// Request data RTT rate
74    req_data_rtt_rate: f64,
75    /// EIFR
76    pub eifr: Option<f64>,
77    /// Previous EIFR from prior transfer
78    previous_eifr: Option<f64>,
79    /// Segment index
80    pub segment_index: u64,
81    /// Total segments
82    pub total_segments: u64,
83    /// Has metadata
84    pub has_metadata: bool,
85    /// Request ID
86    pub request_id: Option<Vec<u8>>,
87    /// Window state
88    pub window: WindowState,
89}
90
91impl ResourceReceiver {
92    /// Create a receiver from an advertisement packet.
93    pub fn from_advertisement(
94        adv_data: &[u8],
95        sdu: usize,
96        link_rtt: f64,
97        now: f64,
98        previous_window: Option<usize>,
99        previous_eifr: Option<f64>,
100    ) -> Result<Self, ResourceError> {
101        let adv = ResourceAdvertisement::unpack(adv_data)?;
102
103        // Validate resource_hash is 32 bytes
104        if adv.resource_hash.len() != 32 {
105            return Err(ResourceError::InvalidAdvertisement);
106        }
107
108        let total_parts = adv.num_parts as usize;
109        let parts_vec: Vec<Option<Vec<u8>>> = vec![None; total_parts];
110        let mut hashmap_vec: Vec<Option<[u8; RESOURCE_MAPHASH_LEN]>> = vec![None; total_parts];
111
112        // Populate initial hashmap from advertisement
113        let initial_hashes = adv.hashmap.len() / RESOURCE_MAPHASH_LEN;
114        let mut hashmap_height = 0;
115        for i in 0..initial_hashes {
116            if i < total_parts {
117                let start = i * RESOURCE_MAPHASH_LEN;
118                let end = start + RESOURCE_MAPHASH_LEN;
119                let mut h = [0u8; RESOURCE_MAPHASH_LEN];
120                h.copy_from_slice(&adv.hashmap[start..end]);
121                hashmap_vec[i] = Some(h);
122                hashmap_height += 1;
123            }
124        }
125
126        let mut window_state = WindowState::new();
127        if let Some(prev_w) = previous_window {
128            window_state.restore(prev_w);
129        }
130
131        Ok(ResourceReceiver {
132            status: ResourceStatus::None,
133            resource_hash: adv.resource_hash,
134            random_hash: adv.random_hash,
135            original_hash: adv.original_hash,
136            flags: adv.flags,
137            transfer_size: adv.transfer_size,
138            data_size: adv.data_size,
139            total_parts,
140            parts: parts_vec,
141            hashmap: hashmap_vec,
142            hashmap_height,
143            waiting_for_hmu: false,
144            received_count: 0,
145            outstanding_parts: 0,
146            consecutive_completed_height: -1,
147            sdu,
148            link_rtt,
149            retries_left: RESOURCE_MAX_RETRIES,
150            max_retries: RESOURCE_MAX_RETRIES,
151            rtt: None,
152            part_timeout_factor: RESOURCE_PART_TIMEOUT_FACTOR,
153            last_activity: now,
154            req_sent: 0.0,
155            req_sent_bytes: 0,
156            req_resp: None,
157            rtt_rxd_bytes: 0,
158            rtt_rxd_bytes_at_part_req: 0,
159            req_resp_rtt_rate: 0.0,
160            req_data_rtt_rate: 0.0,
161            eifr: None,
162            previous_eifr,
163            segment_index: adv.segment_index,
164            total_segments: adv.total_segments,
165            has_metadata: adv.flags.has_metadata,
166            request_id: adv.request_id,
167            window: window_state,
168        })
169    }
170
171    /// Accept the advertised resource. Begins transfer.
172    pub fn accept(&mut self, now: f64) -> Vec<ResourceAction> {
173        self.status = ResourceStatus::Transferring;
174        self.last_activity = now;
175        self.request_next(now)
176    }
177
178    /// Reject the advertised resource.
179    pub fn reject(&mut self) -> Vec<ResourceAction> {
180        self.status = ResourceStatus::Rejected;
181        vec![ResourceAction::SendCancelReceiver(self.resource_hash.clone())]
182    }
183
184    /// Receive a part. Matches by map hash and stores it.
185    pub fn receive_part(&mut self, part_data: &[u8], now: f64) -> Vec<ResourceAction> {
186        if self.status == ResourceStatus::Failed {
187            return vec![];
188        }
189
190        self.last_activity = now;
191        self.retries_left = self.max_retries;
192
193        // Update RTT on first part of window
194        if self.req_resp.is_none() {
195            self.req_resp = Some(now);
196            let rtt = now - self.req_sent;
197            self.part_timeout_factor = RESOURCE_PART_TIMEOUT_FACTOR_AFTER_RTT;
198
199            if self.rtt.is_none() {
200                self.rtt = Some(rtt);
201            } else if let Some(current_rtt) = self.rtt {
202                if rtt < current_rtt {
203                    self.rtt = Some(f64::max(current_rtt - current_rtt * 0.05, rtt));
204                } else {
205                    self.rtt = Some(f64::min(current_rtt + current_rtt * 0.05, rtt));
206                }
207            }
208
209            if rtt > 0.0 {
210                let req_resp_cost = part_data.len() + self.req_sent_bytes;
211                self.req_resp_rtt_rate = req_resp_cost as f64 / rtt;
212                self.window.update_req_resp_rate(self.req_resp_rtt_rate);
213            }
214        }
215
216        self.status = ResourceStatus::Transferring;
217
218        // Compute map hash for this part
219        let part_hash = map_hash(part_data, &self.random_hash);
220
221        // Search in the window around consecutive_completed_height
222        let consecutive_idx = if self.consecutive_completed_height >= 0 {
223            self.consecutive_completed_height as usize
224        } else {
225            0
226        };
227
228        let mut matched = false;
229        let search_end = core::cmp::min(consecutive_idx + self.window.window, self.total_parts);
230        for i in consecutive_idx..search_end {
231            if let Some(ref h) = self.hashmap[i] {
232                if *h == part_hash {
233                    if self.parts[i].is_none() {
234                        self.parts[i] = Some(part_data.to_vec());
235                        self.rtt_rxd_bytes += part_data.len();
236                        self.received_count += 1;
237                        self.outstanding_parts = self.outstanding_parts.saturating_sub(1);
238
239                        // Update consecutive completed height
240                        if i as isize == self.consecutive_completed_height + 1 {
241                            self.consecutive_completed_height = i as isize;
242                        }
243
244                        // Walk forward to extend consecutive height
245                        let mut cp = (self.consecutive_completed_height + 1) as usize;
246                        while cp < self.total_parts && self.parts[cp].is_some() {
247                            self.consecutive_completed_height = cp as isize;
248                            cp += 1;
249                        }
250
251                        matched = true;
252                    }
253                    break;
254                }
255            }
256        }
257
258        let mut actions = Vec::new();
259
260        // Check if all parts received
261        if self.received_count == self.total_parts {
262            actions.push(ResourceAction::ProgressUpdate {
263                received: self.received_count,
264                total: self.total_parts,
265            });
266            // Assembly will be triggered by caller
267            return actions;
268        }
269
270        if matched {
271            actions.push(ResourceAction::ProgressUpdate {
272                received: self.received_count,
273                total: self.total_parts,
274            });
275        }
276
277        // Request next window when outstanding is 0
278        if self.outstanding_parts == 0 && self.received_count < self.total_parts {
279            // Window complete — grow
280            self.window.on_window_complete();
281
282            // Update data rate
283            if self.req_sent > 0.0 {
284                let rtt = now - self.req_sent;
285                let req_transferred = self.rtt_rxd_bytes - self.rtt_rxd_bytes_at_part_req;
286                if rtt > 0.0 {
287                    self.req_data_rtt_rate = req_transferred as f64 / rtt;
288                    self.rtt_rxd_bytes_at_part_req = self.rtt_rxd_bytes;
289                    self.window.update_data_rate(self.req_data_rtt_rate);
290                }
291            }
292
293            let next_actions = self.request_next(now);
294            actions.extend(next_actions);
295        }
296
297        actions
298    }
299
300    /// Handle a hashmap update packet.
301    ///
302    /// HMU format: [resource_hash: 32 bytes][msgpack([segment, hashmap])]
303    pub fn handle_hashmap_update(&mut self, hmu_data: &[u8], now: f64) -> Vec<ResourceAction> {
304        if self.status == ResourceStatus::Failed {
305            return vec![];
306        }
307
308        self.last_activity = now;
309        self.retries_left = self.max_retries;
310
311        if hmu_data.len() <= 32 {
312            return vec![];
313        }
314
315        let payload = &hmu_data[32..];
316        let (value, _) = match crate::msgpack::unpack(payload) {
317            Ok(v) => v,
318            Err(_) => return vec![],
319        };
320
321        let arr = match value.as_array() {
322            Some(a) if a.len() >= 2 => a,
323            _ => return vec![],
324        };
325
326        let segment = match arr[0].as_uint() {
327            Some(s) => s as usize,
328            None => return vec![],
329        };
330
331        let hashmap_bytes = match arr[1].as_bin() {
332            Some(b) => b,
333            None => return vec![],
334        };
335
336        // Populate hashmap slots
337        let seg_len = RESOURCE_HASHMAP_MAX_LEN;
338        let num_hashes = hashmap_bytes.len() / RESOURCE_MAPHASH_LEN;
339        for i in 0..num_hashes {
340            let idx = i + segment * seg_len;
341            if idx < self.total_parts {
342                let start = i * RESOURCE_MAPHASH_LEN;
343                let end = start + RESOURCE_MAPHASH_LEN;
344                if self.hashmap[idx].is_none() {
345                    self.hashmap_height += 1;
346                }
347                let mut h = [0u8; RESOURCE_MAPHASH_LEN];
348                h.copy_from_slice(&hashmap_bytes[start..end]);
349                self.hashmap[idx] = Some(h);
350            }
351        }
352
353        self.waiting_for_hmu = false;
354        self.request_next(now)
355    }
356
357    /// Build and return request for next window of parts.
358    pub fn request_next(&mut self, now: f64) -> Vec<ResourceAction> {
359        if self.status == ResourceStatus::Failed || self.waiting_for_hmu {
360            return vec![];
361        }
362
363        self.outstanding_parts = 0;
364        let mut hashmap_exhausted = RESOURCE_HASHMAP_IS_NOT_EXHAUSTED;
365        let mut requested_hashes = Vec::new();
366
367        let pn_start = (self.consecutive_completed_height + 1) as usize;
368        let search_end = core::cmp::min(pn_start + self.window.window, self.total_parts);
369        let mut i = 0;
370
371        for pn in pn_start..search_end {
372            if self.parts[pn].is_none() {
373                match self.hashmap[pn] {
374                    Some(ref h) => {
375                        requested_hashes.extend_from_slice(h);
376                        self.outstanding_parts += 1;
377                        i += 1;
378                    }
379                    None => {
380                        hashmap_exhausted = RESOURCE_HASHMAP_IS_EXHAUSTED;
381                    }
382                }
383            }
384            if i >= self.window.window || hashmap_exhausted == RESOURCE_HASHMAP_IS_EXHAUSTED {
385                break;
386            }
387        }
388
389        let mut request_data = Vec::new();
390        request_data.push(hashmap_exhausted);
391        if hashmap_exhausted == RESOURCE_HASHMAP_IS_EXHAUSTED {
392            // Append last known map hash
393            if self.hashmap_height > 0 {
394                if let Some(ref last_hash) = self.hashmap[self.hashmap_height - 1] {
395                    request_data.extend_from_slice(last_hash);
396                } else {
397                    request_data.extend_from_slice(&[0u8; RESOURCE_MAPHASH_LEN]);
398                }
399            } else {
400                request_data.extend_from_slice(&[0u8; RESOURCE_MAPHASH_LEN]);
401            }
402            self.waiting_for_hmu = true;
403        }
404
405        request_data.extend_from_slice(&self.resource_hash);
406        request_data.extend_from_slice(&requested_hashes);
407
408        self.last_activity = now;
409        self.req_sent = now;
410        self.req_sent_bytes = request_data.len();
411        self.req_resp = None;
412
413        vec![ResourceAction::SendRequest(request_data)]
414    }
415
416    /// Assemble received parts, decrypt, decompress, verify hash.
417    pub fn assemble(
418        &mut self,
419        decrypt_fn: &dyn Fn(&[u8]) -> Result<Vec<u8>, ()>,
420        compressor: &dyn Compressor,
421    ) -> Vec<ResourceAction> {
422        if self.received_count != self.total_parts {
423            return vec![ResourceAction::Failed(ResourceError::InvalidState)];
424        }
425
426        self.status = ResourceStatus::Assembling;
427
428        // Join all parts
429        let mut stream = Vec::new();
430        for part in &self.parts {
431            match part {
432                Some(data) => stream.extend_from_slice(data),
433                None => {
434                    self.status = ResourceStatus::Failed;
435                    return vec![ResourceAction::Failed(ResourceError::InvalidState)];
436                }
437            }
438        }
439
440        // Decrypt
441        let decrypted = if self.flags.encrypted {
442            match decrypt_fn(&stream) {
443                Ok(d) => d,
444                Err(_) => {
445                    self.status = ResourceStatus::Failed;
446                    return vec![ResourceAction::Failed(ResourceError::DecryptionFailed)];
447                }
448            }
449        } else {
450            stream
451        };
452
453        // Strip random hash prefix
454        if decrypted.len() < RESOURCE_RANDOM_HASH_SIZE {
455            self.status = ResourceStatus::Corrupt;
456            return vec![ResourceAction::Failed(ResourceError::InvalidPart)];
457        }
458        let data_after_random = &decrypted[RESOURCE_RANDOM_HASH_SIZE..];
459
460        // Decompress
461        let decompressed = if self.flags.compressed {
462            match compressor.decompress(data_after_random) {
463                Some(d) => d,
464                None => {
465                    self.status = ResourceStatus::Corrupt;
466                    return vec![ResourceAction::Failed(ResourceError::DecompressionFailed)];
467                }
468            }
469        } else {
470            data_after_random.to_vec()
471        };
472
473        // Verify hash
474        let calculated_hash = compute_resource_hash(&decompressed, &self.random_hash);
475        if calculated_hash.as_slice() != self.resource_hash.as_slice() {
476            self.status = ResourceStatus::Corrupt;
477            return vec![ResourceAction::Failed(ResourceError::HashMismatch)];
478        }
479
480        // Compute proof before metadata extraction (proof uses full decompressed data)
481        let expected_proof = compute_expected_proof(&decompressed, &calculated_hash);
482        let proof_data = build_proof_data(&calculated_hash, &expected_proof);
483
484        // Extract metadata if present
485        let (data, metadata) = if self.has_metadata && self.segment_index == 1 {
486            match extract_metadata(&decompressed) {
487                Some((meta, rest)) => (rest, Some(meta)),
488                None => {
489                    self.status = ResourceStatus::Corrupt;
490                    return vec![ResourceAction::Failed(ResourceError::InvalidPart)];
491                }
492            }
493        } else {
494            (decompressed, None)
495        };
496
497        self.status = ResourceStatus::Complete;
498
499        vec![
500            ResourceAction::SendProof(proof_data),
501            ResourceAction::DataReceived { data, metadata },
502            ResourceAction::Completed,
503        ]
504    }
505
506    /// Handle cancel from sender (RESOURCE_ICL).
507    pub fn handle_cancel(&mut self) -> Vec<ResourceAction> {
508        if self.status < ResourceStatus::Complete {
509            self.status = ResourceStatus::Failed;
510            return vec![ResourceAction::Failed(ResourceError::Rejected)];
511        }
512        vec![]
513    }
514
515    /// Periodic tick. Checks for timeouts.
516    pub fn tick(
517        &mut self,
518        now: f64,
519        decrypt_fn: &dyn Fn(&[u8]) -> Result<Vec<u8>, ()>,
520        compressor: &dyn Compressor,
521    ) -> Vec<ResourceAction> {
522        if self.status >= ResourceStatus::Assembling {
523            return vec![];
524        }
525
526        if self.status == ResourceStatus::Transferring {
527            // Check if all parts received — trigger assembly
528            if self.received_count == self.total_parts {
529                return self.assemble(decrypt_fn, compressor);
530            }
531
532            // Compute timeout
533            let eifr = self.compute_eifr();
534            let retries_used = self.max_retries - self.retries_left;
535            let extra_wait = retries_used as f64 * RESOURCE_PER_RETRY_DELAY;
536            let expected_tof = if self.outstanding_parts > 0 && eifr > 0.0 {
537                (self.outstanding_parts as f64 * self.sdu as f64 * 8.0) / eifr
538            } else {
539                if eifr > 0.0 {
540                    (3.0 * self.sdu as f64) / eifr
541                } else {
542                    10.0 // fallback
543                }
544            };
545
546            let sleep_time = self.last_activity
547                + self.part_timeout_factor * expected_tof
548                + RESOURCE_RETRY_GRACE_TIME
549                + extra_wait;
550
551            if now > sleep_time {
552                if self.retries_left > 0 {
553                    // Timeout — shrink window, retry
554                    self.window.on_timeout();
555                    self.retries_left -= 1;
556                    self.waiting_for_hmu = false;
557                    return self.request_next(now);
558                } else {
559                    self.status = ResourceStatus::Failed;
560                    return vec![ResourceAction::Failed(ResourceError::MaxRetriesExceeded)];
561                }
562            }
563        }
564
565        vec![]
566    }
567
568    /// Compute EIFR (expected inflight rate) and update self.eifr.
569    fn compute_eifr(&mut self) -> f64 {
570        let eifr = if self.req_data_rtt_rate > 0.0 {
571            self.req_data_rtt_rate * 8.0
572        } else if let Some(prev) = self.previous_eifr {
573            prev
574        } else {
575            // Fallback: use link_rtt as establishment cost estimate
576            let rtt = self.rtt.unwrap_or(self.link_rtt);
577            if rtt > 0.0 {
578                (self.sdu as f64 * 8.0) / rtt
579            } else {
580                10000.0
581            }
582        };
583        self.eifr = Some(eifr);
584        eifr
585    }
586
587    /// Get current progress as (received, total).
588    pub fn progress(&self) -> (usize, usize) {
589        (self.received_count, self.total_parts)
590    }
591
592    /// Get window and EIFR for passing to next transfer.
593    pub fn get_transfer_state(&self) -> (usize, Option<f64>) {
594        (self.window.window, self.eifr)
595    }
596}
597
598#[cfg(test)]
599mod tests {
600    use super::*;
601    use crate::buffer::types::NoopCompressor;
602    use crate::resource::sender::ResourceSender;
603
604    fn identity_encrypt(data: &[u8]) -> Vec<u8> {
605        data.to_vec()
606    }
607
608    fn identity_decrypt(data: &[u8]) -> Result<Vec<u8>, ()> {
609        Ok(data.to_vec())
610    }
611
612    fn make_sender_receiver() -> (ResourceSender, ResourceReceiver) {
613        let mut rng = rns_crypto::FixedRng::new(&[0x42; 64]);
614        let data = b"Hello, Resource Transfer!";
615
616        let sender = ResourceSender::new(
617            data,
618            None,
619            RESOURCE_SDU,
620            &identity_encrypt,
621            &NoopCompressor,
622            &mut rng,
623            1000.0,
624            false,
625            false,
626            None,
627            1,
628            1,
629            None,
630            0.5,
631            6.0,
632        )
633        .unwrap();
634
635        let adv_data = sender.get_advertisement(0);
636        let receiver = ResourceReceiver::from_advertisement(
637            &adv_data,
638            RESOURCE_SDU,
639            0.5,
640            1000.0,
641            None,
642            None,
643        )
644        .unwrap();
645
646        (sender, receiver)
647    }
648
649    #[test]
650    fn test_from_advertisement() {
651        let (sender, receiver) = make_sender_receiver();
652        assert_eq!(receiver.total_parts, sender.total_parts());
653        assert_eq!(receiver.transfer_size, sender.transfer_size as u64);
654        assert_eq!(receiver.resource_hash, sender.resource_hash.to_vec());
655    }
656
657    #[test]
658    fn test_accept() {
659        let (_, mut receiver) = make_sender_receiver();
660        let actions = receiver.accept(1000.0);
661        assert_eq!(receiver.status, ResourceStatus::Transferring);
662        assert!(!actions.is_empty());
663        assert!(actions.iter().any(|a| matches!(a, ResourceAction::SendRequest(_))));
664    }
665
666    #[test]
667    fn test_reject() {
668        let (_, mut receiver) = make_sender_receiver();
669        let actions = receiver.reject();
670        assert_eq!(receiver.status, ResourceStatus::Rejected);
671        assert!(actions.iter().any(|a| matches!(a, ResourceAction::SendCancelReceiver(_))));
672    }
673
674    #[test]
675    fn test_receive_part_stores() {
676        let (mut sender, mut receiver) = make_sender_receiver();
677        receiver.accept(1000.0);
678
679        // Get part data from sender (we use identity encryption so parts ARE the raw data)
680        // Request first part
681        let mut request = Vec::new();
682        request.push(RESOURCE_HASHMAP_IS_NOT_EXHAUSTED);
683        request.extend_from_slice(&sender.resource_hash);
684        request.extend_from_slice(&sender.part_hashes[0]);
685
686        let send_actions = sender.handle_request(&request, 1001.0);
687        let part_data = send_actions
688            .iter()
689            .find_map(|a| match a {
690                ResourceAction::SendPart(d) => Some(d.clone()),
691                _ => None,
692            })
693            .unwrap();
694
695        // Give it to receiver
696        receiver.req_sent = 1000.5;
697        let _actions = receiver.receive_part(&part_data, 1001.0);
698        assert_eq!(receiver.received_count, 1);
699    }
700
701    #[test]
702    fn test_consecutive_completed_height() {
703        let (sender, mut receiver) = make_sender_receiver();
704        receiver.accept(1000.0);
705
706        // Simulate receiving parts in order for a multi-part resource
707        if sender.total_parts() > 1 {
708            // This only applies to multi-part transfers
709            assert_eq!(receiver.consecutive_completed_height, -1);
710        }
711    }
712
713    #[test]
714    fn test_handle_cancel() {
715        let (_, mut receiver) = make_sender_receiver();
716        receiver.accept(1000.0);
717        let _actions = receiver.handle_cancel();
718        assert_eq!(receiver.status, ResourceStatus::Failed);
719    }
720
721    #[test]
722    fn test_full_transfer_small_data() {
723        // End-to-end: sender creates, receiver accepts, parts flow, assembly completes
724        let data = b"small data";
725        let mut rng = rns_crypto::FixedRng::new(&[0x77; 64]);
726
727        let mut sender = ResourceSender::new(
728            data,
729            None,
730            RESOURCE_SDU,
731            &identity_encrypt,
732            &NoopCompressor,
733            &mut rng,
734            1000.0,
735            false,
736            false,
737            None,
738            1,
739            1,
740            None,
741            0.5,
742            6.0,
743        )
744        .unwrap();
745
746        let adv = sender.get_advertisement(0);
747        let mut receiver = ResourceReceiver::from_advertisement(
748            &adv,
749            RESOURCE_SDU,
750            0.5,
751            1000.0,
752            None,
753            None,
754        )
755        .unwrap();
756
757        // Accept
758        let req_actions = receiver.accept(1001.0);
759        assert_eq!(receiver.status, ResourceStatus::Transferring);
760
761        // Get request data
762        let request_data = req_actions
763            .iter()
764            .find_map(|a| match a {
765                ResourceAction::SendRequest(d) => Some(d.clone()),
766                _ => None,
767            })
768            .unwrap();
769
770        // Sender handles request
771        let send_actions = sender.handle_request(&request_data, 1002.0);
772
773        // Feed all parts to receiver
774        receiver.req_sent = 1001.0;
775        for action in &send_actions {
776            if let ResourceAction::SendPart(part_data) = action {
777                receiver.receive_part(part_data, 1003.0);
778            }
779        }
780
781        assert_eq!(receiver.received_count, receiver.total_parts);
782
783        // Assemble
784        let assemble_actions = receiver.assemble(&identity_decrypt, &NoopCompressor);
785
786        // Check for proof and data
787        let has_proof = assemble_actions.iter().any(|a| matches!(a, ResourceAction::SendProof(_)));
788        let has_data = assemble_actions.iter().any(|a| matches!(a, ResourceAction::DataReceived { .. }));
789        let has_complete = assemble_actions.iter().any(|a| matches!(a, ResourceAction::Completed));
790
791        assert!(has_proof, "Should send proof");
792        assert!(has_data, "Should return data");
793        assert!(has_complete, "Should be completed");
794
795        // Verify data matches
796        let received_data = assemble_actions
797            .iter()
798            .find_map(|a| match a {
799                ResourceAction::DataReceived { data, .. } => Some(data.clone()),
800                _ => None,
801            })
802            .unwrap();
803        assert_eq!(received_data, data);
804
805        // Verify proof validates on sender side
806        let proof_data = assemble_actions
807            .iter()
808            .find_map(|a| match a {
809                ResourceAction::SendProof(d) => Some(d.clone()),
810                _ => None,
811            })
812            .unwrap();
813
814        let _proof_actions = sender.handle_proof(&proof_data, 1004.0);
815        assert_eq!(sender.status, ResourceStatus::Complete);
816    }
817
818    #[test]
819    fn test_full_transfer_with_metadata() {
820        let data = b"data with metadata";
821        let metadata = b"some metadata";
822        let mut rng = rns_crypto::FixedRng::new(&[0x88; 64]);
823
824        let mut sender = ResourceSender::new(
825            data,
826            Some(metadata),
827            RESOURCE_SDU,
828            &identity_encrypt,
829            &NoopCompressor,
830            &mut rng,
831            1000.0,
832            false,
833            false,
834            None,
835            1,
836            1,
837            None,
838            0.5,
839            6.0,
840        )
841        .unwrap();
842
843        assert!(sender.flags.has_metadata);
844
845        let adv = sender.get_advertisement(0);
846        let mut receiver = ResourceReceiver::from_advertisement(
847            &adv,
848            RESOURCE_SDU,
849            0.5,
850            1000.0,
851            None,
852            None,
853        )
854        .unwrap();
855
856        assert!(receiver.has_metadata);
857
858        // Transfer all parts
859        let req_actions = receiver.accept(1001.0);
860        let request_data = req_actions
861            .iter()
862            .find_map(|a| match a {
863                ResourceAction::SendRequest(d) => Some(d.clone()),
864                _ => None,
865            })
866            .unwrap();
867
868        let send_actions = sender.handle_request(&request_data, 1002.0);
869        receiver.req_sent = 1001.0;
870        for action in &send_actions {
871            if let ResourceAction::SendPart(part_data) = action {
872                receiver.receive_part(part_data, 1003.0);
873            }
874        }
875
876        let assemble_actions = receiver.assemble(&identity_decrypt, &NoopCompressor);
877
878        let (recv_data, recv_meta) = assemble_actions
879            .iter()
880            .find_map(|a| match a {
881                ResourceAction::DataReceived { data, metadata } => Some((data.clone(), metadata.clone())),
882                _ => None,
883            })
884            .unwrap();
885
886        assert_eq!(recv_data, data);
887        assert_eq!(recv_meta.unwrap(), metadata);
888    }
889
890    #[test]
891    fn test_previous_window_restore() {
892        let (_, _receiver) = make_sender_receiver();
893        // Create with previous window
894        let adv_data = {
895            let mut rng = rns_crypto::FixedRng::new(&[0x42; 64]);
896            let sender = ResourceSender::new(
897                b"test",
898                None,
899                RESOURCE_SDU,
900                &identity_encrypt,
901                &NoopCompressor,
902                &mut rng,
903                1000.0,
904                false,
905                false,
906                None,
907                1,
908                1,
909                None,
910                0.5,
911                6.0,
912            )
913            .unwrap();
914            sender.get_advertisement(0)
915        };
916
917        let receiver = ResourceReceiver::from_advertisement(
918            &adv_data,
919            RESOURCE_SDU,
920            0.5,
921            1000.0,
922            Some(8),
923            Some(50000.0),
924        )
925        .unwrap();
926        assert_eq!(receiver.window.window, 8);
927    }
928
929    #[test]
930    fn test_tick_timeout_retry() {
931        let (_, mut receiver) = make_sender_receiver();
932        receiver.accept(1000.0);
933        receiver.rtt = Some(0.1);
934
935        // Way past timeout
936        let actions = receiver.tick(9999.0, &identity_decrypt, &NoopCompressor);
937        // Should have retried (window decreased, request_next called)
938        assert!(!actions.is_empty() || receiver.retries_left < RESOURCE_MAX_RETRIES);
939    }
940
941    #[test]
942    fn test_tick_max_retries_exceeded() {
943        let (_, mut receiver) = make_sender_receiver();
944        receiver.accept(1000.0);
945        receiver.retries_left = 0;
946        receiver.rtt = Some(0.001);
947        receiver.eifr = Some(100000.0);
948
949        let _actions = receiver.tick(9999.0, &identity_decrypt, &NoopCompressor);
950        assert_eq!(receiver.status, ResourceStatus::Failed);
951    }
952}