pub(crate) mod circhop;
pub(crate) mod backward;
pub(crate) mod forward;
pub(crate) mod hop_mgr;
pub(crate) mod macros;
pub(crate) mod stream;
use std::result::Result as StdResult;
use std::sync::Arc;
use derive_deftly::Deftly;
use futures::channel::mpsc;
use futures::{FutureExt as _, StreamExt as _, select_biased};
use oneshot_fused_workaround as oneshot;
use tracing::trace;
use tor_cell::chancell::CircId;
use tor_rtcompat::{DynTimeProvider, Runtime};
use crate::channel::Channel;
use crate::circuit::reactor::backward::BackwardHandler;
use crate::circuit::reactor::forward::ForwardHandler;
use crate::circuit::reactor::hop_mgr::HopMgr;
use crate::circuit::reactor::stream::ReadyStreamMsg;
use crate::circuit::{CircuitRxReceiver, UniqId};
use crate::memquota::CircuitAccount;
use crate::util::err::ReactorError;
use crate::client::circuit::padding::{PaddingController, PaddingEventStream};
use backward::BackwardReactor;
use forward::ForwardReactor;
use macros::derive_deftly_template_CircuitReactor;
pub(crate) type ReactorResultChannel<T> = oneshot::Sender<crate::Result<T>>;
#[derive(derive_more::Debug)]
pub(crate) struct CircReactorHandle<F: ForwardHandler, B: BackwardHandler> {
#[debug(skip)]
pub(crate) control: mpsc::UnboundedSender<CtrlMsg<F::CtrlMsg, B::CtrlMsg>>,
#[debug(skip)]
pub(crate) command: mpsc::UnboundedSender<CtrlCmd<F::CtrlCmd, B::CtrlCmd>>,
pub(crate) time_provider: DynTimeProvider,
pub(crate) memquota: CircuitAccount,
}
#[allow(unused)] pub(crate) enum CtrlCmd<F, B> {
Forward(forward::CtrlCmd<F>),
Backward(backward::CtrlCmd<B>),
Shutdown,
}
#[allow(unused)] pub(crate) enum CtrlMsg<F, B> {
Forward(forward::CtrlMsg<F>),
Backward(backward::CtrlMsg<B>),
}
#[derive(Deftly)]
#[derive_deftly(CircuitReactor)]
#[deftly(reactor_name = "circuit reactor")]
#[deftly(only_run_once)]
#[deftly(run_inner_fn = "Self::run_inner")]
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
pub(crate) struct Reactor<R: Runtime, F: ForwardHandler, B: BackwardHandler> {
unique_id: UniqId,
forward: Option<ForwardReactor<R, F>>,
backward: Option<BackwardReactor<B>>,
control: mpsc::UnboundedReceiver<CtrlMsg<F::CtrlMsg, B::CtrlMsg>>,
command: mpsc::UnboundedReceiver<CtrlCmd<F::CtrlCmd, B::CtrlCmd>>,
fwd_ctrl: ReactorCtrl<forward::CtrlCmd<F::CtrlCmd>, forward::CtrlMsg<F::CtrlMsg>>,
bwd_ctrl: ReactorCtrl<backward::CtrlCmd<B::CtrlCmd>, backward::CtrlMsg<B::CtrlMsg>>,
}
struct ReactorCtrl<C, M> {
command_tx: mpsc::UnboundedSender<C>,
control_tx: mpsc::UnboundedSender<M>,
}
impl<C, M> ReactorCtrl<C, M> {
fn new(command_tx: mpsc::UnboundedSender<C>, control_tx: mpsc::UnboundedSender<M>) -> Self {
Self {
command_tx,
control_tx,
}
}
fn send_cmd(&mut self, cmd: C) -> Result<(), ReactorError> {
self.command_tx
.unbounded_send(cmd)
.map_err(|_| ReactorError::Shutdown)
}
fn send_msg(&mut self, msg: M) -> Result<(), ReactorError> {
self.control_tx
.unbounded_send(msg)
.map_err(|_| ReactorError::Shutdown)
}
}
pub(crate) trait ControlHandler {
type CtrlMsg;
type CtrlCmd;
fn handle_cmd(&mut self, cmd: Self::CtrlCmd) -> StdResult<(), ReactorError>;
fn handle_msg(&mut self, msg: Self::CtrlMsg) -> StdResult<(), ReactorError>;
}
#[allow(unused)] impl<R: Runtime, F: ForwardHandler + ControlHandler, B: BackwardHandler + ControlHandler>
Reactor<R, F, B>
{
#[allow(clippy::too_many_arguments)] pub(crate) fn new(
runtime: R,
channel: &Arc<Channel>,
circ_id: CircId,
unique_id: UniqId,
inbound_chan_rx: CircuitRxReceiver,
forward_impl: F,
backward_impl: B,
hop_mgr: HopMgr<R>,
padding_ctrl: PaddingController,
padding_event_stream: PaddingEventStream,
bwd_rx: mpsc::Receiver<ReadyStreamMsg>,
fwd_events: mpsc::Receiver<F::CircEvent>,
memquota: &CircuitAccount,
) -> (Self, CircReactorHandle<F, B>) {
#[allow(clippy::disallowed_methods)]
let (backward_reactor_tx, forward_reactor_rx) = mpsc::channel(0);
let (control_tx, control_rx) = mpsc::unbounded();
let (command_tx, command_rx) = mpsc::unbounded();
let (fwd_control_tx, fwd_control_rx) = mpsc::unbounded();
let (fwd_command_tx, fwd_command_rx) = mpsc::unbounded();
let (bwd_control_tx, bwd_control_rx) = mpsc::unbounded();
let (bwd_command_tx, bwd_command_rx) = mpsc::unbounded();
let fwd_ctrl = ReactorCtrl::new(fwd_command_tx, fwd_control_tx);
let bwd_ctrl = ReactorCtrl::new(bwd_command_tx, bwd_control_tx);
let handle = CircReactorHandle {
control: control_tx,
command: command_tx,
time_provider: DynTimeProvider::new(runtime.clone()),
memquota: memquota.clone(),
};
let hops = Arc::clone(hop_mgr.hops());
let forward = ForwardReactor::new(
runtime.clone(),
unique_id,
forward_impl,
hop_mgr,
inbound_chan_rx,
fwd_control_rx,
fwd_command_rx,
backward_reactor_tx,
fwd_events,
padding_ctrl.clone(),
);
let backward = BackwardReactor::new(
runtime,
channel,
circ_id,
unique_id,
backward_impl,
hops,
forward_reactor_rx,
bwd_control_rx,
bwd_command_rx,
padding_ctrl,
padding_event_stream,
bwd_rx,
);
let reactor = Reactor {
unique_id,
forward: Some(forward),
backward: Some(backward),
control: control_rx,
command: command_rx,
fwd_ctrl,
bwd_ctrl,
};
(reactor, handle)
}
pub(crate) async fn run_inner(&mut self) -> StdResult<(), ReactorError> {
let (forward, backward) = (|| Some((self.forward.take()?, self.backward.take()?)))()
.expect("relay reactor spawned twice?!");
let mut forward = Box::pin(forward.run()).fuse();
let mut backward = Box::pin(backward.run()).fuse();
loop {
select_biased! {
res = self.command.next() => {
let Some(cmd) = res else {
trace!(
circ_id = %self.unique_id,
reason = "command channel drop",
"reactor shutdown",
);
return Err(ReactorError::Shutdown);
};
self.handle_command(cmd)?;
},
res = self.control.next() => {
let Some(msg) = res else {
trace!(
circ_id = %self.unique_id,
reason = "control channel drop",
"reactor shutdown",
);
return Err(ReactorError::Shutdown);
};
self.handle_control(msg)?;
},
res = forward => return Ok(res?),
res = backward => return Ok(res?),
}
}
}
fn handle_shutdown(&self) -> StdResult<(), ReactorError> {
trace!(
tunnel_id = %self.unique_id,
"reactor shutdown due to explicit request",
);
Err(ReactorError::Shutdown)
}
fn handle_command(
&mut self,
cmd: CtrlCmd<F::CtrlCmd, B::CtrlCmd>,
) -> StdResult<(), ReactorError> {
match cmd {
CtrlCmd::Forward(c) => self.fwd_ctrl.send_cmd(c),
CtrlCmd::Backward(c) => self.bwd_ctrl.send_cmd(c),
CtrlCmd::Shutdown => self.handle_shutdown(),
}
}
fn handle_control(
&mut self,
cmd: CtrlMsg<F::CtrlMsg, B::CtrlMsg>,
) -> StdResult<(), ReactorError> {
match cmd {
CtrlMsg::Forward(c) => self.fwd_ctrl.send_msg(c),
CtrlMsg::Backward(c) => self.bwd_ctrl.send_msg(c),
}
}
}
#[cfg(test)]
pub(crate) mod test {
#![allow(clippy::bool_assert_comparison)]
#![allow(clippy::clone_on_copy)]
#![allow(clippy::dbg_macro)]
#![allow(clippy::mixed_attributes_style)]
#![allow(clippy::print_stderr)]
#![allow(clippy::print_stdout)]
#![allow(clippy::single_char_pattern)]
#![allow(clippy::unwrap_used)]
#![allow(clippy::unchecked_time_subtraction)]
#![allow(clippy::useless_vec)]
#![allow(clippy::needless_pass_by_value)]
use tor_basic_utils::test_rng::testing_rng;
use tor_cell::chancell::{BoxedCellBody, msg as chanmsg};
use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, msg as relaymsg};
use chanmsg::AnyChanMsg;
#[cfg(feature = "hs-service")]
use crate::client::stream::IncomingStreamRequestFilter;
pub(crate) fn rmsg_to_ccmsg(
id: Option<StreamId>,
msg: relaymsg::AnyRelayMsg,
early: bool,
) -> AnyChanMsg {
let rfmt = RelayCellFormat::V0;
let body: BoxedCellBody = AnyRelayMsgOuter::new(id, msg)
.encode(rfmt, &mut testing_rng())
.unwrap();
let chanmsg = chanmsg::Relay::from(body);
if early {
let chanmsg = chanmsg::RelayEarly::from(chanmsg);
AnyChanMsg::RelayEarly(chanmsg)
} else {
AnyChanMsg::Relay(chanmsg)
}
}
#[cfg(any(feature = "hs-service", feature = "relay"))]
pub(crate) struct AllowAllStreamsFilter;
#[cfg(any(feature = "hs-service", feature = "relay"))]
impl IncomingStreamRequestFilter for AllowAllStreamsFilter {
fn disposition(
&mut self,
_ctx: &crate::client::stream::IncomingStreamRequestContext<'_>,
_circ: &crate::circuit::CircHopSyncView<'_>,
) -> crate::Result<crate::client::stream::IncomingStreamRequestDisposition> {
Ok(crate::client::stream::IncomingStreamRequestDisposition::Accept)
}
}
}