pub(crate) mod circuit;
mod conflux;
mod control;
use crate::circuit::circhop::SendRelayCell;
use crate::circuit::{CircuitRxReceiver, UniqId};
use crate::client::circuit::ClientCircChanMsg;
use crate::client::circuit::padding::{PaddingController, PaddingEvent, PaddingEventStream};
use crate::client::{HopLocation, TargetHop};
use crate::crypto::cell::HopNum;
use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
use crate::memquota::CircuitAccount;
use crate::stream::CloseStreamBehavior;
use crate::streammap;
use crate::tunnel::{TunnelId, TunnelScopedCircId};
use crate::util::err::ReactorError;
use crate::util::skew::ClockSkew;
use crate::util::timeout::TimeoutEstimator;
use crate::{Error, Result};
use circuit::Circuit;
use conflux::ConfluxSet;
use control::ControlHandler;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
use tor_cell::relaycell::msg::Sendme;
use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
use tor_error::{Bug, bad_api_usage, debug_report, internal, into_bad_api_usage};
use tor_rtcompat::{DynTimeProvider, SleepProvider};
use cfg_if::cfg_if;
use futures::StreamExt;
use futures::channel::mpsc;
use futures::{FutureExt as _, select_biased};
use oneshot_fused_workaround as oneshot;
use std::result::Result as StdResult;
use std::sync::Arc;
use std::time::Duration;
use crate::channel::Channel;
use crate::conflux::msghandler::RemoveLegReason;
use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
use circuit::CircuitCmd;
use derive_more::From;
use smallvec::smallvec;
use tor_cell::chancell::CircId;
use tor_llcrypto::pk;
use tracing::{debug, info, instrument, trace, warn};
use super::circuit::{MutableState, TunnelMutableState};
use crate::circuit::reactor::ReactorResultChannel;
#[cfg(feature = "hs-service")]
use crate::stream::incoming::IncomingStreamRequestHandler;
#[cfg(feature = "conflux")]
use {
crate::conflux::msghandler::{ConfluxCmd, OooRelayMsg},
crate::util::err::ConfluxHandshakeError,
};
pub(super) use control::{CtrlCmd, CtrlMsg, FlowCtrlMsg};
/// The outcomes of a conflux handshake: one entry per recorded leg result,
/// in the order the results were recorded.
#[cfg(feature = "conflux")]
pub(super) type ConfluxHandshakeResult = Vec<StdResult<(), ConfluxHandshakeError>>;
/// The channel on which the reactor reports a [`ConfluxHandshakeResult`]
/// (or an error) to whoever requested the linking.
#[cfg(feature = "conflux")]
pub(super) type ConfluxLinkResultChannel = ReactorResultChannel<ConfluxHandshakeResult>;
/// The kinds of circuit handshake, together with the key material each one
/// carries.
// NOTE(review): presumably selects the handshake used when creating or
// extending a circuit; the consumers are outside this file's view.
#[derive(Clone, Debug)]
pub(crate) enum CircuitHandshake {
/// The `CreateFast` handshake; this variant carries no data.
CreateFast,
/// The ntor handshake.
Ntor {
/// The ntor public key to use.
public_key: NtorPublicKey,
/// The Ed25519 identity associated with this handshake.
ed_identity: pk::ed25519::Ed25519Identity,
},
/// The ntor v3 handshake.
NtorV3 {
/// The ntor v3 public key to use.
public_key: NtorV3PublicKey,
},
}
/// One or more [`RunOnceCmdInner`] to execute after handling a reactor event.
///
/// `Single` can be built from a `RunOnceCmdInner` via the derived `From` impl.
#[derive(From, Debug)]
#[allow(clippy::large_enum_variant)] enum RunOnceCmd {
/// Run a single command.
Single(RunOnceCmdInner),
/// Run several commands, in order, stopping at the first error.
Multiple(Vec<RunOnceCmdInner>),
}
/// A single unit of work for the reactor, executed by
/// `Reactor::handle_single_run_once_cmd`.
#[derive(educe::Educe)]
#[educe(Debug)]
enum RunOnceCmdInner {
/// Send a relay cell on the given leg.
Send {
/// The leg to send the cell on.
leg: UniqId,
/// The cell to send.
cell: SendRelayCell,
/// If present, receives the result of the send.
done: Option<ReactorResultChannel<()>>,
},
/// Optionally send a message, and optionally install a meta-cell handler.
#[cfg(feature = "send-control-msg")]
SendMsgAndInstallHandler {
/// The message to send, if any.
msg: Option<AnyRelayMsgOuter>,
/// The handler to install, if any.
#[educe(Debug(ignore))]
handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
/// Receives the outcome of preparing/sending the message.
done: oneshot::Sender<Result<()>>,
},
/// Pass a received SENDME to the given leg, along with the congestion
/// signals read from that leg's channel sender.
HandleSendMe {
/// The leg the SENDME arrived on.
leg: UniqId,
/// The hop the SENDME is for.
hop: HopNum,
/// The SENDME message itself.
sendme: Sendme,
},
/// Send the cell that begins a stream, and report the result.
BeginStream {
/// The cell to send and the stream ID it is for, or the error that
/// occurred while preparing them.
cell: Result<(SendRelayCell, StreamId)>,
/// The hop location reported back on `done` on success.
hop: HopLocation,
/// The leg to send the cell on.
leg: UniqId,
/// Receives the stream ID, hop location and relay cell format on
/// success, or the error on failure.
done: ReactorResultChannel<(StreamId, HopLocation, RelayCellFormat)>,
},
/// Ask the target hop whether an XON should be sent for a stream, and
/// send it if so.
MaybeSendXon {
/// The rate passed to the hop's `maybe_send_xon`.
rate: XonKbpsEwma,
/// The stream the XON would be for.
stream_id: StreamId,
/// The hop the stream is attached to.
hop: HopLocation,
},
/// Close a stream.
CloseStream {
/// The hop the stream is attached to.
hop: HopLocation,
/// The ID of the stream to close.
sid: StreamId,
/// Passed through to the leg's `close_stream`.
behav: CloseStreamBehavior,
/// Passed through to the leg's `close_stream`.
reason: streammap::TerminateReason,
/// If present, receives the result of closing the stream.
done: Option<ReactorResultChannel<()>>,
},
/// Report the clock skew of the tunnel's single leg.
FirstHopClockSkew {
/// Receives the clock skew, or a `Bug` if there is no single leg.
answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
},
/// Remove a leg from the set of circuits.
RemoveLeg {
/// The leg to remove.
leg: UniqId,
/// Why the leg is being removed.
reason: RemoveLegReason,
},
/// Record a successful conflux handshake, then send `cell` on `leg`.
#[cfg(feature = "conflux")]
ConfluxHandshakeComplete {
/// The leg to send the cell on.
leg: UniqId,
/// The cell to send.
cell: SendRelayCell,
},
/// Add the given circuits to the tunnel and start linking them.
#[cfg(feature = "conflux")]
Link {
/// The circuits to add.
#[educe(Debug(ignore))]
circuits: Vec<Circuit>,
/// Receives the outcome of the linking attempt.
answer: ConfluxLinkResultChannel,
},
/// Queue an out-of-order message until it can be handled in order.
#[cfg(feature = "conflux")]
Enqueue {
/// The leg the message arrived on.
leg: UniqId,
/// The out-of-order message.
msg: OooRelayMsg,
},
/// Run a padding event on the given leg.
#[cfg(feature = "circ-padding")]
PaddingAction {
/// The leg the event applies to.
leg: UniqId,
/// The event to run.
padding_event: PaddingEvent,
},
/// Shut the reactor down without reporting an error.
CleanShutdown,
}
impl RunOnceCmdInner {
    /// Translate a [`CircuitCmd`] produced by the circuit `leg` into the
    /// equivalent reactor-level command.
    ///
    /// Commands built this way never carry a completion channel
    /// (`done` is always `None`).
    fn from_circuit_cmd(leg: UniqId, cmd: CircuitCmd) -> Self {
        match cmd {
            CircuitCmd::CleanShutdown => Self::CleanShutdown,
            CircuitCmd::Send(cell) => {
                let done = None;
                Self::Send { leg, cell, done }
            }
            CircuitCmd::HandleSendMe { hop, sendme } => Self::HandleSendMe { leg, hop, sendme },
            CircuitCmd::CloseStream {
                hop: hop_num,
                sid,
                behav,
                reason,
            } => {
                // The circuit only knows the hop number; qualify it with the leg.
                let hop = HopLocation::Hop((leg, hop_num));
                Self::CloseStream {
                    hop,
                    sid,
                    behav,
                    reason,
                    done: None,
                }
            }
            #[cfg(feature = "conflux")]
            CircuitCmd::Enqueue(msg) => Self::Enqueue { leg, msg },
            #[cfg(feature = "conflux")]
            CircuitCmd::Conflux(ConfluxCmd::RemoveLeg(reason)) => Self::RemoveLeg { leg, reason },
            #[cfg(feature = "conflux")]
            CircuitCmd::Conflux(ConfluxCmd::HandshakeComplete { hop, early, cell }) => {
                Self::ConfluxHandshakeComplete {
                    leg,
                    cell: SendRelayCell {
                        hop: Some(hop),
                        early,
                        cell,
                    },
                }
            }
        }
    }
}
/// An event the reactor has to react to in `Reactor::run_once`.
#[derive(From, Debug)]
#[allow(clippy::large_enum_variant)] enum CircuitEvent {
/// Run a command produced by a circuit leg.
RunCmd {
/// The leg that produced the command.
leg: UniqId,
/// The command to run.
cmd: CircuitCmd,
},
/// Handle a control message read from the control channel.
HandleControl(CtrlMsg),
/// Handle a cell received on a circuit leg.
HandleCell {
/// The leg the cell arrived on.
leg: UniqId,
/// The cell to handle.
cell: ClientCircChanMsg,
},
/// Remove a circuit leg.
RemoveLeg {
/// The leg to remove.
leg: UniqId,
/// Why the leg is being removed.
reason: RemoveLegReason,
},
/// Run a padding event on a leg.
PaddingAction {
/// The leg the event applies to.
leg: UniqId,
/// The padding event.
padding_event: PaddingEvent,
},
/// A protocol violation occurred; the reactor returns this error.
ProtoViolation {
/// The error describing the violation.
err: crate::Error,
},
}
impl CircuitEvent {
    /// Return the sort key that decides where this event is executed within
    /// a batch of simultaneously-ready events (lower keys run first).
    ///
    /// Protocol violations come first. With `circ-padding` enabled,
    /// `StopBlocking` padding events run before ordinary events and
    /// `StartBlocking` events run after them. Everything else is ordinary.
    fn order_within_batch(&self) -> u8 {
        use CircuitEvent as CA;
        use PaddingEvent as PE;
        /// Handled before anything else.
        const IMMEDIATE: u8 = 0;
        /// Handled before ordinary events.
        const EARLY: u8 = 1;
        /// The default position.
        const NORMAL: u8 = 2;
        /// Handled after ordinary events.
        const LATE: u8 = 3;
        match self {
            CA::ProtoViolation { .. } => IMMEDIATE,
            CA::RunCmd { .. }
            | CA::HandleControl(..)
            | CA::HandleCell { .. }
            | CA::RemoveLeg { .. } => NORMAL,
            #[cfg(feature = "circ-padding")]
            CA::PaddingAction {
                padding_event: PE::StopBlocking,
                ..
            } => EARLY,
            #[cfg(feature = "circ-padding")]
            CA::PaddingAction {
                padding_event: PE::SendPadding(..),
                ..
            } => NORMAL,
            #[cfg(feature = "circ-padding")]
            CA::PaddingAction {
                padding_event: PE::StartBlocking(..),
                ..
            } => LATE,
            #[cfg(not(feature = "circ-padding"))]
            CA::PaddingAction { .. } => NORMAL,
        }
    }
}
/// An object that can be installed on the reactor (as its single
/// "meta handler") to handle relay messages.
// NOTE(review): the call sites of `handle_msg` are outside this file's view;
// presumably it is invoked for messages not tied to a stream - confirm.
pub(crate) trait MetaCellHandler: Send {
/// The hop this handler is associated with.
///
/// The reactor uses this to pick the target hop of a message sent together
/// with (or through) this handler, and requires it to be a precise hop.
fn expected_hop(&self) -> HopLocation;
/// Handle `msg`, with mutable access to the circuit, and report what should
/// happen next via the returned [`MetaCellDisposition`].
fn handle_msg(
&mut self,
msg: UnparsedRelayMsg,
reactor: &mut Circuit,
) -> Result<MetaCellDisposition>;
}
/// The outcome of [`MetaCellHandler::handle_msg`].
///
/// This type becomes `pub` when the `send-control-msg` feature is enabled.
// NOTE(review): the code that interprets these variants is outside this
// file's view; the per-variant notes below are inferred from the names.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "send-control-msg", visibility::make(pub))]
#[non_exhaustive]
pub(crate) enum MetaCellDisposition {
/// Presumably: the message was consumed and the handler stays installed.
#[cfg(feature = "send-control-msg")]
Consumed,
/// Presumably: the message was consumed and the handler is finished.
ConversationFinished,
/// Presumably: the message was consumed and the circuit should be closed.
#[cfg(feature = "send-control-msg")]
CloseCirc,
}
/// Turn an `Option` into a `Result`, treating `None` as a reactor shutdown.
///
/// `Some(v)` evaluates to `Ok(v)`. `None` emits a trace-level
/// "reactor shutdown" event carrying `$self.tunnel_id` and `$reason`, and
/// evaluates to `Err(ReactorError::Shutdown)`.
macro_rules! unwrap_or_shutdown {
    ($self:expr, $res:expr, $reason:expr) => {{
        if let Some(v) = $res {
            Ok(v)
        } else {
            trace!(
                tunnel_id = %$self.tunnel_id,
                reason = %$reason,
                "reactor shutdown"
            );
            Err(ReactorError::Shutdown)
        }
    }};
}
/// The object that drives a client tunnel: it owns the tunnel's circuit legs
/// and reacts to control messages, commands and circuit events.
///
/// Nothing happens until [`Reactor::run`] is called.
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
pub struct Reactor {
/// Receiver for control messages; polled after `command`.
control: mpsc::UnboundedReceiver<CtrlMsg>,
/// Receiver for commands; polled before anything else in `run_once`.
command: mpsc::UnboundedReceiver<CtrlCmd>,
/// Sender half of the "reactor closed" channel whose receiver is returned
/// by `Reactor::new`. Nothing is ever sent on it (the payload is `Void`).
// NOTE(review): presumably the receiver observes the drop of this sender
// as the "reactor is gone" signal - confirm against the receiver side.
#[allow(dead_code)] reactor_closed_tx: oneshot::Sender<void::Void>,
/// The circuit legs of this tunnel.
circuits: ConfluxSet,
/// The identifier of this tunnel, used in log messages.
tunnel_id: TunnelId,
/// Handlers for incoming messages (see [`CellHandlers`]).
cell_handlers: CellHandlers,
/// The time provider, used for `now()` and handed to the circuit set.
runtime: DynTimeProvider,
/// State of the conflux handshake currently in progress, if any.
#[cfg(feature = "conflux")]
conflux_hs_ctx: Option<ConfluxHandshakeCtx>,
/// Out-of-order messages waiting until they can be handled in order.
#[cfg(feature = "conflux")]
ooo_msgs: BinaryHeap<ConfluxHeapEntry>,
}
/// Bookkeeping for a conflux handshake that is in progress.
#[cfg(feature = "conflux")]
struct ConfluxHandshakeCtx {
/// Where to report the collected results once the handshake is over.
answer: ConfluxLinkResultChannel,
/// The number of legs a result is expected from.
num_legs: usize,
/// The per-leg results recorded so far.
results: ConfluxHandshakeResult,
}
/// An out-of-order message, together with the leg it arrived on.
///
/// Ordering and equality of entries consider only `msg`.
#[derive(Debug)]
#[cfg(feature = "conflux")]
struct ConfluxHeapEntry {
/// The leg the message was received on.
leg_id: UniqId,
/// The out-of-order message.
msg: OooRelayMsg,
}
#[cfg(feature = "conflux")]
impl Ord for ConfluxHeapEntry {
    /// Compare two entries by their messages alone; `leg_id` is ignored.
    fn cmp(&self, other: &Self) -> Ordering {
        Ord::cmp(&self.msg, &other.msg)
    }
}
#[cfg(feature = "conflux")]
impl PartialOrd for ConfluxHeapEntry {
    /// Total ordering, delegated to [`Ord::cmp`].
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
#[cfg(feature = "conflux")]
impl PartialEq for ConfluxHeapEntry {
    /// Two entries are equal when their messages are equal; `leg_id` is ignored.
    fn eq(&self, other: &Self) -> bool {
        PartialEq::eq(&self.msg, &other.msg)
    }
}
#[cfg(feature = "conflux")]
impl Eq for ConfluxHeapEntry {}
/// The handlers the reactor hands to its circuits when processing cells.
struct CellHandlers {
/// The currently installed meta-cell handler, if any.
///
/// At most one can be installed at a time (see `set_meta_handler`).
meta_handler: Option<Box<dyn MetaCellHandler + Send>>,
/// The currently installed handler for incoming stream requests, if any.
///
/// At most one can be installed at a time
/// (see `set_incoming_stream_req_handler`).
#[cfg(feature = "hs-service")]
incoming_stream_req_handler: Option<IncomingStreamRequestHandler>,
}
impl Reactor {
/// Create a new reactor for a tunnel consisting of a single circuit leg.
///
/// Returns, in order: the reactor itself, the sender for control messages,
/// the sender for commands, the receiver half of the "reactor closed"
/// channel, and the tunnel's shared mutable state.
///
/// The reactor does nothing until [`Reactor::run`] is called.
#[allow(clippy::type_complexity, clippy::too_many_arguments)]
pub(super) fn new(
    channel: Arc<Channel>,
    channel_id: CircId,
    unique_id: UniqId,
    input: CircuitRxReceiver,
    runtime: DynTimeProvider,
    memquota: CircuitAccount,
    padding_ctrl: PaddingController,
    padding_stream: PaddingEventStream,
    timeouts: Arc<dyn TimeoutEstimator + Send>,
) -> (
    Self,
    mpsc::UnboundedSender<CtrlMsg>,
    mpsc::UnboundedSender<CtrlCmd>,
    oneshot::Receiver<void::Void>,
    Arc<TunnelMutableState>,
) {
    let tunnel_id = TunnelId::next();

    // Channels connecting the reactor to its handle(s).
    let (control_tx, control) = mpsc::unbounded();
    let (command_tx, command) = mpsc::unbounded();
    let leg_state = Arc::new(MutableState::default());
    let (reactor_closed_tx, reactor_closed_rx) = oneshot::channel();

    // The tunnel starts out with exactly one leg.
    let first_leg = Circuit::new(
        runtime.clone(),
        channel,
        channel_id,
        TunnelScopedCircId::new(tunnel_id, unique_id),
        input,
        memquota,
        Arc::clone(&leg_state),
        padding_ctrl,
        padding_stream,
        timeouts,
    );
    let (circuits, tunnel_state) = ConfluxSet::new(tunnel_id, first_leg);

    let reactor = Reactor {
        circuits,
        control,
        command,
        reactor_closed_tx,
        tunnel_id,
        // No handlers are installed initially.
        cell_handlers: CellHandlers {
            meta_handler: None,
            #[cfg(feature = "hs-service")]
            incoming_stream_req_handler: None,
        },
        runtime,
        #[cfg(feature = "conflux")]
        conflux_hs_ctx: None,
        #[cfg(feature = "conflux")]
        ooo_msgs: Default::default(),
    };

    (
        reactor,
        control_tx,
        command_tx,
        reactor_closed_rx,
        tunnel_state,
    )
}
/// Run the reactor until it shuts down.
///
/// Repeatedly calls `run_once`. A clean shutdown
/// (`ReactorError::Shutdown`) yields `Ok(())`; any other reactor error is
/// returned to the caller. The reason for stopping is logged either way.
#[instrument(level = "trace", skip_all)]
pub async fn run(mut self) -> Result<()> {
    trace!(tunnel_id = %self.tunnel_id, "Running tunnel reactor");
    let result: Result<()> = loop {
        if let Err(stop) = self.run_once().await {
            break match stop {
                ReactorError::Shutdown => Ok(()),
                ReactorError::Err(e) => Err(e),
            };
        }
    };
    const MSG: &str = "Tunnel reactor stopped";
    if let Err(e) = &result {
        debug_report!(e, tunnel_id = %self.tunnel_id, "{MSG}");
    } else {
        trace!(tunnel_id = %self.tunnel_id, "{MSG}");
    }
    result
}
/// Run one iteration of the reactor loop.
///
/// Steps, in order:
///  1. Shut down if there are no circuits left.
///  2. If the tunnel is a single leg that has no hops yet, only wait for the
///     circuit to be created (see `wait_for_create`) and return.
///  3. (conflux only) Handle any queued out-of-order messages that are now
///     in order.
///  4. Wait for a command, a control message, or a batch of circuit events,
///     preferring them in that order. Commands are handled immediately;
///     everything else is turned into `RunOnceCmd`s and executed.
///
/// Returns `Err(ReactorError::Shutdown)` when the reactor should stop cleanly.
#[instrument(level = "trace", skip_all)]
async fn run_once(&mut self) -> StdResult<(), ReactorError> {
    if self.circuits.is_empty() {
        trace!(
            tunnel_id = %self.tunnel_id,
            "Tunnel reactor shutting down: all circuits have closed",
        );
        return Err(ReactorError::Shutdown);
    }
    // True when this is a single-path tunnel whose only leg has no hops yet:
    // nothing else can be done until the first hop is created.
    let single_path_without_hops = self
        .circuits
        .single_leg_mut()
        .is_ok_and(|leg| !leg.has_hops());
    if single_path_without_hops {
        self.wait_for_create().await?;
        return Ok(());
    }
    #[cfg(feature = "conflux")]
    self.try_dequeue_ooo_msgs().await?;
    // `select_biased!` polls the arms in order: commands first, then control
    // messages, then circuit events.
    let mut events = select_biased! {
        res = self.command.next() => {
            let cmd = unwrap_or_shutdown!(self, res, "command channel drop")?;
            return ControlHandler::new(self).handle_cmd(cmd);
        },
        ret = self.control.next() => {
            let msg = unwrap_or_shutdown!(self, ret, "control drop")?;
            smallvec![CircuitEvent::HandleControl(msg)]
        },
        res = self.circuits.next_circ_event(&self.runtime).fuse() => res?,
    };
    // Put the batch into execution order. `sort_by_key` is a stable sort, so
    // events with the same key keep their relative order.
    events.sort_by_key(|a| a.order_within_batch());
    for event in events {
        let cmd = match event {
            CircuitEvent::RunCmd { leg, cmd } => Some(RunOnceCmd::Single(
                RunOnceCmdInner::from_circuit_cmd(leg, cmd),
            )),
            CircuitEvent::HandleControl(ctrl) => ControlHandler::new(self)
                .handle_msg(ctrl)?
                .map(RunOnceCmd::Single),
            CircuitEvent::HandleCell { leg, cell } => {
                let circ = self
                    .circuits
                    .leg_mut(leg)
                    .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
                let circ_cmds = circ.handle_cell(&mut self.cell_handlers, leg, cell)?;
                if circ_cmds.is_empty() {
                    None
                } else {
                    let cmd = RunOnceCmd::Multiple(
                        circ_cmds
                            .into_iter()
                            .map(|cmd| RunOnceCmdInner::from_circuit_cmd(leg, cmd))
                            .collect(),
                    );
                    Some(cmd)
                }
            }
            CircuitEvent::RemoveLeg { leg, reason } => {
                Some(RunOnceCmdInner::RemoveLeg { leg, reason }.into())
            }
            CircuitEvent::PaddingAction { leg, padding_event } => {
                cfg_if! {
                    if #[cfg(feature = "circ-padding")] {
                        Some(RunOnceCmdInner::PaddingAction { leg, padding_event }.into())
                    } else {
                        // Without padding support the event wraps a `Void`,
                        // so this branch can never actually run.
                        void::unreachable(padding_event.0);
                    }
                }
            }
            CircuitEvent::ProtoViolation { err } => {
                return Err(err.into());
            }
        };
        if let Some(cmd) = cmd {
            self.handle_run_once_cmd(cmd).await?;
        }
    }
    Ok(())
}
/// Handle every queued out-of-order message that has become in-order.
///
/// Looks at the top of the `ooo_msgs` heap and, while the circuit set
/// reports its sequence number as in order, pops it, hands it to the leg
/// it arrived on, and executes any resulting command. Stops at the first
/// message that is still out of order (or when the heap is empty).
#[cfg(feature = "conflux")]
#[instrument(level = "trace", skip_all)]
async fn try_dequeue_ooo_msgs(&mut self) -> StdResult<(), ReactorError> {
    loop {
        let Some(head) = self.ooo_msgs.peek() else {
            break;
        };
        if !self.circuits.is_seqno_in_order(head.msg.seqno) {
            break;
        }

        let ConfluxHeapEntry { leg_id, msg } =
            self.ooo_msgs.pop().expect("item just disappeared?!");
        let circ = self
            .circuits
            .leg_mut(leg_id)
            .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
        let circ_cmd = circ.handle_in_order_relay_msg(
            &mut self.cell_handlers,
            msg.hopnum,
            leg_id,
            msg.cell_counts_towards_windows,
            msg.streamid,
            msg.msg,
        )?;

        if let Some(circ_cmd) = circ_cmd {
            let cmd = RunOnceCmd::Single(RunOnceCmdInner::from_circuit_cmd(leg_id, circ_cmd));
            self.handle_run_once_cmd(cmd).await?;
        }
    }
    Ok(())
}
/// Execute a [`RunOnceCmd`].
///
/// For `Multiple`, the commands are executed in order, and execution stops
/// at the first one that fails.
#[instrument(level = "trace", skip_all)]
async fn handle_run_once_cmd(&mut self, cmd: RunOnceCmd) -> StdResult<(), ReactorError> {
    match cmd {
        RunOnceCmd::Single(single) => self.handle_single_run_once_cmd(single).await,
        RunOnceCmd::Multiple(batch) => {
            for single in batch {
                self.handle_single_run_once_cmd(single).await?;
            }
            Ok(())
        }
    }
}
/// Execute a single [`RunOnceCmdInner`].
///
/// Where a command carries a completion channel (`done`/`answer`), the
/// outcome is reported on it; a receiver that has gone away is ignored.
///
/// # Errors
///
/// Returns `Err(ReactorError::Shutdown)` for `CleanShutdown`. Most other
/// failures (a failed send, a leg that has disappeared, ...) are converted
/// into a `ReactorError` and returned, which stops the reactor.
#[instrument(level = "trace", skip_all)]
async fn handle_single_run_once_cmd(
    &mut self,
    cmd: RunOnceCmdInner,
) -> StdResult<(), ReactorError> {
    match cmd {
        RunOnceCmdInner::Send { leg, cell, done } => {
            let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
            if let Some(done) = done {
                // We don't care if the receiver went away.
                let _ = done.send(res.clone());
            }
            res?;
        }
        #[cfg(feature = "send-control-msg")]
        RunOnceCmdInner::SendMsgAndInstallHandler { msg, handler, done } => {
            let cell: Result<Option<SendRelayCell>> =
                self.prepare_msg_and_install_handler(msg, handler);
            match cell {
                Ok(Some(cell)) => {
                    // No leg is specified here; the circuit set picks one.
                    let outcome = self.circuits.send_relay_cell_on_leg(cell, None).await;
                    let _ = done.send(outcome.clone());
                    outcome?;
                }
                Ok(None) => {
                    // Nothing to send: only the handler was installed.
                    let _ = done.send(Ok(()));
                }
                Err(e) => {
                    let _ = done.send(Err(e.clone()));
                    return Err(e.into());
                }
            }
        }
        RunOnceCmdInner::BeginStream {
            leg,
            cell,
            hop,
            done,
        } => {
            match cell {
                Ok((cell, stream_id)) => {
                    let circ = self
                        .circuits
                        .leg_mut(leg)
                        .ok_or_else(|| internal!("leg disappeared?!"))?;
                    let cell_hop = cell.hop.expect("missing hop in client SendRelayCell?!");
                    // Look up the relay cell format before sending, so it
                    // can be reported back together with the stream ID.
                    let relay_format = circ
                        .hop_mut(cell_hop)
                        .ok_or(Error::NoSuchHop)?
                        .relay_cell_format();
                    let outcome = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
                    let _ = done.send(outcome.clone().map(|_| (stream_id, hop, relay_format)));
                    outcome?;
                }
                Err(e) => {
                    let _ = done.send(Err(e.clone()));
                    return Err(e.into());
                }
            }
        }
        RunOnceCmdInner::CloseStream {
            hop,
            sid,
            behav,
            reason,
            done,
        } => {
            // NOTE(review): the `?`s inside this block return from the whole
            // function rather than from the block, so `result` is always
            // `Ok` and the `Err` arm below is never taken. Confirm whether a
            // failure to resolve the hop is really meant to stop the reactor.
            let result = {
                let (leg_id, hop_num) = self
                    .resolve_hop_location(hop)
                    .map_err(into_bad_api_usage!("Could not resolve {hop:?}"))?;
                let leg = self
                    .circuits
                    .leg_mut(leg_id)
                    // Build the error lazily: it is only needed on failure.
                    .ok_or_else(|| bad_api_usage!("No leg for id {:?}", leg_id))?;
                Ok::<_, Bug>((leg, hop_num))
            };
            let (leg, hop_num) = match result {
                Ok(x) => x,
                Err(e) => {
                    if let Some(done) = done {
                        let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
                        let _ = done.send(Err(e.into()));
                    }
                    return Ok(());
                }
            };
            let max_rtt = {
                let hop = leg
                    .hop(hop_num)
                    .ok_or_else(|| internal!("the hop we resolved disappeared?!"))?;
                let ccontrol = hop.ccontrol();
                // `max_rtt_usec` is in microseconds, so convert with
                // `from_micros` (`from_millis` would inflate it 1000-fold).
                // With no RTT measurement this is zero, and the timeout
                // below is then just 2 * CBT.
                ccontrol
                    .rtt()
                    .max_rtt_usec()
                    .map(|rtt| Duration::from_micros(u64::from(rtt)))
                    .unwrap_or_default()
            };
            // Hop numbers are zero-based, so the path length is one more.
            let circ_len = usize::from(hop_num) + 1;
            let timeout = std::cmp::max(max_rtt, 2 * leg.estimate_cbt(circ_len));
            let expire_at = self.runtime.now() + timeout;
            let res: Result<()> = leg
                .close_stream(hop_num, sid, behav, reason, expire_at)
                .await;
            if let Some(done) = done {
                let _ = done.send(res);
            }
        }
        RunOnceCmdInner::MaybeSendXon {
            rate,
            stream_id,
            hop,
        } => {
            let (leg_id, hop_num) = match self.resolve_hop_location(hop) {
                Ok(x) => x,
                Err(NoJoinPointError) => {
                    return Err(
                        internal!(
                            "Could not send an XON message to a join point on a tunnel without a join point",
                        )
                        .into()
                    );
                }
            };
            // A missing leg or hop is not an error here: the XON is dropped.
            let Some(leg) = self.circuits.leg_mut(leg_id) else {
                debug!("Could not send an XON message on a leg that does not exist. Ignoring.");
                return Ok(());
            };
            let Some(hop) = leg.hop_mut(hop_num) else {
                debug!("Could not send an XON message on a hop that does not exist. Ignoring.");
                return Ok(());
            };
            // The hop decides whether an XON is actually needed.
            let Some(msg) = hop.maybe_send_xon(rate, stream_id)? else {
                return Ok(());
            };
            let cell = AnyRelayMsgOuter::new(Some(stream_id), msg.into());
            let cell = SendRelayCell {
                hop: Some(hop_num),
                early: false,
                cell,
            };
            leg.send_relay_cell(cell).await?;
        }
        RunOnceCmdInner::HandleSendMe { leg, hop, sendme } => {
            let leg = self
                .circuits
                .leg_mut(leg)
                .ok_or_else(|| internal!("leg disappeared?!"))?;
            let signals = leg.chan_sender.congestion_signals().await;
            leg.handle_sendme(hop, sendme, signals)?;
        }
        RunOnceCmdInner::FirstHopClockSkew { answer } => {
            let res = self.circuits.single_leg_mut().map(|leg| leg.clock_skew());
            let _ = answer.send(res.map_err(Into::into));
        }
        RunOnceCmdInner::CleanShutdown => {
            trace!(tunnel_id = %self.tunnel_id, "reactor shutdown due to handled cell");
            return Err(ReactorError::Shutdown);
        }
        RunOnceCmdInner::RemoveLeg { leg, reason } => {
            warn!(tunnel_id = %self.tunnel_id, reason = %reason, "removing circuit leg");
            let circ = self.circuits.remove(leg)?;
            let is_conflux_pending = circ.is_conflux_pending();
            // Drop the removed leg explicitly, before reporting any result.
            drop(circ);
            #[cfg(feature = "conflux")]
            if is_conflux_pending {
                // The leg was removed while its handshake was still pending:
                // record the failure. Only a handshake *error* is treated as
                // a protocol violation that tears down the tunnel.
                let (error, proto_violation): (_, Option<Error>) = match &reason {
                    RemoveLegReason::ConfluxHandshakeTimeout => {
                        (ConfluxHandshakeError::Timeout, None)
                    }
                    RemoveLegReason::ConfluxHandshakeErr(e) => {
                        (ConfluxHandshakeError::Link(e.clone()), Some(e.clone()))
                    }
                    RemoveLegReason::ChannelClosed => {
                        (ConfluxHandshakeError::ChannelClosed, None)
                    }
                };
                self.note_conflux_handshake_result(Err(error), proto_violation.is_some())?;
                if let Some(e) = proto_violation {
                    tor_error::warn_report!(
                        e,
                        tunnel_id = %self.tunnel_id,
                        "Malformed conflux handshake, tearing down tunnel",
                    );
                    return Err(e.into());
                }
            }
        }
        #[cfg(feature = "conflux")]
        RunOnceCmdInner::ConfluxHandshakeComplete { leg, cell } => {
            // The handshake is recorded as successful *before* the cell is
            // sent.
            self.note_conflux_handshake_result(Ok(()), false)?;
            let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
            res?;
        }
        #[cfg(feature = "conflux")]
        RunOnceCmdInner::Link { circuits, answer } => {
            self.handle_link_circuits(circuits, answer).await?;
        }
        #[cfg(feature = "conflux")]
        RunOnceCmdInner::Enqueue { leg, msg } => {
            let entry = ConfluxHeapEntry { leg_id: leg, msg };
            self.ooo_msgs.push(entry);
        }
        #[cfg(feature = "circ-padding")]
        RunOnceCmdInner::PaddingAction { leg, padding_event } => {
            self.circuits.run_padding_event(leg, padding_event).await?;
        }
    }
    Ok(())
}
/// Wait for, and handle, the message that creates the first hop.
///
/// Used while the tunnel's single leg has no hops. Commands are preferred
/// over control messages. The only accepted command is `Shutdown` (plus
/// `AddFakeHop` in tests) and the only accepted control message is
/// `Create`; anything else is a protocol error that stops the reactor.
#[instrument(level = "trace", skip_all)]
async fn wait_for_create(&mut self) -> StdResult<(), ReactorError> {
    let msg = select_biased! {
        res = self.command.next() => {
            let cmd = unwrap_or_shutdown!(self, res, "shutdown channel drop")?;
            match cmd {
                CtrlCmd::Shutdown => return self.handle_shutdown().map(|_| ()),
                #[cfg(test)]
                CtrlCmd::AddFakeHop {
                    relay_cell_format: format,
                    fwd_lasthop,
                    rev_lasthop,
                    peer_id,
                    params,
                    done,
                } => {
                    let leg = self.circuits.single_leg_mut()?;
                    leg.handle_add_fake_hop(format, fwd_lasthop, rev_lasthop, peer_id, &params, done);
                    return Ok(())
                },
                _ => {
                    trace!("reactor shutdown due to unexpected command: {:?}", cmd);
                    return Err(Error::CircProto(format!("Unexpected control {cmd:?} on client circuit")).into());
                }
            }
        },
        res = self.control.next() => unwrap_or_shutdown!(self, res, "control drop")?,
    };
    match msg {
        CtrlMsg::Create {
            recv_created,
            handshake,
            settings,
            done,
        } => {
            let leg = self.circuits.single_leg_mut()?;
            leg.handle_create(recv_created, handshake, settings, done)
                .await
        }
        _ => {
            trace!("reactor shutdown due to unexpected cell: {:?}", msg);
            Err(Error::CircProto(format!("Unexpected {msg:?} cell on client circuit")).into())
        }
    }
}
/// Record the handshake result of one conflux leg.
///
/// Once a result has been recorded for every expected leg, or if
/// `reactor_is_closing` is set, the handshake context is removed and the
/// results collected so far are sent to the party that requested linking.
///
/// Returns an internal error if no handshake is in progress.
#[cfg(feature = "conflux")]
#[instrument(level = "trace", skip_all)]
fn note_conflux_handshake_result(
    &mut self,
    res: StdResult<(), ConfluxHandshakeError>,
    reactor_is_closing: bool,
) -> StdResult<(), ReactorError> {
    let Some(ctx) = self.conflux_hs_ctx.as_mut() else {
        return Err(internal!("no conflux handshake context").into());
    };
    ctx.results.push(res);
    // Whether every expected leg has now reported a result.
    let tunnel_complete = ctx.results.len() == ctx.num_legs;

    if !(tunnel_complete || reactor_is_closing) {
        return Ok(());
    }

    // The handshake is over: take the context out and report what we have.
    let ConfluxHandshakeCtx {
        answer,
        results,
        num_legs: _,
    } = self.conflux_hs_ctx.take().expect("context disappeared?!");
    let success_count = results.iter().filter(|res| res.is_ok()).count();
    let leg_count = results.len();
    info!(
        tunnel_id = %self.tunnel_id,
        "conflux tunnel ready ({success_count}/{leg_count} circuits successfully linked)",
    );
    send_conflux_outcome(answer, Ok(results))
}
/// Wrap `msg` (if any) in a [`SendRelayCell`], and install `handler` (if any).
///
/// The target hop of the message is the `expected_hop()` of `handler`, or,
/// if no handler was given, of the meta handler that is already installed.
///
/// # Errors
///
/// Fails if there is a message but no handler to take the hop from, if the
/// handler's hop location is not a precise hop, or if `handler` is given
/// while another meta handler is still installed.
fn prepare_msg_and_install_handler(
    &mut self,
    msg: Option<AnyRelayMsgOuter>,
    handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
) -> Result<Option<SendRelayCell>> {
    let msg = msg
        .map(|msg| {
            let handlers = &mut self.cell_handlers;
            // Prefer the handler being installed; fall back to the current one.
            let handler = handler
                .as_ref()
                .or(handlers.meta_handler.as_ref())
                .ok_or_else(|| internal!("tried to use an ended Conversation"))?;
            // Build the error lazily, so that nothing is constructed on the
            // success path.
            let hop = handler.expected_hop().hop_num().ok_or_else(|| {
                bad_api_usage!("MsgHandler doesn't have a precise HopLocation")
            })?;
            Ok::<_, crate::Error>(SendRelayCell {
                hop: Some(hop),
                early: false,
                cell: msg,
            })
        })
        .transpose()?;
    // Only install the handler once the message has been prepared successfully.
    if let Some(handler) = handler {
        self.cell_handlers.set_meta_handler(handler)?;
    }
    Ok(msg)
}
/// Handle an explicit shutdown request.
///
/// Logs the request and always returns `Err(ReactorError::Shutdown)`.
fn handle_shutdown(&self) -> StdResult<Option<RunOnceCmdInner>, ReactorError> {
    trace!(tunnel_id = %self.tunnel_id, "reactor shutdown due to explicit request");
    let stop = ReactorError::Shutdown;
    Err(stop)
}
/// Take the tunnel's single leg out of the reactor, send it (or the error
/// from taking it) on `answer`, and shut the reactor down.
///
/// A receiver that has gone away is ignored.
#[cfg(feature = "conflux")]
fn handle_shutdown_and_return_circuit(
    &mut self,
    answer: oneshot::Sender<StdResult<Circuit, Bug>>,
) -> StdResult<(), ReactorError> {
    let circuit = self.circuits.take_single_leg();
    let _ = answer.send(circuit);
    self.handle_shutdown()?;
    Ok(())
}
/// Resolve a [`TargetHop`] to a [`HopLocation`].
///
/// An explicit hop is returned as-is. `LastHop` resolves to the last hop of
/// the single leg for a single-leg tunnel, and to the join point for any
/// other non-empty tunnel.
///
/// Fails with [`NoHopsBuiltError`] if the single leg has no hops, or if the
/// tunnel has no circuits at all.
fn resolve_target_hop(&self, hop: TargetHop) -> StdResult<HopLocation, NoHopsBuiltError> {
    match hop {
        TargetHop::Hop(location) => Ok(location),
        TargetHop::LastHop => match self.circuits.single_leg() {
            Ok(leg) => {
                let leg_id = leg.unique_id();
                let last = leg.last_hop_num().ok_or(NoHopsBuiltError)?;
                Ok(HopLocation::Hop((leg_id, last)))
            }
            Err(_) if self.circuits.is_empty() => Err(NoHopsBuiltError),
            Err(_) => Ok(HopLocation::JoinPoint),
        },
    }
}
/// Resolve a [`HopLocation`] to a concrete leg ID and hop number.
///
/// An explicit hop is returned as-is; the join point is looked up in the
/// circuit set. Fails with [`NoJoinPointError`] if the join point was
/// requested but the tunnel has none.
#[instrument(level = "trace", skip_all)]
fn resolve_hop_location(
    &self,
    hop: HopLocation,
) -> StdResult<(UniqId, HopNum), NoJoinPointError> {
    match hop {
        HopLocation::Hop(leg_and_hop) => Ok(leg_and_hop),
        HopLocation::JoinPoint => self
            .circuits
            .primary_join_point()
            .ok_or(NoJoinPointError),
    }
}
/// Resolve a [`TargetHop`] all the way to a leg ID and hop number.
///
/// Returns `None` if either resolution step fails.
pub(crate) fn target_hop_to_hopnum_id(&self, hop: TargetHop) -> Option<(UniqId, HopNum)> {
    let location = self.resolve_target_hop(hop).ok()?;
    self.resolve_hop_location(location).ok()
}
/// Set (or, with `None`, pass through the absence of) the padder for the
/// given hop.
///
/// Only precise hop locations are supported: the join point is rejected as
/// bad API usage. Fails with `Error::NoSuchHop` if the leg does not exist.
#[cfg(feature = "circ-padding-manual")]
fn set_padding_at_hop(
    &self,
    hop: HopLocation,
    padder: Option<super::circuit::padding::CircuitPadder>,
) -> Result<()> {
    match hop {
        HopLocation::Hop((leg_id, hop_num)) => {
            let circ = self.circuits.leg(leg_id).ok_or(Error::NoSuchHop)?;
            circ.set_padding_at_hop(hop_num, padder)?;
            Ok(())
        }
        HopLocation::JoinPoint => {
            Err(bad_api_usage!("Padding to the join point is not supported.").into())
        }
    }
}
/// Ask the circuit set whether the given hop of the given leg uses
/// stream-level SENDMEs; the answer is passed through unchanged.
fn uses_stream_sendme(&self, leg: UniqId, hop: HopNum) -> Option<bool> {
    let circuits = &self.circuits;
    circuits.uses_stream_sendme(leg, hop)
}
/// Add `circuits` to the tunnel and start the conflux handshake.
///
/// If a handshake is already in progress, or adding/linking the circuits
/// fails, the error is reported on `answer`. Otherwise a handshake context
/// is stored, and `answer` is used later, once the handshake results are in.
///
/// The number of expected results is the number of new circuits plus the
/// number of legs that were already present but unlinked.
#[cfg(feature = "conflux")]
#[instrument(level = "trace", skip_all)]
async fn handle_link_circuits(
    &mut self,
    circuits: Vec<Circuit>,
    answer: ConfluxLinkResultChannel,
) -> StdResult<(), ReactorError> {
    use tor_error::warn_report;

    if self.conflux_hs_ctx.is_some() {
        // Only one linking attempt may be in flight at a time.
        return send_conflux_outcome(
            answer,
            Err(internal!("conflux linking already in progress").into()),
        );
    }

    // Must be computed before the new legs are added to the set.
    let num_legs = self.circuits.num_unlinked() + circuits.len();

    let link_result = async {
        self.circuits.add_legs(circuits, &self.runtime)?;
        self.circuits.link_circuits(&self.runtime).await
    }
    .await;

    match link_result {
        Err(e) => {
            warn_report!(e, "Failed to link conflux circuits");
            send_conflux_outcome(answer, Err(e))?;
        }
        Ok(_) => {
            // Keep the channel around, to report the handshake results later.
            self.conflux_hs_ctx = Some(ConfluxHandshakeCtx {
                answer,
                num_legs,
                results: Vec::new(),
            });
        }
    }
    Ok(())
}
}
/// Report the outcome of a conflux linking attempt on `tx`.
///
/// If the receiver has gone away, a warning is logged and
/// `Err(ReactorError::Shutdown)` is returned.
#[cfg(feature = "conflux")]
fn send_conflux_outcome(
    tx: ConfluxLinkResultChannel,
    res: Result<ConfluxHandshakeResult>,
) -> StdResult<(), ReactorError> {
    match tx.send(res) {
        Ok(_) => Ok(()),
        Err(_) => {
            tracing::warn!("conflux initiator went away before handshake completed?");
            Err(ReactorError::Shutdown)
        }
    }
}
/// The error returned by `Reactor::resolve_target_hop` when the last hop was
/// requested, but the tunnel has no hops (or no circuits) to resolve it to.
#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
#[non_exhaustive]
#[error("no hops have been built for this tunnel")]
pub(crate) struct NoHopsBuiltError;
/// The error returned by `Reactor::resolve_hop_location` when the join point
/// was requested, but the tunnel does not have one.
#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
#[non_exhaustive]
#[error("the tunnel does not have a join point")]
pub(crate) struct NoJoinPointError;
impl CellHandlers {
fn set_meta_handler(&mut self, handler: Box<dyn MetaCellHandler + Send>) -> Result<()> {
if self.meta_handler.is_none() {
self.meta_handler = Some(handler);
Ok(())
} else {
Err(Error::from(internal!(
"Tried to install a meta-cell handler before the old one was gone."
)))
}
}
#[cfg(feature = "hs-service")]
fn set_incoming_stream_req_handler(
&mut self,
handler: IncomingStreamRequestHandler,
) -> Result<()> {
if self.incoming_stream_req_handler.is_none() {
self.incoming_stream_req_handler = Some(handler);
Ok(())
} else {
Err(Error::from(internal!(
"Tried to install a BEGIN cell handler before the old one was gone."
)))
}
}
}
// Test-only module for the reactor; it contains no items in this view.
#[cfg(test)]
mod test {
}