use crate::channel::Channel;
use crate::circuit::UniqId;
use crate::circuit::cell_sender::CircuitCellSender;
use crate::circuit::reactor::ControlHandler;
use crate::circuit::reactor::circhop::CircHopList;
use crate::circuit::reactor::macros::derive_deftly_template_CircuitReactor;
use crate::circuit::reactor::stream::ReadyStreamMsg;
use crate::congestion::{CongestionControl, sendme};
use crate::crypto::cell::RelayCellBody;
use crate::util::err::ReactorError;
use crate::util::poll_all::PollAll;
use crate::{Error, HopNum, Result};
use crate::client::circuit::padding::{
self, PaddingController, PaddingEvent, PaddingEventStream, QueuedCellPaddingInfo,
};
use tor_cell::chancell::msg::{AnyChanMsg, Relay};
use tor_cell::chancell::{AnyChanCell, BoxedCellBody, ChanCmd, CircId};
use tor_cell::relaycell::msg::{Sendme, SendmeTag};
use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, RelayCmd};
use tor_error::internal;
use tor_rtcompat::{DynTimeProvider, Runtime};
use derive_deftly::Deftly;
use futures::SinkExt;
use futures::channel::mpsc;
use futures::{FutureExt as _, StreamExt, future, select_biased};
use tracing::{debug, trace};
use std::pin::Pin;
use std::result::Result as StdResult;
use std::sync::{Arc, Mutex, RwLock};
use crate::circuit::CircuitRxReceiver;
#[cfg(feature = "circ-padding")]
use crate::circuit::padding::{CircPaddingDisposition, padding_disposition};
#[cfg(feature = "relay")]
use tor_cell::relaycell::msg::Extended2;
/// The backward reactor of a circuit.
///
/// Each call to `run_once` waits for a control command, a control message,
/// a padding event, or a batch of circuit events, and sends any resulting
/// cells through `inbound_chan_tx`.
#[derive(Deftly)]
#[derive_deftly(CircuitReactor)]
#[deftly(reactor_name = "backward reactor")]
#[deftly(run_inner_fn = "Self::run_once")]
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
pub(super) struct BackwardReactor<B: BackwardHandler> {
/// Time provider handed to congestion control when noting sent data
/// and received SENDMEs.
time_provider: DynTimeProvider,
/// Unique identifier of this circuit; used in log messages and passed to the handler.
unique_id: UniqId,
/// Circuit ID placed on every cell this reactor sends,
/// and closed on `channel` when the reactor is dropped.
circ_id: CircId,
/// The channel whose sender backs `inbound_chan_tx`.
channel: Arc<Channel>,
/// The implementation-specific handler: it encrypts relay cells,
/// processes backward cells, and handles custom control commands and messages.
inner: B,
/// Receiver for the cells that are passed to `BackwardHandler::handle_backward_cell`.
///
/// Starts out as `None`; it is installed by the `HandleCircuitExtended`
/// command (`relay` feature only).
outbound_chan_rx: Option<CircuitRxReceiver>,
/// The hops of this circuit; consulted for a hop's relay cell format
/// and congestion control state.
hops: Arc<RwLock<CircHopList>>,
/// Sender for every cell produced by this reactor; built from `channel`'s sender.
inbound_chan_tx: CircuitCellSender,
/// Receiver for control commands; the reactor shuts down when this stream ends.
command_rx: mpsc::UnboundedReceiver<CtrlCmd<B::CtrlCmd>>,
/// Receiver for control messages; the reactor shuts down when this stream ends.
control_rx: mpsc::UnboundedReceiver<CtrlMsg<B::CtrlMsg>>,
/// Receiver for `BackwardReactorCmd`s; the reactor shuts down when this stream ends.
///
/// NOTE(review): presumably the sending half is owned by the forward reactor
/// (see the field name and the shutdown log message) -- confirm.
forward_reactor_rx: mpsc::Receiver<BackwardReactorCmd>,
/// Receiver for stream messages that are ready to be sent as relay cells.
stream_rx: mpsc::Receiver<ReadyStreamMsg>,
/// Padding controller; told about every data or padding cell that gets queued.
padding_ctrl: PaddingController,
/// Stream of padding events, polled in `run_once`.
padding_event_stream: PaddingEventStream,
/// The padding-induced block currently applied to `inbound_chan_tx`, if any.
#[cfg(feature = "circ-padding")]
padding_block: Option<padding::StartBlocking>,
}
/// A control message for the backward reactor, received on its `control_rx`.
pub(crate) enum CtrlMsg<M> {
/// A handler-specific message; the reactor passes it to the handler's `handle_msg`.
#[allow(unused)] Custom(M),
}
/// A control command for the backward reactor, received on its `command_rx`.
pub(crate) enum CtrlCmd<C> {
/// A handler-specific command; the reactor passes it to the handler's `handle_cmd`.
#[allow(unused)] Custom(C),
}
/// The implementation-specific part of a [`BackwardReactor`].
///
/// Besides the control handling inherited from `ControlHandler`, an implementor
/// supplies relay cell encryption and the processing of backward cells.
pub(crate) trait BackwardHandler: ControlHandler {
/// The channel message type this handler accepts.
///
/// The reactor converts every `AnyChanMsg` it reads from its outbound receiver
/// into this type; a failed conversion is reported as a protocol violation.
type CircChanMsg: TryFrom<AnyChanMsg, Error = crate::Error> + Send;
/// Encrypt the relay cell `body` (through the mutable reference) for `hop`.
///
/// `cmd` is the channel command the cell will be sent with
/// (`RELAY` or `RELAY_EARLY`).
///
/// Returns the `SendmeTag` that the reactor hands to congestion control
/// when the cell counts towards the SENDME windows.
fn encrypt_relay_cell(
&mut self,
cmd: ChanCmd,
body: &mut RelayCellBody,
hop: Option<HopNum>,
) -> SendmeTag;
/// Process `cell`, received on the circuit identified by `circ_id`,
/// and tell the reactor what to do with the outcome.
fn handle_backward_cell(
&mut self,
circ_id: UniqId,
cell: Self::CircChanMsg,
) -> StdResult<BackwardCellDisposition, ReactorError>;
}
/// What the reactor should do after `BackwardHandler::handle_backward_cell`
/// has processed a cell.
pub(crate) enum BackwardCellDisposition {
/// Wrap this message in a cell carrying the reactor's circuit ID
/// and send it through the reactor's cell sender.
Forward(AnyChanMsg),
}
#[allow(unused)] impl<B: BackwardHandler> BackwardReactor<B> {
/// Create a new [`BackwardReactor`].
///
/// The reactor keeps its own clone of `channel` and derives its cell sender
/// from that channel's sender. It starts without an outbound receiver and
/// (with `circ-padding`) without any padding-induced block.
// Every argument is a distinct piece of reactor state, so the long
// argument list is accepted here.
#[allow(clippy::too_many_arguments)]
pub(super) fn new<R: Runtime>(
    runtime: R,
    channel: &Arc<Channel>,
    circ_id: CircId,
    unique_id: UniqId,
    inner: B,
    hops: Arc<RwLock<CircHopList>>,
    forward_reactor_rx: mpsc::Receiver<BackwardReactorCmd>,
    control_rx: mpsc::UnboundedReceiver<CtrlMsg<B::CtrlMsg>>,
    command_rx: mpsc::UnboundedReceiver<CtrlCmd<B::CtrlCmd>>,
    padding_ctrl: PaddingController,
    padding_event_stream: PaddingEventStream,
    stream_rx: mpsc::Receiver<ReadyStreamMsg>,
) -> Self {
    let channel = Arc::clone(channel);
    let inbound_chan_tx = CircuitCellSender::from_channel_sender(channel.sender());
    let time_provider = DynTimeProvider::new(runtime);

    Self {
        time_provider,
        unique_id,
        circ_id,
        channel,
        inner,
        // Nothing to read from until the circuit has been extended.
        outbound_chan_rx: None,
        hops,
        inbound_chan_tx,
        command_rx,
        control_rx,
        forward_reactor_rx,
        stream_rx,
        padding_ctrl,
        padding_event_stream,
        #[cfg(feature = "circ-padding")]
        padding_block: None,
    }
}
/// Run a single iteration of the reactor loop.
///
/// Exactly one of the following is handled per call: a control command,
/// a control message, a padding event, or a batch of circuit events
/// (ready stream messages, commands from `forward_reactor_rx`, and cells
/// from `outbound_chan_rx`).
///
/// Returns `ReactorError::Shutdown` if the command, control, or padding
/// event stream has ended, and otherwise propagates any error from the
/// handlers.
async fn run_once(&mut self) -> StdResult<(), ReactorError> {
use postage::prelude::{Sink as _, Stream as _};
// Number of event sources pushed onto `poll_all` below.
const PER_LOOP_EVENT_COUNT: usize = 3;
// NOTE(review): `PollAll` presumably polls every pushed future and yields
// the outputs of those that are ready -- confirm against `util::poll_all`.
let mut poll_all =
PollAll::<PER_LOOP_EVENT_COUNT, Option<CircuitEvent<B::CircChanMsg>>>::new();
// Resolves with the sender's readiness result. A flush is attempted first;
// its outcome is deliberately ignored.
let backward_chan_ready = future::poll_fn(|cx| {
let _ = self.inbound_chan_tx.poll_flush_unpin(cx);
self.inbound_chan_tx.poll_ready_unpin(cx)
});
// Event source 1: stream messages that are ready to be sent.
// Yields `None` (no event) once `stream_rx` has ended.
poll_all.push(async {
self.stream_rx.next().await.map(CircuitEvent::Send)
});
// Event source 2: commands from `forward_reactor_rx`.
// The end of that stream becomes a `ForwardShutdown` event.
poll_all.push(async {
let event = match self.forward_reactor_rx.next().await {
Some(cmd) => CircuitEvent::Forwarded(cmd),
None => {
CircuitEvent::ForwardShutdown
}
};
Some(event)
});
// Event source 3: cells from the outbound receiver, if one is installed.
poll_all.push(async {
let event = if let Some(outbound_chan_rx) = self.outbound_chan_rx.as_mut() {
match outbound_chan_rx.next().await {
Some(msg) => match msg.try_into() {
// The message is not of the type the handler accepts.
Err(e) => CircuitEvent::ProtoViolation(e),
Ok(cell) => CircuitEvent::Cell(cell),
},
// A closed outbound receiver is reported with the same event
// as a closed `forward_reactor_rx`.
None => {
CircuitEvent::ForwardShutdown
}
}
} else {
// No outbound receiver yet: this source never produces an event.
future::pending().await
};
Some(event)
});
// Only look for circuit events once the sender has reported readiness;
// the readiness result itself is ignored here.
let poll_all = async move {
let _ = backward_chan_ready.await;
poll_all.await
};
// `select_biased!` polls its arms in the order written, so commands take
// priority over control messages, then padding events, then circuit events.
let events = select_biased! {
res = self.command_rx.next().fuse() => {
let cmd = res.ok_or_else(|| ReactorError::Shutdown)?;
self.handle_cmd(cmd)?;
return Ok(());
}
res = self.control_rx.next().fuse() => {
let msg = res.ok_or_else(|| ReactorError::Shutdown)?;
self.handle_msg(msg)?;
return Ok(());
}
res = self.padding_event_stream.next().fuse() => {
let event = res.ok_or_else(|| ReactorError::Shutdown)?;
cfg_if::cfg_if! {
if #[cfg(feature = "circ-padding")] {
self.run_padding_event(event).await?;
} else {
// Without `circ-padding` the event wraps a `Void` value,
// so this arm can never actually run.
void::unreachable(event.0);
}
}
return Ok(())
}
res = poll_all.fuse() => res,
};
// Handle every event that was produced, skipping the `None` entries.
// The first handler error aborts the iteration.
for event in events.into_iter().flatten() {
self.handle_event(event).await?;
}
Ok(())
}
/// Handle a control command by handing its payload to the handler.
///
/// Returns whatever the handler's `handle_cmd` returns.
fn handle_cmd(&mut self, cmd: CtrlCmd<B::CtrlCmd>) -> StdResult<(), ReactorError> {
    // `Custom` is the only variant, so this pattern cannot fail.
    let CtrlCmd::Custom(custom) = cmd;
    self.inner.handle_cmd(custom)
}
/// Handle a control message by handing its payload to the handler.
///
/// Returns whatever the handler's `handle_msg` returns.
fn handle_msg(&mut self, msg: CtrlMsg<B::CtrlMsg>) -> StdResult<(), ReactorError> {
    // `Custom` is the only variant, so this pattern cannot fail.
    let CtrlMsg::Custom(custom) = msg;
    self.inner.handle_msg(custom)
}
/// Act on a single event from the padding event stream.
///
/// Sends padding, or starts/stops blocking the cell sender, depending on
/// the event. Only sending padding can fail.
#[cfg(feature = "circ-padding")]
async fn run_padding_event(
    &mut self,
    padding_event: PaddingEvent,
) -> StdResult<(), ReactorError> {
    match padding_event {
        PaddingEvent::SendPadding(send_padding) => self.send_padding(send_padding).await?,
        PaddingEvent::StartBlocking(start_blocking) => {
            self.start_blocking_for_padding(start_blocking)
        }
        PaddingEvent::StopBlocking => self.stop_blocking_for_padding(),
    }
    Ok(())
}
/// Handle a request to send padding to the hop named in `send_padding`.
///
/// Depending on the padding disposition, either a new padding cell is
/// queued for that hop, or the padding controller is told that an
/// already-queued cell stands in for the padding.
#[cfg(feature = "circ-padding")]
async fn send_padding(&mut self, send_padding: padding::SendPadding) -> Result<()> {
    let target_hop = send_padding.hop;
    let disposition = padding_disposition(
        &send_padding,
        &self.inbound_chan_tx,
        self.padding_block.as_ref(),
    );

    match disposition {
        // Both queueing dispositions are handled the same way here:
        // register the padding with the controller, then queue the cell.
        CircPaddingDisposition::QueuePaddingNormally
        | CircPaddingDisposition::QueuePaddingAndBypass => {
            let queue_info = self.padding_ctrl.queued_padding(target_hop, send_padding);
            self.queue_padding_cell_for_hop(target_hop, queue_info)
                .await
        }
        CircPaddingDisposition::TreatQueuedCellAsPadding => {
            self.padding_ctrl
                .replaceable_padding_already_queued(target_hop, send_padding);
            Ok(())
        }
    }
}
/// Start blocking the cell sender on behalf of the padding subsystem,
/// and remember `block` as the active padding block.
#[cfg(feature = "circ-padding")]
pub(super) fn start_blocking_for_padding(&mut self, block: padding::StartBlocking) {
    let Self {
        inbound_chan_tx,
        padding_block,
        ..
    } = self;
    inbound_chan_tx.start_blocking();
    *padding_block = Some(block);
}
/// Stop blocking the cell sender on behalf of the padding subsystem,
/// and forget the active padding block.
#[cfg(feature = "circ-padding")]
pub(super) fn stop_blocking_for_padding(&mut self) {
    let Self {
        inbound_chan_tx,
        padding_block,
        ..
    } = self;
    inbound_chan_tx.stop_blocking();
    *padding_block = None;
}
/// Build a padding cell (a relay DROP message without a stream ID) and
/// send it to `target_hop`.
///
/// `queue_info` is passed along with the cell as its padding information.
///
/// Fails if the hop does not exist, or if encoding or sending the cell fails.
#[cfg(feature = "circ-padding")]
async fn queue_padding_cell_for_hop(
    &mut self,
    target_hop: HopNum,
    queue_info: Option<QueuedCellPaddingInfo>,
) -> Result<()> {
    use tor_cell::relaycell::msg::Drop as DropMsg;

    let padding_msg = AnyRelayMsgOuter::new(None, DropMsg::default().into());
    let hop = Some(target_hop);
    let (cell_format, congestion) = self.hop_info(hop)?;

    // Padding is never sent as a RELAY_EARLY cell.
    self.send_relay_cell_inner(hop, cell_format, padding_msg, false, &congestion, queue_info)
        .await
}
/// Determine how `send_padding` should be handled, given the state of the
/// cell sender and the currently active padding block (if any).
#[cfg(feature = "circ-padding")]
fn padding_disposition(&self, send_padding: &padding::SendPadding) -> CircPaddingDisposition {
    let sender = &self.inbound_chan_tx;
    let active_block = self.padding_block.as_ref();
    crate::circuit::padding::padding_disposition(send_padding, sender, active_block)
}
async fn handle_event(
&mut self,
event: CircuitEvent<B::CircChanMsg>,
) -> StdResult<(), ReactorError> {
use CircuitEvent::*;
match event {
Cell(cell) => self.handle_backward_cell(cell).await,
Send(msg) => {
let ReadyStreamMsg {
hop,
relay_cell_format,
msg,
ccontrol,
} = msg;
self.send_relay_cell(hop, relay_cell_format, msg, false, &ccontrol)
.await?;
Ok(())
}
Forwarded(cmd) => self.handle_reactor_cmd(cmd).await,
ForwardShutdown => {
trace!(
circ_id = %self.unique_id,
"Backward relay reactor shutdown (forward reactor has closed)",
);
Err(ReactorError::Shutdown)
}
ProtoViolation(err) => Err(err.into()),
}
}
/// Look up the relay cell format and the congestion control state of `hopnum`.
///
/// The hop list is only read-locked for the duration of the lookup; the
/// congestion control handle is returned as a new `Arc` clone.
///
/// Returns an internal error if the hop does not exist.
///
/// # Panics
///
/// Panics if the hop list lock is poisoned.
fn hop_info(
    &self,
    hopnum: Option<HopNum>,
) -> Result<(RelayCellFormat, Arc<Mutex<CongestionControl>>)> {
    let hops = self.hops.read().expect("poisoned lock");
    // This helper serves every kind of outgoing relay cell (not only
    // padding), so the error must not claim that padding was being sent.
    let hop = hops
        .get(hopnum)
        .ok_or_else(|| internal!("tried to send a relay cell to non-existent hop?!"))?;
    let relay_cell_format = hop.settings.relay_crypt_protocol().relay_cell_format();
    let ccontrol = Arc::clone(&hop.ccontrol);
    Ok((relay_cell_format, ccontrol))
}
async fn handle_reactor_cmd(&mut self, msg: BackwardReactorCmd) -> StdResult<(), ReactorError> {
use BackwardReactorCmd::*;
match msg {
SendRelayMsg { hop, msg } => {
self.send_relay_msg(hop, msg).await?;
}
HandleSendme { hop, sendme } => {
self.handle_sendme(hop, sendme).await?;
return Ok(());
}
#[cfg(feature = "relay")]
HandleCircuitExtended {
hop,
extended2,
outbound_chan_rx,
} => {
self.outbound_chan_rx = Some(outbound_chan_rx);
let msg = AnyRelayMsgOuter::new(None, extended2.into());
self.send_relay_msg(hop, msg).await?;
debug!(
circ_id = %self.unique_id,
"Extended circuit to the next hop"
);
}
}
Ok(())
}
/// Send the relay message `msg` to `hopnum` as an ordinary (non-early) relay cell.
///
/// If the message is a SENDME, the hop's congestion control is told that
/// a SENDME was sent, after the cell has been handed to the sender.
///
/// # Panics
///
/// Panics if the hop list lock or the congestion control lock is poisoned.
async fn send_relay_msg(
    &mut self,
    hopnum: Option<HopNum>,
    msg: AnyRelayMsgOuter,
) -> StdResult<(), ReactorError> {
    let (cell_format, congestion) = self.hop_info(hopnum)?;
    let cmd = msg.cmd();

    trace!(
        circ_id = %self.unique_id,
        hopnum=?hopnum,
        cmd = %cmd,
        "Sending backward cell"
    );

    self.send_relay_cell(hopnum, cell_format, msg, false, &congestion)
        .await?;

    // Only SENDMEs need any further bookkeeping.
    if cmd != RelayCmd::SENDME {
        return Ok(());
    }
    congestion
        .lock()
        .expect("poisoned lock")
        .note_sendme_sent();
    Ok(())
}
/// Process a circuit-level SENDME received for `hopnum`.
///
/// The SENDME's tag and the sender's current congestion signals are
/// handed to the congestion control state of that hop.
///
/// Returns a circuit protocol error if the SENDME carries no tag, an
/// internal error if the hop does not exist, and any error reported by
/// congestion control.
///
/// # Panics
///
/// Panics if the hop list lock or the congestion control lock is poisoned.
async fn handle_sendme(
    &mut self,
    hopnum: Option<HopNum>,
    sendme: Sendme,
) -> StdResult<(), ReactorError> {
    let tag = sendme
        .into_sendme_tag()
        .ok_or_else(|| Error::CircProto("missing tag on circuit sendme".into()))?;
    // The signals are fetched before the hop list is locked, so no lock
    // guard is held across the await point.
    let signals = self.inbound_chan_tx.congestion_signals().await;
    let hops = self.hops.read().expect("poisoned lock");
    // No padding is being sent on this path: the error describes the
    // received SENDME instead.
    let hop = hops
        .get(hopnum)
        .ok_or_else(|| internal!("received circuit SENDME for non-existent hop?!"))?;
    hop.ccontrol
        .lock()
        .expect("poisoned lock")
        .note_sendme_received(&self.time_provider, tag, signals)?;
    Ok(())
}
fn encode_relay_cell(
&mut self,
relay_format: RelayCellFormat,
hop: Option<HopNum>,
early: bool,
msg: AnyRelayMsgOuter,
) -> Result<(AnyChanMsg, SendmeTag)> {
let mut body: RelayCellBody = msg
.encode(relay_format, &mut rand::rng())
.map_err(|e| Error::from_cell_enc(e, "relay cell body"))?
.into();
let cmd = if early {
ChanCmd::RELAY_EARLY
} else {
ChanCmd::RELAY
};
let tag = self.inner.encrypt_relay_cell(cmd, &mut body, hop);
let msg = Relay::from(BoxedCellBody::from(body));
let msg = if early {
AnyChanMsg::RelayEarly(msg.into())
} else {
AnyChanMsg::Relay(msg)
};
Ok((msg, tag))
}
/// Encode, encrypt and send `msg` to `hop`, without explicit padding information.
///
/// This is `send_relay_cell_inner` with `padding_info` set to `None`.
async fn send_relay_cell(
    &mut self,
    hop: Option<HopNum>,
    relay_cell_format: RelayCellFormat,
    msg: AnyRelayMsgOuter,
    early: bool,
    ccontrol: &Arc<Mutex<CongestionControl>>,
) -> Result<()> {
    let padding_info = None;
    self.send_relay_cell_inner(hop, relay_cell_format, msg, early, ccontrol, padding_info)
        .await
}
/// Encode, encrypt and send `msg` to `hop`.
///
/// If `padding_info` is `None`, the padding controller is asked for the
/// information to attach to the cell (using hop 0 when `hop` is `None`).
///
/// After the cell has been handed to the sender, congestion control is
/// told about it if the message's command counts towards the SENDME windows.
///
/// # Panics
///
/// Panics if the congestion control lock is poisoned.
async fn send_relay_cell_inner(
    &mut self,
    hop: Option<HopNum>,
    relay_cell_format: RelayCellFormat,
    msg: AnyRelayMsgOuter,
    early: bool,
    ccontrol: &Arc<Mutex<CongestionControl>>,
    padding_info: Option<QueuedCellPaddingInfo>,
) -> Result<()> {
    // Must be determined before `msg` is consumed by the encoder.
    let counts_towards_windows = sendme::cmd_counts_towards_windows(msg.cmd());

    let (chan_msg, tag) = self.encode_relay_cell(relay_cell_format, hop, early, msg)?;
    let cell = AnyChanCell::new(Some(self.circ_id), chan_msg);

    let padding_hop = hop.unwrap_or_else(|| HopNum::from(0));
    let padding_info = padding_info.or_else(|| self.padding_ctrl.queued_data(padding_hop));

    Pin::new(&mut self.inbound_chan_tx)
        .send_unbounded((cell, padding_info))
        .await?;

    if !counts_towards_windows {
        return Ok(());
    }
    ccontrol
        .lock()
        .expect("poisoned lock")
        .note_data_sent(&self.time_provider, &tag)?;
    Ok(())
}
/// Let the handler process `cell`, then carry out the resulting disposition.
///
/// A `Forward` disposition is wrapped in a cell carrying this circuit's ID
/// and sent through the cell sender, without padding information.
async fn handle_backward_cell(&mut self, cell: B::CircChanMsg) -> StdResult<(), ReactorError> {
    // `Forward` is the only disposition, so this pattern cannot fail.
    let BackwardCellDisposition::Forward(chan_msg) =
        self.inner.handle_backward_cell(self.unique_id, cell)?;

    let outgoing = AnyChanCell::new(Some(self.circ_id), chan_msg);
    self.inbound_chan_tx
        .send((outgoing, None))
        .await
        .map_err(ReactorError::Err)
}
}
/// Closes this reactor's circuit on its channel when the reactor is dropped.
impl<B: BackwardHandler> Drop for BackwardReactor<B> {
fn drop(&mut self) {
// Best effort: whatever `close_circuit` returns is deliberately discarded,
// since nothing can be reported from `drop`.
let _ = self.channel.close_circuit(self.circ_id);
}
}
/// An event handled by the backward reactor's `handle_event`.
enum CircuitEvent<M> {
/// A message read from the outbound receiver and successfully converted
/// into the handler's message type.
Cell(M),
/// A stream message that is ready to be sent as a relay cell.
Send(ReadyStreamMsg),
/// A command received on `forward_reactor_rx`.
Forwarded(BackwardReactorCmd),
/// `forward_reactor_rx` or the outbound receiver has been closed;
/// the reactor responds by shutting down.
ForwardShutdown,
/// A message read from the outbound receiver could not be converted
/// into the handler's message type.
ProtoViolation(Error),
}
/// A command for the backward reactor, received on its `forward_reactor_rx`.
pub(crate) enum BackwardReactorCmd {
/// Process a circuit-level SENDME: its tag is handed to the congestion
/// control state of `hop`.
HandleSendme {
/// The hop whose congestion control state receives the SENDME.
hop: Option<HopNum>,
/// The SENDME message to process.
sendme: Sendme,
},
/// Encode, encrypt and send a relay message.
SendRelayMsg {
/// The hop the message is sent to.
hop: Option<HopNum>,
/// The relay message to send.
msg: AnyRelayMsgOuter,
},
/// Install the outbound receiver, then send an EXTENDED2 message.
#[cfg(feature = "relay")]
HandleCircuitExtended {
/// The hop the EXTENDED2 message is sent to.
hop: Option<HopNum>,
/// The EXTENDED2 message to send.
extended2: Extended2,
/// The receiver the reactor will read outbound-channel cells from.
outbound_chan_rx: CircuitRxReceiver,
},
}