tor_proto/client/reactor.rs
1//! Code to handle incoming cells on a circuit.
2//!
3//! ## On message validation
4//!
5//! There are three steps for validating an incoming message on a stream:
6//!
7//! 1. Is the message contextually appropriate? (e.g., no more than one
8//! `CONNECTED` message per stream.) This is handled by calling
9//! [`CmdChecker::check_msg`](crate::stream::cmdcheck::CmdChecker::check_msg).
10//! 2. Does the message comply with flow-control rules? (e.g., no more SENDMEs
11//! than we've sent data for.) This is handled within the reactor by the
12//! `StreamFlowCtrl`. For half-closed streams which don't send stream
13//! SENDMEs, an additional receive-window check is performed in the
14//! `halfstream` module.
15//! 3. Does the message have an acceptable command type, and is the message
16//! well-formed? For open streams, the streams themselves handle this check.
17//! For half-closed streams, the reactor handles it by calling
18//! `consume_checked_msg()`.
19
20pub(crate) mod circuit;
21mod conflux;
22mod control;
23
24use crate::circuit::circhop::SendRelayCell;
25use crate::circuit::{CircuitRxReceiver, UniqId};
26use crate::client::circuit::ClientCircChanMsg;
27use crate::client::circuit::padding::{PaddingController, PaddingEvent, PaddingEventStream};
28use crate::client::{HopLocation, TargetHop};
29use crate::crypto::cell::HopNum;
30use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
31use crate::memquota::CircuitAccount;
32use crate::stream::CloseStreamBehavior;
33use crate::streammap;
34use crate::tunnel::{TunnelId, TunnelScopedCircId};
35use crate::util::err::ReactorError;
36use crate::util::skew::ClockSkew;
37use crate::util::timeout::TimeoutEstimator;
38use crate::{Error, Result};
39use circuit::Circuit;
40use conflux::ConfluxSet;
41use control::ControlHandler;
42use std::cmp::Ordering;
43use std::collections::BinaryHeap;
44use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
45use tor_cell::relaycell::msg::Sendme;
46use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
47use tor_error::{Bug, bad_api_usage, debug_report, internal, into_bad_api_usage};
48use tor_rtcompat::{DynTimeProvider, SleepProvider};
49
50use cfg_if::cfg_if;
51use futures::StreamExt;
52use futures::channel::mpsc;
53use futures::{FutureExt as _, select_biased};
54use oneshot_fused_workaround as oneshot;
55
56use std::result::Result as StdResult;
57use std::sync::Arc;
58use std::time::Duration;
59
60use crate::channel::Channel;
61use crate::conflux::msghandler::RemoveLegReason;
62use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
63use circuit::CircuitCmd;
64use derive_more::From;
65use smallvec::smallvec;
66use tor_cell::chancell::CircId;
67use tor_llcrypto::pk;
68use tracing::{debug, info, instrument, trace, warn};
69
70use super::circuit::{MutableState, TunnelMutableState};
71use crate::circuit::reactor::ReactorResultChannel;
72
73#[cfg(feature = "hs-service")]
74use crate::stream::incoming::IncomingStreamRequestHandler;
75
76#[cfg(feature = "conflux")]
77use {
78 crate::conflux::msghandler::{ConfluxCmd, OooRelayMsg},
79 crate::util::err::ConfluxHandshakeError,
80};
81
82pub(super) use control::{CtrlCmd, CtrlMsg, FlowCtrlMsg};
83
/// A list of conflux handshake results.
///
/// Each element is `Ok(())` for a handshake that succeeded,
/// or the [`ConfluxHandshakeError`] describing why it failed.
#[cfg(feature = "conflux")]
pub(super) type ConfluxHandshakeResult = Vec<StdResult<(), ConfluxHandshakeError>>;
87
/// The type of oneshot channel used to inform reactor users of the outcome
/// of a client-side conflux handshake.
///
/// The value sent over the channel is a list of handshake results,
/// one for each circuit that we were asked to link in the tunnel.
#[cfg(feature = "conflux")]
pub(super) type ConfluxLinkResultChannel = ReactorResultChannel<ConfluxHandshakeResult>;
95
/// A handshake type, to be used when creating circuit hops.
///
/// Each variant carries the relay key material (if any) that the
/// corresponding handshake is given.
#[derive(Clone, Debug)]
pub(crate) enum CircuitHandshake {
    /// Use the CREATE_FAST handshake.
    ///
    /// This variant carries no relay keys.
    CreateFast,
    /// Use the ntor handshake.
    Ntor {
        /// The public key of the relay.
        public_key: NtorPublicKey,
        /// The Ed25519 identity of the relay, which is verified against the
        /// identity held in the circuit's channel.
        ed_identity: pk::ed25519::Ed25519Identity,
    },
    /// Use the ntor-v3 handshake.
    NtorV3 {
        /// The public key of the relay.
        public_key: NtorV3PublicKey,
    },
}
115
116// TODO: the RunOnceCmd/RunOnceCmdInner/CircuitCmd/CircuitEvent enum
117// proliferation is a bit bothersome, but unavoidable with the current design.
118//
119// We should consider getting rid of some of these enums (if possible),
120// and coming up with more intuitive names.
121
/// One or more [`RunOnceCmdInner`] to run inside [`Reactor::run_once`].
///
/// The derived `From` impls allow converting a single [`RunOnceCmdInner`]
/// (or a `Vec` of them) into a `RunOnceCmd` with `.into()`.
#[derive(From, Debug)]
#[allow(clippy::large_enum_variant)] // TODO #2003: resolve this
enum RunOnceCmd {
    /// Run a single `RunOnceCmdInner` command.
    Single(RunOnceCmdInner),
    /// Run multiple `RunOnceCmdInner` commands.
    //
    // Note: this whole enum *could* be replaced with Vec<RunOnceCmdInner>,
    // but most of the time we're only going to have *one* RunOnceCmdInner
    // to run per run_once() loop. The enum enables us to avoid the extra heap
    // allocation for the `RunOnceCmd::Single` case.
    Multiple(Vec<RunOnceCmdInner>),
}
136
/// Instructions for running something in the reactor loop.
///
/// Run at the end of [`Reactor::run_once`].
///
/// Values of this type are either built from a [`CircuitCmd`]
/// (see [`RunOnceCmdInner::from_circuit_cmd`]) or constructed directly by the reactor.
//
// TODO: many of the variants of this enum have an identical CtrlMsg counterpart.
// We should consider making each variant a tuple variant and deduplicating the fields.
#[derive(educe::Educe)]
#[educe(Debug)]
enum RunOnceCmdInner {
    /// Send a RELAY cell.
    Send {
        /// The leg the cell should be sent on.
        leg: UniqId,
        /// The cell to send.
        cell: SendRelayCell,
        /// A channel for sending completion notifications.
        ///
        /// If this is `None`, nobody is notified of the outcome of the send.
        done: Option<ReactorResultChannel<()>>,
    },
    /// Send a given control message on this circuit, and install a control-message handler to
    /// receive responses.
    #[cfg(feature = "send-control-msg")]
    SendMsgAndInstallHandler {
        /// The message to send, if any
        msg: Option<AnyRelayMsgOuter>,
        /// A message handler to install.
        ///
        /// If this is `None`, there must already be a message handler installed
        #[educe(Debug(ignore))]
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
        /// A sender that we use to tell the caller that the message was sent
        /// and the handler installed.
        done: oneshot::Sender<Result<()>>,
    },
    /// Handle a SENDME message.
    HandleSendMe {
        /// The leg the SENDME was received on.
        leg: UniqId,
        /// The hop number.
        hop: HopNum,
        /// The SENDME message to handle.
        sendme: Sendme,
    },
    /// Begin a stream with the provided hop in this circuit.
    ///
    /// Uses the provided stream ID, and sends the provided message to that hop.
    BeginStream {
        /// The cell to send, together with the ID of the stream it begins,
        /// or the error encountered while preparing them.
        cell: Result<(SendRelayCell, StreamId)>,
        /// The location of the hop on the tunnel. We don't use this (and `Circuit`s shouldn't need
        /// to worry about legs anyways), but need it so that we can pass it back in `done` to the
        /// caller.
        hop: HopLocation,
        /// The circuit leg to begin the stream on.
        leg: UniqId,
        /// Oneshot channel to notify on completion, with the allocated stream ID.
        done: ReactorResultChannel<(StreamId, HopLocation, RelayCellFormat)>,
    },
    /// Consider sending an XON message with the given `rate`.
    MaybeSendXon {
        /// The drain rate to advertise in the XON message.
        rate: XonKbpsEwma,
        /// The ID of the stream to send the message on.
        stream_id: StreamId,
        /// The location of the hop on the tunnel.
        hop: HopLocation,
    },
    /// Close the specified stream.
    CloseStream {
        /// The location of the hop on the tunnel.
        hop: HopLocation,
        /// The ID of the stream to close.
        sid: StreamId,
        /// The stream-closing behavior.
        behav: CloseStreamBehavior,
        /// The reason for closing the stream.
        reason: streammap::TerminateReason,
        /// A channel for sending completion notifications.
        done: Option<ReactorResultChannel<()>>,
    },
    /// Get the clock skew claimed by the first hop of the circuit.
    FirstHopClockSkew {
        /// Oneshot channel to return the clock skew.
        answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
    },
    /// Remove a circuit leg from the conflux set.
    RemoveLeg {
        /// The circuit leg to remove.
        leg: UniqId,
        /// The reason for removal.
        ///
        /// This is only used for conflux circuits that get removed
        /// before the conflux handshake is complete.
        ///
        /// The [`RemoveLegReason`] is mapped by the reactor to a
        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
        /// handshake to indicate the reason the handshake failed.
        reason: RemoveLegReason,
    },
    /// A circuit has completed the conflux handshake,
    /// and wants to send the specified cell.
    ///
    /// This is similar to [`RunOnceCmdInner::Send`],
    /// but needs to remain a separate variant,
    /// because in addition to instructing the reactor to send a cell,
    /// it also notifies it that the conflux handshake is complete on the specified `leg`.
    /// This enables the reactor to save the handshake result (`Ok(())`),
    /// and, if there are no other legs still in the handshake phase,
    /// send the result to the handshake initiator.
    #[cfg(feature = "conflux")]
    ConfluxHandshakeComplete {
        /// The circuit leg that has completed the handshake.
        /// This is the leg the cell should be sent on.
        leg: UniqId,
        /// The cell to send.
        cell: SendRelayCell,
    },
    /// Send a LINK cell on each of the unlinked circuit legs in the conflux set of this reactor.
    #[cfg(feature = "conflux")]
    Link {
        /// The circuits to link into the tunnel
        #[educe(Debug(ignore))]
        circuits: Vec<Circuit>,
        /// Oneshot channel for notifying of conflux handshake completion.
        answer: ConfluxLinkResultChannel,
    },
    /// Enqueue an out-of-order cell in ooo_msg.
    #[cfg(feature = "conflux")]
    Enqueue {
        /// The leg the entry originated from.
        leg: UniqId,
        /// The out-of-order message.
        msg: OooRelayMsg,
    },
    /// Take a padding-related event on a circuit leg.
    #[cfg(feature = "circ-padding")]
    PaddingAction {
        /// The leg on which to take the padding event.
        leg: UniqId,
        /// The event to take.
        padding_event: PaddingEvent,
    },
    /// Perform a clean shutdown on this circuit.
    CleanShutdown,
}
281
282impl RunOnceCmdInner {
283 /// Create a [`RunOnceCmdInner`] out of a [`CircuitCmd`] and [`UniqId`].
284 fn from_circuit_cmd(leg: UniqId, cmd: CircuitCmd) -> Self {
285 match cmd {
286 CircuitCmd::Send(cell) => Self::Send {
287 leg,
288 cell,
289 done: None,
290 },
291 CircuitCmd::HandleSendMe { hop, sendme } => Self::HandleSendMe { leg, hop, sendme },
292 CircuitCmd::CloseStream {
293 hop,
294 sid,
295 behav,
296 reason,
297 } => Self::CloseStream {
298 hop: HopLocation::Hop((leg, hop)),
299 sid,
300 behav,
301 reason,
302 done: None,
303 },
304 #[cfg(feature = "conflux")]
305 CircuitCmd::Conflux(ConfluxCmd::RemoveLeg(reason)) => Self::RemoveLeg { leg, reason },
306 #[cfg(feature = "conflux")]
307 CircuitCmd::Conflux(ConfluxCmd::HandshakeComplete { hop, early, cell }) => {
308 let cell = SendRelayCell {
309 hop: Some(hop),
310 early,
311 cell,
312 };
313 Self::ConfluxHandshakeComplete { leg, cell }
314 }
315 #[cfg(feature = "conflux")]
316 CircuitCmd::Enqueue(msg) => Self::Enqueue { leg, msg },
317 CircuitCmd::CleanShutdown => Self::CleanShutdown,
318 }
319 }
320}
321
/// A command to execute at the end of [`Reactor::run_once`].
///
/// A batch of these is sorted by [`CircuitEvent::order_within_batch`]
/// before being handled by the reactor.
#[derive(From, Debug)]
#[allow(clippy::large_enum_variant)] // TODO #2003: should we resolve this?
enum CircuitEvent {
    /// Run a single `CircuitCmd` command.
    RunCmd {
        /// The unique identifier of the circuit leg to run the command on
        leg: UniqId,
        /// The command to run.
        cmd: CircuitCmd,
    },
    /// Handle a control message
    HandleControl(CtrlMsg),
    /// Handle an input message.
    HandleCell {
        /// The unique identifier of the circuit leg the message was received on.
        leg: UniqId,
        /// The message to handle.
        cell: ClientCircChanMsg,
    },
    /// Remove the specified circuit leg from the conflux set.
    ///
    /// Returned whenever a single circuit leg needs to be removed
    /// from the reactor's conflux set, without necessarily tearing down
    /// the whole set or shutting down the reactor.
    ///
    /// Note: this event *can* cause the reactor to shut down
    /// (and the conflux set to be closed).
    ///
    /// See the [`ConfluxSet::remove`] docs for more on the exact behavior of this command.
    RemoveLeg {
        /// The leg to remove.
        leg: UniqId,
        /// The reason for removal.
        ///
        /// This is only used for conflux circuits that get removed
        /// before the conflux handshake is complete.
        ///
        /// The [`RemoveLegReason`] is mapped by the reactor to a
        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
        /// handshake to indicate the reason the handshake failed.
        reason: RemoveLegReason,
    },
    /// Take some event (blocking or unblocking a circuit, or sending padding)
    /// based on the circuit padding backend code.
    PaddingAction {
        /// The leg on which to take the padding event.
        leg: UniqId,
        /// The event to take.
        padding_event: PaddingEvent,
    },
    /// Protocol violation. For now, this leads to the circuit reactor being closed.
    /// The error causing the violation is set in `err`.
    ProtoViolation {
        /// The error that causes this protocol violation.
        err: crate::Error,
    },
}
380
381impl CircuitEvent {
382 /// Return the ordering with which we should handle this event
383 /// within a list of events returned by a single call to next_circ_event().
384 ///
385 /// NOTE: Please do not make this any more complicated:
386 /// It is a consequence of a kludge that we need this sorting at all.
387 /// Assuming that eventually, we switch away from the current
388 /// poll-oriented `next_circ_event` design,
389 /// we may be able to get rid of this entirely.
390 fn order_within_batch(&self) -> u8 {
391 use CircuitEvent as CA;
392 use PaddingEvent as PE;
393 // This immediate state MUST NOT be used for events emitting cells. At the moment, it is
394 // only used by the protocol violation event which leads to a shutdown of the reactor.
395 const IMMEDIATE: u8 = 0;
396 const EARLY: u8 = 1;
397 const NORMAL: u8 = 2;
398 const LATE: u8 = 3;
399
400 // We use this ordering to move any "StartBlocking" to the _end_ of a batch and
401 // "StopBlocking" to the start.
402 //
403 // This way, we can be sure that we will handle any "send data" operations
404 // (and tell the Padder about them) _before_ we tell the Padder
405 // that we have blocked the circuit.
406 //
407 // This keeps things a bit more logical.
408 match self {
409 CA::RunCmd { .. } => NORMAL,
410 CA::HandleControl(..) => NORMAL,
411 CA::HandleCell { .. } => NORMAL,
412 CA::RemoveLeg { .. } => NORMAL,
413 #[cfg(feature = "circ-padding")]
414 CA::PaddingAction { padding_event, .. } => match padding_event {
415 PE::StopBlocking => EARLY,
416 PE::SendPadding(..) => NORMAL,
417 PE::StartBlocking(..) => LATE,
418 },
419 #[cfg(not(feature = "circ-padding"))]
420 CA::PaddingAction { .. } => NORMAL,
421 CA::ProtoViolation { .. } => IMMEDIATE,
422 }
423 }
424}
425
/// An object that's waiting for a meta cell (one not associated with a stream) in order to make
/// progress.
///
/// # Background
///
/// The `Reactor` can't have async functions that send and receive cells, because its job is to
/// send and receive cells: if one of its functions tried to do that, it would just hang forever.
///
/// To get around this problem, the reactor can send some cells, and then make one of these
/// `MetaCellHandler` objects, which will be run when the reply arrives.
pub(crate) trait MetaCellHandler: Send {
    /// The hop we're expecting the message to come from. This is compared against the hop
    /// from which we actually receive messages, and an error is thrown if the two don't match.
    fn expected_hop(&self) -> HopLocation;
    /// Called when the message we were waiting for arrives.
    ///
    /// Receives mutable access to the [`Circuit`] the message arrived on
    /// (the `reactor` argument), in order to do anything it likes there.
    ///
    /// On success, the returned [`MetaCellDisposition`] says what should happen
    /// to the handler (and the circuit) next.
    ///
    /// If this function returns an error, the reactor will shut down.
    fn handle_msg(
        &mut self,
        msg: UnparsedRelayMsg,
        reactor: &mut Circuit,
    ) -> Result<MetaCellDisposition>;
}
451
/// A possible successful outcome of giving a message to a [`MsgHandler`](super::msghandler::MsgHandler).
///
/// This is also the success type returned by [`MetaCellHandler::handle_msg`].
///
/// The type is only made `pub` when the `send-control-msg` feature is enabled;
/// that feature also gates the `Consumed` and `CloseCirc` variants.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "send-control-msg", visibility::make(pub))]
#[non_exhaustive]
pub(crate) enum MetaCellDisposition {
    /// The message was consumed; the handler should remain installed.
    #[cfg(feature = "send-control-msg")]
    Consumed,
    /// The message was consumed; the handler should be uninstalled.
    ConversationFinished,
    /// The message was consumed; the circuit should be closed.
    #[cfg(feature = "send-control-msg")]
    CloseCirc,
    // TODO: Eventually we might want the ability to have multiple handlers
    // installed, and to let them say "not for me, maybe for somebody else?".
    // But right now we don't need that.
}
469
/// Convert the specified [`Option`] into a `Result`,
/// mapping `None` to [`ReactorError::Shutdown`].
///
/// When the option is `None`, the shutdown is logged at trace level,
/// along with the tunnel ID of `$self` and the given `$reason`.
///
/// This is a macro instead of a function to work around borrowck errors
/// in the select! from run_once().
macro_rules! unwrap_or_shutdown {
    ($self:expr, $res:expr, $reason:expr) => {{
        if let Some(value) = $res {
            Ok(value)
        } else {
            trace!(
                tunnel_id = %$self.tunnel_id,
                reason = %$reason,
                "reactor shutdown"
            );
            Err(ReactorError::Shutdown)
        }
    }};
}
489
/// Object to handle incoming cells and background tasks on a circuit
///
/// This type is returned when you finish a circuit; you need to spawn a
/// new task that calls `run()` on it.
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
pub struct Reactor {
    /// Receiver for control messages for this reactor, sent by `ClientCirc` objects.
    ///
    /// This channel is polled in [`Reactor::run_once`], after the `command` channel
    /// but before the circuits themselves.
    ///
    /// NOTE(review): `run_once` polls this channel unconditionally, so control messages
    /// may start being handled before the relevant `chan_sender` sink is ready to
    /// accept cells (see the comment in `run_once`).
    control: mpsc::UnboundedReceiver<CtrlMsg>,
    /// Receiver for command messages for this reactor, sent by `ClientCirc` objects.
    ///
    /// This channel is polled in [`Reactor::run_once`].
    ///
    /// NOTE: this is a separate channel from `control`, because some messages
    /// have higher priority: `run_once` polls this channel before `control`,
    /// and handles any command it finds right away.
    command: mpsc::UnboundedReceiver<CtrlCmd>,
    /// A oneshot sender that is used to alert other tasks when this reactor is
    /// finally dropped.
    ///
    /// It is a sender for Void because we never actually want to send anything here;
    /// we only want to generate canceled events.
    #[allow(dead_code)] // the only purpose of this field is to be dropped.
    reactor_closed_tx: oneshot::Sender<void::Void>,
    /// A set of circuits that form a tunnel.
    ///
    /// Contains 1 or more circuits.
    ///
    /// Circuits may be added to this set throughout the lifetime of the reactor.
    ///
    /// Sometimes, the reactor will remove circuits from this set,
    /// for example if the `LINKED` message takes too long to arrive,
    /// or if congestion control negotiation fails.
    /// The reactor will continue running with the remaining circuits.
    /// It will shut down if *all* the circuits are removed.
    ///
    // TODO(conflux): document all the reasons why the reactor might
    // choose to tear down a circuit or tunnel (timeouts, protocol violations, etc.)
    circuits: ConfluxSet,
    /// An identifier for logging about this tunnel reactor.
    tunnel_id: TunnelId,
    /// Handlers, shared with `Circuit`.
    cell_handlers: CellHandlers,
    /// The time provider, used for conflux handshake timeouts.
    runtime: DynTimeProvider,
    /// The conflux handshake context, if there is an on-going handshake.
    ///
    /// Set to `None` if this is a single-path tunnel,
    /// or if none of the circuit legs from our conflux set
    /// are currently in the conflux handshake phase.
    #[cfg(feature = "conflux")]
    conflux_hs_ctx: Option<ConfluxHandshakeCtx>,
    /// A min-heap buffering all the out-of-order messages received so far.
    ///
    /// NOTE(review): `BinaryHeap` is a max-heap, so this presumably relies on the
    /// `Ord` impl of `OooRelayMsg` being reversed; confirm against that type.
    ///
    /// TODO(conflux): this becomes a DoS vector unless we impose a limit
    /// on its size. We should make this participate in the memquota memory
    /// tracking system, somehow.
    #[cfg(feature = "conflux")]
    ooo_msgs: BinaryHeap<ConfluxHeapEntry>,
}
553
/// The context for an on-going conflux handshake.
///
/// Stored in [`Reactor::conflux_hs_ctx`] while a handshake is in progress.
#[cfg(feature = "conflux")]
struct ConfluxHandshakeCtx {
    /// A channel for notifying the caller of the outcome of a CONFLUX_LINK request.
    answer: ConfluxLinkResultChannel,
    /// The number of legs that are currently doing the handshake.
    num_legs: usize,
    /// The handshake results we have collected so far.
    results: ConfluxHandshakeResult,
}
564
/// An out-of-order message buffered in [`Reactor::ooo_msgs`].
///
/// Entries are compared (for both ordering and equality) by `msg` alone;
/// `leg_id` does not take part in the comparison.
#[derive(Debug)]
#[cfg(feature = "conflux")]
struct ConfluxHeapEntry {
    /// The leg id this message came from.
    leg_id: UniqId,
    /// The out of order message
    msg: OooRelayMsg,
}
574
#[cfg(feature = "conflux")]
impl Ord for ConfluxHeapEntry {
    /// Order heap entries by their buffered message; the leg ID is ignored.
    fn cmp(&self, other: &Self) -> Ordering {
        Ord::cmp(&self.msg, &other.msg)
    }
}

#[cfg(feature = "conflux")]
impl PartialOrd for ConfluxHeapEntry {
    /// Always `Some`: delegates to the total order defined by [`Ord`].
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

#[cfg(feature = "conflux")]
impl PartialEq for ConfluxHeapEntry {
    /// Two entries are equal if their buffered messages are equal; the leg ID is ignored.
    fn eq(&self, other: &Self) -> bool {
        PartialEq::eq(&self.msg, &other.msg)
    }
}

#[cfg(feature = "conflux")]
impl Eq for ConfluxHeapEntry {}
598
/// Cell handlers, shared between the Reactor and its underlying `Circuit`s.
///
/// The reactor owns this struct and lends it out (as `&mut`) to the
/// `Circuit` methods that handle incoming messages.
struct CellHandlers {
    /// A handler for a meta cell, together with a result channel to notify on completion.
    ///
    /// NOTE(prop349): this is part of Arti's "Base Circuit Hop Handler".
    ///
    /// Upon sending an EXTEND cell, the [`ControlHandler`] sets this handler
    /// to [`CircuitExtender`](circuit::extender::CircuitExtender).
    /// The handler is then used in [`Circuit::handle_meta_cell`] for handling
    /// all the meta cells received on the circuit that are not SENDMEs or TRUNCATE
    /// (which are handled separately) or conflux cells
    /// (which are handled by the conflux handlers).
    ///
    /// The handler is uninstalled after the receipt of the EXTENDED cell,
    /// so any subsequent EXTENDED cells will cause the circuit to be torn down.
    meta_handler: Option<Box<dyn MetaCellHandler + Send>>,
    /// A handler for incoming stream requests.
    #[cfg(feature = "hs-service")]
    incoming_stream_req_handler: Option<IncomingStreamRequestHandler>,
}
619
620impl Reactor {
621 /// Create a new circuit reactor.
622 ///
623 /// The reactor will send outbound messages on `channel`, receive incoming
624 /// messages on `input`, and identify this circuit by the channel-local
625 /// [`CircId`] provided.
626 ///
627 /// The internal unique identifier for this circuit will be `unique_id`.
628 #[allow(clippy::type_complexity, clippy::too_many_arguments)] // TODO
629 pub(super) fn new(
630 channel: Arc<Channel>,
631 channel_id: CircId,
632 unique_id: UniqId,
633 input: CircuitRxReceiver,
634 runtime: DynTimeProvider,
635 memquota: CircuitAccount,
636 padding_ctrl: PaddingController,
637 padding_stream: PaddingEventStream,
638 timeouts: Arc<dyn TimeoutEstimator + Send>,
639 ) -> (
640 Self,
641 mpsc::UnboundedSender<CtrlMsg>,
642 mpsc::UnboundedSender<CtrlCmd>,
643 oneshot::Receiver<void::Void>,
644 Arc<TunnelMutableState>,
645 ) {
646 let tunnel_id = TunnelId::next();
647 let (control_tx, control_rx) = mpsc::unbounded();
648 let (command_tx, command_rx) = mpsc::unbounded();
649 let mutable = Arc::new(MutableState::default());
650
651 let (reactor_closed_tx, reactor_closed_rx) = oneshot::channel();
652
653 let cell_handlers = CellHandlers {
654 meta_handler: None,
655 #[cfg(feature = "hs-service")]
656 incoming_stream_req_handler: None,
657 };
658
659 let unique_id = TunnelScopedCircId::new(tunnel_id, unique_id);
660 let circuit_leg = Circuit::new(
661 runtime.clone(),
662 channel,
663 channel_id,
664 unique_id,
665 input,
666 memquota,
667 Arc::clone(&mutable),
668 padding_ctrl,
669 padding_stream,
670 timeouts,
671 );
672
673 let (circuits, mutable) = ConfluxSet::new(tunnel_id, circuit_leg);
674
675 let reactor = Reactor {
676 circuits,
677 control: control_rx,
678 command: command_rx,
679 reactor_closed_tx,
680 tunnel_id,
681 cell_handlers,
682 runtime,
683 #[cfg(feature = "conflux")]
684 conflux_hs_ctx: None,
685 #[cfg(feature = "conflux")]
686 ooo_msgs: Default::default(),
687 };
688
689 (reactor, control_tx, command_tx, reactor_closed_rx, mutable)
690 }
691
692 /// Launch the reactor, and run until the circuit closes or we
693 /// encounter an error.
694 ///
695 /// Once this method returns, the circuit is dead and cannot be
696 /// used again.
697 #[instrument(level = "trace", skip_all)]
698 pub async fn run(mut self) -> Result<()> {
699 trace!(tunnel_id = %self.tunnel_id, "Running tunnel reactor");
700 let result: Result<()> = loop {
701 match self.run_once().await {
702 Ok(()) => (),
703 Err(ReactorError::Shutdown) => break Ok(()),
704 Err(ReactorError::Err(e)) => break Err(e),
705 }
706 };
707
708 // Log that the reactor stopped, possibly with the associated error as a report.
709 // May log at a higher level depending on the error kind.
710 const MSG: &str = "Tunnel reactor stopped";
711 match &result {
712 Ok(()) => trace!(tunnel_id = %self.tunnel_id, "{MSG}"),
713 Err(e) => debug_report!(e, tunnel_id = %self.tunnel_id, "{MSG}"),
714 }
715
716 result
717 }
718
719 /// Helper for run: doesn't mark the circuit closed on finish. Only
720 /// processes one cell or control message.
721 #[instrument(level = "trace", skip_all)]
722 async fn run_once(&mut self) -> StdResult<(), ReactorError> {
723 // If all the circuits are closed, shut down the reactor
724 if self.circuits.is_empty() {
725 trace!(
726 tunnel_id = %self.tunnel_id,
727 "Tunnel reactor shutting down: all circuits have closed",
728 );
729
730 return Err(ReactorError::Shutdown);
731 }
732
733 // If this is a single path circuit, we need to wait until the first hop
734 // is created before doing anything else
735 let single_path_with_hops = self
736 .circuits
737 .single_leg_mut()
738 .is_ok_and(|leg| !leg.has_hops());
739 if single_path_with_hops {
740 self.wait_for_create().await?;
741
742 return Ok(());
743 }
744
745 // Prioritize the buffered messages.
746 //
747 // Note: if any of the messages are ready to be handled,
748 // this will block the reactor until we are done processing them
749 //
750 // TODO circpad: If this is a problem, we might want to re-order things so that we
751 // prioritize padding instead. On the other hand, this should be fixed by refactoring
752 // circuit and tunnel reactors, so we might do well to just leave it alone for now.
753 #[cfg(feature = "conflux")]
754 self.try_dequeue_ooo_msgs().await?;
755
756 let mut events = select_biased! {
757 res = self.command.next() => {
758 let cmd = unwrap_or_shutdown!(self, res, "command channel drop")?;
759 return ControlHandler::new(self).handle_cmd(cmd);
760 },
761 // Check whether we've got a control message pending.
762 //
763 // Note: unfortunately, reading from control here means we might start
764 // handling control messages before our chan_senders are ready.
765 // With the current design, this is inevitable: we can't know which circuit leg
766 // a control message is meant for without first reading the control message from
767 // the channel, and at that point, we can't know for sure whether that particular
768 // circuit is ready for sending.
769 ret = self.control.next() => {
770 let msg = unwrap_or_shutdown!(self, ret, "control drop")?;
771 smallvec![CircuitEvent::HandleControl(msg)]
772 },
773 res = self.circuits.next_circ_event(&self.runtime).fuse() => res?,
774 };
775
776 // Put the events into the order that we need to execute them in.
777 //
778 // (Yes, this _does_ have to be a stable sort. Not all events may be freely re-ordered
779 // with respect to one another.)
780 events.sort_by_key(|a| a.order_within_batch());
781
782 for event in events {
783 let cmd = match event {
784 CircuitEvent::RunCmd { leg, cmd } => Some(RunOnceCmd::Single(
785 RunOnceCmdInner::from_circuit_cmd(leg, cmd),
786 )),
787 CircuitEvent::HandleControl(ctrl) => ControlHandler::new(self)
788 .handle_msg(ctrl)?
789 .map(RunOnceCmd::Single),
790 CircuitEvent::HandleCell { leg, cell } => {
791 let circ = self
792 .circuits
793 .leg_mut(leg)
794 .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
795
796 let circ_cmds = circ.handle_cell(&mut self.cell_handlers, leg, cell)?;
797 if circ_cmds.is_empty() {
798 None
799 } else {
800 // TODO: we return RunOnceCmd::Multiple even if there's a single command.
801 //
802 // See the TODO on `Circuit::handle_cell`.
803 let cmd = RunOnceCmd::Multiple(
804 circ_cmds
805 .into_iter()
806 .map(|cmd| RunOnceCmdInner::from_circuit_cmd(leg, cmd))
807 .collect(),
808 );
809
810 Some(cmd)
811 }
812 }
813 CircuitEvent::RemoveLeg { leg, reason } => {
814 Some(RunOnceCmdInner::RemoveLeg { leg, reason }.into())
815 }
816 CircuitEvent::PaddingAction { leg, padding_event } => {
817 cfg_if! {
818 if #[cfg(feature = "circ-padding")] {
819 Some(RunOnceCmdInner::PaddingAction { leg, padding_event }.into())
820 } else {
821 // If padding isn't enabled, we never generate a padding event,
822 // so we can be sure this case will never be called.
823 void::unreachable(padding_event.0);
824 }
825 }
826 }
827 CircuitEvent::ProtoViolation { err } => {
828 return Err(err.into());
829 }
830 };
831
832 if let Some(cmd) = cmd {
833 self.handle_run_once_cmd(cmd).await?;
834 }
835 }
836
837 Ok(())
838 }
839
840 /// Try to process the previously-out-of-order messages we might have buffered.
841 #[cfg(feature = "conflux")]
842 #[instrument(level = "trace", skip_all)]
843 async fn try_dequeue_ooo_msgs(&mut self) -> StdResult<(), ReactorError> {
844 // Check if we're ready to dequeue any of the previously out-of-order cells.
845 while let Some(entry) = self.ooo_msgs.peek() {
846 let should_pop = self.circuits.is_seqno_in_order(entry.msg.seqno);
847
848 if !should_pop {
849 break;
850 }
851
852 let entry = self.ooo_msgs.pop().expect("item just disappeared?!");
853
854 let circ = self
855 .circuits
856 .leg_mut(entry.leg_id)
857 .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
858 let handlers = &mut self.cell_handlers;
859 let cmd = circ
860 .handle_in_order_relay_msg(
861 handlers,
862 entry.msg.hopnum,
863 entry.leg_id,
864 entry.msg.cell_counts_towards_windows,
865 entry.msg.streamid,
866 entry.msg.msg,
867 )?
868 .map(|cmd| {
869 RunOnceCmd::Single(RunOnceCmdInner::from_circuit_cmd(entry.leg_id, cmd))
870 });
871
872 if let Some(cmd) = cmd {
873 self.handle_run_once_cmd(cmd).await?;
874 }
875 }
876
877 Ok(())
878 }
879
880 /// Handle a [`RunOnceCmd`].
881 #[instrument(level = "trace", skip_all)]
882 async fn handle_run_once_cmd(&mut self, cmd: RunOnceCmd) -> StdResult<(), ReactorError> {
883 match cmd {
884 RunOnceCmd::Single(cmd) => return self.handle_single_run_once_cmd(cmd).await,
885 RunOnceCmd::Multiple(cmds) => {
886 // While we know `sendable` is ready to accept *one* cell,
887 // we can't be certain it will be able to accept *all* of the cells
888 // that need to be sent here. This means we *may* end up buffering
889 // in its underlying SometimesUnboundedSink! That is OK, because
890 // RunOnceCmd::Multiple is only used for handling packed cells.
891 for cmd in cmds {
892 self.handle_single_run_once_cmd(cmd).await?;
893 }
894 }
895 }
896
897 Ok(())
898 }
899
900 /// Handle a [`RunOnceCmd`].
901 #[instrument(level = "trace", skip_all)]
902 async fn handle_single_run_once_cmd(
903 &mut self,
904 cmd: RunOnceCmdInner,
905 ) -> StdResult<(), ReactorError> {
906 match cmd {
907 RunOnceCmdInner::Send { leg, cell, done } => {
908 // TODO: check the cc window
909 let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
910 if let Some(done) = done {
911 // Don't care if the receiver goes away
912 let _ = done.send(res.clone());
913 }
914 res?;
915 }
916 #[cfg(feature = "send-control-msg")]
917 RunOnceCmdInner::SendMsgAndInstallHandler { msg, handler, done } => {
918 let cell: Result<Option<SendRelayCell>> =
919 self.prepare_msg_and_install_handler(msg, handler);
920
921 match cell {
922 Ok(Some(cell)) => {
923 // TODO(conflux): let the RunOnceCmdInner specify which leg to send the cell on
924 let outcome = self.circuits.send_relay_cell_on_leg(cell, None).await;
925 // don't care if receiver goes away.
926 let _ = done.send(outcome.clone());
927 outcome?;
928 }
929 Ok(None) => {
930 // don't care if receiver goes away.
931 let _ = done.send(Ok(()));
932 }
933 Err(e) => {
934 // don't care if receiver goes away.
935 let _ = done.send(Err(e.clone()));
936 return Err(e.into());
937 }
938 }
939 }
940 RunOnceCmdInner::BeginStream {
941 leg,
942 cell,
943 hop,
944 done,
945 } => {
946 match cell {
947 Ok((cell, stream_id)) => {
948 let circ = self
949 .circuits
950 .leg_mut(leg)
951 .ok_or_else(|| internal!("leg disappeared?!"))?;
952 let cell_hop = cell.hop.expect("missing hop in client SendRelayCell?!");
953 let relay_format = circ
954 .hop_mut(cell_hop)
955 // TODO: Is this the right error type here? Or should there be a "HopDisappeared"?
956 .ok_or(Error::NoSuchHop)?
957 .relay_cell_format();
958
959 let outcome = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
960 // don't care if receiver goes away.
961 let _ = done.send(outcome.clone().map(|_| (stream_id, hop, relay_format)));
962 outcome?;
963 }
964 Err(e) => {
965 // don't care if receiver goes away.
966 let _ = done.send(Err(e.clone()));
967 return Err(e.into());
968 }
969 }
970 }
971 RunOnceCmdInner::CloseStream {
972 hop,
973 sid,
974 behav,
975 reason,
976 done,
977 } => {
978 let result = {
979 let (leg_id, hop_num) = self
980 .resolve_hop_location(hop)
981 .map_err(into_bad_api_usage!("Could not resolve {hop:?}"))?;
982 let leg = self
983 .circuits
984 .leg_mut(leg_id)
985 .ok_or(bad_api_usage!("No leg for id {:?}", leg_id))?;
986 Ok::<_, Bug>((leg, hop_num))
987 };
988
989 let (leg, hop_num) = match result {
990 Ok(x) => x,
991 Err(e) => {
992 if let Some(done) = done {
993 // don't care if the sender goes away
994 let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
995 let _ = done.send(Err(e.into()));
996 }
997 return Ok(());
998 }
999 };
1000
1001 let max_rtt = {
1002 let hop = leg
1003 .hop(hop_num)
1004 .ok_or_else(|| internal!("the hop we resolved disappeared?!"))?;
1005 let ccontrol = hop.ccontrol();
1006
1007 // Note: if we have no measurements for the RTT, this will be set to 0,
1008 // and the timeout will be 2 * CBT.
1009 ccontrol
1010 .rtt()
1011 .max_rtt_usec()
1012 .map(|rtt| Duration::from_millis(u64::from(rtt)))
1013 .unwrap_or_default()
1014 };
1015
1016 // The length of the circuit up until the hop that has the half-streeam.
1017 //
1018 // +1, because HopNums are zero-based.
1019 let circ_len = usize::from(hop_num) + 1;
1020
1021 // We double the CBT to account for rend circuits,
1022 // which are twice as long (otherwise we risk expiring
1023 // the rend half-streams too soon).
1024 let timeout = std::cmp::max(max_rtt, 2 * leg.estimate_cbt(circ_len));
1025 let expire_at = self.runtime.now() + timeout;
1026
1027 let res: Result<()> = leg
1028 .close_stream(hop_num, sid, behav, reason, expire_at)
1029 .await;
1030
1031 if let Some(done) = done {
1032 // don't care if the sender goes away
1033 let _ = done.send(res);
1034 }
1035 }
1036 RunOnceCmdInner::MaybeSendXon {
1037 rate,
1038 stream_id,
1039 hop,
1040 } => {
1041 let (leg_id, hop_num) = match self.resolve_hop_location(hop) {
1042 Ok(x) => x,
1043 Err(NoJoinPointError) => {
1044 // A stream tried to send an XON message message to the join point of
1045 // a tunnel that has never had a join point. Currently in arti, only a
1046 // `StreamTarget` asks us to send an XON message, and this tunnel
1047 // originally created the `StreamTarget` to begin with. So this is a
1048 // legitimate bug somewhere in the tunnel code.
1049 return Err(
1050 internal!(
1051 "Could not send an XON message to a join point on a tunnel without a join point",
1052 )
1053 .into()
1054 );
1055 }
1056 };
1057
1058 let Some(leg) = self.circuits.leg_mut(leg_id) else {
1059 // The leg has disappeared. This is fine since the stream may have ended and
1060 // been cleaned up while this `CtrlMsg::MaybeSendXon` message was queued.
1061 // It is possible that is a bug and this is an incorrect leg number, but
1062 // it's not currently possible to differentiate between an incorrect leg
1063 // number and a tunnel leg that has been closed.
1064 debug!("Could not send an XON message on a leg that does not exist. Ignoring.");
1065 return Ok(());
1066 };
1067
1068 let Some(hop) = leg.hop_mut(hop_num) else {
1069 // The hop has disappeared. This is fine since the circuit may have been
1070 // been truncated while the `CtrlMsg::MaybeSendXon` message was queued.
1071 // It is possible that is a bug and this is an incorrect hop number, but
1072 // it's not currently possible to differentiate between an incorrect hop
1073 // number and a circuit hop that has been removed.
1074 debug!("Could not send an XON message on a hop that does not exist. Ignoring.");
1075 return Ok(());
1076 };
1077
1078 let Some(msg) = hop.maybe_send_xon(rate, stream_id)? else {
1079 // Nothing to do.
1080 return Ok(());
1081 };
1082
1083 let cell = AnyRelayMsgOuter::new(Some(stream_id), msg.into());
1084 let cell = SendRelayCell {
1085 hop: Some(hop_num),
1086 early: false,
1087 cell,
1088 };
1089
1090 leg.send_relay_cell(cell).await?;
1091 }
1092 RunOnceCmdInner::HandleSendMe { leg, hop, sendme } => {
1093 let leg = self
1094 .circuits
1095 .leg_mut(leg)
1096 .ok_or_else(|| internal!("leg disappeared?!"))?;
1097 // NOTE: it's okay to await. We are only awaiting on the congestion_signals
1098 // future which *should* resolve immediately
1099 let signals = leg.chan_sender.congestion_signals().await;
1100 leg.handle_sendme(hop, sendme, signals)?;
1101 }
1102 RunOnceCmdInner::FirstHopClockSkew { answer } => {
1103 let res = self.circuits.single_leg_mut().map(|leg| leg.clock_skew());
1104
1105 // don't care if the sender goes away
1106 let _ = answer.send(res.map_err(Into::into));
1107 }
1108 RunOnceCmdInner::CleanShutdown => {
1109 trace!(tunnel_id = %self.tunnel_id, "reactor shutdown due to handled cell");
1110 return Err(ReactorError::Shutdown);
1111 }
1112 RunOnceCmdInner::RemoveLeg { leg, reason } => {
1113 warn!(tunnel_id = %self.tunnel_id, reason = %reason, "removing circuit leg");
1114
1115 let circ = self.circuits.remove(leg)?;
1116 let is_conflux_pending = circ.is_conflux_pending();
1117
1118 // Drop the removed leg. This will cause it to close if it's not already closed.
1119 drop(circ);
1120
1121 // If we reach this point, it means we have more than one leg
1122 // (otherwise the .remove() would've returned a Shutdown error),
1123 // so we expect there to be a ConfluxHandshakeContext installed.
1124
1125 #[cfg(feature = "conflux")]
1126 if is_conflux_pending {
1127 let (error, proto_violation): (_, Option<Error>) = match &reason {
1128 RemoveLegReason::ConfluxHandshakeTimeout => {
1129 (ConfluxHandshakeError::Timeout, None)
1130 }
1131 RemoveLegReason::ConfluxHandshakeErr(e) => {
1132 (ConfluxHandshakeError::Link(e.clone()), Some(e.clone()))
1133 }
1134 RemoveLegReason::ChannelClosed => {
1135 (ConfluxHandshakeError::ChannelClosed, None)
1136 }
1137 };
1138
1139 self.note_conflux_handshake_result(Err(error), proto_violation.is_some())?;
1140
1141 if let Some(e) = proto_violation {
1142 tor_error::warn_report!(
1143 e,
1144 tunnel_id = %self.tunnel_id,
1145 "Malformed conflux handshake, tearing down tunnel",
1146 );
1147
1148 return Err(e.into());
1149 }
1150 }
1151 }
1152 #[cfg(feature = "conflux")]
1153 RunOnceCmdInner::ConfluxHandshakeComplete { leg, cell } => {
1154 // Note: on the client-side, the handshake is considered complete once the
1155 // RELAY_CONFLUX_LINKED_ACK is sent (roughly upon receipt of the LINKED cell).
1156 //
1157 // We're optimistic here, and declare the handshake a success *before*
1158 // sending the LINKED_ACK response. I think this is OK though,
1159 // because if the send_relay_cell() below fails, the reactor will shut
1160 // down anyway. OTOH, marking the handshake as complete slightly early
1161 // means that on the happy path, the circuit is marked as usable sooner,
1162 // instead of blocking on the sending of the LINKED_ACK.
1163 self.note_conflux_handshake_result(Ok(()), false)?;
1164
1165 let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
1166
1167 res?;
1168 }
1169 #[cfg(feature = "conflux")]
1170 RunOnceCmdInner::Link { circuits, answer } => {
1171 // Add the specified circuits to our conflux set,
1172 // and send a LINK cell down each unlinked leg.
1173 //
1174 // NOTE: this will block the reactor until all the cells are sent.
1175 self.handle_link_circuits(circuits, answer).await?;
1176 }
1177 #[cfg(feature = "conflux")]
1178 RunOnceCmdInner::Enqueue { leg, msg } => {
1179 let entry = ConfluxHeapEntry { leg_id: leg, msg };
1180 self.ooo_msgs.push(entry);
1181 }
1182 #[cfg(feature = "circ-padding")]
1183 RunOnceCmdInner::PaddingAction { leg, padding_event } => {
1184 // TODO: If we someday move back to having a per-circuit reactor,
1185 // this event would logically belong there, not on the tunnel reactor.
1186 self.circuits.run_padding_event(leg, padding_event).await?;
1187 }
1188 }
1189
1190 Ok(())
1191 }
1192
1193 /// Wait for a [`CtrlMsg::Create`] to come along to set up the circuit.
1194 ///
1195 /// Returns an error if an unexpected `CtrlMsg` is received.
1196 #[instrument(level = "trace", skip_all)]
1197 async fn wait_for_create(&mut self) -> StdResult<(), ReactorError> {
1198 let msg = select_biased! {
1199 res = self.command.next() => {
1200 let cmd = unwrap_or_shutdown!(self, res, "shutdown channel drop")?;
1201 match cmd {
1202 CtrlCmd::Shutdown => return self.handle_shutdown().map(|_| ()),
1203 #[cfg(test)]
1204 CtrlCmd::AddFakeHop {
1205 relay_cell_format: format,
1206 fwd_lasthop,
1207 rev_lasthop,
1208 peer_id,
1209 params,
1210 done,
1211 } => {
1212 let leg = self.circuits.single_leg_mut()?;
1213 leg.handle_add_fake_hop(format, fwd_lasthop, rev_lasthop, peer_id, ¶ms, done);
1214 return Ok(())
1215 },
1216 _ => {
1217 trace!("reactor shutdown due to unexpected command: {:?}", cmd);
1218 return Err(Error::CircProto(format!("Unexpected control {cmd:?} on client circuit")).into());
1219 }
1220 }
1221 },
1222 res = self.control.next() => unwrap_or_shutdown!(self, res, "control drop")?,
1223 };
1224
1225 match msg {
1226 CtrlMsg::Create {
1227 recv_created,
1228 handshake,
1229 settings,
1230 done,
1231 } => {
1232 // TODO(conflux): instead of crashing the reactor, it might be better
1233 // to send the error via the done channel instead
1234 let leg = self.circuits.single_leg_mut()?;
1235 leg.handle_create(recv_created, handshake, settings, done)
1236 .await
1237 }
1238 _ => {
1239 trace!("reactor shutdown due to unexpected cell: {:?}", msg);
1240
1241 Err(Error::CircProto(format!("Unexpected {msg:?} cell on client circuit")).into())
1242 }
1243 }
1244 }
1245
/// Record one leg's handshake outcome in the pending `ConfluxHandshakeContext`.
///
/// Once a result has been recorded for every leg we were waiting on
/// (or if `reactor_is_closing` is set), the context is removed and the
/// collected results are delivered to the handshake initiator.
///
/// Returns an internal error if no handshake context is installed.
#[cfg(feature = "conflux")]
#[instrument(level = "trace", skip_all)]
fn note_conflux_handshake_result(
    &mut self,
    res: StdResult<(), ConfluxHandshakeError>,
    reactor_is_closing: bool,
) -> StdResult<(), ReactorError> {
    let Some(pending) = self.conflux_hs_ctx.as_mut() else {
        return Err(internal!("no conflux handshake context").into());
    };

    pending.results.push(res);

    // Whether all the legs have finished linking:
    let all_legs_reported = pending.results.len() == pending.num_legs;

    if !(all_legs_reported || reactor_is_closing) {
        // Still waiting on other legs; keep the context around.
        return Ok(());
    }

    // Time to remove the conflux handshake context
    // and extract the results we have collected
    let finished = self.conflux_hs_ctx.take().expect("context disappeared?!");

    let leg_count = finished.results.len();
    let success_count = finished
        .results
        .iter()
        .filter(|outcome| outcome.is_ok())
        .count();

    info!(
        tunnel_id = %self.tunnel_id,
        "conflux tunnel ready ({success_count}/{leg_count} circuits successfully linked)",
    );

    // We don't expect to receive any more handshake results,
    // at least not until we get another LinkCircuits control message,
    // which will install a new ConfluxHandshakeCtx with a channel
    // for us to send updates on
    send_conflux_outcome(finished.answer, Ok(finished.results))
}
1292
1293 /// Prepare a `SendRelayCell` request, and install the given meta-cell handler.
1294 fn prepare_msg_and_install_handler(
1295 &mut self,
1296 msg: Option<AnyRelayMsgOuter>,
1297 handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
1298 ) -> Result<Option<SendRelayCell>> {
1299 let msg = msg
1300 .map(|msg| {
1301 let handlers = &mut self.cell_handlers;
1302 let handler = handler
1303 .as_ref()
1304 .or(handlers.meta_handler.as_ref())
1305 .ok_or_else(|| internal!("tried to use an ended Conversation"))?;
1306 // We should always have a precise HopLocation here so this should never fails but
1307 // in case we have a ::JointPoint, we'll notice.
1308 let hop = handler.expected_hop().hop_num().ok_or(bad_api_usage!(
1309 "MsgHandler doesn't have a precise HopLocation"
1310 ))?;
1311 Ok::<_, crate::Error>(SendRelayCell {
1312 hop: Some(hop),
1313 early: false,
1314 cell: msg,
1315 })
1316 })
1317 .transpose()?;
1318
1319 if let Some(handler) = handler {
1320 self.cell_handlers.set_meta_handler(handler)?;
1321 }
1322
1323 Ok(msg)
1324 }
1325
1326 /// Handle a shutdown request.
1327 fn handle_shutdown(&self) -> StdResult<Option<RunOnceCmdInner>, ReactorError> {
1328 trace!(
1329 tunnel_id = %self.tunnel_id,
1330 "reactor shutdown due to explicit request",
1331 );
1332
1333 Err(ReactorError::Shutdown)
1334 }
1335
/// Shut the reactor down, handing its only [`Circuit`] back over `answer`.
///
/// If the tunnel does not contain exactly one circuit, an error is sent over
/// `answer` instead. Either way, the reactor shuts down afterwards.
#[cfg(feature = "conflux")]
fn handle_shutdown_and_return_circuit(
    &mut self,
    answer: oneshot::Sender<StdResult<Circuit, Bug>>,
) -> StdResult<(), ReactorError> {
    let only_leg = self.circuits.take_single_leg();

    // Don't care if the receiver goes away
    let _ = answer.send(only_leg);

    self.handle_shutdown()?;
    Ok(())
}
1349
1350 /// Resolves a [`TargetHop`] to a [`HopLocation`].
1351 ///
1352 /// After resolving a `TargetHop::LastHop`,
1353 /// the `HopLocation` can become stale if a single-path circuit is later extended or truncated.
1354 /// This means that the `HopLocation` can become stale from one reactor iteration to the next.
1355 ///
1356 /// It's generally okay to hold on to a (possibly stale) `HopLocation`
1357 /// if you need a fixed hop position in the tunnel.
1358 /// For example if we open a stream to `TargetHop::LastHop`,
1359 /// we would want to store the stream position as a `HopLocation` and not a `TargetHop::LastHop`
1360 /// as we don't want the stream position to change as the tunnel is extended or truncated.
1361 ///
1362 /// Returns [`NoHopsBuiltError`] if trying to resolve `TargetHop::LastHop`
1363 /// and the tunnel has no hops
1364 /// (either has no legs, or has legs which contain no hops).
1365 fn resolve_target_hop(&self, hop: TargetHop) -> StdResult<HopLocation, NoHopsBuiltError> {
1366 match hop {
1367 TargetHop::Hop(hop) => Ok(hop),
1368 TargetHop::LastHop => {
1369 if let Ok(leg) = self.circuits.single_leg() {
1370 let leg_id = leg.unique_id();
1371 // single-path tunnel
1372 let hop = leg.last_hop_num().ok_or(NoHopsBuiltError)?;
1373 Ok(HopLocation::Hop((leg_id, hop)))
1374 } else if !self.circuits.is_empty() {
1375 // multi-path tunnel
1376 Ok(HopLocation::JoinPoint)
1377 } else {
1378 // no legs
1379 Err(NoHopsBuiltError)
1380 }
1381 }
1382 }
1383 }
1384
1385 /// Resolves a [`HopLocation`] to a [`UniqId`] and [`HopNum`].
1386 ///
1387 /// After resolving a `HopLocation::JoinPoint`,
1388 /// the [`UniqId`] and [`HopNum`] can become stale if the primary leg changes.
1389 ///
1390 /// You should try to only resolve to a specific [`UniqId`] and [`HopNum`] immediately before you
1391 /// need them,
1392 /// and you should not hold on to the resolved [`UniqId`] and [`HopNum`] between reactor
1393 /// iterations as the primary leg may change from one iteration to the next.
1394 ///
1395 /// Returns [`NoJoinPointError`] if trying to resolve `HopLocation::JoinPoint`
1396 /// but it does not have a join point.
1397 #[instrument(level = "trace", skip_all)]
1398 fn resolve_hop_location(
1399 &self,
1400 hop: HopLocation,
1401 ) -> StdResult<(UniqId, HopNum), NoJoinPointError> {
1402 match hop {
1403 HopLocation::Hop((leg_id, hop_num)) => Ok((leg_id, hop_num)),
1404 HopLocation::JoinPoint => {
1405 if let Some((leg_id, hop_num)) = self.circuits.primary_join_point() {
1406 Ok((leg_id, hop_num))
1407 } else {
1408 // Attempted to get the join point of a non-multipath tunnel.
1409 Err(NoJoinPointError)
1410 }
1411 }
1412 }
1413 }
1414
1415 /// Resolve a [`TargetHop`] directly into a [`UniqId`] and [`HopNum`].
1416 ///
1417 /// This is a helper function that basically calls both resolve_target_hop and
1418 /// resolve_hop_location back to back.
1419 ///
1420 /// It returns None on failure to resolve meaning that if you want more detailed error on why
1421 /// it failed, explicitly use the resolve_hop_location() and resolve_target_hop() functions.
1422 pub(crate) fn target_hop_to_hopnum_id(&self, hop: TargetHop) -> Option<(UniqId, HopNum)> {
1423 self.resolve_target_hop(hop)
1424 .ok()
1425 .and_then(|resolved| self.resolve_hop_location(resolved).ok())
1426 }
1427
/// Install a padder at the given hop, or remove it when `padder` is `None`.
///
/// Only a precise `HopLocation::Hop` is accepted: padding towards the join
/// point is rejected as bad API usage. Returns [`Error::NoSuchHop`] if the
/// leg named by `hop` does not exist.
#[cfg(feature = "circ-padding-manual")]
fn set_padding_at_hop(
    &self,
    hop: HopLocation,
    padder: Option<super::circuit::padding::CircuitPadder>,
) -> Result<()> {
    match hop {
        HopLocation::Hop((leg_id, hop_num)) => {
            let Some(circ) = self.circuits.leg(leg_id) else {
                return Err(Error::NoSuchHop);
            };
            circ.set_padding_at_hop(hop_num, padder)?;

            Ok(())
        }
        HopLocation::JoinPoint => {
            Err(bad_api_usage!("Padding to the join point is not supported.").into())
        }
    }
}
1442
1443 /// Does congestion control use stream SENDMEs for the given hop?
1444 ///
1445 /// Returns `None` if either the `leg` or `hop` don't exist.
1446 fn uses_stream_sendme(&self, leg: UniqId, hop: HopNum) -> Option<bool> {
1447 self.circuits.uses_stream_sendme(leg, hop)
1448 }
1449
/// Link extra circuits into this reactor's conflux set and start the handshake.
///
/// If a conflux handshake is already in progress, an error is sent on `answer`
/// and nothing else happens.
///
/// Otherwise the circuits are validated; if they do not have the same length,
/// or if they do not all have the same last hop, an error is returned on
/// the `answer` channel, and the conflux handshake is *not* initiated.
///
/// If validation succeeds, the circuits are added to the conflux set and a LINK
/// cell is sent on each leg. `answer` is then stored, to be used once the
/// handshake results are in.
///
/// NOTE: this blocks the reactor main loop until all the cells are sent.
#[cfg(feature = "conflux")]
#[instrument(level = "trace", skip_all)]
async fn handle_link_circuits(
    &mut self,
    circuits: Vec<Circuit>,
    answer: ConfluxLinkResultChannel,
) -> StdResult<(), ReactorError> {
    use tor_error::warn_report;

    if self.conflux_hs_ctx.is_some() {
        return send_conflux_outcome(
            answer,
            Err(internal!("conflux linking already in progress").into()),
        );
    }

    // We need to send the LINK cell on each of the new circuits
    // and on each of the existing, unlinked legs from self.circuits.
    //
    // In reality, there can only be one such circuit
    // (the "initial" one from the previously single-path tunnel),
    // because any circuits that fail to complete the conflux handshake
    // get removed from the set.
    let num_legs = self.circuits.num_unlinked() + circuits.len();

    // Note: add_legs validates `circuits`
    let link_outcome = async {
        self.circuits.add_legs(circuits, &self.runtime)?;
        self.circuits.link_circuits(&self.runtime).await
    }
    .await;

    match link_outcome {
        Ok(_) => {
            // Save the channel, to notify the user of completion.
            self.conflux_hs_ctx = Some(ConfluxHandshakeCtx {
                answer,
                num_legs,
                results: Default::default(),
            });

            Ok(())
        }
        Err(e) => {
            warn_report!(e, "Failed to link conflux circuits");

            send_conflux_outcome(answer, Err(e))
        }
    }
}
1509}
1510
/// Deliver the outcome of a conflux handshake to whoever initiated it.
///
/// Returns [`ReactorError::Shutdown`] if the initiator has gone away.
#[cfg(feature = "conflux")]
fn send_conflux_outcome(
    tx: ConfluxLinkResultChannel,
    res: Result<ConfluxHandshakeResult>,
) -> StdResult<(), ReactorError> {
    match tx.send(res) {
        Ok(_) => Ok(()),
        Err(_) => {
            tracing::warn!("conflux initiator went away before handshake completed?");
            Err(ReactorError::Shutdown)
        }
    }
}
1526
1527/// The tunnel does not have any hops.
1528#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
1529#[non_exhaustive]
1530#[error("no hops have been built for this tunnel")]
1531pub(crate) struct NoHopsBuiltError;
1532
1533/// The tunnel does not have a join point.
1534#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
1535#[non_exhaustive]
1536#[error("the tunnel does not have a join point")]
1537pub(crate) struct NoJoinPointError;
1538
1539impl CellHandlers {
1540 /// Try to install a given meta-cell handler to receive any unusual cells on
1541 /// this circuit, along with a result channel to notify on completion.
1542 fn set_meta_handler(&mut self, handler: Box<dyn MetaCellHandler + Send>) -> Result<()> {
1543 if self.meta_handler.is_none() {
1544 self.meta_handler = Some(handler);
1545 Ok(())
1546 } else {
1547 Err(Error::from(internal!(
1548 "Tried to install a meta-cell handler before the old one was gone."
1549 )))
1550 }
1551 }
1552
1553 /// Try to install a given cell handler on this circuit.
1554 #[cfg(feature = "hs-service")]
1555 fn set_incoming_stream_req_handler(
1556 &mut self,
1557 handler: IncomingStreamRequestHandler,
1558 ) -> Result<()> {
1559 if self.incoming_stream_req_handler.is_none() {
1560 self.incoming_stream_req_handler = Some(handler);
1561 Ok(())
1562 } else {
1563 Err(Error::from(internal!(
1564 "Tried to install a BEGIN cell handler before the old one was gone."
1565 )))
1566 }
1567 }
1568}
1569
#[cfg(test)]
mod test {
    // Intentionally empty: this module is tested in [`crate::client::circuit::test`].
}