#![allow(unused_variables, dead_code)]
#![cfg(feature = "rns-hooks-wasm")]
use std::path::PathBuf;
#[cfg(feature = "rns-hooks-builtin")]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::time::{Duration, Instant};
use rns_core::transport::types::InterfaceId;
use rns_crypto::identity::Identity;
use rns_crypto::OsRng;
#[cfg(feature = "rns-hooks-builtin")]
use rns_hooks_crate::{BuiltinHookCall, BuiltinHookHost, HookError, HookResult};
use rns_net::{
AnnouncedIdentity, Callbacks, DestHash, Destination, IdentityHash, InterfaceConfig, NodeConfig,
PacketHash, ProofStrategy, RnsNode, TcpClientConfig, TcpServerConfig, MODE_FULL,
};
fn wasm_bytes(name: &str) -> Vec<u8> {
let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("../rns-hooks/target/wasm-examples")
.join(format!("{}.wasm", name));
std::fs::read(&path).unwrap_or_else(|_| {
panic!(
"{} not found, run rns-hooks/build-examples.sh",
path.display()
)
})
}
#[cfg(feature = "rns-hooks-builtin")]
static BUILTIN_ID_COUNTER: AtomicUsize = AtomicUsize::new(0);
#[cfg(feature = "rns-hooks-builtin")]
fn register_builtin_continue_hook(label: &str) -> String {
let id = format!(
"test.rns_net.{}.{}.{}",
label,
std::process::id(),
BUILTIN_ID_COUNTER.fetch_add(1, Ordering::SeqCst)
);
rns_hooks_crate::register_builtin_hook(id.clone(), builtin_continue_hook)
.expect("register built-in test hook");
id
}
#[cfg(feature = "rns-hooks-builtin")]
fn builtin_continue_hook(
_call: BuiltinHookCall<'_>,
_host: &mut BuiltinHookHost,
) -> Result<HookResult, HookError> {
Ok(HookResult::continue_result())
}
#[derive(Debug, Clone)]
enum TestEvent {
Announce(AnnouncedIdentity),
Delivery {
dest_hash: DestHash,
raw: Vec<u8>,
packet_hash: PacketHash,
},
InterfaceUp(InterfaceId),
}
struct TestCallbacks {
tx: mpsc::Sender<TestEvent>,
}
impl TestCallbacks {
fn new(tx: mpsc::Sender<TestEvent>) -> Self {
TestCallbacks { tx }
}
}
impl Callbacks for TestCallbacks {
fn on_announce(&mut self, announced: AnnouncedIdentity) {
let _ = self.tx.send(TestEvent::Announce(announced));
}
fn on_path_updated(&mut self, _: DestHash, _: u8) {}
fn on_local_delivery(&mut self, dest_hash: DestHash, raw: Vec<u8>, packet_hash: PacketHash) {
let _ = self.tx.send(TestEvent::Delivery {
dest_hash,
raw,
packet_hash,
});
}
fn on_interface_up(&mut self, id: InterfaceId) {
let _ = self.tx.send(TestEvent::InterfaceUp(id));
}
fn on_interface_down(&mut self, _: InterfaceId) {}
}
struct TransportCallbacks;
impl Callbacks for TransportCallbacks {
fn on_announce(&mut self, _: AnnouncedIdentity) {}
fn on_path_updated(&mut self, _: DestHash, _: u8) {}
fn on_local_delivery(&mut self, _: DestHash, _: Vec<u8>, _: PacketHash) {}
}
fn find_free_port() -> u16 {
use std::sync::atomic::{AtomicU16, Ordering};
static NEXT_PORT: AtomicU16 = AtomicU16::new(0);
let pid = std::process::id() as u16;
let base = 20_000 + (pid % 250) * 160;
let _ = NEXT_PORT.compare_exchange(0, base, Ordering::SeqCst, Ordering::SeqCst);
loop {
let port = NEXT_PORT.fetch_add(1, Ordering::SeqCst);
if std::net::TcpListener::bind(("127.0.0.1", port)).is_ok() {
return port;
}
}
}
fn decrypt_delivery(raw: &[u8], identity: &Identity) -> Option<Vec<u8>> {
let packet = rns_core::packet::RawPacket::unpack(raw).ok()?;
identity.decrypt(&packet.data).ok()
}
fn wait_for_event<F, T>(
rx: &mpsc::Receiver<TestEvent>,
timeout: Duration,
mut predicate: F,
) -> Option<T>
where
F: FnMut(&TestEvent) -> Option<T>,
{
let deadline = Instant::now() + timeout;
loop {
let remaining = deadline
.checked_duration_since(Instant::now())
.unwrap_or(Duration::ZERO);
if remaining.is_zero() {
return None;
}
match rx.recv_timeout(remaining) {
Ok(event) => {
if let Some(result) = predicate(&event) {
return Some(result);
}
}
Err(_) => return None,
}
}
}
fn wait_for_announce(
rx: &mpsc::Receiver<TestEvent>,
expected_hash: &DestHash,
timeout: Duration,
) -> Option<AnnouncedIdentity> {
wait_for_event(rx, timeout, |event| match event {
TestEvent::Announce(a) if a.dest_hash == *expected_hash => Some(a.clone()),
_ => None,
})
}
fn wait_for_delivery(
rx: &mpsc::Receiver<TestEvent>,
timeout: Duration,
) -> Option<(DestHash, Vec<u8>, PacketHash)> {
wait_for_event(rx, timeout, |event| match event {
TestEvent::Delivery {
dest_hash,
raw,
packet_hash,
} => Some((dest_hash.clone(), raw.clone(), packet_hash.clone())),
_ => None,
})
}
const TIMEOUT: Duration = Duration::from_secs(10);
const SETTLE: Duration = Duration::from_millis(500);
const APP_NAME: &str = "e2e_hooks_test";
fn wait_for_interface_up(rx: &mpsc::Receiver<TestEvent>, timeout: Duration) {
wait_for_event(rx, timeout, |event| match event {
TestEvent::InterfaceUp(_) => Some(()),
_ => None,
})
.expect("Timed out waiting for InterfaceUp");
}
fn start_transport_node(port: u16) -> RnsNode {
RnsNode::start(
NodeConfig {
panic_on_interface_error: false,
transport_enabled: true,
identity: Some(Identity::new(&mut OsRng)),
interfaces: vec![InterfaceConfig {
name: String::new(),
type_name: "TCPServerInterface".to_string(),
config_data: Box::new(TcpServerConfig {
name: "Transport TCP".into(),
listen_ip: "127.0.0.1".into(),
listen_port: port,
..TcpServerConfig::default()
}),
mode: MODE_FULL,
ingress_control: rns_core::transport::types::IngressControlConfig::enabled(),
ifac: None,
discovery: None,
}],
share_instance: false,
instance_name: "default".into(),
shared_instance_port: 37428,
rpc_port: 0,
cache_dir: None,
ratchet_store: None,
ratchet_expiry: std::time::Duration::from_secs(rns_core::constants::RATCHET_EXPIRY),
management: Default::default(),
probe_port: None,
probe_addrs: vec![],
probe_protocol: rns_core::holepunch::ProbeProtocol::Rnsp,
device: None,
hooks: Vec::new(),
known_destinations_ttl: Duration::from_secs(48 * 60 * 60),
known_destinations_max_entries: 8192,
announce_table_ttl: std::time::Duration::from_secs(
rns_core::constants::ANNOUNCE_TABLE_TTL as u64,
),
announce_table_max_bytes: rns_core::constants::ANNOUNCE_TABLE_MAX_BYTES,
announce_sig_cache_enabled: true,
announce_sig_cache_max_entries: rns_core::constants::ANNOUNCE_SIG_CACHE_MAXSIZE,
announce_sig_cache_ttl: std::time::Duration::from_secs(
rns_core::constants::ANNOUNCE_SIG_CACHE_TTL as u64,
),
discover_interfaces: false,
discovery_required_value: None,
respond_to_probes: false,
prefer_shorter_path: false,
max_paths_per_destination: 1,
packet_hashlist_max_entries: rns_core::constants::HASHLIST_MAXSIZE,
max_discovery_pr_tags: rns_core::constants::MAX_PR_TAGS,
max_path_destinations: usize::MAX,
max_tunnel_destinations_total: usize::MAX,
driver_event_queue_capacity: rns_net::event::DEFAULT_EVENT_QUEUE_CAPACITY,
interface_writer_queue_capacity:
rns_net::interface::DEFAULT_ASYNC_WRITER_QUEUE_CAPACITY,
#[cfg(feature = "iface-backbone")]
backbone_peer_pool: None,
registry: None,
#[cfg(feature = "hooks")]
provider_bridge: None,
},
Box::new(TransportCallbacks),
)
.expect("Failed to start transport node")
}
fn start_client_node(port: u16, identity: &Identity, callbacks: Box<dyn Callbacks>) -> RnsNode {
RnsNode::start(
NodeConfig {
panic_on_interface_error: false,
transport_enabled: false,
identity: Some(Identity::from_private_key(
&identity.get_private_key().unwrap(),
)),
interfaces: vec![InterfaceConfig {
name: String::new(),
type_name: "TCPClientInterface".to_string(),
config_data: Box::new(TcpClientConfig {
name: "Client TCP".into(),
target_host: "127.0.0.1".into(),
target_port: port,
..Default::default()
}),
mode: MODE_FULL,
ingress_control: rns_core::transport::types::IngressControlConfig::enabled(),
ifac: None,
discovery: None,
}],
share_instance: false,
instance_name: "default".into(),
shared_instance_port: 37428,
rpc_port: 0,
cache_dir: None,
ratchet_store: None,
ratchet_expiry: std::time::Duration::from_secs(rns_core::constants::RATCHET_EXPIRY),
management: Default::default(),
probe_port: None,
probe_addrs: vec![],
probe_protocol: rns_core::holepunch::ProbeProtocol::Rnsp,
device: None,
hooks: Vec::new(),
known_destinations_ttl: Duration::from_secs(48 * 60 * 60),
known_destinations_max_entries: 8192,
announce_table_ttl: std::time::Duration::from_secs(
rns_core::constants::ANNOUNCE_TABLE_TTL as u64,
),
announce_table_max_bytes: rns_core::constants::ANNOUNCE_TABLE_MAX_BYTES,
announce_sig_cache_enabled: true,
announce_sig_cache_max_entries: rns_core::constants::ANNOUNCE_SIG_CACHE_MAXSIZE,
announce_sig_cache_ttl: std::time::Duration::from_secs(
rns_core::constants::ANNOUNCE_SIG_CACHE_TTL as u64,
),
discover_interfaces: false,
discovery_required_value: None,
respond_to_probes: false,
prefer_shorter_path: false,
max_paths_per_destination: 1,
packet_hashlist_max_entries: rns_core::constants::HASHLIST_MAXSIZE,
max_discovery_pr_tags: rns_core::constants::MAX_PR_TAGS,
max_path_destinations: usize::MAX,
max_tunnel_destinations_total: usize::MAX,
driver_event_queue_capacity: rns_net::event::DEFAULT_EVENT_QUEUE_CAPACITY,
interface_writer_queue_capacity:
rns_net::interface::DEFAULT_ASYNC_WRITER_QUEUE_CAPACITY,
#[cfg(feature = "iface-backbone")]
backbone_peer_pool: None,
registry: None,
#[cfg(feature = "hooks")]
provider_bridge: None,
},
callbacks,
)
.expect("Failed to start client node")
}
#[allow(clippy::type_complexity)]
fn setup_two_peers() -> (
RnsNode,
RnsNode,
mpsc::Receiver<TestEvent>,
RnsNode,
mpsc::Receiver<TestEvent>,
Identity,
Identity,
Destination,
Destination,
) {
let port = find_free_port();
let transport = start_transport_node(port);
let alice_identity = Identity::new(&mut OsRng);
let alice_ih = IdentityHash(*alice_identity.hash());
let alice_dest = Destination::single_in(APP_NAME, &["msg", "rx"], alice_ih)
.set_proof_strategy(ProofStrategy::ProveAll);
let bob_identity = Identity::new(&mut OsRng);
let bob_ih = IdentityHash(*bob_identity.hash());
let bob_dest = Destination::single_in(APP_NAME, &["msg", "rx"], bob_ih)
.set_proof_strategy(ProofStrategy::ProveAll);
let (alice_tx, alice_rx) = mpsc::channel();
let alice_node = start_client_node(
port,
&alice_identity,
Box::new(TestCallbacks::new(alice_tx)),
);
alice_node
.register_destination_with_proof(
&alice_dest,
Some(alice_identity.get_private_key().unwrap()),
)
.unwrap();
let (bob_tx, bob_rx) = mpsc::channel();
let bob_node = start_client_node(port, &bob_identity, Box::new(TestCallbacks::new(bob_tx)));
bob_node
.register_destination_with_proof(&bob_dest, Some(bob_identity.get_private_key().unwrap()))
.unwrap();
wait_for_interface_up(&alice_rx, TIMEOUT);
wait_for_interface_up(&bob_rx, TIMEOUT);
std::thread::sleep(SETTLE);
(
transport,
alice_node,
alice_rx,
bob_node,
bob_rx,
alice_identity,
bob_identity,
alice_dest,
bob_dest,
)
}
fn announce_bob_to_alice(
bob_node: &RnsNode,
bob_dest: &Destination,
bob_id: &Identity,
alice_rx: &mpsc::Receiver<TestEvent>,
) -> AnnouncedIdentity {
for _ in 0..10 {
let _ = bob_node.announce(bob_dest, bob_id, Some(b"Bob"));
if let Some(ann) = wait_for_announce(alice_rx, &bob_dest.hash, Duration::from_secs(2)) {
return ann;
}
}
panic!("Alice never received Bob's announce after retries");
}
#[test]
fn test_load_list_unload_hooks() {
let port = find_free_port();
let transport = start_transport_node(port);
let identity = Identity::new(&mut OsRng);
let (tx, _rx) = mpsc::channel();
let node = start_client_node(port, &identity, Box::new(TestCallbacks::new(tx)));
std::thread::sleep(SETTLE);
let result = node
.load_hook(
"packet_logger".into(),
wasm_bytes("packet_logger"),
"PreIngress".into(),
10,
)
.expect("send failed");
assert!(result.is_ok(), "load_hook failed: {:?}", result.err());
let hooks = node.list_hooks().expect("list_hooks send failed");
assert_eq!(hooks.len(), 1);
assert_eq!(hooks[0].name, "packet_logger");
assert_eq!(hooks[0].attach_point, "PreIngress");
assert_eq!(hooks[0].priority, 10);
assert!(hooks[0].enabled);
let result = node
.unload_hook("packet_logger".into(), "PreIngress".into())
.expect("send failed");
assert!(result.is_ok(), "unload_hook failed: {:?}", result.err());
let hooks = node.list_hooks().expect("list_hooks send failed");
assert!(
hooks.is_empty(),
"Expected no hooks after unload, got {:?}",
hooks
);
node.shutdown();
transport.shutdown();
}
#[test]
fn test_load_list_unload_backbone_peer_hooks() {
let port = find_free_port();
let transport = start_transport_node(port);
let identity = Identity::new(&mut OsRng);
let (tx, _rx) = mpsc::channel();
let node = start_client_node(port, &identity, Box::new(TestCallbacks::new(tx)));
std::thread::sleep(SETTLE);
let result = node
.load_hook(
"packet_logger".into(),
wasm_bytes("packet_logger"),
"BackbonePeerConnected".into(),
5,
)
.expect("send failed");
assert!(result.is_ok(), "load_hook failed: {:?}", result.err());
let hooks = node.list_hooks().expect("list_hooks send failed");
assert_eq!(hooks.len(), 1);
assert_eq!(hooks[0].name, "packet_logger");
assert_eq!(hooks[0].attach_point, "BackbonePeerConnected");
assert_eq!(hooks[0].priority, 5);
assert!(hooks[0].enabled);
let result = node
.unload_hook("packet_logger".into(), "BackbonePeerConnected".into())
.expect("send failed");
assert!(result.is_ok(), "unload_hook failed: {:?}", result.err());
let hooks = node.list_hooks().expect("list_hooks send failed");
assert!(
hooks.is_empty(),
"Expected no hooks after unload, got {:?}",
hooks
);
node.shutdown();
transport.shutdown();
}
#[test]
fn test_reload_hook() {
let port = find_free_port();
let transport = start_transport_node(port);
let identity = Identity::new(&mut OsRng);
let (tx, _rx) = mpsc::channel();
let node = start_client_node(port, &identity, Box::new(TestCallbacks::new(tx)));
std::thread::sleep(SETTLE);
node.load_hook(
"my_hook".into(),
wasm_bytes("packet_logger"),
"PreIngress".into(),
5,
)
.expect("send failed")
.expect("load_hook failed");
let result = node
.reload_hook(
"my_hook".into(),
"PreIngress".into(),
wasm_bytes("announce_filter"),
)
.expect("send failed");
assert!(result.is_ok(), "reload_hook failed: {:?}", result.err());
let hooks = node.list_hooks().expect("list_hooks send failed");
assert_eq!(hooks.len(), 1);
assert_eq!(hooks[0].name, "my_hook");
assert_eq!(hooks[0].priority, 5);
node.shutdown();
transport.shutdown();
}
#[test]
fn test_enable_disable_hook() {
let port = find_free_port();
let transport = start_transport_node(port);
let identity = Identity::new(&mut OsRng);
let (tx, _rx) = mpsc::channel();
let node = start_client_node(port, &identity, Box::new(TestCallbacks::new(tx)));
std::thread::sleep(SETTLE);
node.load_hook(
"toggle_hook".into(),
wasm_bytes("packet_logger"),
"PreIngress".into(),
5,
)
.expect("send failed")
.expect("load_hook failed");
node.set_hook_enabled("toggle_hook".into(), "PreIngress".into(), false)
.expect("send failed")
.expect("disable failed");
let hooks = node.list_hooks().expect("list_hooks send failed");
assert_eq!(hooks.len(), 1);
assert!(!hooks[0].enabled);
node.set_hook_enabled("toggle_hook".into(), "PreIngress".into(), true)
.expect("send failed")
.expect("enable failed");
let hooks = node.list_hooks().expect("list_hooks send failed");
assert!(hooks[0].enabled);
node.shutdown();
transport.shutdown();
}
#[test]
fn test_set_hook_priority_reorders_listing() {
let port = find_free_port();
let transport = start_transport_node(port);
let identity = Identity::new(&mut OsRng);
let (tx, _rx) = mpsc::channel();
let node = start_client_node(port, &identity, Box::new(TestCallbacks::new(tx)));
std::thread::sleep(SETTLE);
node.load_hook(
"low".into(),
wasm_bytes("packet_logger"),
"PreIngress".into(),
1,
)
.expect("send failed")
.expect("load low failed");
node.load_hook(
"high".into(),
wasm_bytes("announce_filter"),
"PreIngress".into(),
10,
)
.expect("send failed")
.expect("load high failed");
node.set_hook_priority("low".into(), "PreIngress".into(), 20)
.expect("send failed")
.expect("set priority failed");
let hooks = node.list_hooks().expect("list_hooks send failed");
assert_eq!(hooks.len(), 2);
assert_eq!(hooks[0].name, "low");
assert_eq!(hooks[0].priority, 20);
assert_eq!(hooks[1].name, "high");
node.shutdown();
transport.shutdown();
}
#[test]
fn test_unload_nonexistent_returns_error() {
let port = find_free_port();
let transport = start_transport_node(port);
let identity = Identity::new(&mut OsRng);
let (tx, _rx) = mpsc::channel();
let node = start_client_node(port, &identity, Box::new(TestCallbacks::new(tx)));
std::thread::sleep(SETTLE);
let result = node
.unload_hook("nope".into(), "PreIngress".into())
.expect("send failed");
assert!(
result.is_err(),
"Expected error for nonexistent hook unload"
);
node.shutdown();
transport.shutdown();
}
#[test]
fn test_reload_nonexistent_returns_error() {
let port = find_free_port();
let transport = start_transport_node(port);
let identity = Identity::new(&mut OsRng);
let (tx, _rx) = mpsc::channel();
let node = start_client_node(port, &identity, Box::new(TestCallbacks::new(tx)));
std::thread::sleep(SETTLE);
let result = node
.reload_hook(
"nope".into(),
"PreIngress".into(),
wasm_bytes("packet_logger"),
)
.expect("send failed");
assert!(
result.is_err(),
"Expected error for nonexistent hook reload"
);
node.shutdown();
transport.shutdown();
}
#[test]
fn test_load_invalid_wasm_returns_error() {
let port = find_free_port();
let transport = start_transport_node(port);
let identity = Identity::new(&mut OsRng);
let (tx, _rx) = mpsc::channel();
let node = start_client_node(port, &identity, Box::new(TestCallbacks::new(tx)));
std::thread::sleep(SETTLE);
let result = node
.load_hook(
"bad".into(),
vec![0xFF, 0x00, 0xDE, 0xAD],
"PreIngress".into(),
0,
)
.expect("send failed");
assert!(result.is_err(), "Expected error for invalid WASM bytes");
node.shutdown();
transport.shutdown();
}
#[test]
#[cfg(feature = "rns-hooks-builtin")]
fn test_load_list_unload_builtin_hook() {
let port = find_free_port();
let transport = start_transport_node(port);
let identity = Identity::new(&mut OsRng);
let (tx, _rx) = mpsc::channel();
let node = start_client_node(port, &identity, Box::new(TestCallbacks::new(tx)));
std::thread::sleep(SETTLE);
let builtin_id = register_builtin_continue_hook("lifecycle");
let result = node
.load_builtin_hook(
"builtin_continue".into(),
builtin_id,
"PreIngress".into(),
10,
)
.expect("send failed");
assert!(
result.is_ok(),
"load_builtin_hook failed: {:?}",
result.err()
);
let hooks = node.list_hooks().expect("list_hooks send failed");
assert_eq!(hooks.len(), 1);
assert_eq!(hooks[0].name, "builtin_continue");
assert_eq!(hooks[0].hook_type, "builtin");
assert_eq!(hooks[0].attach_point, "PreIngress");
assert_eq!(hooks[0].priority, 10);
assert!(hooks[0].enabled);
node.unload_hook("builtin_continue".into(), "PreIngress".into())
.expect("send failed")
.expect("unload_hook failed");
assert!(node
.list_hooks()
.expect("list_hooks send failed")
.is_empty());
node.shutdown();
transport.shutdown();
}
#[test]
#[cfg(feature = "rns-hooks-builtin")]
fn test_reload_builtin_hook_preserves_listing_state() {
let port = find_free_port();
let transport = start_transport_node(port);
let identity = Identity::new(&mut OsRng);
let (tx, _rx) = mpsc::channel();
let node = start_client_node(port, &identity, Box::new(TestCallbacks::new(tx)));
std::thread::sleep(SETTLE);
let first_builtin_id = register_builtin_continue_hook("reload_a");
let second_builtin_id = register_builtin_continue_hook("reload_b");
node.load_builtin_hook(
"reloadable_builtin".into(),
first_builtin_id,
"Tick".into(),
7,
)
.expect("send failed")
.expect("load_builtin_hook failed");
let result = node
.reload_builtin_hook(
"reloadable_builtin".into(),
"Tick".into(),
second_builtin_id,
)
.expect("send failed");
assert!(
result.is_ok(),
"reload_builtin_hook failed: {:?}",
result.err()
);
let hooks = node.list_hooks().expect("list_hooks send failed");
assert_eq!(hooks.len(), 1);
assert_eq!(hooks[0].name, "reloadable_builtin");
assert_eq!(hooks[0].hook_type, "builtin");
assert_eq!(hooks[0].attach_point, "Tick");
assert_eq!(hooks[0].priority, 7);
assert!(hooks[0].enabled);
node.shutdown();
transport.shutdown();
}
#[test]
#[cfg(feature = "rns-hooks-builtin")]
fn test_load_unknown_builtin_hook_returns_error() {
let port = find_free_port();
let transport = start_transport_node(port);
let identity = Identity::new(&mut OsRng);
let (tx, _rx) = mpsc::channel();
let node = start_client_node(port, &identity, Box::new(TestCallbacks::new(tx)));
std::thread::sleep(SETTLE);
let missing_id = format!(
"test.rns_net.missing.{}.{}",
std::process::id(),
BUILTIN_ID_COUNTER.fetch_add(1, Ordering::SeqCst)
);
let result = node
.load_builtin_hook("missing_builtin".into(), missing_id, "PreIngress".into(), 0)
.expect("send failed");
assert!(
result.is_err(),
"Expected error for unregistered built-in hook"
);
node.shutdown();
transport.shutdown();
}
#[test]
fn test_packet_logger_does_not_block_delivery() {
let (
transport,
alice_node,
alice_rx,
bob_node,
bob_rx,
_alice_id,
bob_id,
_alice_dest,
bob_dest,
) = setup_two_peers();
let bob_announced = announce_bob_to_alice(&bob_node, &bob_dest, &bob_id, &alice_rx);
bob_node
.load_hook(
"packet_logger".into(),
wasm_bytes("packet_logger"),
"PreIngress".into(),
0,
)
.expect("send failed")
.expect("load_hook failed");
let dest_to_bob = Destination::single_out(APP_NAME, &["msg", "rx"], &bob_announced);
let plaintext = b"Hello through hook!";
alice_node.send_packet(&dest_to_bob, plaintext).unwrap();
let (_, raw, _) = wait_for_delivery(&bob_rx, TIMEOUT)
.expect("Bob did not receive message with packet_logger hook active");
let decrypted = decrypt_delivery(&raw, &bob_id).expect("Decryption failed");
assert_eq!(decrypted, plaintext);
alice_node.shutdown();
bob_node.shutdown();
transport.shutdown();
}
#[test]
#[cfg(feature = "rns-hooks-builtin")]
fn test_builtin_hook_does_not_block_delivery() {
let (
transport,
alice_node,
alice_rx,
bob_node,
bob_rx,
_alice_id,
bob_id,
_alice_dest,
bob_dest,
) = setup_two_peers();
let bob_announced = announce_bob_to_alice(&bob_node, &bob_dest, &bob_id, &alice_rx);
let builtin_id = register_builtin_continue_hook("traffic");
bob_node
.load_builtin_hook(
"builtin_continue".into(),
builtin_id,
"PreIngress".into(),
0,
)
.expect("send failed")
.expect("load_builtin_hook failed");
let dest_to_bob = Destination::single_out(APP_NAME, &["msg", "rx"], &bob_announced);
let plaintext = b"Hello through built-in hook!";
alice_node.send_packet(&dest_to_bob, plaintext).unwrap();
let (_, raw, _) = wait_for_delivery(&bob_rx, TIMEOUT)
.expect("Bob did not receive message with built-in hook active");
let decrypted = decrypt_delivery(&raw, &bob_id).expect("Decryption failed");
assert_eq!(decrypted, plaintext);
alice_node.shutdown();
bob_node.shutdown();
transport.shutdown();
}
#[test]
fn test_announce_filter_allows_low_hop_announces() {
let (
transport,
alice_node,
alice_rx,
bob_node,
_bob_rx,
_alice_id,
bob_id,
_alice_dest,
bob_dest,
) = setup_two_peers();
alice_node
.load_hook(
"announce_filter".into(),
wasm_bytes("announce_filter"),
"AnnounceReceived".into(),
0,
)
.expect("send failed")
.expect("load_hook failed");
let bob_announced = announce_bob_to_alice(&bob_node, &bob_dest, &bob_id, &alice_rx);
assert_eq!(bob_announced.dest_hash, bob_dest.hash);
alice_node.shutdown();
bob_node.shutdown();
transport.shutdown();
}
#[test]
fn test_multiple_hooks_on_same_point() {
let (
transport,
alice_node,
alice_rx,
bob_node,
_bob_rx,
_alice_id,
bob_id,
_alice_dest,
bob_dest,
) = setup_two_peers();
alice_node
.load_hook(
"announce_filter".into(),
wasm_bytes("announce_filter"),
"AnnounceReceived".into(),
100,
)
.expect("send failed")
.expect("load announce_filter failed");
alice_node
.load_hook(
"packet_logger".into(),
wasm_bytes("packet_logger"),
"AnnounceReceived".into(),
0,
)
.expect("send failed")
.expect("load packet_logger failed");
let hooks = alice_node.list_hooks().expect("list_hooks send failed");
assert_eq!(hooks.len(), 2, "Expected 2 hooks, got {:?}", hooks);
let bob_announced = announce_bob_to_alice(&bob_node, &bob_dest, &bob_id, &alice_rx);
assert_eq!(bob_announced.dest_hash, bob_dest.hash);
alice_node.shutdown();
bob_node.shutdown();
transport.shutdown();
}
#[test]
fn test_load_hook_after_traffic_flowing() {
let (
transport,
alice_node,
alice_rx,
bob_node,
bob_rx,
_alice_id,
bob_id,
_alice_dest,
bob_dest,
) = setup_two_peers();
let bob_announced = announce_bob_to_alice(&bob_node, &bob_dest, &bob_id, &alice_rx);
let dest_to_bob = Destination::single_out(APP_NAME, &["msg", "rx"], &bob_announced);
alice_node
.send_packet(&dest_to_bob, b"before hook")
.unwrap();
let (_, raw, _) =
wait_for_delivery(&bob_rx, TIMEOUT).expect("Bob did not receive pre-hook message");
let decrypted = decrypt_delivery(&raw, &bob_id).expect("Decryption failed");
assert_eq!(decrypted, b"before hook");
bob_node
.load_hook(
"packet_logger".into(),
wasm_bytes("packet_logger"),
"PreIngress".into(),
0,
)
.expect("send failed")
.expect("load_hook failed");
alice_node.send_packet(&dest_to_bob, b"after hook").unwrap();
let (_, raw, _) =
wait_for_delivery(&bob_rx, TIMEOUT).expect("Bob did not receive post-hook message");
let decrypted = decrypt_delivery(&raw, &bob_id).expect("Decryption failed");
assert_eq!(decrypted, b"after hook");
alice_node.shutdown();
bob_node.shutdown();
transport.shutdown();
}
#[test]
fn test_unload_hook_restores_behavior() {
let (
transport,
alice_node,
alice_rx,
bob_node,
bob_rx,
_alice_id,
bob_id,
_alice_dest,
bob_dest,
) = setup_two_peers();
let bob_announced = announce_bob_to_alice(&bob_node, &bob_dest, &bob_id, &alice_rx);
let dest_to_bob = Destination::single_out(APP_NAME, &["msg", "rx"], &bob_announced);
bob_node
.load_hook(
"packet_logger".into(),
wasm_bytes("packet_logger"),
"PreIngress".into(),
0,
)
.expect("send failed")
.expect("load_hook failed");
alice_node.send_packet(&dest_to_bob, b"with hook").unwrap();
let (_, raw, _) =
wait_for_delivery(&bob_rx, TIMEOUT).expect("Bob did not receive message with hook active");
let decrypted = decrypt_delivery(&raw, &bob_id).expect("Decryption failed");
assert_eq!(decrypted, b"with hook");
bob_node
.unload_hook("packet_logger".into(), "PreIngress".into())
.expect("send failed")
.expect("unload_hook failed");
alice_node
.send_packet(&dest_to_bob, b"after unload")
.unwrap();
let (_, raw, _) =
wait_for_delivery(&bob_rx, TIMEOUT).expect("Bob did not receive message after hook unload");
let decrypted = decrypt_delivery(&raw, &bob_id).expect("Decryption failed");
assert_eq!(decrypted, b"after unload");
alice_node.shutdown();
bob_node.shutdown();
transport.shutdown();
}
#[test]
fn test_rate_limiter_drops_excess() {
let (
transport,
alice_node,
alice_rx,
bob_node,
bob_rx,
_alice_id,
bob_id,
_alice_dest,
bob_dest,
) = setup_two_peers();
let bob_announced = announce_bob_to_alice(&bob_node, &bob_dest, &bob_id, &alice_rx);
let dest_to_bob = Destination::single_out(APP_NAME, &["msg", "rx"], &bob_announced);
bob_node
.load_hook(
"rate_limiter".into(),
wasm_bytes("rate_limiter"),
"PreIngress".into(),
0,
)
.expect("send failed")
.expect("load_hook failed");
for i in 0..3u8 {
let msg = [b'r', b'l', b'0' + i];
alice_node.send_packet(&dest_to_bob, &msg).unwrap();
let (_, raw, _) = wait_for_delivery(&bob_rx, TIMEOUT)
.expect("Bob did not receive message through rate_limiter");
let decrypted = decrypt_delivery(&raw, &bob_id).expect("Decryption failed");
assert_eq!(decrypted, &msg);
}
alice_node.shutdown();
bob_node.shutdown();
transport.shutdown();
}
#[test]
fn test_allowlist_blocks_unknown() {
let (
transport,
alice_node,
alice_rx,
bob_node,
_bob_rx,
_alice_id,
bob_id,
_alice_dest,
bob_dest,
) = setup_two_peers();
alice_node
.load_hook(
"allowlist".into(),
wasm_bytes("allowlist"),
"AnnounceReceived".into(),
0,
)
.expect("send failed")
.expect("load_hook failed");
for _ in 0..3 {
let _ = bob_node.announce(&bob_dest, &bob_id, Some(b"Bob"));
}
std::thread::sleep(Duration::from_secs(3));
let got = wait_for_announce(&alice_rx, &bob_dest.hash, Duration::from_secs(2));
assert!(
got.is_none(),
"Expected allowlist to drop announce, but Alice received it"
);
alice_node.shutdown();
bob_node.shutdown();
transport.shutdown();
}
#[test]
fn test_packet_mirror_does_not_block() {
let (
transport,
alice_node,
alice_rx,
bob_node,
bob_rx,
_alice_id,
bob_id,
_alice_dest,
bob_dest,
) = setup_two_peers();
let bob_announced = announce_bob_to_alice(&bob_node, &bob_dest, &bob_id, &alice_rx);
let dest_to_bob = Destination::single_out(APP_NAME, &["msg", "rx"], &bob_announced);
bob_node
.load_hook(
"packet_mirror".into(),
wasm_bytes("packet_mirror"),
"PreIngress".into(),
0,
)
.expect("send failed")
.expect("load_hook failed");
alice_node
.send_packet(&dest_to_bob, b"mirror test")
.unwrap();
let (_, raw, _) = wait_for_delivery(&bob_rx, TIMEOUT)
.expect("Bob did not receive message with packet_mirror active");
let decrypted = decrypt_delivery(&raw, &bob_id).expect("Decryption failed");
assert_eq!(decrypted, b"mirror test");
alice_node.shutdown();
bob_node.shutdown();
transport.shutdown();
}
#[test]
fn test_link_guard_loads_and_continues() {
let (
transport,
alice_node,
alice_rx,
bob_node,
bob_rx,
_alice_id,
bob_id,
_alice_dest,
bob_dest,
) = setup_two_peers();
let bob_announced = announce_bob_to_alice(&bob_node, &bob_dest, &bob_id, &alice_rx);
let dest_to_bob = Destination::single_out(APP_NAME, &["msg", "rx"], &bob_announced);
bob_node
.load_hook(
"link_guard".into(),
wasm_bytes("link_guard"),
"LinkRequestReceived".into(),
0,
)
.expect("send failed")
.expect("load_hook failed");
alice_node.send_packet(&dest_to_bob, b"guard test").unwrap();
let (_, raw, _) = wait_for_delivery(&bob_rx, TIMEOUT)
.expect("Bob did not receive message with link_guard active");
let decrypted = decrypt_delivery(&raw, &bob_id).expect("Decryption failed");
assert_eq!(decrypted, b"guard test");
alice_node.shutdown();
bob_node.shutdown();
transport.shutdown();
}
#[test]
fn test_announce_dedup_loads_and_allows_first() {
let (
transport,
alice_node,
alice_rx,
bob_node,
_bob_rx,
_alice_id,
bob_id,
_alice_dest,
bob_dest,
) = setup_two_peers();
alice_node
.load_hook(
"announce_dedup".into(),
wasm_bytes("announce_dedup"),
"AnnounceRetransmit".into(),
0,
)
.expect("send failed")
.expect("load_hook failed");
let bob_announced = announce_bob_to_alice(&bob_node, &bob_dest, &bob_id, &alice_rx);
assert_eq!(bob_announced.dest_hash, bob_dest.hash);
alice_node.shutdown();
bob_node.shutdown();
transport.shutdown();
}
#[test]
fn test_metrics_does_not_interfere() {
let (
transport,
alice_node,
alice_rx,
bob_node,
bob_rx,
_alice_id,
bob_id,
_alice_dest,
bob_dest,
) = setup_two_peers();
let bob_announced = announce_bob_to_alice(&bob_node, &bob_dest, &bob_id, &alice_rx);
let dest_to_bob = Destination::single_out(APP_NAME, &["msg", "rx"], &bob_announced);
bob_node
.load_hook(
"metrics".into(),
wasm_bytes("metrics"),
"PreIngress".into(),
0,
)
.expect("send failed")
.expect("load_hook failed");
for i in 0..5u8 {
let msg = [b'm', b'e', b't', b'0' + i];
alice_node.send_packet(&dest_to_bob, &msg).unwrap();
let (_, raw, _) = wait_for_delivery(&bob_rx, TIMEOUT)
.expect("Bob did not receive message with metrics hook active");
let decrypted = decrypt_delivery(&raw, &bob_id).expect("Decryption failed");
assert_eq!(decrypted, &msg);
}
alice_node.shutdown();
bob_node.shutdown();
transport.shutdown();
}