use crate::{
primitives::RouterId,
runtime::{Counter, MetricsHandle, Runtime},
transport::ssu2::{
message::data::RelayBlock,
metrics::DUPLICATE_PKTS,
relay::types::{RejectionReason, RelayCommand},
session::active::Ssu2Session,
},
};
use alloc::vec::Vec;
use core::net::SocketAddr;
const LOG_TARGET: &str = "emissary::ssu2::active::relay";
impl<R: Runtime> Ssu2Session<R> {
fn insert_relay_message(&mut self, nonce: u32) -> bool {
if self.duplicate_filter.insert(nonce) {
return true;
}
tracing::debug!(
target: LOG_TARGET,
router_id = %self.router_id,
?nonce,
"ignoring duplicate message",
);
self.router_ctx
.metrics_handle()
.counter(DUPLICATE_PKTS)
.increment_with_label(1, "kind", "relay");
false
}
pub fn handle_relay_request(
&mut self,
nonce: u32,
relay_tag: u32,
address: SocketAddr,
message: Vec<u8>,
signature: Vec<u8>,
) {
if self.insert_relay_message(nonce) {
self.relay_handle.handle_relay_request(
self.router_id.clone(),
nonce,
relay_tag,
address,
message,
signature,
);
}
}
pub fn handle_relay_intro(
&mut self,
alice_router_id: RouterId,
nonce: u32,
relay_tag: u32,
address: SocketAddr,
message: Vec<u8>,
signature: Vec<u8>,
) {
if self.insert_relay_message(nonce) {
self.relay_handle.handle_relay_intro(
alice_router_id,
self.router_id.clone(),
self.pending_router_info.take(),
nonce,
relay_tag,
address,
message,
signature,
);
}
}
pub fn handle_relay_response(
&mut self,
nonce: u32,
address: Option<SocketAddr>,
token: Option<u64>,
rejection: Option<RejectionReason>,
message: Vec<u8>,
signature: Vec<u8>,
) {
if self.insert_relay_message(nonce) {
self.relay_handle
.handle_relay_response(nonce, address, token, rejection, message, signature);
}
}
pub fn handle_relay_command(&mut self, command: RelayCommand) {
match command {
RelayCommand::RelayRequest {
nonce,
message,
signature,
} => {
tracing::trace!(
target: LOG_TARGET,
router_id = %self.router_id,
?nonce,
"send relay request",
);
self.transmission.schedule(RelayBlock::Request { message, signature });
}
RelayCommand::RelayResponse {
nonce,
rejection,
message,
signature,
token,
} => {
tracing::trace!(
target: LOG_TARGET,
router_id = %self.router_id,
?nonce,
"send relay response",
);
self.transmission.schedule(RelayBlock::Response {
rejection,
message,
signature,
token,
});
}
RelayCommand::RelayIntro {
router_id,
router_info,
message,
signature,
} => {
tracing::trace!(
target: LOG_TARGET,
router_id = %self.router_id,
"send relay intro",
);
self.transmission.schedule((
RelayBlock::Intro {
router_id,
message,
signature,
},
router_info,
));
}
RelayCommand::Dummy => unreachable!(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
crypto::SigningPrivateKey,
router::context::builder::RouterContextBuilder,
runtime::{mock::MockRuntime, UdpSocket},
subsystem::SubsystemEvent,
transport::ssu2::{
peer_test::types::{PeerTestEvent, PeerTestEventRecycle, PeerTestHandle},
relay::types::{RelayEvent, RelayHandle},
session::{active::Ssu2SessionContext, KeyContext},
Packet,
},
};
use thingbuf::mpsc::{channel, with_recycle, Receiver, Sender};
async fn make_session() -> (
Ssu2Session<MockRuntime>,
Sender<Packet>,
Receiver<SubsystemEvent>,
Receiver<PeerTestEvent, PeerTestEventRecycle>,
Receiver<RelayEvent>,
) {
let (pkt_tx, pkt_rx) = channel(16);
let ctx = Ssu2SessionContext {
address: "127.0.0.1:8888".parse().unwrap(),
dst_id: 1337,
intro_key: [0xaa; 32],
max_payload_size: 1472,
pkt_rx,
recv_key_ctx: KeyContext {
k_data: [0xbb; 32],
k_header_2: [0xcc; 32],
},
router_id: RouterId::random(),
send_key_ctx: KeyContext {
k_data: [0xdd; 32],
k_header_2: [0xee; 32],
},
verifying_key: SigningPrivateKey::random(&mut MockRuntime::rng()).public(),
};
let socket = <MockRuntime as Runtime>::UdpSocket::bind("127.0.0.1:0".parse().unwrap())
.await
.unwrap();
let (transport_tx, transport_rx) = channel(16);
let router_ctx = RouterContextBuilder::default().build();
let (peer_test_tx, peer_test_rx) = with_recycle(16, PeerTestEventRecycle::default());
let peer_test_handle = PeerTestHandle::new(peer_test_tx);
let (relay_tx, relay_rx) = channel(16);
let relay_handle = RelayHandle::new(relay_tx);
(
Ssu2Session::<MockRuntime>::new(
ctx,
socket,
transport_tx,
router_ctx,
peer_test_handle,
relay_handle,
),
pkt_tx,
transport_rx,
peer_test_rx,
relay_rx,
)
}
#[tokio::test]
async fn duplicate_relay_request() {
let (mut session, _pkt_tx, _transport_rx, _peer_test_rx, relay_rx) = make_session().await;
session.handle_relay_request(
1337,
1338,
"127.0.0.1:9999".parse().unwrap(),
vec![1, 3, 3, 7],
vec![0xff; 64],
);
match relay_rx.try_recv().unwrap() {
RelayEvent::RelayRequest {
nonce,
relay_tag,
message,
..
} => {
assert_eq!(nonce, 1337);
assert_eq!(relay_tag, 1338);
assert_eq!(message, vec![1, 3, 3, 7]);
}
_ => panic!("invalid event"),
}
session.handle_relay_request(
1337,
1338,
"127.0.0.1:9999".parse().unwrap(),
vec![1, 3, 3, 7],
vec![0xff; 64],
);
assert!(relay_rx.try_recv().is_err());
}
#[tokio::test]
async fn duplicate_relay_intro() {
let (mut session, _pkt_tx, _transport_rx, _peer_test_rx, relay_rx) = make_session().await;
session.handle_relay_intro(
RouterId::random(),
1337,
1338,
"127.0.0.1:9999".parse().unwrap(),
vec![1, 3, 3, 7],
vec![0xff; 64],
);
match relay_rx.try_recv().unwrap() {
RelayEvent::RelayIntro {
nonce,
relay_tag,
message,
..
} => {
assert_eq!(nonce, 1337);
assert_eq!(relay_tag, 1338);
assert_eq!(message, vec![1, 3, 3, 7]);
}
_ => panic!("invalid event"),
}
session.handle_relay_intro(
RouterId::random(),
1337,
1338,
"127.0.0.1:9999".parse().unwrap(),
vec![1, 3, 3, 7],
vec![0xff; 64],
);
assert!(relay_rx.try_recv().is_err());
}
#[tokio::test]
async fn duplicate_relay_response() {
let (mut session, _pkt_tx, _transport_rx, _peer_test_rx, relay_rx) = make_session().await;
session.handle_relay_response(
1337,
None,
None,
Some(RejectionReason::Unspecified),
vec![1, 3, 3, 7],
vec![0xaa; 64],
);
match relay_rx.try_recv().unwrap() {
RelayEvent::RelayResponse {
nonce,
rejection,
message,
..
} => {
assert_eq!(nonce, 1337);
assert_eq!(rejection, Some(RejectionReason::Unspecified));
assert_eq!(message, vec![1, 3, 3, 7]);
}
_ => panic!("invalid event"),
}
session.handle_relay_response(
1337,
None,
None,
Some(RejectionReason::Unspecified),
vec![1, 3, 3, 7],
vec![0xaa; 64],
);
assert!(relay_rx.try_recv().is_err());
}
}