use alloc::vec;
use alloc::vec::Vec;
use super::advertisement::ResourceAdvertisement;
use super::parts::{extract_metadata, map_hash};
use super::proof::{build_proof_data, compute_expected_proof, compute_resource_hash};
use super::types::*;
use super::window::WindowState;
use crate::buffer::types::Compressor;
use crate::constants::*;
pub struct ResourceReceiver {
pub status: ResourceStatus,
pub resource_hash: Vec<u8>,
pub random_hash: Vec<u8>,
pub original_hash: Vec<u8>,
pub flags: AdvFlags,
pub transfer_size: u64,
pub data_size: u64,
pub total_parts: usize,
parts: Vec<Option<Vec<u8>>>,
hashmap: Vec<Option<[u8; RESOURCE_MAPHASH_LEN]>>,
hashmap_height: usize,
pub waiting_for_hmu: bool,
pub received_count: usize,
pub outstanding_parts: usize,
consecutive_completed_height: isize,
sdu: usize,
link_rtt: f64,
pub retries_left: usize,
max_retries: usize,
pub rtt: Option<f64>,
part_timeout_factor: f64,
pub last_activity: f64,
pub req_sent: f64,
req_sent_bytes: usize,
req_resp: Option<f64>,
rtt_rxd_bytes: usize,
rtt_rxd_bytes_at_part_req: usize,
req_resp_rtt_rate: f64,
req_data_rtt_rate: f64,
pub eifr: Option<f64>,
previous_eifr: Option<f64>,
pub segment_index: u64,
pub total_segments: u64,
pub has_metadata: bool,
pub request_id: Option<Vec<u8>>,
pub window: WindowState,
pub advertisement_packet: Vec<u8>,
pub max_decompressed_size: usize,
}
impl ResourceReceiver {
pub fn from_advertisement(
adv_data: &[u8],
sdu: usize,
link_rtt: f64,
now: f64,
previous_window: Option<usize>,
previous_eifr: Option<f64>,
) -> Result<Self, ResourceError> {
let adv = ResourceAdvertisement::unpack(adv_data)?;
if adv.resource_hash.len() != 32 {
return Err(ResourceError::InvalidAdvertisement);
}
if adv.random_hash.len() != RESOURCE_RANDOM_HASH_SIZE || adv.original_hash.len() != 32 {
return Err(ResourceError::InvalidAdvertisement);
}
if adv.transfer_size > RESOURCE_AUTO_COMPRESS_MAX_SIZE as u64
|| adv.data_size > RESOURCE_AUTO_COMPRESS_MAX_SIZE as u64
{
return Err(ResourceError::TooLarge);
}
let total_parts = usize::try_from(adv.num_parts).map_err(|_| ResourceError::TooLarge)?;
if total_parts == 0 {
return Err(ResourceError::InvalidAdvertisement);
}
let max_parts = RESOURCE_AUTO_COMPRESS_MAX_SIZE
.div_ceil(sdu.max(1))
.saturating_add(RESOURCE_COLLISION_GUARD_SIZE);
if total_parts > max_parts {
return Err(ResourceError::TooLarge);
}
let parts_vec: Vec<Option<Vec<u8>>> = vec![None; total_parts];
let mut hashmap_vec: Vec<Option<[u8; RESOURCE_MAPHASH_LEN]>> = vec![None; total_parts];
let initial_hashes = adv.hashmap.len() / RESOURCE_MAPHASH_LEN;
let mut hashmap_height = 0;
for (i, slot) in hashmap_vec.iter_mut().enumerate().take(initial_hashes) {
if i < total_parts {
let start = i * RESOURCE_MAPHASH_LEN;
let end = start + RESOURCE_MAPHASH_LEN;
let mut h = [0u8; RESOURCE_MAPHASH_LEN];
h.copy_from_slice(&adv.hashmap[start..end]);
*slot = Some(h);
hashmap_height += 1;
}
}
let mut window_state = WindowState::new();
if let Some(prev_w) = previous_window {
window_state.restore(prev_w);
}
Ok(ResourceReceiver {
status: ResourceStatus::None,
resource_hash: adv.resource_hash,
random_hash: adv.random_hash,
original_hash: adv.original_hash,
flags: adv.flags,
transfer_size: adv.transfer_size,
data_size: adv.data_size,
total_parts,
parts: parts_vec,
hashmap: hashmap_vec,
hashmap_height,
waiting_for_hmu: false,
received_count: 0,
outstanding_parts: 0,
consecutive_completed_height: -1,
sdu,
link_rtt,
retries_left: RESOURCE_MAX_RETRIES,
max_retries: RESOURCE_MAX_RETRIES,
rtt: None,
part_timeout_factor: RESOURCE_PART_TIMEOUT_FACTOR,
last_activity: now,
req_sent: 0.0,
req_sent_bytes: 0,
req_resp: None,
rtt_rxd_bytes: 0,
rtt_rxd_bytes_at_part_req: 0,
req_resp_rtt_rate: 0.0,
req_data_rtt_rate: 0.0,
eifr: None,
previous_eifr,
segment_index: adv.segment_index,
total_segments: adv.total_segments,
has_metadata: adv.flags.has_metadata,
request_id: adv.request_id,
window: window_state,
advertisement_packet: adv_data.to_vec(),
max_decompressed_size: RESOURCE_AUTO_COMPRESS_MAX_SIZE,
})
}
pub fn accept(&mut self, now: f64) -> Vec<ResourceAction> {
self.status = ResourceStatus::Transferring;
self.last_activity = now;
self.request_next(now)
}
pub fn reject(&mut self) -> Vec<ResourceAction> {
self.status = ResourceStatus::Rejected;
vec![ResourceAction::SendCancelReceiver(
self.resource_hash.clone(),
)]
}
fn corrupt_actions(&mut self, error: ResourceError) -> Vec<ResourceAction> {
self.status = ResourceStatus::Corrupt;
vec![
ResourceAction::SendCancelReceiver(self.resource_hash.clone()),
ResourceAction::Failed(error),
ResourceAction::TeardownLink,
]
}
pub fn receive_part(&mut self, part_data: &[u8], now: f64) -> Vec<ResourceAction> {
if self.status == ResourceStatus::Failed {
return vec![];
}
self.last_activity = now;
self.retries_left = self.max_retries;
if self.req_resp.is_none() {
self.req_resp = Some(now);
let rtt = now - self.req_sent;
self.part_timeout_factor = RESOURCE_PART_TIMEOUT_FACTOR_AFTER_RTT;
if self.rtt.is_none() {
self.rtt = Some(rtt);
} else if let Some(current_rtt) = self.rtt {
if rtt < current_rtt {
self.rtt = Some(f64::max(current_rtt - current_rtt * 0.05, rtt));
} else {
self.rtt = Some(f64::min(current_rtt + current_rtt * 0.05, rtt));
}
}
if rtt > 0.0 {
let req_resp_cost = part_data.len() + self.req_sent_bytes;
self.req_resp_rtt_rate = req_resp_cost as f64 / rtt;
self.window.update_req_resp_rate(self.req_resp_rtt_rate);
}
}
self.status = ResourceStatus::Transferring;
let part_hash = map_hash(part_data, &self.random_hash);
let consecutive_idx = if self.consecutive_completed_height >= 0 {
self.consecutive_completed_height as usize
} else {
0
};
let mut matched = false;
let search_end = core::cmp::min(consecutive_idx + self.window.window, self.total_parts);
for i in consecutive_idx..search_end {
if let Some(ref h) = self.hashmap[i] {
if *h == part_hash {
if self.parts[i].is_none() {
self.parts[i] = Some(part_data.to_vec());
self.rtt_rxd_bytes += part_data.len();
self.received_count += 1;
self.outstanding_parts = self.outstanding_parts.saturating_sub(1);
if i as isize == self.consecutive_completed_height + 1 {
self.consecutive_completed_height = i as isize;
}
let mut cp = (self.consecutive_completed_height + 1) as usize;
while cp < self.total_parts && self.parts[cp].is_some() {
self.consecutive_completed_height = cp as isize;
cp += 1;
}
matched = true;
}
break;
}
}
}
let mut actions = Vec::new();
if self.received_count == self.total_parts {
actions.push(ResourceAction::ProgressUpdate {
received: self.received_count,
total: self.total_parts,
});
return actions;
}
if matched {
actions.push(ResourceAction::ProgressUpdate {
received: self.received_count,
total: self.total_parts,
});
}
if self.outstanding_parts == 0 && self.received_count < self.total_parts {
self.window.on_window_complete();
if self.req_sent > 0.0 {
let rtt = now - self.req_sent;
let req_transferred = self.rtt_rxd_bytes - self.rtt_rxd_bytes_at_part_req;
if rtt > 0.0 {
self.req_data_rtt_rate = req_transferred as f64 / rtt;
self.rtt_rxd_bytes_at_part_req = self.rtt_rxd_bytes;
self.window.update_data_rate(self.req_data_rtt_rate);
}
}
let next_actions = self.request_next(now);
actions.extend(next_actions);
}
actions
}
pub fn handle_hashmap_update(&mut self, hmu_data: &[u8], now: f64) -> Vec<ResourceAction> {
if self.status == ResourceStatus::Failed {
return vec![];
}
self.last_activity = now;
self.retries_left = self.max_retries;
if hmu_data.len() <= 32 {
return vec![];
}
let payload = &hmu_data[32..];
let (value, _) = match crate::msgpack::unpack(payload) {
Ok(v) => v,
Err(_) => return vec![],
};
let arr = match value.as_array() {
Some(a) if a.len() >= 2 => a,
_ => return vec![],
};
let segment = match arr[0].as_uint() {
Some(s) => s as usize,
None => return vec![],
};
let hashmap_bytes = match arr[1].as_bin() {
Some(b) => b,
None => return vec![],
};
let seg_len = RESOURCE_HASHMAP_MAX_LEN;
let num_hashes = hashmap_bytes.len() / RESOURCE_MAPHASH_LEN;
for i in 0..num_hashes {
let idx = i + segment * seg_len;
if idx < self.total_parts {
let start = i * RESOURCE_MAPHASH_LEN;
let end = start + RESOURCE_MAPHASH_LEN;
if self.hashmap[idx].is_none() {
self.hashmap_height += 1;
}
let mut h = [0u8; RESOURCE_MAPHASH_LEN];
h.copy_from_slice(&hashmap_bytes[start..end]);
self.hashmap[idx] = Some(h);
}
}
self.waiting_for_hmu = false;
self.request_next(now)
}
pub fn request_next(&mut self, now: f64) -> Vec<ResourceAction> {
if self.status == ResourceStatus::Failed || self.waiting_for_hmu {
return vec![];
}
self.outstanding_parts = 0;
let mut hashmap_exhausted = RESOURCE_HASHMAP_IS_NOT_EXHAUSTED;
let mut requested_hashes = Vec::new();
let pn_start = (self.consecutive_completed_height + 1) as usize;
let search_end = core::cmp::min(pn_start + self.window.window, self.total_parts);
let mut i = 0;
for pn in pn_start..search_end {
if self.parts[pn].is_none() {
match self.hashmap[pn] {
Some(ref h) => {
requested_hashes.extend_from_slice(h);
self.outstanding_parts += 1;
i += 1;
}
None => {
hashmap_exhausted = RESOURCE_HASHMAP_IS_EXHAUSTED;
}
}
}
if i >= self.window.window || hashmap_exhausted == RESOURCE_HASHMAP_IS_EXHAUSTED {
break;
}
}
let mut request_data = Vec::new();
request_data.push(hashmap_exhausted);
if hashmap_exhausted == RESOURCE_HASHMAP_IS_EXHAUSTED {
if self.hashmap_height > 0 {
if let Some(ref last_hash) = self.hashmap[self.hashmap_height - 1] {
request_data.extend_from_slice(last_hash);
} else {
request_data.extend_from_slice(&[0u8; RESOURCE_MAPHASH_LEN]);
}
} else {
request_data.extend_from_slice(&[0u8; RESOURCE_MAPHASH_LEN]);
}
self.waiting_for_hmu = true;
}
request_data.extend_from_slice(&self.resource_hash);
request_data.extend_from_slice(&requested_hashes);
self.last_activity = now;
self.req_sent = now;
self.req_sent_bytes = request_data.len();
self.req_resp = None;
vec![ResourceAction::SendRequest(request_data)]
}
#[allow(clippy::type_complexity)]
pub fn assemble(
&mut self,
decrypt_fn: &dyn Fn(&[u8]) -> Result<Vec<u8>, ()>,
compressor: &dyn Compressor,
) -> Vec<ResourceAction> {
if self.received_count != self.total_parts {
return vec![ResourceAction::Failed(ResourceError::InvalidState)];
}
self.status = ResourceStatus::Assembling;
let mut stream = Vec::new();
for part in &self.parts {
match part {
Some(data) => stream.extend_from_slice(data),
None => {
self.status = ResourceStatus::Failed;
return vec![ResourceAction::Failed(ResourceError::InvalidState)];
}
}
}
let decrypted = if self.flags.encrypted {
match decrypt_fn(&stream) {
Ok(d) => d,
Err(_) => {
self.status = ResourceStatus::Failed;
return vec![ResourceAction::Failed(ResourceError::DecryptionFailed)];
}
}
} else {
stream
};
if decrypted.len() < RESOURCE_RANDOM_HASH_SIZE {
return self.corrupt_actions(ResourceError::InvalidPart);
}
let data_after_random = &decrypted[RESOURCE_RANDOM_HASH_SIZE..];
let decompressed = if self.flags.compressed {
match compressor.decompress_bounded(data_after_random, self.max_decompressed_size) {
Ok(d) => d,
Err(crate::buffer::types::DecompressError::TooLarge) => {
return self.corrupt_actions(ResourceError::TooLarge);
}
Err(crate::buffer::types::DecompressError::InvalidData) => {
return self.corrupt_actions(ResourceError::DecompressionFailed);
}
}
} else {
data_after_random.to_vec()
};
let calculated_hash = compute_resource_hash(&decompressed, &self.random_hash);
if calculated_hash.as_slice() != self.resource_hash.as_slice() {
return self.corrupt_actions(ResourceError::HashMismatch);
}
let expected_proof = compute_expected_proof(&decompressed, &calculated_hash);
let proof_data = build_proof_data(&calculated_hash, &expected_proof);
let (data, metadata) = if self.has_metadata && self.segment_index == 1 {
match extract_metadata(&decompressed) {
Some((meta, rest)) => (rest, Some(meta)),
None => return self.corrupt_actions(ResourceError::InvalidPart),
}
} else {
(decompressed, None)
};
self.status = ResourceStatus::Complete;
vec![
ResourceAction::SendProof(proof_data),
ResourceAction::DataReceived { data, metadata },
ResourceAction::Completed,
]
}
pub fn handle_cancel(&mut self) -> Vec<ResourceAction> {
if self.status < ResourceStatus::Complete {
self.status = ResourceStatus::Failed;
return vec![ResourceAction::Failed(ResourceError::Rejected)];
}
vec![]
}
#[allow(clippy::type_complexity)]
pub fn tick(
&mut self,
now: f64,
decrypt_fn: &dyn Fn(&[u8]) -> Result<Vec<u8>, ()>,
compressor: &dyn Compressor,
) -> Vec<ResourceAction> {
if self.status >= ResourceStatus::Assembling {
return vec![];
}
if self.status == ResourceStatus::Transferring {
if self.received_count == self.total_parts {
return self.assemble(decrypt_fn, compressor);
}
let eifr = self.compute_eifr();
let retries_used = self.max_retries - self.retries_left;
let extra_wait = retries_used as f64 * RESOURCE_PER_RETRY_DELAY;
let expected_hmu_wait =
if eifr > 0.0 && (self.waiting_for_hmu || self.outstanding_parts == 0) {
(self.sdu as f64 * 8.0 * RESOURCE_HMU_WAIT_FACTOR) / eifr
} else {
0.0
};
let expected_tof = if self.outstanding_parts > 0 && eifr > 0.0 {
(self.outstanding_parts as f64 * self.sdu as f64 * 8.0) / eifr
} else if eifr > 0.0 {
(3.0 * self.sdu as f64) / eifr
} else {
10.0 };
let sleep_time = self.last_activity
+ self.part_timeout_factor * expected_tof
+ expected_hmu_wait
+ RESOURCE_RETRY_GRACE_TIME
+ extra_wait;
if now > sleep_time {
if self.retries_left > 0 {
self.window.on_timeout();
self.retries_left -= 1;
self.waiting_for_hmu = false;
return self.request_next(now);
} else {
self.status = ResourceStatus::Failed;
return vec![ResourceAction::Failed(ResourceError::MaxRetriesExceeded)];
}
}
}
vec![]
}
fn compute_eifr(&mut self) -> f64 {
let eifr = if self.req_data_rtt_rate > 0.0 {
self.req_data_rtt_rate * 8.0
} else if let Some(prev) = self.previous_eifr {
prev
} else {
let rtt = self.rtt.unwrap_or(self.link_rtt);
if rtt > 0.0 {
(self.sdu as f64 * 8.0) / rtt
} else {
10000.0
}
};
self.eifr = Some(eifr);
eifr
}
pub fn progress(&self) -> (usize, usize) {
(self.received_count, self.total_parts)
}
pub fn get_transfer_state(&self) -> (usize, Option<f64>) {
(self.window.window, self.eifr)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::buffer::types::{Compressor, DecompressError, NoopCompressor};
use crate::resource::advertisement::ResourceAdvertisement;
use crate::resource::sender::ResourceSender;
fn identity_encrypt(data: &[u8]) -> Vec<u8> {
data.to_vec()
}
fn identity_decrypt(data: &[u8]) -> Result<Vec<u8>, ()> {
Ok(data.to_vec())
}
struct ExpandingCompressor;
impl Compressor for ExpandingCompressor {
fn compress(&self, data: &[u8]) -> Option<Vec<u8>> {
Some(data[..data.len() / 2].to_vec())
}
fn decompress(&self, data: &[u8]) -> Option<Vec<u8>> {
self.decompress_bounded(data, usize::MAX).ok()
}
fn decompress_bounded(
&self,
data: &[u8],
max_output_size: usize,
) -> Result<Vec<u8>, DecompressError> {
let mut out = data.to_vec();
out.extend_from_slice(data);
if out.len() > max_output_size {
return Err(DecompressError::TooLarge);
}
Ok(out)
}
}
fn base_timeout(receiver: &ResourceReceiver, eifr: f64) -> f64 {
let expected_tof = if receiver.outstanding_parts > 0 {
(receiver.outstanding_parts as f64 * receiver.sdu as f64 * 8.0) / eifr
} else {
(3.0 * receiver.sdu as f64) / eifr
};
receiver.last_activity
+ receiver.part_timeout_factor * expected_tof
+ RESOURCE_RETRY_GRACE_TIME
}
fn hmu_timeout(receiver: &ResourceReceiver, eifr: f64) -> f64 {
let expected_hmu_wait = (receiver.sdu as f64 * 8.0 * RESOURCE_HMU_WAIT_FACTOR) / eifr;
base_timeout(receiver, eifr) + expected_hmu_wait
}
fn make_sender_receiver() -> (ResourceSender, ResourceReceiver) {
let mut rng = rns_crypto::FixedRng::new(&[0x42; 64]);
let data = b"Hello, Resource Transfer!";
let sender = ResourceSender::new(
data,
None,
RESOURCE_SDU,
&identity_encrypt,
&NoopCompressor,
&mut rng,
1000.0,
false,
false,
None,
1,
1,
None,
0.5,
6.0,
)
.unwrap();
let adv_data = sender.get_advertisement(0);
let receiver =
ResourceReceiver::from_advertisement(&adv_data, RESOURCE_SDU, 0.5, 1000.0, None, None)
.unwrap();
(sender, receiver)
}
#[test]
fn test_from_advertisement() {
let (sender, receiver) = make_sender_receiver();
assert_eq!(receiver.total_parts, sender.total_parts());
assert_eq!(receiver.transfer_size, sender.transfer_size as u64);
assert_eq!(receiver.resource_hash, sender.resource_hash.to_vec());
assert!(!receiver.advertisement_packet.is_empty());
assert_eq!(
receiver.max_decompressed_size,
RESOURCE_AUTO_COMPRESS_MAX_SIZE
);
}
#[test]
fn test_from_advertisement_rejects_huge_part_count() {
let adv = ResourceAdvertisement {
transfer_size: 1024,
data_size: 1024,
num_parts: u64::MAX,
resource_hash: vec![0x11; 32],
random_hash: vec![0x22; RESOURCE_RANDOM_HASH_SIZE],
original_hash: vec![0x33; 32],
hashmap: vec![0x44; RESOURCE_MAPHASH_LEN],
flags: AdvFlags {
encrypted: false,
compressed: false,
split: false,
is_request: false,
is_response: false,
has_metadata: false,
},
segment_index: 1,
total_segments: 1,
request_id: None,
};
let err = match ResourceReceiver::from_advertisement(
&adv.pack(0),
RESOURCE_SDU,
0.5,
1000.0,
None,
None,
) {
Ok(_) => panic!("huge advertisement should be rejected"),
Err(err) => err,
};
assert_eq!(err, ResourceError::TooLarge);
}
#[test]
fn test_from_advertisement_rejects_oversized_transfer() {
let adv = ResourceAdvertisement {
transfer_size: RESOURCE_AUTO_COMPRESS_MAX_SIZE as u64 + 1,
data_size: 1024,
num_parts: 1,
resource_hash: vec![0x11; 32],
random_hash: vec![0x22; RESOURCE_RANDOM_HASH_SIZE],
original_hash: vec![0x33; 32],
hashmap: vec![0x44; RESOURCE_MAPHASH_LEN],
flags: AdvFlags {
encrypted: false,
compressed: false,
split: false,
is_request: false,
is_response: false,
has_metadata: false,
},
segment_index: 1,
total_segments: 1,
request_id: None,
};
let err = match ResourceReceiver::from_advertisement(
&adv.pack(0),
RESOURCE_SDU,
0.5,
1000.0,
None,
None,
) {
Ok(_) => panic!("oversized advertisement should be rejected"),
Err(err) => err,
};
assert_eq!(err, ResourceError::TooLarge);
}
#[test]
fn test_accept() {
let (_, mut receiver) = make_sender_receiver();
let actions = receiver.accept(1000.0);
assert_eq!(receiver.status, ResourceStatus::Transferring);
assert!(!actions.is_empty());
assert!(actions
.iter()
.any(|a| matches!(a, ResourceAction::SendRequest(_))));
}
#[test]
fn test_reject() {
let (_, mut receiver) = make_sender_receiver();
let actions = receiver.reject();
assert_eq!(receiver.status, ResourceStatus::Rejected);
assert!(actions
.iter()
.any(|a| matches!(a, ResourceAction::SendCancelReceiver(_))));
}
#[test]
fn test_receive_part_stores() {
let (mut sender, mut receiver) = make_sender_receiver();
receiver.accept(1000.0);
let mut request = Vec::new();
request.push(RESOURCE_HASHMAP_IS_NOT_EXHAUSTED);
request.extend_from_slice(&sender.resource_hash);
request.extend_from_slice(&sender.part_hashes[0]);
let send_actions = sender.handle_request(&request, 1001.0);
let part_data = send_actions
.iter()
.find_map(|a| match a {
ResourceAction::SendPart(d) => Some(d.clone()),
_ => None,
})
.unwrap();
receiver.req_sent = 1000.5;
let _actions = receiver.receive_part(&part_data, 1001.0);
assert_eq!(receiver.received_count, 1);
}
#[test]
fn test_consecutive_completed_height() {
let (sender, mut receiver) = make_sender_receiver();
receiver.accept(1000.0);
if sender.total_parts() > 1 {
assert_eq!(receiver.consecutive_completed_height, -1);
}
}
#[test]
fn test_handle_cancel() {
let (_, mut receiver) = make_sender_receiver();
receiver.accept(1000.0);
let _actions = receiver.handle_cancel();
assert_eq!(receiver.status, ResourceStatus::Failed);
}
#[test]
fn test_assemble_compressed_resource_rejects_oversized_decompression() {
let data = b"oversized!";
let mut rng = rns_crypto::FixedRng::new(&[0x93; 64]);
let mut sender = ResourceSender::new(
data,
None,
RESOURCE_SDU,
&identity_encrypt,
&ExpandingCompressor,
&mut rng,
1000.0,
true,
false,
None,
1,
1,
None,
0.5,
6.0,
)
.unwrap();
let adv = sender.get_advertisement(0);
let mut receiver =
ResourceReceiver::from_advertisement(&adv, RESOURCE_SDU, 0.5, 1000.0, None, None)
.unwrap();
receiver.max_decompressed_size = data.len() - 1;
let request_data = receiver
.accept(1001.0)
.into_iter()
.find_map(|a| match a {
ResourceAction::SendRequest(d) => Some(d),
_ => None,
})
.unwrap();
let send_actions = sender.handle_request(&request_data, 1002.0);
receiver.req_sent = 1001.0;
for action in &send_actions {
if let ResourceAction::SendPart(part_data) = action {
receiver.receive_part(part_data, 1003.0);
}
}
let assemble_actions = receiver.assemble(&identity_decrypt, &ExpandingCompressor);
assert_eq!(receiver.status, ResourceStatus::Corrupt);
assert!(assemble_actions
.iter()
.any(|a| matches!(a, ResourceAction::SendCancelReceiver(_))));
assert!(assemble_actions
.iter()
.any(|a| matches!(a, ResourceAction::Failed(ResourceError::TooLarge))));
assert!(assemble_actions
.iter()
.any(|a| matches!(a, ResourceAction::TeardownLink)));
}
#[test]
fn test_full_transfer_small_data() {
let data = b"small data";
let mut rng = rns_crypto::FixedRng::new(&[0x77; 64]);
let mut sender = ResourceSender::new(
data,
None,
RESOURCE_SDU,
&identity_encrypt,
&NoopCompressor,
&mut rng,
1000.0,
false,
false,
None,
1,
1,
None,
0.5,
6.0,
)
.unwrap();
let adv = sender.get_advertisement(0);
let mut receiver =
ResourceReceiver::from_advertisement(&adv, RESOURCE_SDU, 0.5, 1000.0, None, None)
.unwrap();
let req_actions = receiver.accept(1001.0);
assert_eq!(receiver.status, ResourceStatus::Transferring);
let request_data = req_actions
.iter()
.find_map(|a| match a {
ResourceAction::SendRequest(d) => Some(d.clone()),
_ => None,
})
.unwrap();
let send_actions = sender.handle_request(&request_data, 1002.0);
receiver.req_sent = 1001.0;
for action in &send_actions {
if let ResourceAction::SendPart(part_data) = action {
receiver.receive_part(part_data, 1003.0);
}
}
assert_eq!(receiver.received_count, receiver.total_parts);
let assemble_actions = receiver.assemble(&identity_decrypt, &NoopCompressor);
let has_proof = assemble_actions
.iter()
.any(|a| matches!(a, ResourceAction::SendProof(_)));
let has_data = assemble_actions
.iter()
.any(|a| matches!(a, ResourceAction::DataReceived { .. }));
let has_complete = assemble_actions
.iter()
.any(|a| matches!(a, ResourceAction::Completed));
assert!(has_proof, "Should send proof");
assert!(has_data, "Should return data");
assert!(has_complete, "Should be completed");
let received_data = assemble_actions
.iter()
.find_map(|a| match a {
ResourceAction::DataReceived { data, .. } => Some(data.clone()),
_ => None,
})
.unwrap();
assert_eq!(received_data, data);
let proof_data = assemble_actions
.iter()
.find_map(|a| match a {
ResourceAction::SendProof(d) => Some(d.clone()),
_ => None,
})
.unwrap();
let _proof_actions = sender.handle_proof(&proof_data, 1004.0);
assert_eq!(sender.status, ResourceStatus::Complete);
}
#[test]
fn test_full_transfer_with_metadata() {
let data = b"data with metadata";
let metadata = b"some metadata";
let mut rng = rns_crypto::FixedRng::new(&[0x88; 64]);
let mut sender = ResourceSender::new(
data,
Some(metadata),
RESOURCE_SDU,
&identity_encrypt,
&NoopCompressor,
&mut rng,
1000.0,
false,
false,
None,
1,
1,
None,
0.5,
6.0,
)
.unwrap();
assert!(sender.flags.has_metadata);
let adv = sender.get_advertisement(0);
let mut receiver =
ResourceReceiver::from_advertisement(&adv, RESOURCE_SDU, 0.5, 1000.0, None, None)
.unwrap();
assert!(receiver.has_metadata);
let req_actions = receiver.accept(1001.0);
let request_data = req_actions
.iter()
.find_map(|a| match a {
ResourceAction::SendRequest(d) => Some(d.clone()),
_ => None,
})
.unwrap();
let send_actions = sender.handle_request(&request_data, 1002.0);
receiver.req_sent = 1001.0;
for action in &send_actions {
if let ResourceAction::SendPart(part_data) = action {
receiver.receive_part(part_data, 1003.0);
}
}
let assemble_actions = receiver.assemble(&identity_decrypt, &NoopCompressor);
let (recv_data, recv_meta) = assemble_actions
.iter()
.find_map(|a| match a {
ResourceAction::DataReceived { data, metadata } => {
Some((data.clone(), metadata.clone()))
}
_ => None,
})
.unwrap();
assert_eq!(recv_data, data);
assert_eq!(recv_meta.unwrap(), metadata);
}
#[test]
fn test_previous_window_restore() {
let (_, _receiver) = make_sender_receiver();
let adv_data = {
let mut rng = rns_crypto::FixedRng::new(&[0x42; 64]);
let sender = ResourceSender::new(
b"test",
None,
RESOURCE_SDU,
&identity_encrypt,
&NoopCompressor,
&mut rng,
1000.0,
false,
false,
None,
1,
1,
None,
0.5,
6.0,
)
.unwrap();
sender.get_advertisement(0)
};
let receiver = ResourceReceiver::from_advertisement(
&adv_data,
RESOURCE_SDU,
0.5,
1000.0,
Some(8),
Some(50000.0),
)
.unwrap();
assert_eq!(receiver.window.window, 8);
}
#[test]
fn test_tick_timeout_retry() {
let (_, mut receiver) = make_sender_receiver();
receiver.accept(1000.0);
receiver.rtt = Some(0.1);
let actions = receiver.tick(9999.0, &identity_decrypt, &NoopCompressor);
assert!(!actions.is_empty() || receiver.retries_left < RESOURCE_MAX_RETRIES);
}
#[test]
fn test_tick_waiting_for_hmu_gets_extra_timeout() {
let (_, mut receiver) = make_sender_receiver();
receiver.accept(1000.0);
receiver.waiting_for_hmu = true;
receiver.outstanding_parts = 0;
let eifr = 10_000.0;
receiver.previous_eifr = Some(eifr);
receiver.last_activity = 1000.0;
let old_timeout = base_timeout(&receiver, eifr);
let now = old_timeout + 0.01;
let actions = receiver.tick(now, &identity_decrypt, &NoopCompressor);
assert!(actions.is_empty(), "receiver should keep waiting for HMU");
assert_eq!(receiver.retries_left, RESOURCE_MAX_RETRIES);
assert_eq!(receiver.status, ResourceStatus::Transferring);
}
#[test]
fn test_tick_zero_outstanding_parts_gets_extra_timeout_without_hmu_flag() {
let (_, mut receiver) = make_sender_receiver();
receiver.accept(1000.0);
receiver.waiting_for_hmu = false;
receiver.outstanding_parts = 0;
let eifr = 10_000.0;
receiver.previous_eifr = Some(eifr);
receiver.last_activity = 1000.0;
let old_timeout = base_timeout(&receiver, eifr);
let now = old_timeout + 0.01;
let actions = receiver.tick(now, &identity_decrypt, &NoopCompressor);
assert!(
actions.is_empty(),
"receiver should keep waiting for follow-up hashmap data"
);
assert_eq!(receiver.retries_left, RESOURCE_MAX_RETRIES);
assert_eq!(receiver.status, ResourceStatus::Transferring);
}
#[test]
fn test_tick_waiting_for_hmu_retries_after_extended_timeout() {
let (_, mut receiver) = make_sender_receiver();
receiver.accept(1000.0);
receiver.waiting_for_hmu = true;
receiver.outstanding_parts = 0;
let eifr = 10_000.0;
receiver.previous_eifr = Some(eifr);
receiver.last_activity = 1000.0;
let now = hmu_timeout(&receiver, eifr) + 0.01;
let _actions = receiver.tick(now, &identity_decrypt, &NoopCompressor);
assert_eq!(receiver.retries_left, RESOURCE_MAX_RETRIES - 1);
assert!(!receiver.waiting_for_hmu);
}
#[test]
fn test_tick_inflight_parts_do_not_get_hmu_timeout_extension() {
let (_, mut receiver) = make_sender_receiver();
receiver.accept(1000.0);
receiver.waiting_for_hmu = false;
receiver.outstanding_parts = 2;
let eifr = 10_000.0;
receiver.previous_eifr = Some(eifr);
receiver.last_activity = 1000.0;
let now = base_timeout(&receiver, eifr) + 0.01;
let _actions = receiver.tick(now, &identity_decrypt, &NoopCompressor);
assert_eq!(receiver.retries_left, RESOURCE_MAX_RETRIES - 1);
}
#[test]
fn test_tick_max_retries_exceeded() {
let (_, mut receiver) = make_sender_receiver();
receiver.accept(1000.0);
receiver.retries_left = 0;
receiver.rtt = Some(0.001);
receiver.eifr = Some(100000.0);
let _actions = receiver.tick(9999.0, &identity_decrypt, &NoopCompressor);
assert_eq!(receiver.status, ResourceStatus::Failed);
}
}