tor_proto/client/reactor.rs
1//! Code to handle incoming cells on a circuit.
2//!
3//! ## On message validation
4//!
5//! There are three steps for validating an incoming message on a stream:
6//!
7//! 1. Is the message contextually appropriate? (e.g., no more than one
8//! `CONNECTED` message per stream.) This is handled by calling
9//! [`CmdChecker::check_msg`](crate::stream::cmdcheck::CmdChecker::check_msg).
10//! 2. Does the message comply with flow-control rules? (e.g., no more SENDMEs
11//! than we've sent data for.) This is handled within the reactor by the
12//! `StreamFlowCtrl`. For half-closed streams which don't send stream
13//! SENDMEs, an additional receive-window check is performed in the
14//! `halfstream` module.
15//! 3. Does the message have an acceptable command type, and is the message
16//! well-formed? For open streams, the streams themselves handle this check.
17//! For half-closed streams, the reactor handles it by calling
18//! `consume_checked_msg()`.
19
20pub(crate) mod circuit;
21mod conflux;
22mod control;
23pub(super) mod syncview;
24
25use crate::circuit::circhop::SendRelayCell;
26use crate::circuit::{CircuitRxReceiver, UniqId};
27use crate::client::circuit::ClientCircChanMsg;
28use crate::client::circuit::padding::{PaddingController, PaddingEvent, PaddingEventStream};
29use crate::client::{HopLocation, TargetHop};
30use crate::crypto::cell::HopNum;
31use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
32use crate::memquota::CircuitAccount;
33use crate::stream::CloseStreamBehavior;
34use crate::streammap;
35use crate::tunnel::{TunnelId, TunnelScopedCircId};
36use crate::util::err::ReactorError;
37use crate::util::skew::ClockSkew;
38use crate::util::timeout::TimeoutEstimator;
39use crate::{Error, Result};
40use circuit::Circuit;
41use conflux::ConfluxSet;
42use control::ControlHandler;
43use std::cmp::Ordering;
44use std::collections::BinaryHeap;
45use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
46use tor_cell::relaycell::msg::Sendme;
47use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
48use tor_error::{Bug, bad_api_usage, debug_report, internal, into_bad_api_usage};
49use tor_rtcompat::{DynTimeProvider, SleepProvider};
50
51use cfg_if::cfg_if;
52use futures::StreamExt;
53use futures::channel::mpsc;
54use futures::{FutureExt as _, select_biased};
55use oneshot_fused_workaround as oneshot;
56
57use std::result::Result as StdResult;
58use std::sync::Arc;
59use std::time::Duration;
60
61use crate::channel::Channel;
62use crate::conflux::msghandler::RemoveLegReason;
63use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
64use circuit::CircuitCmd;
65use derive_more::From;
66use smallvec::smallvec;
67use tor_cell::chancell::CircId;
68use tor_llcrypto::pk;
69use tracing::{debug, info, instrument, trace, warn};
70
71use super::circuit::{MutableState, TunnelMutableState};
72use crate::circuit::reactor::ReactorResultChannel;
73
74#[cfg(feature = "hs-service")]
75use crate::stream::incoming::IncomingStreamRequestHandler;
76
77#[cfg(feature = "conflux")]
78use {
79 crate::conflux::msghandler::{ConfluxCmd, OooRelayMsg},
80 crate::util::err::ConfluxHandshakeError,
81};
82
83pub(super) use control::{CtrlCmd, CtrlMsg, FlowCtrlMsg};
84
/// Contains a list of conflux handshake results.
///
/// Each entry is either `Ok(())` or the [`ConfluxHandshakeError`]
/// describing why that handshake failed.
#[cfg(feature = "conflux")]
pub(super) type ConfluxHandshakeResult = Vec<StdResult<(), ConfluxHandshakeError>>;
88
/// The type of oneshot channel used to inform reactor users of the outcome
/// of a client-side conflux handshake.
///
/// Contains a list of handshake results, one for each circuit that we were asked
/// to link in the tunnel.
///
/// This is a [`ReactorResultChannel`] carrying a [`ConfluxHandshakeResult`].
#[cfg(feature = "conflux")]
pub(super) type ConfluxLinkResultChannel = ReactorResultChannel<ConfluxHandshakeResult>;
96
/// A handshake type, to be used when creating circuit hops.
///
/// Each variant carries the key material needed to run
/// that handshake with the target relay.
#[derive(Clone, Debug)]
pub(crate) enum CircuitHandshake {
    /// Use the CREATE_FAST handshake.
    ///
    /// This variant carries no key material.
    CreateFast,
    /// Use the ntor handshake.
    Ntor {
        /// The public key of the relay.
        public_key: NtorPublicKey,
        /// The Ed25519 identity of the relay, which is verified against the
        /// identity held in the circuit's channel.
        ed_identity: pk::ed25519::Ed25519Identity,
    },
    /// Use the ntor-v3 handshake.
    NtorV3 {
        /// The public key of the relay.
        public_key: NtorV3PublicKey,
    },
}
116
117// TODO: the RunOnceCmd/RunOnceCmdInner/CircuitCmd/CircuitEvent enum
118// proliferation is a bit bothersome, but unavoidable with the current design.
119//
120// We should consider getting rid of some of these enums (if possible),
121// and coming up with more intuitive names.
122
/// One or more [`RunOnceCmdInner`] to run inside [`Reactor::run_once`].
///
/// Because of the `From` derive, a [`RunOnceCmdInner`] or a `Vec<RunOnceCmdInner>`
/// can be converted into this type with `.into()`.
#[derive(From, Debug)]
#[allow(clippy::large_enum_variant)] // TODO #2003: resolve this
enum RunOnceCmd {
    /// Run a single `RunOnceCmdInner` command.
    Single(RunOnceCmdInner),
    /// Run multiple `RunOnceCmdInner` commands.
    //
    // Note: this whole enum *could* be replaced with Vec<RunOnceCmdInner>,
    // but most of the time we're only going to have *one* RunOnceCmdInner
    // to run per run_once() loop. The enum enables us to avoid the extra heap
    // allocation for the `RunOnceCmd::Single` case.
    Multiple(Vec<RunOnceCmdInner>),
}
137
/// Instructions for running something in the reactor loop.
///
/// Run at the end of [`Reactor::run_once`].
///
/// Values of this type are wrapped in a [`RunOnceCmd`]
/// (either singly or in a batch) before being executed.
//
// TODO: many of the variants of this enum have an identical CtrlMsg counterpart.
// We should consider making each variant a tuple variant and deduplicating the fields.
#[derive(educe::Educe)]
#[educe(Debug)]
enum RunOnceCmdInner {
    /// Send a RELAY cell.
    Send {
        /// The leg the cell should be sent on.
        leg: UniqId,
        /// The cell to send.
        cell: SendRelayCell,
        /// A channel for sending completion notifications.
        ///
        /// If this is `None`, nobody is notified of the outcome.
        done: Option<ReactorResultChannel<()>>,
    },
    /// Send a given control message on this circuit, and install a control-message handler to
    /// receive responses.
    #[cfg(feature = "send-control-msg")]
    SendMsgAndInstallHandler {
        /// The message to send, if any
        msg: Option<AnyRelayMsgOuter>,
        /// A message handler to install.
        ///
        /// If this is `None`, there must already be a message handler installed
        // (Excluded from the `Debug` output: it is a boxed trait object.)
        #[educe(Debug(ignore))]
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
        /// A sender that we use to tell the caller that the message was sent
        /// and the handler installed.
        done: oneshot::Sender<Result<()>>,
    },
    /// Handle a SENDME message.
    HandleSendMe {
        /// The leg the SENDME was received on.
        leg: UniqId,
        /// The hop number.
        hop: HopNum,
        /// The SENDME message to handle.
        sendme: Sendme,
    },
    /// Begin a stream with the provided hop in this circuit.
    ///
    /// Uses the provided stream ID, and sends the provided message to that hop.
    BeginStream {
        /// The cell to send, together with the ID of the stream it opens.
        ///
        /// This is a `Result`: the command may carry an error instead of a cell.
        cell: Result<(SendRelayCell, StreamId)>,
        /// The location of the hop on the tunnel. We don't use this (and `Circuit`s shouldn't need
        /// to worry about legs anyways), but need it so that we can pass it back in `done` to the
        /// caller.
        hop: HopLocation,
        /// The circuit leg to begin the stream on.
        leg: UniqId,
        /// Oneshot channel to notify on completion, with the allocated stream ID.
        done: ReactorResultChannel<(StreamId, HopLocation, RelayCellFormat)>,
    },
    /// Consider sending an XON message with the given `rate`.
    MaybeSendXon {
        /// The drain rate to advertise in the XON message.
        rate: XonKbpsEwma,
        /// The ID of the stream to send the message on.
        stream_id: StreamId,
        /// The location of the hop on the tunnel.
        hop: HopLocation,
    },
    /// Close the specified stream.
    CloseStream {
        /// The location of the hop on the tunnel.
        hop: HopLocation,
        /// The ID of the stream to close.
        sid: StreamId,
        /// The stream-closing behavior.
        behav: CloseStreamBehavior,
        /// The reason for closing the stream.
        reason: streammap::TerminateReason,
        /// A channel for sending completion notifications.
        ///
        /// If this is `None`, nobody is notified of the outcome.
        done: Option<ReactorResultChannel<()>>,
    },
    /// Get the clock skew claimed by the first hop of the circuit.
    FirstHopClockSkew {
        /// Oneshot channel to return the clock skew.
        answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
    },
    /// Remove a circuit leg from the conflux set.
    RemoveLeg {
        /// The circuit leg to remove.
        leg: UniqId,
        /// The reason for removal.
        ///
        /// This is only used for conflux circuits that get removed
        /// before the conflux handshake is complete.
        ///
        /// The [`RemoveLegReason`] is mapped by the reactor to a
        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
        /// handshake to indicate the reason the handshake failed.
        reason: RemoveLegReason,
    },
    /// A circuit has completed the conflux handshake,
    /// and wants to send the specified cell.
    ///
    /// This is similar to [`RunOnceCmdInner::Send`],
    /// but needs to remain a separate variant,
    /// because in addition to instructing the reactor to send a cell,
    /// it also notifies it that the conflux handshake is complete on the specified `leg`.
    /// This enables the reactor to save the handshake result (`Ok(())`),
    /// and, if there are no other legs still in the handshake phase,
    /// send the result to the handshake initiator.
    #[cfg(feature = "conflux")]
    ConfluxHandshakeComplete {
        /// The circuit leg that has completed the handshake.
        /// This is the leg the cell should be sent on.
        leg: UniqId,
        /// The cell to send.
        cell: SendRelayCell,
    },
    /// Send a LINK cell on each of the unlinked circuit legs in the conflux set of this reactor.
    #[cfg(feature = "conflux")]
    Link {
        /// The circuits to link into the tunnel
        // (Excluded from the `Debug` output.)
        #[educe(Debug(ignore))]
        circuits: Vec<Circuit>,
        /// Oneshot channel for notifying of conflux handshake completion.
        answer: ConfluxLinkResultChannel,
    },
    /// Enqueue an out-of-order cell in [`Reactor::ooo_msgs`].
    #[cfg(feature = "conflux")]
    Enqueue {
        /// The leg the entry originated from.
        leg: UniqId,
        /// The out-of-order message.
        msg: OooRelayMsg,
    },
    /// Take a padding-related event on a circuit leg.
    #[cfg(feature = "circ-padding")]
    PaddingAction {
        /// The leg to take the event on.
        leg: UniqId,
        /// The event to take.
        padding_event: PaddingEvent,
    },
    /// Perform a clean shutdown on this circuit.
    CleanShutdown,
}
282
283impl RunOnceCmdInner {
284 /// Create a [`RunOnceCmdInner`] out of a [`CircuitCmd`] and [`UniqId`].
285 fn from_circuit_cmd(leg: UniqId, cmd: CircuitCmd) -> Self {
286 match cmd {
287 CircuitCmd::Send(cell) => Self::Send {
288 leg,
289 cell,
290 done: None,
291 },
292 CircuitCmd::HandleSendMe { hop, sendme } => Self::HandleSendMe { leg, hop, sendme },
293 CircuitCmd::CloseStream {
294 hop,
295 sid,
296 behav,
297 reason,
298 } => Self::CloseStream {
299 hop: HopLocation::Hop((leg, hop)),
300 sid,
301 behav,
302 reason,
303 done: None,
304 },
305 #[cfg(feature = "conflux")]
306 CircuitCmd::Conflux(ConfluxCmd::RemoveLeg(reason)) => Self::RemoveLeg { leg, reason },
307 #[cfg(feature = "conflux")]
308 CircuitCmd::Conflux(ConfluxCmd::HandshakeComplete { hop, early, cell }) => {
309 let cell = SendRelayCell {
310 hop: Some(hop),
311 early,
312 cell,
313 };
314 Self::ConfluxHandshakeComplete { leg, cell }
315 }
316 #[cfg(feature = "conflux")]
317 CircuitCmd::Enqueue(msg) => Self::Enqueue { leg, msg },
318 CircuitCmd::CleanShutdown => Self::CleanShutdown,
319 }
320 }
321}
322
/// An event to be handled by [`Reactor::run_once`].
///
/// `run_once` obtains a batch of these, sorts the batch using
/// [`CircuitEvent::order_within_batch`], and then handles each event in turn.
#[derive(From, Debug)]
#[allow(clippy::large_enum_variant)] // TODO #2003: should we resolve this?
enum CircuitEvent {
    /// Run a single `CircuitCmd` command.
    RunCmd {
        /// The unique identifier of the circuit leg to run the command on
        leg: UniqId,
        /// The command to run.
        cmd: CircuitCmd,
    },
    /// Handle a control message
    HandleControl(CtrlMsg),
    /// Handle an input message.
    HandleCell {
        /// The unique identifier of the circuit leg the message was received on.
        leg: UniqId,
        /// The message to handle.
        cell: ClientCircChanMsg,
    },
    /// Remove the specified circuit leg from the conflux set.
    ///
    /// Returned whenever a single circuit leg needs to be removed
    /// from the reactor's conflux set, without necessarily tearing down
    /// the whole set or shutting down the reactor.
    ///
    /// Note: this event *can* cause the reactor to shut down
    /// (and the conflux set to be closed).
    ///
    /// See the [`ConfluxSet::remove`] docs for more on the exact behavior of this command.
    RemoveLeg {
        /// The leg to remove.
        leg: UniqId,
        /// The reason for removal.
        ///
        /// This is only used for conflux circuits that get removed
        /// before the conflux handshake is complete.
        ///
        /// The [`RemoveLegReason`] is mapped by the reactor to a
        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
        /// handshake to indicate the reason the handshake failed.
        reason: RemoveLegReason,
    },
    /// Take some event (blocking or unblocking a circuit, or sending padding)
    /// based on the circuit padding backend code.
    PaddingAction {
        /// The leg on which to take the padding event.
        leg: UniqId,
        /// The event to take.
        padding_event: PaddingEvent,
    },
    /// Protocol violation. This leads for now to the close of the circuit reactor. The
    /// error causing the violation is set in err.
    ProtoViolation {
        /// The error that causes this protocol violation.
        err: crate::Error,
    },
}
381
382impl CircuitEvent {
383 /// Return the ordering with which we should handle this event
384 /// within a list of events returned by a single call to next_circ_event().
385 ///
386 /// NOTE: Please do not make this any more complicated:
387 /// It is a consequence of a kludge that we need this sorting at all.
388 /// Assuming that eventually, we switch away from the current
389 /// poll-oriented `next_circ_event` design,
390 /// we may be able to get rid of this entirely.
391 fn order_within_batch(&self) -> u8 {
392 use CircuitEvent as CA;
393 use PaddingEvent as PE;
394 // This immediate state MUST NOT be used for events emitting cells. At the moment, it is
395 // only used by the protocol violation event which leads to a shutdown of the reactor.
396 const IMMEDIATE: u8 = 0;
397 const EARLY: u8 = 1;
398 const NORMAL: u8 = 2;
399 const LATE: u8 = 3;
400
401 // We use this ordering to move any "StartBlocking" to the _end_ of a batch and
402 // "StopBlocking" to the start.
403 //
404 // This way, we can be sure that we will handle any "send data" operations
405 // (and tell the Padder about them) _before_ we tell the Padder
406 // that we have blocked the circuit.
407 //
408 // This keeps things a bit more logical.
409 match self {
410 CA::RunCmd { .. } => NORMAL,
411 CA::HandleControl(..) => NORMAL,
412 CA::HandleCell { .. } => NORMAL,
413 CA::RemoveLeg { .. } => NORMAL,
414 #[cfg(feature = "circ-padding")]
415 CA::PaddingAction { padding_event, .. } => match padding_event {
416 PE::StopBlocking => EARLY,
417 PE::SendPadding(..) => NORMAL,
418 PE::StartBlocking(..) => LATE,
419 },
420 #[cfg(not(feature = "circ-padding"))]
421 CA::PaddingAction { .. } => NORMAL,
422 CA::ProtoViolation { .. } => IMMEDIATE,
423 }
424 }
425}
426
/// An object that's waiting for a meta cell (one not associated with a stream) in order to make
/// progress.
///
/// # Background
///
/// The `Reactor` can't have async functions that send and receive cells, because its job is to
/// send and receive cells: if one of its functions tried to do that, it would just hang forever.
///
/// To get around this problem, the reactor can send some cells, and then make one of these
/// `MetaCellHandler` objects, which will be run when the reply arrives.
pub(crate) trait MetaCellHandler: Send {
    /// The hop we're expecting the message to come from. This is compared against the hop
    /// from which we actually receive messages, and an error is thrown if the two don't match.
    fn expected_hop(&self) -> HopLocation;
    /// Called when the message we were waiting for arrives.
    ///
    /// Receives a mutable reference to a [`Circuit`] (in the argument named `reactor`)
    /// in order to do anything it likes there.
    ///
    /// On success, the returned [`MetaCellDisposition`] says what should happen next.
    ///
    /// If this function returns an error, the reactor will shut down.
    fn handle_msg(
        &mut self,
        msg: UnparsedRelayMsg,
        reactor: &mut Circuit,
    ) -> Result<MetaCellDisposition>;
}
452
/// A possible successful outcome of giving a message to a [`MsgHandler`](super::msghandler::MsgHandler).
///
/// This is also the success type returned by [`MetaCellHandler::handle_msg`].
///
/// The type is `pub` when the `send-control-msg` feature is enabled,
/// and `pub(crate)` otherwise.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "send-control-msg", visibility::make(pub))]
#[non_exhaustive]
pub(crate) enum MetaCellDisposition {
    /// The message was consumed; the handler should remain installed.
    #[cfg(feature = "send-control-msg")]
    Consumed,
    /// The message was consumed; the handler should be uninstalled.
    ConversationFinished,
    /// The message was consumed; the circuit should be closed.
    #[cfg(feature = "send-control-msg")]
    CloseCirc,
    // TODO: Eventually we might want the ability to have multiple handlers
    // installed, and to let them say "not for me, maybe for somebody else?".
    // But right now we don't need that.
}
470
/// Unwrap the specified [`Option`], producing a [`ReactorError::Shutdown`] if it is `None`.
///
/// The macro does not return from the calling function by itself:
/// it evaluates to `Ok(v)` for `Some(v)`, and to `Err(ReactorError::Shutdown)` for `None`,
/// so callers are expected to apply `?` to the result.
///
/// In the `None` case, a trace-level "reactor shutdown" event is logged first,
/// with the `tunnel_id` of `$self` and the given `$reason`.
///
/// This is a macro instead of a function to work around borrowck errors
/// in the select! from run_once().
macro_rules! unwrap_or_shutdown {
    ($self:expr, $res:expr, $reason:expr) => {{
        match $res {
            None => {
                trace!(
                    tunnel_id = %$self.tunnel_id,
                    reason = %$reason,
                    "reactor shutdown"
                );
                Err(ReactorError::Shutdown)
            }
            Some(v) => Ok(v),
        }
    }};
}
490
/// Object to handle incoming cells and background tasks on a circuit
///
/// This type is returned when you finish a circuit; you need to spawn a
/// new task that calls `run()` on it.
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
pub struct Reactor {
    /// Receiver for control messages for this reactor, sent by `ClientCirc` objects.
    ///
    /// This channel is polled in [`Reactor::run_once`], but only if the `chan_sender` sink
    /// is ready to accept cells.
    //
    // NOTE(review): a comment inside `run_once` says that control messages may be
    // read before the chan_senders are ready. One of the two statements looks stale;
    // confirm which one describes the current behavior.
    control: mpsc::UnboundedReceiver<CtrlMsg>,
    /// Receiver for command messages for this reactor, sent by `ClientCirc` objects.
    ///
    /// This channel is polled in [`Reactor::run_once`].
    ///
    /// NOTE: this is a separate channel from `control`, because some messages
    /// have higher priority and need to be handled even if the `chan_sender` is not
    /// ready (whereas `control` messages are not read until the `chan_sender` sink
    /// is ready to accept cells).
    command: mpsc::UnboundedReceiver<CtrlCmd>,
    /// A oneshot sender that is used to alert other tasks when this reactor is
    /// finally dropped.
    ///
    /// It is a sender for Void because we never actually want to send anything here;
    /// we only want to generate canceled events.
    #[allow(dead_code)] // the only purpose of this field is to be dropped.
    reactor_closed_tx: oneshot::Sender<void::Void>,
    /// A set of circuits that form a tunnel.
    ///
    /// Contains 1 or more circuits.
    ///
    /// Circuits may be added to this set throughout the lifetime of the reactor.
    ///
    /// Sometimes, the reactor will remove circuits from this set,
    /// for example if the `LINKED` message takes too long to arrive,
    /// or if congestion control negotiation fails.
    /// The reactor will continue running with the remaining circuits.
    /// It will shut down if *all* the circuits are removed.
    ///
    // TODO(conflux): document all the reasons why the reactor might
    // choose to tear down a circuit or tunnel (timeouts, protocol violations, etc.)
    circuits: ConfluxSet,
    /// An identifier for logging about this tunnel reactor.
    tunnel_id: TunnelId,
    /// Handlers, shared with `Circuit`.
    cell_handlers: CellHandlers,
    /// The time provider, used for conflux handshake timeouts.
    runtime: DynTimeProvider,
    /// The conflux handshake context, if there is an on-going handshake.
    ///
    /// Set to `None` if this is a single-path tunnel,
    /// or if none of the circuit legs from our conflux set
    /// are currently in the conflux handshake phase.
    #[cfg(feature = "conflux")]
    conflux_hs_ctx: Option<ConfluxHandshakeCtx>,
    /// A min-heap buffering all the out-of-order messages received so far.
    ///
    /// TODO(conflux): this becomes a DoS vector unless we impose a limit
    /// on its size. We should make this participate in the memquota memory
    /// tracking system, somehow.
    #[cfg(feature = "conflux")]
    ooo_msgs: BinaryHeap<ConfluxHeapEntry>,
}
554
/// The context for an on-going conflux handshake.
///
/// Stored in [`Reactor::conflux_hs_ctx`] while a handshake is in progress.
#[cfg(feature = "conflux")]
struct ConfluxHandshakeCtx {
    /// A channel for notifying the caller of the outcome of a CONFLUX_LINK request.
    answer: ConfluxLinkResultChannel,
    /// The number of legs that are currently doing the handshake.
    num_legs: usize,
    /// The handshake results we have collected so far.
    results: ConfluxHandshakeResult,
}
565
/// An out-of-order message buffered in [`Reactor::ooo_msgs`].
///
/// Entries are ordered and compared by their `msg` only;
/// the `leg_id` does not take part in comparisons.
#[derive(Debug)]
#[cfg(feature = "conflux")]
struct ConfluxHeapEntry {
    /// The leg id this message came from.
    leg_id: UniqId,
    /// The out of order message
    msg: OooRelayMsg,
}
575
#[cfg(feature = "conflux")]
impl Ord for ConfluxHeapEntry {
    /// Order heap entries by their buffered message alone.
    ///
    /// The originating leg plays no part in the ordering.
    fn cmp(&self, other: &Self) -> Ordering {
        Ord::cmp(&self.msg, &other.msg)
    }
}
582
#[cfg(feature = "conflux")]
impl PartialOrd for ConfluxHeapEntry {
    /// Defer to the total order defined by the [`Ord`] implementation.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let ordering = Ord::cmp(self, other);
        Some(ordering)
    }
}
589
#[cfg(feature = "conflux")]
impl PartialEq for ConfluxHeapEntry {
    /// Two entries are equal when their buffered messages are equal.
    ///
    /// As with the ordering, the originating leg is not compared.
    fn eq(&self, other: &Self) -> bool {
        PartialEq::eq(&self.msg, &other.msg)
    }
}
596
// Marker impl: `Eq` is a supertrait of `Ord`, which `BinaryHeap` needs for its entries.
#[cfg(feature = "conflux")]
impl Eq for ConfluxHeapEntry {}
599
/// Cell handlers, shared between the Reactor and its underlying `Circuit`s.
///
/// The reactor owns one of these and lends it mutably to a `Circuit`
/// whenever that circuit handles an incoming message.
struct CellHandlers {
    /// A handler for a meta cell, together with a result channel to notify on completion.
    ///
    /// NOTE(prop349): this is part of Arti's "Base Circuit Hop Handler".
    ///
    /// Upon sending an EXTEND cell, the [`ControlHandler`] sets this handler
    /// to [`CircuitExtender`](circuit::extender::CircuitExtender).
    /// The handler is then used in [`Circuit::handle_meta_cell`] for handling
    /// all the meta cells received on the circuit that are not SENDMEs or TRUNCATE
    /// (which are handled separately) or conflux cells
    /// (which are handled by the conflux handlers).
    ///
    /// The handler is uninstalled after the receipt of the EXTENDED cell,
    /// so any subsequent EXTENDED cells will cause the circuit to be torn down.
    meta_handler: Option<Box<dyn MetaCellHandler + Send>>,
    /// A handler for incoming stream requests.
    #[cfg(feature = "hs-service")]
    incoming_stream_req_handler: Option<IncomingStreamRequestHandler>,
}
620
621impl Reactor {
622 /// Create a new circuit reactor.
623 ///
624 /// The reactor will send outbound messages on `channel`, receive incoming
625 /// messages on `input`, and identify this circuit by the channel-local
626 /// [`CircId`] provided.
627 ///
628 /// The internal unique identifier for this circuit will be `unique_id`.
629 #[allow(clippy::type_complexity, clippy::too_many_arguments)] // TODO
630 pub(super) fn new(
631 channel: Arc<Channel>,
632 channel_id: CircId,
633 unique_id: UniqId,
634 input: CircuitRxReceiver,
635 runtime: DynTimeProvider,
636 memquota: CircuitAccount,
637 padding_ctrl: PaddingController,
638 padding_stream: PaddingEventStream,
639 timeouts: Arc<dyn TimeoutEstimator + Send>,
640 ) -> (
641 Self,
642 mpsc::UnboundedSender<CtrlMsg>,
643 mpsc::UnboundedSender<CtrlCmd>,
644 oneshot::Receiver<void::Void>,
645 Arc<TunnelMutableState>,
646 ) {
647 let tunnel_id = TunnelId::next();
648 let (control_tx, control_rx) = mpsc::unbounded();
649 let (command_tx, command_rx) = mpsc::unbounded();
650 let mutable = Arc::new(MutableState::default());
651
652 let (reactor_closed_tx, reactor_closed_rx) = oneshot::channel();
653
654 let cell_handlers = CellHandlers {
655 meta_handler: None,
656 #[cfg(feature = "hs-service")]
657 incoming_stream_req_handler: None,
658 };
659
660 let unique_id = TunnelScopedCircId::new(tunnel_id, unique_id);
661 let circuit_leg = Circuit::new(
662 runtime.clone(),
663 channel,
664 channel_id,
665 unique_id,
666 input,
667 memquota,
668 Arc::clone(&mutable),
669 padding_ctrl,
670 padding_stream,
671 timeouts,
672 );
673
674 let (circuits, mutable) = ConfluxSet::new(tunnel_id, circuit_leg);
675
676 let reactor = Reactor {
677 circuits,
678 control: control_rx,
679 command: command_rx,
680 reactor_closed_tx,
681 tunnel_id,
682 cell_handlers,
683 runtime,
684 #[cfg(feature = "conflux")]
685 conflux_hs_ctx: None,
686 #[cfg(feature = "conflux")]
687 ooo_msgs: Default::default(),
688 };
689
690 (reactor, control_tx, command_tx, reactor_closed_rx, mutable)
691 }
692
693 /// Launch the reactor, and run until the circuit closes or we
694 /// encounter an error.
695 ///
696 /// Once this method returns, the circuit is dead and cannot be
697 /// used again.
698 #[instrument(level = "trace", skip_all)]
699 pub async fn run(mut self) -> Result<()> {
700 trace!(tunnel_id = %self.tunnel_id, "Running tunnel reactor");
701 let result: Result<()> = loop {
702 match self.run_once().await {
703 Ok(()) => (),
704 Err(ReactorError::Shutdown) => break Ok(()),
705 Err(ReactorError::Err(e)) => break Err(e),
706 }
707 };
708
709 // Log that the reactor stopped, possibly with the associated error as a report.
710 // May log at a higher level depending on the error kind.
711 const MSG: &str = "Tunnel reactor stopped";
712 match &result {
713 Ok(()) => trace!(tunnel_id = %self.tunnel_id, "{MSG}"),
714 Err(e) => debug_report!(e, tunnel_id = %self.tunnel_id, "{MSG}"),
715 }
716
717 result
718 }
719
720 /// Helper for run: doesn't mark the circuit closed on finish. Only
721 /// processes one cell or control message.
722 #[instrument(level = "trace", skip_all)]
723 async fn run_once(&mut self) -> StdResult<(), ReactorError> {
724 // If all the circuits are closed, shut down the reactor
725 if self.circuits.is_empty() {
726 trace!(
727 tunnel_id = %self.tunnel_id,
728 "Tunnel reactor shutting down: all circuits have closed",
729 );
730
731 return Err(ReactorError::Shutdown);
732 }
733
734 // If this is a single path circuit, we need to wait until the first hop
735 // is created before doing anything else
736 let single_path_with_hops = self
737 .circuits
738 .single_leg_mut()
739 .is_ok_and(|leg| !leg.has_hops());
740 if single_path_with_hops {
741 self.wait_for_create().await?;
742
743 return Ok(());
744 }
745
746 // Prioritize the buffered messages.
747 //
748 // Note: if any of the messages are ready to be handled,
749 // this will block the reactor until we are done processing them
750 //
751 // TODO circpad: If this is a problem, we might want to re-order things so that we
752 // prioritize padding instead. On the other hand, this should be fixed by refactoring
753 // circuit and tunnel reactors, so we might do well to just leave it alone for now.
754 #[cfg(feature = "conflux")]
755 self.try_dequeue_ooo_msgs().await?;
756
757 let mut events = select_biased! {
758 res = self.command.next() => {
759 let cmd = unwrap_or_shutdown!(self, res, "command channel drop")?;
760 return ControlHandler::new(self).handle_cmd(cmd);
761 },
762 // Check whether we've got a control message pending.
763 //
764 // Note: unfortunately, reading from control here means we might start
765 // handling control messages before our chan_senders are ready.
766 // With the current design, this is inevitable: we can't know which circuit leg
767 // a control message is meant for without first reading the control message from
768 // the channel, and at that point, we can't know for sure whether that particular
769 // circuit is ready for sending.
770 ret = self.control.next() => {
771 let msg = unwrap_or_shutdown!(self, ret, "control drop")?;
772 smallvec![CircuitEvent::HandleControl(msg)]
773 },
774 res = self.circuits.next_circ_event(&self.runtime).fuse() => res?,
775 };
776
777 // Put the events into the order that we need to execute them in.
778 //
779 // (Yes, this _does_ have to be a stable sort. Not all events may be freely re-ordered
780 // with respect to one another.)
781 events.sort_by_key(|a| a.order_within_batch());
782
783 for event in events {
784 let cmd = match event {
785 CircuitEvent::RunCmd { leg, cmd } => Some(RunOnceCmd::Single(
786 RunOnceCmdInner::from_circuit_cmd(leg, cmd),
787 )),
788 CircuitEvent::HandleControl(ctrl) => ControlHandler::new(self)
789 .handle_msg(ctrl)?
790 .map(RunOnceCmd::Single),
791 CircuitEvent::HandleCell { leg, cell } => {
792 let circ = self
793 .circuits
794 .leg_mut(leg)
795 .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
796
797 let circ_cmds = circ.handle_cell(&mut self.cell_handlers, leg, cell)?;
798 if circ_cmds.is_empty() {
799 None
800 } else {
801 // TODO: we return RunOnceCmd::Multiple even if there's a single command.
802 //
803 // See the TODO on `Circuit::handle_cell`.
804 let cmd = RunOnceCmd::Multiple(
805 circ_cmds
806 .into_iter()
807 .map(|cmd| RunOnceCmdInner::from_circuit_cmd(leg, cmd))
808 .collect(),
809 );
810
811 Some(cmd)
812 }
813 }
814 CircuitEvent::RemoveLeg { leg, reason } => {
815 Some(RunOnceCmdInner::RemoveLeg { leg, reason }.into())
816 }
817 CircuitEvent::PaddingAction { leg, padding_event } => {
818 cfg_if! {
819 if #[cfg(feature = "circ-padding")] {
820 Some(RunOnceCmdInner::PaddingAction { leg, padding_event }.into())
821 } else {
822 // If padding isn't enabled, we never generate a padding event,
823 // so we can be sure this case will never be called.
824 void::unreachable(padding_event.0);
825 }
826 }
827 }
828 CircuitEvent::ProtoViolation { err } => {
829 return Err(err.into());
830 }
831 };
832
833 if let Some(cmd) = cmd {
834 self.handle_run_once_cmd(cmd).await?;
835 }
836 }
837
838 Ok(())
839 }
840
841 /// Try to process the previously-out-of-order messages we might have buffered.
842 #[cfg(feature = "conflux")]
843 #[instrument(level = "trace", skip_all)]
844 async fn try_dequeue_ooo_msgs(&mut self) -> StdResult<(), ReactorError> {
845 // Check if we're ready to dequeue any of the previously out-of-order cells.
846 while let Some(entry) = self.ooo_msgs.peek() {
847 let should_pop = self.circuits.is_seqno_in_order(entry.msg.seqno);
848
849 if !should_pop {
850 break;
851 }
852
853 let entry = self.ooo_msgs.pop().expect("item just disappeared?!");
854
855 let circ = self
856 .circuits
857 .leg_mut(entry.leg_id)
858 .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
859 let handlers = &mut self.cell_handlers;
860 let cmd = circ
861 .handle_in_order_relay_msg(
862 handlers,
863 entry.msg.hopnum,
864 entry.leg_id,
865 entry.msg.cell_counts_towards_windows,
866 entry.msg.streamid,
867 entry.msg.msg,
868 )?
869 .map(|cmd| {
870 RunOnceCmd::Single(RunOnceCmdInner::from_circuit_cmd(entry.leg_id, cmd))
871 });
872
873 if let Some(cmd) = cmd {
874 self.handle_run_once_cmd(cmd).await?;
875 }
876 }
877
878 Ok(())
879 }
880
881 /// Handle a [`RunOnceCmd`].
882 #[instrument(level = "trace", skip_all)]
883 async fn handle_run_once_cmd(&mut self, cmd: RunOnceCmd) -> StdResult<(), ReactorError> {
884 match cmd {
885 RunOnceCmd::Single(cmd) => return self.handle_single_run_once_cmd(cmd).await,
886 RunOnceCmd::Multiple(cmds) => {
887 // While we know `sendable` is ready to accept *one* cell,
888 // we can't be certain it will be able to accept *all* of the cells
889 // that need to be sent here. This means we *may* end up buffering
890 // in its underlying SometimesUnboundedSink! That is OK, because
891 // RunOnceCmd::Multiple is only used for handling packed cells.
892 for cmd in cmds {
893 self.handle_single_run_once_cmd(cmd).await?;
894 }
895 }
896 }
897
898 Ok(())
899 }
900
901 /// Handle a [`RunOnceCmd`].
902 #[instrument(level = "trace", skip_all)]
903 async fn handle_single_run_once_cmd(
904 &mut self,
905 cmd: RunOnceCmdInner,
906 ) -> StdResult<(), ReactorError> {
907 match cmd {
908 RunOnceCmdInner::Send { leg, cell, done } => {
909 // TODO: check the cc window
910 let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
911 if let Some(done) = done {
912 // Don't care if the receiver goes away
913 let _ = done.send(res.clone());
914 }
915 res?;
916 }
917 #[cfg(feature = "send-control-msg")]
918 RunOnceCmdInner::SendMsgAndInstallHandler { msg, handler, done } => {
919 let cell: Result<Option<SendRelayCell>> =
920 self.prepare_msg_and_install_handler(msg, handler);
921
922 match cell {
923 Ok(Some(cell)) => {
924 // TODO(conflux): let the RunOnceCmdInner specify which leg to send the cell on
925 let outcome = self.circuits.send_relay_cell_on_leg(cell, None).await;
926 // don't care if receiver goes away.
927 let _ = done.send(outcome.clone());
928 outcome?;
929 }
930 Ok(None) => {
931 // don't care if receiver goes away.
932 let _ = done.send(Ok(()));
933 }
934 Err(e) => {
935 // don't care if receiver goes away.
936 let _ = done.send(Err(e.clone()));
937 return Err(e.into());
938 }
939 }
940 }
941 RunOnceCmdInner::BeginStream {
942 leg,
943 cell,
944 hop,
945 done,
946 } => {
947 match cell {
948 Ok((cell, stream_id)) => {
949 let circ = self
950 .circuits
951 .leg_mut(leg)
952 .ok_or_else(|| internal!("leg disappeared?!"))?;
953 let cell_hop = cell.hop.expect("missing hop in client SendRelayCell?!");
954 let relay_format = circ
955 .hop_mut(cell_hop)
956 // TODO: Is this the right error type here? Or should there be a "HopDisappeared"?
957 .ok_or(Error::NoSuchHop)?
958 .relay_cell_format();
959
960 let outcome = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
961 // don't care if receiver goes away.
962 let _ = done.send(outcome.clone().map(|_| (stream_id, hop, relay_format)));
963 outcome?;
964 }
965 Err(e) => {
966 // don't care if receiver goes away.
967 let _ = done.send(Err(e.clone()));
968 return Err(e.into());
969 }
970 }
971 }
972 RunOnceCmdInner::CloseStream {
973 hop,
974 sid,
975 behav,
976 reason,
977 done,
978 } => {
979 let result = {
980 let (leg_id, hop_num) = self
981 .resolve_hop_location(hop)
982 .map_err(into_bad_api_usage!("Could not resolve {hop:?}"))?;
983 let leg = self
984 .circuits
985 .leg_mut(leg_id)
986 .ok_or(bad_api_usage!("No leg for id {:?}", leg_id))?;
987 Ok::<_, Bug>((leg, hop_num))
988 };
989
990 let (leg, hop_num) = match result {
991 Ok(x) => x,
992 Err(e) => {
993 if let Some(done) = done {
994 // don't care if the sender goes away
995 let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
996 let _ = done.send(Err(e.into()));
997 }
998 return Ok(());
999 }
1000 };
1001
1002 let max_rtt = {
1003 let hop = leg
1004 .hop(hop_num)
1005 .ok_or_else(|| internal!("the hop we resolved disappeared?!"))?;
1006 let ccontrol = hop.ccontrol();
1007
1008 // Note: if we have no measurements for the RTT, this will be set to 0,
1009 // and the timeout will be 2 * CBT.
1010 ccontrol
1011 .rtt()
1012 .max_rtt_usec()
1013 .map(|rtt| Duration::from_millis(u64::from(rtt)))
1014 .unwrap_or_default()
1015 };
1016
1017 // The length of the circuit up until the hop that has the half-streeam.
1018 //
1019 // +1, because HopNums are zero-based.
1020 let circ_len = usize::from(hop_num) + 1;
1021
1022 // We double the CBT to account for rend circuits,
1023 // which are twice as long (otherwise we risk expiring
1024 // the rend half-streams too soon).
1025 let timeout = std::cmp::max(max_rtt, 2 * leg.estimate_cbt(circ_len));
1026 let expire_at = self.runtime.now() + timeout;
1027
1028 let res: Result<()> = leg
1029 .close_stream(hop_num, sid, behav, reason, expire_at)
1030 .await;
1031
1032 if let Some(done) = done {
1033 // don't care if the sender goes away
1034 let _ = done.send(res);
1035 }
1036 }
1037 RunOnceCmdInner::MaybeSendXon {
1038 rate,
1039 stream_id,
1040 hop,
1041 } => {
1042 let (leg_id, hop_num) = match self.resolve_hop_location(hop) {
1043 Ok(x) => x,
1044 Err(NoJoinPointError) => {
1045 // A stream tried to send an XON message message to the join point of
1046 // a tunnel that has never had a join point. Currently in arti, only a
1047 // `StreamTarget` asks us to send an XON message, and this tunnel
1048 // originally created the `StreamTarget` to begin with. So this is a
1049 // legitimate bug somewhere in the tunnel code.
1050 return Err(
1051 internal!(
1052 "Could not send an XON message to a join point on a tunnel without a join point",
1053 )
1054 .into()
1055 );
1056 }
1057 };
1058
1059 let Some(leg) = self.circuits.leg_mut(leg_id) else {
1060 // The leg has disappeared. This is fine since the stream may have ended and
1061 // been cleaned up while this `CtrlMsg::MaybeSendXon` message was queued.
1062 // It is possible that is a bug and this is an incorrect leg number, but
1063 // it's not currently possible to differentiate between an incorrect leg
1064 // number and a tunnel leg that has been closed.
1065 debug!("Could not send an XON message on a leg that does not exist. Ignoring.");
1066 return Ok(());
1067 };
1068
1069 let Some(hop) = leg.hop_mut(hop_num) else {
1070 // The hop has disappeared. This is fine since the circuit may have been
1071 // been truncated while the `CtrlMsg::MaybeSendXon` message was queued.
1072 // It is possible that is a bug and this is an incorrect hop number, but
1073 // it's not currently possible to differentiate between an incorrect hop
1074 // number and a circuit hop that has been removed.
1075 debug!("Could not send an XON message on a hop that does not exist. Ignoring.");
1076 return Ok(());
1077 };
1078
1079 let Some(msg) = hop.maybe_send_xon(rate, stream_id)? else {
1080 // Nothing to do.
1081 return Ok(());
1082 };
1083
1084 let cell = AnyRelayMsgOuter::new(Some(stream_id), msg.into());
1085 let cell = SendRelayCell {
1086 hop: Some(hop_num),
1087 early: false,
1088 cell,
1089 };
1090
1091 leg.send_relay_cell(cell).await?;
1092 }
1093 RunOnceCmdInner::HandleSendMe { leg, hop, sendme } => {
1094 let leg = self
1095 .circuits
1096 .leg_mut(leg)
1097 .ok_or_else(|| internal!("leg disappeared?!"))?;
1098 // NOTE: it's okay to await. We are only awaiting on the congestion_signals
1099 // future which *should* resolve immediately
1100 let signals = leg.chan_sender.congestion_signals().await;
1101 leg.handle_sendme(hop, sendme, signals)?;
1102 }
1103 RunOnceCmdInner::FirstHopClockSkew { answer } => {
1104 let res = self.circuits.single_leg_mut().map(|leg| leg.clock_skew());
1105
1106 // don't care if the sender goes away
1107 let _ = answer.send(res.map_err(Into::into));
1108 }
1109 RunOnceCmdInner::CleanShutdown => {
1110 trace!(tunnel_id = %self.tunnel_id, "reactor shutdown due to handled cell");
1111 return Err(ReactorError::Shutdown);
1112 }
1113 RunOnceCmdInner::RemoveLeg { leg, reason } => {
1114 warn!(tunnel_id = %self.tunnel_id, reason = %reason, "removing circuit leg");
1115
1116 let circ = self.circuits.remove(leg)?;
1117 let is_conflux_pending = circ.is_conflux_pending();
1118
1119 // Drop the removed leg. This will cause it to close if it's not already closed.
1120 drop(circ);
1121
1122 // If we reach this point, it means we have more than one leg
1123 // (otherwise the .remove() would've returned a Shutdown error),
1124 // so we expect there to be a ConfluxHandshakeContext installed.
1125
1126 #[cfg(feature = "conflux")]
1127 if is_conflux_pending {
1128 let (error, proto_violation): (_, Option<Error>) = match &reason {
1129 RemoveLegReason::ConfluxHandshakeTimeout => {
1130 (ConfluxHandshakeError::Timeout, None)
1131 }
1132 RemoveLegReason::ConfluxHandshakeErr(e) => {
1133 (ConfluxHandshakeError::Link(e.clone()), Some(e.clone()))
1134 }
1135 RemoveLegReason::ChannelClosed => {
1136 (ConfluxHandshakeError::ChannelClosed, None)
1137 }
1138 };
1139
1140 self.note_conflux_handshake_result(Err(error), proto_violation.is_some())?;
1141
1142 if let Some(e) = proto_violation {
1143 tor_error::warn_report!(
1144 e,
1145 tunnel_id = %self.tunnel_id,
1146 "Malformed conflux handshake, tearing down tunnel",
1147 );
1148
1149 return Err(e.into());
1150 }
1151 }
1152 }
1153 #[cfg(feature = "conflux")]
1154 RunOnceCmdInner::ConfluxHandshakeComplete { leg, cell } => {
1155 // Note: on the client-side, the handshake is considered complete once the
1156 // RELAY_CONFLUX_LINKED_ACK is sent (roughly upon receipt of the LINKED cell).
1157 //
1158 // We're optimistic here, and declare the handshake a success *before*
1159 // sending the LINKED_ACK response. I think this is OK though,
1160 // because if the send_relay_cell() below fails, the reactor will shut
1161 // down anyway. OTOH, marking the handshake as complete slightly early
1162 // means that on the happy path, the circuit is marked as usable sooner,
1163 // instead of blocking on the sending of the LINKED_ACK.
1164 self.note_conflux_handshake_result(Ok(()), false)?;
1165
1166 let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
1167
1168 res?;
1169 }
1170 #[cfg(feature = "conflux")]
1171 RunOnceCmdInner::Link { circuits, answer } => {
1172 // Add the specified circuits to our conflux set,
1173 // and send a LINK cell down each unlinked leg.
1174 //
1175 // NOTE: this will block the reactor until all the cells are sent.
1176 self.handle_link_circuits(circuits, answer).await?;
1177 }
1178 #[cfg(feature = "conflux")]
1179 RunOnceCmdInner::Enqueue { leg, msg } => {
1180 let entry = ConfluxHeapEntry { leg_id: leg, msg };
1181 self.ooo_msgs.push(entry);
1182 }
1183 #[cfg(feature = "circ-padding")]
1184 RunOnceCmdInner::PaddingAction { leg, padding_event } => {
1185 // TODO: If we someday move back to having a per-circuit reactor,
1186 // this event would logically belong there, not on the tunnel reactor.
1187 self.circuits.run_padding_event(leg, padding_event).await?;
1188 }
1189 }
1190
1191 Ok(())
1192 }
1193
1194 /// Wait for a [`CtrlMsg::Create`] to come along to set up the circuit.
1195 ///
1196 /// Returns an error if an unexpected `CtrlMsg` is received.
1197 #[instrument(level = "trace", skip_all)]
1198 async fn wait_for_create(&mut self) -> StdResult<(), ReactorError> {
1199 let msg = select_biased! {
1200 res = self.command.next() => {
1201 let cmd = unwrap_or_shutdown!(self, res, "shutdown channel drop")?;
1202 match cmd {
1203 CtrlCmd::Shutdown => return self.handle_shutdown().map(|_| ()),
1204 #[cfg(test)]
1205 CtrlCmd::AddFakeHop {
1206 relay_cell_format: format,
1207 fwd_lasthop,
1208 rev_lasthop,
1209 peer_id,
1210 params,
1211 done,
1212 } => {
1213 let leg = self.circuits.single_leg_mut()?;
1214 leg.handle_add_fake_hop(format, fwd_lasthop, rev_lasthop, peer_id, ¶ms, done);
1215 return Ok(())
1216 },
1217 _ => {
1218 trace!("reactor shutdown due to unexpected command: {:?}", cmd);
1219 return Err(Error::CircProto(format!("Unexpected control {cmd:?} on client circuit")).into());
1220 }
1221 }
1222 },
1223 res = self.control.next() => unwrap_or_shutdown!(self, res, "control drop")?,
1224 };
1225
1226 match msg {
1227 CtrlMsg::Create {
1228 recv_created,
1229 handshake,
1230 settings,
1231 done,
1232 } => {
1233 // TODO(conflux): instead of crashing the reactor, it might be better
1234 // to send the error via the done channel instead
1235 let leg = self.circuits.single_leg_mut()?;
1236 leg.handle_create(recv_created, handshake, settings, done)
1237 .await
1238 }
1239 _ => {
1240 trace!("reactor shutdown due to unexpected cell: {:?}", msg);
1241
1242 Err(Error::CircProto(format!("Unexpected {msg:?} cell on client circuit")).into())
1243 }
1244 }
1245 }
1246
/// Record one leg's handshake result in the pending `ConfluxHandshakeContext`.
///
/// Once a result has been recorded for every leg we were waiting on
/// (or immediately, if `reactor_is_closing` is set), the context is removed
/// and everything collected so far is sent to the handshake initiator.
///
/// Returns an internal error if no handshake context is installed, and
/// whatever error `send_conflux_outcome` reports if the initiator is gone.
#[cfg(feature = "conflux")]
#[instrument(level = "trace", skip_all)]
fn note_conflux_handshake_result(
    &mut self,
    res: StdResult<(), ConfluxHandshakeError>,
    reactor_is_closing: bool,
) -> StdResult<(), ReactorError> {
    let Some(pending) = self.conflux_hs_ctx.as_mut() else {
        return Err(internal!("no conflux handshake context").into());
    };

    pending.results.push(res);

    // Whether all the legs have finished linking:
    let all_legs_reported = pending.results.len() == pending.num_legs;

    if !(all_legs_reported || reactor_is_closing) {
        // Still waiting on at least one leg.
        return Ok(());
    }

    // Time to remove the conflux handshake context
    // and extract the results we have collected
    let finished = self.conflux_hs_ctx.take().expect("context disappeared?!");

    let leg_count = finished.results.len();
    let success_count = finished
        .results
        .iter()
        .filter(|outcome| outcome.is_ok())
        .count();

    info!(
        tunnel_id = %self.tunnel_id,
        "conflux tunnel ready ({success_count}/{leg_count} circuits successfully linked)",
    );

    // We don't expect to receive any more handshake results,
    // at least not until we get another LinkCircuits control message,
    // which will install a new ConfluxHandshakeCtx with a channel
    // for us to send updates on
    send_conflux_outcome(finished.answer, Ok(finished.results))
}
1293
1294 /// Prepare a `SendRelayCell` request, and install the given meta-cell handler.
1295 fn prepare_msg_and_install_handler(
1296 &mut self,
1297 msg: Option<AnyRelayMsgOuter>,
1298 handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
1299 ) -> Result<Option<SendRelayCell>> {
1300 let msg = msg
1301 .map(|msg| {
1302 let handlers = &mut self.cell_handlers;
1303 let handler = handler
1304 .as_ref()
1305 .or(handlers.meta_handler.as_ref())
1306 .ok_or_else(|| internal!("tried to use an ended Conversation"))?;
1307 // We should always have a precise HopLocation here so this should never fails but
1308 // in case we have a ::JointPoint, we'll notice.
1309 let hop = handler.expected_hop().hop_num().ok_or(bad_api_usage!(
1310 "MsgHandler doesn't have a precise HopLocation"
1311 ))?;
1312 Ok::<_, crate::Error>(SendRelayCell {
1313 hop: Some(hop),
1314 early: false,
1315 cell: msg,
1316 })
1317 })
1318 .transpose()?;
1319
1320 if let Some(handler) = handler {
1321 self.cell_handlers.set_meta_handler(handler)?;
1322 }
1323
1324 Ok(msg)
1325 }
1326
1327 /// Handle a shutdown request.
1328 fn handle_shutdown(&self) -> StdResult<Option<RunOnceCmdInner>, ReactorError> {
1329 trace!(
1330 tunnel_id = %self.tunnel_id,
1331 "reactor shutdown due to explicit request",
1332 );
1333
1334 Err(ReactorError::Shutdown)
1335 }
1336
/// Shut the reactor down, handing its only [`Circuit`] back over `answer`.
///
/// Whatever `take_single_leg` returns (the circuit, or an error) is sent
/// over `answer`. The reactor shuts down regardless of that outcome.
#[cfg(feature = "conflux")]
fn handle_shutdown_and_return_circuit(
    &mut self,
    answer: oneshot::Sender<StdResult<Circuit, Bug>>,
) -> StdResult<(), ReactorError> {
    let only_leg = self.circuits.take_single_leg();

    // Don't care if the receiver goes away
    let _ = answer.send(only_leg);

    self.handle_shutdown().map(drop)
}
1350
1351 /// Resolves a [`TargetHop`] to a [`HopLocation`].
1352 ///
1353 /// After resolving a `TargetHop::LastHop`,
1354 /// the `HopLocation` can become stale if a single-path circuit is later extended or truncated.
1355 /// This means that the `HopLocation` can become stale from one reactor iteration to the next.
1356 ///
1357 /// It's generally okay to hold on to a (possibly stale) `HopLocation`
1358 /// if you need a fixed hop position in the tunnel.
1359 /// For example if we open a stream to `TargetHop::LastHop`,
1360 /// we would want to store the stream position as a `HopLocation` and not a `TargetHop::LastHop`
1361 /// as we don't want the stream position to change as the tunnel is extended or truncated.
1362 ///
1363 /// Returns [`NoHopsBuiltError`] if trying to resolve `TargetHop::LastHop`
1364 /// and the tunnel has no hops
1365 /// (either has no legs, or has legs which contain no hops).
1366 fn resolve_target_hop(&self, hop: TargetHop) -> StdResult<HopLocation, NoHopsBuiltError> {
1367 match hop {
1368 TargetHop::Hop(hop) => Ok(hop),
1369 TargetHop::LastHop => {
1370 if let Ok(leg) = self.circuits.single_leg() {
1371 let leg_id = leg.unique_id();
1372 // single-path tunnel
1373 let hop = leg.last_hop_num().ok_or(NoHopsBuiltError)?;
1374 Ok(HopLocation::Hop((leg_id, hop)))
1375 } else if !self.circuits.is_empty() {
1376 // multi-path tunnel
1377 Ok(HopLocation::JoinPoint)
1378 } else {
1379 // no legs
1380 Err(NoHopsBuiltError)
1381 }
1382 }
1383 }
1384 }
1385
1386 /// Resolves a [`HopLocation`] to a [`UniqId`] and [`HopNum`].
1387 ///
1388 /// After resolving a `HopLocation::JoinPoint`,
1389 /// the [`UniqId`] and [`HopNum`] can become stale if the primary leg changes.
1390 ///
1391 /// You should try to only resolve to a specific [`UniqId`] and [`HopNum`] immediately before you
1392 /// need them,
1393 /// and you should not hold on to the resolved [`UniqId`] and [`HopNum`] between reactor
1394 /// iterations as the primary leg may change from one iteration to the next.
1395 ///
1396 /// Returns [`NoJoinPointError`] if trying to resolve `HopLocation::JoinPoint`
1397 /// but it does not have a join point.
1398 #[instrument(level = "trace", skip_all)]
1399 fn resolve_hop_location(
1400 &self,
1401 hop: HopLocation,
1402 ) -> StdResult<(UniqId, HopNum), NoJoinPointError> {
1403 match hop {
1404 HopLocation::Hop((leg_id, hop_num)) => Ok((leg_id, hop_num)),
1405 HopLocation::JoinPoint => {
1406 if let Some((leg_id, hop_num)) = self.circuits.primary_join_point() {
1407 Ok((leg_id, hop_num))
1408 } else {
1409 // Attempted to get the join point of a non-multipath tunnel.
1410 Err(NoJoinPointError)
1411 }
1412 }
1413 }
1414 }
1415
1416 /// Resolve a [`TargetHop`] directly into a [`UniqId`] and [`HopNum`].
1417 ///
1418 /// This is a helper function that basically calls both resolve_target_hop and
1419 /// resolve_hop_location back to back.
1420 ///
1421 /// It returns None on failure to resolve meaning that if you want more detailed error on why
1422 /// it failed, explicitly use the resolve_hop_location() and resolve_target_hop() functions.
1423 pub(crate) fn target_hop_to_hopnum_id(&self, hop: TargetHop) -> Option<(UniqId, HopNum)> {
1424 self.resolve_target_hop(hop)
1425 .ok()
1426 .and_then(|resolved| self.resolve_hop_location(resolved).ok())
1427 }
1428
/// Install a padder at the given hop, or remove it when `padder` is `None`.
///
/// Only a precise `HopLocation::Hop` is accepted: asking for the join point
/// is reported as bad API usage. Returns [`Error::NoSuchHop`] if the leg
/// does not exist.
#[cfg(feature = "circ-padding-manual")]
fn set_padding_at_hop(
    &self,
    hop: HopLocation,
    padder: Option<super::circuit::padding::CircuitPadder>,
) -> Result<()> {
    match hop {
        HopLocation::Hop((leg_id, hop_num)) => {
            let leg = self.circuits.leg(leg_id).ok_or(Error::NoSuchHop)?;
            leg.set_padding_at_hop(hop_num, padder)?;
            Ok(())
        }
        HopLocation::JoinPoint => {
            Err(bad_api_usage!("Padding to the join point is not supported.").into())
        }
    }
}
1443
1444 /// Does congestion control use stream SENDMEs for the given hop?
1445 ///
1446 /// Returns `None` if either the `leg` or `hop` don't exist.
1447 fn uses_stream_sendme(&self, leg: UniqId, hop: HopNum) -> Option<bool> {
1448 self.circuits.uses_stream_sendme(leg, hop)
1449 }
1450
/// Link some extra circuits into this reactor's conflux set.
///
/// The circuits are validated first. If they do not all have the same length,
/// or do not all share the same last hop, the error is reported on the
/// `answer` channel and the conflux handshake is *not* initiated.
///
/// Otherwise the circuits join the conflux set and the handshake is started
/// by sending a LINK cell on each leg. `answer` is then kept until the
/// handshake results have been collected.
///
/// If a handshake is already in progress, an internal error is reported on
/// `answer` and nothing else happens.
///
/// NOTE: this blocks the reactor main loop until all the cells are sent.
#[cfg(feature = "conflux")]
#[instrument(level = "trace", skip_all)]
async fn handle_link_circuits(
    &mut self,
    circuits: Vec<Circuit>,
    answer: ConfluxLinkResultChannel,
) -> StdResult<(), ReactorError> {
    use tor_error::warn_report;

    // Only one handshake may be in flight at a time.
    if self.conflux_hs_ctx.is_some() {
        let err = internal!("conflux linking already in progress");
        send_conflux_outcome(answer, Err(err.into()))?;

        return Ok(());
    }

    // We need to send the LINK cell on each of the new circuits
    // and on each of the existing, unlinked legs from self.circuits.
    //
    // In reality, there can only be one such circuit
    // (the "initial" one from the previously single-path tunnel),
    // because any circuits that fail to complete the conflux handshake
    // get removed from the set.
    let unlinked_legs = self.circuits.num_unlinked();
    let num_legs = circuits.len() + unlinked_legs;

    // Note: add_legs validates `circuits`
    let link_outcome = async {
        self.circuits.add_legs(circuits, &self.runtime)?;
        self.circuits.link_circuits(&self.runtime).await
    }
    .await;

    match link_outcome {
        Ok(_) => {
            // Save the channel, to notify the user of completion.
            self.conflux_hs_ctx = Some(ConfluxHandshakeCtx {
                answer,
                num_legs,
                results: Default::default(),
            });
        }
        Err(e) => {
            warn_report!(e, "Failed to link conflux circuits");

            send_conflux_outcome(answer, Err(e))?;
        }
    }

    Ok(())
}
1510}
1511
/// Report the outcome of a conflux handshake to whoever initiated it.
///
/// Returns [`ReactorError::Shutdown`] if the initiator has gone away.
#[cfg(feature = "conflux")]
fn send_conflux_outcome(
    tx: ConfluxLinkResultChannel,
    res: Result<ConfluxHandshakeResult>,
) -> StdResult<(), ReactorError> {
    match tx.send(res) {
        Ok(_) => Ok(()),
        Err(_) => {
            tracing::warn!("conflux initiator went away before handshake completed?");
            Err(ReactorError::Shutdown)
        }
    }
}
1527
1528/// The tunnel does not have any hops.
1529#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
1530#[non_exhaustive]
1531#[error("no hops have been built for this tunnel")]
1532pub(crate) struct NoHopsBuiltError;
1533
1534/// The tunnel does not have a join point.
1535#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
1536#[non_exhaustive]
1537#[error("the tunnel does not have a join point")]
1538pub(crate) struct NoJoinPointError;
1539
1540impl CellHandlers {
1541 /// Try to install a given meta-cell handler to receive any unusual cells on
1542 /// this circuit, along with a result channel to notify on completion.
1543 fn set_meta_handler(&mut self, handler: Box<dyn MetaCellHandler + Send>) -> Result<()> {
1544 if self.meta_handler.is_none() {
1545 self.meta_handler = Some(handler);
1546 Ok(())
1547 } else {
1548 Err(Error::from(internal!(
1549 "Tried to install a meta-cell handler before the old one was gone."
1550 )))
1551 }
1552 }
1553
1554 /// Try to install a given cell handler on this circuit.
1555 #[cfg(feature = "hs-service")]
1556 fn set_incoming_stream_req_handler(
1557 &mut self,
1558 handler: IncomingStreamRequestHandler,
1559 ) -> Result<()> {
1560 if self.incoming_stream_req_handler.is_none() {
1561 self.incoming_stream_req_handler = Some(handler);
1562 Ok(())
1563 } else {
1564 Err(Error::from(internal!(
1565 "Tried to install a BEGIN cell handler before the old one was gone."
1566 )))
1567 }
1568 }
1569}
1570
// Intentionally empty: this module only exists to point at where the
// reactor's tests live.
#[cfg(test)]
mod test {
    // Tested in [`crate::client::circuit::test`].
}