use std::collections::HashMap;
use super::compressor::Bzip2Compressor;
use rns_core::channel::{Channel, Sequence};
use rns_core::constants;
use rns_core::link::types::{LinkId, LinkState, TeardownReason};
use rns_core::link::{LinkAction, LinkEngine, LinkMode};
use rns_core::packet::{PacketFlags, RawPacket};
use rns_core::resource::{ResourceAction, ResourceReceiver, ResourceSender};
use rns_crypto::ed25519::Ed25519PrivateKey;
use rns_crypto::Rng;
use super::time;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResourceStrategy {
AcceptNone,
AcceptAll,
AcceptApp,
}
impl Default for ResourceStrategy {
fn default() -> Self {
ResourceStrategy::AcceptNone
}
}
struct ManagedLink {
engine: LinkEngine,
channel: Option<Channel>,
pending_channel_packets: HashMap<[u8; 32], Sequence>,
channel_send_ok: u64,
channel_send_not_ready: u64,
channel_send_too_big: u64,
channel_send_other_error: u64,
channel_messages_received: u64,
channel_proofs_sent: u64,
channel_proofs_received: u64,
dest_hash: [u8; 16],
remote_identity: Option<([u8; 16], [u8; 64])>,
dest_sig_pub_bytes: Option<[u8; 32]>,
incoming_resources: Vec<ResourceReceiver>,
outgoing_resources: Vec<ResourceSender>,
incoming_splits: HashMap<[u8; 32], IncomingSplitTransfer>,
outgoing_splits: HashMap<[u8; 32], OutgoingSplitTransfer>,
resource_strategy: ResourceStrategy,
route_interface: Option<rns_core::transport::types::InterfaceId>,
route_transport_id: Option<[u8; 16]>,
}
struct IncomingSplitTransfer {
total_segments: u64,
completed_segments: u64,
current_segment_index: u64,
current_received_parts: usize,
current_total_parts: usize,
data: Vec<u8>,
metadata: Option<Vec<u8>>,
is_response: bool,
}
struct OutgoingSplitTransfer {
total_segments: u64,
completed_segments: u64,
current_segment_index: u64,
current_sent_parts: usize,
current_total_parts: usize,
}
struct LinkDestination {
sig_prv: Ed25519PrivateKey,
sig_pub_bytes: [u8; 32],
resource_strategy: ResourceStrategy,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RequestResponse {
Bytes(Vec<u8>),
Resource {
data: Vec<u8>,
metadata: Option<Vec<u8>>,
auto_compress: bool,
},
}
impl From<Vec<u8>> for RequestResponse {
fn from(data: Vec<u8>) -> Self {
RequestResponse::Bytes(data)
}
}
struct RequestHandlerEntry {
path: String,
path_hash: [u8; 16],
allowed_list: Option<Vec<[u8; 16]>>,
handler: Box<
dyn Fn(LinkId, &str, &[u8], Option<&([u8; 16], [u8; 64])>) -> Option<RequestResponse>
+ Send,
>,
}
#[derive(Debug)]
pub enum LinkManagerAction {
SendPacket {
raw: Vec<u8>,
dest_type: u8,
attached_interface: Option<rns_core::transport::types::InterfaceId>,
},
LinkEstablished {
link_id: LinkId,
dest_hash: [u8; 16],
rtt: f64,
is_initiator: bool,
},
LinkClosed {
link_id: LinkId,
reason: Option<TeardownReason>,
},
RemoteIdentified {
link_id: LinkId,
identity_hash: [u8; 16],
public_key: [u8; 64],
},
RegisterLinkDest { link_id: LinkId },
DeregisterLinkDest { link_id: LinkId },
ManagementRequest {
link_id: LinkId,
path_hash: [u8; 16],
data: Vec<u8>,
request_id: [u8; 16],
remote_identity: Option<([u8; 16], [u8; 64])>,
},
ResourceReceived {
link_id: LinkId,
data: Vec<u8>,
metadata: Option<Vec<u8>>,
},
ResourceCompleted { link_id: LinkId },
ResourceFailed { link_id: LinkId, error: String },
ResourceProgress {
link_id: LinkId,
received: usize,
total: usize,
},
ResourceAcceptQuery {
link_id: LinkId,
resource_hash: Vec<u8>,
transfer_size: u64,
has_metadata: bool,
},
ChannelMessageReceived {
link_id: LinkId,
msgtype: u16,
payload: Vec<u8>,
},
LinkDataReceived {
link_id: LinkId,
context: u8,
data: Vec<u8>,
},
ResponseReceived {
link_id: LinkId,
request_id: [u8; 16],
data: Vec<u8>,
metadata: Option<Vec<u8>>,
},
LinkRequestReceived {
link_id: LinkId,
receiving_interface: rns_core::transport::types::InterfaceId,
},
}
pub struct LinkManager {
links: HashMap<LinkId, ManagedLink>,
link_destinations: HashMap<[u8; 16], LinkDestination>,
request_handlers: Vec<RequestHandlerEntry>,
management_paths: Vec<[u8; 16]>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LinkRouteHint {
pub interface: rns_core::transport::types::InterfaceId,
pub transport_id: Option<[u8; 16]>,
}
impl LinkManager {
fn resource_sdu_for_link(link: &ManagedLink) -> usize {
let mtu = link.engine.mtu() as usize;
let derived = mtu.saturating_sub(constants::HEADER_MAXSIZE + constants::IFAC_MIN_SIZE);
if derived > 0 {
derived
} else {
constants::RESOURCE_SDU
}
}
fn split_progress_parts(
segment_index: u64,
total_segments: u64,
current_done: usize,
current_total: usize,
sdu: usize,
) -> (usize, usize) {
let max_parts_per_segment = constants::RESOURCE_MAX_EFFICIENT_SIZE.div_ceil(sdu.max(1));
let total = (total_segments as usize).saturating_mul(max_parts_per_segment);
let completed_segments = segment_index.saturating_sub(1) as usize;
let completed = completed_segments.saturating_mul(max_parts_per_segment);
let current = if current_total == 0 {
0
} else if current_total < max_parts_per_segment {
let scaled =
(current_done as f64) * (max_parts_per_segment as f64 / current_total as f64);
scaled.floor() as usize
} else {
current_done
};
(completed.saturating_add(current).min(total), total)
}
fn resource_hash_key(hash: &[u8]) -> Option<[u8; 32]> {
let mut key = [0u8; 32];
if hash.len() != key.len() {
return None;
}
key.copy_from_slice(hash);
Some(key)
}
fn incoming_split_progress(split: &IncomingSplitTransfer, sdu: usize) -> (usize, usize) {
Self::split_progress_parts(
split.current_segment_index,
split.total_segments,
split.current_received_parts,
split.current_total_parts,
sdu,
)
}
fn outgoing_split_progress(split: &OutgoingSplitTransfer, sdu: usize) -> (usize, usize) {
Self::split_progress_parts(
split.current_segment_index,
split.total_segments,
split.current_sent_parts,
split.current_total_parts,
sdu,
)
}
pub fn new() -> Self {
LinkManager {
links: HashMap::new(),
link_destinations: HashMap::new(),
request_handlers: Vec::new(),
management_paths: Vec::new(),
}
}
pub fn register_management_path(&mut self, path_hash: [u8; 16]) {
if !self.management_paths.contains(&path_hash) {
self.management_paths.push(path_hash);
}
}
pub fn get_derived_key(&self, link_id: &LinkId) -> Option<Vec<u8>> {
self.links
.get(link_id)
.and_then(|link| link.engine.derived_key().map(|dk| dk.to_vec()))
}
pub fn get_link_route_hint(&self, link_id: &LinkId) -> Option<LinkRouteHint> {
self.links.get(link_id).and_then(|link| {
link.route_interface.map(|interface| LinkRouteHint {
interface,
transport_id: link.route_transport_id,
})
})
}
pub fn set_link_route_hint(
&mut self,
link_id: &LinkId,
interface: rns_core::transport::types::InterfaceId,
transport_id: Option<[u8; 16]>,
) -> bool {
let Some(link) = self.links.get_mut(link_id) else {
return false;
};
link.route_interface = Some(interface);
link.route_transport_id = transport_id;
true
}
pub fn register_link_destination(
&mut self,
dest_hash: [u8; 16],
sig_prv: Ed25519PrivateKey,
sig_pub_bytes: [u8; 32],
resource_strategy: ResourceStrategy,
) {
self.link_destinations.insert(
dest_hash,
LinkDestination {
sig_prv,
sig_pub_bytes,
resource_strategy,
},
);
}
pub fn deregister_link_destination(&mut self, dest_hash: &[u8; 16]) {
self.link_destinations.remove(dest_hash);
}
pub fn register_request_handler<F>(
&mut self,
path: &str,
allowed_list: Option<Vec<[u8; 16]>>,
handler: F,
) where
F: Fn(LinkId, &str, &[u8], Option<&([u8; 16], [u8; 64])>) -> Option<Vec<u8>>
+ Send
+ 'static,
{
let path_hash = compute_path_hash(path);
self.request_handlers.push(RequestHandlerEntry {
path: path.to_string(),
path_hash,
allowed_list,
handler: Box::new(move |link_id, p, data, remote| {
handler(link_id, p, data, remote).map(RequestResponse::Bytes)
}),
});
}
pub fn register_request_handler_response<F>(
&mut self,
path: &str,
allowed_list: Option<Vec<[u8; 16]>>,
handler: F,
) where
F: Fn(LinkId, &str, &[u8], Option<&([u8; 16], [u8; 64])>) -> Option<RequestResponse>
+ Send
+ 'static,
{
let path_hash = compute_path_hash(path);
self.request_handlers.push(RequestHandlerEntry {
path: path.to_string(),
path_hash,
allowed_list,
handler: Box::new(handler),
});
}
pub fn create_link(
&mut self,
dest_hash: &[u8; 16],
dest_sig_pub_bytes: &[u8; 32],
hops: u8,
mtu: u32,
rng: &mut dyn Rng,
) -> (LinkId, Vec<LinkManagerAction>) {
let mode = LinkMode::Aes256Cbc;
let (mut engine, request_data) =
LinkEngine::new_initiator(dest_hash, hops, mode, Some(mtu), time::now(), rng);
let flags = PacketFlags {
header_type: constants::HEADER_1,
context_flag: constants::FLAG_UNSET,
transport_type: constants::TRANSPORT_BROADCAST,
destination_type: constants::DESTINATION_SINGLE,
packet_type: constants::PACKET_TYPE_LINKREQUEST,
};
let packet = match RawPacket::pack(
flags,
0,
dest_hash,
None,
constants::CONTEXT_NONE,
&request_data,
) {
Ok(p) => p,
Err(_) => {
return ([0u8; 16], Vec::new());
}
};
engine.set_link_id_from_hashable(&packet.get_hashable_part(), request_data.len());
let link_id = *engine.link_id();
let managed = ManagedLink {
engine,
channel: None,
pending_channel_packets: HashMap::new(),
channel_send_ok: 0,
channel_send_not_ready: 0,
channel_send_too_big: 0,
channel_send_other_error: 0,
channel_messages_received: 0,
channel_proofs_sent: 0,
channel_proofs_received: 0,
dest_hash: *dest_hash,
remote_identity: None,
dest_sig_pub_bytes: Some(*dest_sig_pub_bytes),
incoming_resources: Vec::new(),
outgoing_resources: Vec::new(),
incoming_splits: HashMap::new(),
outgoing_splits: HashMap::new(),
resource_strategy: ResourceStrategy::default(),
route_interface: None,
route_transport_id: None,
};
self.links.insert(link_id, managed);
let mut actions = Vec::new();
actions.push(LinkManagerAction::RegisterLinkDest { link_id });
actions.push(LinkManagerAction::SendPacket {
raw: packet.raw,
dest_type: constants::DESTINATION_LINK,
attached_interface: None,
});
(link_id, actions)
}
pub fn handle_local_delivery(
&mut self,
dest_hash: [u8; 16],
raw: &[u8],
packet_hash: [u8; 32],
receiving_interface: rns_core::transport::types::InterfaceId,
rng: &mut dyn Rng,
) -> Vec<LinkManagerAction> {
let packet = match RawPacket::unpack(raw) {
Ok(p) => p,
Err(_) => return Vec::new(),
};
match packet.flags.packet_type {
constants::PACKET_TYPE_LINKREQUEST => {
self.handle_linkrequest(&dest_hash, &packet, receiving_interface, rng)
}
constants::PACKET_TYPE_PROOF if packet.context == constants::CONTEXT_LRPROOF => {
self.handle_lrproof(&dest_hash, &packet, receiving_interface, rng)
}
constants::PACKET_TYPE_PROOF => self.handle_link_proof(&dest_hash, &packet, rng),
constants::PACKET_TYPE_DATA => {
self.handle_link_data(&dest_hash, &packet, packet_hash, receiving_interface, rng)
}
_ => Vec::new(),
}
}
fn handle_linkrequest(
&mut self,
dest_hash: &[u8; 16],
packet: &RawPacket,
receiving_interface: rns_core::transport::types::InterfaceId,
rng: &mut dyn Rng,
) -> Vec<LinkManagerAction> {
let ld = match self.link_destinations.get(dest_hash) {
Some(ld) => ld,
None => return Vec::new(),
};
let hashable = packet.get_hashable_part();
let now = time::now();
let (engine, lrproof_data) = match LinkEngine::new_responder(
&ld.sig_prv,
&ld.sig_pub_bytes,
&packet.data,
&hashable,
dest_hash,
packet.hops,
now,
rng,
) {
Ok(r) => r,
Err(e) => {
log::debug!("LINKREQUEST rejected: {}", e);
return Vec::new();
}
};
let link_id = *engine.link_id();
log::debug!(
"LINKREQUEST accepted: link={:02x?} iface={} header_type={} transport_id_present={} hops={}",
&link_id[..4],
receiving_interface.0,
packet.flags.header_type,
packet.transport_id.is_some(),
packet.hops
);
let managed = ManagedLink {
engine,
channel: None,
pending_channel_packets: HashMap::new(),
channel_send_ok: 0,
channel_send_not_ready: 0,
channel_send_too_big: 0,
channel_send_other_error: 0,
channel_messages_received: 0,
channel_proofs_sent: 0,
channel_proofs_received: 0,
dest_hash: *dest_hash,
remote_identity: None,
dest_sig_pub_bytes: None,
incoming_resources: Vec::new(),
outgoing_resources: Vec::new(),
incoming_splits: HashMap::new(),
outgoing_splits: HashMap::new(),
resource_strategy: ld.resource_strategy,
route_interface: Some(receiving_interface),
route_transport_id: if packet.flags.header_type == constants::HEADER_2 {
packet.transport_id
} else {
None
},
};
self.links.insert(link_id, managed);
let flags = PacketFlags {
header_type: constants::HEADER_1,
context_flag: constants::FLAG_UNSET,
transport_type: constants::TRANSPORT_BROADCAST,
destination_type: constants::DESTINATION_LINK,
packet_type: constants::PACKET_TYPE_PROOF,
};
let mut actions = Vec::new();
actions.push(LinkManagerAction::RegisterLinkDest { link_id });
if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
flags,
packet.hops,
&link_id,
None,
constants::CONTEXT_LRPROOF,
&lrproof_data,
) {
log::debug!(
"LRPROOF queued: link={:02x?} route_iface={} route_tid_present={} hops={}",
&link_id[..4],
receiving_interface.0,
packet.transport_id.is_some(),
packet.hops
);
actions.push(LinkManagerAction::SendPacket {
raw,
dest_type: constants::DESTINATION_LINK,
attached_interface: None,
});
}
if packet.hops != 0 {
if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
flags,
0,
&link_id,
None,
constants::CONTEXT_LRPROOF,
&lrproof_data,
) {
log::debug!(
"LRPROOF fallback queued: link={:02x?} route_iface={} hops=0",
&link_id[..4],
receiving_interface.0
);
actions.push(LinkManagerAction::SendPacket {
raw,
dest_type: constants::DESTINATION_LINK,
attached_interface: None,
});
}
}
if packet.hops < u8::MAX {
let hops_plus_one = packet.hops + 1;
if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
flags,
hops_plus_one,
&link_id,
None,
constants::CONTEXT_LRPROOF,
&lrproof_data,
) {
log::debug!(
"LRPROOF +1 queued: link={:02x?} route_iface={} hops={}",
&link_id[..4],
receiving_interface.0,
hops_plus_one
);
actions.push(LinkManagerAction::SendPacket {
raw,
dest_type: constants::DESTINATION_LINK,
attached_interface: None,
});
}
}
actions.push(LinkManagerAction::LinkRequestReceived {
link_id,
receiving_interface,
});
actions
}
fn handle_link_proof(
&mut self,
link_id: &LinkId,
packet: &RawPacket,
rng: &mut dyn Rng,
) -> Vec<LinkManagerAction> {
if packet.data.len() < 32 {
return Vec::new();
}
let mut tracked_hash = [0u8; 32];
tracked_hash.copy_from_slice(&packet.data[..32]);
let Some(link) = self.links.get_mut(link_id) else {
return Vec::new();
};
let Some(sequence) = link.pending_channel_packets.remove(&tracked_hash) else {
return Vec::new();
};
link.channel_proofs_received += 1;
let Some(channel) = link.channel.as_mut() else {
return Vec::new();
};
let chan_actions = channel.packet_delivered(sequence);
let _ = channel;
let _ = link;
self.process_channel_actions(link_id, chan_actions, rng)
}
fn build_link_packet_proof(
&mut self,
link_id: &LinkId,
packet_hash: &[u8; 32],
) -> Vec<LinkManagerAction> {
let dest_hash = match self.links.get(link_id) {
Some(link) => link.dest_hash,
None => return Vec::new(),
};
let Some(ld) = self.link_destinations.get(&dest_hash) else {
return Vec::new();
};
if let Some(link) = self.links.get_mut(link_id) {
link.channel_proofs_sent += 1;
}
let signature = ld.sig_prv.sign(packet_hash);
let mut proof_data = Vec::with_capacity(96);
proof_data.extend_from_slice(packet_hash);
proof_data.extend_from_slice(&signature);
let flags = PacketFlags {
header_type: constants::HEADER_1,
context_flag: constants::FLAG_UNSET,
transport_type: constants::TRANSPORT_BROADCAST,
destination_type: constants::DESTINATION_LINK,
packet_type: constants::PACKET_TYPE_PROOF,
};
if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
flags,
0,
link_id,
None,
constants::CONTEXT_NONE,
&proof_data,
) {
vec![LinkManagerAction::SendPacket {
raw,
dest_type: constants::DESTINATION_LINK,
attached_interface: None,
}]
} else {
Vec::new()
}
}
fn handle_lrproof(
&mut self,
link_id_bytes: &[u8; 16],
packet: &RawPacket,
receiving_interface: rns_core::transport::types::InterfaceId,
rng: &mut dyn Rng,
) -> Vec<LinkManagerAction> {
let link = match self.links.get_mut(link_id_bytes) {
Some(l) => l,
None => return Vec::new(),
};
link.route_interface = Some(receiving_interface);
if packet.flags.header_type == constants::HEADER_2 {
if let Some(transport_id) = packet.transport_id {
link.route_transport_id = Some(transport_id);
}
}
log::debug!(
"LRPROOF received: link={:02x?} iface={} header_type={} transport_id_present={}",
&link_id_bytes[..4],
receiving_interface.0,
packet.flags.header_type,
packet.transport_id.is_some()
);
if link.engine.state() != LinkState::Pending || !link.engine.is_initiator() {
return Vec::new();
}
let dest_sig_pub_bytes = match link.dest_sig_pub_bytes {
Some(b) => b,
None => {
log::debug!("LRPROOF: no destination signing key available");
return Vec::new();
}
};
let now = time::now();
let (lrrtt_encrypted, link_actions) =
match link
.engine
.handle_lrproof(&packet.data, &dest_sig_pub_bytes, now, rng)
{
Ok(r) => r,
Err(e) => {
log::debug!("LRPROOF validation failed: {}", e);
return Vec::new();
}
};
let link_id = *link.engine.link_id();
let mut actions = Vec::new();
actions.extend(self.process_link_actions(&link_id, &link_actions));
let flags = PacketFlags {
header_type: constants::HEADER_1,
context_flag: constants::FLAG_UNSET,
transport_type: constants::TRANSPORT_BROADCAST,
destination_type: constants::DESTINATION_LINK,
packet_type: constants::PACKET_TYPE_DATA,
};
if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
flags,
0,
&link_id,
None,
constants::CONTEXT_LRRTT,
&lrrtt_encrypted,
) {
actions.push(LinkManagerAction::SendPacket {
raw,
dest_type: constants::DESTINATION_LINK,
attached_interface: None,
});
}
if let Some(link) = self.links.get_mut(&link_id) {
if link.engine.state() == LinkState::Active {
let rtt = link.engine.rtt().unwrap_or(1.0);
link.channel = Some(Channel::new(rtt));
}
}
actions
}
fn handle_link_data(
&mut self,
link_id_bytes: &[u8; 16],
packet: &RawPacket,
packet_hash: [u8; 32],
receiving_interface: rns_core::transport::types::InterfaceId,
rng: &mut dyn Rng,
) -> Vec<LinkManagerAction> {
enum LinkDataResult<'a> {
Lrrtt {
link_id: LinkId,
link_actions: Vec<LinkAction>,
},
Identify {
link_id: LinkId,
link_actions: Vec<LinkAction>,
},
Keepalive {
link_id: LinkId,
inbound_actions: Vec<LinkAction>,
},
LinkClose {
link_id: LinkId,
teardown_actions: Vec<LinkAction>,
},
Channel {
link_id: LinkId,
inbound_actions: Vec<LinkAction>,
plaintext: Vec<u8>,
packet_hash: [u8; 32],
},
Request {
link_id: LinkId,
inbound_actions: Vec<LinkAction>,
plaintext: Vec<u8>,
request_id: [u8; 16],
},
Response {
link_id: LinkId,
inbound_actions: Vec<LinkAction>,
plaintext: Vec<u8>,
},
Generic {
link_id: LinkId,
inbound_actions: Vec<LinkAction>,
plaintext: Vec<u8>,
context: u8,
packet_hash: [u8; 32],
},
ResourceAdv {
link_id: LinkId,
inbound_actions: Vec<LinkAction>,
plaintext: Vec<u8>,
},
ResourceReq {
link_id: LinkId,
inbound_actions: Vec<LinkAction>,
plaintext: Vec<u8>,
},
ResourceHmu {
link_id: LinkId,
inbound_actions: Vec<LinkAction>,
plaintext: Vec<u8>,
},
ResourcePart {
link_id: LinkId,
inbound_actions: Vec<LinkAction>,
raw_data: &'a [u8],
},
ResourcePrf {
link_id: LinkId,
inbound_actions: Vec<LinkAction>,
plaintext: Vec<u8>,
},
ResourceIcl {
link_id: LinkId,
inbound_actions: Vec<LinkAction>,
},
ResourceRcl {
link_id: LinkId,
inbound_actions: Vec<LinkAction>,
},
Error,
}
let result = {
let link = match self.links.get_mut(link_id_bytes) {
Some(l) => l,
None => return Vec::new(),
};
link.route_interface = Some(receiving_interface);
if packet.flags.header_type == constants::HEADER_2 {
if let Some(transport_id) = packet.transport_id {
link.route_transport_id = Some(transport_id);
}
} else {
link.route_transport_id = None;
}
match packet.context {
constants::CONTEXT_LRRTT => {
match link.engine.handle_lrrtt(&packet.data, time::now()) {
Ok(link_actions) => {
let link_id = *link.engine.link_id();
LinkDataResult::Lrrtt {
link_id,
link_actions,
}
}
Err(e) => {
log::debug!("LRRTT handling failed: {}", e);
LinkDataResult::Error
}
}
}
constants::CONTEXT_LINKIDENTIFY => {
match link.engine.handle_identify(&packet.data) {
Ok(link_actions) => {
let link_id = *link.engine.link_id();
link.remote_identity = link.engine.remote_identity().cloned();
LinkDataResult::Identify {
link_id,
link_actions,
}
}
Err(e) => {
log::debug!("LINKIDENTIFY failed: {}", e);
LinkDataResult::Error
}
}
}
constants::CONTEXT_KEEPALIVE => {
let inbound_actions = link.engine.record_inbound(time::now());
let link_id = *link.engine.link_id();
LinkDataResult::Keepalive {
link_id,
inbound_actions,
}
}
constants::CONTEXT_LINKCLOSE => {
let teardown_actions = link.engine.handle_teardown();
let link_id = *link.engine.link_id();
LinkDataResult::LinkClose {
link_id,
teardown_actions,
}
}
constants::CONTEXT_CHANNEL => match link.engine.decrypt(&packet.data) {
Ok(plaintext) => {
let inbound_actions = link.engine.record_inbound(time::now());
let link_id = *link.engine.link_id();
LinkDataResult::Channel {
link_id,
inbound_actions,
plaintext,
packet_hash,
}
}
Err(_) => LinkDataResult::Error,
},
constants::CONTEXT_REQUEST => match link.engine.decrypt(&packet.data) {
Ok(plaintext) => {
let inbound_actions = link.engine.record_inbound(time::now());
let link_id = *link.engine.link_id();
let request_id = packet.get_truncated_hash();
LinkDataResult::Request {
link_id,
inbound_actions,
plaintext,
request_id,
}
}
Err(_) => LinkDataResult::Error,
},
constants::CONTEXT_RESPONSE => match link.engine.decrypt(&packet.data) {
Ok(plaintext) => {
let inbound_actions = link.engine.record_inbound(time::now());
let link_id = *link.engine.link_id();
LinkDataResult::Response {
link_id,
inbound_actions,
plaintext,
}
}
Err(_) => LinkDataResult::Error,
},
constants::CONTEXT_RESOURCE_ADV => match link.engine.decrypt(&packet.data) {
Ok(plaintext) => {
let inbound_actions = link.engine.record_inbound(time::now());
let link_id = *link.engine.link_id();
LinkDataResult::ResourceAdv {
link_id,
inbound_actions,
plaintext,
}
}
Err(_) => LinkDataResult::Error,
},
constants::CONTEXT_RESOURCE_REQ => match link.engine.decrypt(&packet.data) {
Ok(plaintext) => {
let inbound_actions = link.engine.record_inbound(time::now());
let link_id = *link.engine.link_id();
LinkDataResult::ResourceReq {
link_id,
inbound_actions,
plaintext,
}
}
Err(_) => LinkDataResult::Error,
},
constants::CONTEXT_RESOURCE_HMU => match link.engine.decrypt(&packet.data) {
Ok(plaintext) => {
let inbound_actions = link.engine.record_inbound(time::now());
let link_id = *link.engine.link_id();
LinkDataResult::ResourceHmu {
link_id,
inbound_actions,
plaintext,
}
}
Err(_) => LinkDataResult::Error,
},
constants::CONTEXT_RESOURCE => {
let inbound_actions = link.engine.record_inbound(time::now());
let link_id = *link.engine.link_id();
LinkDataResult::ResourcePart {
link_id,
inbound_actions,
raw_data: &packet.data,
}
}
constants::CONTEXT_RESOURCE_PRF => match link.engine.decrypt(&packet.data) {
Ok(plaintext) => {
let inbound_actions = link.engine.record_inbound(time::now());
let link_id = *link.engine.link_id();
LinkDataResult::ResourcePrf {
link_id,
inbound_actions,
plaintext,
}
}
Err(_) => LinkDataResult::Error,
},
constants::CONTEXT_RESOURCE_ICL => {
let _ = link.engine.decrypt(&packet.data); let inbound_actions = link.engine.record_inbound(time::now());
let link_id = *link.engine.link_id();
LinkDataResult::ResourceIcl {
link_id,
inbound_actions,
}
}
constants::CONTEXT_RESOURCE_RCL => {
let _ = link.engine.decrypt(&packet.data); let inbound_actions = link.engine.record_inbound(time::now());
let link_id = *link.engine.link_id();
LinkDataResult::ResourceRcl {
link_id,
inbound_actions,
}
}
_ => match link.engine.decrypt(&packet.data) {
Ok(plaintext) => {
let inbound_actions = link.engine.record_inbound(time::now());
let link_id = *link.engine.link_id();
LinkDataResult::Generic {
link_id,
inbound_actions,
plaintext,
context: packet.context,
packet_hash,
}
}
Err(_) => LinkDataResult::Error,
},
}
};
let mut actions = Vec::new();
match result {
LinkDataResult::Lrrtt {
link_id,
link_actions,
} => {
actions.extend(self.process_link_actions(&link_id, &link_actions));
if let Some(link) = self.links.get_mut(&link_id) {
if link.engine.state() == LinkState::Active {
let rtt = link.engine.rtt().unwrap_or(1.0);
link.channel = Some(Channel::new(rtt));
}
}
}
LinkDataResult::Identify {
link_id,
link_actions,
} => {
actions.extend(self.process_link_actions(&link_id, &link_actions));
}
LinkDataResult::Keepalive {
link_id,
inbound_actions,
} => {
actions.extend(self.process_link_actions(&link_id, &inbound_actions));
}
LinkDataResult::LinkClose {
link_id,
teardown_actions,
} => {
actions.extend(self.process_link_actions(&link_id, &teardown_actions));
}
LinkDataResult::Channel {
link_id,
inbound_actions,
plaintext,
packet_hash,
} => {
actions.extend(self.process_link_actions(&link_id, &inbound_actions));
if let Some(link) = self.links.get_mut(&link_id) {
if let Some(ref mut channel) = link.channel {
let chan_actions = channel.receive(&plaintext, time::now());
link.channel_messages_received += chan_actions
.iter()
.filter(|action| {
matches!(
action,
rns_core::channel::ChannelAction::MessageReceived { .. }
)
})
.count()
as u64;
let _ = link;
actions.extend(self.process_channel_actions(&link_id, chan_actions, rng));
}
}
actions.extend(self.build_link_packet_proof(&link_id, &packet_hash));
}
LinkDataResult::Request {
link_id,
inbound_actions,
plaintext,
request_id,
} => {
actions.extend(self.process_link_actions(&link_id, &inbound_actions));
actions.extend(self.handle_request(&link_id, &plaintext, request_id, rng));
}
LinkDataResult::Response {
link_id,
inbound_actions,
plaintext,
} => {
actions.extend(self.process_link_actions(&link_id, &inbound_actions));
actions.extend(self.handle_response(&link_id, &plaintext, None));
}
LinkDataResult::Generic {
link_id,
inbound_actions,
plaintext,
context,
packet_hash,
} => {
actions.extend(self.process_link_actions(&link_id, &inbound_actions));
actions.push(LinkManagerAction::LinkDataReceived {
link_id,
context,
data: plaintext,
});
actions.extend(self.build_link_packet_proof(&link_id, &packet_hash));
}
LinkDataResult::ResourceAdv {
link_id,
inbound_actions,
plaintext,
} => {
actions.extend(self.process_link_actions(&link_id, &inbound_actions));
actions.extend(self.handle_resource_adv(&link_id, &plaintext, rng));
}
LinkDataResult::ResourceReq {
link_id,
inbound_actions,
plaintext,
} => {
actions.extend(self.process_link_actions(&link_id, &inbound_actions));
actions.extend(self.handle_resource_req(&link_id, &plaintext, rng));
}
LinkDataResult::ResourceHmu {
link_id,
inbound_actions,
plaintext,
} => {
actions.extend(self.process_link_actions(&link_id, &inbound_actions));
actions.extend(self.handle_resource_hmu(&link_id, &plaintext, rng));
}
LinkDataResult::ResourcePart {
link_id,
inbound_actions,
raw_data,
} => {
actions.extend(self.process_link_actions(&link_id, &inbound_actions));
actions.extend(self.handle_resource_part(&link_id, &raw_data, rng));
}
LinkDataResult::ResourcePrf {
link_id,
inbound_actions,
plaintext,
} => {
actions.extend(self.process_link_actions(&link_id, &inbound_actions));
actions.extend(self.handle_resource_prf(&link_id, &plaintext, rng));
}
LinkDataResult::ResourceIcl {
link_id,
inbound_actions,
} => {
actions.extend(self.process_link_actions(&link_id, &inbound_actions));
actions.extend(self.handle_resource_icl(&link_id));
}
LinkDataResult::ResourceRcl {
link_id,
inbound_actions,
} => {
actions.extend(self.process_link_actions(&link_id, &inbound_actions));
actions.extend(self.handle_resource_rcl(&link_id));
}
LinkDataResult::Error => {}
}
actions
}
fn handle_request(
&mut self,
link_id: &LinkId,
plaintext: &[u8],
request_id: [u8; 16],
rng: &mut dyn Rng,
) -> Vec<LinkManagerAction> {
use rns_core::msgpack::{self, Value};
let arr = match msgpack::unpack_exact(plaintext) {
Ok(Value::Array(arr)) if arr.len() >= 3 => arr,
_ => return Vec::new(),
};
let path_hash_bytes = match &arr[1] {
Value::Bin(b) if b.len() == 16 => b,
_ => return Vec::new(),
};
let mut path_hash = [0u8; 16];
path_hash.copy_from_slice(path_hash_bytes);
let request_data = msgpack::pack(&arr[2]);
if self.management_paths.contains(&path_hash) {
let remote_identity = self
.links
.get(link_id)
.and_then(|l| l.remote_identity)
.map(|(h, k)| (h, k));
return vec![LinkManagerAction::ManagementRequest {
link_id: *link_id,
path_hash,
data: request_data,
request_id,
remote_identity,
}];
}
let handler_idx = self
.request_handlers
.iter()
.position(|h| h.path_hash == path_hash);
let handler_idx = match handler_idx {
Some(i) => i,
None => return Vec::new(),
};
let remote_identity = self
.links
.get(link_id)
.and_then(|l| l.remote_identity.as_ref());
let handler = &self.request_handlers[handler_idx];
if let Some(ref allowed) = handler.allowed_list {
match remote_identity {
Some((identity_hash, _)) => {
if !allowed.contains(identity_hash) {
log::debug!("Request denied: identity not in allowed list");
return Vec::new();
}
}
None => {
log::debug!("Request denied: peer not identified");
return Vec::new();
}
}
}
let path = handler.path.clone();
let response = (handler.handler)(*link_id, &path, &request_data, remote_identity);
let mut actions = Vec::new();
if let Some(response) = response {
match response {
RequestResponse::Bytes(response_data) => {
let mut response_actions =
self.build_response_packet(link_id, &request_id, &response_data, rng);
if response_actions.is_empty() {
response_actions.extend(self.send_response_resource(
link_id,
&request_id,
&response_data,
None,
true,
rng,
));
}
actions.extend(response_actions);
}
RequestResponse::Resource {
data,
metadata,
auto_compress,
} => {
actions.extend(self.send_response_resource(
link_id,
&request_id,
&data,
metadata.as_deref(),
auto_compress,
rng,
));
}
}
}
actions
}
fn build_response_packet(
&self,
link_id: &LinkId,
request_id: &[u8; 16],
response_data: &[u8],
rng: &mut dyn Rng,
) -> Vec<LinkManagerAction> {
use rns_core::msgpack::{self, Value};
let response_value = msgpack::unpack_exact(response_data)
.unwrap_or_else(|_| Value::Bin(response_data.to_vec()));
let response_array = Value::Array(vec![Value::Bin(request_id.to_vec()), response_value]);
let response_plaintext = msgpack::pack(&response_array);
let mut actions = Vec::new();
if let Some(link) = self.links.get(link_id) {
if let Ok(encrypted) = link.engine.encrypt(&response_plaintext, rng) {
let flags = PacketFlags {
header_type: constants::HEADER_1,
context_flag: constants::FLAG_UNSET,
transport_type: constants::TRANSPORT_BROADCAST,
destination_type: constants::DESTINATION_LINK,
packet_type: constants::PACKET_TYPE_DATA,
};
let max_mtu = link.engine.mtu() as usize;
if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash_with_max_mtu(
flags,
0,
link_id,
None,
constants::CONTEXT_RESPONSE,
&encrypted,
max_mtu,
) {
actions.push(LinkManagerAction::SendPacket {
raw,
dest_type: constants::DESTINATION_LINK,
attached_interface: None,
});
}
}
}
actions
}
fn send_response_resource(
&mut self,
link_id: &LinkId,
request_id: &[u8; 16],
response_data: &[u8],
metadata: Option<&[u8]>,
auto_compress: bool,
rng: &mut dyn Rng,
) -> Vec<LinkManagerAction> {
use rns_core::msgpack::{self, Value};
let link = match self.links.get_mut(link_id) {
Some(l) => l,
None => return Vec::new(),
};
if link.engine.state() != LinkState::Active {
return Vec::new();
}
let now = time::now();
let response_value = msgpack::unpack_exact(response_data)
.unwrap_or_else(|_| Value::Bin(response_data.to_vec()));
let response_array = Value::Array(vec![Value::Bin(request_id.to_vec()), response_value]);
let resource_payload = msgpack::pack(&response_array);
let senders = match Self::build_resource_senders(
link,
&resource_payload,
metadata,
auto_compress,
true, Some(request_id.to_vec()),
rng,
now,
) {
Ok(s) => s,
Err(e) => {
log::debug!("Failed to create response ResourceSender: {}", e);
return Vec::new();
}
};
let adv_actions = Self::start_resource_senders(link, senders, now);
let _ = link;
self.process_resource_actions(link_id, adv_actions, rng)
}
pub fn send_management_response(
&mut self,
link_id: &LinkId,
request_id: &[u8; 16],
response_data: &[u8],
rng: &mut dyn Rng,
) -> Vec<LinkManagerAction> {
let mut actions = self.build_response_packet(link_id, request_id, response_data, rng);
if actions.is_empty() {
actions.extend(self.send_response_resource(
link_id,
request_id,
response_data,
None,
true,
rng,
));
}
actions
}
pub fn send_request(
&self,
link_id: &LinkId,
path: &str,
data: &[u8],
rng: &mut dyn Rng,
) -> Vec<LinkManagerAction> {
use rns_core::msgpack::{self, Value};
let link = match self.links.get(link_id) {
Some(l) => l,
None => return Vec::new(),
};
if link.engine.state() != LinkState::Active {
return Vec::new();
}
let path_hash = compute_path_hash(path);
let data_value = msgpack::unpack_exact(data).unwrap_or_else(|_| Value::Bin(data.to_vec()));
let request_array = Value::Array(vec![
Value::Float(time::now()),
Value::Bin(path_hash.to_vec()),
data_value,
]);
let plaintext = msgpack::pack(&request_array);
let encrypted = match link.engine.encrypt(&plaintext, rng) {
Ok(e) => e,
Err(_) => return Vec::new(),
};
let flags = PacketFlags {
header_type: constants::HEADER_1,
context_flag: constants::FLAG_UNSET,
transport_type: constants::TRANSPORT_BROADCAST,
destination_type: constants::DESTINATION_LINK,
packet_type: constants::PACKET_TYPE_DATA,
};
let mut actions = Vec::new();
if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
flags,
0,
link_id,
None,
constants::CONTEXT_REQUEST,
&encrypted,
) {
actions.push(LinkManagerAction::SendPacket {
raw,
dest_type: constants::DESTINATION_LINK,
attached_interface: None,
});
}
actions
}
pub fn send_on_link(
&self,
link_id: &LinkId,
plaintext: &[u8],
context: u8,
rng: &mut dyn Rng,
) -> Vec<LinkManagerAction> {
let link = match self.links.get(link_id) {
Some(l) => l,
None => return Vec::new(),
};
if link.engine.state() != LinkState::Active {
return Vec::new();
}
let encrypted = match link.engine.encrypt(plaintext, rng) {
Ok(e) => e,
Err(_) => return Vec::new(),
};
let flags = PacketFlags {
header_type: constants::HEADER_1,
context_flag: constants::FLAG_UNSET,
transport_type: constants::TRANSPORT_BROADCAST,
destination_type: constants::DESTINATION_LINK,
packet_type: constants::PACKET_TYPE_DATA,
};
let mut actions = Vec::new();
if let Ok((raw, _packet_hash)) =
RawPacket::pack_raw_with_hash(flags, 0, link_id, None, context, &encrypted)
{
actions.push(LinkManagerAction::SendPacket {
raw,
dest_type: constants::DESTINATION_LINK,
attached_interface: None,
});
}
actions
}
pub fn identify(
&self,
link_id: &LinkId,
identity: &rns_crypto::identity::Identity,
rng: &mut dyn Rng,
) -> Vec<LinkManagerAction> {
let link = match self.links.get(link_id) {
Some(l) => l,
None => return Vec::new(),
};
let encrypted = match link.engine.build_identify(identity, rng) {
Ok(e) => e,
Err(_) => return Vec::new(),
};
let flags = PacketFlags {
header_type: constants::HEADER_1,
context_flag: constants::FLAG_UNSET,
transport_type: constants::TRANSPORT_BROADCAST,
destination_type: constants::DESTINATION_LINK,
packet_type: constants::PACKET_TYPE_DATA,
};
let mut actions = Vec::new();
if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
flags,
0,
link_id,
None,
constants::CONTEXT_LINKIDENTIFY,
&encrypted,
) {
actions.push(LinkManagerAction::SendPacket {
raw,
dest_type: constants::DESTINATION_LINK,
attached_interface: None,
});
}
actions
}
pub fn teardown_link(&mut self, link_id: &LinkId) -> Vec<LinkManagerAction> {
let link = match self.links.get_mut(link_id) {
Some(l) => l,
None => return Vec::new(),
};
let teardown_actions = link.engine.teardown();
if let Some(ref mut channel) = link.channel {
channel.shutdown();
}
let mut actions = self.process_link_actions(link_id, &teardown_actions);
let flags = PacketFlags {
header_type: constants::HEADER_1,
context_flag: constants::FLAG_UNSET,
transport_type: constants::TRANSPORT_BROADCAST,
destination_type: constants::DESTINATION_LINK,
packet_type: constants::PACKET_TYPE_DATA,
};
if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
flags,
0,
link_id,
None,
constants::CONTEXT_LINKCLOSE,
&[],
) {
actions.push(LinkManagerAction::SendPacket {
raw,
dest_type: constants::DESTINATION_LINK,
attached_interface: None,
});
}
actions
}
pub fn teardown_all_links(&mut self) -> Vec<LinkManagerAction> {
let link_ids: Vec<LinkId> = self.links.keys().copied().collect();
let mut actions = Vec::new();
for link_id in link_ids {
actions.extend(self.teardown_link(&link_id));
}
actions
}
fn handle_response(
&self,
link_id: &LinkId,
plaintext: &[u8],
metadata: Option<Vec<u8>>,
) -> Vec<LinkManagerAction> {
use rns_core::msgpack;
let arr = match msgpack::unpack_exact(plaintext) {
Ok(msgpack::Value::Array(arr)) if arr.len() >= 2 => arr,
_ => return Vec::new(),
};
let request_id_bytes = match &arr[0] {
msgpack::Value::Bin(b) if b.len() == 16 => b,
_ => return Vec::new(),
};
let mut request_id = [0u8; 16];
request_id.copy_from_slice(request_id_bytes);
let response_data = msgpack::pack(&arr[1]);
vec![LinkManagerAction::ResponseReceived {
link_id: *link_id,
request_id,
data: response_data,
metadata,
}]
}
fn build_resource_senders(
link: &ManagedLink,
data: &[u8],
metadata: Option<&[u8]>,
auto_compress: bool,
is_response: bool,
request_id: Option<Vec<u8>>,
rng: &mut dyn Rng,
now: f64,
) -> Result<Vec<ResourceSender>, rns_core::resource::ResourceError> {
let link_rtt = link.engine.rtt().unwrap_or(1.0);
let resource_sdu = Self::resource_sdu_for_link(link);
let metadata_overhead = metadata.map(|m| 3 + m.len()).unwrap_or(0);
let logical_size = metadata_overhead + data.len();
if logical_size <= constants::RESOURCE_MAX_EFFICIENT_SIZE {
let enc_rng = std::cell::RefCell::new(rns_crypto::OsRng);
let encrypt_fn = |plaintext: &[u8]| -> Vec<u8> {
link.engine
.encrypt(plaintext, &mut *enc_rng.borrow_mut())
.unwrap_or_else(|_| plaintext.to_vec())
};
return ResourceSender::new(
data,
metadata,
resource_sdu,
&encrypt_fn,
&Bzip2Compressor,
rng,
now,
auto_compress,
is_response,
request_id,
1,
1,
None,
link_rtt,
6.0,
)
.map(|sender| vec![sender]);
}
if metadata_overhead > constants::RESOURCE_MAX_EFFICIENT_SIZE {
return Err(rns_core::resource::ResourceError::TooLarge);
}
let first_payload_len = core::cmp::min(
data.len(),
constants::RESOURCE_MAX_EFFICIENT_SIZE - metadata_overhead,
);
let remaining = data.len().saturating_sub(first_payload_len);
let total_segments = 1 + remaining.div_ceil(constants::RESOURCE_MAX_EFFICIENT_SIZE) as u64;
let enc_rng = std::cell::RefCell::new(rns_crypto::OsRng);
let encrypt_fn = |plaintext: &[u8]| -> Vec<u8> {
link.engine
.encrypt(plaintext, &mut *enc_rng.borrow_mut())
.unwrap_or_else(|_| plaintext.to_vec())
};
let mut senders = Vec::new();
let mut first = ResourceSender::new(
&data[..first_payload_len],
metadata,
resource_sdu,
&encrypt_fn,
&Bzip2Compressor,
rng,
now,
auto_compress,
is_response,
request_id.clone(),
1,
total_segments,
None,
link_rtt,
6.0,
)?;
first.data_size = logical_size;
let original_hash = first.original_hash;
let has_metadata = metadata.is_some();
senders.push(first);
let mut offset = first_payload_len;
let mut segment_index = 2;
while offset < data.len() {
let end = core::cmp::min(offset + constants::RESOURCE_MAX_EFFICIENT_SIZE, data.len());
let mut sender = ResourceSender::new(
&data[offset..end],
None,
resource_sdu,
&encrypt_fn,
&Bzip2Compressor,
rng,
now,
auto_compress,
is_response,
request_id.clone(),
segment_index,
total_segments,
Some(original_hash),
link_rtt,
6.0,
)?;
sender.data_size = logical_size;
sender.flags.has_metadata = has_metadata;
senders.push(sender);
segment_index += 1;
offset = end;
}
Ok(senders)
}
fn start_resource_senders(
link: &mut ManagedLink,
mut senders: Vec<ResourceSender>,
now: f64,
) -> Vec<ResourceAction> {
if senders.is_empty() {
return Vec::new();
}
let mut first = senders.remove(0);
let adv_actions = first.advertise(now);
if first.total_segments > 1 {
let original_hash = first.original_hash;
let split = OutgoingSplitTransfer {
total_segments: first.total_segments,
completed_segments: 0,
current_segment_index: first.segment_index,
current_sent_parts: 0,
current_total_parts: first.total_parts(),
};
link.outgoing_splits.insert(original_hash, split);
}
link.outgoing_resources.push(first);
link.outgoing_resources.extend(senders);
adv_actions
}
fn handle_resource_adv(
&mut self,
link_id: &LinkId,
adv_plaintext: &[u8],
rng: &mut dyn Rng,
) -> Vec<LinkManagerAction> {
let link = match self.links.get_mut(link_id) {
Some(l) => l,
None => return Vec::new(),
};
let link_rtt = link.engine.rtt().unwrap_or(1.0);
let resource_sdu = Self::resource_sdu_for_link(link);
let now = time::now();
let receiver = match ResourceReceiver::from_advertisement(
adv_plaintext,
resource_sdu,
link_rtt,
now,
None,
None,
) {
Ok(r) => r,
Err(e) => {
log::debug!("Resource ADV rejected: {}", e);
return Vec::new();
}
};
let strategy = link.resource_strategy;
let resource_hash = receiver.resource_hash.clone();
let transfer_size = receiver.transfer_size;
let has_metadata = receiver.has_metadata;
let is_response = receiver.flags.is_response;
let is_split = receiver.flags.split;
let segment_index = receiver.segment_index;
let total_segments = receiver.total_segments;
let original_hash = match Self::resource_hash_key(&receiver.original_hash) {
Some(key) => key,
None => return Vec::new(),
};
if is_split && segment_index > 1 {
let should_accept = link
.incoming_splits
.get(&original_hash)
.is_some_and(|split| {
split.completed_segments + 1 == segment_index
&& split.total_segments == total_segments
});
if !should_accept {
let reject_actions = {
let mut r = receiver;
r.reject()
};
let _ = link;
return self.process_resource_actions(link_id, reject_actions, rng);
}
let current_total_parts = receiver.total_parts;
link.incoming_resources.push(receiver);
let idx = link.incoming_resources.len() - 1;
if let Some(split) = link.incoming_splits.get_mut(&original_hash) {
split.current_segment_index = segment_index;
split.current_received_parts = 0;
split.current_total_parts = current_total_parts;
}
let resource_actions = link.incoming_resources[idx].accept(now);
let _ = link;
return self.process_resource_actions(link_id, resource_actions, rng);
}
if is_response {
if is_split {
link.incoming_splits.insert(
original_hash,
IncomingSplitTransfer {
total_segments,
completed_segments: 0,
current_segment_index: segment_index,
current_received_parts: 0,
current_total_parts: receiver.total_parts,
data: Vec::new(),
metadata: None,
is_response,
},
);
}
link.incoming_resources.push(receiver);
let idx = link.incoming_resources.len() - 1;
let resource_actions = link.incoming_resources[idx].accept(now);
let _ = link;
return self.process_resource_actions(link_id, resource_actions, rng);
}
match strategy {
ResourceStrategy::AcceptNone => {
let reject_actions = {
let mut r = receiver;
r.reject()
};
self.process_resource_actions(link_id, reject_actions, rng)
}
ResourceStrategy::AcceptAll => {
if is_split {
link.incoming_splits.insert(
original_hash,
IncomingSplitTransfer {
total_segments,
completed_segments: 0,
current_segment_index: segment_index,
current_received_parts: 0,
current_total_parts: receiver.total_parts,
data: Vec::new(),
metadata: None,
is_response,
},
);
}
link.incoming_resources.push(receiver);
let idx = link.incoming_resources.len() - 1;
let resource_actions = link.incoming_resources[idx].accept(now);
let _ = link;
self.process_resource_actions(link_id, resource_actions, rng)
}
ResourceStrategy::AcceptApp => {
link.incoming_resources.push(receiver);
vec![LinkManagerAction::ResourceAcceptQuery {
link_id: *link_id,
resource_hash,
transfer_size,
has_metadata,
}]
}
}
}
pub fn accept_resource(
&mut self,
link_id: &LinkId,
resource_hash: &[u8],
accept: bool,
rng: &mut dyn Rng,
) -> Vec<LinkManagerAction> {
let link = match self.links.get_mut(link_id) {
Some(l) => l,
None => return Vec::new(),
};
let now = time::now();
let idx = link
.incoming_resources
.iter()
.position(|r| r.resource_hash == resource_hash);
let idx = match idx {
Some(i) => i,
None => return Vec::new(),
};
if accept && link.incoming_resources[idx].flags.split {
if let Some(original_hash) =
Self::resource_hash_key(&link.incoming_resources[idx].original_hash)
{
link.incoming_splits
.entry(original_hash)
.or_insert_with(|| IncomingSplitTransfer {
total_segments: link.incoming_resources[idx].total_segments,
completed_segments: 0,
current_segment_index: link.incoming_resources[idx].segment_index,
current_received_parts: 0,
current_total_parts: link.incoming_resources[idx].total_parts,
data: Vec::new(),
metadata: None,
is_response: link.incoming_resources[idx].flags.is_response,
});
}
}
let resource_actions = if accept {
link.incoming_resources[idx].accept(now)
} else {
link.incoming_resources[idx].reject()
};
let _ = link;
self.process_resource_actions(link_id, resource_actions, rng)
}
fn handle_resource_req(
&mut self,
link_id: &LinkId,
plaintext: &[u8],
rng: &mut dyn Rng,
) -> Vec<LinkManagerAction> {
let link = match self.links.get_mut(link_id) {
Some(l) => l,
None => return Vec::new(),
};
let now = time::now();
let mut all_actions = Vec::new();
let mut progress_update = None;
for sender in &mut link.outgoing_resources {
if sender.flags.split && sender.status == rns_core::resource::ResourceStatus::Queued {
continue;
}
let before_sent = sender.sent_parts;
let resource_actions = sender.handle_request(plaintext, now);
if !resource_actions.is_empty() {
if sender.sent_parts != before_sent {
if sender.flags.split {
if let Some(split) = link.outgoing_splits.get_mut(&sender.original_hash) {
split.current_segment_index = sender.segment_index;
split.current_sent_parts = sender.sent_parts;
split.current_total_parts = sender.total_parts();
progress_update =
Some(Self::outgoing_split_progress(split, sender.sdu));
}
} else {
progress_update = Some((sender.sent_parts, sender.total_parts()));
}
}
all_actions.extend(resource_actions);
break;
}
}
let _ = link;
let mut out = self.process_resource_actions(link_id, all_actions, rng);
if let Some((received, total)) = progress_update {
out.push(LinkManagerAction::ResourceProgress {
link_id: *link_id,
received,
total,
});
}
out
}
fn handle_resource_hmu(
&mut self,
link_id: &LinkId,
plaintext: &[u8],
rng: &mut dyn Rng,
) -> Vec<LinkManagerAction> {
let link = match self.links.get_mut(link_id) {
Some(l) => l,
None => return Vec::new(),
};
let now = time::now();
let mut all_actions = Vec::new();
for receiver in &mut link.incoming_resources {
let resource_actions = receiver.handle_hashmap_update(plaintext, now);
if !resource_actions.is_empty() {
all_actions.extend(resource_actions);
break;
}
}
let _ = link;
self.process_resource_actions(link_id, all_actions, rng)
}
fn handle_resource_part(
&mut self,
link_id: &LinkId,
raw_data: &[u8],
rng: &mut dyn Rng,
) -> Vec<LinkManagerAction> {
let link = match self.links.get_mut(link_id) {
Some(l) => l,
None => return Vec::new(),
};
let now = time::now();
let resource_sdu = Self::resource_sdu_for_link(link);
let mut all_actions = Vec::new();
let mut assemble_idx = None;
let mut assembled_is_response = false;
for (idx, receiver) in link.incoming_resources.iter_mut().enumerate() {
if receiver.status >= rns_core::resource::ResourceStatus::Complete {
continue;
}
let resource_actions = receiver.receive_part(raw_data, now);
if !resource_actions.is_empty() {
if receiver.received_count == receiver.total_parts {
assemble_idx = Some(idx);
}
if receiver.flags.split {
if let Some(key) = Self::resource_hash_key(&receiver.original_hash) {
if let Some(split) = link.incoming_splits.get_mut(&key) {
split.current_segment_index = receiver.segment_index;
split.current_received_parts = receiver.received_count;
split.current_total_parts = receiver.total_parts;
let (received, total) =
Self::incoming_split_progress(split, resource_sdu);
for action in resource_actions {
match action {
ResourceAction::ProgressUpdate { .. } => {
all_actions.push(ResourceAction::ProgressUpdate {
received,
total,
});
}
other => all_actions.push(other),
}
}
} else {
all_actions.extend(resource_actions);
}
} else {
all_actions.extend(resource_actions);
}
} else {
all_actions.extend(resource_actions);
}
break;
}
}
if let Some(idx) = assemble_idx {
let split_key = if link.incoming_resources[idx].flags.split {
Self::resource_hash_key(&link.incoming_resources[idx].original_hash)
} else {
None
};
let split_segment_index = link.incoming_resources[idx].segment_index;
let split_segment_total = link.incoming_resources[idx].total_segments;
let split_segment_parts = link.incoming_resources[idx].total_parts;
let split_is_response = link.incoming_resources[idx].flags.is_response;
let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
link.engine.decrypt(ciphertext).map_err(|_| ())
};
let mut assemble_actions =
link.incoming_resources[idx].assemble(&decrypt_fn, &Bzip2Compressor);
assembled_is_response = split_is_response;
if let Some(key) = split_key {
let mut converted_actions = Vec::new();
let mut segment_data = None;
let mut segment_metadata = None;
for action in assemble_actions {
match action {
ResourceAction::DataReceived { data, metadata } => {
segment_data = Some(data);
segment_metadata = metadata;
}
ResourceAction::Completed => {}
other => converted_actions.push(other),
}
}
if let Some(data) = segment_data {
if let Some(split) = link.incoming_splits.get_mut(&key) {
split.data.extend_from_slice(&data);
if segment_metadata.is_some() {
split.metadata = segment_metadata;
}
split.completed_segments = split_segment_index;
split.current_segment_index = split_segment_index;
split.current_received_parts = split_segment_parts;
split.current_total_parts = split_segment_parts;
}
if split_segment_index == split_segment_total {
if let Some(split) = link.incoming_splits.remove(&key) {
assembled_is_response = split.is_response;
converted_actions.push(ResourceAction::DataReceived {
data: split.data,
metadata: split.metadata,
});
converted_actions.push(ResourceAction::Completed);
}
}
}
assemble_actions = converted_actions;
}
all_actions.extend(assemble_actions);
}
let _ = link;
let mut out = self.process_resource_actions(link_id, all_actions, rng);
if assembled_is_response {
let mut converted = Vec::new();
for action in out {
match action {
LinkManagerAction::ResourceReceived { data, metadata, .. } => {
converted.extend(self.handle_response(link_id, &data, metadata));
}
LinkManagerAction::ResourceAcceptQuery { .. } => {
}
other => converted.push(other),
}
}
out = converted;
}
out
}
fn handle_resource_prf(
&mut self,
link_id: &LinkId,
plaintext: &[u8],
rng: &mut dyn Rng,
) -> Vec<LinkManagerAction> {
let link = match self.links.get_mut(link_id) {
Some(l) => l,
None => return Vec::new(),
};
let now = time::now();
let mut result_actions = Vec::new();
let mut completed_sender = None;
let mut failed_split = None;
let proof_hash = plaintext.get(..32);
for sender in &mut link.outgoing_resources {
if proof_hash.is_some_and(|hash| hash != sender.resource_hash.as_slice()) {
continue;
}
let resource_actions = sender.handle_proof(plaintext, now);
if !resource_actions.is_empty() {
if resource_actions
.iter()
.any(|action| matches!(action, ResourceAction::Completed))
{
completed_sender = Some((
sender.original_hash,
sender.segment_index,
sender.total_segments,
sender.total_parts(),
));
}
if sender.flags.split
&& resource_actions
.iter()
.any(|action| matches!(action, ResourceAction::Failed(_)))
{
failed_split = Some(sender.original_hash);
}
result_actions.extend(resource_actions);
break;
}
}
let mut actions = Vec::new();
let mut advertise_next = None;
for ra in result_actions {
match ra {
ResourceAction::Completed => {
if let Some((original_hash, segment_index, total_segments, total_parts)) =
completed_sender
{
if total_segments > 1 && segment_index < total_segments {
if let Some(split) = link.outgoing_splits.get_mut(&original_hash) {
split.completed_segments = segment_index;
split.current_segment_index = segment_index;
split.current_sent_parts = total_parts;
split.current_total_parts = total_parts;
if let Some(next) = link.outgoing_resources.iter_mut().find(|s| {
s.flags.split
&& s.original_hash == original_hash
&& s.segment_index == segment_index + 1
}) {
split.current_segment_index = next.segment_index;
split.current_sent_parts = 0;
split.current_total_parts = next.total_parts();
advertise_next = Some(next.advertise(now));
}
}
} else {
link.outgoing_splits.remove(&original_hash);
actions
.push(LinkManagerAction::ResourceCompleted { link_id: *link_id });
}
} else {
actions.push(LinkManagerAction::ResourceCompleted { link_id: *link_id });
}
}
ResourceAction::Failed(e) => {
if let Some(original_hash) = failed_split {
link.outgoing_splits.remove(&original_hash);
}
actions.push(LinkManagerAction::ResourceFailed {
link_id: *link_id,
error: format!("{}", e),
});
}
_ => {}
}
}
link.outgoing_resources
.retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
let _ = link;
if let Some(next_actions) = advertise_next {
actions.extend(self.process_resource_actions(link_id, next_actions, rng));
}
actions
}
fn handle_resource_icl(&mut self, link_id: &LinkId) -> Vec<LinkManagerAction> {
let link = match self.links.get_mut(link_id) {
Some(l) => l,
None => return Vec::new(),
};
let mut actions = Vec::new();
for receiver in &mut link.incoming_resources {
let ra = receiver.handle_cancel();
for a in ra {
if let ResourceAction::Failed(ref e) = a {
actions.push(LinkManagerAction::ResourceFailed {
link_id: *link_id,
error: format!("{}", e),
});
}
}
}
link.incoming_resources
.retain(|r| r.status < rns_core::resource::ResourceStatus::Complete);
link.incoming_splits.clear();
actions
}
fn handle_resource_rcl(&mut self, link_id: &LinkId) -> Vec<LinkManagerAction> {
let link = match self.links.get_mut(link_id) {
Some(l) => l,
None => return Vec::new(),
};
let mut actions = Vec::new();
for sender in &mut link.outgoing_resources {
let ra = sender.handle_reject();
for a in ra {
if let ResourceAction::Failed(ref e) = a {
actions.push(LinkManagerAction::ResourceFailed {
link_id: *link_id,
error: format!("{}", e),
});
}
}
}
link.outgoing_resources
.retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
link.outgoing_splits.clear();
actions
}
fn process_resource_actions(
&mut self,
link_id: &LinkId,
actions: Vec<ResourceAction>,
rng: &mut dyn Rng,
) -> Vec<LinkManagerAction> {
let mut result = Vec::new();
for action in actions {
match action {
ResourceAction::SendAdvertisement(data) => {
let encrypted = self
.links
.get(link_id)
.and_then(|link| link.engine.encrypt(&data, rng).ok());
if let Some(encrypted) = encrypted {
result.extend(self.build_link_packet(
link_id,
constants::CONTEXT_RESOURCE_ADV,
&encrypted,
));
}
}
ResourceAction::SendPart(data) => {
result.extend(self.build_link_packet(
link_id,
constants::CONTEXT_RESOURCE,
&data,
));
}
ResourceAction::SendRequest(data) => {
let encrypted = self
.links
.get(link_id)
.and_then(|link| link.engine.encrypt(&data, rng).ok());
if let Some(encrypted) = encrypted {
result.extend(self.build_link_packet(
link_id,
constants::CONTEXT_RESOURCE_REQ,
&encrypted,
));
}
}
ResourceAction::SendHmu(data) => {
let encrypted = self
.links
.get(link_id)
.and_then(|link| link.engine.encrypt(&data, rng).ok());
if let Some(encrypted) = encrypted {
result.extend(self.build_link_packet(
link_id,
constants::CONTEXT_RESOURCE_HMU,
&encrypted,
));
}
}
ResourceAction::SendProof(data) => {
let encrypted = self
.links
.get(link_id)
.and_then(|link| link.engine.encrypt(&data, rng).ok());
if let Some(encrypted) = encrypted {
result.extend(self.build_link_packet(
link_id,
constants::CONTEXT_RESOURCE_PRF,
&encrypted,
));
}
}
ResourceAction::SendCancelInitiator(data) => {
let encrypted = self
.links
.get(link_id)
.and_then(|link| link.engine.encrypt(&data, rng).ok());
if let Some(encrypted) = encrypted {
result.extend(self.build_link_packet(
link_id,
constants::CONTEXT_RESOURCE_ICL,
&encrypted,
));
}
}
ResourceAction::SendCancelReceiver(data) => {
let encrypted = self
.links
.get(link_id)
.and_then(|link| link.engine.encrypt(&data, rng).ok());
if let Some(encrypted) = encrypted {
result.extend(self.build_link_packet(
link_id,
constants::CONTEXT_RESOURCE_RCL,
&encrypted,
));
}
}
ResourceAction::DataReceived { data, metadata } => {
result.push(LinkManagerAction::ResourceReceived {
link_id: *link_id,
data,
metadata,
});
}
ResourceAction::Completed => {
result.push(LinkManagerAction::ResourceCompleted { link_id: *link_id });
}
ResourceAction::Failed(e) => {
result.push(LinkManagerAction::ResourceFailed {
link_id: *link_id,
error: format!("{}", e),
});
}
ResourceAction::TeardownLink => {
let teardown_actions = match self.links.get_mut(link_id) {
Some(link) => link.engine.handle_teardown(),
None => Vec::new(),
};
result.extend(self.process_link_actions(link_id, &teardown_actions));
}
ResourceAction::ProgressUpdate { received, total } => {
result.push(LinkManagerAction::ResourceProgress {
link_id: *link_id,
received,
total,
});
}
}
}
result
}
fn build_link_packet(
&self,
link_id: &LinkId,
context: u8,
data: &[u8],
) -> Vec<LinkManagerAction> {
let flags = PacketFlags {
header_type: constants::HEADER_1,
context_flag: constants::FLAG_UNSET,
transport_type: constants::TRANSPORT_BROADCAST,
destination_type: constants::DESTINATION_LINK,
packet_type: constants::PACKET_TYPE_DATA,
};
let mut actions = Vec::new();
let max_mtu = self
.links
.get(link_id)
.map(|l| l.engine.mtu() as usize)
.unwrap_or(constants::MTU);
if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash_with_max_mtu(
flags, 0, link_id, None, context, data, max_mtu,
) {
actions.push(LinkManagerAction::SendPacket {
raw,
dest_type: constants::DESTINATION_LINK,
attached_interface: None,
});
}
actions
}
pub fn send_resource(
&mut self,
link_id: &LinkId,
data: &[u8],
metadata: Option<&[u8]>,
rng: &mut dyn Rng,
) -> Vec<LinkManagerAction> {
self.send_resource_with_auto_compress(link_id, data, metadata, true, rng)
}
pub fn send_resource_with_auto_compress(
&mut self,
link_id: &LinkId,
data: &[u8],
metadata: Option<&[u8]>,
auto_compress: bool,
rng: &mut dyn Rng,
) -> Vec<LinkManagerAction> {
let link = match self.links.get_mut(link_id) {
Some(l) => l,
None => return Vec::new(),
};
if link.engine.state() != LinkState::Active {
return Vec::new();
}
let now = time::now();
let senders = match Self::build_resource_senders(
link,
data,
metadata,
auto_compress,
false, None, rng,
now,
) {
Ok(s) => s,
Err(e) => {
log::debug!("Failed to create ResourceSender: {}", e);
return Vec::new();
}
};
let adv_actions = Self::start_resource_senders(link, senders, now);
let _ = link;
self.process_resource_actions(link_id, adv_actions, rng)
}
pub fn set_resource_strategy(&mut self, link_id: &LinkId, strategy: ResourceStrategy) {
if let Some(link) = self.links.get_mut(link_id) {
link.resource_strategy = strategy;
}
}
pub fn flush_channel_tx(&mut self, link_id: &LinkId) {
if let Some(link) = self.links.get_mut(link_id) {
if let Some(ref mut channel) = link.channel {
channel.flush_tx();
}
}
}
pub fn send_channel_message(
&mut self,
link_id: &LinkId,
msgtype: u16,
payload: &[u8],
rng: &mut dyn Rng,
) -> Result<Vec<LinkManagerAction>, String> {
let link = match self.links.get_mut(link_id) {
Some(l) => l,
None => return Err("unknown link".to_string()),
};
let channel = match link.channel {
Some(ref mut ch) => ch,
None => return Err("link has no active channel".to_string()),
};
let link_mdu = link.engine.mdu();
let now = time::now();
let chan_actions = match channel.send(msgtype, payload, now, link_mdu) {
Ok(a) => {
link.channel_send_ok += 1;
a
}
Err(e) => {
log::debug!("Channel send failed: {:?}", e);
match e {
rns_core::channel::ChannelError::NotReady => link.channel_send_not_ready += 1,
rns_core::channel::ChannelError::MessageTooBig => {
link.channel_send_too_big += 1;
}
rns_core::channel::ChannelError::InvalidEnvelope => {
link.channel_send_other_error += 1;
}
}
return Err(e.to_string());
}
};
let _ = link;
Ok(self.process_channel_actions(link_id, chan_actions, rng))
}
pub fn tick(&mut self, rng: &mut dyn Rng) -> Vec<LinkManagerAction> {
let now = time::now();
let mut all_actions = Vec::new();
let link_ids: Vec<LinkId> = self.links.keys().copied().collect();
for link_id in &link_ids {
let link = match self.links.get_mut(link_id) {
Some(l) => l,
None => continue,
};
let tick_actions = link.engine.tick(now);
all_actions.extend(self.process_link_actions(link_id, &tick_actions));
let link = match self.links.get_mut(link_id) {
Some(l) => l,
None => continue,
};
if link.engine.needs_keepalive(now) {
let flags = PacketFlags {
header_type: constants::HEADER_1,
context_flag: constants::FLAG_UNSET,
transport_type: constants::TRANSPORT_BROADCAST,
destination_type: constants::DESTINATION_LINK,
packet_type: constants::PACKET_TYPE_DATA,
};
if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
flags,
0,
link_id,
None,
constants::CONTEXT_KEEPALIVE,
&[],
) {
all_actions.push(LinkManagerAction::SendPacket {
raw,
dest_type: constants::DESTINATION_LINK,
attached_interface: None,
});
link.engine.record_outbound(now, true);
}
}
if let Some(channel) = link.channel.as_mut() {
let chan_actions = channel.tick(now);
let _ = channel;
let _ = link;
all_actions.extend(self.process_channel_actions(link_id, chan_actions, rng));
}
}
for link_id in &link_ids {
let link = match self.links.get_mut(link_id) {
Some(l) => l,
None => continue,
};
let mut sender_actions = Vec::new();
for sender in &mut link.outgoing_resources {
sender_actions.extend(sender.tick(now));
}
let mut receiver_actions = Vec::new();
for receiver in &mut link.incoming_resources {
let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
link.engine.decrypt(ciphertext).map_err(|_| ())
};
receiver_actions.extend(receiver.tick(now, &decrypt_fn, &Bzip2Compressor));
}
link.outgoing_resources
.retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
link.incoming_resources
.retain(|r| r.status < rns_core::resource::ResourceStatus::Assembling);
let active_split_hashes: Vec<[u8; 32]> = link
.outgoing_resources
.iter()
.filter(|s| s.flags.split)
.map(|s| s.original_hash)
.collect();
link.outgoing_splits.retain(|original_hash, split| {
split.completed_segments < split.total_segments
&& active_split_hashes.contains(original_hash)
});
let _ = link;
all_actions.extend(self.process_resource_actions(link_id, sender_actions, rng));
all_actions.extend(self.process_resource_actions(link_id, receiver_actions, rng));
}
let closed: Vec<LinkId> = self
.links
.iter()
.filter(|(_, l)| l.engine.state() == LinkState::Closed)
.map(|(id, _)| *id)
.collect();
for id in closed {
self.links.remove(&id);
all_actions.push(LinkManagerAction::DeregisterLinkDest { link_id: id });
}
all_actions
}
pub fn is_link_destination(&self, dest_hash: &[u8; 16]) -> bool {
self.links.contains_key(dest_hash) || self.link_destinations.contains_key(dest_hash)
}
pub fn link_state(&self, link_id: &LinkId) -> Option<LinkState> {
self.links.get(link_id).map(|l| l.engine.state())
}
pub fn link_rtt(&self, link_id: &LinkId) -> Option<f64> {
self.links.get(link_id).and_then(|l| l.engine.rtt())
}
pub fn set_link_rtt(&mut self, link_id: &LinkId, rtt: f64) {
if let Some(link) = self.links.get_mut(link_id) {
link.engine.set_rtt(rtt);
}
}
pub fn record_link_inbound(&mut self, link_id: &LinkId) {
if let Some(link) = self.links.get_mut(link_id) {
link.engine.record_inbound(time::now());
}
}
pub fn set_link_mtu(&mut self, link_id: &LinkId, mtu: u32) {
if let Some(link) = self.links.get_mut(link_id) {
link.engine.set_mtu(mtu);
}
}
pub fn link_count(&self) -> usize {
self.links.len()
}
pub fn resource_transfer_count(&self) -> usize {
self.links
.values()
.map(|managed| {
managed
.incoming_resources
.iter()
.filter(|resource| !resource.flags.split)
.count()
+ managed.incoming_splits.len()
+ managed
.outgoing_resources
.iter()
.filter(|resource| !resource.flags.split)
.count()
+ managed.outgoing_splits.len()
})
.sum()
}
pub fn cancel_all_resources(&mut self, rng: &mut dyn Rng) -> Vec<LinkManagerAction> {
let link_ids: Vec<LinkId> = self.links.keys().copied().collect();
let mut all_actions = Vec::new();
for link_id in &link_ids {
let link = match self.links.get_mut(link_id) {
Some(l) => l,
None => continue,
};
let mut sender_actions = Vec::new();
for sender in &mut link.outgoing_resources {
sender_actions.extend(sender.cancel());
}
let mut receiver_actions = Vec::new();
for receiver in &mut link.incoming_resources {
receiver_actions.extend(receiver.reject());
}
link.outgoing_resources
.retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
link.incoming_resources
.retain(|r| r.status < rns_core::resource::ResourceStatus::Assembling);
link.outgoing_splits.clear();
link.incoming_splits.clear();
let _ = link;
all_actions.extend(self.process_resource_actions(link_id, sender_actions, rng));
all_actions.extend(self.process_resource_actions(link_id, receiver_actions, rng));
}
all_actions
}
pub fn link_entries(&self) -> Vec<crate::event::LinkInfoEntry> {
self.links
.iter()
.map(|(link_id, managed)| {
let state = match managed.engine.state() {
LinkState::Pending => "pending",
LinkState::Handshake => "handshake",
LinkState::Active => "active",
LinkState::Stale => "stale",
LinkState::Closed => "closed",
};
crate::event::LinkInfoEntry {
link_id: *link_id,
state: state.to_string(),
is_initiator: managed.engine.is_initiator(),
dest_hash: managed.dest_hash,
remote_identity: managed.remote_identity.as_ref().map(|(h, _)| *h),
rtt: managed.engine.rtt(),
channel_window: managed.channel.as_ref().map(|c| c.window()),
channel_outstanding: managed.channel.as_ref().map(|c| c.outstanding()),
pending_channel_packets: managed.pending_channel_packets.len(),
channel_send_ok: managed.channel_send_ok,
channel_send_not_ready: managed.channel_send_not_ready,
channel_send_too_big: managed.channel_send_too_big,
channel_send_other_error: managed.channel_send_other_error,
channel_messages_received: managed.channel_messages_received,
channel_proofs_sent: managed.channel_proofs_sent,
channel_proofs_received: managed.channel_proofs_received,
}
})
.collect()
}
pub fn resource_entries(&self) -> Vec<crate::event::ResourceInfoEntry> {
let mut entries = Vec::new();
for (link_id, managed) in &self.links {
let resource_sdu = Self::resource_sdu_for_link(managed);
for split in managed.incoming_splits.values() {
let (received, total) = Self::incoming_split_progress(split, resource_sdu);
entries.push(crate::event::ResourceInfoEntry {
link_id: *link_id,
direction: "incoming".to_string(),
total_parts: total,
transferred_parts: received,
complete: received >= total && total > 0,
});
}
for recv in &managed.incoming_resources {
if recv.flags.split {
continue;
}
let (received, total) = recv.progress();
entries.push(crate::event::ResourceInfoEntry {
link_id: *link_id,
direction: "incoming".to_string(),
total_parts: total,
transferred_parts: received,
complete: received >= total && total > 0,
});
}
for split in managed.outgoing_splits.values() {
let (sent, total) = Self::outgoing_split_progress(split, resource_sdu);
entries.push(crate::event::ResourceInfoEntry {
link_id: *link_id,
direction: "outgoing".to_string(),
total_parts: total,
transferred_parts: sent,
complete: sent >= total && total > 0,
});
}
for send in &managed.outgoing_resources {
if send.flags.split {
continue;
}
let total = send.total_parts();
let sent = send.sent_parts;
entries.push(crate::event::ResourceInfoEntry {
link_id: *link_id,
direction: "outgoing".to_string(),
total_parts: total,
transferred_parts: sent,
complete: sent >= total && total > 0,
});
}
}
entries
}
fn process_link_actions(
&self,
link_id: &LinkId,
actions: &[LinkAction],
) -> Vec<LinkManagerAction> {
let mut result = Vec::new();
for action in actions {
match action {
LinkAction::StateChanged {
new_state, reason, ..
} => match new_state {
LinkState::Closed => {
result.push(LinkManagerAction::LinkClosed {
link_id: *link_id,
reason: *reason,
});
}
_ => {}
},
LinkAction::LinkEstablished {
rtt, is_initiator, ..
} => {
let dest_hash = self
.links
.get(link_id)
.map(|l| l.dest_hash)
.unwrap_or([0u8; 16]);
result.push(LinkManagerAction::LinkEstablished {
link_id: *link_id,
dest_hash,
rtt: *rtt,
is_initiator: *is_initiator,
});
}
LinkAction::RemoteIdentified {
identity_hash,
public_key,
..
} => {
result.push(LinkManagerAction::RemoteIdentified {
link_id: *link_id,
identity_hash: *identity_hash,
public_key: *public_key,
});
}
LinkAction::DataReceived { .. } => {
}
}
}
result
}
fn process_channel_actions(
&mut self,
link_id: &LinkId,
actions: Vec<rns_core::channel::ChannelAction>,
rng: &mut dyn Rng,
) -> Vec<LinkManagerAction> {
let mut result = Vec::new();
for action in actions {
match action {
rns_core::channel::ChannelAction::SendOnLink { raw, sequence } => {
let encrypted = match self.links.get(link_id) {
Some(link) => match link.engine.encrypt(&raw, rng) {
Ok(encrypted) => encrypted,
Err(_) => continue,
},
None => continue,
};
let flags = PacketFlags {
header_type: constants::HEADER_1,
context_flag: constants::FLAG_UNSET,
transport_type: constants::TRANSPORT_BROADCAST,
destination_type: constants::DESTINATION_LINK,
packet_type: constants::PACKET_TYPE_DATA,
};
if let Ok((raw_bytes, packet_hash)) = RawPacket::pack_raw_with_hash(
flags,
0,
link_id,
None,
constants::CONTEXT_CHANNEL,
&encrypted,
) {
if let Some(link_mut) = self.links.get_mut(link_id) {
link_mut
.pending_channel_packets
.insert(packet_hash, sequence);
}
result.push(LinkManagerAction::SendPacket {
raw: raw_bytes,
dest_type: constants::DESTINATION_LINK,
attached_interface: None,
});
}
}
rns_core::channel::ChannelAction::MessageReceived {
msgtype, payload, ..
} => {
result.push(LinkManagerAction::ChannelMessageReceived {
link_id: *link_id,
msgtype,
payload,
});
}
rns_core::channel::ChannelAction::TeardownLink => {
result.push(LinkManagerAction::LinkClosed {
link_id: *link_id,
reason: Some(TeardownReason::Timeout),
});
}
}
}
result
}
}
impl Default for LinkManager {
fn default() -> Self {
Self::new()
}
}
fn compute_path_hash(path: &str) -> [u8; 16] {
let full = rns_core::hash::full_hash(path.as_bytes());
let mut result = [0u8; 16];
result.copy_from_slice(&full[..16]);
result
}
#[cfg(test)]
mod tests {
use super::*;
use rns_crypto::identity::Identity;
use rns_crypto::{FixedRng, OsRng};
fn make_rng(seed: u8) -> FixedRng {
FixedRng::new(&[seed; 128])
}
fn make_dest_keys(rng: &mut dyn Rng) -> (Ed25519PrivateKey, [u8; 32]) {
let sig_prv = Ed25519PrivateKey::generate(rng);
let sig_pub_bytes = sig_prv.public_key().public_bytes();
(sig_prv, sig_pub_bytes)
}
#[test]
fn test_register_link_destination() {
let mut mgr = LinkManager::new();
let mut rng = make_rng(0x01);
let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
let dest_hash = [0xDD; 16];
mgr.register_link_destination(
dest_hash,
sig_prv,
sig_pub_bytes,
ResourceStrategy::AcceptNone,
);
assert!(mgr.is_link_destination(&dest_hash));
mgr.deregister_link_destination(&dest_hash);
assert!(!mgr.is_link_destination(&dest_hash));
}
#[test]
fn test_create_link() {
let mut mgr = LinkManager::new();
let mut rng = OsRng;
let dest_hash = [0xDD; 16];
let sig_pub_bytes = [0xAA; 32]; let (link_id, actions) = mgr.create_link(
&dest_hash,
&sig_pub_bytes,
1,
constants::MTU as u32,
&mut rng,
);
assert_ne!(link_id, [0u8; 16]);
assert_eq!(actions.len(), 2);
assert!(matches!(
actions[0],
LinkManagerAction::RegisterLinkDest { .. }
));
assert!(matches!(actions[1], LinkManagerAction::SendPacket { .. }));
assert_eq!(mgr.link_state(&link_id), Some(LinkState::Pending));
}
#[test]
fn test_full_handshake_via_manager() {
let mut rng = OsRng;
let dest_hash = [0xDD; 16];
let mut responder_mgr = LinkManager::new();
let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
responder_mgr.register_link_destination(
dest_hash,
sig_prv,
sig_pub_bytes,
ResourceStrategy::AcceptNone,
);
let mut initiator_mgr = LinkManager::new();
let (link_id, init_actions) = initiator_mgr.create_link(
&dest_hash,
&sig_pub_bytes,
1,
constants::MTU as u32,
&mut rng,
);
assert_eq!(init_actions.len(), 2);
let linkrequest_raw = match &init_actions[1] {
LinkManagerAction::SendPacket { raw, .. } => raw.clone(),
_ => panic!("Expected SendPacket"),
};
let lr_packet = RawPacket::unpack(&linkrequest_raw).unwrap();
let resp_actions = responder_mgr.handle_local_delivery(
lr_packet.destination_hash,
&linkrequest_raw,
lr_packet.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
assert!(resp_actions.len() >= 2);
assert!(matches!(
resp_actions[0],
LinkManagerAction::RegisterLinkDest { .. }
));
let lrproof_raw = match &resp_actions[1] {
LinkManagerAction::SendPacket { raw, .. } => raw.clone(),
_ => panic!("Expected SendPacket for LRPROOF"),
};
let lrproof_packet = RawPacket::unpack(&lrproof_raw).unwrap();
let init_actions2 = initiator_mgr.handle_local_delivery(
lrproof_packet.destination_hash,
&lrproof_raw,
lrproof_packet.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let has_established = init_actions2
.iter()
.any(|a| matches!(a, LinkManagerAction::LinkEstablished { .. }));
assert!(has_established, "Initiator should emit LinkEstablished");
let lrrtt_raw = init_actions2
.iter()
.find_map(|a| match a {
LinkManagerAction::SendPacket { raw, .. } => Some(raw.clone()),
_ => None,
})
.expect("Should have LRRTT SendPacket");
let lrrtt_packet = RawPacket::unpack(&lrrtt_raw).unwrap();
let resp_link_id = lrrtt_packet.destination_hash;
let resp_actions2 = responder_mgr.handle_local_delivery(
resp_link_id,
&lrrtt_raw,
lrrtt_packet.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let has_established = resp_actions2
.iter()
.any(|a| matches!(a, LinkManagerAction::LinkEstablished { .. }));
assert!(has_established, "Responder should emit LinkEstablished");
assert_eq!(initiator_mgr.link_state(&link_id), Some(LinkState::Active));
assert_eq!(responder_mgr.link_state(&link_id), Some(LinkState::Active));
assert!(initiator_mgr.link_rtt(&link_id).is_some());
assert!(responder_mgr.link_rtt(&link_id).is_some());
}
#[test]
fn test_encrypted_data_exchange() {
let mut rng = OsRng;
let dest_hash = [0xDD; 16];
let mut resp_mgr = LinkManager::new();
let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
resp_mgr.register_link_destination(
dest_hash,
sig_prv,
sig_pub_bytes,
ResourceStrategy::AcceptNone,
);
let mut init_mgr = LinkManager::new();
let (link_id, init_actions) = init_mgr.create_link(
&dest_hash,
&sig_pub_bytes,
1,
constants::MTU as u32,
&mut rng,
);
let lr_raw = extract_send_packet(&init_actions);
let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
let resp_actions = resp_mgr.handle_local_delivery(
lr_pkt.destination_hash,
&lr_raw,
lr_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
let init_actions2 = init_mgr.handle_local_delivery(
lrproof_pkt.destination_hash,
&lrproof_raw,
lrproof_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let lrrtt_raw = extract_any_send_packet(&init_actions2);
let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
resp_mgr.handle_local_delivery(
lrrtt_pkt.destination_hash,
&lrrtt_raw,
lrrtt_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let actions =
init_mgr.send_on_link(&link_id, b"hello link!", constants::CONTEXT_NONE, &mut rng);
assert_eq!(actions.len(), 1);
assert!(matches!(actions[0], LinkManagerAction::SendPacket { .. }));
}
#[test]
fn test_request_response() {
let mut rng = OsRng;
let dest_hash = [0xDD; 16];
let mut resp_mgr = LinkManager::new();
let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
resp_mgr.register_link_destination(
dest_hash,
sig_prv,
sig_pub_bytes,
ResourceStrategy::AcceptNone,
);
resp_mgr.register_request_handler("/status", None, |_link_id, _path, _data, _remote| {
Some(b"OK".to_vec())
});
let mut init_mgr = LinkManager::new();
let (link_id, init_actions) = init_mgr.create_link(
&dest_hash,
&sig_pub_bytes,
1,
constants::MTU as u32,
&mut rng,
);
let lr_raw = extract_send_packet(&init_actions);
let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
let resp_actions = resp_mgr.handle_local_delivery(
lr_pkt.destination_hash,
&lr_raw,
lr_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
let init_actions2 = init_mgr.handle_local_delivery(
lrproof_pkt.destination_hash,
&lrproof_raw,
lrproof_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let lrrtt_raw = extract_any_send_packet(&init_actions2);
let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
resp_mgr.handle_local_delivery(
lrrtt_pkt.destination_hash,
&lrrtt_raw,
lrrtt_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let req_actions = init_mgr.send_request(&link_id, "/status", b"query", &mut rng);
assert_eq!(req_actions.len(), 1);
let req_raw = extract_send_packet_from(&req_actions);
let req_pkt = RawPacket::unpack(&req_raw).unwrap();
let resp_actions = resp_mgr.handle_local_delivery(
req_pkt.destination_hash,
&req_raw,
req_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let has_response = resp_actions
.iter()
.any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
assert!(has_response, "Handler should produce a response packet");
}
#[test]
fn test_send_request_wraps_invalid_msgpack_data_as_bin() {
use std::sync::{Arc, Mutex};
let (init_mgr, mut resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
let invalid = vec![0xC1];
let expected = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(invalid.clone()));
let captured = Arc::new(Mutex::new(None::<Vec<u8>>));
let captured_for_handler = Arc::clone(&captured);
resp_mgr.register_request_handler("/bin", None, move |_link_id, _path, data, _remote| {
*captured_for_handler.lock().unwrap() = Some(data.to_vec());
Some(rns_core::msgpack::pack(&rns_core::msgpack::Value::Bool(
true,
)))
});
let req_actions = init_mgr.send_request(&link_id, "/bin", &invalid, &mut rng);
let req_raw = extract_send_packet_from(&req_actions);
let req_pkt = RawPacket::unpack(&req_raw).unwrap();
let resp_actions = resp_mgr.handle_local_delivery(
req_pkt.destination_hash,
&req_raw,
req_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
assert!(
resp_actions
.iter()
.any(|a| matches!(a, LinkManagerAction::SendPacket { .. })),
"handler should still produce a response"
);
assert_eq!(*captured.lock().unwrap(), Some(expected));
}
#[test]
fn test_invalid_response_bytes_are_returned_as_msgpack_bin() {
let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
let invalid_response = vec![0xC1];
let expected =
rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(invalid_response.clone()));
resp_mgr.register_request_handler("/invalid-response", None, {
let invalid_response = invalid_response.clone();
move |_link_id, _path, _data, _remote| Some(invalid_response.clone())
});
let req_actions = init_mgr.send_request(&link_id, "/invalid-response", b"\xc0", &mut rng);
let req_raw = extract_send_packet_from(&req_actions);
let req_pkt = RawPacket::unpack(&req_raw).unwrap();
let resp_actions = resp_mgr.handle_local_delivery(
req_pkt.destination_hash,
&req_raw,
req_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let resp_raw = extract_any_send_packet(&resp_actions);
let resp_pkt = RawPacket::unpack(&resp_raw).unwrap();
let init_actions = init_mgr.handle_local_delivery(
resp_pkt.destination_hash,
&resp_raw,
resp_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let response_data = init_actions
.iter()
.find_map(|action| match action {
LinkManagerAction::ResponseReceived { data, .. } => Some(data.clone()),
_ => None,
})
.expect("initiator should receive a response");
assert_eq!(response_data, expected);
}
#[test]
fn test_request_acl_deny_unidentified() {
let mut rng = OsRng;
let dest_hash = [0xDD; 16];
let mut resp_mgr = LinkManager::new();
let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
resp_mgr.register_link_destination(
dest_hash,
sig_prv,
sig_pub_bytes,
ResourceStrategy::AcceptNone,
);
resp_mgr.register_request_handler(
"/restricted",
Some(vec![[0xAA; 16]]),
|_link_id, _path, _data, _remote| Some(b"secret".to_vec()),
);
let mut init_mgr = LinkManager::new();
let (link_id, init_actions) = init_mgr.create_link(
&dest_hash,
&sig_pub_bytes,
1,
constants::MTU as u32,
&mut rng,
);
let lr_raw = extract_send_packet(&init_actions);
let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
let resp_actions = resp_mgr.handle_local_delivery(
lr_pkt.destination_hash,
&lr_raw,
lr_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
let init_actions2 = init_mgr.handle_local_delivery(
lrproof_pkt.destination_hash,
&lrproof_raw,
lrproof_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let lrrtt_raw = extract_any_send_packet(&init_actions2);
let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
resp_mgr.handle_local_delivery(
lrrtt_pkt.destination_hash,
&lrrtt_raw,
lrrtt_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let req_actions = init_mgr.send_request(&link_id, "/restricted", b"query", &mut rng);
let req_raw = extract_send_packet_from(&req_actions);
let req_pkt = RawPacket::unpack(&req_raw).unwrap();
let resp_actions = resp_mgr.handle_local_delivery(
req_pkt.destination_hash,
&req_raw,
req_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let has_response = resp_actions
.iter()
.any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
assert!(!has_response, "Unidentified peer should be denied");
}
#[test]
fn test_teardown_link() {
let mut rng = OsRng;
let dest_hash = [0xDD; 16];
let mut mgr = LinkManager::new();
let dummy_sig = [0xAA; 32];
let (link_id, _) =
mgr.create_link(&dest_hash, &dummy_sig, 1, constants::MTU as u32, &mut rng);
assert_eq!(mgr.link_count(), 1);
let actions = mgr.teardown_link(&link_id);
let has_close = actions
.iter()
.any(|a| matches!(a, LinkManagerAction::LinkClosed { .. }));
assert!(has_close);
let tick_actions = mgr.tick(&mut rng);
let has_deregister = tick_actions
.iter()
.any(|a| matches!(a, LinkManagerAction::DeregisterLinkDest { .. }));
assert!(has_deregister);
assert_eq!(mgr.link_count(), 0);
}
#[test]
fn test_identify_on_link() {
let mut rng = OsRng;
let dest_hash = [0xDD; 16];
let mut resp_mgr = LinkManager::new();
let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
resp_mgr.register_link_destination(
dest_hash,
sig_prv,
sig_pub_bytes,
ResourceStrategy::AcceptNone,
);
let mut init_mgr = LinkManager::new();
let (link_id, init_actions) = init_mgr.create_link(
&dest_hash,
&sig_pub_bytes,
1,
constants::MTU as u32,
&mut rng,
);
let lr_raw = extract_send_packet(&init_actions);
let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
let resp_actions = resp_mgr.handle_local_delivery(
lr_pkt.destination_hash,
&lr_raw,
lr_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
let init_actions2 = init_mgr.handle_local_delivery(
lrproof_pkt.destination_hash,
&lrproof_raw,
lrproof_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let lrrtt_raw = extract_any_send_packet(&init_actions2);
let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
resp_mgr.handle_local_delivery(
lrrtt_pkt.destination_hash,
&lrrtt_raw,
lrrtt_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let identity = Identity::new(&mut rng);
let id_actions = init_mgr.identify(&link_id, &identity, &mut rng);
assert_eq!(id_actions.len(), 1);
let id_raw = extract_send_packet_from(&id_actions);
let id_pkt = RawPacket::unpack(&id_raw).unwrap();
let resp_actions = resp_mgr.handle_local_delivery(
id_pkt.destination_hash,
&id_raw,
id_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let has_identified = resp_actions
.iter()
.any(|a| matches!(a, LinkManagerAction::RemoteIdentified { .. }));
assert!(has_identified, "Responder should emit RemoteIdentified");
}
#[test]
fn test_path_hash_computation() {
let h1 = compute_path_hash("/status");
let h2 = compute_path_hash("/path");
assert_ne!(h1, h2);
assert_eq!(h1, compute_path_hash("/status"));
}
#[test]
fn test_link_count() {
let mut mgr = LinkManager::new();
let mut rng = OsRng;
assert_eq!(mgr.link_count(), 0);
let dummy_sig = [0xAA; 32];
mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
assert_eq!(mgr.link_count(), 1);
mgr.create_link(&[0x22; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
assert_eq!(mgr.link_count(), 2);
}
fn extract_send_packet(actions: &[LinkManagerAction]) -> Vec<u8> {
extract_send_packet_at(actions, actions.len() - 1)
}
fn extract_send_packet_at(actions: &[LinkManagerAction], idx: usize) -> Vec<u8> {
match &actions[idx] {
LinkManagerAction::SendPacket { raw, .. } => raw.clone(),
other => panic!("Expected SendPacket at index {}, got {:?}", idx, other),
}
}
fn extract_any_send_packet(actions: &[LinkManagerAction]) -> Vec<u8> {
actions
.iter()
.find_map(|a| match a {
LinkManagerAction::SendPacket { raw, .. } => Some(raw.clone()),
_ => None,
})
.expect("Expected at least one SendPacket action")
}
fn extract_send_packet_from(actions: &[LinkManagerAction]) -> Vec<u8> {
extract_any_send_packet(actions)
}
fn setup_active_link() -> (LinkManager, LinkManager, LinkId) {
let mut rng = OsRng;
let dest_hash = [0xDD; 16];
let mut resp_mgr = LinkManager::new();
let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
resp_mgr.register_link_destination(
dest_hash,
sig_prv,
sig_pub_bytes,
ResourceStrategy::AcceptNone,
);
let mut init_mgr = LinkManager::new();
let (link_id, init_actions) = init_mgr.create_link(
&dest_hash,
&sig_pub_bytes,
1,
constants::MTU as u32,
&mut rng,
);
let lr_raw = extract_send_packet(&init_actions);
let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
let resp_actions = resp_mgr.handle_local_delivery(
lr_pkt.destination_hash,
&lr_raw,
lr_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
let init_actions2 = init_mgr.handle_local_delivery(
lrproof_pkt.destination_hash,
&lrproof_raw,
lrproof_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let lrrtt_raw = extract_any_send_packet(&init_actions2);
let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
resp_mgr.handle_local_delivery(
lrrtt_pkt.destination_hash,
&lrrtt_raw,
lrrtt_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
assert_eq!(init_mgr.link_state(&link_id), Some(LinkState::Active));
assert_eq!(resp_mgr.link_state(&link_id), Some(LinkState::Active));
(init_mgr, resp_mgr, link_id)
}
#[test]
fn test_resource_strategy_default() {
let mut mgr = LinkManager::new();
let mut rng = OsRng;
let dummy_sig = [0xAA; 32];
let (link_id, _) =
mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
let link = mgr.links.get(&link_id).unwrap();
assert_eq!(link.resource_strategy, ResourceStrategy::AcceptNone);
}
#[test]
fn test_set_resource_strategy() {
let mut mgr = LinkManager::new();
let mut rng = OsRng;
let dummy_sig = [0xAA; 32];
let (link_id, _) =
mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
assert_eq!(
mgr.links.get(&link_id).unwrap().resource_strategy,
ResourceStrategy::AcceptAll
);
mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
assert_eq!(
mgr.links.get(&link_id).unwrap().resource_strategy,
ResourceStrategy::AcceptApp
);
}
#[test]
fn test_send_resource_on_active_link() {
let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
let data = vec![0xAB; 100]; let actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
let has_send = actions
.iter()
.any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
assert!(
has_send,
"send_resource should emit advertisement SendPacket"
);
}
fn first_resource_advertisement(
mgr: &LinkManager,
link_id: &LinkId,
actions: &[LinkManagerAction],
) -> rns_core::resource::ResourceAdvertisement {
let adv_raw = actions
.iter()
.find_map(|action| match action {
LinkManagerAction::SendPacket { raw, .. } => {
let pkt = RawPacket::unpack(raw).ok()?;
(pkt.context == constants::CONTEXT_RESOURCE_ADV).then_some(raw)
}
_ => None,
})
.expect("sender should emit a resource advertisement");
let adv_pkt = RawPacket::unpack(adv_raw).unwrap();
let plaintext = mgr
.links
.get(link_id)
.unwrap()
.engine
.decrypt(&adv_pkt.data)
.unwrap();
rns_core::resource::ResourceAdvertisement::unpack(&plaintext).unwrap()
}
fn deterministic_bytes(len: usize) -> Vec<u8> {
let mut state = 0x1234_5678u32;
(0..len)
.map(|_| {
state = state.wrapping_mul(1664525).wrapping_add(1013904223);
(state >> 16) as u8
})
.collect()
}
fn drive_link_manager_packets(
init_mgr: &mut LinkManager,
resp_mgr: &mut LinkManager,
initial_actions: Vec<LinkManagerAction>,
initial_source: char,
rng: &mut dyn Rng,
max_rounds: usize,
) -> (
Option<Vec<u8>>,
bool,
Vec<(char, usize, usize)>,
Vec<(char, String)>,
usize,
) {
let mut pending: Vec<(char, LinkManagerAction)> = initial_actions
.into_iter()
.map(|a| (initial_source, a))
.collect();
let mut rounds = 0;
let mut received_data = None;
let mut sender_completed = false;
let mut progress = Vec::new();
let mut failures = Vec::new();
while !pending.is_empty() && rounds < max_rounds {
rounds += 1;
let mut next = Vec::new();
for (source, action) in pending.drain(..) {
let LinkManagerAction::SendPacket { raw, .. } = action else {
continue;
};
let pkt = RawPacket::unpack(&raw).unwrap();
let target_actions = if source == 'i' {
resp_mgr.handle_local_delivery(
pkt.destination_hash,
&raw,
pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
rng,
)
} else {
init_mgr.handle_local_delivery(
pkt.destination_hash,
&raw,
pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
rng,
)
};
let target_source = if source == 'i' { 'r' } else { 'i' };
for action in &target_actions {
match action {
LinkManagerAction::ResourceReceived { data, .. } => {
received_data = Some(data.clone());
}
LinkManagerAction::ResourceCompleted { .. } => {
sender_completed = true;
}
LinkManagerAction::ResourceProgress {
received, total, ..
} => {
progress.push((target_source, *received, *total));
}
LinkManagerAction::ResourceFailed { error, .. } => {
failures.push((target_source, error.clone()));
}
_ => {}
}
}
next.extend(target_actions.into_iter().map(|a| (target_source, a)));
}
pending = next;
}
(received_data, sender_completed, progress, failures, rounds)
}
#[test]
fn test_send_resource_auto_compress_option_controls_adv_flag() {
let data = vec![0x41; 2048];
let (mut compressed_mgr, _resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
let actions =
compressed_mgr.send_resource_with_auto_compress(&link_id, &data, None, true, &mut rng);
let adv = first_resource_advertisement(&compressed_mgr, &link_id, &actions);
assert!(
adv.flags.compressed,
"compressible resource should compress"
);
let (mut plain_mgr, _resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
let actions =
plain_mgr.send_resource_with_auto_compress(&link_id, &data, None, false, &mut rng);
let adv = first_resource_advertisement(&plain_mgr, &link_id, &actions);
assert!(
!adv.flags.compressed,
"auto_compress=false should keep resource uncompressed"
);
}
#[test]
fn test_send_resource_on_inactive_link() {
let mut mgr = LinkManager::new();
let mut rng = OsRng;
let dummy_sig = [0xAA; 32];
let (link_id, _) =
mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
let actions = mgr.send_resource(&link_id, b"data", None, &mut rng);
assert!(actions.is_empty(), "Cannot send resource on inactive link");
}
#[test]
fn test_send_resource_without_session_key_uses_encrypt_fallback_path() {
let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
init_mgr
.links
.get_mut(&link_id)
.unwrap()
.engine
.clear_session_for_testing();
let actions = init_mgr.send_resource(&link_id, b"data", None, &mut rng);
assert!(
actions.is_empty(),
"without a session key, no advertisement should be emitted"
);
assert_eq!(
init_mgr
.links
.get(&link_id)
.map(|managed| managed.outgoing_resources.len()),
Some(1)
);
}
#[test]
fn test_resource_adv_rejected_by_accept_none() {
let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
let data = vec![0xCD; 100];
let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
for action in &adv_actions {
if let LinkManagerAction::SendPacket { raw, .. } = action {
let pkt = RawPacket::unpack(raw).unwrap();
let resp_actions = resp_mgr.handle_local_delivery(
pkt.destination_hash,
raw,
pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let has_resource_received = resp_actions
.iter()
.any(|a| matches!(a, LinkManagerAction::ResourceReceived { .. }));
assert!(
!has_resource_received,
"AcceptNone should not accept resource"
);
}
}
}
#[test]
fn test_resource_adv_accepted_by_accept_all() {
let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
let data = vec![0xCD; 100];
let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
for action in &adv_actions {
if let LinkManagerAction::SendPacket { raw, .. } = action {
let pkt = RawPacket::unpack(raw).unwrap();
let resp_actions = resp_mgr.handle_local_delivery(
pkt.destination_hash,
raw,
pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let has_send = resp_actions
.iter()
.any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
assert!(has_send, "AcceptAll should accept and request parts");
}
}
}
#[test]
fn test_resource_accept_app_query() {
let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
let data = vec![0xCD; 100];
let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
let mut got_query = false;
for action in &adv_actions {
if let LinkManagerAction::SendPacket { raw, .. } = action {
let pkt = RawPacket::unpack(raw).unwrap();
let resp_actions = resp_mgr.handle_local_delivery(
pkt.destination_hash,
raw,
pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
for a in &resp_actions {
if matches!(a, LinkManagerAction::ResourceAcceptQuery { .. }) {
got_query = true;
}
}
}
}
assert!(got_query, "AcceptApp should emit ResourceAcceptQuery");
}
#[test]
fn test_resource_accept_app_accept() {
let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
let data = vec![0xCD; 100];
let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
for action in &adv_actions {
if let LinkManagerAction::SendPacket { raw, .. } = action {
let pkt = RawPacket::unpack(raw).unwrap();
let resp_actions = resp_mgr.handle_local_delivery(
pkt.destination_hash,
raw,
pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
for a in &resp_actions {
if let LinkManagerAction::ResourceAcceptQuery {
link_id: lid,
resource_hash,
..
} = a
{
let accept_actions =
resp_mgr.accept_resource(lid, resource_hash, true, &mut rng);
let has_send = accept_actions
.iter()
.any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
assert!(
has_send,
"Accepting resource should produce request for parts"
);
}
}
}
}
}
#[test]
fn test_resource_accept_app_reject() {
let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
let data = vec![0xCD; 100];
let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
for action in &adv_actions {
if let LinkManagerAction::SendPacket { raw, .. } = action {
let pkt = RawPacket::unpack(raw).unwrap();
let resp_actions = resp_mgr.handle_local_delivery(
pkt.destination_hash,
raw,
pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
for a in &resp_actions {
if let LinkManagerAction::ResourceAcceptQuery {
link_id: lid,
resource_hash,
..
} = a
{
let reject_actions =
resp_mgr.accept_resource(lid, resource_hash, false, &mut rng);
let has_resource_received = reject_actions
.iter()
.any(|a| matches!(a, LinkManagerAction::ResourceReceived { .. }));
assert!(!has_resource_received);
}
}
}
}
}
#[test]
fn test_resource_full_transfer() {
let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
let original_data = b"Hello, Resource Transfer!".to_vec();
let adv_actions = init_mgr.send_resource(&link_id, &original_data, None, &mut rng);
let mut pending: Vec<(char, LinkManagerAction)> =
adv_actions.into_iter().map(|a| ('i', a)).collect();
let mut rounds = 0;
let max_rounds = 50;
let mut resource_received = false;
let mut sender_completed = false;
while !pending.is_empty() && rounds < max_rounds {
rounds += 1;
let mut next: Vec<(char, LinkManagerAction)> = Vec::new();
for (source, action) in pending.drain(..) {
if let LinkManagerAction::SendPacket { raw, .. } = action {
let pkt = RawPacket::unpack(&raw).unwrap();
let target_actions = if source == 'i' {
resp_mgr.handle_local_delivery(
pkt.destination_hash,
&raw,
pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
)
} else {
init_mgr.handle_local_delivery(
pkt.destination_hash,
&raw,
pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
)
};
let target_source = if source == 'i' { 'r' } else { 'i' };
for a in &target_actions {
match a {
LinkManagerAction::ResourceReceived { data, .. } => {
assert_eq!(*data, original_data);
resource_received = true;
}
LinkManagerAction::ResourceCompleted { .. } => {
sender_completed = true;
}
_ => {}
}
}
next.extend(target_actions.into_iter().map(|a| (target_source, a)));
}
}
pending = next;
}
assert!(
resource_received,
"Responder should receive resource data (rounds={})",
rounds
);
assert!(
sender_completed,
"Sender should get completion proof (rounds={})",
rounds
);
}
#[test]
fn test_split_resource_advertisement_and_progress_entries() {
let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
let data = deterministic_bytes(constants::RESOURCE_MAX_EFFICIENT_SIZE + 1024);
let actions =
init_mgr.send_resource_with_auto_compress(&link_id, &data, None, false, &mut rng);
let adv = first_resource_advertisement(&init_mgr, &link_id, &actions);
assert!(adv.flags.split);
assert_eq!(adv.segment_index, 1);
assert_eq!(adv.total_segments, 2);
assert_eq!(adv.data_size, data.len() as u64);
let managed = init_mgr.links.get(&link_id).unwrap();
assert_eq!(managed.outgoing_splits.len(), 1);
assert_eq!(
managed
.outgoing_resources
.iter()
.filter(|sender| sender.flags.split)
.count(),
2
);
let entries = init_mgr.resource_entries();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].direction, "outgoing");
assert!(entries[0].total_parts > managed.outgoing_resources[0].total_parts());
assert_eq!(entries[0].transferred_parts, 0);
}
#[test]
fn test_split_resource_full_transfer_and_monotonic_progress() {
let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
let original_data = deterministic_bytes(constants::RESOURCE_MAX_EFFICIENT_SIZE + 2048);
let initial_actions = init_mgr.send_resource_with_auto_compress(
&link_id,
&original_data,
None,
false,
&mut rng,
);
let (received_data, sender_completed, progress, failures, rounds) =
drive_link_manager_packets(
&mut init_mgr,
&mut resp_mgr,
initial_actions,
'i',
&mut rng,
10_000,
);
assert!(
received_data.as_ref().is_some_and(|data| data == &original_data),
"split transfer did not deliver payload in {rounds} rounds; sender_completed={sender_completed}; failures={failures:?}; last_progress={:?}; init_entries={:?}; resp_entries={:?}",
progress.last(),
init_mgr.resource_entries(),
resp_mgr.resource_entries()
);
assert!(
sender_completed,
"sender did not complete in {rounds} rounds"
);
assert!(
progress
.iter()
.any(|(_, received, total)| received == total),
"expected final progress update"
);
let mut init_last = 0;
let mut resp_last = 0;
for (side, received, total) in progress {
assert!(received <= total);
match side {
'i' => {
assert!(
received >= init_last,
"initiator progress regressed from {init_last} to {received}"
);
init_last = received;
}
'r' => {
assert!(
received >= resp_last,
"responder progress regressed from {resp_last} to {received}"
);
resp_last = received;
}
_ => unreachable!(),
}
}
}
#[test]
fn test_split_resource_accept_app_queries_only_first_segment() {
let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
let original_data = deterministic_bytes(constants::RESOURCE_MAX_EFFICIENT_SIZE + 1024);
let adv_actions = init_mgr.send_resource_with_auto_compress(
&link_id,
&original_data,
None,
false,
&mut rng,
);
let adv_raw = extract_any_send_packet(&adv_actions);
let adv_pkt = RawPacket::unpack(&adv_raw).unwrap();
let query_actions = resp_mgr.handle_local_delivery(
adv_pkt.destination_hash,
&adv_raw,
adv_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let queries: Vec<_> = query_actions
.iter()
.filter_map(|action| match action {
LinkManagerAction::ResourceAcceptQuery { resource_hash, .. } => {
Some(resource_hash.clone())
}
_ => None,
})
.collect();
assert_eq!(queries.len(), 1);
let accept_actions = resp_mgr.accept_resource(&link_id, &queries[0], true, &mut rng);
let (received_data, sender_completed, _progress, failures, rounds) =
drive_link_manager_packets(
&mut init_mgr,
&mut resp_mgr,
accept_actions,
'r',
&mut rng,
10_000,
);
assert!(
failures.is_empty(),
"split AcceptApp transfer failed: {failures:?}"
);
assert!(
received_data
.as_ref()
.is_some_and(|data| data == &original_data),
"split AcceptApp transfer did not deliver in {rounds} rounds"
);
assert!(sender_completed);
}
#[test]
fn test_resource_cancel_icl() {
let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
let data = vec![0xAB; 2000];
let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
for action in &adv_actions {
if let LinkManagerAction::SendPacket { raw, .. } = action {
let pkt = RawPacket::unpack(raw).unwrap();
resp_mgr.handle_local_delivery(
pkt.destination_hash,
raw,
pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
}
}
assert!(!resp_mgr
.links
.get(&link_id)
.unwrap()
.incoming_resources
.is_empty());
let icl_actions = resp_mgr.handle_resource_icl(&link_id);
let has_failed = icl_actions
.iter()
.any(|a| matches!(a, LinkManagerAction::ResourceFailed { .. }));
assert!(has_failed, "ICL should produce ResourceFailed");
}
#[test]
fn test_resource_cancel_rcl() {
let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
let data = vec![0xAB; 2000];
init_mgr.send_resource(&link_id, &data, None, &mut rng);
assert!(!init_mgr
.links
.get(&link_id)
.unwrap()
.outgoing_resources
.is_empty());
let rcl_actions = init_mgr.handle_resource_rcl(&link_id);
let has_failed = rcl_actions
.iter()
.any(|a| matches!(a, LinkManagerAction::ResourceFailed { .. }));
assert!(has_failed, "RCL should produce ResourceFailed");
}
#[test]
fn test_cancel_all_resources_clears_active_transfers() {
let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
let actions = init_mgr.send_resource(&link_id, b"resource body", None, &mut rng);
assert!(!actions.is_empty());
assert_eq!(init_mgr.resource_transfer_count(), 1);
let cancel_actions = init_mgr.cancel_all_resources(&mut rng);
assert_eq!(init_mgr.resource_transfer_count(), 0);
assert!(cancel_actions
.iter()
.any(|action| matches!(action, LinkManagerAction::SendPacket { .. })));
}
#[test]
fn test_resource_tick_cleans_up() {
let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
let data = vec![0xAB; 100];
init_mgr.send_resource(&link_id, &data, None, &mut rng);
assert!(!init_mgr
.links
.get(&link_id)
.unwrap()
.outgoing_resources
.is_empty());
init_mgr.handle_resource_rcl(&link_id);
init_mgr.tick(&mut rng);
assert!(
init_mgr
.links
.get(&link_id)
.unwrap()
.outgoing_resources
.is_empty(),
"Tick should clean up completed/failed outgoing resources"
);
}
#[test]
fn test_build_link_packet() {
let (init_mgr, _resp_mgr, link_id) = setup_active_link();
let actions =
init_mgr.build_link_packet(&link_id, constants::CONTEXT_RESOURCE, b"test data");
assert_eq!(actions.len(), 1);
if let LinkManagerAction::SendPacket { raw, dest_type, .. } = &actions[0] {
let pkt = RawPacket::unpack(raw).unwrap();
assert_eq!(pkt.context, constants::CONTEXT_RESOURCE);
assert_eq!(*dest_type, constants::DESTINATION_LINK);
} else {
panic!("Expected SendPacket");
}
}
#[test]
fn test_build_link_packet_returns_empty_when_mtu_too_small() {
let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
init_mgr.set_link_mtu(&link_id, 84);
let actions =
init_mgr.build_link_packet(&link_id, constants::CONTEXT_RESOURCE, &[0xAA; 200]);
assert!(actions.is_empty(), "oversized packet should not be built");
}
#[test]
fn test_process_resource_actions_encrypted_variants_drop_on_encrypt_failure() {
let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
init_mgr
.links
.get_mut(&link_id)
.unwrap()
.engine
.clear_session_for_testing();
let cases = vec![
ResourceAction::SendAdvertisement(vec![1, 2, 3]),
ResourceAction::SendRequest(vec![4, 5, 6]),
ResourceAction::SendHmu(vec![7, 8, 9]),
ResourceAction::SendProof(vec![10, 11, 12]),
ResourceAction::SendCancelInitiator(vec![13, 14, 15]),
ResourceAction::SendCancelReceiver(vec![16, 17, 18]),
];
for action in cases {
let out = init_mgr.process_resource_actions(&link_id, vec![action], &mut rng);
assert!(
out.is_empty(),
"encrypt failure should suppress packet emission"
);
}
}
#[test]
fn test_channel_message_delivery() {
let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
let chan_actions = init_mgr
.send_channel_message(&link_id, 42, b"channel data", &mut rng)
.expect("active link channel send should succeed");
assert!(!chan_actions.is_empty());
let mut got_channel_msg = false;
for action in &chan_actions {
if let LinkManagerAction::SendPacket { raw, .. } = action {
let pkt = RawPacket::unpack(raw).unwrap();
let resp_actions = resp_mgr.handle_local_delivery(
pkt.destination_hash,
raw,
pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
for a in &resp_actions {
if let LinkManagerAction::ChannelMessageReceived {
msgtype, payload, ..
} = a
{
assert_eq!(*msgtype, 42);
assert_eq!(*payload, b"channel data");
got_channel_msg = true;
}
}
}
}
assert!(got_channel_msg, "Responder should receive channel message");
}
#[test]
fn test_channel_send_drops_packet_when_encrypt_fails() {
let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
init_mgr
.links
.get_mut(&link_id)
.unwrap()
.engine
.clear_session_for_testing();
let actions = init_mgr
.send_channel_message(&link_id, 42, b"channel data", &mut rng)
.expect("channel should still accept the message locally");
assert!(
actions.is_empty(),
"encrypt failure should suppress channel packet"
);
assert!(
init_mgr
.links
.get(&link_id)
.unwrap()
.pending_channel_packets
.is_empty(),
"failed packet encryption must not track a pending channel proof"
);
}
#[test]
fn test_channel_proof_reopens_send_window() {
let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
init_mgr
.send_channel_message(&link_id, 42, b"first", &mut rng)
.expect("first send should succeed");
init_mgr
.send_channel_message(&link_id, 42, b"second", &mut rng)
.expect("second send should succeed");
let err = init_mgr
.send_channel_message(&link_id, 42, b"third", &mut rng)
.expect_err("third send should hit the initial channel window");
assert_eq!(err, "Channel is not ready to send");
let queued_packets = init_mgr
.links
.get(&link_id)
.unwrap()
.pending_channel_packets
.clone();
assert_eq!(queued_packets.len(), 2);
for tracked_hash in queued_packets.keys().take(1) {
let mut proof_data = Vec::with_capacity(96);
proof_data.extend_from_slice(tracked_hash);
proof_data.extend_from_slice(&[0x11; 64]);
let flags = PacketFlags {
header_type: constants::HEADER_1,
context_flag: constants::FLAG_UNSET,
transport_type: constants::TRANSPORT_BROADCAST,
destination_type: constants::DESTINATION_LINK,
packet_type: constants::PACKET_TYPE_PROOF,
};
let proof = RawPacket::pack(
flags,
0,
&link_id,
None,
constants::CONTEXT_NONE,
&proof_data,
)
.expect("proof packet should pack");
let ack_actions = init_mgr.handle_local_delivery(
link_id,
&proof.raw,
proof.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
assert!(
ack_actions.is_empty(),
"proof delivery should only update channel state"
);
}
init_mgr
.send_channel_message(&link_id, 42, b"third", &mut rng)
.expect("proof should free one channel slot");
}
#[test]
fn test_generic_link_data_delivery() {
let (init_mgr, mut resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
let actions = init_mgr.send_on_link(&link_id, b"raw stuff", 0x42, &mut rng);
assert_eq!(actions.len(), 1);
let raw = extract_any_send_packet(&actions);
let pkt = RawPacket::unpack(&raw).unwrap();
let resp_actions = resp_mgr.handle_local_delivery(
pkt.destination_hash,
&raw,
pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let has_data = resp_actions
.iter()
.any(|a| matches!(a, LinkManagerAction::LinkDataReceived { context: 0x42, .. }));
assert!(
has_data,
"Responder should receive LinkDataReceived for unknown context"
);
}
#[test]
fn test_invalid_encrypted_contexts_are_ignored() {
let (_init_mgr, mut resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
let contexts = [
constants::CONTEXT_CHANNEL,
constants::CONTEXT_REQUEST,
constants::CONTEXT_RESPONSE,
constants::CONTEXT_RESOURCE_ADV,
constants::CONTEXT_RESOURCE_REQ,
constants::CONTEXT_RESOURCE_HMU,
constants::CONTEXT_RESOURCE_PRF,
0x42,
];
for context in contexts {
let flags = PacketFlags {
header_type: constants::HEADER_1,
context_flag: constants::FLAG_UNSET,
transport_type: constants::TRANSPORT_BROADCAST,
destination_type: constants::DESTINATION_LINK,
packet_type: constants::PACKET_TYPE_DATA,
};
let pkt = RawPacket::pack(flags, 0, &link_id, None, context, b"invalid-ciphertext")
.expect("test packet should pack");
let actions = resp_mgr.handle_local_delivery(
pkt.destination_hash,
&pkt.raw,
pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
assert!(
actions.is_empty(),
"invalid ciphertext for context {context:#x} should be ignored"
);
}
}
#[test]
fn test_resource_part_without_matching_receiver_is_ignored() {
let (_init_mgr, mut resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
let flags = PacketFlags {
header_type: constants::HEADER_1,
context_flag: constants::FLAG_UNSET,
transport_type: constants::TRANSPORT_BROADCAST,
destination_type: constants::DESTINATION_LINK,
packet_type: constants::PACKET_TYPE_DATA,
};
let pkt = RawPacket::pack(
flags,
0,
&link_id,
None,
constants::CONTEXT_RESOURCE,
b"orphan-part",
)
.expect("test packet should pack");
let actions = resp_mgr.handle_local_delivery(
pkt.destination_hash,
&pkt.raw,
pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
assert!(actions.is_empty(), "orphan resource part should be ignored");
}
#[test]
fn test_response_delivery() {
let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
resp_mgr.register_request_handler("/echo", None, |_link_id, _path, data, _remote| {
Some(data.to_vec())
});
let req_actions = init_mgr.send_request(&link_id, "/echo", b"\xc0", &mut rng); assert!(!req_actions.is_empty());
let req_raw = extract_any_send_packet(&req_actions);
let req_pkt = RawPacket::unpack(&req_raw).unwrap();
let resp_actions = resp_mgr.handle_local_delivery(
req_pkt.destination_hash,
&req_raw,
req_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let has_resp_send = resp_actions
.iter()
.any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
assert!(has_resp_send, "Handler should produce response");
let resp_raw = extract_any_send_packet(&resp_actions);
let resp_pkt = RawPacket::unpack(&resp_raw).unwrap();
let init_actions = init_mgr.handle_local_delivery(
resp_pkt.destination_hash,
&resp_raw,
resp_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let has_response_received = init_actions
.iter()
.any(|a| matches!(a, LinkManagerAction::ResponseReceived { .. }));
assert!(
has_response_received,
"Initiator should receive ResponseReceived"
);
}
#[test]
fn test_large_response_uses_resource_fallback() {
let (init_mgr, mut resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
let large_payload: Vec<u8> = (0..5000u32).map(|i| (i & 0xFF) as u8).collect();
resp_mgr.register_request_handler("/large", None, {
let large_payload = large_payload.clone();
move |_link_id, _path, _data, _remote| Some(large_payload.clone())
});
let req_actions = init_mgr.send_request(&link_id, "/large", b"\xc0", &mut rng);
assert!(!req_actions.is_empty());
let req_raw = extract_any_send_packet(&req_actions);
let req_pkt = RawPacket::unpack(&req_raw).unwrap();
let resp_actions = resp_mgr.handle_local_delivery(
req_pkt.destination_hash,
&req_raw,
req_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let mut has_resource_adv = false;
let mut has_direct_response = false;
for action in &resp_actions {
if let LinkManagerAction::SendPacket { raw, .. } = action {
let pkt = RawPacket::unpack(raw).unwrap();
if pkt.context == constants::CONTEXT_RESOURCE_ADV {
has_resource_adv = true;
}
if pkt.context == constants::CONTEXT_RESPONSE {
has_direct_response = true;
}
}
}
assert!(
has_resource_adv,
"Large response should advertise a response resource"
);
assert!(
!has_direct_response,
"Large response should not use direct CONTEXT_RESPONSE packet"
);
}
#[test]
fn test_send_management_response_without_session_key_uses_resource_fallback_path() {
let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
init_mgr
.links
.get_mut(&link_id)
.unwrap()
.engine
.clear_session_for_testing();
let large_response: Vec<u8> = (0..5000u32).map(|i| (i & 0xFF) as u8).collect();
let actions =
init_mgr.send_management_response(&link_id, &[0x11; 16], &large_response, &mut rng);
assert!(
actions.is_empty(),
"without a session key, no response packets should be emitted"
);
assert_eq!(
init_mgr
.links
.get(&link_id)
.map(|managed| managed.outgoing_resources.len()),
Some(1)
);
}
#[test]
fn test_send_channel_message_on_no_channel() {
let mut mgr = LinkManager::new();
let mut rng = OsRng;
let dummy_sig = [0xAA; 32];
let (link_id, _) =
mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
let err = mgr
.send_channel_message(&link_id, 1, b"test", &mut rng)
.expect_err("pending link should reject channel send");
assert_eq!(err, "link has no active channel");
}
#[test]
fn test_send_on_link_requires_active() {
let mut mgr = LinkManager::new();
let mut rng = OsRng;
let dummy_sig = [0xAA; 32];
let (link_id, _) =
mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
let actions = mgr.send_on_link(&link_id, b"test", constants::CONTEXT_NONE, &mut rng);
assert!(actions.is_empty(), "Cannot send on pending link");
}
#[test]
fn test_send_on_link_unknown_link() {
let mgr = LinkManager::new();
let mut rng = OsRng;
let actions = mgr.send_on_link(&[0xFF; 16], b"test", constants::CONTEXT_NONE, &mut rng);
assert!(actions.is_empty());
}
#[test]
fn test_resource_full_transfer_large() {
let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
let original_data: Vec<u8> = (0..2000u32)
.map(|i| {
let pos = i as usize;
(pos ^ (pos >> 8) ^ (pos >> 16)) as u8
})
.collect();
let adv_actions = init_mgr.send_resource(&link_id, &original_data, None, &mut rng);
let mut pending: Vec<(char, LinkManagerAction)> =
adv_actions.into_iter().map(|a| ('i', a)).collect();
let mut rounds = 0;
let max_rounds = 200;
let mut resource_received = false;
let mut sender_completed = false;
while !pending.is_empty() && rounds < max_rounds {
rounds += 1;
let mut next: Vec<(char, LinkManagerAction)> = Vec::new();
for (source, action) in pending.drain(..) {
if let LinkManagerAction::SendPacket { raw, .. } = action {
let pkt = match RawPacket::unpack(&raw) {
Ok(p) => p,
Err(_) => continue,
};
let target_actions = if source == 'i' {
resp_mgr.handle_local_delivery(
pkt.destination_hash,
&raw,
pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
)
} else {
init_mgr.handle_local_delivery(
pkt.destination_hash,
&raw,
pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
)
};
let target_source = if source == 'i' { 'r' } else { 'i' };
for a in &target_actions {
match a {
LinkManagerAction::ResourceReceived { data, .. } => {
assert_eq!(*data, original_data);
resource_received = true;
}
LinkManagerAction::ResourceCompleted { .. } => {
sender_completed = true;
}
_ => {}
}
}
next.extend(target_actions.into_iter().map(|a| (target_source, a)));
}
}
pending = next;
}
assert!(
resource_received,
"Should receive large resource (rounds={})",
rounds
);
assert!(
sender_completed,
"Sender should complete (rounds={})",
rounds
);
}
#[test]
fn test_resource_receiver_stores_original_advertisement_plaintext() {
let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
let data = vec![0x41; 256];
let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
let adv_raw = adv_actions
.iter()
.find_map(|action| match action {
LinkManagerAction::SendPacket { raw, .. } => {
let pkt = RawPacket::unpack(raw).ok()?;
(pkt.context == constants::CONTEXT_RESOURCE_ADV).then_some(raw.clone())
}
_ => None,
})
.expect("sender should emit a resource advertisement");
let adv_pkt = RawPacket::unpack(&adv_raw).unwrap();
let adv_plaintext = resp_mgr
.links
.get(&link_id)
.unwrap()
.engine
.decrypt(&adv_pkt.data)
.unwrap();
let _resp_actions = resp_mgr.handle_local_delivery(
adv_pkt.destination_hash,
&adv_raw,
adv_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let receiver = resp_mgr
.links
.get(&link_id)
.and_then(|managed| managed.incoming_resources.first())
.expect("advertisement should create an incoming receiver");
assert_eq!(receiver.advertisement_packet, adv_plaintext);
assert_eq!(
receiver.max_decompressed_size,
constants::RESOURCE_AUTO_COMPRESS_MAX_SIZE
);
}
#[test]
fn test_corrupt_compressed_resource_rejects_and_tears_down_link() {
let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
let data = vec![b'A'; 4096];
let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
let mut request_actions = Vec::new();
for action in &adv_actions {
let LinkManagerAction::SendPacket { raw, .. } = action else {
continue;
};
let pkt = RawPacket::unpack(raw).unwrap();
let actions = resp_mgr.handle_local_delivery(
pkt.destination_hash,
raw,
pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
request_actions.extend(actions);
}
{
let receiver = resp_mgr
.links
.get_mut(&link_id)
.and_then(|managed| managed.incoming_resources.first_mut())
.expect("receiver should exist after advertisement");
assert!(receiver.flags.compressed, "test data should be compressed");
receiver.max_decompressed_size = 64;
}
let mut responder_actions = Vec::new();
for action in request_actions {
let LinkManagerAction::SendPacket { raw, .. } = action else {
continue;
};
let pkt = RawPacket::unpack(&raw).unwrap();
let actions = init_mgr.handle_local_delivery(
pkt.destination_hash,
&raw,
pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
for action in actions {
let LinkManagerAction::SendPacket { raw, .. } = &action else {
continue;
};
let pkt = RawPacket::unpack(raw).unwrap();
let delivered = resp_mgr.handle_local_delivery(
pkt.destination_hash,
raw,
pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
responder_actions.extend(delivered);
}
}
assert!(
responder_actions.iter().any(|action| matches!(
action,
LinkManagerAction::ResourceFailed { error, .. }
if error == "Resource too large"
)),
"corrupt oversized resource should fail with TooLarge"
);
assert!(
responder_actions.iter().any(|action| matches!(
action,
LinkManagerAction::LinkClosed { link_id: closed_id, .. } if *closed_id == link_id
)),
"corrupt oversized resource should tear down the link"
);
assert!(
responder_actions.iter().any(|action| match action {
LinkManagerAction::SendPacket { raw, .. } => RawPacket::unpack(raw)
.map(|pkt| pkt.context == constants::CONTEXT_RESOURCE_RCL)
.unwrap_or(false),
_ => false,
}),
"corrupt oversized resource should send a receiver cancel/reject packet"
);
assert_eq!(
resp_mgr
.links
.get(&link_id)
.map(|managed| managed.engine.state()),
Some(LinkState::Closed)
);
}
#[test]
fn test_resource_hmu_timeout_extension_in_link_manager_flow() {
let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
let mut state = 0x1234_5678u32;
let data: Vec<u8> = (0..50000)
.map(|_| {
state = state.wrapping_mul(1664525).wrapping_add(1013904223);
(state >> 16) as u8
})
.collect();
let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
let mut pending: Vec<(char, LinkManagerAction)> =
adv_actions.into_iter().map(|a| ('i', a)).collect();
let mut rounds = 0;
while rounds < 300 {
rounds += 1;
let mut next: Vec<(char, LinkManagerAction)> = Vec::new();
for (source, action) in pending.drain(..) {
let LinkManagerAction::SendPacket { raw, .. } = action else {
continue;
};
let pkt = match RawPacket::unpack(&raw) {
Ok(p) => p,
Err(_) => continue,
};
let target_actions = if source == 'i' {
resp_mgr.handle_local_delivery(
pkt.destination_hash,
&raw,
pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
)
} else {
init_mgr.handle_local_delivery(
pkt.destination_hash,
&raw,
pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
)
};
let target_source = if source == 'i' { 'r' } else { 'i' };
next.extend(target_actions.into_iter().map(|a| (target_source, a)));
}
if resp_mgr
.links
.get(&link_id)
.and_then(|managed| managed.incoming_resources.first())
.is_some_and(|receiver| receiver.waiting_for_hmu)
{
break;
}
pending = next;
}
assert!(
resp_mgr
.links
.get(&link_id)
.and_then(|managed| managed.incoming_resources.first())
.is_some_and(|receiver| receiver.waiting_for_hmu),
"expected receiver to reach a live HMU wait state"
);
let prime_actions = {
let managed = resp_mgr.links.get_mut(&link_id).unwrap();
let receiver = managed.incoming_resources.first_mut().unwrap();
let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
managed.engine.decrypt(ciphertext).map_err(|_| ())
};
receiver.tick(
receiver.last_activity + 0.0001,
&decrypt_fn,
&Bzip2Compressor,
)
};
assert!(
!prime_actions
.iter()
.any(|a| matches!(a, ResourceAction::SendRequest(_))),
"fresh HMU wait state should not immediately emit a retry request"
);
let (late_delta, retries_before) = {
let managed = resp_mgr
.links
.get_mut(&link_id)
.expect("receiver link should still exist");
let receiver = managed
.incoming_resources
.first_mut()
.expect("receiver should have an active incoming resource");
assert!(
receiver.waiting_for_hmu,
"receiver should be waiting for HMU"
);
let eifr = receiver.eifr.unwrap_or_else(|| {
(constants::RESOURCE_SDU as f64 * 8.0) / receiver.rtt.unwrap_or(0.5)
});
let expected_tof = if receiver.outstanding_parts > 0 {
(receiver.outstanding_parts as f64 * constants::RESOURCE_SDU as f64 * 8.0) / eifr
} else {
(3.0 * constants::RESOURCE_SDU as f64) / eifr
};
let expected_hmu_wait =
(constants::RESOURCE_SDU as f64 * 8.0 * constants::RESOURCE_HMU_WAIT_FACTOR) / eifr;
let old_delta = constants::RESOURCE_PART_TIMEOUT_FACTOR_AFTER_RTT * expected_tof
+ constants::RESOURCE_RETRY_GRACE_TIME;
(
old_delta + expected_hmu_wait + expected_hmu_wait.max(1.0),
receiver.retries_left,
)
};
{
let managed = resp_mgr.links.get(&link_id).unwrap();
let receiver = managed.incoming_resources.first().unwrap();
assert_eq!(receiver.retries_left, retries_before);
assert!(
receiver.eifr.is_some(),
"receiver tick should have populated EIFR"
);
}
let late_resource_actions = {
let managed = resp_mgr.links.get_mut(&link_id).unwrap();
let receiver = managed.incoming_resources.first_mut().unwrap();
let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
managed.engine.decrypt(ciphertext).map_err(|_| ())
};
receiver.tick(
receiver.last_activity + late_delta,
&decrypt_fn,
&Bzip2Compressor,
)
};
let late_actions =
resp_mgr.process_resource_actions(&link_id, late_resource_actions, &mut rng);
let retry_raw = late_actions
.iter()
.find_map(|a| match a {
LinkManagerAction::SendPacket { raw, .. } => {
let pkt = RawPacket::unpack(raw).ok()?;
(pkt.context == constants::CONTEXT_RESOURCE_REQ).then_some(raw.clone())
}
_ => None,
})
.expect("receiver should emit a resource retry request after extended timeout");
{
let managed = resp_mgr.links.get(&link_id).unwrap();
let receiver = managed.incoming_resources.first().unwrap();
assert_eq!(receiver.retries_left, retries_before - 1);
}
let retry_pkt = RawPacket::unpack(&retry_raw).unwrap();
let retry_plaintext = resp_mgr
.links
.get(&link_id)
.unwrap()
.engine
.decrypt(&retry_pkt.data)
.expect("retry request should decrypt");
assert_eq!(retry_plaintext[0], constants::RESOURCE_HASHMAP_IS_EXHAUSTED);
let retry_to_sender = init_mgr.handle_local_delivery(
retry_pkt.destination_hash,
&retry_raw,
retry_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
assert!(
retry_to_sender.iter().any(|a| match a {
LinkManagerAction::SendPacket { raw, .. } => RawPacket::unpack(raw)
.map(|pkt| pkt.context == constants::CONTEXT_RESOURCE_HMU)
.unwrap_or(false),
_ => false,
}),
"sender should answer the exhausted retry request with a live HMU packet"
);
}
#[test]
fn test_process_resource_actions_mapping() {
let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
let actions = vec![
ResourceAction::DataReceived {
data: vec![1, 2, 3],
metadata: Some(vec![4, 5]),
},
ResourceAction::Completed,
ResourceAction::Failed(rns_core::resource::ResourceError::Timeout),
ResourceAction::ProgressUpdate {
received: 10,
total: 20,
},
ResourceAction::TeardownLink,
];
let result = init_mgr.process_resource_actions(&link_id, actions, &mut rng);
assert!(matches!(
result[0],
LinkManagerAction::ResourceReceived { .. }
));
assert!(matches!(
result[1],
LinkManagerAction::ResourceCompleted { .. }
));
assert!(matches!(
result[2],
LinkManagerAction::ResourceFailed { .. }
));
assert!(matches!(
result[3],
LinkManagerAction::ResourceProgress {
received: 10,
total: 20,
..
}
));
assert!(result
.iter()
.any(|action| matches!(action, LinkManagerAction::LinkClosed { .. })));
}
#[test]
fn test_link_state_empty() {
let mgr = LinkManager::new();
let fake_id = [0xAA; 16];
assert!(mgr.link_state(&fake_id).is_none());
}
#[test]
fn test_large_response_resource_completes_as_response() {
let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
let large_payload: Vec<u8> = (0..5000u32).map(|i| (i & 0xFF) as u8).collect();
let response_value = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(large_payload));
resp_mgr.register_request_handler("/large", None, {
let response_value = response_value.clone();
move |_link_id, _path, _data, _remote| Some(response_value.clone())
});
let req_actions = init_mgr.send_request(&link_id, "/large", b"\xc0", &mut rng);
let req_raw = extract_any_send_packet(&req_actions);
let req_pkt = RawPacket::unpack(&req_raw).unwrap();
let request_id = req_pkt.get_truncated_hash();
let resp_actions = resp_mgr.handle_local_delivery(
req_pkt.destination_hash,
&req_raw,
req_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let mut pending: Vec<(char, LinkManagerAction)> =
resp_actions.into_iter().map(|a| ('r', a)).collect();
let mut rounds = 0;
let mut received_response = None;
while !pending.is_empty() && rounds < 200 {
rounds += 1;
let mut next = Vec::new();
for (source, action) in pending.drain(..) {
let LinkManagerAction::SendPacket { raw, .. } = action else {
continue;
};
let pkt = RawPacket::unpack(&raw).unwrap();
let target_actions = if source == 'r' {
init_mgr.handle_local_delivery(
pkt.destination_hash,
&raw,
pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
)
} else {
resp_mgr.handle_local_delivery(
pkt.destination_hash,
&raw,
pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
)
};
let target_source = if source == 'r' { 'i' } else { 'r' };
for target_action in &target_actions {
match target_action {
LinkManagerAction::ResponseReceived {
request_id: rid,
data,
..
} => {
received_response = Some((*rid, data.clone()));
}
LinkManagerAction::ResourceReceived { .. } => {
panic!("response resources must complete as ResponseReceived")
}
LinkManagerAction::ResourceAcceptQuery { .. } => {
panic!("response resources must bypass application acceptance")
}
_ => {}
}
}
next.extend(target_actions.into_iter().map(|a| (target_source, a)));
}
pending = next;
}
let (received_request_id, received_data) = received_response.unwrap_or_else(|| {
panic!(
"large response resource did not complete as ResponseReceived after {} rounds",
rounds
)
});
assert_eq!(received_request_id, request_id);
assert_eq!(received_data, response_value);
}
#[test]
fn test_response_resource_preserves_metadata() {
let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
let payload = b"bundle-data".to_vec();
let metadata = b"git-status-ok".to_vec();
let response_value = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(payload));
resp_mgr.register_request_handler_response("/fetch", None, {
let response_value = response_value.clone();
let metadata = metadata.clone();
move |_link_id, _path, _data, _remote| {
Some(RequestResponse::Resource {
data: response_value.clone(),
metadata: Some(metadata.clone()),
auto_compress: false,
})
}
});
let req_actions = init_mgr.send_request(&link_id, "/fetch", b"\xc0", &mut rng);
let req_raw = extract_any_send_packet(&req_actions);
let req_pkt = RawPacket::unpack(&req_raw).unwrap();
let request_id = req_pkt.get_truncated_hash();
let resp_actions = resp_mgr.handle_local_delivery(
req_pkt.destination_hash,
&req_raw,
req_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let mut pending: Vec<(char, LinkManagerAction)> =
resp_actions.into_iter().map(|a| ('r', a)).collect();
let mut received_response = None;
for _ in 0..200 {
if pending.is_empty() || received_response.is_some() {
break;
}
let mut next = Vec::new();
for (source, action) in pending.drain(..) {
let LinkManagerAction::SendPacket { raw, .. } = action else {
continue;
};
let pkt = RawPacket::unpack(&raw).unwrap();
let target_actions = if source == 'r' {
init_mgr.handle_local_delivery(
pkt.destination_hash,
&raw,
pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
)
} else {
resp_mgr.handle_local_delivery(
pkt.destination_hash,
&raw,
pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
)
};
let target_source = if source == 'r' { 'i' } else { 'r' };
for target_action in &target_actions {
match target_action {
LinkManagerAction::ResponseReceived {
request_id: rid,
data,
metadata: response_metadata,
..
} => {
received_response =
Some((*rid, data.clone(), response_metadata.clone()));
}
LinkManagerAction::ResourceReceived { .. } => {
panic!("response resources must complete as ResponseReceived")
}
_ => {}
}
}
next.extend(target_actions.into_iter().map(|a| (target_source, a)));
}
pending = next;
}
let (received_request_id, received_data, received_metadata) = received_response
.expect("resource response with metadata should complete as ResponseReceived");
assert_eq!(received_request_id, request_id);
assert_eq!(received_data, response_value);
assert_eq!(received_metadata, Some(metadata));
}
#[test]
fn test_negotiated_mtu_response_uses_resource_before_global_mtu() {
let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
init_mgr.set_link_mtu(&link_id, 300);
resp_mgr.set_link_mtu(&link_id, 300);
let payload = vec![0xAB; 350];
let response_value = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(payload));
resp_mgr.register_request_handler("/mtu", None, {
let response_value = response_value.clone();
move |_link_id, _path, _data, _remote| Some(response_value.clone())
});
let req_actions = init_mgr.send_request(&link_id, "/mtu", b"\xc0", &mut rng);
let req_raw = extract_any_send_packet(&req_actions);
let req_pkt = RawPacket::unpack(&req_raw).unwrap();
let resp_actions = resp_mgr.handle_local_delivery(
req_pkt.destination_hash,
&req_raw,
req_pkt.packet_hash,
rns_core::transport::types::InterfaceId(0),
&mut rng,
);
let mut has_resource_adv = false;
let mut direct_response_len = None;
for action in &resp_actions {
if let LinkManagerAction::SendPacket { raw, .. } = action {
let pkt = RawPacket::unpack(raw).unwrap();
has_resource_adv |= pkt.context == constants::CONTEXT_RESOURCE_ADV;
if pkt.context == constants::CONTEXT_RESPONSE {
direct_response_len = Some(raw.len());
}
}
}
assert!(
has_resource_adv,
"responses larger than the negotiated link MTU should use resource fallback"
);
assert!(
direct_response_len.is_none(),
"sent direct response of {} bytes on a 300 byte negotiated MTU",
direct_response_len.unwrap_or_default()
);
}
#[test]
fn test_large_management_response_uses_resource_fallback() {
let (_init_mgr, mut resp_mgr, link_id) = setup_active_link();
let mut rng = OsRng;
let payload = vec![0xBC; 5000];
let response_value = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(payload));
let actions =
resp_mgr.send_management_response(&link_id, &[0x55; 16], &response_value, &mut rng);
let mut has_resource_adv = false;
let mut has_direct_response = false;
for action in &actions {
if let LinkManagerAction::SendPacket { raw, .. } = action {
let pkt = RawPacket::unpack(raw).unwrap();
has_resource_adv |= pkt.context == constants::CONTEXT_RESOURCE_ADV;
has_direct_response |= pkt.context == constants::CONTEXT_RESPONSE;
}
}
assert!(
has_resource_adv,
"large management responses should advertise a response resource"
);
assert!(
!has_direct_response,
"large management responses should not use a direct CONTEXT_RESPONSE packet"
);
}
}