Skip to main content

tor_proto/client/
reactor.rs

1//! Code to handle incoming cells on a circuit.
2//!
3//! ## On message validation
4//!
5//! There are three steps for validating an incoming message on a stream:
6//!
7//! 1. Is the message contextually appropriate? (e.g., no more than one
8//!    `CONNECTED` message per stream.) This is handled by calling
9//!    [`CmdChecker::check_msg`](crate::stream::cmdcheck::CmdChecker::check_msg).
10//! 2. Does the message comply with flow-control rules? (e.g., no more SENDMEs
11//!    than we've sent data for.) This is handled within the reactor by the
12//!    `StreamFlowCtrl`. For half-closed streams which don't send stream
13//!    SENDMEs, an additional receive-window check is performed in the
14//!    `halfstream` module.
15//! 3. Does the message have an acceptable command type, and is the message
16//!    well-formed? For open streams, the streams themselves handle this check.
17//!    For half-closed streams, the reactor handles it by calling
18//!    `consume_checked_msg()`.
19
20pub(crate) mod circuit;
21mod conflux;
22mod control;
23
24use crate::circuit::circhop::{ReactorStreamComponents, SendRelayCell};
25use crate::circuit::{CircuitRxReceiver, UniqId};
26use crate::client::circuit::ClientCircChanMsg;
27use crate::client::circuit::padding::{PaddingController, PaddingEvent, PaddingEventStream};
28use crate::client::{HopLocation, TargetHop};
29use crate::crypto::cell::HopNum;
30use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
31use crate::memquota::CircuitAccount;
32use crate::stream::CloseStreamBehavior;
33use crate::streammap;
34use crate::tunnel::{TunnelId, TunnelScopedCircId};
35use crate::util::err::ReactorError;
36use crate::util::skew::ClockSkew;
37use crate::util::timeout::TimeoutEstimator;
38use crate::{Error, Result};
39use circuit::Circuit;
40use conflux::ConfluxSet;
41use control::ControlHandler;
42use std::cmp::Ordering;
43use std::collections::BinaryHeap;
44use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
45use tor_cell::relaycell::msg::Sendme;
46use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
47use tor_error::{Bug, bad_api_usage, debug_report, internal, into_bad_api_usage};
48use tor_rtcompat::{DynTimeProvider, SleepProvider};
49
50use cfg_if::cfg_if;
51use futures::StreamExt;
52use futures::channel::mpsc;
53use futures::{FutureExt as _, select_biased};
54use oneshot_fused_workaround as oneshot;
55
56use std::result::Result as StdResult;
57use std::sync::Arc;
58use std::time::Duration;
59
60use crate::channel::Channel;
61use crate::conflux::msghandler::RemoveLegReason;
62use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
63use circuit::CircuitCmd;
64use derive_more::From;
65use smallvec::smallvec;
66use tor_cell::chancell::CircId;
67use tor_llcrypto::pk;
68use tracing::{debug, info, instrument, trace, warn};
69
70use super::circuit::{MutableState, TunnelMutableState};
71use crate::circuit::reactor::ReactorResultChannel;
72
73#[cfg(feature = "hs-service")]
74use crate::stream::incoming::IncomingStreamRequestHandler;
75
76#[cfg(feature = "conflux")]
77use {
78    crate::conflux::msghandler::{ConfluxCmd, OooRelayMsg},
79    crate::util::err::ConfluxHandshakeError,
80};
81
82pub(super) use control::{CtrlCmd, CtrlMsg, FlowCtrlMsg};
83
84/// Contains a list of conflux handshake results.
85#[cfg(feature = "conflux")]
86pub(super) type ConfluxHandshakeResult = Vec<StdResult<(), ConfluxHandshakeError>>;
87
88/// The type of oneshot channel used to inform reactor users of the outcome
89/// of a client-side conflux handshake.
90///
91/// Contains a list of handshake results, one for each circuit that we were asked
92/// to link in the tunnel.
93#[cfg(feature = "conflux")]
94pub(super) type ConfluxLinkResultChannel = ReactorResultChannel<ConfluxHandshakeResult>;
95
96/// A handshake type, to be used when creating circuit hops.
97#[derive(Clone, Debug)]
98pub(crate) enum CircuitHandshake {
99    /// Use the CREATE_FAST handshake.
100    CreateFast,
101    /// Use the ntor handshake.
102    Ntor {
103        /// The public key of the relay.
104        public_key: NtorPublicKey,
105        /// The Ed25519 identity of the relay, which is verified against the
106        /// identity held in the circuit's channel.
107        ed_identity: pk::ed25519::Ed25519Identity,
108    },
109    /// Use the ntor-v3 handshake.
110    NtorV3 {
111        /// The public key of the relay.
112        public_key: NtorV3PublicKey,
113    },
114}
115
116// TODO: the RunOnceCmd/RunOnceCmdInner/CircuitCmd/CircuitEvent enum
117// proliferation is a bit bothersome, but unavoidable with the current design.
118//
119// We should consider getting rid of some of these enums (if possible),
120// and coming up with more intuitive names.
121
122/// One or more [`RunOnceCmdInner`] to run inside [`Reactor::run_once`].
123#[derive(From, Debug)]
124#[allow(clippy::large_enum_variant)] // TODO #2003: resolve this
125enum RunOnceCmd {
126    /// Run a single `RunOnceCmdInner` command.
127    Single(RunOnceCmdInner),
128    /// Run multiple `RunOnceCmdInner` commands.
129    //
130    // Note: this whole enum *could* be replaced with Vec<RunOnceCmdInner>,
131    // but most of the time we're only going to have *one* RunOnceCmdInner
132    // to run per run_once() loop. The enum enables us avoid the extra heap
133    // allocation for the `RunOnceCmd::Single` case.
134    Multiple(Vec<RunOnceCmdInner>),
135}
136
137/// Instructions for running something in the reactor loop.
138///
139/// Run at the end of [`Reactor::run_once`].
140//
141// TODO: many of the variants of this enum have an identical CtrlMsg counterpart.
142// We should consider making each variant a tuple variant and deduplicating the fields.
143#[derive(educe::Educe)]
144#[educe(Debug)]
145enum RunOnceCmdInner {
146    /// Send a RELAY cell.
147    Send {
148        /// The leg the cell should be sent on.
149        leg: UniqId,
150        /// The cell to send.
151        cell: SendRelayCell,
152        /// A channel for sending completion notifications.
153        done: Option<ReactorResultChannel<()>>,
154    },
155    /// Send a given control message on this circuit, and install a control-message handler to
156    /// receive responses.
157    #[cfg(feature = "send-control-msg")]
158    SendMsgAndInstallHandler {
159        /// The message to send, if any
160        msg: Option<AnyRelayMsgOuter>,
161        /// A message handler to install.
162        ///
163        /// If this is `None`, there must already be a message handler installed
164        #[educe(Debug(ignore))]
165        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
166        /// A sender that we use to tell the caller that the message was sent
167        /// and the handler installed.
168        done: oneshot::Sender<Result<()>>,
169    },
170    /// Handle a SENDME message.
171    HandleSendMe {
172        /// The leg the SENDME was received on.
173        leg: UniqId,
174        /// The hop number.
175        hop: HopNum,
176        /// The SENDME message to handle.
177        sendme: Sendme,
178    },
179    /// Begin a stream with the provided hop in this circuit.
180    ///
181    /// Uses the provided stream ID, and sends the provided message to that hop.
182    BeginStream {
183        /// The cell to send.
184        cell: SendRelayCell,
185        /// The ID of the stream to return on the oneshot channel.
186        stream_id: StreamId,
187        /// The location of the hop on the tunnel. We don't use this (and `Circuit`s shouldn't need
188        /// to worry about legs anyways), but need it so that we can pass it back in `done` to the
189        /// caller.
190        hop: HopLocation,
191        /// The circuit leg to begin the stream on.
192        leg: UniqId,
193        /// Components that are needed to interact with the new stream.
194        stream_components: ReactorStreamComponents,
195        /// Oneshot channel to notify on completion, with the allocated stream ID.
196        done: ReactorResultChannel<(
197            StreamId,
198            HopLocation,
199            RelayCellFormat,
200            ReactorStreamComponents,
201        )>,
202    },
203    /// Consider sending an XON message with the given `rate`.
204    MaybeSendXon {
205        /// The drain rate to advertise in the XON message.
206        rate: XonKbpsEwma,
207        /// The ID of the stream to send the message on.
208        stream_id: StreamId,
209        /// The location of the hop on the tunnel.
210        hop: HopLocation,
211    },
212    /// Close the specified stream.
213    CloseStream {
214        /// The hop number.
215        hop: HopLocation,
216        /// The ID of the stream to close.
217        sid: StreamId,
218        /// The stream-closing behavior.
219        behav: CloseStreamBehavior,
220        /// The reason for closing the stream.
221        reason: streammap::TerminateReason,
222        /// A channel for sending completion notifications.
223        done: Option<ReactorResultChannel<()>>,
224    },
225    /// Get the clock skew claimed by the first hop of the circuit.
226    FirstHopClockSkew {
227        /// Oneshot channel to return the clock skew.
228        answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
229    },
230    /// Remove a circuit leg from the conflux set.
231    RemoveLeg {
232        /// The circuit leg to remove.
233        leg: UniqId,
234        /// The reason for removal.
235        ///
236        /// This is only used for conflux circuits that get removed
237        /// before the conflux handshake is complete.
238        ///
239        /// The [`RemoveLegReason`] is mapped by the reactor to a
240        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
241        /// handshake to indicate the reason the handshake failed.
242        reason: RemoveLegReason,
243    },
244    /// A circuit has completed the conflux handshake,
245    /// and wants to send the specified cell.
246    ///
247    /// This is similar to [`RunOnceCmdInner::Send`],
248    /// but needs to remain a separate variant,
249    /// because in addition to instructing the reactor to send a cell,
250    /// it also notifies it that the conflux handshake is complete on the specified `leg`.
251    /// This enables the reactor to save the handshake result (`Ok(())`),
252    /// and, if there are no other legs still in the handshake phase,
253    /// send the result to the handshake initiator.
254    #[cfg(feature = "conflux")]
255    ConfluxHandshakeComplete {
256        /// The circuit leg that has completed the handshake,
257        /// This is the leg the cell should be sent on.
258        leg: UniqId,
259        /// The cell to send.
260        cell: SendRelayCell,
261    },
262    /// Send a LINK cell on each of the unlinked circuit legs in the conflux set of this reactor.
263    #[cfg(feature = "conflux")]
264    Link {
265        /// The circuits to link into the tunnel
266        #[educe(Debug(ignore))]
267        circuits: Vec<Circuit>,
268        /// Oneshot channel for notifying of conflux handshake completion.
269        answer: ConfluxLinkResultChannel,
270    },
271    /// Enqueue an out-of-order cell in ooo_msg.
272    #[cfg(feature = "conflux")]
273    Enqueue {
274        /// The leg the entry originated from.
275        leg: UniqId,
276        /// The out-of-order message.
277        msg: OooRelayMsg,
278    },
279    /// Take a padding-related event on a circuit leg.
280    #[cfg(feature = "circ-padding")]
281    PaddingAction {
282        /// The leg to event on.
283        leg: UniqId,
284        /// The event to take.
285        padding_event: PaddingEvent,
286    },
287    /// Perform a clean shutdown on this circuit.
288    CleanShutdown,
289}
290
291impl RunOnceCmdInner {
292    /// Create a [`RunOnceCmdInner`] out of a [`CircuitCmd`] and [`UniqId`].
293    fn from_circuit_cmd(leg: UniqId, cmd: CircuitCmd) -> Self {
294        match cmd {
295            CircuitCmd::Send(cell) => Self::Send {
296                leg,
297                cell,
298                done: None,
299            },
300            CircuitCmd::HandleSendMe { hop, sendme } => Self::HandleSendMe { leg, hop, sendme },
301            CircuitCmd::CloseStream {
302                hop,
303                sid,
304                behav,
305                reason,
306            } => Self::CloseStream {
307                hop: HopLocation::Hop((leg, hop)),
308                sid,
309                behav,
310                reason,
311                done: None,
312            },
313            #[cfg(feature = "conflux")]
314            CircuitCmd::Conflux(ConfluxCmd::RemoveLeg(reason)) => Self::RemoveLeg { leg, reason },
315            #[cfg(feature = "conflux")]
316            CircuitCmd::Conflux(ConfluxCmd::HandshakeComplete { hop, early, cell }) => {
317                let cell = SendRelayCell {
318                    hop: Some(hop),
319                    early,
320                    cell,
321                };
322                Self::ConfluxHandshakeComplete { leg, cell }
323            }
324            #[cfg(feature = "conflux")]
325            CircuitCmd::Enqueue(msg) => Self::Enqueue { leg, msg },
326            CircuitCmd::CleanShutdown => Self::CleanShutdown,
327        }
328    }
329}
330
331/// A command to execute at the end of [`Reactor::run_once`].
332#[derive(From, Debug)]
333#[allow(clippy::large_enum_variant)] // TODO #2003: should we resolve this?
334enum CircuitEvent {
335    /// Run a single `CircuitCmd` command.
336    RunCmd {
337        /// The unique identifier of the circuit leg to run the command on
338        leg: UniqId,
339        /// The command to run.
340        cmd: CircuitCmd,
341    },
342    /// Handle a control message
343    HandleControl(CtrlMsg),
344    /// Handle an input message.
345    HandleCell {
346        /// The unique identifier of the circuit leg the message was received on.
347        leg: UniqId,
348        /// The message to handle.
349        cell: ClientCircChanMsg,
350    },
351    /// Remove the specified circuit leg from the conflux set.
352    ///
353    /// Returned whenever a single circuit leg needs to be removed
354    /// from the reactor's conflux set, without necessarily tearing down
355    /// the whole set or shutting down the reactor.
356    ///
357    /// Note: this event *can* cause the reactor to shut down
358    /// (and the conflux set to be closed).
359    ///
360    /// See the [`ConfluxSet::remove`] docs for more on the exact behavior of this command.
361    RemoveLeg {
362        /// The leg to remove.
363        leg: UniqId,
364        /// The reason for removal.
365        ///
366        /// This is only used for conflux circuits that get removed
367        /// before the conflux handshake is complete.
368        ///
369        /// The [`RemoveLegReason`] is mapped by the reactor to a
370        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
371        /// handshake to indicate the reason the handshake failed.
372        reason: RemoveLegReason,
373    },
374    /// Take some event (blocking or unblocking a circuit, or sending padding)
375    /// based on the circuit padding backend code.
376    PaddingAction {
377        /// The leg on which to take the padding event .
378        leg: UniqId,
379        /// The event to take.
380        padding_event: PaddingEvent,
381    },
382    /// Protocol violation. This leads for now to the close of the circuit reactor. The
383    /// error causing the violation is set in err.
384    ProtoViolation {
385        /// The error that causes this protocol violation.
386        err: crate::Error,
387    },
388}
389
390impl CircuitEvent {
391    /// Return the ordering with which we should handle this event
392    /// within a list of events returned by a single call to next_circ_event().
393    ///
394    /// NOTE: Please do not make this any more complicated:
395    /// It is a consequence of a kludge that we need this sorting at all.
396    /// Assuming that eventually, we switch away from the current
397    /// poll-oriented `next_circ_event` design,
398    /// we may be able to get rid of this entirely.
399    fn order_within_batch(&self) -> u8 {
400        use CircuitEvent as CA;
401        use PaddingEvent as PE;
402        // This immediate state MUST NOT be used for events emitting cells. At the moment, it is
403        // only used by the protocol violation event which leads to a shutdown of the reactor.
404        const IMMEDIATE: u8 = 0;
405        const EARLY: u8 = 1;
406        const NORMAL: u8 = 2;
407        const LATE: u8 = 3;
408
409        // We use this ordering to move any "StartBlocking" to the _end_ of a batch and
410        // "StopBlocking" to the start.
411        //
412        // This way, we can be sure that we will handle any "send data" operations
413        // (and tell the Padder about them) _before_  we tell the Padder
414        // that we have blocked the circuit.
415        //
416        // This keeps things a bit more logical.
417        match self {
418            CA::RunCmd { .. } => NORMAL,
419            CA::HandleControl(..) => NORMAL,
420            CA::HandleCell { .. } => NORMAL,
421            CA::RemoveLeg { .. } => NORMAL,
422            #[cfg(feature = "circ-padding")]
423            CA::PaddingAction { padding_event, .. } => match padding_event {
424                PE::StopBlocking => EARLY,
425                PE::SendPadding(..) => NORMAL,
426                PE::StartBlocking(..) => LATE,
427            },
428            #[cfg(not(feature = "circ-padding"))]
429            CA::PaddingAction { .. } => NORMAL,
430            CA::ProtoViolation { .. } => IMMEDIATE,
431        }
432    }
433}
434
435/// An object that's waiting for a meta cell (one not associated with a stream) in order to make
436/// progress.
437///
438/// # Background
439///
440/// The `Reactor` can't have async functions that send and receive cells, because its job is to
441/// send and receive cells: if one of its functions tried to do that, it would just hang forever.
442///
443/// To get around this problem, the reactor can send some cells, and then make one of these
444/// `MetaCellHandler` objects, which will be run when the reply arrives.
445pub(crate) trait MetaCellHandler: Send {
446    /// The hop we're expecting the message to come from. This is compared against the hop
447    /// from which we actually receive messages, and an error is thrown if the two don't match.
448    fn expected_hop(&self) -> HopLocation;
449    /// Called when the message we were waiting for arrives.
450    ///
451    /// Gets a copy of the `Reactor` in order to do anything it likes there.
452    ///
453    /// If this function returns an error, the reactor will shut down.
454    fn handle_msg(
455        &mut self,
456        msg: UnparsedRelayMsg,
457        reactor: &mut Circuit,
458    ) -> Result<MetaCellDisposition>;
459}
460
461/// A possible successful outcome of giving a message to a [`MsgHandler`](super::msghandler::MsgHandler).
462#[derive(Debug, Clone, PartialEq)]
463#[cfg_attr(feature = "send-control-msg", visibility::make(pub))]
464#[non_exhaustive]
465pub(crate) enum MetaCellDisposition {
466    /// The message was consumed; the handler should remain installed.
467    #[cfg(feature = "send-control-msg")]
468    Consumed,
469    /// The message was consumed; the handler should be uninstalled.
470    ConversationFinished,
471    /// The message was consumed; the circuit should be closed.
472    #[cfg(feature = "send-control-msg")]
473    CloseCirc,
474    // TODO: Eventually we might want the ability to have multiple handlers
475    // installed, and to let them say "not for me, maybe for somebody else?".
476    // But right now we don't need that.
477}
478
479/// Unwrap the specified [`Option`], returning a [`ReactorError::Shutdown`] if it is `None`.
480///
481/// This is a macro instead of a function to work around borrowck errors
482/// in the select! from run_once().
483macro_rules! unwrap_or_shutdown {
484    ($self:expr, $res:expr, $reason:expr) => {{
485        match $res {
486            None => {
487                trace!(
488                    tunnel_id = %$self.tunnel_id,
489                    reason = %$reason,
490                    "reactor shutdown"
491                );
492                Err(ReactorError::Shutdown)
493            }
494            Some(v) => Ok(v),
495        }
496    }};
497}
498
499/// Object to handle incoming cells and background tasks on a circuit
500///
501/// This type is returned when you finish a circuit; you need to spawn a
502/// new task that calls `run()` on it.
503#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
504pub struct Reactor {
505    /// Receiver for control messages for this reactor, sent by `ClientCirc` objects.
506    ///
507    /// This channel is polled in [`Reactor::run_once`], but only if the `chan_sender` sink
508    /// is ready to accept cells.
509    control: mpsc::UnboundedReceiver<CtrlMsg>,
510    /// Receiver for command messages for this reactor, sent by `ClientCirc` objects.
511    ///
512    /// This channel is polled in [`Reactor::run_once`].
513    ///
514    /// NOTE: this is a separate channel from `control`, because some messages
515    /// have higher priority and need to be handled even if the `chan_sender` is not
516    /// ready (whereas `control` messages are not read until the `chan_sender` sink
517    /// is ready to accept cells).
518    command: mpsc::UnboundedReceiver<CtrlCmd>,
519    /// A oneshot sender that is used to alert other tasks when this reactor is
520    /// finally dropped.
521    ///
522    /// It is a sender for Void because we never actually want to send anything here;
523    /// we only want to generate canceled events.
524    #[allow(dead_code)] // the only purpose of this field is to be dropped.
525    reactor_closed_tx: oneshot::Sender<void::Void>,
526    /// A set of circuits that form a tunnel.
527    ///
528    /// Contains 1 or more circuits.
529    ///
530    /// Circuits may be added to this set throughout the lifetime of the reactor.
531    ///
532    /// Sometimes, the reactor will remove circuits from this set,
533    /// for example if the `LINKED` message takes too long to arrive,
534    /// or if congestion control negotiation fails.
535    /// The reactor will continue running with the remaining circuits.
536    /// It will shut down if *all* the circuits are removed.
537    ///
538    // TODO(conflux): document all the reasons why the reactor might
539    // chose to tear down a circuit or tunnel (timeouts, protocol violations, etc.)
540    circuits: ConfluxSet,
541    /// An identifier for logging about this tunnel reactor.
542    tunnel_id: TunnelId,
543    /// Handlers, shared with `Circuit`.
544    cell_handlers: CellHandlers,
545    /// The time provider, used for conflux handshake timeouts.
546    runtime: DynTimeProvider,
547    /// The conflux handshake context, if there is an on-going handshake.
548    ///
549    /// Set to `None` if this is a single-path tunnel,
550    /// or if none of the circuit legs from our conflux set
551    /// are currently in the conflux handshake phase.
552    #[cfg(feature = "conflux")]
553    conflux_hs_ctx: Option<ConfluxHandshakeCtx>,
554    /// A min-heap buffering all the out-of-order messages received so far.
555    ///
556    /// TODO(conflux): this becomes a DoS vector unless we impose a limit
557    /// on its size. We should make this participate in the memquota memory
558    /// tracking system, somehow.
559    #[cfg(feature = "conflux")]
560    ooo_msgs: BinaryHeap<ConfluxHeapEntry>,
561}
562
563/// The context for an on-going conflux handshake.
564#[cfg(feature = "conflux")]
565struct ConfluxHandshakeCtx {
566    /// A channel for notifying the caller of the outcome of a CONFLUX_LINK request.
567    answer: ConfluxLinkResultChannel,
568    /// The number of legs that are currently doing the handshake.
569    num_legs: usize,
570    /// The handshake results we have collected so far.
571    results: ConfluxHandshakeResult,
572}
573
574/// An out-of-order message buffered in [`Reactor::ooo_msgs`].
575#[derive(Debug)]
576#[cfg(feature = "conflux")]
577struct ConfluxHeapEntry {
578    /// The leg id this message came from.
579    leg_id: UniqId,
580    /// The out of order message
581    msg: OooRelayMsg,
582}
583
584#[cfg(feature = "conflux")]
585impl Ord for ConfluxHeapEntry {
586    fn cmp(&self, other: &Self) -> Ordering {
587        self.msg.cmp(&other.msg)
588    }
589}
590
591#[cfg(feature = "conflux")]
592impl PartialOrd for ConfluxHeapEntry {
593    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
594        Some(self.cmp(other))
595    }
596}
597
598#[cfg(feature = "conflux")]
599impl PartialEq for ConfluxHeapEntry {
600    fn eq(&self, other: &Self) -> bool {
601        self.msg == other.msg
602    }
603}
604
605#[cfg(feature = "conflux")]
606impl Eq for ConfluxHeapEntry {}
607
608/// Cell handlers, shared between the Reactor and its underlying `Circuit`s.
609struct CellHandlers {
610    /// A handler for a meta cell, together with a result channel to notify on completion.
611    ///
612    /// NOTE(prop349): this is part of Arti's "Base Circuit Hop Handler".
613    ///
614    /// Upon sending an EXTEND cell, the [`ControlHandler`] sets this handler
615    /// to [`CircuitExtender`](circuit::extender::CircuitExtender).
616    /// The handler is then used in [`Circuit::handle_meta_cell`] for handling
617    /// all the meta cells received on the circuit that are not SENDMEs or TRUNCATE
618    /// (which are handled separately) or conflux cells
619    /// (which are handled by the conflux handlers).
620    ///
621    /// The handler is uninstalled after the receipt of the EXTENDED cell,
622    /// so any subsequent EXTENDED cells will cause the circuit to be torn down.
623    meta_handler: Option<Box<dyn MetaCellHandler + Send>>,
624    /// A handler for incoming stream requests.
625    #[cfg(feature = "hs-service")]
626    incoming_stream_req_handler: Option<IncomingStreamRequestHandler>,
627}
628
629impl Reactor {
630    /// Create a new circuit reactor.
631    ///
632    /// The reactor will send outbound messages on `channel`, receive incoming
633    /// messages on `input`, and identify this circuit by the channel-local
634    /// [`CircId`] provided.
635    ///
636    /// The internal unique identifier for this circuit will be `unique_id`.
637    #[allow(clippy::type_complexity, clippy::too_many_arguments)] // TODO
638    pub(super) fn new(
639        channel: Arc<Channel>,
640        channel_id: CircId,
641        unique_id: UniqId,
642        input: CircuitRxReceiver,
643        runtime: DynTimeProvider,
644        memquota: CircuitAccount,
645        padding_ctrl: PaddingController,
646        padding_stream: PaddingEventStream,
647        timeouts: Arc<dyn TimeoutEstimator + Send>,
648    ) -> (
649        Self,
650        mpsc::UnboundedSender<CtrlMsg>,
651        mpsc::UnboundedSender<CtrlCmd>,
652        oneshot::Receiver<void::Void>,
653        Arc<TunnelMutableState>,
654    ) {
655        let tunnel_id = TunnelId::next();
656        let (control_tx, control_rx) = mpsc::unbounded();
657        let (command_tx, command_rx) = mpsc::unbounded();
658        let mutable = Arc::new(MutableState::default());
659
660        let (reactor_closed_tx, reactor_closed_rx) = oneshot::channel();
661
662        let cell_handlers = CellHandlers {
663            meta_handler: None,
664            #[cfg(feature = "hs-service")]
665            incoming_stream_req_handler: None,
666        };
667
668        let unique_id = TunnelScopedCircId::new(tunnel_id, unique_id);
669        let circuit_leg = Circuit::new(
670            runtime.clone(),
671            channel,
672            channel_id,
673            unique_id,
674            input,
675            memquota,
676            Arc::clone(&mutable),
677            padding_ctrl,
678            padding_stream,
679            timeouts,
680        );
681
682        let (circuits, mutable) = ConfluxSet::new(tunnel_id, circuit_leg);
683
684        let reactor = Reactor {
685            circuits,
686            control: control_rx,
687            command: command_rx,
688            reactor_closed_tx,
689            tunnel_id,
690            cell_handlers,
691            runtime,
692            #[cfg(feature = "conflux")]
693            conflux_hs_ctx: None,
694            #[cfg(feature = "conflux")]
695            ooo_msgs: Default::default(),
696        };
697
698        (reactor, control_tx, command_tx, reactor_closed_rx, mutable)
699    }
700
701    /// Launch the reactor, and run until the circuit closes or we
702    /// encounter an error.
703    ///
704    /// Once this method returns, the circuit is dead and cannot be
705    /// used again.
706    #[instrument(level = "trace", skip_all)]
707    pub async fn run(mut self) -> Result<()> {
708        trace!(tunnel_id = %self.tunnel_id, "Running tunnel reactor");
709        let result: Result<()> = loop {
710            match self.run_once().await {
711                Ok(()) => (),
712                Err(ReactorError::Shutdown) => break Ok(()),
713                Err(ReactorError::Err(e)) => break Err(e),
714            }
715        };
716
717        // Log that the reactor stopped, possibly with the associated error as a report.
718        // May log at a higher level depending on the error kind.
719        const MSG: &str = "Tunnel reactor stopped";
720        match &result {
721            Ok(()) => trace!(tunnel_id = %self.tunnel_id, "{MSG}"),
722            Err(e) => debug_report!(e, tunnel_id = %self.tunnel_id, "{MSG}"),
723        }
724
725        result
726    }
727
728    /// Helper for run: doesn't mark the circuit closed on finish.  Only
729    /// processes one cell or control message.
730    #[instrument(level = "trace", skip_all)]
731    async fn run_once(&mut self) -> StdResult<(), ReactorError> {
732        // If all the circuits are closed, shut down the reactor
733        if self.circuits.is_empty() {
734            trace!(
735                tunnel_id = %self.tunnel_id,
736                "Tunnel reactor shutting down: all circuits have closed",
737            );
738
739            return Err(ReactorError::Shutdown);
740        }
741
742        // If this is a single path circuit, we need to wait until the first hop
743        // is created before doing anything else
744        let single_path_with_hops = self
745            .circuits
746            .single_leg_mut()
747            .is_ok_and(|leg| !leg.has_hops());
748        if single_path_with_hops {
749            self.wait_for_create().await?;
750
751            return Ok(());
752        }
753
754        // Prioritize the buffered messages.
755        //
756        // Note: if any of the messages are ready to be handled,
757        // this will block the reactor until we are done processing them
758        //
759        // TODO circpad: If this is a problem, we might want to re-order things so that we
760        // prioritize padding instead.  On the other hand, this should be fixed by refactoring
761        // circuit and tunnel reactors, so we might do well to just leave it alone for now.
762        #[cfg(feature = "conflux")]
763        self.try_dequeue_ooo_msgs().await?;
764
765        let mut events = select_biased! {
766            res = self.command.next() => {
767                let cmd = unwrap_or_shutdown!(self, res, "command channel drop")?;
768                return ControlHandler::new(self).handle_cmd(cmd);
769            },
770            // Check whether we've got a control message pending.
771            //
772            // Note: unfortunately, reading from control here means we might start
773            // handling control messages before our chan_senders are ready.
774            // With the current design, this is inevitable: we can't know which circuit leg
775            // a control message is meant for without first reading the control message from
776            // the channel, and at that point, we can't know for sure whether that particular
777            // circuit is ready for sending.
778            ret = self.control.next() => {
779                let msg = unwrap_or_shutdown!(self, ret, "control drop")?;
780                smallvec![CircuitEvent::HandleControl(msg)]
781            },
782            res = self.circuits.next_circ_event(&self.runtime).fuse() => res?,
783        };
784
785        // Put the events into the order that we need to execute them in.
786        //
787        // (Yes, this _does_ have to be a stable sort.  Not all events may be freely re-ordered
788        // with respect to one another.)
789        events.sort_by_key(|a| a.order_within_batch());
790
791        for event in events {
792            let cmd = match event {
793                CircuitEvent::RunCmd { leg, cmd } => Some(RunOnceCmd::Single(
794                    RunOnceCmdInner::from_circuit_cmd(leg, cmd),
795                )),
796                CircuitEvent::HandleControl(ctrl) => ControlHandler::new(self)
797                    .handle_msg(ctrl)?
798                    .map(RunOnceCmd::Single),
799                CircuitEvent::HandleCell { leg, cell } => {
800                    let circ = self
801                        .circuits
802                        .leg_mut(leg)
803                        .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
804
805                    let circ_cmds = circ.handle_cell(&mut self.cell_handlers, leg, cell)?;
806                    if circ_cmds.is_empty() {
807                        None
808                    } else {
809                        // TODO: we return RunOnceCmd::Multiple even if there's a single command.
810                        //
811                        // See the TODO on `Circuit::handle_cell`.
812                        let cmd = RunOnceCmd::Multiple(
813                            circ_cmds
814                                .into_iter()
815                                .map(|cmd| RunOnceCmdInner::from_circuit_cmd(leg, cmd))
816                                .collect(),
817                        );
818
819                        Some(cmd)
820                    }
821                }
822                CircuitEvent::RemoveLeg { leg, reason } => {
823                    Some(RunOnceCmdInner::RemoveLeg { leg, reason }.into())
824                }
825                CircuitEvent::PaddingAction { leg, padding_event } => {
826                    cfg_if! {
827                        if #[cfg(feature = "circ-padding")] {
828                            Some(RunOnceCmdInner::PaddingAction { leg, padding_event }.into())
829                        } else {
830                            // If padding isn't enabled, we never generate a padding event,
831                            // so we can be sure this case will never be called.
832                            void::unreachable(padding_event.0);
833                        }
834                    }
835                }
836                CircuitEvent::ProtoViolation { err } => {
837                    return Err(err.into());
838                }
839            };
840
841            if let Some(cmd) = cmd {
842                self.handle_run_once_cmd(cmd).await?;
843            }
844        }
845
846        Ok(())
847    }
848
849    /// Try to process the previously-out-of-order messages we might have buffered.
850    #[cfg(feature = "conflux")]
851    #[instrument(level = "trace", skip_all)]
852    async fn try_dequeue_ooo_msgs(&mut self) -> StdResult<(), ReactorError> {
853        // Check if we're ready to dequeue any of the previously out-of-order cells.
854        while let Some(entry) = self.ooo_msgs.peek() {
855            let should_pop = self.circuits.is_seqno_in_order(entry.msg.seqno);
856
857            if !should_pop {
858                break;
859            }
860
861            let entry = self.ooo_msgs.pop().expect("item just disappeared?!");
862
863            let circ = self
864                .circuits
865                .leg_mut(entry.leg_id)
866                .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
867            let handlers = &mut self.cell_handlers;
868            let cmd = circ
869                .handle_in_order_relay_msg(
870                    handlers,
871                    entry.msg.hopnum,
872                    entry.leg_id,
873                    entry.msg.cell_counts_towards_windows,
874                    entry.msg.streamid,
875                    entry.msg.msg,
876                )?
877                .map(|cmd| {
878                    RunOnceCmd::Single(RunOnceCmdInner::from_circuit_cmd(entry.leg_id, cmd))
879                });
880
881            if let Some(cmd) = cmd {
882                self.handle_run_once_cmd(cmd).await?;
883            }
884        }
885
886        Ok(())
887    }
888
889    /// Handle a [`RunOnceCmd`].
890    #[instrument(level = "trace", skip_all)]
891    async fn handle_run_once_cmd(&mut self, cmd: RunOnceCmd) -> StdResult<(), ReactorError> {
892        match cmd {
893            RunOnceCmd::Single(cmd) => return self.handle_single_run_once_cmd(cmd).await,
894            RunOnceCmd::Multiple(cmds) => {
895                // While we know `sendable` is ready to accept *one* cell,
896                // we can't be certain it will be able to accept *all* of the cells
897                // that need to be sent here. This means we *may* end up buffering
898                // in its underlying SometimesUnboundedSink! That is OK, because
899                // RunOnceCmd::Multiple is only used for handling packed cells.
900                for cmd in cmds {
901                    self.handle_single_run_once_cmd(cmd).await?;
902                }
903            }
904        }
905
906        Ok(())
907    }
908
909    /// Handle a [`RunOnceCmd`].
910    #[instrument(level = "trace", skip_all)]
911    async fn handle_single_run_once_cmd(
912        &mut self,
913        cmd: RunOnceCmdInner,
914    ) -> StdResult<(), ReactorError> {
915        match cmd {
916            RunOnceCmdInner::Send { leg, cell, done } => {
917                // TODO: check the cc window
918                let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
919                if let Some(done) = done {
920                    // Don't care if the receiver goes away
921                    let _ = done.send(res.clone());
922                }
923                res?;
924            }
925            #[cfg(feature = "send-control-msg")]
926            RunOnceCmdInner::SendMsgAndInstallHandler { msg, handler, done } => {
927                let cell: Result<Option<SendRelayCell>> =
928                    self.prepare_msg_and_install_handler(msg, handler);
929
930                match cell {
931                    Ok(Some(cell)) => {
932                        // TODO(conflux): let the RunOnceCmdInner specify which leg to send the cell on
933                        let outcome = self.circuits.send_relay_cell_on_leg(cell, None).await;
934                        // don't care if receiver goes away.
935                        let _ = done.send(outcome.clone());
936                        outcome?;
937                    }
938                    Ok(None) => {
939                        // don't care if receiver goes away.
940                        let _ = done.send(Ok(()));
941                    }
942                    Err(e) => {
943                        // don't care if receiver goes away.
944                        let _ = done.send(Err(e.clone()));
945                        return Err(e.into());
946                    }
947                }
948            }
949            RunOnceCmdInner::BeginStream {
950                leg,
951                cell,
952                stream_id,
953                hop,
954                stream_components,
955                done,
956            } => {
957                let circ = self
958                    .circuits
959                    .leg_mut(leg)
960                    .ok_or_else(|| internal!("leg disappeared?!"))?;
961                let cell_hop = cell.hop.expect("missing hop in client SendRelayCell?!");
962                let relay_format = circ
963                    .hop_mut(cell_hop)
964                    // TODO: Is this the right error type here? Or should there be a "HopDisappeared"?
965                    .ok_or(Error::NoSuchHop)?
966                    .relay_cell_format();
967
968                let outcome = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
969                // don't care if receiver goes away.
970                let _ = done.send(
971                    outcome
972                        .clone()
973                        .map(|_| (stream_id, hop, relay_format, stream_components)),
974                );
975                outcome?;
976            }
977            RunOnceCmdInner::CloseStream {
978                hop,
979                sid,
980                behav,
981                reason,
982                done,
983            } => {
984                let result = {
985                    let (leg_id, hop_num) = self
986                        .resolve_hop_location(hop)
987                        .map_err(into_bad_api_usage!("Could not resolve {hop:?}"))?;
988                    let leg = self
989                        .circuits
990                        .leg_mut(leg_id)
991                        .ok_or(bad_api_usage!("No leg for id {:?}", leg_id))?;
992                    Ok::<_, Bug>((leg, hop_num))
993                };
994
995                let (leg, hop_num) = match result {
996                    Ok(x) => x,
997                    Err(e) => {
998                        if let Some(done) = done {
999                            // don't care if the sender goes away
1000                            let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
1001                            let _ = done.send(Err(e.into()));
1002                        }
1003                        return Ok(());
1004                    }
1005                };
1006
1007                let max_rtt = {
1008                    let hop = leg
1009                        .hop(hop_num)
1010                        .ok_or_else(|| internal!("the hop we resolved disappeared?!"))?;
1011                    let ccontrol = hop.ccontrol();
1012
1013                    // Note: if we have no measurements for the RTT, this will be set to 0,
1014                    // and the timeout will be 2 * CBT.
1015                    ccontrol
1016                        .rtt()
1017                        .max_rtt_usec()
1018                        .map(|rtt| Duration::from_millis(u64::from(rtt)))
1019                        .unwrap_or_default()
1020                };
1021
1022                // The length of the circuit up until the hop that has the half-streeam.
1023                //
1024                // +1, because HopNums are zero-based.
1025                let circ_len = usize::from(hop_num) + 1;
1026
1027                // We double the CBT to account for rend circuits,
1028                // which are twice as long (otherwise we risk expiring
1029                // the rend half-streams too soon).
1030                let timeout = std::cmp::max(max_rtt, 2 * leg.estimate_cbt(circ_len));
1031                let expire_at = self.runtime.now() + timeout;
1032
1033                let res: Result<()> = leg
1034                    .close_stream(hop_num, sid, behav, reason, expire_at)
1035                    .await;
1036
1037                if let Some(done) = done {
1038                    // don't care if the sender goes away
1039                    let _ = done.send(res);
1040                }
1041            }
1042            RunOnceCmdInner::MaybeSendXon {
1043                rate,
1044                stream_id,
1045                hop,
1046            } => {
1047                let (leg_id, hop_num) = match self.resolve_hop_location(hop) {
1048                    Ok(x) => x,
1049                    Err(NoJoinPointError) => {
1050                        // A stream tried to send an XON message message to the join point of
1051                        // a tunnel that has never had a join point. Currently in arti, only a
1052                        // `StreamTarget` asks us to send an XON message, and this tunnel
1053                        // originally created the `StreamTarget` to begin with. So this is a
1054                        // legitimate bug somewhere in the tunnel code.
1055                        return Err(
1056                            internal!(
1057                                "Could not send an XON message to a join point on a tunnel without a join point",
1058                            )
1059                            .into()
1060                        );
1061                    }
1062                };
1063
1064                let Some(leg) = self.circuits.leg_mut(leg_id) else {
1065                    // The leg has disappeared. This is fine since the stream may have ended and
1066                    // been cleaned up while this `CtrlMsg::MaybeSendXon` message was queued.
1067                    // It is possible that is a bug and this is an incorrect leg number, but
1068                    // it's not currently possible to differentiate between an incorrect leg
1069                    // number and a tunnel leg that has been closed.
1070                    debug!("Could not send an XON message on a leg that does not exist. Ignoring.");
1071                    return Ok(());
1072                };
1073
1074                let Some(hop) = leg.hop_mut(hop_num) else {
1075                    // The hop has disappeared. This is fine since the circuit may have been
1076                    // been truncated while the `CtrlMsg::MaybeSendXon` message was queued.
1077                    // It is possible that is a bug and this is an incorrect hop number, but
1078                    // it's not currently possible to differentiate between an incorrect hop
1079                    // number and a circuit hop that has been removed.
1080                    debug!("Could not send an XON message on a hop that does not exist. Ignoring.");
1081                    return Ok(());
1082                };
1083
1084                let Some(msg) = hop.maybe_send_xon(rate, stream_id)? else {
1085                    // Nothing to do.
1086                    return Ok(());
1087                };
1088
1089                let cell = AnyRelayMsgOuter::new(Some(stream_id), msg.into());
1090                let cell = SendRelayCell {
1091                    hop: Some(hop_num),
1092                    early: false,
1093                    cell,
1094                };
1095
1096                leg.send_relay_cell(cell).await?;
1097            }
1098            RunOnceCmdInner::HandleSendMe { leg, hop, sendme } => {
1099                let leg = self
1100                    .circuits
1101                    .leg_mut(leg)
1102                    .ok_or_else(|| internal!("leg disappeared?!"))?;
1103                // NOTE: it's okay to await. We are only awaiting on the congestion_signals
1104                // future which *should* resolve immediately
1105                let signals = leg.chan_sender.congestion_signals().await;
1106                leg.handle_sendme(hop, sendme, signals)?;
1107            }
1108            RunOnceCmdInner::FirstHopClockSkew { answer } => {
1109                let res = self.circuits.single_leg_mut().map(|leg| leg.clock_skew());
1110
1111                // don't care if the sender goes away
1112                let _ = answer.send(res.map_err(Into::into));
1113            }
1114            RunOnceCmdInner::CleanShutdown => {
1115                trace!(tunnel_id = %self.tunnel_id, "reactor shutdown due to handled cell");
1116                return Err(ReactorError::Shutdown);
1117            }
1118            RunOnceCmdInner::RemoveLeg { leg, reason } => {
1119                debug!(tunnel_id = %self.tunnel_id, reason = %reason, "removing circuit leg");
1120
1121                let circ = self.circuits.remove(leg)?;
1122                let is_conflux_pending = circ.is_conflux_pending();
1123
1124                // Drop the removed leg. This will cause it to close if it's not already closed.
1125                drop(circ);
1126
1127                // If we reach this point, it means we have more than one leg
1128                // (otherwise the .remove() would've returned a Shutdown error),
1129                // so we expect there to be a ConfluxHandshakeContext installed.
1130
1131                #[cfg(feature = "conflux")]
1132                if is_conflux_pending {
1133                    let (error, proto_violation): (_, Option<Error>) = match &reason {
1134                        RemoveLegReason::ConfluxHandshakeTimeout => {
1135                            (ConfluxHandshakeError::Timeout, None)
1136                        }
1137                        RemoveLegReason::ConfluxHandshakeErr(e) => {
1138                            (ConfluxHandshakeError::Link(e.clone()), Some(e.clone()))
1139                        }
1140                        RemoveLegReason::ChannelClosed => {
1141                            (ConfluxHandshakeError::ChannelClosed, None)
1142                        }
1143                    };
1144
1145                    self.note_conflux_handshake_result(Err(error), proto_violation.is_some())?;
1146
1147                    if let Some(e) = proto_violation {
1148                        tor_error::warn_report!(
1149                            e,
1150                            tunnel_id = %self.tunnel_id,
1151                            "Malformed conflux handshake, tearing down tunnel",
1152                        );
1153
1154                        return Err(e.into());
1155                    }
1156                }
1157            }
1158            #[cfg(feature = "conflux")]
1159            RunOnceCmdInner::ConfluxHandshakeComplete { leg, cell } => {
1160                // Note: on the client-side, the handshake is considered complete once the
1161                // RELAY_CONFLUX_LINKED_ACK is sent (roughly upon receipt of the LINKED cell).
1162                //
1163                // We're optimistic here, and declare the handshake a success *before*
1164                // sending the LINKED_ACK response. I think this is OK though,
1165                // because if the send_relay_cell() below fails, the reactor will shut
1166                // down anyway. OTOH, marking the handshake as complete slightly early
1167                // means that on the happy path, the circuit is marked as usable sooner,
1168                // instead of blocking on the sending of the LINKED_ACK.
1169                self.note_conflux_handshake_result(Ok(()), false)?;
1170
1171                let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
1172
1173                res?;
1174            }
1175            #[cfg(feature = "conflux")]
1176            RunOnceCmdInner::Link { circuits, answer } => {
1177                // Add the specified circuits to our conflux set,
1178                // and send a LINK cell down each unlinked leg.
1179                //
1180                // NOTE: this will block the reactor until all the cells are sent.
1181                self.handle_link_circuits(circuits, answer).await?;
1182            }
1183            #[cfg(feature = "conflux")]
1184            RunOnceCmdInner::Enqueue { leg, msg } => {
1185                let entry = ConfluxHeapEntry { leg_id: leg, msg };
1186                self.ooo_msgs.push(entry);
1187            }
1188            #[cfg(feature = "circ-padding")]
1189            RunOnceCmdInner::PaddingAction { leg, padding_event } => {
1190                // TODO: If we someday move back to having a per-circuit reactor,
1191                // this event would logically belong there, not on the tunnel reactor.
1192                self.circuits.run_padding_event(leg, padding_event).await?;
1193            }
1194        }
1195
1196        Ok(())
1197    }
1198
1199    /// Wait for a [`CtrlMsg::Create`] to come along to set up the circuit.
1200    ///
1201    /// Returns an error if an unexpected `CtrlMsg` is received.
1202    #[instrument(level = "trace", skip_all)]
1203    async fn wait_for_create(&mut self) -> StdResult<(), ReactorError> {
1204        let msg = select_biased! {
1205            res = self.command.next() => {
1206                let cmd = unwrap_or_shutdown!(self, res, "shutdown channel drop")?;
1207                match cmd {
1208                    CtrlCmd::Shutdown => return self.handle_shutdown().map(|_| ()),
1209                    #[cfg(test)]
1210                    CtrlCmd::AddFakeHop {
1211                        relay_cell_format: format,
1212                        fwd_lasthop,
1213                        rev_lasthop,
1214                        peer_id,
1215                        params,
1216                        done,
1217                    } => {
1218                        let leg = self.circuits.single_leg_mut()?;
1219                        leg.handle_add_fake_hop(format, fwd_lasthop, rev_lasthop, peer_id, &params, done);
1220                        return Ok(())
1221                    },
1222                    _ => {
1223                        trace!("reactor shutdown due to unexpected command: {:?}", cmd);
1224                        return Err(Error::CircProto(format!("Unexpected control {cmd:?} on client circuit")).into());
1225                    }
1226                }
1227            },
1228            res = self.control.next() => unwrap_or_shutdown!(self, res, "control drop")?,
1229        };
1230
1231        match msg {
1232            CtrlMsg::Create {
1233                recv_created,
1234                handshake,
1235                settings,
1236                done,
1237            } => {
1238                // TODO(conflux): instead of crashing the reactor, it might be better
1239                // to send the error via the done channel instead
1240                let leg = self.circuits.single_leg_mut()?;
1241                leg.handle_create(recv_created, handshake, settings, done)
1242                    .await
1243            }
1244            _ => {
1245                trace!("reactor shutdown due to unexpected cell: {:?}", msg);
1246
1247                Err(Error::CircProto(format!("Unexpected {msg:?} cell on client circuit")).into())
1248            }
1249        }
1250    }
1251
1252    /// Add the specified handshake result to our `ConfluxHandshakeContext`.
1253    ///
1254    /// If all the circuits we were waiting on have finished the conflux handshake,
1255    /// the `ConfluxHandshakeContext` is consumed, and the results we have collected
1256    /// are sent to the handshake initiator.
1257    #[cfg(feature = "conflux")]
1258    #[instrument(level = "trace", skip_all)]
1259    fn note_conflux_handshake_result(
1260        &mut self,
1261        res: StdResult<(), ConfluxHandshakeError>,
1262        reactor_is_closing: bool,
1263    ) -> StdResult<(), ReactorError> {
1264        let tunnel_complete = match self.conflux_hs_ctx.as_mut() {
1265            Some(conflux_ctx) => {
1266                conflux_ctx.results.push(res);
1267                // Whether all the legs have finished linking:
1268                conflux_ctx.results.len() == conflux_ctx.num_legs
1269            }
1270            None => {
1271                return Err(internal!("no conflux handshake context").into());
1272            }
1273        };
1274
1275        if tunnel_complete || reactor_is_closing {
1276            // Time to remove the conflux handshake context
1277            // and extract the results we have collected
1278            let conflux_ctx = self.conflux_hs_ctx.take().expect("context disappeared?!");
1279
1280            let success_count = conflux_ctx.results.iter().filter(|res| res.is_ok()).count();
1281            let leg_count = conflux_ctx.results.len();
1282
1283            info!(
1284                tunnel_id = %self.tunnel_id,
1285                "conflux tunnel ready ({success_count}/{leg_count} circuits successfully linked)",
1286            );
1287
1288            send_conflux_outcome(conflux_ctx.answer, Ok(conflux_ctx.results))?;
1289
1290            // We don't expect to receive any more handshake results,
1291            // at least not until we get another LinkCircuits control message,
1292            // which will install a new ConfluxHandshakeCtx with a channel
1293            // for us to send updates on
1294        }
1295
1296        Ok(())
1297    }
1298
1299    /// Prepare a `SendRelayCell` request, and install the given meta-cell handler.
1300    fn prepare_msg_and_install_handler(
1301        &mut self,
1302        msg: Option<AnyRelayMsgOuter>,
1303        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
1304    ) -> Result<Option<SendRelayCell>> {
1305        let msg = msg
1306            .map(|msg| {
1307                let handlers = &mut self.cell_handlers;
1308                let handler = handler
1309                    .as_ref()
1310                    .or(handlers.meta_handler.as_ref())
1311                    .ok_or_else(|| internal!("tried to use an ended Conversation"))?;
1312                // We should always have a precise HopLocation here so this should never fails but
1313                // in case we have a ::JointPoint, we'll notice.
1314                let hop = handler.expected_hop().hop_num().ok_or(bad_api_usage!(
1315                    "MsgHandler doesn't have a precise HopLocation"
1316                ))?;
1317                Ok::<_, crate::Error>(SendRelayCell {
1318                    hop: Some(hop),
1319                    early: false,
1320                    cell: msg,
1321                })
1322            })
1323            .transpose()?;
1324
1325        if let Some(handler) = handler {
1326            self.cell_handlers.set_meta_handler(handler)?;
1327        }
1328
1329        Ok(msg)
1330    }
1331
1332    /// Handle a shutdown request.
1333    fn handle_shutdown(&self) -> StdResult<Option<RunOnceCmdInner>, ReactorError> {
1334        trace!(
1335            tunnel_id = %self.tunnel_id,
1336            "reactor shutdown due to explicit request",
1337        );
1338
1339        Err(ReactorError::Shutdown)
1340    }
1341
1342    /// Handle a request to shutdown the reactor and return the only [`Circuit`] in this tunnel.
1343    ///
1344    /// Returns an error over the `answer` channel if the reactor has no circuits,
1345    /// or more than one circuit. The reactor will shut down regardless.
1346    #[cfg(feature = "conflux")]
1347    fn handle_shutdown_and_return_circuit(
1348        &mut self,
1349        answer: oneshot::Sender<StdResult<Circuit, Bug>>,
1350    ) -> StdResult<(), ReactorError> {
1351        // Don't care if the receiver goes away
1352        let _ = answer.send(self.circuits.take_single_leg());
1353        self.handle_shutdown().map(|_| ())
1354    }
1355
1356    /// Resolves a [`TargetHop`] to a [`HopLocation`].
1357    ///
1358    /// After resolving a `TargetHop::LastHop`,
1359    /// the `HopLocation` can become stale if a single-path circuit is later extended or truncated.
1360    /// This means that the `HopLocation` can become stale from one reactor iteration to the next.
1361    ///
1362    /// It's generally okay to hold on to a (possibly stale) `HopLocation`
1363    /// if you need a fixed hop position in the tunnel.
1364    /// For example if we open a stream to `TargetHop::LastHop`,
1365    /// we would want to store the stream position as a `HopLocation` and not a `TargetHop::LastHop`
1366    /// as we don't want the stream position to change as the tunnel is extended or truncated.
1367    ///
1368    /// Returns [`NoHopsBuiltError`] if trying to resolve `TargetHop::LastHop`
1369    /// and the tunnel has no hops
1370    /// (either has no legs, or has legs which contain no hops).
1371    fn resolve_target_hop(&self, hop: TargetHop) -> StdResult<HopLocation, NoHopsBuiltError> {
1372        match hop {
1373            TargetHop::Hop(hop) => Ok(hop),
1374            TargetHop::LastHop => {
1375                if let Ok(leg) = self.circuits.single_leg() {
1376                    let leg_id = leg.unique_id();
1377                    // single-path tunnel
1378                    let hop = leg.last_hop_num().ok_or(NoHopsBuiltError)?;
1379                    Ok(HopLocation::Hop((leg_id, hop)))
1380                } else if !self.circuits.is_empty() {
1381                    // multi-path tunnel
1382                    Ok(HopLocation::JoinPoint)
1383                } else {
1384                    // no legs
1385                    Err(NoHopsBuiltError)
1386                }
1387            }
1388        }
1389    }
1390
1391    /// Resolves a [`HopLocation`] to a [`UniqId`] and [`HopNum`].
1392    ///
1393    /// After resolving a `HopLocation::JoinPoint`,
1394    /// the [`UniqId`] and [`HopNum`] can become stale if the primary leg changes.
1395    ///
1396    /// You should try to only resolve to a specific [`UniqId`] and [`HopNum`] immediately before you
1397    /// need them,
1398    /// and you should not hold on to the resolved [`UniqId`] and [`HopNum`] between reactor
1399    /// iterations as the primary leg may change from one iteration to the next.
1400    ///
1401    /// Returns [`NoJoinPointError`] if trying to resolve `HopLocation::JoinPoint`
1402    /// but it does not have a join point.
1403    #[instrument(level = "trace", skip_all)]
1404    fn resolve_hop_location(
1405        &self,
1406        hop: HopLocation,
1407    ) -> StdResult<(UniqId, HopNum), NoJoinPointError> {
1408        match hop {
1409            HopLocation::Hop((leg_id, hop_num)) => Ok((leg_id, hop_num)),
1410            HopLocation::JoinPoint => {
1411                if let Some((leg_id, hop_num)) = self.circuits.primary_join_point() {
1412                    Ok((leg_id, hop_num))
1413                } else {
1414                    // Attempted to get the join point of a non-multipath tunnel.
1415                    Err(NoJoinPointError)
1416                }
1417            }
1418        }
1419    }
1420
1421    /// Resolve a [`TargetHop`] directly into a [`UniqId`] and [`HopNum`].
1422    ///
1423    /// This is a helper function that basically calls both resolve_target_hop and
1424    /// resolve_hop_location back to back.
1425    ///
1426    /// It returns None on failure to resolve meaning that if you want more detailed error on why
1427    /// it failed, explicitly use the resolve_hop_location() and resolve_target_hop() functions.
1428    pub(crate) fn target_hop_to_hopnum_id(&self, hop: TargetHop) -> Option<(UniqId, HopNum)> {
1429        self.resolve_target_hop(hop)
1430            .ok()
1431            .and_then(|resolved| self.resolve_hop_location(resolved).ok())
1432    }
1433
1434    /// Install or remove a padder at a given hop.
1435    #[cfg(feature = "circ-padding-manual")]
1436    fn set_padding_at_hop(
1437        &self,
1438        hop: HopLocation,
1439        padder: Option<super::circuit::padding::CircuitPadder>,
1440    ) -> Result<()> {
1441        let HopLocation::Hop((leg_id, hop_num)) = hop else {
1442            return Err(bad_api_usage!("Padding to the join point is not supported.").into());
1443        };
1444        let circ = self.circuits.leg(leg_id).ok_or(Error::NoSuchHop)?;
1445        circ.set_padding_at_hop(hop_num, padder)?;
1446        Ok(())
1447    }
1448
1449    /// Does congestion control use stream SENDMEs for the given hop?
1450    ///
1451    /// Returns `None` if either the `leg` or `hop` don't exist.
1452    fn uses_stream_sendme(&self, leg: UniqId, hop: HopNum) -> Option<bool> {
1453        self.circuits.uses_stream_sendme(leg, hop)
1454    }
1455
1456    /// Handle a request to link some extra circuits in the reactor's conflux set.
1457    ///
1458    /// The circuits are validated, and if they do not have the same length,
1459    /// or if they do not all have the same last hop, an error is returned on
1460    /// the `answer` channel, and the conflux handshake is *not* initiated.
1461    ///
1462    /// If validation succeeds, the circuits are added to this reactor's conflux set,
1463    /// and the conflux handshake is initiated (by sending a LINK cell on each leg).
1464    ///
1465    /// NOTE: this blocks the reactor main loop until all the cells are sent.
1466    #[cfg(feature = "conflux")]
1467    #[instrument(level = "trace", skip_all)]
1468    async fn handle_link_circuits(
1469        &mut self,
1470        circuits: Vec<Circuit>,
1471        answer: ConfluxLinkResultChannel,
1472    ) -> StdResult<(), ReactorError> {
1473        use tor_error::warn_report;
1474
1475        if self.conflux_hs_ctx.is_some() {
1476            let err = internal!("conflux linking already in progress");
1477            send_conflux_outcome(answer, Err(err.into()))?;
1478
1479            return Ok(());
1480        }
1481
1482        let unlinked_legs = self.circuits.num_unlinked();
1483
1484        // We need to send the LINK cell on each of the new circuits
1485        // and on each of the existing, unlinked legs from self.circuits.
1486        //
1487        // In reality, there can only be one such circuit
1488        // (the "initial" one from the previously single-path tunnel),
1489        // because any circuits that to complete the conflux handshake
1490        // get removed from the set.
1491        let num_legs = circuits.len() + unlinked_legs;
1492
1493        // Note: add_legs validates `circuits`
1494        let res = async {
1495            self.circuits.add_legs(circuits, &self.runtime)?;
1496            self.circuits.link_circuits(&self.runtime).await
1497        }
1498        .await;
1499
1500        if let Err(e) = res {
1501            warn_report!(e, "Failed to link conflux circuits");
1502
1503            send_conflux_outcome(answer, Err(e))?;
1504        } else {
1505            // Save the channel, to notify the user of completion.
1506            self.conflux_hs_ctx = Some(ConfluxHandshakeCtx {
1507                answer,
1508                num_legs,
1509                results: Default::default(),
1510            });
1511        }
1512
1513        Ok(())
1514    }
1515}
1516
1517/// Notify the conflux handshake initiator of the handshake outcome.
1518///
1519/// Returns an error if the initiator has done away.
1520#[cfg(feature = "conflux")]
1521fn send_conflux_outcome(
1522    tx: ConfluxLinkResultChannel,
1523    res: Result<ConfluxHandshakeResult>,
1524) -> StdResult<(), ReactorError> {
1525    if tx.send(res).is_err() {
1526        tracing::warn!("conflux initiator went away before handshake completed?");
1527        return Err(ReactorError::Shutdown);
1528    }
1529
1530    Ok(())
1531}
1532
1533/// The tunnel does not have any hops.
1534#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
1535#[non_exhaustive]
1536#[error("no hops have been built for this tunnel")]
1537pub(crate) struct NoHopsBuiltError;
1538
1539/// The tunnel does not have a join point.
1540#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
1541#[non_exhaustive]
1542#[error("the tunnel does not have a join point")]
1543pub(crate) struct NoJoinPointError;
1544
1545impl CellHandlers {
1546    /// Try to install a given meta-cell handler to receive any unusual cells on
1547    /// this circuit, along with a result channel to notify on completion.
1548    fn set_meta_handler(&mut self, handler: Box<dyn MetaCellHandler + Send>) -> Result<()> {
1549        if self.meta_handler.is_none() {
1550            self.meta_handler = Some(handler);
1551            Ok(())
1552        } else {
1553            Err(Error::from(internal!(
1554                "Tried to install a meta-cell handler before the old one was gone."
1555            )))
1556        }
1557    }
1558
1559    /// Try to install a given cell handler on this circuit.
1560    #[cfg(feature = "hs-service")]
1561    fn set_incoming_stream_req_handler(
1562        &mut self,
1563        handler: IncomingStreamRequestHandler,
1564    ) -> Result<()> {
1565        if self.incoming_stream_req_handler.is_none() {
1566            self.incoming_stream_req_handler = Some(handler);
1567            Ok(())
1568        } else {
1569            Err(Error::from(internal!(
1570                "Tried to install a BEGIN cell handler before the old one was gone."
1571            )))
1572        }
1573    }
1574}
1575
1576#[cfg(test)]
1577mod test {
1578    // Tested in [`crate::client::circuit::test`].
1579}