use crate::circuit::UniqId;
use crate::circuit::reactor::backward::BackwardReactorCmd;
use crate::circuit::reactor::hop_mgr::HopMgr;
use crate::circuit::reactor::macros::derive_deftly_template_CircuitReactor;
use crate::circuit::reactor::stream::StreamMsg;
use crate::circuit::reactor::{ControlHandler, ReactorResultChannel};
use crate::congestion::sendme;
use crate::stream::cmdcheck::AnyCmdChecker;
use crate::stream::msg_streamid;
use crate::util::err::ReactorError;
use crate::{Error, HopNum, Result};
#[cfg(any(feature = "hs-service", feature = "relay"))]
use crate::stream::incoming::{
IncomingStreamRequestFilter, IncomingStreamRequestHandler, StreamReqSender,
};
use crate::client::circuit::padding::PaddingController;
use tor_cell::chancell::msg::AnyChanMsg;
use tor_cell::relaycell::msg::{Sendme, SendmeTag};
use tor_cell::relaycell::{
AnyRelayMsgOuter, RelayCellDecoderResult, RelayCellFormat, RelayCmd, UnparsedRelayMsg,
};
use tor_error::internal;
use tor_linkspec::HasRelayIds;
use tor_rtcompat::Runtime;
use derive_deftly::Deftly;
use futures::SinkExt;
use futures::channel::mpsc;
use futures::{FutureExt as _, StreamExt, select_biased};
use tracing::debug;
use std::result::Result as StdResult;
use crate::circuit::CircuitRxReceiver;
/// The forward reactor of a circuit.
///
/// One iteration (`run_once`) waits for exactly one of the following and
/// handles it: a control command, a control message, a circuit event, or a
/// cell read from the inbound channel.  Anything that depends on the kind of
/// circuit is delegated to the [`ForwardHandler`] stored in `inner`.
///
/// NOTE(review): `run()`, named in the `must_use` message below, is not defined
/// in this file; presumably the `CircuitReactor` derive generates it and drives
/// `run_once` (see `run_inner_fn`) -- confirm against the macro.
#[derive(Deftly)]
#[derive_deftly(CircuitReactor)]
#[deftly(reactor_name = "forward reactor")]
#[deftly(run_inner_fn = "Self::run_once")]
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
pub(super) struct ForwardReactor<R: Runtime, F: ForwardHandler> {
/// Runtime handle; lent to the handler when it processes a meta message.
runtime: R,
/// Identifier of this circuit, attached to log events as `circ_id`.
unique_id: UniqId,
/// The implementation-specific handler that this reactor delegates to.
inner: F,
/// Source of control commands.
///
/// Polled first in every iteration; if it is closed, the iteration ends with
/// `ReactorError::Shutdown`.
command_rx: mpsc::UnboundedReceiver<CtrlCmd<F::CtrlCmd>>,
/// Source of control messages.
///
/// Polled after `command_rx`; if it is closed, the iteration ends with
/// `ReactorError::Shutdown`.
control_rx: mpsc::UnboundedReceiver<CtrlMsg<F::CtrlMsg>>,
/// Source of the cells this reactor processes.
///
/// Only read once `inner.outbound_chan_ready()` has completed successfully.
inbound_chan_rx: CircuitRxReceiver,
/// Sender used to hand `BackwardReactorCmd`s to the backward reactor.
///
/// A failed send is reported as `ReactorError::Shutdown`.
backward_reactor_tx: mpsc::Sender<BackwardReactorCmd>,
/// Hop manager: gives access to the per-hop state, receives the incoming
/// stream request handler, and is where stream messages are sent.
hop_mgr: HopMgr<R>,
/// Source of circuit events, each of which is passed to `inner.handle_event`.
circ_events: mpsc::Receiver<F::CircEvent>,
/// Padding controller, told about every decrypted relay cell
/// (padding or data).
padding_ctrl: PaddingController,
}
/// A control command for the [`ForwardReactor`].
///
/// `C` is the type of the implementation-specific commands carried by
/// [`CtrlCmd::Custom`].
pub(crate) enum CtrlCmd<C> {
/// Install a handler for incoming stream requests.
///
/// The reactor builds an `IncomingStreamRequestHandler` from these fields
/// and gives it to the hop manager.
#[cfg(any(feature = "hs-service", feature = "relay"))]
AwaitStreamRequests {
/// Stored in the handler as its `incoming_sender`.
// NOTE(review): presumably where accepted stream requests get delivered -- confirm.
incoming_sender: StreamReqSender,
/// Stored in the handler as its `cmd_checker`.
cmd_checker: AnyCmdChecker,
/// Where the result of installing the handler is sent.
///
/// The reactor does not care whether anyone receives it.
done: ReactorResultChannel<()>,
/// Stored in the handler as its `hop_num`.
hop: Option<HopNum>,
/// Stored in the handler as its `filter`.
filter: Box<dyn IncomingStreamRequestFilter>,
},
/// An implementation-specific command, passed to the handler's `handle_cmd`.
#[allow(unused)] Custom(C),
}
/// A control message for the [`ForwardReactor`].
///
/// `M` is the type of the implementation-specific messages carried by
/// [`CtrlMsg::Custom`], which is currently the only variant.
pub(crate) enum CtrlMsg<M> {
/// An implementation-specific message, passed to the handler's `handle_msg`.
#[allow(unused)] Custom(M),
}
/// The implementation-specific part of a [`ForwardReactor`].
///
/// The reactor owns one value of this type and calls into it for everything
/// that it does not handle itself.  Custom control commands and messages are
/// passed on via `handle_cmd` / `handle_msg`, which this trait does not
/// declare (presumably they come from the `ControlHandler` supertrait --
/// confirm).
pub(crate) trait ForwardHandler: ControlHandler {
/// A type that provides relay identities.
// NOTE(review): not used by the reactor code in this file; presumably describes
// the target when a circuit is built or extended -- confirm.
type BuildSpec: HasRelayIds;
/// The kind of channel message this handler accepts.
///
/// Every cell read from the inbound channel is converted into this type
/// before being given to [`handle_forward_cell`](Self::handle_forward_cell);
/// a failed conversion is an error for the reactor.
type CircChanMsg: TryFrom<AnyChanMsg, Error = crate::Error>;
/// The kind of event delivered to [`handle_event`](Self::handle_event).
type CircEvent;
/// Handle a relay message that has no stream ID.
///
/// The reactor deals with `SENDME` itself, so this is called for every other
/// command.  `hopnum` and `relay_cell_format` describe the hop the message
/// was received from.
// NOTE(review): `early` is only passed through by the reactor; presumably it
// says whether the cell arrived as a RELAY_EARLY cell -- confirm.
async fn handle_meta_msg<R: Runtime>(
&mut self,
runtime: &R,
early: bool,
hopnum: Option<HopNum>,
msg: UnparsedRelayMsg,
relay_cell_format: RelayCellFormat,
) -> StdResult<(), ReactorError>;
/// Handle a cell read from the inbound channel.
///
/// Returns `Ok(None)` if the reactor has nothing further to do with the
/// cell, or a [`ForwardCellDisposition`] describing what the reactor should
/// do next.
async fn handle_forward_cell<R: Runtime>(
&mut self,
hop_mgr: &mut HopMgr<R>,
cell: Self::CircChanMsg,
) -> StdResult<Option<ForwardCellDisposition>, ReactorError>;
/// Handle a circuit event.
///
/// If a command is returned, the reactor sends it to the backward reactor.
fn handle_event(
&mut self,
event: Self::CircEvent,
) -> StdResult<Option<BackwardReactorCmd>, ReactorError>;
/// Wait until the outbound side is ready.
///
/// The reactor awaits this before each read from the inbound channel, so no
/// cell is read while this is pending.  An error ends the reactor iteration
/// with that error.
async fn outbound_chan_ready(&mut self) -> Result<()>;
}
/// What the [`ForwardReactor`] should do with a cell after
/// [`ForwardHandler::handle_forward_cell`] has looked at it.
pub(crate) enum ForwardCellDisposition {
/// The cell was decoded into relay messages that the reactor must process.
HandleRecognizedRelay {
/// The decoded contents of the cell.
cell: RelayCellDecoderResult,
/// Passed on unchanged to the meta-message handlers.
// NOTE(review): presumably whether this was a RELAY_EARLY cell -- confirm.
early: bool,
/// The hop the cell is from.
///
/// When this is `None`, the padding controller is told hop 0.
hopnum: Option<HopNum>,
/// The tag used to build the circuit-level SENDME, should one be needed
/// in response to this cell.
tag: SendmeTag,
},
}
impl<R: Runtime, F: ForwardHandler> ForwardReactor<R, F> {
/// Assembles a forward reactor from its already-constructed parts.
///
/// Every argument is stored, unchanged, in the field of the same name.
/// Nothing is polled, sent, or spawned here.
// One argument per field is more than clippy accepts by default.
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
    runtime: R,
    unique_id: UniqId,
    inner: F,
    hop_mgr: HopMgr<R>,
    inbound_chan_rx: CircuitRxReceiver,
    control_rx: mpsc::UnboundedReceiver<CtrlMsg<F::CtrlMsg>>,
    command_rx: mpsc::UnboundedReceiver<CtrlCmd<F::CtrlCmd>>,
    backward_reactor_tx: mpsc::Sender<BackwardReactorCmd>,
    circ_events: mpsc::Receiver<F::CircEvent>,
    padding_ctrl: PaddingController,
) -> Self {
    // Listed in the order in which the struct declares its fields.
    Self {
        runtime,
        unique_id,
        inner,
        command_rx,
        control_rx,
        inbound_chan_rx,
        backward_reactor_tx,
        hop_mgr,
        circ_events,
        padding_ctrl,
    }
}
/// Runs a single iteration of the reactor: waits for one input and handles it.
///
/// `select_biased!` polls its branches in the order they are written, so when
/// several inputs are ready at once the one listed first wins:
///
///   1. control commands (`command_rx`),
///   2. control messages (`control_rx`),
///   3. circuit events (`circ_events`),
///   4. cells from the inbound channel (`inbound_chan_rx`).
///
/// # Errors
///
/// Returns `ReactorError::Shutdown` if the selected input source is closed or
/// the backward reactor cannot be reached, and otherwise propagates whatever
/// error handling the input produced.
async fn run_once(&mut self) -> StdResult<(), ReactorError> {
    // The inbound channel is read only after the handler says that its
    // outbound side is ready.
    let outbound_chan_ready = self.inner.outbound_chan_ready();
    let inbound_chan_rx_fut = async {
        outbound_chan_ready.await?;
        Ok(self.inbound_chan_rx.next().await)
    };

    select_biased! {
        maybe_cmd = self.command_rx.next().fuse() => {
            let cmd = maybe_cmd.ok_or(ReactorError::Shutdown)?;
            self.handle_cmd(cmd)
        }
        maybe_msg = self.control_rx.next().fuse() => {
            let msg = maybe_msg.ok_or(ReactorError::Shutdown)?;
            self.handle_msg(msg)
        }
        maybe_event = self.circ_events.next().fuse() => {
            let event = maybe_event.ok_or(ReactorError::Shutdown)?;
            // The handler may ask for a command to be passed to the backward reactor.
            let follow_up = self.inner.handle_event(event)?;
            match follow_up {
                Some(cmd) => self.send_reactor_cmd(cmd).await,
                None => Ok(()),
            }
        }
        read = inbound_chan_rx_fut.fuse() => {
            // An `Err` here comes from `outbound_chan_ready`; `None` means the
            // inbound channel has no more cells to give.
            let Some(chan_msg) = read.map_err(ReactorError::Err)? else {
                debug!(
                    circ_id = %self.unique_id,
                    "Backward channel has closed, shutting down forward relay reactor",
                );
                return Err(ReactorError::Shutdown);
            };

            let circ_msg: F::CircChanMsg = chan_msg.try_into()?;
            let disposition = self
                .inner
                .handle_forward_cell(&mut self.hop_mgr, circ_msg)
                .await?;
            match disposition {
                // The handler dealt with the cell completely.
                None => Ok(()),
                Some(ForwardCellDisposition::HandleRecognizedRelay { cell, early, hopnum, tag }) => {
                    self.handle_relay_cell(cell, early, hopnum, tag).await
                }
            }
        },
    }
}
/// Executes one control command.
///
/// * `Custom`: passed to the handler's `handle_cmd`, whose result is returned.
/// * `AwaitStreamRequests` (only with the `hs-service` or `relay` feature):
///   an incoming stream request handler is built from the command and given
///   to the hop manager.  The outcome is sent on `done` rather than returned,
///   so this arm always returns `Ok(())`.
fn handle_cmd(&mut self, cmd: CtrlCmd<F::CtrlCmd>) -> StdResult<(), ReactorError> {
    match cmd {
        CtrlCmd::Custom(custom) => self.inner.handle_cmd(custom),
        #[cfg(any(feature = "hs-service", feature = "relay"))]
        CtrlCmd::AwaitStreamRequests {
            incoming_sender,
            cmd_checker,
            done,
            hop: hop_num,
            filter,
        } => {
            let outcome = self
                .hop_mgr
                .set_incoming_handler(IncomingStreamRequestHandler {
                    incoming_sender,
                    cmd_checker,
                    hop_num,
                    filter,
                });
            // It does not matter if nobody is waiting for the answer any more.
            let _ = done.send(outcome);
            Ok(())
        }
    }
}
/// Executes one control message.
///
/// `Custom` is the only kind of control message, so the payload is always
/// passed to the handler's `handle_msg`, whose result is returned.
fn handle_msg(&mut self, msg: CtrlMsg<F::CtrlMsg>) -> StdResult<(), ReactorError> {
    // Irrefutable: the enum has a single variant.
    let CtrlMsg::Custom(custom) = msg;
    self.inner.handle_msg(custom)
}
/// Does the per-hop bookkeeping for one relay cell received from `hopnum`.
///
/// `c_t_w` says whether the cell counts towards the SENDME windows; only then
/// is the hop's congestion control told that data was received.
///
/// Returns the relay cell format of the hop, and whether a circuit-level
/// SENDME now has to be sent (always `false` when `c_t_w` is `false`).
///
/// # Errors
///
/// Fails with an internal error if there is no such hop, and propagates any
/// error from decrementing the hop's inbound cell limit or from congestion
/// control.
///
/// # Panics
///
/// Panics if the lock around the hop list or around the hop's congestion
/// control is poisoned.
fn note_relay_cell_received(
    &self,
    hopnum: Option<HopNum>,
    c_t_w: bool,
) -> Result<(RelayCellFormat, bool)> {
    let mut hops = self.hop_mgr.hops().write().expect("poisoned lock");
    let Some(hop) = hops.get_mut(hopnum) else {
        return Err(internal!("msg from non-existent hop???").into());
    };

    // This must come first: it can fail, and a failure must leave congestion
    // control untouched.
    hop.inbound.decrement_cell_limit()?;

    // Congestion control is consulted only for cells that count towards the
    // windows.
    let send_circ_sendme = c_t_w
        && hop
            .ccontrol
            .lock()
            .expect("poisoned lock")
            .note_data_received()?;

    let relay_cell_format = hop.settings.relay_crypt_protocol().relay_cell_format();
    Ok((relay_cell_format, send_circ_sendme))
}
/// Processes one decoded relay cell.
///
/// In order:
///
///   1. tells the padding controller whether padding or data was decrypted;
///   2. does the per-hop bookkeeping for the cell;
///   3. if that bookkeeping calls for it, asks the backward reactor to send a
///      circuit-level SENDME built from `tag`;
///   4. handles the messages of the cell one at a time.
///
/// The first message that fails stops the processing: the messages after it
/// (and any incomplete message) are only logged, and the error is returned.
async fn handle_relay_cell(
    &mut self,
    decode_res: RelayCellDecoderResult,
    early: bool,
    hopnum: Option<HopNum>,
    tag: SendmeTag,
) -> StdResult<(), ReactorError> {
    // The padding controller needs a definite hop number: no hop means hop 0.
    let padding_hop = hopnum.unwrap_or_else(|| HopNum::from(0));
    if decode_res.is_padding() {
        self.padding_ctrl.decrypted_padding(padding_hop)?;
    } else {
        self.padding_ctrl.decrypted_data(padding_hop);
    }

    // The cell counts towards the windows as soon as one of its commands does.
    let counts_towards_windows = decode_res.cmds().any(sendme::cmd_counts_towards_windows);
    let (relay_cell_format, needs_circ_sendme) =
        self.note_relay_cell_received(hopnum, counts_towards_windows)?;

    if needs_circ_sendme {
        let msg = AnyRelayMsgOuter::new(None, Sendme::from(tag).into());
        self.send_reactor_cmd(BackwardReactorCmd::SendRelayMsg { hop: hopnum, msg })
            .await?;
    }

    let (mut msgs, incomplete) = decode_res.into_parts();
    // Not a `for` loop: the error path below takes ownership of the iterator.
    while let Some(msg) = msgs.next() {
        let outcome = self
            .handle_relay_msg(early, hopnum, msg, relay_cell_format, counts_towards_windows)
            .await;

        if let Err(e) = outcome {
            // Everything that follows the failing message is dropped, but
            // leaves a trace in the log.
            for m in msgs {
                debug!(
                    circ_id = %self.unique_id,
                    "Ignoring relay msg received after triggering shutdown: {m:?}",
                );
            }
            if let Some(incomplete) = incomplete {
                debug!(
                    circ_id = %self.unique_id,
                    "Ignoring partial relay msg received after triggering shutdown: {:?}",
                    incomplete,
                );
            }
            return Err(e);
        }
    }

    Ok(())
}
/// Routes one relay message according to whether it has a stream ID.
///
/// * Without a stream ID, the message is a meta message and goes to
///   [`handle_meta_msg`](Self::handle_meta_msg).
/// * With a stream ID, it is wrapped in a `StreamMsg` (together with
///   `cell_counts_toward_windows`) and sent to `hop` through the hop manager.
///
/// Fails if `msg_streamid` rejects the message, or if the chosen route fails.
async fn handle_relay_msg(
    &mut self,
    early: bool,
    hop: Option<HopNum>,
    msg: UnparsedRelayMsg,
    relay_cell_format: RelayCellFormat,
    cell_counts_toward_windows: bool,
) -> StdResult<(), ReactorError> {
    let streamid = msg_streamid(&msg)?;
    match streamid {
        None => {
            self.handle_meta_msg(early, hop, msg, relay_cell_format)
                .await
        }
        Some(sid) => {
            let stream_msg = StreamMsg {
                sid,
                msg,
                cell_counts_toward_windows,
            };
            self.hop_mgr.send(hop, stream_msg).await
        }
    }
}
/// Handles a relay message that has no stream ID.
///
/// `SENDME` is decoded here and passed to the backward reactor as a
/// `HandleSendme` command for `hopnum`.  Every other command is left to the
/// handler's own `handle_meta_msg`.
///
/// Fails if the `SENDME` cannot be decoded, if the backward reactor cannot be
/// reached, or if the handler fails.
async fn handle_meta_msg(
    &mut self,
    early: bool,
    hopnum: Option<HopNum>,
    msg: UnparsedRelayMsg,
    relay_cell_format: RelayCellFormat,
) -> StdResult<(), ReactorError> {
    // Anything other than a SENDME is the handler's business.
    if !matches!(msg.cmd(), RelayCmd::SENDME) {
        return self
            .inner
            .handle_meta_msg(&self.runtime, early, hopnum, msg, relay_cell_format)
            .await;
    }

    let sendme = msg
        .decode::<Sendme>()
        .map_err(|e| Error::from_bytes_err(e, "sendme message"))?
        .into_msg();
    self.send_reactor_cmd(BackwardReactorCmd::HandleSendme {
        hop: hopnum,
        sendme,
    })
    .await
}
/// Sends a command to the backward reactor, waiting for room in the channel
/// if necessary.
///
/// Any failure to send is reported as `ReactorError::Shutdown`; the underlying
/// send error is discarded.
async fn send_reactor_cmd(
    &mut self,
    forward: BackwardReactorCmd,
) -> StdResult<(), ReactorError> {
    match self.backward_reactor_tx.send(forward).await {
        Ok(()) => Ok(()),
        Err(_) => Err(ReactorError::Shutdown),
    }
}
}