// tor_proto/client/reactor.rs
1//! Code to handle incoming cells on a circuit.
2//!
3//! ## On message validation
4//!
5//! There are three steps for validating an incoming message on a stream:
6//!
7//! 1. Is the message contextually appropriate? (e.g., no more than one
8//!    `CONNECTED` message per stream.) This is handled by calling
9//!    [`CmdChecker::check_msg`](crate::stream::cmdcheck::CmdChecker::check_msg).
10//! 2. Does the message comply with flow-control rules? (e.g., no more SENDMEs
11//!    than we've sent data for.) This is handled within the reactor by the
12//!    `StreamFlowCtrl`. For half-closed streams which don't send stream
13//!    SENDMEs, an additional receive-window check is performed in the
14//!    `halfstream` module.
15//! 3. Does the message have an acceptable command type, and is the message
16//!    well-formed? For open streams, the streams themselves handle this check.
17//!    For half-closed streams, the reactor handles it by calling
18//!    `consume_checked_msg()`.
19
20pub(crate) mod circuit;
21mod conflux;
22mod control;
23
24use crate::circuit::circhop::SendRelayCell;
25use crate::circuit::{CircuitRxReceiver, UniqId};
26use crate::client::circuit::ClientCircChanMsg;
27use crate::client::circuit::padding::{PaddingController, PaddingEvent, PaddingEventStream};
28use crate::client::{HopLocation, TargetHop};
29use crate::crypto::cell::HopNum;
30use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
31use crate::memquota::CircuitAccount;
32use crate::stream::CloseStreamBehavior;
33use crate::streammap;
34use crate::tunnel::{TunnelId, TunnelScopedCircId};
35use crate::util::err::ReactorError;
36use crate::util::skew::ClockSkew;
37use crate::util::timeout::TimeoutEstimator;
38use crate::{Error, Result};
39use circuit::Circuit;
40use conflux::ConfluxSet;
41use control::ControlHandler;
42use std::cmp::Ordering;
43use std::collections::BinaryHeap;
44use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
45use tor_cell::relaycell::msg::Sendme;
46use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
47use tor_error::{Bug, bad_api_usage, debug_report, internal, into_bad_api_usage};
48use tor_rtcompat::{DynTimeProvider, SleepProvider};
49
50use cfg_if::cfg_if;
51use futures::StreamExt;
52use futures::channel::mpsc;
53use futures::{FutureExt as _, select_biased};
54use oneshot_fused_workaround as oneshot;
55
56use std::result::Result as StdResult;
57use std::sync::Arc;
58use std::time::Duration;
59
60use crate::channel::Channel;
61use crate::conflux::msghandler::RemoveLegReason;
62use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
63use circuit::CircuitCmd;
64use derive_more::From;
65use smallvec::smallvec;
66use tor_cell::chancell::CircId;
67use tor_llcrypto::pk;
68use tracing::{debug, info, instrument, trace, warn};
69
70use super::circuit::{MutableState, TunnelMutableState};
71use crate::circuit::reactor::ReactorResultChannel;
72
73#[cfg(feature = "hs-service")]
74use crate::stream::incoming::IncomingStreamRequestHandler;
75
76#[cfg(feature = "conflux")]
77use {
78    crate::conflux::msghandler::{ConfluxCmd, OooRelayMsg},
79    crate::util::err::ConfluxHandshakeError,
80};
81
82pub(super) use control::{CtrlCmd, CtrlMsg, FlowCtrlMsg};
83
/// Contains a list of conflux handshake results.
///
/// There is one entry per circuit leg that took part in the handshake.
#[cfg(feature = "conflux")]
pub(super) type ConfluxHandshakeResult = Vec<StdResult<(), ConfluxHandshakeError>>;

/// The type of oneshot channel used to inform reactor users of the outcome
/// of a client-side conflux handshake.
///
/// Contains a list of handshake results, one for each circuit that we were asked
/// to link in the tunnel.
#[cfg(feature = "conflux")]
pub(super) type ConfluxLinkResultChannel = ReactorResultChannel<ConfluxHandshakeResult>;
95
/// A handshake type, to be used when creating circuit hops.
#[derive(Clone, Debug)]
pub(crate) enum CircuitHandshake {
    /// Use the CREATE_FAST handshake.
    ///
    /// Note: CREATE_FAST performs no public-key authentication of the relay;
    /// per the Tor specification it is only appropriate for the first hop,
    /// where the channel itself has already authenticated the relay.
    CreateFast,
    /// Use the ntor handshake.
    Ntor {
        /// The public key of the relay.
        public_key: NtorPublicKey,
        /// The Ed25519 identity of the relay, which is verified against the
        /// identity held in the circuit's channel.
        ed_identity: pk::ed25519::Ed25519Identity,
    },
    /// Use the ntor-v3 handshake.
    NtorV3 {
        /// The public key of the relay.
        public_key: NtorV3PublicKey,
    },
}
115
116// TODO: the RunOnceCmd/RunOnceCmdInner/CircuitCmd/CircuitEvent enum
117// proliferation is a bit bothersome, but unavoidable with the current design.
118//
119// We should consider getting rid of some of these enums (if possible),
120// and coming up with more intuitive names.
121
/// One or more [`RunOnceCmdInner`] to run inside [`Reactor::run_once`].
#[derive(From, Debug)]
#[allow(clippy::large_enum_variant)] // TODO #2003: resolve this
enum RunOnceCmd {
    /// Run a single `RunOnceCmdInner` command.
    Single(RunOnceCmdInner),
    /// Run multiple `RunOnceCmdInner` commands.
    //
    // Note: this whole enum *could* be replaced with Vec<RunOnceCmdInner>,
    // but most of the time we're only going to have *one* RunOnceCmdInner
    // to run per run_once() loop. The enum enables us to avoid the extra heap
    // allocation for the `RunOnceCmd::Single` case.
    Multiple(Vec<RunOnceCmdInner>),
}
136
/// Instructions for running something in the reactor loop.
///
/// Run at the end of [`Reactor::run_once`].
//
// TODO: many of the variants of this enum have an identical CtrlMsg counterpart.
// We should consider making each variant a tuple variant and deduplicating the fields.
#[derive(educe::Educe)]
#[educe(Debug)]
enum RunOnceCmdInner {
    /// Send a RELAY cell.
    Send {
        /// The leg the cell should be sent on.
        leg: UniqId,
        /// The cell to send.
        cell: SendRelayCell,
        /// A channel for sending completion notifications.
        done: Option<ReactorResultChannel<()>>,
    },
    /// Send a given control message on this circuit, and install a control-message handler to
    /// receive responses.
    #[cfg(feature = "send-control-msg")]
    SendMsgAndInstallHandler {
        /// The message to send, if any
        msg: Option<AnyRelayMsgOuter>,
        /// A message handler to install.
        ///
        /// If this is `None`, there must already be a message handler installed
        #[educe(Debug(ignore))]
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
        /// A sender that we use to tell the caller that the message was sent
        /// and the handler installed.
        done: oneshot::Sender<Result<()>>,
    },
    /// Handle a SENDME message.
    HandleSendMe {
        /// The leg the SENDME was received on.
        leg: UniqId,
        /// The hop number.
        hop: HopNum,
        /// The SENDME message to handle.
        sendme: Sendme,
    },
    /// Begin a stream with the provided hop in this circuit.
    ///
    /// Uses the provided stream ID, and sends the provided message to that hop.
    BeginStream {
        /// The cell to send.
        cell: Result<(SendRelayCell, StreamId)>,
        /// The location of the hop on the tunnel. We don't use this (and `Circuit`s shouldn't need
        /// to worry about legs anyways), but need it so that we can pass it back in `done` to the
        /// caller.
        hop: HopLocation,
        /// The circuit leg to begin the stream on.
        leg: UniqId,
        /// Oneshot channel to notify on completion, with the allocated stream ID.
        done: ReactorResultChannel<(StreamId, HopLocation, RelayCellFormat)>,
    },
    /// Consider sending an XON message with the given `rate`.
    MaybeSendXon {
        /// The drain rate to advertise in the XON message.
        rate: XonKbpsEwma,
        /// The ID of the stream to send the message on.
        stream_id: StreamId,
        /// The location of the hop on the tunnel.
        hop: HopLocation,
    },
    /// Close the specified stream.
    CloseStream {
        /// The hop number.
        hop: HopLocation,
        /// The ID of the stream to close.
        sid: StreamId,
        /// The stream-closing behavior.
        behav: CloseStreamBehavior,
        /// The reason for closing the stream.
        reason: streammap::TerminateReason,
        /// A channel for sending completion notifications.
        done: Option<ReactorResultChannel<()>>,
    },
    /// Get the clock skew claimed by the first hop of the circuit.
    FirstHopClockSkew {
        /// Oneshot channel to return the clock skew.
        answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
    },
    /// Remove a circuit leg from the conflux set.
    RemoveLeg {
        /// The circuit leg to remove.
        leg: UniqId,
        /// The reason for removal.
        ///
        /// This is only used for conflux circuits that get removed
        /// before the conflux handshake is complete.
        ///
        /// The [`RemoveLegReason`] is mapped by the reactor to a
        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
        /// handshake to indicate the reason the handshake failed.
        reason: RemoveLegReason,
    },
    /// A circuit has completed the conflux handshake,
    /// and wants to send the specified cell.
    ///
    /// This is similar to [`RunOnceCmdInner::Send`],
    /// but needs to remain a separate variant,
    /// because in addition to instructing the reactor to send a cell,
    /// it also notifies it that the conflux handshake is complete on the specified `leg`.
    /// This enables the reactor to save the handshake result (`Ok(())`),
    /// and, if there are no other legs still in the handshake phase,
    /// send the result to the handshake initiator.
    #[cfg(feature = "conflux")]
    ConfluxHandshakeComplete {
        /// The circuit leg that has completed the handshake.
        /// This is the leg the cell should be sent on.
        leg: UniqId,
        /// The cell to send.
        cell: SendRelayCell,
    },
    /// Send a LINK cell on each of the unlinked circuit legs in the conflux set of this reactor.
    #[cfg(feature = "conflux")]
    Link {
        /// The circuits to link into the tunnel
        #[educe(Debug(ignore))]
        circuits: Vec<Circuit>,
        /// Oneshot channel for notifying of conflux handshake completion.
        answer: ConfluxLinkResultChannel,
    },
    /// Enqueue an out-of-order cell in the reactor's `ooo_msgs` buffer.
    #[cfg(feature = "conflux")]
    Enqueue {
        /// The leg the entry originated from.
        leg: UniqId,
        /// The out-of-order message.
        msg: OooRelayMsg,
    },
    /// Take a padding-related action on a circuit leg.
    #[cfg(feature = "circ-padding")]
    PaddingAction {
        /// The leg on which to take the padding action.
        leg: UniqId,
        /// The padding event to act on.
        padding_event: PaddingEvent,
    },
    /// Perform a clean shutdown on this circuit.
    CleanShutdown,
}
281
impl RunOnceCmdInner {
    /// Create a [`RunOnceCmdInner`] out of a [`CircuitCmd`] and [`UniqId`].
    ///
    /// The `leg` identifies the circuit leg the command originated from;
    /// it is threaded into every variant that needs to know which leg
    /// to act on.
    fn from_circuit_cmd(leg: UniqId, cmd: CircuitCmd) -> Self {
        match cmd {
            CircuitCmd::Send(cell) => Self::Send {
                leg,
                cell,
                done: None,
            },
            CircuitCmd::HandleSendMe { hop, sendme } => Self::HandleSendMe { leg, hop, sendme },
            CircuitCmd::CloseStream {
                hop,
                sid,
                behav,
                reason,
            } => Self::CloseStream {
                // A `CircuitCmd` only carries a leg-relative hop number;
                // pair it with the originating leg to form a tunnel-wide
                // hop location.
                hop: HopLocation::Hop((leg, hop)),
                sid,
                behav,
                reason,
                done: None,
            },
            #[cfg(feature = "conflux")]
            CircuitCmd::Conflux(ConfluxCmd::RemoveLeg(reason)) => Self::RemoveLeg { leg, reason },
            #[cfg(feature = "conflux")]
            CircuitCmd::Conflux(ConfluxCmd::HandshakeComplete { hop, early, cell }) => {
                // Wrap the raw message in a SendRelayCell addressed to the
                // hop that completed the handshake.
                let cell = SendRelayCell {
                    hop: Some(hop),
                    early,
                    cell,
                };
                Self::ConfluxHandshakeComplete { leg, cell }
            }
            #[cfg(feature = "conflux")]
            CircuitCmd::Enqueue(msg) => Self::Enqueue { leg, msg },
            CircuitCmd::CleanShutdown => Self::CleanShutdown,
        }
    }
}
321
/// A command to execute at the end of [`Reactor::run_once`].
#[derive(From, Debug)]
#[allow(clippy::large_enum_variant)] // TODO #2003: should we resolve this?
enum CircuitEvent {
    /// Run a single `CircuitCmd` command.
    RunCmd {
        /// The unique identifier of the circuit leg to run the command on
        leg: UniqId,
        /// The command to run.
        cmd: CircuitCmd,
    },
    /// Handle a control message
    HandleControl(CtrlMsg),
    /// Handle an input message.
    HandleCell {
        /// The unique identifier of the circuit leg the message was received on.
        leg: UniqId,
        /// The message to handle.
        cell: ClientCircChanMsg,
    },
    /// Remove the specified circuit leg from the conflux set.
    ///
    /// Returned whenever a single circuit leg needs to be removed
    /// from the reactor's conflux set, without necessarily tearing down
    /// the whole set or shutting down the reactor.
    ///
    /// Note: this event *can* cause the reactor to shut down
    /// (and the conflux set to be closed).
    ///
    /// See the [`ConfluxSet::remove`] docs for more on the exact behavior of this command.
    RemoveLeg {
        /// The leg to remove.
        leg: UniqId,
        /// The reason for removal.
        ///
        /// This is only used for conflux circuits that get removed
        /// before the conflux handshake is complete.
        ///
        /// The [`RemoveLegReason`] is mapped by the reactor to a
        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
        /// handshake to indicate the reason the handshake failed.
        reason: RemoveLegReason,
    },
    /// Take some action (blocking or unblocking a circuit, or sending padding)
    /// based on the circuit padding backend code.
    PaddingAction {
        /// The leg on which to take the padding event.
        leg: UniqId,
        /// The event to take.
        padding_event: PaddingEvent,
    },
    /// Protocol violation. For now, this causes the circuit reactor to shut down.
    /// The error that caused the violation is carried in `err`.
    ProtoViolation {
        /// The error that causes this protocol violation.
        err: crate::Error,
    },
}
380
impl CircuitEvent {
    /// Return the ordering with which we should handle this event
    /// within a list of events returned by a single call to next_circ_event().
    ///
    /// Lower values are handled earlier: `run_once` stable-sorts the batch
    /// on this key before executing it.
    ///
    /// NOTE: Please do not make this any more complicated:
    /// It is a consequence of a kludge that we need this sorting at all.
    /// Assuming that eventually, we switch away from the current
    /// poll-oriented `next_circ_event` design,
    /// we may be able to get rid of this entirely.
    fn order_within_batch(&self) -> u8 {
        use CircuitEvent as CA;
        use PaddingEvent as PE;
        // This immediate slot MUST NOT be used for events emitting cells. At the moment, it is
        // only used by the protocol violation event, which leads to a shutdown of the reactor.
        const IMMEDIATE: u8 = 0;
        const EARLY: u8 = 1;
        const NORMAL: u8 = 2;
        const LATE: u8 = 3;

        // We use this ordering to move any "StartBlocking" to the _end_ of a batch and
        // "StopBlocking" to the start.
        //
        // This way, we can be sure that we will handle any "send data" operations
        // (and tell the Padder about them) _before_  we tell the Padder
        // that we have blocked the circuit.
        //
        // This keeps things a bit more logical.
        match self {
            CA::RunCmd { .. } => NORMAL,
            CA::HandleControl(..) => NORMAL,
            CA::HandleCell { .. } => NORMAL,
            CA::RemoveLeg { .. } => NORMAL,
            #[cfg(feature = "circ-padding")]
            CA::PaddingAction { padding_event, .. } => match padding_event {
                PE::StopBlocking => EARLY,
                PE::SendPadding(..) => NORMAL,
                PE::StartBlocking(..) => LATE,
            },
            #[cfg(not(feature = "circ-padding"))]
            CA::PaddingAction { .. } => NORMAL,
            CA::ProtoViolation { .. } => IMMEDIATE,
        }
    }
}
425
/// An object that's waiting for a meta cell (one not associated with a stream) in order to make
/// progress.
///
/// # Background
///
/// The `Reactor` can't have async functions that send and receive cells, because its job is to
/// send and receive cells: if one of its functions tried to do that, it would just hang forever.
///
/// To get around this problem, the reactor can send some cells, and then make one of these
/// `MetaCellHandler` objects, which will be run when the reply arrives.
pub(crate) trait MetaCellHandler: Send {
    /// The hop we're expecting the message to come from. This is compared against the hop
    /// from which we actually receive messages, and an error is thrown if the two don't match.
    fn expected_hop(&self) -> HopLocation;
    /// Called when the message we were waiting for arrives.
    ///
    /// Gets a mutable reference to the [`Circuit`] the message arrived on,
    /// in order to do anything it likes there.
    ///
    /// If this function returns an error, the reactor will shut down.
    fn handle_msg(
        &mut self,
        msg: UnparsedRelayMsg,
        reactor: &mut Circuit,
    ) -> Result<MetaCellDisposition>;
}
451
/// A possible successful outcome of giving a message to a [`MsgHandler`](super::msghandler::MsgHandler).
///
/// Without the `send-control-msg` feature, `ConversationFinished` is the only
/// available variant.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "send-control-msg", visibility::make(pub))]
#[non_exhaustive]
pub(crate) enum MetaCellDisposition {
    /// The message was consumed; the handler should remain installed.
    #[cfg(feature = "send-control-msg")]
    Consumed,
    /// The message was consumed; the handler should be uninstalled.
    ConversationFinished,
    /// The message was consumed; the circuit should be closed.
    #[cfg(feature = "send-control-msg")]
    CloseCirc,
    // TODO: Eventually we might want the ability to have multiple handlers
    // installed, and to let them say "not for me, maybe for somebody else?".
    // But right now we don't need that.
}
469
/// Unwrap the specified [`Option`], returning a [`ReactorError::Shutdown`] if it is `None`.
///
/// Expects `$self` to have a `tunnel_id` field and `$reason` to be an
/// expression usable with tracing's `%` (Display) sigil; both are recorded
/// in the shutdown trace message.
///
/// This is a macro instead of a function to work around borrowck errors
/// in the select! from run_once().
macro_rules! unwrap_or_shutdown {
    ($self:expr, $res:expr, $reason:expr) => {{
        match $res {
            None => {
                trace!(
                    tunnel_id = %$self.tunnel_id,
                    reason = %$reason,
                    "reactor shutdown"
                );
                Err(ReactorError::Shutdown)
            }
            Some(v) => Ok(v),
        }
    }};
}
489
/// Object to handle incoming cells and background tasks on a circuit
///
/// This type is returned when you finish a circuit; you need to spawn a
/// new task that calls `run()` on it.
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
pub struct Reactor {
    /// Receiver for control messages for this reactor, sent by `ClientCirc` objects.
    ///
    /// This channel is polled in [`Reactor::run_once`], but only if the `chan_sender` sink
    /// is ready to accept cells.
    control: mpsc::UnboundedReceiver<CtrlMsg>,
    /// Receiver for command messages for this reactor, sent by `ClientCirc` objects.
    ///
    /// This channel is polled in [`Reactor::run_once`].
    ///
    /// NOTE: this is a separate channel from `control`, because some messages
    /// have higher priority and need to be handled even if the `chan_sender` is not
    /// ready (whereas `control` messages are not read until the `chan_sender` sink
    /// is ready to accept cells).
    command: mpsc::UnboundedReceiver<CtrlCmd>,
    /// A oneshot sender that is used to alert other tasks when this reactor is
    /// finally dropped.
    ///
    /// It is a sender for Void because we never actually want to send anything here;
    /// we only want to generate canceled events.
    #[allow(dead_code)] // the only purpose of this field is to be dropped.
    reactor_closed_tx: oneshot::Sender<void::Void>,
    /// A set of circuits that form a tunnel.
    ///
    /// Contains 1 or more circuits.
    ///
    /// Circuits may be added to this set throughout the lifetime of the reactor.
    ///
    /// Sometimes, the reactor will remove circuits from this set,
    /// for example if the `LINKED` message takes too long to arrive,
    /// or if congestion control negotiation fails.
    /// The reactor will continue running with the remaining circuits.
    /// It will shut down if *all* the circuits are removed.
    ///
    // TODO(conflux): document all the reasons why the reactor might
    // choose to tear down a circuit or tunnel (timeouts, protocol violations, etc.)
    circuits: ConfluxSet,
    /// An identifier for logging about this tunnel reactor.
    tunnel_id: TunnelId,
    /// Handlers, shared with `Circuit`.
    cell_handlers: CellHandlers,
    /// The time provider, used for conflux handshake timeouts.
    runtime: DynTimeProvider,
    /// The conflux handshake context, if there is an on-going handshake.
    ///
    /// Set to `None` if this is a single-path tunnel,
    /// or if none of the circuit legs from our conflux set
    /// are currently in the conflux handshake phase.
    #[cfg(feature = "conflux")]
    conflux_hs_ctx: Option<ConfluxHandshakeCtx>,
    /// A min-heap buffering all the out-of-order messages received so far.
    ///
    /// NOTE(review): `std::collections::BinaryHeap` is a max-heap; for this
    /// to behave as a min-heap, the `Ord` impl of `OooRelayMsg` (via
    /// `ConfluxHeapEntry`) presumably reverses the comparison — confirm.
    ///
    /// TODO(conflux): this becomes a DoS vector unless we impose a limit
    /// on its size. We should make this participate in the memquota memory
    /// tracking system, somehow.
    #[cfg(feature = "conflux")]
    ooo_msgs: BinaryHeap<ConfluxHeapEntry>,
}
553
/// The context for an on-going conflux handshake.
#[cfg(feature = "conflux")]
struct ConfluxHandshakeCtx {
    /// A channel for notifying the caller of the outcome of a CONFLUX_LINK request.
    ///
    /// Once no legs remain in the handshake phase, the collected `results`
    /// are sent to the handshake initiator over this channel.
    answer: ConfluxLinkResultChannel,
    /// The number of legs that are currently doing the handshake.
    num_legs: usize,
    /// The handshake results we have collected so far.
    results: ConfluxHandshakeResult,
}
564
/// An out-of-order message buffered in [`Reactor::ooo_msgs`].
///
/// Heap ordering is determined entirely by `msg` (see the `Ord` impl);
/// `leg_id` only records which leg the message arrived on.
#[derive(Debug)]
#[cfg(feature = "conflux")]
struct ConfluxHeapEntry {
    /// The leg id this message came from.
    leg_id: UniqId,
    /// The out of order message
    msg: OooRelayMsg,
}
574
/// Entries are ordered by their buffered message alone; the `leg_id` a
/// message arrived on has no bearing on its position in the heap.
#[cfg(feature = "conflux")]
impl Ord for ConfluxHeapEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        Ord::cmp(&self.msg, &other.msg)
    }
}
581
#[cfg(feature = "conflux")]
impl PartialOrd for ConfluxHeapEntry {
    /// Delegate to the total order defined by [`Ord`].
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
588
#[cfg(feature = "conflux")]
impl PartialEq for ConfluxHeapEntry {
    /// Equality, like ordering, considers only the buffered message.
    fn eq(&self, other: &Self) -> bool {
        self.msg.eq(&other.msg)
    }
}
595
/// Marker impl: asserts that the manual `PartialEq` for `ConfluxHeapEntry`
/// (which delegates to `OooRelayMsg`) is a total equivalence relation.
#[cfg(feature = "conflux")]
impl Eq for ConfluxHeapEntry {}
598
/// Cell handlers, shared between the Reactor and its underlying `Circuit`s.
struct CellHandlers {
    /// A handler for a meta cell, together with a result channel to notify on completion.
    ///
    /// At most one meta handler can be installed at a time.
    ///
    /// NOTE(prop349): this is part of Arti's "Base Circuit Hop Handler".
    ///
    /// Upon sending an EXTEND cell, the [`ControlHandler`] sets this handler
    /// to [`CircuitExtender`](circuit::extender::CircuitExtender).
    /// The handler is then used in [`Circuit::handle_meta_cell`] for handling
    /// all the meta cells received on the circuit that are not SENDMEs or TRUNCATE
    /// (which are handled separately) or conflux cells
    /// (which are handled by the conflux handlers).
    ///
    /// The handler is uninstalled after the receipt of the EXTENDED cell,
    /// so any subsequent EXTENDED cells will cause the circuit to be torn down.
    meta_handler: Option<Box<dyn MetaCellHandler + Send>>,
    /// A handler for incoming stream requests.
    #[cfg(feature = "hs-service")]
    incoming_stream_req_handler: Option<IncomingStreamRequestHandler>,
}
619
620impl Reactor {
    /// Create a new circuit reactor.
    ///
    /// The reactor will send outbound messages on `channel`, receive incoming
    /// messages on `input`, and identify this circuit by the channel-local
    /// [`CircId`] provided.
    ///
    /// The internal unique identifier for this circuit will be `unique_id`.
    ///
    /// Returns the reactor itself, the control and command senders used to
    /// talk to it, a receiver that is canceled when the reactor is dropped,
    /// and the shared mutable state of the tunnel.
    #[allow(clippy::type_complexity, clippy::too_many_arguments)] // TODO
    pub(super) fn new(
        channel: Arc<Channel>,
        channel_id: CircId,
        unique_id: UniqId,
        input: CircuitRxReceiver,
        runtime: DynTimeProvider,
        memquota: CircuitAccount,
        padding_ctrl: PaddingController,
        padding_stream: PaddingEventStream,
        timeouts: Arc<dyn TimeoutEstimator + Send>,
    ) -> (
        Self,
        mpsc::UnboundedSender<CtrlMsg>,
        mpsc::UnboundedSender<CtrlCmd>,
        oneshot::Receiver<void::Void>,
        Arc<TunnelMutableState>,
    ) {
        let tunnel_id = TunnelId::next();
        // Both channels are unbounded; backpressure on `control` is applied
        // by only polling it when the outgoing sink is ready (see the field
        // docs on `Reactor`).
        let (control_tx, control_rx) = mpsc::unbounded();
        let (command_tx, command_rx) = mpsc::unbounded();
        let mutable = Arc::new(MutableState::default());

        // Nothing is ever sent here: dropping the reactor drops
        // `reactor_closed_tx`, which cancels `reactor_closed_rx`.
        let (reactor_closed_tx, reactor_closed_rx) = oneshot::channel();

        let cell_handlers = CellHandlers {
            meta_handler: None,
            #[cfg(feature = "hs-service")]
            incoming_stream_req_handler: None,
        };

        // Scope the circuit's unique ID to this tunnel (for logging).
        let unique_id = TunnelScopedCircId::new(tunnel_id, unique_id);
        let circuit_leg = Circuit::new(
            runtime.clone(),
            channel,
            channel_id,
            unique_id,
            input,
            memquota,
            Arc::clone(&mutable),
            padding_ctrl,
            padding_stream,
            timeouts,
        );

        // Start with a single-leg conflux set; more legs may be linked later.
        // Note: `mutable` is rebound to the tunnel-wide state here.
        let (circuits, mutable) = ConfluxSet::new(tunnel_id, circuit_leg);

        let reactor = Reactor {
            circuits,
            control: control_rx,
            command: command_rx,
            reactor_closed_tx,
            tunnel_id,
            cell_handlers,
            runtime,
            #[cfg(feature = "conflux")]
            conflux_hs_ctx: None,
            #[cfg(feature = "conflux")]
            ooo_msgs: Default::default(),
        };

        (reactor, control_tx, command_tx, reactor_closed_rx, mutable)
    }
691
692    /// Launch the reactor, and run until the circuit closes or we
693    /// encounter an error.
694    ///
695    /// Once this method returns, the circuit is dead and cannot be
696    /// used again.
697    #[instrument(level = "trace", skip_all)]
698    pub async fn run(mut self) -> Result<()> {
699        trace!(tunnel_id = %self.tunnel_id, "Running tunnel reactor");
700        let result: Result<()> = loop {
701            match self.run_once().await {
702                Ok(()) => (),
703                Err(ReactorError::Shutdown) => break Ok(()),
704                Err(ReactorError::Err(e)) => break Err(e),
705            }
706        };
707
708        // Log that the reactor stopped, possibly with the associated error as a report.
709        // May log at a higher level depending on the error kind.
710        const MSG: &str = "Tunnel reactor stopped";
711        match &result {
712            Ok(()) => trace!(tunnel_id = %self.tunnel_id, "{MSG}"),
713            Err(e) => debug_report!(e, tunnel_id = %self.tunnel_id, "{MSG}"),
714        }
715
716        result
717    }
718
719    /// Helper for run: doesn't mark the circuit closed on finish.  Only
720    /// processes one cell or control message.
721    #[instrument(level = "trace", skip_all)]
722    async fn run_once(&mut self) -> StdResult<(), ReactorError> {
723        // If all the circuits are closed, shut down the reactor
724        if self.circuits.is_empty() {
725            trace!(
726                tunnel_id = %self.tunnel_id,
727                "Tunnel reactor shutting down: all circuits have closed",
728            );
729
730            return Err(ReactorError::Shutdown);
731        }
732
733        // If this is a single path circuit, we need to wait until the first hop
734        // is created before doing anything else
735        let single_path_with_hops = self
736            .circuits
737            .single_leg_mut()
738            .is_ok_and(|leg| !leg.has_hops());
739        if single_path_with_hops {
740            self.wait_for_create().await?;
741
742            return Ok(());
743        }
744
745        // Prioritize the buffered messages.
746        //
747        // Note: if any of the messages are ready to be handled,
748        // this will block the reactor until we are done processing them
749        //
750        // TODO circpad: If this is a problem, we might want to re-order things so that we
751        // prioritize padding instead.  On the other hand, this should be fixed by refactoring
752        // circuit and tunnel reactors, so we might do well to just leave it alone for now.
753        #[cfg(feature = "conflux")]
754        self.try_dequeue_ooo_msgs().await?;
755
756        let mut events = select_biased! {
757            res = self.command.next() => {
758                let cmd = unwrap_or_shutdown!(self, res, "command channel drop")?;
759                return ControlHandler::new(self).handle_cmd(cmd);
760            },
761            // Check whether we've got a control message pending.
762            //
763            // Note: unfortunately, reading from control here means we might start
764            // handling control messages before our chan_senders are ready.
765            // With the current design, this is inevitable: we can't know which circuit leg
766            // a control message is meant for without first reading the control message from
767            // the channel, and at that point, we can't know for sure whether that particular
768            // circuit is ready for sending.
769            ret = self.control.next() => {
770                let msg = unwrap_or_shutdown!(self, ret, "control drop")?;
771                smallvec![CircuitEvent::HandleControl(msg)]
772            },
773            res = self.circuits.next_circ_event(&self.runtime).fuse() => res?,
774        };
775
776        // Put the events into the order that we need to execute them in.
777        //
778        // (Yes, this _does_ have to be a stable sort.  Not all events may be freely re-ordered
779        // with respect to one another.)
780        events.sort_by_key(|a| a.order_within_batch());
781
782        for event in events {
783            let cmd = match event {
784                CircuitEvent::RunCmd { leg, cmd } => Some(RunOnceCmd::Single(
785                    RunOnceCmdInner::from_circuit_cmd(leg, cmd),
786                )),
787                CircuitEvent::HandleControl(ctrl) => ControlHandler::new(self)
788                    .handle_msg(ctrl)?
789                    .map(RunOnceCmd::Single),
790                CircuitEvent::HandleCell { leg, cell } => {
791                    let circ = self
792                        .circuits
793                        .leg_mut(leg)
794                        .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
795
796                    let circ_cmds = circ.handle_cell(&mut self.cell_handlers, leg, cell)?;
797                    if circ_cmds.is_empty() {
798                        None
799                    } else {
800                        // TODO: we return RunOnceCmd::Multiple even if there's a single command.
801                        //
802                        // See the TODO on `Circuit::handle_cell`.
803                        let cmd = RunOnceCmd::Multiple(
804                            circ_cmds
805                                .into_iter()
806                                .map(|cmd| RunOnceCmdInner::from_circuit_cmd(leg, cmd))
807                                .collect(),
808                        );
809
810                        Some(cmd)
811                    }
812                }
813                CircuitEvent::RemoveLeg { leg, reason } => {
814                    Some(RunOnceCmdInner::RemoveLeg { leg, reason }.into())
815                }
816                CircuitEvent::PaddingAction { leg, padding_event } => {
817                    cfg_if! {
818                        if #[cfg(feature = "circ-padding")] {
819                            Some(RunOnceCmdInner::PaddingAction { leg, padding_event }.into())
820                        } else {
821                            // If padding isn't enabled, we never generate a padding event,
822                            // so we can be sure this case will never be called.
823                            void::unreachable(padding_event.0);
824                        }
825                    }
826                }
827                CircuitEvent::ProtoViolation { err } => {
828                    return Err(err.into());
829                }
830            };
831
832            if let Some(cmd) = cmd {
833                self.handle_run_once_cmd(cmd).await?;
834            }
835        }
836
837        Ok(())
838    }
839
    /// Try to process the previously-out-of-order messages we might have buffered.
    ///
    /// Pops entries off the `ooo_msgs` heap for as long as the entry at the head
    /// of the heap is the next in-order message for the conflux set. Each dequeued
    /// message is handled as an in-order relay message, and any resulting command
    /// is executed (awaited) before the next heap entry is considered.
    #[cfg(feature = "conflux")]
    #[instrument(level = "trace", skip_all)]
    async fn try_dequeue_ooo_msgs(&mut self) -> StdResult<(), ReactorError> {
        // Check if we're ready to dequeue any of the previously out-of-order cells.
        while let Some(entry) = self.ooo_msgs.peek() {
            let should_pop = self.circuits.is_seqno_in_order(entry.msg.seqno);

            if !should_pop {
                // The head of the heap (lowest seqno) is still out of order,
                // so no buffered message can be processed yet.
                break;
            }

            let entry = self.ooo_msgs.pop().expect("item just disappeared?!");

            // Look up the leg this message arrived on.
            let circ = self
                .circuits
                .leg_mut(entry.leg_id)
                .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
            let handlers = &mut self.cell_handlers;
            // Handle the message as if it had just arrived in order;
            // this may yield a command for us to run.
            let cmd = circ
                .handle_in_order_relay_msg(
                    handlers,
                    entry.msg.hopnum,
                    entry.leg_id,
                    entry.msg.cell_counts_towards_windows,
                    entry.msg.streamid,
                    entry.msg.msg,
                )?
                .map(|cmd| {
                    RunOnceCmd::Single(RunOnceCmdInner::from_circuit_cmd(entry.leg_id, cmd))
                });

            if let Some(cmd) = cmd {
                self.handle_run_once_cmd(cmd).await?;
            }
        }

        Ok(())
    }
879
880    /// Handle a [`RunOnceCmd`].
881    #[instrument(level = "trace", skip_all)]
882    async fn handle_run_once_cmd(&mut self, cmd: RunOnceCmd) -> StdResult<(), ReactorError> {
883        match cmd {
884            RunOnceCmd::Single(cmd) => return self.handle_single_run_once_cmd(cmd).await,
885            RunOnceCmd::Multiple(cmds) => {
886                // While we know `sendable` is ready to accept *one* cell,
887                // we can't be certain it will be able to accept *all* of the cells
888                // that need to be sent here. This means we *may* end up buffering
889                // in its underlying SometimesUnboundedSink! That is OK, because
890                // RunOnceCmd::Multiple is only used for handling packed cells.
891                for cmd in cmds {
892                    self.handle_single_run_once_cmd(cmd).await?;
893                }
894            }
895        }
896
897        Ok(())
898    }
899
    /// Handle a single [`RunOnceCmdInner`].
    ///
    /// This is the reactor's "execute one command" step: each variant either
    /// sends cells, manipulates streams/hops, adjusts the conflux set, or shuts
    /// the reactor down.
    #[instrument(level = "trace", skip_all)]
    async fn handle_single_run_once_cmd(
        &mut self,
        cmd: RunOnceCmdInner,
    ) -> StdResult<(), ReactorError> {
        match cmd {
            RunOnceCmdInner::Send { leg, cell, done } => {
                // TODO: check the cc window
                let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
                if let Some(done) = done {
                    // Don't care if the receiver goes away
                    let _ = done.send(res.clone());
                }
                res?;
            }
            #[cfg(feature = "send-control-msg")]
            RunOnceCmdInner::SendMsgAndInstallHandler { msg, handler, done } => {
                let cell: Result<Option<SendRelayCell>> =
                    self.prepare_msg_and_install_handler(msg, handler);

                match cell {
                    Ok(Some(cell)) => {
                        // TODO(conflux): let the RunOnceCmdInner specify which leg to send the cell on
                        let outcome = self.circuits.send_relay_cell_on_leg(cell, None).await;
                        // don't care if receiver goes away.
                        let _ = done.send(outcome.clone());
                        outcome?;
                    }
                    Ok(None) => {
                        // No cell to send: only a handler was installed.
                        // don't care if receiver goes away.
                        let _ = done.send(Ok(()));
                    }
                    Err(e) => {
                        // don't care if receiver goes away.
                        let _ = done.send(Err(e.clone()));
                        return Err(e.into());
                    }
                }
            }
            RunOnceCmdInner::BeginStream {
                leg,
                cell,
                hop,
                done,
            } => {
                match cell {
                    Ok((cell, stream_id)) => {
                        let circ = self
                            .circuits
                            .leg_mut(leg)
                            .ok_or_else(|| internal!("leg disappeared?!"))?;
                        let cell_hop = cell.hop.expect("missing hop in client SendRelayCell?!");
                        // Look up the relay cell format of the target hop, so the
                        // stream initiator knows how to encode its messages.
                        let relay_format = circ
                            .hop_mut(cell_hop)
                            // TODO: Is this the right error type here? Or should there be a "HopDisappeared"?
                            .ok_or(Error::NoSuchHop)?
                            .relay_cell_format();

                        let outcome = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
                        // don't care if receiver goes away.
                        let _ = done.send(outcome.clone().map(|_| (stream_id, hop, relay_format)));
                        outcome?;
                    }
                    Err(e) => {
                        // don't care if receiver goes away.
                        let _ = done.send(Err(e.clone()));
                        return Err(e.into());
                    }
                }
            }
            RunOnceCmdInner::CloseStream {
                hop,
                sid,
                behav,
                reason,
                done,
            } => {
                // Resolve the target hop to a concrete (leg, hop) pair.
                let result = {
                    let (leg_id, hop_num) = self
                        .resolve_hop_location(hop)
                        .map_err(into_bad_api_usage!("Could not resolve {hop:?}"))?;
                    let leg = self
                        .circuits
                        .leg_mut(leg_id)
                        .ok_or(bad_api_usage!("No leg for id {:?}", leg_id))?;
                    Ok::<_, Bug>((leg, hop_num))
                };

                // NOTE(review): the `?` operators in the block above propagate
                // errors straight out of this function, so `result` is always
                // `Ok` here and this `Err` arm is unreachable: on resolution
                // failure, `done` is never notified. TODO: consider removing
                // the `?`s so that errors flow into `result` instead.
                let (leg, hop_num) = match result {
                    Ok(x) => x,
                    Err(e) => {
                        if let Some(done) = done {
                            // don't care if the sender goes away
                            let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
                            let _ = done.send(Err(e.into()));
                        }
                        return Ok(());
                    }
                };

                let max_rtt = {
                    let hop = leg
                        .hop(hop_num)
                        .ok_or_else(|| internal!("the hop we resolved disappeared?!"))?;
                    let ccontrol = hop.ccontrol();

                    // Note: if we have no measurements for the RTT, this will be set to 0,
                    // and the timeout will be 2 * CBT.
                    ccontrol
                        .rtt()
                        .max_rtt_usec()
                        .map(|rtt| Duration::from_millis(u64::from(rtt)))
                        .unwrap_or_default()
                };

                // The length of the circuit up until the hop that has the half-stream.
                //
                // +1, because HopNums are zero-based.
                let circ_len = usize::from(hop_num) + 1;

                // We double the CBT to account for rend circuits,
                // which are twice as long (otherwise we risk expiring
                // the rend half-streams too soon).
                let timeout = std::cmp::max(max_rtt, 2 * leg.estimate_cbt(circ_len));
                let expire_at = self.runtime.now() + timeout;

                let res: Result<()> = leg
                    .close_stream(hop_num, sid, behav, reason, expire_at)
                    .await;

                if let Some(done) = done {
                    // don't care if the sender goes away
                    let _ = done.send(res);
                }
            }
            RunOnceCmdInner::MaybeSendXon {
                rate,
                stream_id,
                hop,
            } => {
                let (leg_id, hop_num) = match self.resolve_hop_location(hop) {
                    Ok(x) => x,
                    Err(NoJoinPointError) => {
                        // A stream tried to send an XON message to the join point of
                        // a tunnel that has never had a join point. Currently in arti, only a
                        // `StreamTarget` asks us to send an XON message, and this tunnel
                        // originally created the `StreamTarget` to begin with. So this is a
                        // legitimate bug somewhere in the tunnel code.
                        return Err(
                            internal!(
                                "Could not send an XON message to a join point on a tunnel without a join point",
                            )
                            .into()
                        );
                    }
                };

                let Some(leg) = self.circuits.leg_mut(leg_id) else {
                    // The leg has disappeared. This is fine since the stream may have ended and
                    // been cleaned up while this `CtrlMsg::MaybeSendXon` message was queued.
                    // It is possible that this is a bug and this is an incorrect leg number, but
                    // it's not currently possible to differentiate between an incorrect leg
                    // number and a tunnel leg that has been closed.
                    debug!("Could not send an XON message on a leg that does not exist. Ignoring.");
                    return Ok(());
                };

                let Some(hop) = leg.hop_mut(hop_num) else {
                    // The hop has disappeared. This is fine since the circuit may have
                    // been truncated while the `CtrlMsg::MaybeSendXon` message was queued.
                    // It is possible that this is a bug and this is an incorrect hop number, but
                    // it's not currently possible to differentiate between an incorrect hop
                    // number and a circuit hop that has been removed.
                    debug!("Could not send an XON message on a hop that does not exist. Ignoring.");
                    return Ok(());
                };

                // Ask the hop's flow control whether an XON is actually due.
                let Some(msg) = hop.maybe_send_xon(rate, stream_id)? else {
                    // Nothing to do.
                    return Ok(());
                };

                let cell = AnyRelayMsgOuter::new(Some(stream_id), msg.into());
                let cell = SendRelayCell {
                    hop: Some(hop_num),
                    early: false,
                    cell,
                };

                leg.send_relay_cell(cell).await?;
            }
            RunOnceCmdInner::HandleSendMe { leg, hop, sendme } => {
                let leg = self
                    .circuits
                    .leg_mut(leg)
                    .ok_or_else(|| internal!("leg disappeared?!"))?;
                // NOTE: it's okay to await. We are only awaiting on the congestion_signals
                // future which *should* resolve immediately
                let signals = leg.chan_sender.congestion_signals().await;
                leg.handle_sendme(hop, sendme, signals)?;
            }
            RunOnceCmdInner::FirstHopClockSkew { answer } => {
                // Clock skew is only meaningful on a single-leg (non-conflux) tunnel.
                let res = self.circuits.single_leg_mut().map(|leg| leg.clock_skew());

                // don't care if the sender goes away
                let _ = answer.send(res.map_err(Into::into));
            }
            RunOnceCmdInner::CleanShutdown => {
                trace!(tunnel_id = %self.tunnel_id, "reactor shutdown due to handled cell");
                return Err(ReactorError::Shutdown);
            }
            RunOnceCmdInner::RemoveLeg { leg, reason } => {
                warn!(tunnel_id = %self.tunnel_id, reason = %reason, "removing circuit leg");

                let circ = self.circuits.remove(leg)?;
                let is_conflux_pending = circ.is_conflux_pending();

                // Drop the removed leg. This will cause it to close if it's not already closed.
                drop(circ);

                // If we reach this point, it means we have more than one leg
                // (otherwise the .remove() would've returned a Shutdown error),
                // so we expect there to be a ConfluxHandshakeContext installed.

                #[cfg(feature = "conflux")]
                if is_conflux_pending {
                    // Map the removal reason to a handshake error, remembering
                    // whether it was a protocol violation (which tears down the
                    // whole tunnel below).
                    let (error, proto_violation): (_, Option<Error>) = match &reason {
                        RemoveLegReason::ConfluxHandshakeTimeout => {
                            (ConfluxHandshakeError::Timeout, None)
                        }
                        RemoveLegReason::ConfluxHandshakeErr(e) => {
                            (ConfluxHandshakeError::Link(e.clone()), Some(e.clone()))
                        }
                        RemoveLegReason::ChannelClosed => {
                            (ConfluxHandshakeError::ChannelClosed, None)
                        }
                    };

                    self.note_conflux_handshake_result(Err(error), proto_violation.is_some())?;

                    if let Some(e) = proto_violation {
                        tor_error::warn_report!(
                            e,
                            tunnel_id = %self.tunnel_id,
                            "Malformed conflux handshake, tearing down tunnel",
                        );

                        return Err(e.into());
                    }
                }
            }
            #[cfg(feature = "conflux")]
            RunOnceCmdInner::ConfluxHandshakeComplete { leg, cell } => {
                // Note: on the client-side, the handshake is considered complete once the
                // RELAY_CONFLUX_LINKED_ACK is sent (roughly upon receipt of the LINKED cell).
                //
                // We're optimistic here, and declare the handshake a success *before*
                // sending the LINKED_ACK response. I think this is OK though,
                // because if the send_relay_cell() below fails, the reactor will shut
                // down anyway. OTOH, marking the handshake as complete slightly early
                // means that on the happy path, the circuit is marked as usable sooner,
                // instead of blocking on the sending of the LINKED_ACK.
                self.note_conflux_handshake_result(Ok(()), false)?;

                let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;

                res?;
            }
            #[cfg(feature = "conflux")]
            RunOnceCmdInner::Link { circuits, answer } => {
                // Add the specified circuits to our conflux set,
                // and send a LINK cell down each unlinked leg.
                //
                // NOTE: this will block the reactor until all the cells are sent.
                self.handle_link_circuits(circuits, answer).await?;
            }
            #[cfg(feature = "conflux")]
            RunOnceCmdInner::Enqueue { leg, msg } => {
                // Buffer an out-of-order message until its seqno comes up
                // (see `try_dequeue_ooo_msgs`).
                let entry = ConfluxHeapEntry { leg_id: leg, msg };
                self.ooo_msgs.push(entry);
            }
            #[cfg(feature = "circ-padding")]
            RunOnceCmdInner::PaddingAction { leg, padding_event } => {
                // TODO: If we someday move back to having a per-circuit reactor,
                // this event would logically belong there, not on the tunnel reactor.
                self.circuits.run_padding_event(leg, padding_event).await?;
            }
        }

        Ok(())
    }
1192
    /// Wait for a [`CtrlMsg::Create`] to come along to set up the circuit.
    ///
    /// While waiting, also services `CtrlCmd::Shutdown` (and, in tests,
    /// `CtrlCmd::AddFakeHop`), since those are valid before the first hop exists.
    ///
    /// Returns an error if an unexpected `CtrlMsg` is received.
    #[instrument(level = "trace", skip_all)]
    async fn wait_for_create(&mut self) -> StdResult<(), ReactorError> {
        let msg = select_biased! {
            // Commands take precedence over control messages (biased select).
            res = self.command.next() => {
                let cmd = unwrap_or_shutdown!(self, res, "shutdown channel drop")?;
                match cmd {
                    CtrlCmd::Shutdown => return self.handle_shutdown().map(|_| ()),
                    #[cfg(test)]
                    CtrlCmd::AddFakeHop {
                        relay_cell_format: format,
                        fwd_lasthop,
                        rev_lasthop,
                        peer_id,
                        params,
                        done,
                    } => {
                        let leg = self.circuits.single_leg_mut()?;
                        leg.handle_add_fake_hop(format, fwd_lasthop, rev_lasthop, peer_id, &params, done);
                        return Ok(())
                    },
                    _ => {
                        // Any other command is invalid before the circuit is created.
                        trace!("reactor shutdown due to unexpected command: {:?}", cmd);
                        return Err(Error::CircProto(format!("Unexpected control {cmd:?} on client circuit")).into());
                    }
                }
            },
            res = self.control.next() => unwrap_or_shutdown!(self, res, "control drop")?,
        };

        match msg {
            CtrlMsg::Create {
                recv_created,
                handshake,
                settings,
                done,
            } => {
                // TODO(conflux): instead of crashing the reactor, it might be better
                // to send the error via the done channel instead
                let leg = self.circuits.single_leg_mut()?;
                leg.handle_create(recv_created, handshake, settings, done)
                    .await
            }
            _ => {
                // Only Create is acceptable as the first control message.
                trace!("reactor shutdown due to unexpected cell: {:?}", msg);

                Err(Error::CircProto(format!("Unexpected {msg:?} cell on client circuit")).into())
            }
        }
    }
1245
1246    /// Add the specified handshake result to our `ConfluxHandshakeContext`.
1247    ///
1248    /// If all the circuits we were waiting on have finished the conflux handshake,
1249    /// the `ConfluxHandshakeContext` is consumed, and the results we have collected
1250    /// are sent to the handshake initiator.
1251    #[cfg(feature = "conflux")]
1252    #[instrument(level = "trace", skip_all)]
1253    fn note_conflux_handshake_result(
1254        &mut self,
1255        res: StdResult<(), ConfluxHandshakeError>,
1256        reactor_is_closing: bool,
1257    ) -> StdResult<(), ReactorError> {
1258        let tunnel_complete = match self.conflux_hs_ctx.as_mut() {
1259            Some(conflux_ctx) => {
1260                conflux_ctx.results.push(res);
1261                // Whether all the legs have finished linking:
1262                conflux_ctx.results.len() == conflux_ctx.num_legs
1263            }
1264            None => {
1265                return Err(internal!("no conflux handshake context").into());
1266            }
1267        };
1268
1269        if tunnel_complete || reactor_is_closing {
1270            // Time to remove the conflux handshake context
1271            // and extract the results we have collected
1272            let conflux_ctx = self.conflux_hs_ctx.take().expect("context disappeared?!");
1273
1274            let success_count = conflux_ctx.results.iter().filter(|res| res.is_ok()).count();
1275            let leg_count = conflux_ctx.results.len();
1276
1277            info!(
1278                tunnel_id = %self.tunnel_id,
1279                "conflux tunnel ready ({success_count}/{leg_count} circuits successfully linked)",
1280            );
1281
1282            send_conflux_outcome(conflux_ctx.answer, Ok(conflux_ctx.results))?;
1283
1284            // We don't expect to receive any more handshake results,
1285            // at least not until we get another LinkCircuits control message,
1286            // which will install a new ConfluxHandshakeCtx with a channel
1287            // for us to send updates on
1288        }
1289
1290        Ok(())
1291    }
1292
1293    /// Prepare a `SendRelayCell` request, and install the given meta-cell handler.
1294    fn prepare_msg_and_install_handler(
1295        &mut self,
1296        msg: Option<AnyRelayMsgOuter>,
1297        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
1298    ) -> Result<Option<SendRelayCell>> {
1299        let msg = msg
1300            .map(|msg| {
1301                let handlers = &mut self.cell_handlers;
1302                let handler = handler
1303                    .as_ref()
1304                    .or(handlers.meta_handler.as_ref())
1305                    .ok_or_else(|| internal!("tried to use an ended Conversation"))?;
1306                // We should always have a precise HopLocation here so this should never fails but
1307                // in case we have a ::JointPoint, we'll notice.
1308                let hop = handler.expected_hop().hop_num().ok_or(bad_api_usage!(
1309                    "MsgHandler doesn't have a precise HopLocation"
1310                ))?;
1311                Ok::<_, crate::Error>(SendRelayCell {
1312                    hop: Some(hop),
1313                    early: false,
1314                    cell: msg,
1315                })
1316            })
1317            .transpose()?;
1318
1319        if let Some(handler) = handler {
1320            self.cell_handlers.set_meta_handler(handler)?;
1321        }
1322
1323        Ok(msg)
1324    }
1325
1326    /// Handle a shutdown request.
1327    fn handle_shutdown(&self) -> StdResult<Option<RunOnceCmdInner>, ReactorError> {
1328        trace!(
1329            tunnel_id = %self.tunnel_id,
1330            "reactor shutdown due to explicit request",
1331        );
1332
1333        Err(ReactorError::Shutdown)
1334    }
1335
1336    /// Handle a request to shutdown the reactor and return the only [`Circuit`] in this tunnel.
1337    ///
1338    /// Returns an error over the `answer` channel if the reactor has no circuits,
1339    /// or more than one circuit. The reactor will shut down regardless.
1340    #[cfg(feature = "conflux")]
1341    fn handle_shutdown_and_return_circuit(
1342        &mut self,
1343        answer: oneshot::Sender<StdResult<Circuit, Bug>>,
1344    ) -> StdResult<(), ReactorError> {
1345        // Don't care if the receiver goes away
1346        let _ = answer.send(self.circuits.take_single_leg());
1347        self.handle_shutdown().map(|_| ())
1348    }
1349
1350    /// Resolves a [`TargetHop`] to a [`HopLocation`].
1351    ///
1352    /// After resolving a `TargetHop::LastHop`,
1353    /// the `HopLocation` can become stale if a single-path circuit is later extended or truncated.
1354    /// This means that the `HopLocation` can become stale from one reactor iteration to the next.
1355    ///
1356    /// It's generally okay to hold on to a (possibly stale) `HopLocation`
1357    /// if you need a fixed hop position in the tunnel.
1358    /// For example if we open a stream to `TargetHop::LastHop`,
1359    /// we would want to store the stream position as a `HopLocation` and not a `TargetHop::LastHop`
1360    /// as we don't want the stream position to change as the tunnel is extended or truncated.
1361    ///
1362    /// Returns [`NoHopsBuiltError`] if trying to resolve `TargetHop::LastHop`
1363    /// and the tunnel has no hops
1364    /// (either has no legs, or has legs which contain no hops).
1365    fn resolve_target_hop(&self, hop: TargetHop) -> StdResult<HopLocation, NoHopsBuiltError> {
1366        match hop {
1367            TargetHop::Hop(hop) => Ok(hop),
1368            TargetHop::LastHop => {
1369                if let Ok(leg) = self.circuits.single_leg() {
1370                    let leg_id = leg.unique_id();
1371                    // single-path tunnel
1372                    let hop = leg.last_hop_num().ok_or(NoHopsBuiltError)?;
1373                    Ok(HopLocation::Hop((leg_id, hop)))
1374                } else if !self.circuits.is_empty() {
1375                    // multi-path tunnel
1376                    Ok(HopLocation::JoinPoint)
1377                } else {
1378                    // no legs
1379                    Err(NoHopsBuiltError)
1380                }
1381            }
1382        }
1383    }
1384
1385    /// Resolves a [`HopLocation`] to a [`UniqId`] and [`HopNum`].
1386    ///
1387    /// After resolving a `HopLocation::JoinPoint`,
1388    /// the [`UniqId`] and [`HopNum`] can become stale if the primary leg changes.
1389    ///
1390    /// You should try to only resolve to a specific [`UniqId`] and [`HopNum`] immediately before you
1391    /// need them,
1392    /// and you should not hold on to the resolved [`UniqId`] and [`HopNum`] between reactor
1393    /// iterations as the primary leg may change from one iteration to the next.
1394    ///
1395    /// Returns [`NoJoinPointError`] if trying to resolve `HopLocation::JoinPoint`
1396    /// but it does not have a join point.
1397    #[instrument(level = "trace", skip_all)]
1398    fn resolve_hop_location(
1399        &self,
1400        hop: HopLocation,
1401    ) -> StdResult<(UniqId, HopNum), NoJoinPointError> {
1402        match hop {
1403            HopLocation::Hop((leg_id, hop_num)) => Ok((leg_id, hop_num)),
1404            HopLocation::JoinPoint => {
1405                if let Some((leg_id, hop_num)) = self.circuits.primary_join_point() {
1406                    Ok((leg_id, hop_num))
1407                } else {
1408                    // Attempted to get the join point of a non-multipath tunnel.
1409                    Err(NoJoinPointError)
1410                }
1411            }
1412        }
1413    }
1414
1415    /// Resolve a [`TargetHop`] directly into a [`UniqId`] and [`HopNum`].
1416    ///
1417    /// This is a helper function that basically calls both resolve_target_hop and
1418    /// resolve_hop_location back to back.
1419    ///
1420    /// It returns None on failure to resolve meaning that if you want more detailed error on why
1421    /// it failed, explicitly use the resolve_hop_location() and resolve_target_hop() functions.
1422    pub(crate) fn target_hop_to_hopnum_id(&self, hop: TargetHop) -> Option<(UniqId, HopNum)> {
1423        self.resolve_target_hop(hop)
1424            .ok()
1425            .and_then(|resolved| self.resolve_hop_location(resolved).ok())
1426    }
1427
1428    /// Install or remove a padder at a given hop.
1429    #[cfg(feature = "circ-padding-manual")]
1430    fn set_padding_at_hop(
1431        &self,
1432        hop: HopLocation,
1433        padder: Option<super::circuit::padding::CircuitPadder>,
1434    ) -> Result<()> {
1435        let HopLocation::Hop((leg_id, hop_num)) = hop else {
1436            return Err(bad_api_usage!("Padding to the join point is not supported.").into());
1437        };
1438        let circ = self.circuits.leg(leg_id).ok_or(Error::NoSuchHop)?;
1439        circ.set_padding_at_hop(hop_num, padder)?;
1440        Ok(())
1441    }
1442
    /// Does congestion control use stream SENDMEs for the given hop?
    ///
    /// Thin delegation to the conflux set, which looks up `hop` on leg `leg`.
    ///
    /// Returns `None` if either the `leg` or `hop` don't exist.
    fn uses_stream_sendme(&self, leg: UniqId, hop: HopNum) -> Option<bool> {
        self.circuits.uses_stream_sendme(leg, hop)
    }
1449
    /// Handle a request to link some extra circuits in the reactor's conflux set.
    ///
    /// The circuits are validated, and if they do not have the same length,
    /// or if they do not all have the same last hop, an error is returned on
    /// the `answer` channel, and the conflux handshake is *not* initiated.
    ///
    /// If validation succeeds, the circuits are added to this reactor's conflux set,
    /// and the conflux handshake is initiated (by sending a LINK cell on each leg).
    ///
    /// NOTE: this blocks the reactor main loop until all the cells are sent.
    #[cfg(feature = "conflux")]
    #[instrument(level = "trace", skip_all)]
    async fn handle_link_circuits(
        &mut self,
        circuits: Vec<Circuit>,
        answer: ConfluxLinkResultChannel,
    ) -> StdResult<(), ReactorError> {
        use tor_error::warn_report;

        // Only one conflux handshake may be in flight at a time: report the
        // misuse back to the caller instead of shutting down the reactor.
        if self.conflux_hs_ctx.is_some() {
            let err = internal!("conflux linking already in progress");
            send_conflux_outcome(answer, Err(err.into()))?;

            return Ok(());
        }

        // Snapshot the unlinked-leg count *before* add_legs() below runs,
        // so the newly-added circuits aren't counted twice in `num_legs`.
        let unlinked_legs = self.circuits.num_unlinked();

        // We need to send the LINK cell on each of the new circuits
        // and on each of the existing, unlinked legs from self.circuits.
        //
        // In reality, there can only be one such circuit
        // (the "initial" one from the previously single-path tunnel),
        // because any circuits that fail to complete the conflux handshake
        // get removed from the set.
        let num_legs = circuits.len() + unlinked_legs;

        // Note: add_legs validates `circuits`
        //
        // The async block acts as a pseudo-"try" scope: a failure in either
        // step short-circuits via `?` into `res` without exiting this fn.
        let res = async {
            self.circuits.add_legs(circuits, &self.runtime)?;
            self.circuits.link_circuits(&self.runtime).await
        }
        .await;

        if let Err(e) = res {
            warn_report!(e, "Failed to link conflux circuits");

            send_conflux_outcome(answer, Err(e))?;
        } else {
            // Save the channel, to notify the user of completion.
            // (The final outcome is reported later, once all `num_legs`
            // handshake results have been collected.)
            self.conflux_hs_ctx = Some(ConfluxHandshakeCtx {
                answer,
                num_legs,
                results: Default::default(),
            });
        }

        Ok(())
    }
1509}
1510
/// Notify the conflux handshake initiator of the handshake outcome.
///
/// Returns an error if the initiator has gone away
/// (i.e. the receiving end of the result channel was dropped),
/// in which case the reactor should shut down.
#[cfg(feature = "conflux")]
fn send_conflux_outcome(
    tx: ConfluxLinkResultChannel,
    res: Result<ConfluxHandshakeResult>,
) -> StdResult<(), ReactorError> {
    match tx.send(res) {
        Err(_) => {
            tracing::warn!("conflux initiator went away before handshake completed?");
            Err(ReactorError::Shutdown)
        }
        Ok(_) => Ok(()),
    }
}
1526
/// The tunnel does not have any hops.
///
/// Returned by reactor operations that require at least one hop
/// to have been built on the tunnel.
#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
#[non_exhaustive]
#[error("no hops have been built for this tunnel")]
pub(crate) struct NoHopsBuiltError;
1532
/// The tunnel does not have a join point.
///
/// Returned when a [`HopLocation::JoinPoint`] is used on a tunnel
/// that is not a multipath (conflux) tunnel.
#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
#[non_exhaustive]
#[error("the tunnel does not have a join point")]
pub(crate) struct NoJoinPointError;
1538
1539impl CellHandlers {
1540    /// Try to install a given meta-cell handler to receive any unusual cells on
1541    /// this circuit, along with a result channel to notify on completion.
1542    fn set_meta_handler(&mut self, handler: Box<dyn MetaCellHandler + Send>) -> Result<()> {
1543        if self.meta_handler.is_none() {
1544            self.meta_handler = Some(handler);
1545            Ok(())
1546        } else {
1547            Err(Error::from(internal!(
1548                "Tried to install a meta-cell handler before the old one was gone."
1549            )))
1550        }
1551    }
1552
1553    /// Try to install a given cell handler on this circuit.
1554    #[cfg(feature = "hs-service")]
1555    fn set_incoming_stream_req_handler(
1556        &mut self,
1557        handler: IncomingStreamRequestHandler,
1558    ) -> Result<()> {
1559        if self.incoming_stream_req_handler.is_none() {
1560            self.incoming_stream_req_handler = Some(handler);
1561            Ok(())
1562        } else {
1563            Err(Error::from(internal!(
1564                "Tried to install a BEGIN cell handler before the old one was gone."
1565            )))
1566        }
1567    }
1568}
1569
#[cfg(test)]
mod test {
    // No standalone tests here: this reactor is exercised by the tests
    // in [`crate::client::circuit::test`].
}