mod extend_handler;
use extend_handler::ExtendRequestHandler;
use crate::channel::{Channel, ChannelSender};
use crate::circuit::CircuitRxReceiver;
use crate::circuit::UniqId;
use crate::circuit::reactor::ControlHandler;
use crate::circuit::reactor::backward::BackwardReactorCmd;
use crate::circuit::reactor::forward::{ForwardCellDisposition, ForwardHandler};
use crate::circuit::reactor::hop_mgr::HopMgr;
use crate::crypto::cell::OutboundRelayLayer;
use crate::crypto::cell::RelayCellBody;
use crate::relay::RelayCircChanMsg;
use crate::util::err::ReactorError;
use crate::{Error, HopNum, Result};
use crate::client::circuit::padding::QueuedCellPaddingInfo;
use crate::relay::channel_provider::ChannelProvider;
use crate::relay::reactor::CircuitAccount;
use tor_cell::chancell::msg::{AnyChanMsg, Destroy, PaddingNegotiate, Relay};
use tor_cell::chancell::{AnyChanCell, BoxedCellBody, ChanMsg, CircId};
use tor_cell::relaycell::msg::{Extended2, SendmeTag};
use tor_cell::relaycell::{RelayCellDecoderResult, RelayCellFormat, RelayCmd, UnparsedRelayMsg};
use tor_error::internal;
use tor_linkspec::OwnedChanTarget;
use tor_rtcompat::Runtime;
use futures::channel::mpsc;
use futures::{SinkExt as _, future};
use tracing::trace;
use std::result::Result as StdResult;
use std::sync::Arc;
use std::task::Poll;
/// Control message type used by the `ControlHandler` impl for `Forward`.
///
/// This is the unit type, so a control message carries no data.
type CtrlMsg = ();
/// Control command type used by the `ControlHandler` impl for `Forward`.
///
/// This is the unit type, so a control command carries no data.
type CtrlCmd = ();
/// The largest number of RELAY_EARLY cells accepted on a single circuit.
///
/// `Forward::handle_relay_cell` counts every RELAY_EARLY cell it is given and
/// fails with a `CircProto` error as soon as the count exceeds this value.
const MAX_RELAY_EARLY_CELLS_PER_CIRCUIT: usize = 8;
/// Relay-side state for handling cells that travel in the forward direction
/// of a circuit.
pub(crate) struct Forward {
    /// Identifier of the circuit.
    ///
    /// Reported as `circ_id` in trace logs and passed to the extend handler
    /// at construction time.
    unique_id: UniqId,
    /// The next-hop side of the circuit.
    ///
    /// Starts out as `None` and is filled in once a circuit extension has
    /// succeeded. Cells we do not recognize are forwarded through it, and the
    /// circuit it names is closed when this value is dropped.
    outbound: Option<Outbound>,
    /// Crypto layer used to decrypt incoming forward-direction relay cells.
    crypto_out: Box<dyn OutboundRelayLayer + Send>,
    /// How many RELAY_EARLY cells have been received on this circuit so far.
    ///
    /// Checked against `MAX_RELAY_EARLY_CELLS_PER_CIRCUIT`.
    relay_early_count: usize,
    /// Helper that EXTEND2 messages are delegated to.
    extend_handler: ExtendRequestHandler,
}
/// An event handed to `Forward` through `ForwardHandler::handle_event`.
///
/// The sending half of the event channel is given to the
/// `ExtendRequestHandler` when `Forward` is built.
// NOTE(review): presumably the extend handler is the only producer of these
// events -- confirm against `extend_handler`.
pub(crate) enum CircEvent {
    /// The outcome of an attempt to extend the circuit: either everything
    /// needed to start using the new hop, or the error that stopped it.
    ExtendResult(StdResult<ExtendResult, ReactorError>),
}
/// Everything produced by a successful circuit extension.
pub(crate) struct ExtendResult {
    /// The EXTENDED2 message; forwarded to the backward reactor in a
    /// `HandleCircuitExtended` command.
    extended2: Extended2,
    /// The outbound side of the circuit; stored in `Forward::outbound`.
    outbound: Outbound,
    /// Receiver for the new outbound circuit; forwarded to the backward
    /// reactor in a `HandleCircuitExtended` command.
    outbound_chan_rx: CircuitRxReceiver,
}
/// The next-hop ("outbound") side of a relay circuit.
struct Outbound {
    /// Circuit ID placed on every cell we forward, and the ID of the circuit
    /// closed on `channel` when `Forward` is dropped.
    circ_id: CircId,
    /// The channel on which the circuit is closed when `Forward` is dropped.
    channel: Arc<Channel>,
    /// Sink into which forwarded cells are written.
    outbound_chan_tx: ChannelSender,
}
/// What came out of removing our layer of encryption from a relay cell.
enum CellDecodeResult {
    /// Decryption yielded a tag, so the cell is for us: the tag plus the
    /// decoded cell contents.
    Recognized(SendmeTag, RelayCellDecoderResult),
    /// Decryption yielded no tag, so the cell is not for us: the body as it
    /// is after our decryption step, to be forwarded onward.
    //
    // NOTE(review): the variant name is misspelled ("Unrecognizd"); it is left
    // as-is here because other code refers to it by this name.
    Unrecognizd(RelayCellBody),
}
impl Forward {
pub(crate) fn new(
inbound_chan: &Arc<Channel>,
unique_id: UniqId,
crypto_out: Box<dyn OutboundRelayLayer + Send>,
chan_provider: Arc<dyn ChannelProvider<BuildSpec = OwnedChanTarget> + Send + Sync>,
event_tx: mpsc::Sender<CircEvent>,
memquota: CircuitAccount,
) -> Self {
let inbound_peer = Arc::clone(inbound_chan.peer_info());
let extend_handler =
ExtendRequestHandler::new(unique_id, chan_provider, inbound_peer, event_tx, memquota);
Self {
unique_id,
outbound: None,
crypto_out,
relay_early_count: 0,
extend_handler,
}
}
fn decode_relay_cell<R: Runtime>(
&mut self,
hop_mgr: &mut HopMgr<R>,
cell: Relay,
) -> Result<(Option<HopNum>, CellDecodeResult)> {
let hopnum = None;
let cmd = cell.cmd();
let mut body = cell.into_relay_body().into();
let Some(tag) = self.crypto_out.decrypt_outbound(cmd, &mut body) else {
return Ok((hopnum, CellDecodeResult::Unrecognizd(body)));
};
let mut hops = hop_mgr.hops().write().expect("poisoned lock");
let decode_res = hops
.get_mut(hopnum)
.ok_or_else(|| internal!("msg from non-existent hop???"))?
.inbound
.decode(body.into())?;
Ok((hopnum, CellDecodeResult::Recognized(tag, decode_res)))
}
#[allow(clippy::unnecessary_wraps)] fn handle_drop(&mut self) -> StdResult<(), ReactorError> {
cfg_if::cfg_if! {
if #[cfg(feature = "circ-padding")] {
Err(internal!("relay circuit padding not yet supported").into())
} else {
Ok(())
}
}
}
fn handle_extend_result(
&mut self,
res: StdResult<ExtendResult, ReactorError>,
) -> StdResult<Option<BackwardReactorCmd>, ReactorError> {
let ExtendResult {
extended2,
outbound,
outbound_chan_rx,
} = res?;
self.outbound = Some(outbound);
Ok(Some(BackwardReactorCmd::HandleCircuitExtended {
hop: None,
extended2,
outbound_chan_rx,
}))
}
fn handle_relay_cell<R: Runtime>(
&mut self,
hop_mgr: &mut HopMgr<R>,
cell: Relay,
early: bool,
) -> StdResult<Option<ForwardCellDisposition>, ReactorError> {
if early {
self.relay_early_count += 1;
if self.relay_early_count > MAX_RELAY_EARLY_CELLS_PER_CIRCUIT {
return Err(
Error::CircProto("Circuit received too many RELAY_EARLY cells".into()).into(),
);
}
}
let (hopnum, res) = self.decode_relay_cell(hop_mgr, cell)?;
let (tag, decode_res) = match res {
CellDecodeResult::Unrecognizd(body) => {
self.handle_unrecognized_cell(body, None, early)?;
return Ok(None);
}
CellDecodeResult::Recognized(tag, res) => (tag, res),
};
Ok(Some(ForwardCellDisposition::HandleRecognizedRelay {
cell: decode_res,
early,
hopnum,
tag,
}))
}
fn handle_unrecognized_cell(
&mut self,
body: RelayCellBody,
info: Option<QueuedCellPaddingInfo>,
early: bool,
) -> StdResult<(), ReactorError> {
trace!(
circ_id = %self.unique_id,
"Forwarding unrecognized cell"
);
let Some(chan) = self.outbound.as_mut() else {
return Err(Error::CircProto(
"Asked to forward cell before the circuit was extended?!".into(),
)
.into());
};
let msg = Relay::from(BoxedCellBody::from(body));
let relay = if early {
AnyChanMsg::RelayEarly(msg.into())
} else {
AnyChanMsg::Relay(msg)
};
let cell = AnyChanCell::new(Some(chan.circ_id), relay);
chan.outbound_chan_tx.start_send_unpin((cell, info))?;
Ok(())
}
#[allow(clippy::unused_async)] async fn handle_truncate(&mut self) -> StdResult<(), ReactorError> {
Err(internal!("TRUNCATE is not implemented").into())
}
#[allow(clippy::needless_pass_by_value)] fn handle_destroy_cell(&mut self, _cell: Destroy) -> StdResult<(), ReactorError> {
Err(internal!("DESTROY is not implemented").into())
}
#[allow(clippy::needless_pass_by_value)] fn handle_padding_negotiate(&mut self, _cell: PaddingNegotiate) -> StdResult<(), ReactorError> {
Err(internal!("PADDING_NEGOTIATE is not implemented").into())
}
}
impl ForwardHandler for Forward {
type BuildSpec = OwnedChanTarget;
type CircChanMsg = RelayCircChanMsg;
type CircEvent = CircEvent;
async fn handle_meta_msg<R: Runtime>(
&mut self,
runtime: &R,
early: bool,
_hopnum: Option<HopNum>,
msg: UnparsedRelayMsg,
_relay_cell_format: RelayCellFormat,
) -> StdResult<(), ReactorError> {
match msg.cmd() {
RelayCmd::DROP => self.handle_drop(),
RelayCmd::EXTEND2 => self.extend_handler.handle_extend2(runtime, early, msg),
RelayCmd::TRUNCATE => self.handle_truncate().await,
cmd => Err(internal!("relay cmd {cmd} not supported").into()),
}
}
async fn handle_forward_cell<R: Runtime>(
&mut self,
hop_mgr: &mut HopMgr<R>,
cell: RelayCircChanMsg,
) -> StdResult<Option<ForwardCellDisposition>, ReactorError> {
use RelayCircChanMsg::*;
match cell {
Relay(r) => self.handle_relay_cell(hop_mgr, r, false),
RelayEarly(r) => self.handle_relay_cell(hop_mgr, r.into(), true),
Destroy(d) => {
self.handle_destroy_cell(d)?;
Ok(None)
}
PaddingNegotiate(p) => {
self.handle_padding_negotiate(p)?;
Ok(None)
}
}
}
fn handle_event(
&mut self,
event: Self::CircEvent,
) -> StdResult<Option<BackwardReactorCmd>, ReactorError> {
match event {
CircEvent::ExtendResult(res) => self.handle_extend_result(res),
}
}
async fn outbound_chan_ready(&mut self) -> Result<()> {
future::poll_fn(|cx| match &mut self.outbound {
Some(chan) => {
let _ = chan.outbound_chan_tx.poll_flush_unpin(cx);
chan.outbound_chan_tx.poll_ready_unpin(cx)
}
None => {
Poll::Ready(Ok(()))
}
})
.await
}
}
impl ControlHandler for Forward {
    type CtrlMsg = CtrlMsg;
    type CtrlCmd = CtrlCmd;

    /// Handle a control command.
    ///
    /// The command type is `()`, so there is nothing to do and this always
    /// succeeds.
    fn handle_cmd(&mut self, cmd: Self::CtrlCmd) -> StdResult<(), ReactorError> {
        // Destructuring the unit stops compiling if the command type ever
        // stops being `()`.
        let (): Self::CtrlCmd = cmd;
        Ok(())
    }

    /// Handle a control message.
    ///
    /// The message type is `()`, so there is nothing to do and this always
    /// succeeds.
    fn handle_msg(&mut self, msg: Self::CtrlMsg) -> StdResult<(), ReactorError> {
        // Destructuring the unit stops compiling if the message type ever
        // stops being `()`.
        let (): Self::CtrlMsg = msg;
        Ok(())
    }
}
impl Drop for Forward {
    /// Close the outbound circuit, if there is one, when this state goes
    /// away.
    fn drop(&mut self) {
        let Some(outbound) = self.outbound.as_mut() else {
            // The circuit has no outbound side, so there is nothing to close.
            return;
        };

        // Best effort: the outcome of closing the circuit is ignored.
        let _ = outbound.channel.close_circuit(outbound.circ_id);
    }
}