Skip to main content

rns_core/resource/
receiver.rs

1use alloc::vec;
2use alloc::vec::Vec;
3
4use super::advertisement::ResourceAdvertisement;
5use super::parts::{extract_metadata, map_hash};
6use super::proof::{build_proof_data, compute_expected_proof, compute_resource_hash};
7use super::types::*;
8use super::window::WindowState;
9use crate::buffer::types::Compressor;
10use crate::constants::*;
11
/// Resource receiver state machine.
///
/// Unpacks advertisements, requests parts, receives parts, assembles data.
/// Returns `Vec<ResourceAction>` — no I/O, no callbacks.
///
/// Every timestamp is an `f64` handed in by the caller as `now`; all
/// timestamps stored here come from those arguments.
pub struct ResourceReceiver {
    /// Current status (starts as `ResourceStatus::None`)
    pub status: ResourceStatus,
    /// Resource hash (from advertisement, 32 bytes)
    pub resource_hash: Vec<u8>,
    /// Random hash (from advertisement)
    pub random_hash: Vec<u8>,
    /// Original hash (from advertisement, 32 bytes)
    pub original_hash: Vec<u8>,
    /// Flags (from advertisement)
    pub flags: AdvFlags,
    /// Transfer size (encrypted)
    pub transfer_size: u64,
    /// Total uncompressed data size
    pub data_size: u64,
    /// Total parts
    pub total_parts: usize,
    /// Received parts data, indexed by part number (None = not yet received)
    parts: Vec<Option<Vec<u8>>>,
    /// Hashmap: part index -> map_hash (None if not yet known)
    hashmap: Vec<Option<[u8; RESOURCE_MAPHASH_LEN]>>,
    /// Number of hashmap entries populated
    hashmap_height: usize,
    /// Whether we're waiting for a hashmap update
    pub waiting_for_hmu: bool,
    /// Number of parts received
    pub received_count: usize,
    /// Outstanding parts in current window request
    pub outstanding_parts: usize,
    /// Consecutive completed height: index of the last part in the unbroken
    /// run of received parts starting at part 0 (-1 means none)
    consecutive_completed_height: isize,
    /// SDU size (bounds the accepted part count and feeds timeout estimates)
    sdu: usize,
    /// Link RTT estimate (from link establishment)
    link_rtt: f64,
    /// Retries left (reset to `max_retries` whenever a part or hashmap update arrives)
    pub retries_left: usize,
    /// Max retries
    max_retries: usize,
    /// RTT estimate (None until the first part has been received)
    pub rtt: Option<f64>,
    /// Part timeout factor
    part_timeout_factor: f64,
    /// Last activity timestamp
    pub last_activity: f64,
    /// Request sent timestamp (0.0 until a request has been built)
    pub req_sent: f64,
    /// Request sent bytes (length of the most recent request payload)
    req_sent_bytes: usize,
    /// Request response timestamp (None until the first part after a request)
    req_resp: Option<f64>,
    /// RTT received bytes (running total of stored part bytes)
    rtt_rxd_bytes: usize,
    /// RTT received bytes at part request
    rtt_rxd_bytes_at_part_req: usize,
    /// Request response RTT rate
    req_resp_rtt_rate: f64,
    /// Request data RTT rate
    req_data_rtt_rate: f64,
    /// EIFR (None until first computed)
    pub eifr: Option<f64>,
    /// Previous EIFR from prior transfer
    previous_eifr: Option<f64>,
    /// Segment index
    pub segment_index: u64,
    /// Total segments
    pub total_segments: u64,
    /// Has metadata
    pub has_metadata: bool,
    /// Request ID
    pub request_id: Option<Vec<u8>>,
    /// Window state
    pub window: WindowState,
    /// Original advertisement bytes for this incoming transfer.
    pub advertisement_packet: Vec<u8>,
    /// Maximum allowed decompressed size for compressed resource payloads.
    pub max_decompressed_size: usize,
}
94
95impl ResourceReceiver {
96    /// Create a receiver from an advertisement packet.
97    pub fn from_advertisement(
98        adv_data: &[u8],
99        sdu: usize,
100        link_rtt: f64,
101        now: f64,
102        previous_window: Option<usize>,
103        previous_eifr: Option<f64>,
104    ) -> Result<Self, ResourceError> {
105        let adv = ResourceAdvertisement::unpack(adv_data)?;
106
107        // Validate resource_hash is 32 bytes
108        if adv.resource_hash.len() != 32 {
109            return Err(ResourceError::InvalidAdvertisement);
110        }
111        if adv.random_hash.len() != RESOURCE_RANDOM_HASH_SIZE || adv.original_hash.len() != 32 {
112            return Err(ResourceError::InvalidAdvertisement);
113        }
114        if adv.transfer_size > RESOURCE_AUTO_COMPRESS_MAX_SIZE as u64
115            || adv.data_size > RESOURCE_AUTO_COMPRESS_MAX_SIZE as u64
116        {
117            return Err(ResourceError::TooLarge);
118        }
119
120        let total_parts = usize::try_from(adv.num_parts).map_err(|_| ResourceError::TooLarge)?;
121        if total_parts == 0 {
122            return Err(ResourceError::InvalidAdvertisement);
123        }
124        let max_parts = RESOURCE_AUTO_COMPRESS_MAX_SIZE
125            .div_ceil(sdu.max(1))
126            .saturating_add(RESOURCE_COLLISION_GUARD_SIZE);
127        if total_parts > max_parts {
128            return Err(ResourceError::TooLarge);
129        }
130        let parts_vec: Vec<Option<Vec<u8>>> = vec![None; total_parts];
131        let mut hashmap_vec: Vec<Option<[u8; RESOURCE_MAPHASH_LEN]>> = vec![None; total_parts];
132
133        // Populate initial hashmap from advertisement
134        let initial_hashes = adv.hashmap.len() / RESOURCE_MAPHASH_LEN;
135        let mut hashmap_height = 0;
136        for (i, slot) in hashmap_vec.iter_mut().enumerate().take(initial_hashes) {
137            if i < total_parts {
138                let start = i * RESOURCE_MAPHASH_LEN;
139                let end = start + RESOURCE_MAPHASH_LEN;
140                let mut h = [0u8; RESOURCE_MAPHASH_LEN];
141                h.copy_from_slice(&adv.hashmap[start..end]);
142                *slot = Some(h);
143                hashmap_height += 1;
144            }
145        }
146
147        let mut window_state = WindowState::new();
148        if let Some(prev_w) = previous_window {
149            window_state.restore(prev_w);
150        }
151
152        Ok(ResourceReceiver {
153            status: ResourceStatus::None,
154            resource_hash: adv.resource_hash,
155            random_hash: adv.random_hash,
156            original_hash: adv.original_hash,
157            flags: adv.flags,
158            transfer_size: adv.transfer_size,
159            data_size: adv.data_size,
160            total_parts,
161            parts: parts_vec,
162            hashmap: hashmap_vec,
163            hashmap_height,
164            waiting_for_hmu: false,
165            received_count: 0,
166            outstanding_parts: 0,
167            consecutive_completed_height: -1,
168            sdu,
169            link_rtt,
170            retries_left: RESOURCE_MAX_RETRIES,
171            max_retries: RESOURCE_MAX_RETRIES,
172            rtt: None,
173            part_timeout_factor: RESOURCE_PART_TIMEOUT_FACTOR,
174            last_activity: now,
175            req_sent: 0.0,
176            req_sent_bytes: 0,
177            req_resp: None,
178            rtt_rxd_bytes: 0,
179            rtt_rxd_bytes_at_part_req: 0,
180            req_resp_rtt_rate: 0.0,
181            req_data_rtt_rate: 0.0,
182            eifr: None,
183            previous_eifr,
184            segment_index: adv.segment_index,
185            total_segments: adv.total_segments,
186            has_metadata: adv.flags.has_metadata,
187            request_id: adv.request_id,
188            window: window_state,
189            advertisement_packet: adv_data.to_vec(),
190            max_decompressed_size: RESOURCE_AUTO_COMPRESS_MAX_SIZE,
191        })
192    }
193
194    /// Accept the advertised resource. Begins transfer.
195    pub fn accept(&mut self, now: f64) -> Vec<ResourceAction> {
196        self.status = ResourceStatus::Transferring;
197        self.last_activity = now;
198        self.request_next(now)
199    }
200
201    /// Reject the advertised resource.
202    pub fn reject(&mut self) -> Vec<ResourceAction> {
203        self.status = ResourceStatus::Rejected;
204        vec![ResourceAction::SendCancelReceiver(
205            self.resource_hash.clone(),
206        )]
207    }
208
209    fn corrupt_actions(&mut self, error: ResourceError) -> Vec<ResourceAction> {
210        self.status = ResourceStatus::Corrupt;
211        vec![
212            ResourceAction::SendCancelReceiver(self.resource_hash.clone()),
213            ResourceAction::Failed(error),
214            ResourceAction::TeardownLink,
215        ]
216    }
217
218    /// Receive a part. Matches by map hash and stores it.
219    pub fn receive_part(&mut self, part_data: &[u8], now: f64) -> Vec<ResourceAction> {
220        if self.status == ResourceStatus::Failed {
221            return vec![];
222        }
223
224        self.last_activity = now;
225        self.retries_left = self.max_retries;
226
227        // Update RTT on first part of window
228        if self.req_resp.is_none() {
229            self.req_resp = Some(now);
230            let rtt = now - self.req_sent;
231            self.part_timeout_factor = RESOURCE_PART_TIMEOUT_FACTOR_AFTER_RTT;
232
233            if self.rtt.is_none() {
234                self.rtt = Some(rtt);
235            } else if let Some(current_rtt) = self.rtt {
236                if rtt < current_rtt {
237                    self.rtt = Some(f64::max(current_rtt - current_rtt * 0.05, rtt));
238                } else {
239                    self.rtt = Some(f64::min(current_rtt + current_rtt * 0.05, rtt));
240                }
241            }
242
243            if rtt > 0.0 {
244                let req_resp_cost = part_data.len() + self.req_sent_bytes;
245                self.req_resp_rtt_rate = req_resp_cost as f64 / rtt;
246                self.window.update_req_resp_rate(self.req_resp_rtt_rate);
247            }
248        }
249
250        self.status = ResourceStatus::Transferring;
251
252        // Compute map hash for this part
253        let part_hash = map_hash(part_data, &self.random_hash);
254
255        // Search in the window around consecutive_completed_height
256        let consecutive_idx = if self.consecutive_completed_height >= 0 {
257            self.consecutive_completed_height as usize
258        } else {
259            0
260        };
261
262        let mut matched = false;
263        let search_end = core::cmp::min(consecutive_idx + self.window.window, self.total_parts);
264        for i in consecutive_idx..search_end {
265            if let Some(ref h) = self.hashmap[i] {
266                if *h == part_hash {
267                    if self.parts[i].is_none() {
268                        self.parts[i] = Some(part_data.to_vec());
269                        self.rtt_rxd_bytes += part_data.len();
270                        self.received_count += 1;
271                        self.outstanding_parts = self.outstanding_parts.saturating_sub(1);
272
273                        // Update consecutive completed height
274                        if i as isize == self.consecutive_completed_height + 1 {
275                            self.consecutive_completed_height = i as isize;
276                        }
277
278                        // Walk forward to extend consecutive height
279                        let mut cp = (self.consecutive_completed_height + 1) as usize;
280                        while cp < self.total_parts && self.parts[cp].is_some() {
281                            self.consecutive_completed_height = cp as isize;
282                            cp += 1;
283                        }
284
285                        matched = true;
286                    }
287                    break;
288                }
289            }
290        }
291
292        let mut actions = Vec::new();
293
294        // Check if all parts received
295        if self.received_count == self.total_parts {
296            actions.push(ResourceAction::ProgressUpdate {
297                received: self.received_count,
298                total: self.total_parts,
299            });
300            // Assembly will be triggered by caller
301            return actions;
302        }
303
304        if matched {
305            actions.push(ResourceAction::ProgressUpdate {
306                received: self.received_count,
307                total: self.total_parts,
308            });
309        }
310
311        // Request next window when outstanding is 0
312        if self.outstanding_parts == 0 && self.received_count < self.total_parts {
313            // Window complete — grow
314            self.window.on_window_complete();
315
316            // Update data rate
317            if self.req_sent > 0.0 {
318                let rtt = now - self.req_sent;
319                let req_transferred = self.rtt_rxd_bytes - self.rtt_rxd_bytes_at_part_req;
320                if rtt > 0.0 {
321                    self.req_data_rtt_rate = req_transferred as f64 / rtt;
322                    self.rtt_rxd_bytes_at_part_req = self.rtt_rxd_bytes;
323                    self.window.update_data_rate(self.req_data_rtt_rate);
324                }
325            }
326
327            let next_actions = self.request_next(now);
328            actions.extend(next_actions);
329        }
330
331        actions
332    }
333
334    /// Handle a hashmap update packet.
335    ///
336    /// HMU format: [resource_hash: 32 bytes][msgpack([segment, hashmap])]
337    pub fn handle_hashmap_update(&mut self, hmu_data: &[u8], now: f64) -> Vec<ResourceAction> {
338        if self.status == ResourceStatus::Failed {
339            return vec![];
340        }
341
342        self.last_activity = now;
343        self.retries_left = self.max_retries;
344
345        if hmu_data.len() <= 32 {
346            return vec![];
347        }
348
349        let payload = &hmu_data[32..];
350        let (value, _) = match crate::msgpack::unpack(payload) {
351            Ok(v) => v,
352            Err(_) => return vec![],
353        };
354
355        let arr = match value.as_array() {
356            Some(a) if a.len() >= 2 => a,
357            _ => return vec![],
358        };
359
360        let segment = match arr[0].as_uint() {
361            Some(s) => s as usize,
362            None => return vec![],
363        };
364
365        let hashmap_bytes = match arr[1].as_bin() {
366            Some(b) => b,
367            None => return vec![],
368        };
369
370        // Populate hashmap slots
371        let seg_len = RESOURCE_HASHMAP_MAX_LEN;
372        let num_hashes = hashmap_bytes.len() / RESOURCE_MAPHASH_LEN;
373        for i in 0..num_hashes {
374            let idx = i + segment * seg_len;
375            if idx < self.total_parts {
376                let start = i * RESOURCE_MAPHASH_LEN;
377                let end = start + RESOURCE_MAPHASH_LEN;
378                if self.hashmap[idx].is_none() {
379                    self.hashmap_height += 1;
380                }
381                let mut h = [0u8; RESOURCE_MAPHASH_LEN];
382                h.copy_from_slice(&hashmap_bytes[start..end]);
383                self.hashmap[idx] = Some(h);
384            }
385        }
386
387        self.waiting_for_hmu = false;
388        self.request_next(now)
389    }
390
391    /// Build and return request for next window of parts.
392    pub fn request_next(&mut self, now: f64) -> Vec<ResourceAction> {
393        if self.status == ResourceStatus::Failed || self.waiting_for_hmu {
394            return vec![];
395        }
396
397        self.outstanding_parts = 0;
398        let mut hashmap_exhausted = RESOURCE_HASHMAP_IS_NOT_EXHAUSTED;
399        let mut requested_hashes = Vec::new();
400
401        let pn_start = (self.consecutive_completed_height + 1) as usize;
402        let search_end = core::cmp::min(pn_start + self.window.window, self.total_parts);
403        let mut i = 0;
404
405        for pn in pn_start..search_end {
406            if self.parts[pn].is_none() {
407                match self.hashmap[pn] {
408                    Some(ref h) => {
409                        requested_hashes.extend_from_slice(h);
410                        self.outstanding_parts += 1;
411                        i += 1;
412                    }
413                    None => {
414                        hashmap_exhausted = RESOURCE_HASHMAP_IS_EXHAUSTED;
415                    }
416                }
417            }
418            if i >= self.window.window || hashmap_exhausted == RESOURCE_HASHMAP_IS_EXHAUSTED {
419                break;
420            }
421        }
422
423        let mut request_data = Vec::new();
424        request_data.push(hashmap_exhausted);
425        if hashmap_exhausted == RESOURCE_HASHMAP_IS_EXHAUSTED {
426            // Append last known map hash
427            if self.hashmap_height > 0 {
428                if let Some(ref last_hash) = self.hashmap[self.hashmap_height - 1] {
429                    request_data.extend_from_slice(last_hash);
430                } else {
431                    request_data.extend_from_slice(&[0u8; RESOURCE_MAPHASH_LEN]);
432                }
433            } else {
434                request_data.extend_from_slice(&[0u8; RESOURCE_MAPHASH_LEN]);
435            }
436            self.waiting_for_hmu = true;
437        }
438
439        request_data.extend_from_slice(&self.resource_hash);
440        request_data.extend_from_slice(&requested_hashes);
441
442        self.last_activity = now;
443        self.req_sent = now;
444        self.req_sent_bytes = request_data.len();
445        self.req_resp = None;
446
447        vec![ResourceAction::SendRequest(request_data)]
448    }
449
450    /// Assemble received parts, decrypt, decompress, verify hash.
451    #[allow(clippy::type_complexity)]
452    pub fn assemble(
453        &mut self,
454        decrypt_fn: &dyn Fn(&[u8]) -> Result<Vec<u8>, ()>,
455        compressor: &dyn Compressor,
456    ) -> Vec<ResourceAction> {
457        if self.received_count != self.total_parts {
458            return vec![ResourceAction::Failed(ResourceError::InvalidState)];
459        }
460
461        self.status = ResourceStatus::Assembling;
462
463        // Join all parts
464        let mut stream = Vec::new();
465        for part in &self.parts {
466            match part {
467                Some(data) => stream.extend_from_slice(data),
468                None => {
469                    self.status = ResourceStatus::Failed;
470                    return vec![ResourceAction::Failed(ResourceError::InvalidState)];
471                }
472            }
473        }
474
475        // Decrypt
476        let decrypted = if self.flags.encrypted {
477            match decrypt_fn(&stream) {
478                Ok(d) => d,
479                Err(_) => {
480                    self.status = ResourceStatus::Failed;
481                    return vec![ResourceAction::Failed(ResourceError::DecryptionFailed)];
482                }
483            }
484        } else {
485            stream
486        };
487
488        // Strip random hash prefix
489        if decrypted.len() < RESOURCE_RANDOM_HASH_SIZE {
490            return self.corrupt_actions(ResourceError::InvalidPart);
491        }
492        let data_after_random = &decrypted[RESOURCE_RANDOM_HASH_SIZE..];
493
494        // Decompress
495        let decompressed = if self.flags.compressed {
496            match compressor.decompress_bounded(data_after_random, self.max_decompressed_size) {
497                Ok(d) => d,
498                Err(crate::buffer::types::DecompressError::TooLarge) => {
499                    return self.corrupt_actions(ResourceError::TooLarge);
500                }
501                Err(crate::buffer::types::DecompressError::InvalidData) => {
502                    return self.corrupt_actions(ResourceError::DecompressionFailed);
503                }
504            }
505        } else {
506            data_after_random.to_vec()
507        };
508
509        // Verify hash
510        let calculated_hash = compute_resource_hash(&decompressed, &self.random_hash);
511        if calculated_hash.as_slice() != self.resource_hash.as_slice() {
512            return self.corrupt_actions(ResourceError::HashMismatch);
513        }
514
515        // Compute proof before metadata extraction (proof uses full decompressed data)
516        let expected_proof = compute_expected_proof(&decompressed, &calculated_hash);
517        let proof_data = build_proof_data(&calculated_hash, &expected_proof);
518
519        // Extract metadata if present
520        let (data, metadata) = if self.has_metadata && self.segment_index == 1 {
521            match extract_metadata(&decompressed) {
522                Some((meta, rest)) => (rest, Some(meta)),
523                None => return self.corrupt_actions(ResourceError::InvalidPart),
524            }
525        } else {
526            (decompressed, None)
527        };
528
529        self.status = ResourceStatus::Complete;
530
531        vec![
532            ResourceAction::SendProof(proof_data),
533            ResourceAction::DataReceived { data, metadata },
534            ResourceAction::Completed,
535        ]
536    }
537
538    /// Handle cancel from sender (RESOURCE_ICL).
539    pub fn handle_cancel(&mut self) -> Vec<ResourceAction> {
540        if self.status < ResourceStatus::Complete {
541            self.status = ResourceStatus::Failed;
542            return vec![ResourceAction::Failed(ResourceError::Rejected)];
543        }
544        vec![]
545    }
546
547    /// Periodic tick. Checks for timeouts.
548    #[allow(clippy::type_complexity)]
549    pub fn tick(
550        &mut self,
551        now: f64,
552        decrypt_fn: &dyn Fn(&[u8]) -> Result<Vec<u8>, ()>,
553        compressor: &dyn Compressor,
554    ) -> Vec<ResourceAction> {
555        if self.status >= ResourceStatus::Assembling {
556            return vec![];
557        }
558
559        if self.status == ResourceStatus::Transferring {
560            // Check if all parts received — trigger assembly
561            if self.received_count == self.total_parts {
562                return self.assemble(decrypt_fn, compressor);
563            }
564
565            // Compute timeout
566            let eifr = self.compute_eifr();
567            let retries_used = self.max_retries - self.retries_left;
568            let extra_wait = retries_used as f64 * RESOURCE_PER_RETRY_DELAY;
569            let expected_hmu_wait =
570                if eifr > 0.0 && (self.waiting_for_hmu || self.outstanding_parts == 0) {
571                    (self.sdu as f64 * 8.0 * RESOURCE_HMU_WAIT_FACTOR) / eifr
572                } else {
573                    0.0
574                };
575            let expected_tof = if self.outstanding_parts > 0 && eifr > 0.0 {
576                (self.outstanding_parts as f64 * self.sdu as f64 * 8.0) / eifr
577            } else if eifr > 0.0 {
578                (3.0 * self.sdu as f64) / eifr
579            } else {
580                10.0 // fallback
581            };
582
583            let sleep_time = self.last_activity
584                + self.part_timeout_factor * expected_tof
585                + expected_hmu_wait
586                + RESOURCE_RETRY_GRACE_TIME
587                + extra_wait;
588
589            if now > sleep_time {
590                if self.retries_left > 0 {
591                    // Timeout — shrink window, retry
592                    self.window.on_timeout();
593                    self.retries_left -= 1;
594                    self.waiting_for_hmu = false;
595                    return self.request_next(now);
596                } else {
597                    self.status = ResourceStatus::Failed;
598                    return vec![ResourceAction::Failed(ResourceError::MaxRetriesExceeded)];
599                }
600            }
601        }
602
603        vec![]
604    }
605
606    /// Compute EIFR (expected inflight rate) and update self.eifr.
607    fn compute_eifr(&mut self) -> f64 {
608        let eifr = if self.req_data_rtt_rate > 0.0 {
609            self.req_data_rtt_rate * 8.0
610        } else if let Some(prev) = self.previous_eifr {
611            prev
612        } else {
613            // Fallback: use link_rtt as establishment cost estimate
614            let rtt = self.rtt.unwrap_or(self.link_rtt);
615            if rtt > 0.0 {
616                (self.sdu as f64 * 8.0) / rtt
617            } else {
618                10000.0
619            }
620        };
621        self.eifr = Some(eifr);
622        eifr
623    }
624
625    /// Get current progress as (received, total).
626    pub fn progress(&self) -> (usize, usize) {
627        (self.received_count, self.total_parts)
628    }
629
630    /// Get window and EIFR for passing to next transfer.
631    pub fn get_transfer_state(&self) -> (usize, Option<f64>) {
632        (self.window.window, self.eifr)
633    }
634}
635
636#[cfg(test)]
637mod tests {
638    use super::*;
639    use crate::buffer::types::{Compressor, DecompressError, NoopCompressor};
640    use crate::resource::advertisement::ResourceAdvertisement;
641    use crate::resource::sender::ResourceSender;
642
643    fn identity_encrypt(data: &[u8]) -> Vec<u8> {
644        data.to_vec()
645    }
646
647    fn identity_decrypt(data: &[u8]) -> Result<Vec<u8>, ()> {
648        Ok(data.to_vec())
649    }
650
651    struct ExpandingCompressor;
652
653    impl Compressor for ExpandingCompressor {
654        fn compress(&self, data: &[u8]) -> Option<Vec<u8>> {
655            Some(data[..data.len() / 2].to_vec())
656        }
657
658        fn decompress(&self, data: &[u8]) -> Option<Vec<u8>> {
659            self.decompress_bounded(data, usize::MAX).ok()
660        }
661
662        fn decompress_bounded(
663            &self,
664            data: &[u8],
665            max_output_size: usize,
666        ) -> Result<Vec<u8>, DecompressError> {
667            let mut out = data.to_vec();
668            out.extend_from_slice(data);
669            if out.len() > max_output_size {
670                return Err(DecompressError::TooLarge);
671            }
672            Ok(out)
673        }
674    }
675
676    fn base_timeout(receiver: &ResourceReceiver, eifr: f64) -> f64 {
677        let expected_tof = if receiver.outstanding_parts > 0 {
678            (receiver.outstanding_parts as f64 * receiver.sdu as f64 * 8.0) / eifr
679        } else {
680            (3.0 * receiver.sdu as f64) / eifr
681        };
682
683        receiver.last_activity
684            + receiver.part_timeout_factor * expected_tof
685            + RESOURCE_RETRY_GRACE_TIME
686    }
687
688    fn hmu_timeout(receiver: &ResourceReceiver, eifr: f64) -> f64 {
689        let expected_hmu_wait = (receiver.sdu as f64 * 8.0 * RESOURCE_HMU_WAIT_FACTOR) / eifr;
690        base_timeout(receiver, eifr) + expected_hmu_wait
691    }
692
693    fn make_sender_receiver() -> (ResourceSender, ResourceReceiver) {
694        let mut rng = rns_crypto::FixedRng::new(&[0x42; 64]);
695        let data = b"Hello, Resource Transfer!";
696
697        let sender = ResourceSender::new(
698            data,
699            None,
700            RESOURCE_SDU,
701            &identity_encrypt,
702            &NoopCompressor,
703            &mut rng,
704            1000.0,
705            false,
706            false,
707            None,
708            1,
709            1,
710            None,
711            0.5,
712            6.0,
713        )
714        .unwrap();
715
716        let adv_data = sender.get_advertisement(0);
717        let receiver =
718            ResourceReceiver::from_advertisement(&adv_data, RESOURCE_SDU, 0.5, 1000.0, None, None)
719                .unwrap();
720
721        (sender, receiver)
722    }
723
724    #[test]
725    fn test_from_advertisement() {
726        let (sender, receiver) = make_sender_receiver();
727        assert_eq!(receiver.total_parts, sender.total_parts());
728        assert_eq!(receiver.transfer_size, sender.transfer_size as u64);
729        assert_eq!(receiver.resource_hash, sender.resource_hash.to_vec());
730        assert!(!receiver.advertisement_packet.is_empty());
731        assert_eq!(
732            receiver.max_decompressed_size,
733            RESOURCE_AUTO_COMPRESS_MAX_SIZE
734        );
735    }
736
737    #[test]
738    fn test_from_advertisement_rejects_huge_part_count() {
739        let adv = ResourceAdvertisement {
740            transfer_size: 1024,
741            data_size: 1024,
742            num_parts: u64::MAX,
743            resource_hash: vec![0x11; 32],
744            random_hash: vec![0x22; RESOURCE_RANDOM_HASH_SIZE],
745            original_hash: vec![0x33; 32],
746            hashmap: vec![0x44; RESOURCE_MAPHASH_LEN],
747            flags: AdvFlags {
748                encrypted: false,
749                compressed: false,
750                split: false,
751                is_request: false,
752                is_response: false,
753                has_metadata: false,
754            },
755            segment_index: 1,
756            total_segments: 1,
757            request_id: None,
758        };
759
760        let err = match ResourceReceiver::from_advertisement(
761            &adv.pack(0),
762            RESOURCE_SDU,
763            0.5,
764            1000.0,
765            None,
766            None,
767        ) {
768            Ok(_) => panic!("huge advertisement should be rejected"),
769            Err(err) => err,
770        };
771
772        assert_eq!(err, ResourceError::TooLarge);
773    }
774
775    #[test]
776    fn test_from_advertisement_rejects_oversized_transfer() {
777        let adv = ResourceAdvertisement {
778            transfer_size: RESOURCE_AUTO_COMPRESS_MAX_SIZE as u64 + 1,
779            data_size: 1024,
780            num_parts: 1,
781            resource_hash: vec![0x11; 32],
782            random_hash: vec![0x22; RESOURCE_RANDOM_HASH_SIZE],
783            original_hash: vec![0x33; 32],
784            hashmap: vec![0x44; RESOURCE_MAPHASH_LEN],
785            flags: AdvFlags {
786                encrypted: false,
787                compressed: false,
788                split: false,
789                is_request: false,
790                is_response: false,
791                has_metadata: false,
792            },
793            segment_index: 1,
794            total_segments: 1,
795            request_id: None,
796        };
797
798        let err = match ResourceReceiver::from_advertisement(
799            &adv.pack(0),
800            RESOURCE_SDU,
801            0.5,
802            1000.0,
803            None,
804            None,
805        ) {
806            Ok(_) => panic!("oversized advertisement should be rejected"),
807            Err(err) => err,
808        };
809
810        assert_eq!(err, ResourceError::TooLarge);
811    }
812
813    #[test]
814    fn test_accept() {
815        let (_, mut receiver) = make_sender_receiver();
816        let actions = receiver.accept(1000.0);
817        assert_eq!(receiver.status, ResourceStatus::Transferring);
818        assert!(!actions.is_empty());
819        assert!(actions
820            .iter()
821            .any(|a| matches!(a, ResourceAction::SendRequest(_))));
822    }
823
824    #[test]
825    fn test_reject() {
826        let (_, mut receiver) = make_sender_receiver();
827        let actions = receiver.reject();
828        assert_eq!(receiver.status, ResourceStatus::Rejected);
829        assert!(actions
830            .iter()
831            .any(|a| matches!(a, ResourceAction::SendCancelReceiver(_))));
832    }
833
834    #[test]
835    fn test_receive_part_stores() {
836        let (mut sender, mut receiver) = make_sender_receiver();
837        receiver.accept(1000.0);
838
839        // Get part data from sender (we use identity encryption so parts ARE the raw data)
840        // Request first part
841        let mut request = Vec::new();
842        request.push(RESOURCE_HASHMAP_IS_NOT_EXHAUSTED);
843        request.extend_from_slice(&sender.resource_hash);
844        request.extend_from_slice(&sender.part_hashes[0]);
845
846        let send_actions = sender.handle_request(&request, 1001.0);
847        let part_data = send_actions
848            .iter()
849            .find_map(|a| match a {
850                ResourceAction::SendPart(d) => Some(d.clone()),
851                _ => None,
852            })
853            .unwrap();
854
855        // Give it to receiver
856        receiver.req_sent = 1000.5;
857        let _actions = receiver.receive_part(&part_data, 1001.0);
858        assert_eq!(receiver.received_count, 1);
859    }
860
861    #[test]
862    fn test_consecutive_completed_height() {
863        let (sender, mut receiver) = make_sender_receiver();
864        receiver.accept(1000.0);
865
866        // Simulate receiving parts in order for a multi-part resource
867        if sender.total_parts() > 1 {
868            // This only applies to multi-part transfers
869            assert_eq!(receiver.consecutive_completed_height, -1);
870        }
871    }
872
873    #[test]
874    fn test_handle_cancel() {
875        let (_, mut receiver) = make_sender_receiver();
876        receiver.accept(1000.0);
877        let _actions = receiver.handle_cancel();
878        assert_eq!(receiver.status, ResourceStatus::Failed);
879    }
880
881    #[test]
882    fn test_assemble_compressed_resource_rejects_oversized_decompression() {
883        let data = b"oversized!";
884        let mut rng = rns_crypto::FixedRng::new(&[0x93; 64]);
885
886        let mut sender = ResourceSender::new(
887            data,
888            None,
889            RESOURCE_SDU,
890            &identity_encrypt,
891            &ExpandingCompressor,
892            &mut rng,
893            1000.0,
894            true,
895            false,
896            None,
897            1,
898            1,
899            None,
900            0.5,
901            6.0,
902        )
903        .unwrap();
904
905        let adv = sender.get_advertisement(0);
906        let mut receiver =
907            ResourceReceiver::from_advertisement(&adv, RESOURCE_SDU, 0.5, 1000.0, None, None)
908                .unwrap();
909        receiver.max_decompressed_size = data.len() - 1;
910
911        let request_data = receiver
912            .accept(1001.0)
913            .into_iter()
914            .find_map(|a| match a {
915                ResourceAction::SendRequest(d) => Some(d),
916                _ => None,
917            })
918            .unwrap();
919
920        let send_actions = sender.handle_request(&request_data, 1002.0);
921        receiver.req_sent = 1001.0;
922        for action in &send_actions {
923            if let ResourceAction::SendPart(part_data) = action {
924                receiver.receive_part(part_data, 1003.0);
925            }
926        }
927
928        let assemble_actions = receiver.assemble(&identity_decrypt, &ExpandingCompressor);
929        assert_eq!(receiver.status, ResourceStatus::Corrupt);
930        assert!(assemble_actions
931            .iter()
932            .any(|a| matches!(a, ResourceAction::SendCancelReceiver(_))));
933        assert!(assemble_actions
934            .iter()
935            .any(|a| matches!(a, ResourceAction::Failed(ResourceError::TooLarge))));
936        assert!(assemble_actions
937            .iter()
938            .any(|a| matches!(a, ResourceAction::TeardownLink)));
939    }
940
941    #[test]
942    fn test_full_transfer_small_data() {
943        // End-to-end: sender creates, receiver accepts, parts flow, assembly completes
944        let data = b"small data";
945        let mut rng = rns_crypto::FixedRng::new(&[0x77; 64]);
946
947        let mut sender = ResourceSender::new(
948            data,
949            None,
950            RESOURCE_SDU,
951            &identity_encrypt,
952            &NoopCompressor,
953            &mut rng,
954            1000.0,
955            false,
956            false,
957            None,
958            1,
959            1,
960            None,
961            0.5,
962            6.0,
963        )
964        .unwrap();
965
966        let adv = sender.get_advertisement(0);
967        let mut receiver =
968            ResourceReceiver::from_advertisement(&adv, RESOURCE_SDU, 0.5, 1000.0, None, None)
969                .unwrap();
970
971        // Accept
972        let req_actions = receiver.accept(1001.0);
973        assert_eq!(receiver.status, ResourceStatus::Transferring);
974
975        // Get request data
976        let request_data = req_actions
977            .iter()
978            .find_map(|a| match a {
979                ResourceAction::SendRequest(d) => Some(d.clone()),
980                _ => None,
981            })
982            .unwrap();
983
984        // Sender handles request
985        let send_actions = sender.handle_request(&request_data, 1002.0);
986
987        // Feed all parts to receiver
988        receiver.req_sent = 1001.0;
989        for action in &send_actions {
990            if let ResourceAction::SendPart(part_data) = action {
991                receiver.receive_part(part_data, 1003.0);
992            }
993        }
994
995        assert_eq!(receiver.received_count, receiver.total_parts);
996
997        // Assemble
998        let assemble_actions = receiver.assemble(&identity_decrypt, &NoopCompressor);
999
1000        // Check for proof and data
1001        let has_proof = assemble_actions
1002            .iter()
1003            .any(|a| matches!(a, ResourceAction::SendProof(_)));
1004        let has_data = assemble_actions
1005            .iter()
1006            .any(|a| matches!(a, ResourceAction::DataReceived { .. }));
1007        let has_complete = assemble_actions
1008            .iter()
1009            .any(|a| matches!(a, ResourceAction::Completed));
1010
1011        assert!(has_proof, "Should send proof");
1012        assert!(has_data, "Should return data");
1013        assert!(has_complete, "Should be completed");
1014
1015        // Verify data matches
1016        let received_data = assemble_actions
1017            .iter()
1018            .find_map(|a| match a {
1019                ResourceAction::DataReceived { data, .. } => Some(data.clone()),
1020                _ => None,
1021            })
1022            .unwrap();
1023        assert_eq!(received_data, data);
1024
1025        // Verify proof validates on sender side
1026        let proof_data = assemble_actions
1027            .iter()
1028            .find_map(|a| match a {
1029                ResourceAction::SendProof(d) => Some(d.clone()),
1030                _ => None,
1031            })
1032            .unwrap();
1033
1034        let _proof_actions = sender.handle_proof(&proof_data, 1004.0);
1035        assert_eq!(sender.status, ResourceStatus::Complete);
1036    }
1037
1038    #[test]
1039    fn test_full_transfer_with_metadata() {
1040        let data = b"data with metadata";
1041        let metadata = b"some metadata";
1042        let mut rng = rns_crypto::FixedRng::new(&[0x88; 64]);
1043
1044        let mut sender = ResourceSender::new(
1045            data,
1046            Some(metadata),
1047            RESOURCE_SDU,
1048            &identity_encrypt,
1049            &NoopCompressor,
1050            &mut rng,
1051            1000.0,
1052            false,
1053            false,
1054            None,
1055            1,
1056            1,
1057            None,
1058            0.5,
1059            6.0,
1060        )
1061        .unwrap();
1062
1063        assert!(sender.flags.has_metadata);
1064
1065        let adv = sender.get_advertisement(0);
1066        let mut receiver =
1067            ResourceReceiver::from_advertisement(&adv, RESOURCE_SDU, 0.5, 1000.0, None, None)
1068                .unwrap();
1069
1070        assert!(receiver.has_metadata);
1071
1072        // Transfer all parts
1073        let req_actions = receiver.accept(1001.0);
1074        let request_data = req_actions
1075            .iter()
1076            .find_map(|a| match a {
1077                ResourceAction::SendRequest(d) => Some(d.clone()),
1078                _ => None,
1079            })
1080            .unwrap();
1081
1082        let send_actions = sender.handle_request(&request_data, 1002.0);
1083        receiver.req_sent = 1001.0;
1084        for action in &send_actions {
1085            if let ResourceAction::SendPart(part_data) = action {
1086                receiver.receive_part(part_data, 1003.0);
1087            }
1088        }
1089
1090        let assemble_actions = receiver.assemble(&identity_decrypt, &NoopCompressor);
1091
1092        let (recv_data, recv_meta) = assemble_actions
1093            .iter()
1094            .find_map(|a| match a {
1095                ResourceAction::DataReceived { data, metadata } => {
1096                    Some((data.clone(), metadata.clone()))
1097                }
1098                _ => None,
1099            })
1100            .unwrap();
1101
1102        assert_eq!(recv_data, data);
1103        assert_eq!(recv_meta.unwrap(), metadata);
1104    }
1105
1106    #[test]
1107    fn test_previous_window_restore() {
1108        let (_, _receiver) = make_sender_receiver();
1109        // Create with previous window
1110        let adv_data = {
1111            let mut rng = rns_crypto::FixedRng::new(&[0x42; 64]);
1112            let sender = ResourceSender::new(
1113                b"test",
1114                None,
1115                RESOURCE_SDU,
1116                &identity_encrypt,
1117                &NoopCompressor,
1118                &mut rng,
1119                1000.0,
1120                false,
1121                false,
1122                None,
1123                1,
1124                1,
1125                None,
1126                0.5,
1127                6.0,
1128            )
1129            .unwrap();
1130            sender.get_advertisement(0)
1131        };
1132
1133        let receiver = ResourceReceiver::from_advertisement(
1134            &adv_data,
1135            RESOURCE_SDU,
1136            0.5,
1137            1000.0,
1138            Some(8),
1139            Some(50000.0),
1140        )
1141        .unwrap();
1142        assert_eq!(receiver.window.window, 8);
1143    }
1144
1145    #[test]
1146    fn test_tick_timeout_retry() {
1147        let (_, mut receiver) = make_sender_receiver();
1148        receiver.accept(1000.0);
1149        receiver.rtt = Some(0.1);
1150
1151        // Way past timeout
1152        let actions = receiver.tick(9999.0, &identity_decrypt, &NoopCompressor);
1153        // Should have retried (window decreased, request_next called)
1154        assert!(!actions.is_empty() || receiver.retries_left < RESOURCE_MAX_RETRIES);
1155    }
1156
1157    #[test]
1158    fn test_tick_waiting_for_hmu_gets_extra_timeout() {
1159        let (_, mut receiver) = make_sender_receiver();
1160        receiver.accept(1000.0);
1161        receiver.waiting_for_hmu = true;
1162        receiver.outstanding_parts = 0;
1163        let eifr = 10_000.0;
1164        receiver.previous_eifr = Some(eifr);
1165        receiver.last_activity = 1000.0;
1166
1167        let old_timeout = base_timeout(&receiver, eifr);
1168        let now = old_timeout + 0.01;
1169
1170        let actions = receiver.tick(now, &identity_decrypt, &NoopCompressor);
1171
1172        assert!(actions.is_empty(), "receiver should keep waiting for HMU");
1173        assert_eq!(receiver.retries_left, RESOURCE_MAX_RETRIES);
1174        assert_eq!(receiver.status, ResourceStatus::Transferring);
1175    }
1176
1177    #[test]
1178    fn test_tick_zero_outstanding_parts_gets_extra_timeout_without_hmu_flag() {
1179        let (_, mut receiver) = make_sender_receiver();
1180        receiver.accept(1000.0);
1181        receiver.waiting_for_hmu = false;
1182        receiver.outstanding_parts = 0;
1183        let eifr = 10_000.0;
1184        receiver.previous_eifr = Some(eifr);
1185        receiver.last_activity = 1000.0;
1186
1187        let old_timeout = base_timeout(&receiver, eifr);
1188        let now = old_timeout + 0.01;
1189
1190        let actions = receiver.tick(now, &identity_decrypt, &NoopCompressor);
1191
1192        assert!(
1193            actions.is_empty(),
1194            "receiver should keep waiting for follow-up hashmap data"
1195        );
1196        assert_eq!(receiver.retries_left, RESOURCE_MAX_RETRIES);
1197        assert_eq!(receiver.status, ResourceStatus::Transferring);
1198    }
1199
1200    #[test]
1201    fn test_tick_waiting_for_hmu_retries_after_extended_timeout() {
1202        let (_, mut receiver) = make_sender_receiver();
1203        receiver.accept(1000.0);
1204        receiver.waiting_for_hmu = true;
1205        receiver.outstanding_parts = 0;
1206        let eifr = 10_000.0;
1207        receiver.previous_eifr = Some(eifr);
1208        receiver.last_activity = 1000.0;
1209
1210        let now = hmu_timeout(&receiver, eifr) + 0.01;
1211        let _actions = receiver.tick(now, &identity_decrypt, &NoopCompressor);
1212
1213        assert_eq!(receiver.retries_left, RESOURCE_MAX_RETRIES - 1);
1214        assert!(!receiver.waiting_for_hmu);
1215    }
1216
1217    #[test]
1218    fn test_tick_inflight_parts_do_not_get_hmu_timeout_extension() {
1219        let (_, mut receiver) = make_sender_receiver();
1220        receiver.accept(1000.0);
1221        receiver.waiting_for_hmu = false;
1222        receiver.outstanding_parts = 2;
1223        let eifr = 10_000.0;
1224        receiver.previous_eifr = Some(eifr);
1225        receiver.last_activity = 1000.0;
1226
1227        let now = base_timeout(&receiver, eifr) + 0.01;
1228        let _actions = receiver.tick(now, &identity_decrypt, &NoopCompressor);
1229
1230        assert_eq!(receiver.retries_left, RESOURCE_MAX_RETRIES - 1);
1231    }
1232
1233    #[test]
1234    fn test_tick_max_retries_exceeded() {
1235        let (_, mut receiver) = make_sender_receiver();
1236        receiver.accept(1000.0);
1237        receiver.retries_left = 0;
1238        receiver.rtt = Some(0.001);
1239        receiver.eifr = Some(100000.0);
1240
1241        let _actions = receiver.tick(9999.0, &identity_decrypt, &NoopCompressor);
1242        assert_eq!(receiver.status, ResourceStatus::Failed);
1243    }
1244}