// rns_core/resource/receiver.rs

1use alloc::vec;
2use alloc::vec::Vec;
3
4use super::advertisement::ResourceAdvertisement;
5use super::parts::{extract_metadata, map_hash};
6use super::proof::{build_proof_data, compute_expected_proof, compute_resource_hash};
7use super::types::*;
8use super::window::WindowState;
9use crate::buffer::types::Compressor;
10use crate::constants::*;
11
12/// Resource receiver state machine.
13///
14/// Unpacks advertisements, requests parts, receives parts, assembles data.
15/// Returns `Vec<ResourceAction>` — no I/O, no callbacks.
16pub struct ResourceReceiver {
17    /// Current status
18    pub status: ResourceStatus,
19    /// Resource hash (from advertisement, 32 bytes)
20    pub resource_hash: Vec<u8>,
21    /// Random hash (from advertisement)
22    pub random_hash: Vec<u8>,
23    /// Original hash
24    pub original_hash: Vec<u8>,
25    /// Flags
26    pub flags: AdvFlags,
27    /// Transfer size (encrypted)
28    pub transfer_size: u64,
29    /// Total uncompressed data size
30    pub data_size: u64,
31    /// Total parts
32    pub total_parts: usize,
33    /// Received parts data (None = not yet received)
34    parts: Vec<Option<Vec<u8>>>,
35    /// Hashmap: part index -> map_hash (None if not yet known)
36    hashmap: Vec<Option<[u8; RESOURCE_MAPHASH_LEN]>>,
37    /// Number of hashmap entries populated
38    hashmap_height: usize,
39    /// Whether we're waiting for a hashmap update
40    pub waiting_for_hmu: bool,
41    /// Number of parts received
42    pub received_count: usize,
43    /// Outstanding parts in current window request
44    pub outstanding_parts: usize,
45    /// Consecutive completed height (-1 means none)
46    consecutive_completed_height: isize,
47    /// SDU size
48    sdu: usize,
49    /// Link RTT estimate (from link establishment)
50    link_rtt: f64,
51    /// Retries left
52    pub retries_left: usize,
53    /// Max retries
54    max_retries: usize,
55    /// RTT estimate
56    pub rtt: Option<f64>,
57    /// Part timeout factor
58    part_timeout_factor: f64,
59    /// Last activity timestamp
60    pub last_activity: f64,
61    /// Request sent timestamp
62    pub req_sent: f64,
63    /// Request sent bytes
64    req_sent_bytes: usize,
65    /// Request response timestamp
66    req_resp: Option<f64>,
67    /// RTT received bytes
68    rtt_rxd_bytes: usize,
69    /// RTT received bytes at part request
70    rtt_rxd_bytes_at_part_req: usize,
71    /// Request response RTT rate
72    req_resp_rtt_rate: f64,
73    /// Request data RTT rate
74    req_data_rtt_rate: f64,
75    /// EIFR
76    pub eifr: Option<f64>,
77    /// Previous EIFR from prior transfer
78    previous_eifr: Option<f64>,
79    /// Segment index
80    pub segment_index: u64,
81    /// Total segments
82    pub total_segments: u64,
83    /// Has metadata
84    pub has_metadata: bool,
85    /// Request ID
86    pub request_id: Option<Vec<u8>>,
87    /// Window state
88    pub window: WindowState,
89}
90
91impl ResourceReceiver {
92    /// Create a receiver from an advertisement packet.
93    pub fn from_advertisement(
94        adv_data: &[u8],
95        sdu: usize,
96        link_rtt: f64,
97        now: f64,
98        previous_window: Option<usize>,
99        previous_eifr: Option<f64>,
100    ) -> Result<Self, ResourceError> {
101        let adv = ResourceAdvertisement::unpack(adv_data)?;
102
103        // Validate resource_hash is 32 bytes
104        if adv.resource_hash.len() != 32 {
105            return Err(ResourceError::InvalidAdvertisement);
106        }
107
108        let total_parts = adv.num_parts as usize;
109        let parts_vec: Vec<Option<Vec<u8>>> = vec![None; total_parts];
110        let mut hashmap_vec: Vec<Option<[u8; RESOURCE_MAPHASH_LEN]>> = vec![None; total_parts];
111
112        // Populate initial hashmap from advertisement
113        let initial_hashes = adv.hashmap.len() / RESOURCE_MAPHASH_LEN;
114        let mut hashmap_height = 0;
115        for i in 0..initial_hashes {
116            if i < total_parts {
117                let start = i * RESOURCE_MAPHASH_LEN;
118                let end = start + RESOURCE_MAPHASH_LEN;
119                let mut h = [0u8; RESOURCE_MAPHASH_LEN];
120                h.copy_from_slice(&adv.hashmap[start..end]);
121                hashmap_vec[i] = Some(h);
122                hashmap_height += 1;
123            }
124        }
125
126        let mut window_state = WindowState::new();
127        if let Some(prev_w) = previous_window {
128            window_state.restore(prev_w);
129        }
130
131        Ok(ResourceReceiver {
132            status: ResourceStatus::None,
133            resource_hash: adv.resource_hash,
134            random_hash: adv.random_hash,
135            original_hash: adv.original_hash,
136            flags: adv.flags,
137            transfer_size: adv.transfer_size,
138            data_size: adv.data_size,
139            total_parts,
140            parts: parts_vec,
141            hashmap: hashmap_vec,
142            hashmap_height,
143            waiting_for_hmu: false,
144            received_count: 0,
145            outstanding_parts: 0,
146            consecutive_completed_height: -1,
147            sdu,
148            link_rtt,
149            retries_left: RESOURCE_MAX_RETRIES,
150            max_retries: RESOURCE_MAX_RETRIES,
151            rtt: None,
152            part_timeout_factor: RESOURCE_PART_TIMEOUT_FACTOR,
153            last_activity: now,
154            req_sent: 0.0,
155            req_sent_bytes: 0,
156            req_resp: None,
157            rtt_rxd_bytes: 0,
158            rtt_rxd_bytes_at_part_req: 0,
159            req_resp_rtt_rate: 0.0,
160            req_data_rtt_rate: 0.0,
161            eifr: None,
162            previous_eifr,
163            segment_index: adv.segment_index,
164            total_segments: adv.total_segments,
165            has_metadata: adv.flags.has_metadata,
166            request_id: adv.request_id,
167            window: window_state,
168        })
169    }
170
171    /// Accept the advertised resource. Begins transfer.
172    pub fn accept(&mut self, now: f64) -> Vec<ResourceAction> {
173        self.status = ResourceStatus::Transferring;
174        self.last_activity = now;
175        self.request_next(now)
176    }
177
178    /// Reject the advertised resource.
179    pub fn reject(&mut self) -> Vec<ResourceAction> {
180        self.status = ResourceStatus::Rejected;
181        vec![ResourceAction::SendCancelReceiver(
182            self.resource_hash.clone(),
183        )]
184    }
185
186    /// Receive a part. Matches by map hash and stores it.
187    pub fn receive_part(&mut self, part_data: &[u8], now: f64) -> Vec<ResourceAction> {
188        if self.status == ResourceStatus::Failed {
189            return vec![];
190        }
191
192        self.last_activity = now;
193        self.retries_left = self.max_retries;
194
195        // Update RTT on first part of window
196        if self.req_resp.is_none() {
197            self.req_resp = Some(now);
198            let rtt = now - self.req_sent;
199            self.part_timeout_factor = RESOURCE_PART_TIMEOUT_FACTOR_AFTER_RTT;
200
201            if self.rtt.is_none() {
202                self.rtt = Some(rtt);
203            } else if let Some(current_rtt) = self.rtt {
204                if rtt < current_rtt {
205                    self.rtt = Some(f64::max(current_rtt - current_rtt * 0.05, rtt));
206                } else {
207                    self.rtt = Some(f64::min(current_rtt + current_rtt * 0.05, rtt));
208                }
209            }
210
211            if rtt > 0.0 {
212                let req_resp_cost = part_data.len() + self.req_sent_bytes;
213                self.req_resp_rtt_rate = req_resp_cost as f64 / rtt;
214                self.window.update_req_resp_rate(self.req_resp_rtt_rate);
215            }
216        }
217
218        self.status = ResourceStatus::Transferring;
219
220        // Compute map hash for this part
221        let part_hash = map_hash(part_data, &self.random_hash);
222
223        // Search in the window around consecutive_completed_height
224        let consecutive_idx = if self.consecutive_completed_height >= 0 {
225            self.consecutive_completed_height as usize
226        } else {
227            0
228        };
229
230        let mut matched = false;
231        let search_end = core::cmp::min(consecutive_idx + self.window.window, self.total_parts);
232        for i in consecutive_idx..search_end {
233            if let Some(ref h) = self.hashmap[i] {
234                if *h == part_hash {
235                    if self.parts[i].is_none() {
236                        self.parts[i] = Some(part_data.to_vec());
237                        self.rtt_rxd_bytes += part_data.len();
238                        self.received_count += 1;
239                        self.outstanding_parts = self.outstanding_parts.saturating_sub(1);
240
241                        // Update consecutive completed height
242                        if i as isize == self.consecutive_completed_height + 1 {
243                            self.consecutive_completed_height = i as isize;
244                        }
245
246                        // Walk forward to extend consecutive height
247                        let mut cp = (self.consecutive_completed_height + 1) as usize;
248                        while cp < self.total_parts && self.parts[cp].is_some() {
249                            self.consecutive_completed_height = cp as isize;
250                            cp += 1;
251                        }
252
253                        matched = true;
254                    }
255                    break;
256                }
257            }
258        }
259
260        let mut actions = Vec::new();
261
262        // Check if all parts received
263        if self.received_count == self.total_parts {
264            actions.push(ResourceAction::ProgressUpdate {
265                received: self.received_count,
266                total: self.total_parts,
267            });
268            // Assembly will be triggered by caller
269            return actions;
270        }
271
272        if matched {
273            actions.push(ResourceAction::ProgressUpdate {
274                received: self.received_count,
275                total: self.total_parts,
276            });
277        }
278
279        // Request next window when outstanding is 0
280        if self.outstanding_parts == 0 && self.received_count < self.total_parts {
281            // Window complete — grow
282            self.window.on_window_complete();
283
284            // Update data rate
285            if self.req_sent > 0.0 {
286                let rtt = now - self.req_sent;
287                let req_transferred = self.rtt_rxd_bytes - self.rtt_rxd_bytes_at_part_req;
288                if rtt > 0.0 {
289                    self.req_data_rtt_rate = req_transferred as f64 / rtt;
290                    self.rtt_rxd_bytes_at_part_req = self.rtt_rxd_bytes;
291                    self.window.update_data_rate(self.req_data_rtt_rate);
292                }
293            }
294
295            let next_actions = self.request_next(now);
296            actions.extend(next_actions);
297        }
298
299        actions
300    }
301
302    /// Handle a hashmap update packet.
303    ///
304    /// HMU format: [resource_hash: 32 bytes][msgpack([segment, hashmap])]
305    pub fn handle_hashmap_update(&mut self, hmu_data: &[u8], now: f64) -> Vec<ResourceAction> {
306        if self.status == ResourceStatus::Failed {
307            return vec![];
308        }
309
310        self.last_activity = now;
311        self.retries_left = self.max_retries;
312
313        if hmu_data.len() <= 32 {
314            return vec![];
315        }
316
317        let payload = &hmu_data[32..];
318        let (value, _) = match crate::msgpack::unpack(payload) {
319            Ok(v) => v,
320            Err(_) => return vec![],
321        };
322
323        let arr = match value.as_array() {
324            Some(a) if a.len() >= 2 => a,
325            _ => return vec![],
326        };
327
328        let segment = match arr[0].as_uint() {
329            Some(s) => s as usize,
330            None => return vec![],
331        };
332
333        let hashmap_bytes = match arr[1].as_bin() {
334            Some(b) => b,
335            None => return vec![],
336        };
337
338        // Populate hashmap slots
339        let seg_len = RESOURCE_HASHMAP_MAX_LEN;
340        let num_hashes = hashmap_bytes.len() / RESOURCE_MAPHASH_LEN;
341        for i in 0..num_hashes {
342            let idx = i + segment * seg_len;
343            if idx < self.total_parts {
344                let start = i * RESOURCE_MAPHASH_LEN;
345                let end = start + RESOURCE_MAPHASH_LEN;
346                if self.hashmap[idx].is_none() {
347                    self.hashmap_height += 1;
348                }
349                let mut h = [0u8; RESOURCE_MAPHASH_LEN];
350                h.copy_from_slice(&hashmap_bytes[start..end]);
351                self.hashmap[idx] = Some(h);
352            }
353        }
354
355        self.waiting_for_hmu = false;
356        self.request_next(now)
357    }
358
359    /// Build and return request for next window of parts.
360    pub fn request_next(&mut self, now: f64) -> Vec<ResourceAction> {
361        if self.status == ResourceStatus::Failed || self.waiting_for_hmu {
362            return vec![];
363        }
364
365        self.outstanding_parts = 0;
366        let mut hashmap_exhausted = RESOURCE_HASHMAP_IS_NOT_EXHAUSTED;
367        let mut requested_hashes = Vec::new();
368
369        let pn_start = (self.consecutive_completed_height + 1) as usize;
370        let search_end = core::cmp::min(pn_start + self.window.window, self.total_parts);
371        let mut i = 0;
372
373        for pn in pn_start..search_end {
374            if self.parts[pn].is_none() {
375                match self.hashmap[pn] {
376                    Some(ref h) => {
377                        requested_hashes.extend_from_slice(h);
378                        self.outstanding_parts += 1;
379                        i += 1;
380                    }
381                    None => {
382                        hashmap_exhausted = RESOURCE_HASHMAP_IS_EXHAUSTED;
383                    }
384                }
385            }
386            if i >= self.window.window || hashmap_exhausted == RESOURCE_HASHMAP_IS_EXHAUSTED {
387                break;
388            }
389        }
390
391        let mut request_data = Vec::new();
392        request_data.push(hashmap_exhausted);
393        if hashmap_exhausted == RESOURCE_HASHMAP_IS_EXHAUSTED {
394            // Append last known map hash
395            if self.hashmap_height > 0 {
396                if let Some(ref last_hash) = self.hashmap[self.hashmap_height - 1] {
397                    request_data.extend_from_slice(last_hash);
398                } else {
399                    request_data.extend_from_slice(&[0u8; RESOURCE_MAPHASH_LEN]);
400                }
401            } else {
402                request_data.extend_from_slice(&[0u8; RESOURCE_MAPHASH_LEN]);
403            }
404            self.waiting_for_hmu = true;
405        }
406
407        request_data.extend_from_slice(&self.resource_hash);
408        request_data.extend_from_slice(&requested_hashes);
409
410        self.last_activity = now;
411        self.req_sent = now;
412        self.req_sent_bytes = request_data.len();
413        self.req_resp = None;
414
415        vec![ResourceAction::SendRequest(request_data)]
416    }
417
418    /// Assemble received parts, decrypt, decompress, verify hash.
419    pub fn assemble(
420        &mut self,
421        decrypt_fn: &dyn Fn(&[u8]) -> Result<Vec<u8>, ()>,
422        compressor: &dyn Compressor,
423    ) -> Vec<ResourceAction> {
424        if self.received_count != self.total_parts {
425            return vec![ResourceAction::Failed(ResourceError::InvalidState)];
426        }
427
428        self.status = ResourceStatus::Assembling;
429
430        // Join all parts
431        let mut stream = Vec::new();
432        for part in &self.parts {
433            match part {
434                Some(data) => stream.extend_from_slice(data),
435                None => {
436                    self.status = ResourceStatus::Failed;
437                    return vec![ResourceAction::Failed(ResourceError::InvalidState)];
438                }
439            }
440        }
441
442        // Decrypt
443        let decrypted = if self.flags.encrypted {
444            match decrypt_fn(&stream) {
445                Ok(d) => d,
446                Err(_) => {
447                    self.status = ResourceStatus::Failed;
448                    return vec![ResourceAction::Failed(ResourceError::DecryptionFailed)];
449                }
450            }
451        } else {
452            stream
453        };
454
455        // Strip random hash prefix
456        if decrypted.len() < RESOURCE_RANDOM_HASH_SIZE {
457            self.status = ResourceStatus::Corrupt;
458            return vec![ResourceAction::Failed(ResourceError::InvalidPart)];
459        }
460        let data_after_random = &decrypted[RESOURCE_RANDOM_HASH_SIZE..];
461
462        // Decompress
463        let decompressed = if self.flags.compressed {
464            match compressor.decompress(data_after_random) {
465                Some(d) => d,
466                None => {
467                    self.status = ResourceStatus::Corrupt;
468                    return vec![ResourceAction::Failed(ResourceError::DecompressionFailed)];
469                }
470            }
471        } else {
472            data_after_random.to_vec()
473        };
474
475        // Verify hash
476        let calculated_hash = compute_resource_hash(&decompressed, &self.random_hash);
477        if calculated_hash.as_slice() != self.resource_hash.as_slice() {
478            self.status = ResourceStatus::Corrupt;
479            return vec![ResourceAction::Failed(ResourceError::HashMismatch)];
480        }
481
482        // Compute proof before metadata extraction (proof uses full decompressed data)
483        let expected_proof = compute_expected_proof(&decompressed, &calculated_hash);
484        let proof_data = build_proof_data(&calculated_hash, &expected_proof);
485
486        // Extract metadata if present
487        let (data, metadata) = if self.has_metadata && self.segment_index == 1 {
488            match extract_metadata(&decompressed) {
489                Some((meta, rest)) => (rest, Some(meta)),
490                None => {
491                    self.status = ResourceStatus::Corrupt;
492                    return vec![ResourceAction::Failed(ResourceError::InvalidPart)];
493                }
494            }
495        } else {
496            (decompressed, None)
497        };
498
499        self.status = ResourceStatus::Complete;
500
501        vec![
502            ResourceAction::SendProof(proof_data),
503            ResourceAction::DataReceived { data, metadata },
504            ResourceAction::Completed,
505        ]
506    }
507
508    /// Handle cancel from sender (RESOURCE_ICL).
509    pub fn handle_cancel(&mut self) -> Vec<ResourceAction> {
510        if self.status < ResourceStatus::Complete {
511            self.status = ResourceStatus::Failed;
512            return vec![ResourceAction::Failed(ResourceError::Rejected)];
513        }
514        vec![]
515    }
516
517    /// Periodic tick. Checks for timeouts.
518    pub fn tick(
519        &mut self,
520        now: f64,
521        decrypt_fn: &dyn Fn(&[u8]) -> Result<Vec<u8>, ()>,
522        compressor: &dyn Compressor,
523    ) -> Vec<ResourceAction> {
524        if self.status >= ResourceStatus::Assembling {
525            return vec![];
526        }
527
528        if self.status == ResourceStatus::Transferring {
529            // Check if all parts received — trigger assembly
530            if self.received_count == self.total_parts {
531                return self.assemble(decrypt_fn, compressor);
532            }
533
534            // Compute timeout
535            let eifr = self.compute_eifr();
536            let retries_used = self.max_retries - self.retries_left;
537            let extra_wait = retries_used as f64 * RESOURCE_PER_RETRY_DELAY;
538            let expected_tof = if self.outstanding_parts > 0 && eifr > 0.0 {
539                (self.outstanding_parts as f64 * self.sdu as f64 * 8.0) / eifr
540            } else {
541                if eifr > 0.0 {
542                    (3.0 * self.sdu as f64) / eifr
543                } else {
544                    10.0 // fallback
545                }
546            };
547
548            let sleep_time = self.last_activity
549                + self.part_timeout_factor * expected_tof
550                + RESOURCE_RETRY_GRACE_TIME
551                + extra_wait;
552
553            if now > sleep_time {
554                if self.retries_left > 0 {
555                    // Timeout — shrink window, retry
556                    self.window.on_timeout();
557                    self.retries_left -= 1;
558                    self.waiting_for_hmu = false;
559                    return self.request_next(now);
560                } else {
561                    self.status = ResourceStatus::Failed;
562                    return vec![ResourceAction::Failed(ResourceError::MaxRetriesExceeded)];
563                }
564            }
565        }
566
567        vec![]
568    }
569
570    /// Compute EIFR (expected inflight rate) and update self.eifr.
571    fn compute_eifr(&mut self) -> f64 {
572        let eifr = if self.req_data_rtt_rate > 0.0 {
573            self.req_data_rtt_rate * 8.0
574        } else if let Some(prev) = self.previous_eifr {
575            prev
576        } else {
577            // Fallback: use link_rtt as establishment cost estimate
578            let rtt = self.rtt.unwrap_or(self.link_rtt);
579            if rtt > 0.0 {
580                (self.sdu as f64 * 8.0) / rtt
581            } else {
582                10000.0
583            }
584        };
585        self.eifr = Some(eifr);
586        eifr
587    }
588
589    /// Get current progress as (received, total).
590    pub fn progress(&self) -> (usize, usize) {
591        (self.received_count, self.total_parts)
592    }
593
594    /// Get window and EIFR for passing to next transfer.
595    pub fn get_transfer_state(&self) -> (usize, Option<f64>) {
596        (self.window.window, self.eifr)
597    }
598}
599
#[cfg(test)]
mod tests {
    use super::*;
    use crate::buffer::types::NoopCompressor;
    use crate::resource::sender::ResourceSender;

    /// "Encryption" that leaves the data untouched.
    fn identity_encrypt(data: &[u8]) -> Vec<u8> {
        data.to_vec()
    }

    /// "Decryption" that leaves the data untouched.
    fn identity_decrypt(data: &[u8]) -> Result<Vec<u8>, ()> {
        Ok(data.to_vec())
    }

    /// Build a sender with the fixture settings shared by every test here;
    /// only the payload, the metadata and the RNG seed byte vary.
    fn make_sender(data: &[u8], metadata: Option<&[u8]>, seed: u8) -> ResourceSender {
        let mut rng = rns_crypto::FixedRng::new(&[seed; 64]);
        ResourceSender::new(
            data,
            metadata,
            RESOURCE_SDU,
            &identity_encrypt,
            &NoopCompressor,
            &mut rng,
            1000.0,
            false,
            false,
            None,
            1,
            1,
            None,
            0.5,
            6.0,
        )
        .unwrap()
    }

    /// Build a receiver from the sender's advertisement, with no state
    /// carried over from an earlier transfer.
    fn make_receiver(sender: &ResourceSender) -> ResourceReceiver {
        let adv_data = sender.get_advertisement(0);
        ResourceReceiver::from_advertisement(&adv_data, RESOURCE_SDU, 0.5, 1000.0, None, None)
            .unwrap()
    }

    fn make_sender_receiver() -> (ResourceSender, ResourceReceiver) {
        let sender = make_sender(b"Hello, Resource Transfer!", None, 0x42);
        let receiver = make_receiver(&sender);
        (sender, receiver)
    }

    /// Pull the payload of the first `SendRequest` out of a list of actions.
    fn first_request(actions: &[ResourceAction]) -> Vec<u8> {
        actions
            .iter()
            .find_map(|a| match a {
                ResourceAction::SendRequest(d) => Some(d.clone()),
                _ => None,
            })
            .expect("receiver should emit a part request")
    }

    /// Accept the resource, relay the first request to the sender and feed
    /// every part the sender answers with back into the receiver.
    fn run_first_window(sender: &mut ResourceSender, receiver: &mut ResourceReceiver) {
        let request_data = first_request(&receiver.accept(1001.0));
        let send_actions = sender.handle_request(&request_data, 1002.0);

        receiver.req_sent = 1001.0;
        for action in &send_actions {
            if let ResourceAction::SendPart(part_data) = action {
                receiver.receive_part(part_data, 1003.0);
            }
        }
    }

    #[test]
    fn test_from_advertisement() {
        let (sender, receiver) = make_sender_receiver();
        assert_eq!(receiver.total_parts, sender.total_parts());
        assert_eq!(receiver.transfer_size, sender.transfer_size as u64);
        assert_eq!(receiver.resource_hash, sender.resource_hash.to_vec());
    }

    #[test]
    fn test_accept() {
        let (_, mut receiver) = make_sender_receiver();
        let actions = receiver.accept(1000.0);
        assert_eq!(receiver.status, ResourceStatus::Transferring);
        assert!(!actions.is_empty());
        assert!(actions
            .iter()
            .any(|a| matches!(a, ResourceAction::SendRequest(_))));
    }

    #[test]
    fn test_reject() {
        let (_, mut receiver) = make_sender_receiver();
        let actions = receiver.reject();
        assert_eq!(receiver.status, ResourceStatus::Rejected);
        assert!(actions
            .iter()
            .any(|a| matches!(a, ResourceAction::SendCancelReceiver(_))));
    }

    #[test]
    fn test_receive_part_stores() {
        let (mut sender, mut receiver) = make_sender_receiver();
        receiver.accept(1000.0);

        // Hand-build a request for the first part only.
        let mut request = Vec::new();
        request.push(RESOURCE_HASHMAP_IS_NOT_EXHAUSTED);
        request.extend_from_slice(&sender.resource_hash);
        request.extend_from_slice(&sender.part_hashes[0]);

        let send_actions = sender.handle_request(&request, 1001.0);
        let part_data = send_actions
            .iter()
            .find_map(|a| match a {
                ResourceAction::SendPart(d) => Some(d.clone()),
                _ => None,
            })
            .unwrap();

        // Give it to the receiver.
        receiver.req_sent = 1000.5;
        let _actions = receiver.receive_part(&part_data, 1001.0);
        assert_eq!(receiver.received_count, 1);
    }

    #[test]
    fn test_consecutive_completed_height() {
        let (mut sender, mut receiver) = make_sender_receiver();

        // Nothing received yet: no consecutive prefix.
        assert_eq!(receiver.consecutive_completed_height, -1);

        // Parts of the first window arrive in order starting at index 0, so
        // the consecutive height must track the last part received.
        run_first_window(&mut sender, &mut receiver);
        assert!(receiver.received_count >= 1);
        assert_eq!(
            receiver.consecutive_completed_height,
            receiver.received_count as isize - 1
        );
    }

    #[test]
    fn test_handle_cancel() {
        let (_, mut receiver) = make_sender_receiver();
        receiver.accept(1000.0);
        let actions = receiver.handle_cancel();
        assert_eq!(receiver.status, ResourceStatus::Failed);
        assert!(actions
            .iter()
            .any(|a| matches!(a, ResourceAction::Failed(ResourceError::Rejected))));
    }

    #[test]
    fn test_full_transfer_small_data() {
        // End-to-end: sender creates, receiver accepts, parts flow, assembly completes
        let data = b"small data";
        let mut sender = make_sender(data, None, 0x77);
        let mut receiver = make_receiver(&sender);

        run_first_window(&mut sender, &mut receiver);
        assert_eq!(receiver.status, ResourceStatus::Transferring);
        assert_eq!(receiver.received_count, receiver.total_parts);

        // Assemble
        let assemble_actions = receiver.assemble(&identity_decrypt, &NoopCompressor);

        // Check for proof and data
        let has_proof = assemble_actions
            .iter()
            .any(|a| matches!(a, ResourceAction::SendProof(_)));
        let has_data = assemble_actions
            .iter()
            .any(|a| matches!(a, ResourceAction::DataReceived { .. }));
        let has_complete = assemble_actions
            .iter()
            .any(|a| matches!(a, ResourceAction::Completed));

        assert!(has_proof, "Should send proof");
        assert!(has_data, "Should return data");
        assert!(has_complete, "Should be completed");

        // Verify data matches
        let received_data = assemble_actions
            .iter()
            .find_map(|a| match a {
                ResourceAction::DataReceived { data, .. } => Some(data.clone()),
                _ => None,
            })
            .unwrap();
        assert_eq!(received_data, data);

        // Verify proof validates on sender side
        let proof_data = assemble_actions
            .iter()
            .find_map(|a| match a {
                ResourceAction::SendProof(d) => Some(d.clone()),
                _ => None,
            })
            .unwrap();

        let _proof_actions = sender.handle_proof(&proof_data, 1004.0);
        assert_eq!(sender.status, ResourceStatus::Complete);
    }

    #[test]
    fn test_full_transfer_with_metadata() {
        let data = b"data with metadata";
        let metadata = b"some metadata";
        let mut sender = make_sender(data, Some(&metadata[..]), 0x88);

        assert!(sender.flags.has_metadata);

        let mut receiver = make_receiver(&sender);

        assert!(receiver.has_metadata);

        // Transfer all parts
        run_first_window(&mut sender, &mut receiver);

        let assemble_actions = receiver.assemble(&identity_decrypt, &NoopCompressor);

        let (recv_data, recv_meta) = assemble_actions
            .iter()
            .find_map(|a| match a {
                ResourceAction::DataReceived { data, metadata } => {
                    Some((data.clone(), metadata.clone()))
                }
                _ => None,
            })
            .unwrap();

        assert_eq!(recv_data, data);
        assert_eq!(recv_meta.unwrap(), metadata);
    }

    #[test]
    fn test_previous_window_restore() {
        // Create with previous window
        let adv_data = make_sender(b"test", None, 0x42).get_advertisement(0);

        let receiver = ResourceReceiver::from_advertisement(
            &adv_data,
            RESOURCE_SDU,
            0.5,
            1000.0,
            Some(8),
            Some(50000.0),
        )
        .unwrap();
        assert_eq!(receiver.window.window, 8);
    }

    #[test]
    fn test_tick_timeout_retry() {
        let (_, mut receiver) = make_sender_receiver();
        receiver.accept(1000.0);
        receiver.rtt = Some(0.1);

        // Way past timeout: one retry is spent and the request is re-sent.
        let actions = receiver.tick(9999.0, &identity_decrypt, &NoopCompressor);
        assert_eq!(receiver.retries_left, RESOURCE_MAX_RETRIES - 1);
        assert!(actions
            .iter()
            .any(|a| matches!(a, ResourceAction::SendRequest(_))));
    }

    #[test]
    fn test_tick_max_retries_exceeded() {
        let (_, mut receiver) = make_sender_receiver();
        receiver.accept(1000.0);
        receiver.retries_left = 0;
        receiver.rtt = Some(0.001);
        receiver.eifr = Some(100000.0);

        let actions = receiver.tick(9999.0, &identity_decrypt, &NoopCompressor);
        assert_eq!(receiver.status, ResourceStatus::Failed);
        assert!(actions.iter().any(|a| matches!(
            a,
            ResourceAction::Failed(ResourceError::MaxRetriesExceeded)
        )));
    }
}