pub(crate) mod backward;
pub(crate) mod forward;
use std::sync::Arc;
use std::time::Duration;
use futures::channel::mpsc;
use tor_cell::chancell::CircId;
use tor_linkspec::OwnedChanTarget;
use tor_rtcompat::Runtime;
use crate::channel::Channel;
use crate::circuit::circhop::{CircHopOutbound, HopSettings};
use crate::circuit::reactor::Reactor as BaseReactor;
use crate::circuit::reactor::hop_mgr::HopMgr;
use crate::circuit::reactor::stream;
use crate::circuit::{CircuitRxReceiver, UniqId};
use crate::crypto::cell::{InboundRelayLayer, OutboundRelayLayer};
use crate::memquota::CircuitAccount;
use crate::relay::RelayCirc;
use crate::relay::channel_provider::ChannelProvider;
use crate::relay::reactor::backward::Backward;
use crate::relay::reactor::forward::Forward;
use crate::client::circuit::padding::{PaddingController, PaddingEventStream};
type RelayBaseReactor<R> = BaseReactor<R, Forward, Backward>;
#[allow(unused)] #[must_use = "If you don't call run() on a reactor, the circuit won't work."]
pub(crate) struct Reactor<R: Runtime>(RelayBaseReactor<R>);
struct StreamHandler;
impl stream::StreamHandler for StreamHandler {
fn halfstream_expiry(&self, hop: &CircHopOutbound) -> Duration {
let ccontrol = hop.ccontrol();
ccontrol
.lock()
.expect("poisoned lock")
.rtt()
.max_rtt_usec()
.map(|rtt| Duration::from_millis(u64::from(rtt)))
.unwrap_or_default()
}
}
#[allow(unused)] impl<R: Runtime> Reactor<R> {
#[allow(clippy::too_many_arguments)] pub(crate) fn new(
runtime: R,
channel: &Arc<Channel>,
circ_id: CircId,
unique_id: UniqId,
input: CircuitRxReceiver,
crypto_in: Box<dyn InboundRelayLayer + Send>,
crypto_out: Box<dyn OutboundRelayLayer + Send>,
settings: &HopSettings,
chan_provider: Arc<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
padding_ctrl: PaddingController,
padding_event_stream: PaddingEventStream,
memquota: &CircuitAccount,
) -> crate::Result<(Self, Arc<RelayCirc>)> {
#[allow(clippy::disallowed_methods)]
let (stream_tx, stream_rx) = mpsc::channel(0);
let mut hop_mgr = HopMgr::new(
runtime.clone(),
unique_id,
StreamHandler,
stream_tx,
memquota.clone(),
);
hop_mgr.add_hop(settings.clone())?;
#[allow(clippy::disallowed_methods)]
let (fwd_ev_tx, fwd_ev_rx) = mpsc::channel(0);
let forward_foo = Forward::new(
unique_id,
crypto_out,
chan_provider,
fwd_ev_tx,
memquota.clone(),
);
let backward_foo = Backward::new(crypto_in);
let (inner, handle) = BaseReactor::new(
runtime,
channel,
circ_id,
unique_id,
input,
forward_foo,
backward_foo,
hop_mgr,
padding_ctrl,
padding_event_stream,
stream_rx,
fwd_ev_rx,
memquota,
);
let reactor = Self(inner);
let handle = Arc::new(RelayCirc(handle));
Ok((reactor, handle))
}
pub(crate) async fn run(mut self) -> crate::Result<()> {
self.0.run().await
}
}
#[cfg(test)]
pub(crate) mod test {
#![allow(clippy::bool_assert_comparison)]
#![allow(clippy::clone_on_copy)]
#![allow(clippy::dbg_macro)]
#![allow(clippy::mixed_attributes_style)]
#![allow(clippy::print_stderr)]
#![allow(clippy::print_stdout)]
#![allow(clippy::single_char_pattern)]
#![allow(clippy::unwrap_used)]
#![allow(clippy::unchecked_time_subtraction)]
#![allow(clippy::useless_vec)]
#![allow(clippy::needless_pass_by_value)]
use super::*;
use crate::channel::test::{CodecResult, new_reactor};
use crate::circuit::reactor::test::{AllowAllStreamsFilter, rmsg_to_ccmsg};
use crate::circuit::{CircParameters, CircuitRxSender};
use crate::client::circuit::padding::new_padding;
use crate::congestion::test_utils::params::build_cc_vegas_params;
use crate::crypto::cell::RelayCellBody;
use crate::crypto::cell::{InboundRelayLayer, OutboundRelayLayer};
use crate::fake_mpsc;
use crate::memquota::SpecificAccount as _;
use crate::relay::channel_provider::{ChannelProvider, OutboundChanSender};
use crate::stream::flow_ctrl::params::FlowCtrlParameters;
use crate::stream::incoming::{IncomingStream, IncomingStreamRequestFilter};
use futures::channel::mpsc::{Receiver, Sender};
use futures::{AsyncReadExt as _, SinkExt as _, StreamExt as _};
use tracing_test::traced_test;
use tor_cell::chancell::{AnyChanCell, ChanCell, ChanCmd, msg as chanmsg};
use tor_cell::relaycell::{
AnyRelayMsgOuter, RelayCellFormat, RelayCmd, StreamId, msg as relaymsg,
};
use tor_linkspec::{EncodedLinkSpec, LinkSpec};
use tor_protover::{Protocols, named};
use tor_rtcompat::SpawnExt;
use tor_rtcompat::{DynTimeProvider, Runtime};
use tor_rtmock::MockRuntime;
use chanmsg::{AnyChanMsg, DestroyReason, HandshakeType};
use relaymsg::SendmeTag;
use std::net::IpAddr;
use std::sync::{Arc, Mutex, mpsc};
struct DummyInboundCrypto {}
struct DummyOutboundCrypto {
recognized_rx: mpsc::Receiver<Recognized>,
}
const DUMMY_TAG: [u8; 20] = [1; 20];
impl InboundRelayLayer for DummyInboundCrypto {
fn originate(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) -> SendmeTag {
DUMMY_TAG.into()
}
fn encrypt_inbound(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) {}
}
impl OutboundRelayLayer for DummyOutboundCrypto {
fn decrypt_outbound(
&mut self,
_cmd: ChanCmd,
_cell: &mut RelayCellBody,
) -> Option<SendmeTag> {
let recognized = self.recognized_rx.recv().unwrap();
match recognized {
Recognized::Yes => Some(DUMMY_TAG.into()),
Recognized::No => None,
}
}
}
struct DummyChanProvider<R> {
runtime: R,
outbound: Arc<Mutex<Option<DummyChan>>>,
}
impl<R: Runtime> DummyChanProvider<R> {
fn new(runtime: R, outbound: Arc<Mutex<Option<DummyChan>>>) -> Self {
Self { runtime, outbound }
}
}
impl<R: Runtime> ChannelProvider for DummyChanProvider<R> {
type BuildSpec = OwnedChanTarget;
fn get_or_launch(
self: Arc<Self>,
_reactor_id: UniqId,
_target: Self::BuildSpec,
tx: OutboundChanSender,
) -> crate::Result<()> {
let dummy_chan = working_fake_channel(&self.runtime);
let chan = Arc::clone(&dummy_chan.channel);
{
let mut lock = self.outbound.lock().unwrap();
assert!(lock.is_none());
*lock = Some(dummy_chan);
}
tx.send(Ok(chan));
Ok(())
}
}
struct DummyChan {
rx: Receiver<AnyChanCell>,
tx: Sender<CodecResult>,
channel: Arc<Channel>,
}
struct ReactorTestCtrl {
relay_circ: Arc<RelayCirc>,
circmsg_send: CircuitRxSender,
inbound_chan: DummyChan,
outbound_chan: Arc<Mutex<Option<DummyChan>>>,
recognized_tx: mpsc::Sender<Recognized>,
}
enum Recognized {
Yes,
No,
}
impl ReactorTestCtrl {
fn spawn_reactor<R: Runtime>(rt: &R) -> Self {
let inbound_chan = working_fake_channel(rt);
let circid = CircId::new(1337).unwrap();
let unique_id = UniqId::new(8, 17);
let (padding_ctrl, padding_stream) = new_padding(DynTimeProvider::new(rt.clone()));
let (circmsg_send, circmsg_recv) = fake_mpsc(64);
let params = CircParameters::new(
true,
build_cc_vegas_params(),
FlowCtrlParameters::defaults_for_tests(),
);
let settings = HopSettings::from_params_and_caps(
crate::circuit::circhop::HopNegotiationType::Full,
¶ms,
&[named::FLOWCTRL_CC].into_iter().collect::<Protocols>(),
)
.unwrap();
let outbound_chan = Arc::new(Mutex::new(None));
let (recognized_tx, recognized_rx) = mpsc::channel();
let chan_provider = Arc::new(DummyChanProvider::new(
rt.clone(),
Arc::clone(&outbound_chan),
));
let (reactor, relay_circ) = Reactor::new(
rt.clone(),
&Arc::clone(&inbound_chan.channel),
circid,
unique_id,
circmsg_recv,
Box::new(DummyInboundCrypto {}),
Box::new(DummyOutboundCrypto { recognized_rx }),
&settings,
chan_provider,
padding_ctrl,
padding_stream,
&CircuitAccount::new_noop(),
)
.unwrap();
rt.spawn(async {
let _ = reactor.run().await;
})
.unwrap();
Self {
relay_circ,
circmsg_send,
recognized_tx,
inbound_chan,
outbound_chan,
}
}
async fn send_fwd(
&mut self,
id: Option<StreamId>,
msg: relaymsg::AnyRelayMsg,
recognized: Recognized,
early: bool,
) {
self.recognized_tx.send(recognized).unwrap();
self.circmsg_send
.send(rmsg_to_ccmsg(id, msg, early))
.await
.unwrap();
}
fn outbound_chan_launched(&self) -> bool {
self.outbound_chan.lock().unwrap().is_some()
}
async fn allow_stream_requests<'a, FILT>(
&self,
allow_commands: &'a [RelayCmd],
filter: FILT,
) -> impl futures::Stream<Item = IncomingStream> + use<'a, FILT>
where
FILT: IncomingStreamRequestFilter,
{
Arc::clone(&self.relay_circ)
.allow_stream_requests(allow_commands, filter)
.await
.unwrap()
}
async fn do_create2_handshake(
&mut self,
rt: &MockRuntime,
expected_hs_type: HandshakeType,
) {
let (circid, msg) = self.read_outbound().into_circid_and_msg();
let _create2 = match msg {
chanmsg::AnyChanMsg::Create2(c) => {
assert_eq!(c.handshake_type(), expected_hs_type);
c
}
_ => panic!("unexpected forwarded {msg:?}"),
};
let handshake = vec![];
let created2 = chanmsg::Created2::new(handshake);
self.write_outbound(circid, chanmsg::AnyChanMsg::Created2(created2));
rt.advance_until_stalled().await;
}
fn is_closing(&self) -> bool {
self.relay_circ.is_closing()
}
fn read_inbound(&mut self) -> ChanCell<AnyChanMsg> {
#[allow(deprecated)] self.inbound_chan.rx.try_next().unwrap().unwrap()
}
fn read_outbound(&mut self) -> ChanCell<AnyChanMsg> {
let mut lock = self.outbound_chan.lock().unwrap();
let chan = lock.as_mut().unwrap();
#[allow(deprecated)] chan.rx.try_next().unwrap().unwrap()
}
fn write_outbound(&mut self, circid: Option<CircId>, msg: chanmsg::AnyChanMsg) {
let mut lock = self.outbound_chan.lock().unwrap();
let chan = lock.as_mut().unwrap();
let cell = ChanCell::new(circid, msg);
chan.tx.try_send(Ok(cell)).unwrap();
}
}
fn working_fake_channel<R: Runtime>(rt: &R) -> DummyChan {
let (channel, chan_reactor, rx, tx) = new_reactor(rt.clone());
rt.spawn(async {
let _ignore = chan_reactor.run().await;
})
.unwrap();
DummyChan { tx, rx, channel }
}
fn dummy_linkspecs() -> Vec<EncodedLinkSpec> {
vec![
LinkSpec::Ed25519Id([43; 32].into()).encode().unwrap(),
LinkSpec::RsaId([45; 20].into()).encode().unwrap(),
LinkSpec::OrPort("127.0.0.1".parse::<IpAddr>().unwrap(), 999)
.encode()
.unwrap(),
]
}
fn assert_circuit_destroyed(ctrl: &mut ReactorTestCtrl, reason: DestroyReason) {
assert!(ctrl.is_closing());
let cell = ctrl.read_inbound();
match cell.msg() {
chanmsg::AnyChanMsg::Destroy(d) => {
assert_eq!(d.reason(), reason);
}
_ => panic!("unexpected ending {cell:?}"),
}
}
#[traced_test]
#[test]
fn reject_extend2_relay() {
tor_rtmock::MockRuntime::test_with_various(|rt| async move {
let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
rt.advance_until_stalled().await;
let linkspecs = dummy_linkspecs();
let extend2 = relaymsg::Extend2::new(linkspecs, HandshakeType::NTOR_V3, vec![]).into();
ctrl.send_fwd(None, extend2, Recognized::Yes, false).await;
rt.advance_until_stalled().await;
assert!(logs_contain("got EXTEND2 in a RELAY cell?!"));
assert!(!ctrl.outbound_chan_launched());
assert_circuit_destroyed(&mut ctrl, DestroyReason::NONE);
});
}
#[traced_test]
#[test]
fn extend_and_forward() {
tor_rtmock::MockRuntime::test_with_various(|rt| async move {
let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
rt.advance_until_stalled().await;
assert!(!ctrl.outbound_chan_launched());
let linkspecs = dummy_linkspecs();
let handshake_type = HandshakeType::NTOR_V3;
let extend2 = relaymsg::Extend2::new(linkspecs, handshake_type, vec![]).into();
ctrl.send_fwd(None, extend2, Recognized::Yes, true).await;
rt.advance_until_stalled().await;
assert!(logs_contain(
"Launched channel to the next hop circ_id=Circ 8.17"
));
assert!(ctrl.outbound_chan_launched());
assert!(!ctrl.is_closing());
ctrl.do_create2_handshake(&rt, handshake_type).await;
assert!(logs_contain("Got CREATED2 response from next hop"));
assert!(logs_contain("Extended circuit to the next hop"));
let early = false;
let begin = relaymsg::Begin::new("127.0.0.1", 1111, 0).unwrap();
ctrl.send_fwd(None, begin.clone().into(), Recognized::No, early)
.await;
rt.advance_until_stalled().await;
macro_rules! expect_cell {
($chanmsg:tt, $relaymsg:tt) => {{
let cell = ctrl.read_outbound();
let msg = match cell.msg() {
chanmsg::AnyChanMsg::$chanmsg(m) => {
let body = m.clone().into_relay_body();
AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, body).unwrap()
}
_ => panic!("unexpected forwarded {cell:?}"),
};
match msg.msg() {
relaymsg::AnyRelayMsg::$relaymsg(m) => m.clone(),
_ => panic!("unexpected cell {msg:?}"),
}
}};
}
let recvd_begin = expect_cell!(Relay, Begin);
assert_eq!(begin, recvd_begin);
let early = true;
let begin = relaymsg::Begin::new("127.0.0.1", 1111, 0).unwrap();
ctrl.send_fwd(None, begin.clone().into(), Recognized::No, early)
.await;
rt.advance_until_stalled().await;
let recvd_begin = expect_cell!(RelayEarly, Begin);
assert_eq!(begin, recvd_begin);
});
}
#[traced_test]
#[test]
fn forward_before_extend() {
tor_rtmock::MockRuntime::test_with_various(|rt| async move {
let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
rt.advance_until_stalled().await;
let extend2 = relaymsg::End::new_misc().into();
ctrl.send_fwd(None, extend2, Recognized::No, true).await;
rt.advance_until_stalled().await;
assert!(logs_contain(
"Asked to forward cell before the circuit was extended?!"
));
assert_circuit_destroyed(&mut ctrl, DestroyReason::NONE);
});
}
#[traced_test]
#[test]
fn reject_invalid_begin() {
tor_rtmock::MockRuntime::test_with_various(|rt| async move {
let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
rt.advance_until_stalled().await;
let _streams = ctrl
.allow_stream_requests(&[RelayCmd::BEGIN], AllowAllStreamsFilter)
.await;
let begin = relaymsg::Begin::new("127.0.0.1", 1111, 0).unwrap().into();
ctrl.send_fwd(None, begin, Recognized::Yes, false).await;
rt.advance_until_stalled().await;
assert!(logs_contain(
"Invalid stream ID [scrubbed] for relay command BEGIN"
));
assert_circuit_destroyed(&mut ctrl, DestroyReason::NONE);
});
}
#[traced_test]
#[test]
#[ignore] fn data_stream() {
tor_rtmock::MockRuntime::test_with_various(|rt| async move {
const TO_SEND: &[u8] = b"The bells were musical in the silvery sun";
let mut ctrl = ReactorTestCtrl::spawn_reactor(&rt);
rt.advance_until_stalled().await;
let mut incoming_streams = ctrl
.allow_stream_requests(&[RelayCmd::BEGIN], AllowAllStreamsFilter)
.await;
let begin = relaymsg::Begin::new("127.0.0.1", 1111, 0).unwrap().into();
ctrl.send_fwd(StreamId::new(1), begin, Recognized::Yes, false)
.await;
rt.advance_until_stalled().await;
let data = relaymsg::Data::new(TO_SEND).unwrap().into();
ctrl.send_fwd(StreamId::new(1), data, Recognized::Yes, false)
.await;
let pending = incoming_streams.next().await.unwrap();
let mut stream = pending
.accept_data(relaymsg::Connected::new_empty())
.await
.unwrap();
let mut recv_buf = [0_u8; TO_SEND.len()];
stream.read_exact(&mut recv_buf).await.unwrap();
assert_eq!(recv_buf, TO_SEND);
});
}
}