Skip to main content

rns_core/resource/
receiver.rs

1use alloc::vec;
2use alloc::vec::Vec;
3
4use super::advertisement::ResourceAdvertisement;
5use super::parts::{extract_metadata, map_hash};
6use super::proof::{build_proof_data, compute_expected_proof, compute_resource_hash};
7use super::types::*;
8use super::window::WindowState;
9use crate::buffer::types::Compressor;
10use crate::constants::*;
11
12/// Resource receiver state machine.
13///
14/// Unpacks advertisements, requests parts, receives parts, assembles data.
15/// Returns `Vec<ResourceAction>` — no I/O, no callbacks.
16pub struct ResourceReceiver {
17    /// Current status
18    pub status: ResourceStatus,
19    /// Resource hash (from advertisement, 32 bytes)
20    pub resource_hash: Vec<u8>,
21    /// Random hash (from advertisement)
22    pub random_hash: Vec<u8>,
23    /// Original hash
24    pub original_hash: Vec<u8>,
25    /// Flags
26    pub flags: AdvFlags,
27    /// Transfer size (encrypted)
28    pub transfer_size: u64,
29    /// Total uncompressed data size
30    pub data_size: u64,
31    /// Total parts
32    pub total_parts: usize,
33    /// Received parts data (None = not yet received)
34    parts: Vec<Option<Vec<u8>>>,
35    /// Hashmap: part index -> map_hash (None if not yet known)
36    hashmap: Vec<Option<[u8; RESOURCE_MAPHASH_LEN]>>,
37    /// Number of hashmap entries populated
38    hashmap_height: usize,
39    /// Whether we're waiting for a hashmap update
40    pub waiting_for_hmu: bool,
41    /// Number of parts received
42    pub received_count: usize,
43    /// Outstanding parts in current window request
44    pub outstanding_parts: usize,
45    /// Consecutive completed height (-1 means none)
46    consecutive_completed_height: isize,
47    /// SDU size
48    sdu: usize,
49    /// Link RTT estimate (from link establishment)
50    link_rtt: f64,
51    /// Retries left
52    pub retries_left: usize,
53    /// Max retries
54    max_retries: usize,
55    /// RTT estimate
56    pub rtt: Option<f64>,
57    /// Part timeout factor
58    part_timeout_factor: f64,
59    /// Last activity timestamp
60    pub last_activity: f64,
61    /// Request sent timestamp
62    pub req_sent: f64,
63    /// Request sent bytes
64    req_sent_bytes: usize,
65    /// Request response timestamp
66    req_resp: Option<f64>,
67    /// RTT received bytes
68    rtt_rxd_bytes: usize,
69    /// RTT received bytes at part request
70    rtt_rxd_bytes_at_part_req: usize,
71    /// Request response RTT rate
72    req_resp_rtt_rate: f64,
73    /// Request data RTT rate
74    req_data_rtt_rate: f64,
75    /// EIFR
76    pub eifr: Option<f64>,
77    /// Previous EIFR from prior transfer
78    previous_eifr: Option<f64>,
79    /// Segment index
80    pub segment_index: u64,
81    /// Total segments
82    pub total_segments: u64,
83    /// Has metadata
84    pub has_metadata: bool,
85    /// Request ID
86    pub request_id: Option<Vec<u8>>,
87    /// Window state
88    pub window: WindowState,
89    /// Original advertisement bytes for this incoming transfer.
90    pub advertisement_packet: Vec<u8>,
91    /// Maximum allowed decompressed size for compressed resource payloads.
92    pub max_decompressed_size: usize,
93}
94
95impl ResourceReceiver {
96    /// Create a receiver from an advertisement packet.
97    pub fn from_advertisement(
98        adv_data: &[u8],
99        sdu: usize,
100        link_rtt: f64,
101        now: f64,
102        previous_window: Option<usize>,
103        previous_eifr: Option<f64>,
104    ) -> Result<Self, ResourceError> {
105        let adv = ResourceAdvertisement::unpack(adv_data)?;
106
107        // Validate resource_hash is 32 bytes
108        if adv.resource_hash.len() != 32 {
109            return Err(ResourceError::InvalidAdvertisement);
110        }
111
112        let total_parts = adv.num_parts as usize;
113        let parts_vec: Vec<Option<Vec<u8>>> = vec![None; total_parts];
114        let mut hashmap_vec: Vec<Option<[u8; RESOURCE_MAPHASH_LEN]>> = vec![None; total_parts];
115
116        // Populate initial hashmap from advertisement
117        let initial_hashes = adv.hashmap.len() / RESOURCE_MAPHASH_LEN;
118        let mut hashmap_height = 0;
119        for (i, slot) in hashmap_vec.iter_mut().enumerate().take(initial_hashes) {
120            if i < total_parts {
121                let start = i * RESOURCE_MAPHASH_LEN;
122                let end = start + RESOURCE_MAPHASH_LEN;
123                let mut h = [0u8; RESOURCE_MAPHASH_LEN];
124                h.copy_from_slice(&adv.hashmap[start..end]);
125                *slot = Some(h);
126                hashmap_height += 1;
127            }
128        }
129
130        let mut window_state = WindowState::new();
131        if let Some(prev_w) = previous_window {
132            window_state.restore(prev_w);
133        }
134
135        Ok(ResourceReceiver {
136            status: ResourceStatus::None,
137            resource_hash: adv.resource_hash,
138            random_hash: adv.random_hash,
139            original_hash: adv.original_hash,
140            flags: adv.flags,
141            transfer_size: adv.transfer_size,
142            data_size: adv.data_size,
143            total_parts,
144            parts: parts_vec,
145            hashmap: hashmap_vec,
146            hashmap_height,
147            waiting_for_hmu: false,
148            received_count: 0,
149            outstanding_parts: 0,
150            consecutive_completed_height: -1,
151            sdu,
152            link_rtt,
153            retries_left: RESOURCE_MAX_RETRIES,
154            max_retries: RESOURCE_MAX_RETRIES,
155            rtt: None,
156            part_timeout_factor: RESOURCE_PART_TIMEOUT_FACTOR,
157            last_activity: now,
158            req_sent: 0.0,
159            req_sent_bytes: 0,
160            req_resp: None,
161            rtt_rxd_bytes: 0,
162            rtt_rxd_bytes_at_part_req: 0,
163            req_resp_rtt_rate: 0.0,
164            req_data_rtt_rate: 0.0,
165            eifr: None,
166            previous_eifr,
167            segment_index: adv.segment_index,
168            total_segments: adv.total_segments,
169            has_metadata: adv.flags.has_metadata,
170            request_id: adv.request_id,
171            window: window_state,
172            advertisement_packet: adv_data.to_vec(),
173            max_decompressed_size: RESOURCE_AUTO_COMPRESS_MAX_SIZE,
174        })
175    }
176
177    /// Accept the advertised resource. Begins transfer.
178    pub fn accept(&mut self, now: f64) -> Vec<ResourceAction> {
179        self.status = ResourceStatus::Transferring;
180        self.last_activity = now;
181        self.request_next(now)
182    }
183
184    /// Reject the advertised resource.
185    pub fn reject(&mut self) -> Vec<ResourceAction> {
186        self.status = ResourceStatus::Rejected;
187        vec![ResourceAction::SendCancelReceiver(
188            self.resource_hash.clone(),
189        )]
190    }
191
192    fn corrupt_actions(&mut self, error: ResourceError) -> Vec<ResourceAction> {
193        self.status = ResourceStatus::Corrupt;
194        vec![
195            ResourceAction::SendCancelReceiver(self.resource_hash.clone()),
196            ResourceAction::Failed(error),
197            ResourceAction::TeardownLink,
198        ]
199    }
200
201    /// Receive a part. Matches by map hash and stores it.
202    pub fn receive_part(&mut self, part_data: &[u8], now: f64) -> Vec<ResourceAction> {
203        if self.status == ResourceStatus::Failed {
204            return vec![];
205        }
206
207        self.last_activity = now;
208        self.retries_left = self.max_retries;
209
210        // Update RTT on first part of window
211        if self.req_resp.is_none() {
212            self.req_resp = Some(now);
213            let rtt = now - self.req_sent;
214            self.part_timeout_factor = RESOURCE_PART_TIMEOUT_FACTOR_AFTER_RTT;
215
216            if self.rtt.is_none() {
217                self.rtt = Some(rtt);
218            } else if let Some(current_rtt) = self.rtt {
219                if rtt < current_rtt {
220                    self.rtt = Some(f64::max(current_rtt - current_rtt * 0.05, rtt));
221                } else {
222                    self.rtt = Some(f64::min(current_rtt + current_rtt * 0.05, rtt));
223                }
224            }
225
226            if rtt > 0.0 {
227                let req_resp_cost = part_data.len() + self.req_sent_bytes;
228                self.req_resp_rtt_rate = req_resp_cost as f64 / rtt;
229                self.window.update_req_resp_rate(self.req_resp_rtt_rate);
230            }
231        }
232
233        self.status = ResourceStatus::Transferring;
234
235        // Compute map hash for this part
236        let part_hash = map_hash(part_data, &self.random_hash);
237
238        // Search in the window around consecutive_completed_height
239        let consecutive_idx = if self.consecutive_completed_height >= 0 {
240            self.consecutive_completed_height as usize
241        } else {
242            0
243        };
244
245        let mut matched = false;
246        let search_end = core::cmp::min(consecutive_idx + self.window.window, self.total_parts);
247        for i in consecutive_idx..search_end {
248            if let Some(ref h) = self.hashmap[i] {
249                if *h == part_hash {
250                    if self.parts[i].is_none() {
251                        self.parts[i] = Some(part_data.to_vec());
252                        self.rtt_rxd_bytes += part_data.len();
253                        self.received_count += 1;
254                        self.outstanding_parts = self.outstanding_parts.saturating_sub(1);
255
256                        // Update consecutive completed height
257                        if i as isize == self.consecutive_completed_height + 1 {
258                            self.consecutive_completed_height = i as isize;
259                        }
260
261                        // Walk forward to extend consecutive height
262                        let mut cp = (self.consecutive_completed_height + 1) as usize;
263                        while cp < self.total_parts && self.parts[cp].is_some() {
264                            self.consecutive_completed_height = cp as isize;
265                            cp += 1;
266                        }
267
268                        matched = true;
269                    }
270                    break;
271                }
272            }
273        }
274
275        let mut actions = Vec::new();
276
277        // Check if all parts received
278        if self.received_count == self.total_parts {
279            actions.push(ResourceAction::ProgressUpdate {
280                received: self.received_count,
281                total: self.total_parts,
282            });
283            // Assembly will be triggered by caller
284            return actions;
285        }
286
287        if matched {
288            actions.push(ResourceAction::ProgressUpdate {
289                received: self.received_count,
290                total: self.total_parts,
291            });
292        }
293
294        // Request next window when outstanding is 0
295        if self.outstanding_parts == 0 && self.received_count < self.total_parts {
296            // Window complete — grow
297            self.window.on_window_complete();
298
299            // Update data rate
300            if self.req_sent > 0.0 {
301                let rtt = now - self.req_sent;
302                let req_transferred = self.rtt_rxd_bytes - self.rtt_rxd_bytes_at_part_req;
303                if rtt > 0.0 {
304                    self.req_data_rtt_rate = req_transferred as f64 / rtt;
305                    self.rtt_rxd_bytes_at_part_req = self.rtt_rxd_bytes;
306                    self.window.update_data_rate(self.req_data_rtt_rate);
307                }
308            }
309
310            let next_actions = self.request_next(now);
311            actions.extend(next_actions);
312        }
313
314        actions
315    }
316
317    /// Handle a hashmap update packet.
318    ///
319    /// HMU format: [resource_hash: 32 bytes][msgpack([segment, hashmap])]
320    pub fn handle_hashmap_update(&mut self, hmu_data: &[u8], now: f64) -> Vec<ResourceAction> {
321        if self.status == ResourceStatus::Failed {
322            return vec![];
323        }
324
325        self.last_activity = now;
326        self.retries_left = self.max_retries;
327
328        if hmu_data.len() <= 32 {
329            return vec![];
330        }
331
332        let payload = &hmu_data[32..];
333        let (value, _) = match crate::msgpack::unpack(payload) {
334            Ok(v) => v,
335            Err(_) => return vec![],
336        };
337
338        let arr = match value.as_array() {
339            Some(a) if a.len() >= 2 => a,
340            _ => return vec![],
341        };
342
343        let segment = match arr[0].as_uint() {
344            Some(s) => s as usize,
345            None => return vec![],
346        };
347
348        let hashmap_bytes = match arr[1].as_bin() {
349            Some(b) => b,
350            None => return vec![],
351        };
352
353        // Populate hashmap slots
354        let seg_len = RESOURCE_HASHMAP_MAX_LEN;
355        let num_hashes = hashmap_bytes.len() / RESOURCE_MAPHASH_LEN;
356        for i in 0..num_hashes {
357            let idx = i + segment * seg_len;
358            if idx < self.total_parts {
359                let start = i * RESOURCE_MAPHASH_LEN;
360                let end = start + RESOURCE_MAPHASH_LEN;
361                if self.hashmap[idx].is_none() {
362                    self.hashmap_height += 1;
363                }
364                let mut h = [0u8; RESOURCE_MAPHASH_LEN];
365                h.copy_from_slice(&hashmap_bytes[start..end]);
366                self.hashmap[idx] = Some(h);
367            }
368        }
369
370        self.waiting_for_hmu = false;
371        self.request_next(now)
372    }
373
374    /// Build and return request for next window of parts.
375    pub fn request_next(&mut self, now: f64) -> Vec<ResourceAction> {
376        if self.status == ResourceStatus::Failed || self.waiting_for_hmu {
377            return vec![];
378        }
379
380        self.outstanding_parts = 0;
381        let mut hashmap_exhausted = RESOURCE_HASHMAP_IS_NOT_EXHAUSTED;
382        let mut requested_hashes = Vec::new();
383
384        let pn_start = (self.consecutive_completed_height + 1) as usize;
385        let search_end = core::cmp::min(pn_start + self.window.window, self.total_parts);
386        let mut i = 0;
387
388        for pn in pn_start..search_end {
389            if self.parts[pn].is_none() {
390                match self.hashmap[pn] {
391                    Some(ref h) => {
392                        requested_hashes.extend_from_slice(h);
393                        self.outstanding_parts += 1;
394                        i += 1;
395                    }
396                    None => {
397                        hashmap_exhausted = RESOURCE_HASHMAP_IS_EXHAUSTED;
398                    }
399                }
400            }
401            if i >= self.window.window || hashmap_exhausted == RESOURCE_HASHMAP_IS_EXHAUSTED {
402                break;
403            }
404        }
405
406        let mut request_data = Vec::new();
407        request_data.push(hashmap_exhausted);
408        if hashmap_exhausted == RESOURCE_HASHMAP_IS_EXHAUSTED {
409            // Append last known map hash
410            if self.hashmap_height > 0 {
411                if let Some(ref last_hash) = self.hashmap[self.hashmap_height - 1] {
412                    request_data.extend_from_slice(last_hash);
413                } else {
414                    request_data.extend_from_slice(&[0u8; RESOURCE_MAPHASH_LEN]);
415                }
416            } else {
417                request_data.extend_from_slice(&[0u8; RESOURCE_MAPHASH_LEN]);
418            }
419            self.waiting_for_hmu = true;
420        }
421
422        request_data.extend_from_slice(&self.resource_hash);
423        request_data.extend_from_slice(&requested_hashes);
424
425        self.last_activity = now;
426        self.req_sent = now;
427        self.req_sent_bytes = request_data.len();
428        self.req_resp = None;
429
430        vec![ResourceAction::SendRequest(request_data)]
431    }
432
433    /// Assemble received parts, decrypt, decompress, verify hash.
434    #[allow(clippy::type_complexity)]
435    pub fn assemble(
436        &mut self,
437        decrypt_fn: &dyn Fn(&[u8]) -> Result<Vec<u8>, ()>,
438        compressor: &dyn Compressor,
439    ) -> Vec<ResourceAction> {
440        if self.received_count != self.total_parts {
441            return vec![ResourceAction::Failed(ResourceError::InvalidState)];
442        }
443
444        self.status = ResourceStatus::Assembling;
445
446        // Join all parts
447        let mut stream = Vec::new();
448        for part in &self.parts {
449            match part {
450                Some(data) => stream.extend_from_slice(data),
451                None => {
452                    self.status = ResourceStatus::Failed;
453                    return vec![ResourceAction::Failed(ResourceError::InvalidState)];
454                }
455            }
456        }
457
458        // Decrypt
459        let decrypted = if self.flags.encrypted {
460            match decrypt_fn(&stream) {
461                Ok(d) => d,
462                Err(_) => {
463                    self.status = ResourceStatus::Failed;
464                    return vec![ResourceAction::Failed(ResourceError::DecryptionFailed)];
465                }
466            }
467        } else {
468            stream
469        };
470
471        // Strip random hash prefix
472        if decrypted.len() < RESOURCE_RANDOM_HASH_SIZE {
473            return self.corrupt_actions(ResourceError::InvalidPart);
474        }
475        let data_after_random = &decrypted[RESOURCE_RANDOM_HASH_SIZE..];
476
477        // Decompress
478        let decompressed = if self.flags.compressed {
479            match compressor.decompress_bounded(data_after_random, self.max_decompressed_size) {
480                Ok(d) => d,
481                Err(crate::buffer::types::DecompressError::TooLarge) => {
482                    return self.corrupt_actions(ResourceError::TooLarge);
483                }
484                Err(crate::buffer::types::DecompressError::InvalidData) => {
485                    return self.corrupt_actions(ResourceError::DecompressionFailed);
486                }
487            }
488        } else {
489            data_after_random.to_vec()
490        };
491
492        // Verify hash
493        let calculated_hash = compute_resource_hash(&decompressed, &self.random_hash);
494        if calculated_hash.as_slice() != self.resource_hash.as_slice() {
495            return self.corrupt_actions(ResourceError::HashMismatch);
496        }
497
498        // Compute proof before metadata extraction (proof uses full decompressed data)
499        let expected_proof = compute_expected_proof(&decompressed, &calculated_hash);
500        let proof_data = build_proof_data(&calculated_hash, &expected_proof);
501
502        // Extract metadata if present
503        let (data, metadata) = if self.has_metadata && self.segment_index == 1 {
504            match extract_metadata(&decompressed) {
505                Some((meta, rest)) => (rest, Some(meta)),
506                None => return self.corrupt_actions(ResourceError::InvalidPart),
507            }
508        } else {
509            (decompressed, None)
510        };
511
512        self.status = ResourceStatus::Complete;
513
514        vec![
515            ResourceAction::SendProof(proof_data),
516            ResourceAction::DataReceived { data, metadata },
517            ResourceAction::Completed,
518        ]
519    }
520
521    /// Handle cancel from sender (RESOURCE_ICL).
522    pub fn handle_cancel(&mut self) -> Vec<ResourceAction> {
523        if self.status < ResourceStatus::Complete {
524            self.status = ResourceStatus::Failed;
525            return vec![ResourceAction::Failed(ResourceError::Rejected)];
526        }
527        vec![]
528    }
529
530    /// Periodic tick. Checks for timeouts.
531    #[allow(clippy::type_complexity)]
532    pub fn tick(
533        &mut self,
534        now: f64,
535        decrypt_fn: &dyn Fn(&[u8]) -> Result<Vec<u8>, ()>,
536        compressor: &dyn Compressor,
537    ) -> Vec<ResourceAction> {
538        if self.status >= ResourceStatus::Assembling {
539            return vec![];
540        }
541
542        if self.status == ResourceStatus::Transferring {
543            // Check if all parts received — trigger assembly
544            if self.received_count == self.total_parts {
545                return self.assemble(decrypt_fn, compressor);
546            }
547
548            // Compute timeout
549            let eifr = self.compute_eifr();
550            let retries_used = self.max_retries - self.retries_left;
551            let extra_wait = retries_used as f64 * RESOURCE_PER_RETRY_DELAY;
552            let expected_hmu_wait =
553                if eifr > 0.0 && (self.waiting_for_hmu || self.outstanding_parts == 0) {
554                    (self.sdu as f64 * 8.0 * RESOURCE_HMU_WAIT_FACTOR) / eifr
555                } else {
556                    0.0
557                };
558            let expected_tof = if self.outstanding_parts > 0 && eifr > 0.0 {
559                (self.outstanding_parts as f64 * self.sdu as f64 * 8.0) / eifr
560            } else if eifr > 0.0 {
561                (3.0 * self.sdu as f64) / eifr
562            } else {
563                10.0 // fallback
564            };
565
566            let sleep_time = self.last_activity
567                + self.part_timeout_factor * expected_tof
568                + expected_hmu_wait
569                + RESOURCE_RETRY_GRACE_TIME
570                + extra_wait;
571
572            if now > sleep_time {
573                if self.retries_left > 0 {
574                    // Timeout — shrink window, retry
575                    self.window.on_timeout();
576                    self.retries_left -= 1;
577                    self.waiting_for_hmu = false;
578                    return self.request_next(now);
579                } else {
580                    self.status = ResourceStatus::Failed;
581                    return vec![ResourceAction::Failed(ResourceError::MaxRetriesExceeded)];
582                }
583            }
584        }
585
586        vec![]
587    }
588
589    /// Compute EIFR (expected inflight rate) and update self.eifr.
590    fn compute_eifr(&mut self) -> f64 {
591        let eifr = if self.req_data_rtt_rate > 0.0 {
592            self.req_data_rtt_rate * 8.0
593        } else if let Some(prev) = self.previous_eifr {
594            prev
595        } else {
596            // Fallback: use link_rtt as establishment cost estimate
597            let rtt = self.rtt.unwrap_or(self.link_rtt);
598            if rtt > 0.0 {
599                (self.sdu as f64 * 8.0) / rtt
600            } else {
601                10000.0
602            }
603        };
604        self.eifr = Some(eifr);
605        eifr
606    }
607
608    /// Get current progress as (received, total).
609    pub fn progress(&self) -> (usize, usize) {
610        (self.received_count, self.total_parts)
611    }
612
613    /// Get window and EIFR for passing to next transfer.
614    pub fn get_transfer_state(&self) -> (usize, Option<f64>) {
615        (self.window.window, self.eifr)
616    }
617}
618
619#[cfg(test)]
620mod tests {
621    use super::*;
622    use crate::buffer::types::{Compressor, DecompressError, NoopCompressor};
623    use crate::resource::sender::ResourceSender;
624
625    fn identity_encrypt(data: &[u8]) -> Vec<u8> {
626        data.to_vec()
627    }
628
629    fn identity_decrypt(data: &[u8]) -> Result<Vec<u8>, ()> {
630        Ok(data.to_vec())
631    }
632
633    struct ExpandingCompressor;
634
635    impl Compressor for ExpandingCompressor {
636        fn compress(&self, data: &[u8]) -> Option<Vec<u8>> {
637            Some(data[..data.len() / 2].to_vec())
638        }
639
640        fn decompress(&self, data: &[u8]) -> Option<Vec<u8>> {
641            self.decompress_bounded(data, usize::MAX).ok()
642        }
643
644        fn decompress_bounded(
645            &self,
646            data: &[u8],
647            max_output_size: usize,
648        ) -> Result<Vec<u8>, DecompressError> {
649            let mut out = data.to_vec();
650            out.extend_from_slice(data);
651            if out.len() > max_output_size {
652                return Err(DecompressError::TooLarge);
653            }
654            Ok(out)
655        }
656    }
657
658    fn base_timeout(receiver: &ResourceReceiver, eifr: f64) -> f64 {
659        let expected_tof = if receiver.outstanding_parts > 0 {
660            (receiver.outstanding_parts as f64 * receiver.sdu as f64 * 8.0) / eifr
661        } else {
662            (3.0 * receiver.sdu as f64) / eifr
663        };
664
665        receiver.last_activity
666            + receiver.part_timeout_factor * expected_tof
667            + RESOURCE_RETRY_GRACE_TIME
668    }
669
670    fn hmu_timeout(receiver: &ResourceReceiver, eifr: f64) -> f64 {
671        let expected_hmu_wait = (receiver.sdu as f64 * 8.0 * RESOURCE_HMU_WAIT_FACTOR) / eifr;
672        base_timeout(receiver, eifr) + expected_hmu_wait
673    }
674
675    fn make_sender_receiver() -> (ResourceSender, ResourceReceiver) {
676        let mut rng = rns_crypto::FixedRng::new(&[0x42; 64]);
677        let data = b"Hello, Resource Transfer!";
678
679        let sender = ResourceSender::new(
680            data,
681            None,
682            RESOURCE_SDU,
683            &identity_encrypt,
684            &NoopCompressor,
685            &mut rng,
686            1000.0,
687            false,
688            false,
689            None,
690            1,
691            1,
692            None,
693            0.5,
694            6.0,
695        )
696        .unwrap();
697
698        let adv_data = sender.get_advertisement(0);
699        let receiver =
700            ResourceReceiver::from_advertisement(&adv_data, RESOURCE_SDU, 0.5, 1000.0, None, None)
701                .unwrap();
702
703        (sender, receiver)
704    }
705
706    #[test]
707    fn test_from_advertisement() {
708        let (sender, receiver) = make_sender_receiver();
709        assert_eq!(receiver.total_parts, sender.total_parts());
710        assert_eq!(receiver.transfer_size, sender.transfer_size as u64);
711        assert_eq!(receiver.resource_hash, sender.resource_hash.to_vec());
712        assert!(!receiver.advertisement_packet.is_empty());
713        assert_eq!(
714            receiver.max_decompressed_size,
715            RESOURCE_AUTO_COMPRESS_MAX_SIZE
716        );
717    }
718
719    #[test]
720    fn test_accept() {
721        let (_, mut receiver) = make_sender_receiver();
722        let actions = receiver.accept(1000.0);
723        assert_eq!(receiver.status, ResourceStatus::Transferring);
724        assert!(!actions.is_empty());
725        assert!(actions
726            .iter()
727            .any(|a| matches!(a, ResourceAction::SendRequest(_))));
728    }
729
730    #[test]
731    fn test_reject() {
732        let (_, mut receiver) = make_sender_receiver();
733        let actions = receiver.reject();
734        assert_eq!(receiver.status, ResourceStatus::Rejected);
735        assert!(actions
736            .iter()
737            .any(|a| matches!(a, ResourceAction::SendCancelReceiver(_))));
738    }
739
740    #[test]
741    fn test_receive_part_stores() {
742        let (mut sender, mut receiver) = make_sender_receiver();
743        receiver.accept(1000.0);
744
745        // Get part data from sender (we use identity encryption so parts ARE the raw data)
746        // Request first part
747        let mut request = Vec::new();
748        request.push(RESOURCE_HASHMAP_IS_NOT_EXHAUSTED);
749        request.extend_from_slice(&sender.resource_hash);
750        request.extend_from_slice(&sender.part_hashes[0]);
751
752        let send_actions = sender.handle_request(&request, 1001.0);
753        let part_data = send_actions
754            .iter()
755            .find_map(|a| match a {
756                ResourceAction::SendPart(d) => Some(d.clone()),
757                _ => None,
758            })
759            .unwrap();
760
761        // Give it to receiver
762        receiver.req_sent = 1000.5;
763        let _actions = receiver.receive_part(&part_data, 1001.0);
764        assert_eq!(receiver.received_count, 1);
765    }
766
767    #[test]
768    fn test_consecutive_completed_height() {
769        let (sender, mut receiver) = make_sender_receiver();
770        receiver.accept(1000.0);
771
772        // Simulate receiving parts in order for a multi-part resource
773        if sender.total_parts() > 1 {
774            // This only applies to multi-part transfers
775            assert_eq!(receiver.consecutive_completed_height, -1);
776        }
777    }
778
779    #[test]
780    fn test_handle_cancel() {
781        let (_, mut receiver) = make_sender_receiver();
782        receiver.accept(1000.0);
783        let _actions = receiver.handle_cancel();
784        assert_eq!(receiver.status, ResourceStatus::Failed);
785    }
786
787    #[test]
788    fn test_assemble_compressed_resource_rejects_oversized_decompression() {
789        let data = b"oversized!";
790        let mut rng = rns_crypto::FixedRng::new(&[0x93; 64]);
791
792        let mut sender = ResourceSender::new(
793            data,
794            None,
795            RESOURCE_SDU,
796            &identity_encrypt,
797            &ExpandingCompressor,
798            &mut rng,
799            1000.0,
800            true,
801            false,
802            None,
803            1,
804            1,
805            None,
806            0.5,
807            6.0,
808        )
809        .unwrap();
810
811        let adv = sender.get_advertisement(0);
812        let mut receiver =
813            ResourceReceiver::from_advertisement(&adv, RESOURCE_SDU, 0.5, 1000.0, None, None)
814                .unwrap();
815        receiver.max_decompressed_size = data.len() - 1;
816
817        let request_data = receiver
818            .accept(1001.0)
819            .into_iter()
820            .find_map(|a| match a {
821                ResourceAction::SendRequest(d) => Some(d),
822                _ => None,
823            })
824            .unwrap();
825
826        let send_actions = sender.handle_request(&request_data, 1002.0);
827        receiver.req_sent = 1001.0;
828        for action in &send_actions {
829            if let ResourceAction::SendPart(part_data) = action {
830                receiver.receive_part(part_data, 1003.0);
831            }
832        }
833
834        let assemble_actions = receiver.assemble(&identity_decrypt, &ExpandingCompressor);
835        assert_eq!(receiver.status, ResourceStatus::Corrupt);
836        assert!(assemble_actions
837            .iter()
838            .any(|a| matches!(a, ResourceAction::SendCancelReceiver(_))));
839        assert!(assemble_actions
840            .iter()
841            .any(|a| matches!(a, ResourceAction::Failed(ResourceError::TooLarge))));
842        assert!(assemble_actions
843            .iter()
844            .any(|a| matches!(a, ResourceAction::TeardownLink)));
845    }
846
847    #[test]
848    fn test_full_transfer_small_data() {
849        // End-to-end: sender creates, receiver accepts, parts flow, assembly completes
850        let data = b"small data";
851        let mut rng = rns_crypto::FixedRng::new(&[0x77; 64]);
852
853        let mut sender = ResourceSender::new(
854            data,
855            None,
856            RESOURCE_SDU,
857            &identity_encrypt,
858            &NoopCompressor,
859            &mut rng,
860            1000.0,
861            false,
862            false,
863            None,
864            1,
865            1,
866            None,
867            0.5,
868            6.0,
869        )
870        .unwrap();
871
872        let adv = sender.get_advertisement(0);
873        let mut receiver =
874            ResourceReceiver::from_advertisement(&adv, RESOURCE_SDU, 0.5, 1000.0, None, None)
875                .unwrap();
876
877        // Accept
878        let req_actions = receiver.accept(1001.0);
879        assert_eq!(receiver.status, ResourceStatus::Transferring);
880
881        // Get request data
882        let request_data = req_actions
883            .iter()
884            .find_map(|a| match a {
885                ResourceAction::SendRequest(d) => Some(d.clone()),
886                _ => None,
887            })
888            .unwrap();
889
890        // Sender handles request
891        let send_actions = sender.handle_request(&request_data, 1002.0);
892
893        // Feed all parts to receiver
894        receiver.req_sent = 1001.0;
895        for action in &send_actions {
896            if let ResourceAction::SendPart(part_data) = action {
897                receiver.receive_part(part_data, 1003.0);
898            }
899        }
900
901        assert_eq!(receiver.received_count, receiver.total_parts);
902
903        // Assemble
904        let assemble_actions = receiver.assemble(&identity_decrypt, &NoopCompressor);
905
906        // Check for proof and data
907        let has_proof = assemble_actions
908            .iter()
909            .any(|a| matches!(a, ResourceAction::SendProof(_)));
910        let has_data = assemble_actions
911            .iter()
912            .any(|a| matches!(a, ResourceAction::DataReceived { .. }));
913        let has_complete = assemble_actions
914            .iter()
915            .any(|a| matches!(a, ResourceAction::Completed));
916
917        assert!(has_proof, "Should send proof");
918        assert!(has_data, "Should return data");
919        assert!(has_complete, "Should be completed");
920
921        // Verify data matches
922        let received_data = assemble_actions
923            .iter()
924            .find_map(|a| match a {
925                ResourceAction::DataReceived { data, .. } => Some(data.clone()),
926                _ => None,
927            })
928            .unwrap();
929        assert_eq!(received_data, data);
930
931        // Verify proof validates on sender side
932        let proof_data = assemble_actions
933            .iter()
934            .find_map(|a| match a {
935                ResourceAction::SendProof(d) => Some(d.clone()),
936                _ => None,
937            })
938            .unwrap();
939
940        let _proof_actions = sender.handle_proof(&proof_data, 1004.0);
941        assert_eq!(sender.status, ResourceStatus::Complete);
942    }
943
944    #[test]
945    fn test_full_transfer_with_metadata() {
946        let data = b"data with metadata";
947        let metadata = b"some metadata";
948        let mut rng = rns_crypto::FixedRng::new(&[0x88; 64]);
949
950        let mut sender = ResourceSender::new(
951            data,
952            Some(metadata),
953            RESOURCE_SDU,
954            &identity_encrypt,
955            &NoopCompressor,
956            &mut rng,
957            1000.0,
958            false,
959            false,
960            None,
961            1,
962            1,
963            None,
964            0.5,
965            6.0,
966        )
967        .unwrap();
968
969        assert!(sender.flags.has_metadata);
970
971        let adv = sender.get_advertisement(0);
972        let mut receiver =
973            ResourceReceiver::from_advertisement(&adv, RESOURCE_SDU, 0.5, 1000.0, None, None)
974                .unwrap();
975
976        assert!(receiver.has_metadata);
977
978        // Transfer all parts
979        let req_actions = receiver.accept(1001.0);
980        let request_data = req_actions
981            .iter()
982            .find_map(|a| match a {
983                ResourceAction::SendRequest(d) => Some(d.clone()),
984                _ => None,
985            })
986            .unwrap();
987
988        let send_actions = sender.handle_request(&request_data, 1002.0);
989        receiver.req_sent = 1001.0;
990        for action in &send_actions {
991            if let ResourceAction::SendPart(part_data) = action {
992                receiver.receive_part(part_data, 1003.0);
993            }
994        }
995
996        let assemble_actions = receiver.assemble(&identity_decrypt, &NoopCompressor);
997
998        let (recv_data, recv_meta) = assemble_actions
999            .iter()
1000            .find_map(|a| match a {
1001                ResourceAction::DataReceived { data, metadata } => {
1002                    Some((data.clone(), metadata.clone()))
1003                }
1004                _ => None,
1005            })
1006            .unwrap();
1007
1008        assert_eq!(recv_data, data);
1009        assert_eq!(recv_meta.unwrap(), metadata);
1010    }
1011
1012    #[test]
1013    fn test_previous_window_restore() {
1014        let (_, _receiver) = make_sender_receiver();
1015        // Create with previous window
1016        let adv_data = {
1017            let mut rng = rns_crypto::FixedRng::new(&[0x42; 64]);
1018            let sender = ResourceSender::new(
1019                b"test",
1020                None,
1021                RESOURCE_SDU,
1022                &identity_encrypt,
1023                &NoopCompressor,
1024                &mut rng,
1025                1000.0,
1026                false,
1027                false,
1028                None,
1029                1,
1030                1,
1031                None,
1032                0.5,
1033                6.0,
1034            )
1035            .unwrap();
1036            sender.get_advertisement(0)
1037        };
1038
1039        let receiver = ResourceReceiver::from_advertisement(
1040            &adv_data,
1041            RESOURCE_SDU,
1042            0.5,
1043            1000.0,
1044            Some(8),
1045            Some(50000.0),
1046        )
1047        .unwrap();
1048        assert_eq!(receiver.window.window, 8);
1049    }
1050
1051    #[test]
1052    fn test_tick_timeout_retry() {
1053        let (_, mut receiver) = make_sender_receiver();
1054        receiver.accept(1000.0);
1055        receiver.rtt = Some(0.1);
1056
1057        // Way past timeout
1058        let actions = receiver.tick(9999.0, &identity_decrypt, &NoopCompressor);
1059        // Should have retried (window decreased, request_next called)
1060        assert!(!actions.is_empty() || receiver.retries_left < RESOURCE_MAX_RETRIES);
1061    }
1062
1063    #[test]
1064    fn test_tick_waiting_for_hmu_gets_extra_timeout() {
1065        let (_, mut receiver) = make_sender_receiver();
1066        receiver.accept(1000.0);
1067        receiver.waiting_for_hmu = true;
1068        receiver.outstanding_parts = 0;
1069        let eifr = 10_000.0;
1070        receiver.previous_eifr = Some(eifr);
1071        receiver.last_activity = 1000.0;
1072
1073        let old_timeout = base_timeout(&receiver, eifr);
1074        let now = old_timeout + 0.01;
1075
1076        let actions = receiver.tick(now, &identity_decrypt, &NoopCompressor);
1077
1078        assert!(actions.is_empty(), "receiver should keep waiting for HMU");
1079        assert_eq!(receiver.retries_left, RESOURCE_MAX_RETRIES);
1080        assert_eq!(receiver.status, ResourceStatus::Transferring);
1081    }
1082
1083    #[test]
1084    fn test_tick_zero_outstanding_parts_gets_extra_timeout_without_hmu_flag() {
1085        let (_, mut receiver) = make_sender_receiver();
1086        receiver.accept(1000.0);
1087        receiver.waiting_for_hmu = false;
1088        receiver.outstanding_parts = 0;
1089        let eifr = 10_000.0;
1090        receiver.previous_eifr = Some(eifr);
1091        receiver.last_activity = 1000.0;
1092
1093        let old_timeout = base_timeout(&receiver, eifr);
1094        let now = old_timeout + 0.01;
1095
1096        let actions = receiver.tick(now, &identity_decrypt, &NoopCompressor);
1097
1098        assert!(
1099            actions.is_empty(),
1100            "receiver should keep waiting for follow-up hashmap data"
1101        );
1102        assert_eq!(receiver.retries_left, RESOURCE_MAX_RETRIES);
1103        assert_eq!(receiver.status, ResourceStatus::Transferring);
1104    }
1105
1106    #[test]
1107    fn test_tick_waiting_for_hmu_retries_after_extended_timeout() {
1108        let (_, mut receiver) = make_sender_receiver();
1109        receiver.accept(1000.0);
1110        receiver.waiting_for_hmu = true;
1111        receiver.outstanding_parts = 0;
1112        let eifr = 10_000.0;
1113        receiver.previous_eifr = Some(eifr);
1114        receiver.last_activity = 1000.0;
1115
1116        let now = hmu_timeout(&receiver, eifr) + 0.01;
1117        let _actions = receiver.tick(now, &identity_decrypt, &NoopCompressor);
1118
1119        assert_eq!(receiver.retries_left, RESOURCE_MAX_RETRIES - 1);
1120        assert!(!receiver.waiting_for_hmu);
1121    }
1122
1123    #[test]
1124    fn test_tick_inflight_parts_do_not_get_hmu_timeout_extension() {
1125        let (_, mut receiver) = make_sender_receiver();
1126        receiver.accept(1000.0);
1127        receiver.waiting_for_hmu = false;
1128        receiver.outstanding_parts = 2;
1129        let eifr = 10_000.0;
1130        receiver.previous_eifr = Some(eifr);
1131        receiver.last_activity = 1000.0;
1132
1133        let now = base_timeout(&receiver, eifr) + 0.01;
1134        let _actions = receiver.tick(now, &identity_decrypt, &NoopCompressor);
1135
1136        assert_eq!(receiver.retries_left, RESOURCE_MAX_RETRIES - 1);
1137    }
1138
1139    #[test]
1140    fn test_tick_max_retries_exceeded() {
1141        let (_, mut receiver) = make_sender_receiver();
1142        receiver.accept(1000.0);
1143        receiver.retries_left = 0;
1144        receiver.rtt = Some(0.001);
1145        receiver.eifr = Some(100000.0);
1146
1147        let _actions = receiver.tick(9999.0, &identity_decrypt, &NoopCompressor);
1148        assert_eq!(receiver.status, ResourceStatus::Failed);
1149    }
1150}