
tor_proto/client/reactor.rs

1//! Code to handle incoming cells on a circuit.
2//!
3//! ## On message validation
4//!
5//! There are three steps for validating an incoming message on a stream:
6//!
7//! 1. Is the message contextually appropriate? (e.g., no more than one
8//!    `CONNECTED` message per stream.) This is handled by calling
9//!    [`CmdChecker::check_msg`](crate::stream::cmdcheck::CmdChecker::check_msg).
10//! 2. Does the message comply with flow-control rules? (e.g., no more SENDMEs
11//!    than we've sent data for.) This is handled within the reactor by the
12//!    `StreamFlowCtrl`. For half-closed streams which don't send stream
13//!    SENDMEs, an additional receive-window check is performed in the
14//!    `halfstream` module.
15//! 3. Does the message have an acceptable command type, and is the message
16//!    well-formed? For open streams, the streams themselves handle this check.
17//!    For half-closed streams, the reactor handles it by calling
18//!    `consume_checked_msg()`.
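//!
//! As a rough illustration of the ordering (this is not the actual reactor
//! code; `cmd_checker`, `flow_ctrl`, and `half_stream` are stand-in names),
//! the checks run roughly like this:
//!
//! ```ignore
//! // 1. Contextual validity (e.g. at most one CONNECTED per stream).
//! cmd_checker.check_msg(&msg)?;
//! // 2. Flow-control accounting (e.g. SENDME/XON/XOFF bookkeeping).
//! flow_ctrl.handle_incoming(&msg)?;
//! // 3. Command type and well-formedness. Open streams do this themselves;
//! //    half-closed streams go through `consume_checked_msg()`.
//! half_stream.consume_checked_msg(msg)?;
//! ```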
19
20pub(crate) mod circuit;
21mod conflux;
22mod control;
23pub(super) mod syncview;
24
25use crate::circuit::circhop::SendRelayCell;
26use crate::circuit::{CircuitRxReceiver, UniqId};
27use crate::client::circuit::ClientCircChanMsg;
28use crate::client::circuit::padding::{PaddingController, PaddingEvent, PaddingEventStream};
29use crate::client::{HopLocation, TargetHop};
30use crate::crypto::cell::HopNum;
31use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
32use crate::memquota::CircuitAccount;
33use crate::stream::CloseStreamBehavior;
34use crate::streammap;
35use crate::tunnel::{TunnelId, TunnelScopedCircId};
36use crate::util::err::ReactorError;
37use crate::util::skew::ClockSkew;
38use crate::util::timeout::TimeoutEstimator;
39use crate::{Error, Result};
40use circuit::Circuit;
41use conflux::ConfluxSet;
42use control::ControlHandler;
43use std::cmp::Ordering;
44use std::collections::BinaryHeap;
45use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
46use tor_cell::relaycell::msg::Sendme;
47use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
48use tor_error::{Bug, bad_api_usage, debug_report, internal, into_bad_api_usage};
49use tor_rtcompat::{DynTimeProvider, SleepProvider};
50
51use cfg_if::cfg_if;
52use futures::StreamExt;
53use futures::channel::mpsc;
54use futures::{FutureExt as _, select_biased};
55use oneshot_fused_workaround as oneshot;
56
57use std::result::Result as StdResult;
58use std::sync::Arc;
59use std::time::Duration;
60
61use crate::channel::Channel;
62use crate::conflux::msghandler::RemoveLegReason;
63use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
64use circuit::CircuitCmd;
65use derive_more::From;
66use smallvec::smallvec;
67use tor_cell::chancell::CircId;
68use tor_llcrypto::pk;
69use tracing::{debug, info, instrument, trace, warn};
70
71use super::circuit::{MutableState, TunnelMutableState};
72use crate::circuit::reactor::ReactorResultChannel;
73
74#[cfg(feature = "hs-service")]
75use crate::stream::incoming::IncomingStreamRequestHandler;
76
77#[cfg(feature = "conflux")]
78use {
79    crate::conflux::msghandler::{ConfluxCmd, OooRelayMsg},
80    crate::util::err::ConfluxHandshakeError,
81};
82
83pub(super) use control::{CtrlCmd, CtrlMsg, FlowCtrlMsg};
84
85/// Contains a list of conflux handshake results.
86#[cfg(feature = "conflux")]
87pub(super) type ConfluxHandshakeResult = Vec<StdResult<(), ConfluxHandshakeError>>;
88
89/// The type of oneshot channel used to inform reactor users of the outcome
90/// of a client-side conflux handshake.
91///
92/// Contains a list of handshake results, one for each circuit that we were asked
93/// to link in the tunnel.
94#[cfg(feature = "conflux")]
95pub(super) type ConfluxLinkResultChannel = ReactorResultChannel<ConfluxHandshakeResult>;
96
97/// A handshake type, to be used when creating circuit hops.
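///
/// A construction sketch (the key and identity values below are placeholders,
/// not real relay material):
///
/// ```ignore
/// let handshake = CircuitHandshake::Ntor {
///     // The relay's ntor onion key.
///     public_key: ntor_key,
///     // Checked against the identity held by the circuit's channel.
///     ed_identity: relay_ed_id,
/// };
/// ```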
98#[derive(Clone, Debug)]
99pub(crate) enum CircuitHandshake {
100    /// Use the CREATE_FAST handshake.
101    CreateFast,
102    /// Use the ntor handshake.
103    Ntor {
104        /// The public key of the relay.
105        public_key: NtorPublicKey,
106        /// The Ed25519 identity of the relay, which is verified against the
107        /// identity held in the circuit's channel.
108        ed_identity: pk::ed25519::Ed25519Identity,
109    },
110    /// Use the ntor-v3 handshake.
111    NtorV3 {
112        /// The public key of the relay.
113        public_key: NtorV3PublicKey,
114    },
115}
116
117// TODO: the RunOnceCmd/RunOnceCmdInner/CircuitCmd/CircuitEvent enum
118// proliferation is a bit bothersome, but unavoidable with the current design.
119//
120// We should consider getting rid of some of these enums (if possible),
121// and coming up with more intuitive names.
122
123/// One or more [`RunOnceCmdInner`] to run inside [`Reactor::run_once`].
124#[derive(From, Debug)]
125#[allow(clippy::large_enum_variant)] // TODO #2003: resolve this
126enum RunOnceCmd {
127    /// Run a single `RunOnceCmdInner` command.
128    Single(RunOnceCmdInner),
129    /// Run multiple `RunOnceCmdInner` commands.
130    //
131    // Note: this whole enum *could* be replaced with Vec<RunOnceCmdInner>,
132    // but most of the time we're only going to have *one* RunOnceCmdInner
133// to run per run_once() loop. The enum enables us to avoid the extra heap
134    // allocation for the `RunOnceCmd::Single` case.
135    Multiple(Vec<RunOnceCmdInner>),
136}
137
138/// Instructions for running something in the reactor loop.
139///
140/// Run at the end of [`Reactor::run_once`].
141//
142// TODO: many of the variants of this enum have an identical CtrlMsg counterpart.
143// We should consider making each variant a tuple variant and deduplicating the fields.
144#[derive(educe::Educe)]
145#[educe(Debug)]
146enum RunOnceCmdInner {
147    /// Send a RELAY cell.
148    Send {
149        /// The leg the cell should be sent on.
150        leg: UniqId,
151        /// The cell to send.
152        cell: SendRelayCell,
153        /// A channel for sending completion notifications.
154        done: Option<ReactorResultChannel<()>>,
155    },
156    /// Send a given control message on this circuit, and install a control-message handler to
157    /// receive responses.
158    #[cfg(feature = "send-control-msg")]
159    SendMsgAndInstallHandler {
160        /// The message to send, if any
161        msg: Option<AnyRelayMsgOuter>,
162        /// A message handler to install.
163        ///
164        /// If this is `None`, there must already be a message handler installed
165        #[educe(Debug(ignore))]
166        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
167        /// A sender that we use to tell the caller that the message was sent
168        /// and the handler installed.
169        done: oneshot::Sender<Result<()>>,
170    },
171    /// Handle a SENDME message.
172    HandleSendMe {
173        /// The leg the SENDME was received on.
174        leg: UniqId,
175        /// The hop number.
176        hop: HopNum,
177        /// The SENDME message to handle.
178        sendme: Sendme,
179    },
180    /// Begin a stream with the provided hop in this circuit.
181    ///
182    /// Uses the provided stream ID, and sends the provided message to that hop.
183    BeginStream {
184        /// The cell to send.
185        cell: Result<(SendRelayCell, StreamId)>,
186        /// The location of the hop on the tunnel. We don't use this (and `Circuit`s shouldn't need
187    ///    to worry about legs anyway), but we need it so that we can pass it back in `done` to the
188        /// caller.
189        hop: HopLocation,
190        /// The circuit leg to begin the stream on.
191        leg: UniqId,
192        /// Oneshot channel to notify on completion, with the allocated stream ID.
193        done: ReactorResultChannel<(StreamId, HopLocation, RelayCellFormat)>,
194    },
195    /// Consider sending an XON message with the given `rate`.
196    MaybeSendXon {
197        /// The drain rate to advertise in the XON message.
198        rate: XonKbpsEwma,
199        /// The ID of the stream to send the message on.
200        stream_id: StreamId,
201        /// The location of the hop on the tunnel.
202        hop: HopLocation,
203    },
204    /// Close the specified stream.
205    CloseStream {
206        /// The hop number.
207        hop: HopLocation,
208        /// The ID of the stream to close.
209        sid: StreamId,
210        /// The stream-closing behavior.
211        behav: CloseStreamBehavior,
212        /// The reason for closing the stream.
213        reason: streammap::TerminateReason,
214        /// A channel for sending completion notifications.
215        done: Option<ReactorResultChannel<()>>,
216    },
217    /// Get the clock skew claimed by the first hop of the circuit.
218    FirstHopClockSkew {
219        /// Oneshot channel to return the clock skew.
220        answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
221    },
222    /// Remove a circuit leg from the conflux set.
223    RemoveLeg {
224        /// The circuit leg to remove.
225        leg: UniqId,
226        /// The reason for removal.
227        ///
228        /// This is only used for conflux circuits that get removed
229        /// before the conflux handshake is complete.
230        ///
231        /// The [`RemoveLegReason`] is mapped by the reactor to a
232        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
233        /// handshake to indicate the reason the handshake failed.
234        reason: RemoveLegReason,
235    },
236    /// A circuit has completed the conflux handshake,
237    /// and wants to send the specified cell.
238    ///
239    /// This is similar to [`RunOnceCmdInner::Send`],
240    /// but needs to remain a separate variant,
241    /// because in addition to instructing the reactor to send a cell,
242    /// it also notifies it that the conflux handshake is complete on the specified `leg`.
243    /// This enables the reactor to save the handshake result (`Ok(())`),
244    /// and, if there are no other legs still in the handshake phase,
245    /// send the result to the handshake initiator.
246    #[cfg(feature = "conflux")]
247    ConfluxHandshakeComplete {
248        /// The circuit leg that has completed the handshake.
249        /// This is the leg the cell should be sent on.
250        leg: UniqId,
251        /// The cell to send.
252        cell: SendRelayCell,
253    },
254    /// Send a LINK cell on each of the unlinked circuit legs in the conflux set of this reactor.
255    #[cfg(feature = "conflux")]
256    Link {
257        /// The circuits to link into the tunnel
258        #[educe(Debug(ignore))]
259        circuits: Vec<Circuit>,
260        /// Oneshot channel for notifying of conflux handshake completion.
261        answer: ConfluxLinkResultChannel,
262    },
263    /// Enqueue an out-of-order cell in `ooo_msgs`.
264    #[cfg(feature = "conflux")]
265    Enqueue {
266        /// The leg the entry originated from.
267        leg: UniqId,
268        /// The out-of-order message.
269        msg: OooRelayMsg,
270    },
271    /// Take a padding-related event on a circuit leg.
272    #[cfg(feature = "circ-padding")]
273    PaddingAction {
274        /// The leg on which to take the padding action.
275        leg: UniqId,
276        /// The event to take.
277        padding_event: PaddingEvent,
278    },
279    /// Perform a clean shutdown on this circuit.
280    CleanShutdown,
281}
282
283impl RunOnceCmdInner {
284    /// Create a [`RunOnceCmdInner`] out of a [`CircuitCmd`] and [`UniqId`].
285    fn from_circuit_cmd(leg: UniqId, cmd: CircuitCmd) -> Self {
286        match cmd {
287            CircuitCmd::Send(cell) => Self::Send {
288                leg,
289                cell,
290                done: None,
291            },
292            CircuitCmd::HandleSendMe { hop, sendme } => Self::HandleSendMe { leg, hop, sendme },
293            CircuitCmd::CloseStream {
294                hop,
295                sid,
296                behav,
297                reason,
298            } => Self::CloseStream {
299                hop: HopLocation::Hop((leg, hop)),
300                sid,
301                behav,
302                reason,
303                done: None,
304            },
305            #[cfg(feature = "conflux")]
306            CircuitCmd::Conflux(ConfluxCmd::RemoveLeg(reason)) => Self::RemoveLeg { leg, reason },
307            #[cfg(feature = "conflux")]
308            CircuitCmd::Conflux(ConfluxCmd::HandshakeComplete { hop, early, cell }) => {
309                let cell = SendRelayCell {
310                    hop: Some(hop),
311                    early,
312                    cell,
313                };
314                Self::ConfluxHandshakeComplete { leg, cell }
315            }
316            #[cfg(feature = "conflux")]
317            CircuitCmd::Enqueue(msg) => Self::Enqueue { leg, msg },
318            CircuitCmd::CleanShutdown => Self::CleanShutdown,
319        }
320    }
321}
322
323/// A command to execute at the end of [`Reactor::run_once`].
324#[derive(From, Debug)]
325#[allow(clippy::large_enum_variant)] // TODO #2003: should we resolve this?
326enum CircuitEvent {
327    /// Run a single `CircuitCmd` command.
328    RunCmd {
329        /// The unique identifier of the circuit leg to run the command on
330        leg: UniqId,
331        /// The command to run.
332        cmd: CircuitCmd,
333    },
334    /// Handle a control message
335    HandleControl(CtrlMsg),
336    /// Handle an input message.
337    HandleCell {
338        /// The unique identifier of the circuit leg the message was received on.
339        leg: UniqId,
340        /// The message to handle.
341        cell: ClientCircChanMsg,
342    },
343    /// Remove the specified circuit leg from the conflux set.
344    ///
345    /// Returned whenever a single circuit leg needs to be removed
346    /// from the reactor's conflux set, without necessarily tearing down
347    /// the whole set or shutting down the reactor.
348    ///
349    /// Note: this event *can* cause the reactor to shut down
350    /// (and the conflux set to be closed).
351    ///
352    /// See the [`ConfluxSet::remove`] docs for more on the exact behavior of this command.
353    RemoveLeg {
354        /// The leg to remove.
355        leg: UniqId,
356        /// The reason for removal.
357        ///
358        /// This is only used for conflux circuits that get removed
359        /// before the conflux handshake is complete.
360        ///
361        /// The [`RemoveLegReason`] is mapped by the reactor to a
362        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
363        /// handshake to indicate the reason the handshake failed.
364        reason: RemoveLegReason,
365    },
366    /// Take some event (blocking or unblocking a circuit, or sending padding)
367    /// based on the circuit padding backend code.
368    PaddingAction {
369        /// The leg on which to take the padding event.
370        leg: UniqId,
371        /// The event to take.
372        padding_event: PaddingEvent,
373    },
374    /// Protocol violation. For now, this causes the circuit reactor to shut down. The
375    /// error that caused the violation is carried in `err`.
376    ProtoViolation {
377        /// The error that causes this protocol violation.
378        err: crate::Error,
379    },
380}
381
382impl CircuitEvent {
383    /// Return the ordering with which we should handle this event
384    /// within a list of events returned by a single call to next_circ_event().
385    ///
386    /// NOTE: Please do not make this any more complicated:
387    /// It is a consequence of a kludge that we need this sorting at all.
388    /// Assuming that eventually, we switch away from the current
389    /// poll-oriented `next_circ_event` design,
390    /// we may be able to get rid of this entirely.
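    ///
    /// For example, a batch of `[StartBlocking, HandleCell, StopBlocking]`
    /// events sorts (stably) to `[StopBlocking, HandleCell, StartBlocking]`,
    /// so any "send data" operations are handled before we report that the
    /// circuit is blocked.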
391    fn order_within_batch(&self) -> u8 {
392        use CircuitEvent as CA;
393        use PaddingEvent as PE;
394        // The IMMEDIATE priority MUST NOT be used for events that emit cells. At the moment, it is
395        // only used by the protocol-violation event, which shuts down the reactor.
396        const IMMEDIATE: u8 = 0;
397        const EARLY: u8 = 1;
398        const NORMAL: u8 = 2;
399        const LATE: u8 = 3;
400
401        // We use this ordering to move any "StartBlocking" to the _end_ of a batch and
402        // "StopBlocking" to the start.
403        //
404        // This way, we can be sure that we will handle any "send data" operations
405        // (and tell the Padder about them) _before_ we tell the Padder
406        // that we have blocked the circuit.
407        //
408        // This keeps things a bit more logical.
409        match self {
410            CA::RunCmd { .. } => NORMAL,
411            CA::HandleControl(..) => NORMAL,
412            CA::HandleCell { .. } => NORMAL,
413            CA::RemoveLeg { .. } => NORMAL,
414            #[cfg(feature = "circ-padding")]
415            CA::PaddingAction { padding_event, .. } => match padding_event {
416                PE::StopBlocking => EARLY,
417                PE::SendPadding(..) => NORMAL,
418                PE::StartBlocking(..) => LATE,
419            },
420            #[cfg(not(feature = "circ-padding"))]
421            CA::PaddingAction { .. } => NORMAL,
422            CA::ProtoViolation { .. } => IMMEDIATE,
423        }
424    }
425}
426
427/// An object that's waiting for a meta cell (one not associated with a stream) in order to make
428/// progress.
429///
430/// # Background
431///
432/// The `Reactor` can't have async functions that send and receive cells, because its job is to
433/// send and receive cells: if one of its functions tried to do that, it would just hang forever.
434///
435/// To get around this problem, the reactor can send some cells, and then make one of these
436/// `MetaCellHandler` objects, which will be run when the reply arrives.
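///
/// A minimal sketch of an implementation (the `IgnoreOnce` type is purely
/// illustrative, not something defined in this crate):
///
/// ```ignore
/// struct IgnoreOnce {
///     hop: HopLocation,
/// }
///
/// impl MetaCellHandler for IgnoreOnce {
///     fn expected_hop(&self) -> HopLocation {
///         self.hop
///     }
///     fn handle_msg(
///         &mut self,
///         _msg: UnparsedRelayMsg,
///         _circ: &mut Circuit,
///     ) -> Result<MetaCellDisposition> {
///         // Consume the message and ask the reactor to uninstall us.
///         Ok(MetaCellDisposition::ConversationFinished)
///     }
/// }
/// ```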
437pub(crate) trait MetaCellHandler: Send {
438    /// The hop we're expecting the message to come from. This is compared against the hop
439    /// from which we actually receive messages, and an error is returned if the two don't match.
440    fn expected_hop(&self) -> HopLocation;
441    /// Called when the message we were waiting for arrives.
442    ///
443    /// Gets a mutable reference to the [`Circuit`] in order to do anything it needs there.
444    ///
445    /// If this function returns an error, the reactor will shut down.
446    fn handle_msg(
447        &mut self,
448        msg: UnparsedRelayMsg,
449        reactor: &mut Circuit,
450    ) -> Result<MetaCellDisposition>;
451}
452
453/// A possible successful outcome of giving a message to a [`MsgHandler`](super::msghandler::MsgHandler).
454#[derive(Debug, Clone)]
455#[cfg_attr(feature = "send-control-msg", visibility::make(pub))]
456#[non_exhaustive]
457pub(crate) enum MetaCellDisposition {
458    /// The message was consumed; the handler should remain installed.
459    #[cfg(feature = "send-control-msg")]
460    Consumed,
461    /// The message was consumed; the handler should be uninstalled.
462    ConversationFinished,
463    /// The message was consumed; the circuit should be closed.
464    #[cfg(feature = "send-control-msg")]
465    CloseCirc,
466    // TODO: Eventually we might want the ability to have multiple handlers
467    // installed, and to let them say "not for me, maybe for somebody else?".
468    // But right now we don't need that.
469}
470
471/// Unwrap the specified [`Option`], returning a [`ReactorError::Shutdown`] if it is `None`.
472///
473/// This is a macro instead of a function to work around borrowck errors
474/// in the select! from run_once().
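///
/// Illustrative use (mirroring the call sites in `run_once`):
///
/// ```ignore
/// let msg = unwrap_or_shutdown!(self, self.control.next().await, "control drop")?;
/// ```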
475macro_rules! unwrap_or_shutdown {
476    ($self:expr, $res:expr, $reason:expr) => {{
477        match $res {
478            None => {
479                trace!(
480                    tunnel_id = %$self.tunnel_id,
481                    reason = %$reason,
482                    "reactor shutdown"
483                );
484                Err(ReactorError::Shutdown)
485            }
486            Some(v) => Ok(v),
487        }
488    }};
489}
490
491/// Object to handle incoming cells and background tasks on a circuit
492///
493/// This type is returned when you finish a circuit; you need to spawn a
494/// new task that calls `run()` on it.
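///
/// A usage sketch (the `runtime` handle and the way the reactor is obtained
/// are placeholders for whatever the caller has):
///
/// ```ignore
/// runtime.spawn(async move {
///     // Runs until the tunnel is closed or an error occurs.
///     let _ = reactor.run().await;
/// });
/// ```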
495#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
496pub struct Reactor {
497    /// Receiver for control messages for this reactor, sent by `ClientCirc` objects.
498    ///
499    /// This channel is polled in [`Reactor::run_once`], but only if the `chan_sender` sink
500    /// is ready to accept cells.
501    control: mpsc::UnboundedReceiver<CtrlMsg>,
502    /// Receiver for command messages for this reactor, sent by `ClientCirc` objects.
503    ///
504    /// This channel is polled in [`Reactor::run_once`].
505    ///
506    /// NOTE: this is a separate channel from `control`, because some messages
507    /// have higher priority and need to be handled even if the `chan_sender` is not
508    /// ready (whereas `control` messages are not read until the `chan_sender` sink
509    /// is ready to accept cells).
510    command: mpsc::UnboundedReceiver<CtrlCmd>,
511    /// A oneshot sender that is used to alert other tasks when this reactor is
512    /// finally dropped.
513    ///
514    /// It is a sender for Void because we never actually want to send anything here;
515    /// we only want to generate canceled events.
516    #[allow(dead_code)] // the only purpose of this field is to be dropped.
517    reactor_closed_tx: oneshot::Sender<void::Void>,
518    /// A set of circuits that form a tunnel.
519    ///
520    /// Contains 1 or more circuits.
521    ///
522    /// Circuits may be added to this set throughout the lifetime of the reactor.
523    ///
524    /// Sometimes, the reactor will remove circuits from this set,
525    /// for example if the `LINKED` message takes too long to arrive,
526    /// or if congestion control negotiation fails.
527    /// The reactor will continue running with the remaining circuits.
528    /// It will shut down if *all* the circuits are removed.
529    ///
530    // TODO(conflux): document all the reasons why the reactor might
531    // choose to tear down a circuit or tunnel (timeouts, protocol violations, etc.)
532    circuits: ConfluxSet,
533    /// An identifier for logging about this tunnel reactor.
534    tunnel_id: TunnelId,
535    /// Handlers, shared with `Circuit`.
536    cell_handlers: CellHandlers,
537    /// The time provider, used for conflux handshake timeouts.
538    runtime: DynTimeProvider,
539    /// The conflux handshake context, if there is an on-going handshake.
540    ///
541    /// Set to `None` if this is a single-path tunnel,
542    /// or if none of the circuit legs from our conflux set
543    /// are currently in the conflux handshake phase.
544    #[cfg(feature = "conflux")]
545    conflux_hs_ctx: Option<ConfluxHandshakeCtx>,
546    /// A min-heap buffering all the out-of-order messages received so far.
547    ///
548    /// TODO(conflux): this becomes a DoS vector unless we impose a limit
549    /// on its size. We should make this participate in the memquota memory
550    /// tracking system, somehow.
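    ///
    /// Note: `std`'s [`BinaryHeap`] is a max-heap, so the "min-heap" behavior
    /// here relies on the ordering that [`ConfluxHeapEntry`] delegates to
    /// `OooRelayMsg` (presumably reversed by sequence number).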
551    #[cfg(feature = "conflux")]
552    ooo_msgs: BinaryHeap<ConfluxHeapEntry>,
553}
554
555/// The context for an on-going conflux handshake.
556#[cfg(feature = "conflux")]
557struct ConfluxHandshakeCtx {
558    /// A channel for notifying the caller of the outcome of a CONFLUX_LINK request.
559    answer: ConfluxLinkResultChannel,
560    /// The number of legs that are currently doing the handshake.
561    num_legs: usize,
562    /// The handshake results we have collected so far.
563    results: ConfluxHandshakeResult,
564}
565
566/// An out-of-order message buffered in [`Reactor::ooo_msgs`].
567#[derive(Debug)]
568#[cfg(feature = "conflux")]
569struct ConfluxHeapEntry {
570    /// The leg id this message came from.
571    leg_id: UniqId,
572    /// The out-of-order message.
573    msg: OooRelayMsg,
574}
575
576#[cfg(feature = "conflux")]
577impl Ord for ConfluxHeapEntry {
578    fn cmp(&self, other: &Self) -> Ordering {
579        self.msg.cmp(&other.msg)
580    }
581}
582
583#[cfg(feature = "conflux")]
584impl PartialOrd for ConfluxHeapEntry {
585    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
586        Some(self.cmp(other))
587    }
588}
589
590#[cfg(feature = "conflux")]
591impl PartialEq for ConfluxHeapEntry {
592    fn eq(&self, other: &Self) -> bool {
593        self.msg == other.msg
594    }
595}
596
597#[cfg(feature = "conflux")]
598impl Eq for ConfluxHeapEntry {}
599
600/// Cell handlers, shared between the Reactor and its underlying `Circuit`s.
601struct CellHandlers {
602    /// A handler for a meta cell, together with a result channel to notify on completion.
603    ///
604    /// NOTE(prop349): this is part of Arti's "Base Circuit Hop Handler".
605    ///
606    /// Upon sending an EXTEND cell, the [`ControlHandler`] sets this handler
607    /// to [`CircuitExtender`](circuit::extender::CircuitExtender).
608    /// The handler is then used in [`Circuit::handle_meta_cell`] for handling
609    /// all the meta cells received on the circuit that are not SENDMEs or TRUNCATE
610    /// (which are handled separately) or conflux cells
611    /// (which are handled by the conflux handlers).
612    ///
613    /// The handler is uninstalled after the receipt of the EXTENDED cell,
614    /// so any subsequent EXTENDED cells will cause the circuit to be torn down.
615    meta_handler: Option<Box<dyn MetaCellHandler + Send>>,
616    /// A handler for incoming stream requests.
617    #[cfg(feature = "hs-service")]
618    incoming_stream_req_handler: Option<IncomingStreamRequestHandler>,
619}
620
621impl Reactor {
622    /// Create a new circuit reactor.
623    ///
624    /// The reactor will send outbound messages on `channel`, receive incoming
625    /// messages on `input`, and identify this circuit by the channel-local
626    /// [`CircId`] provided.
627    ///
628    /// The internal unique identifier for this circuit will be `unique_id`.
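    ///
    /// A rough sketch of how the returned pieces fit together (argument names
    /// are placeholders for whatever the caller has at hand):
    ///
    /// ```ignore
    /// let (reactor, control_tx, command_tx, reactor_closed_rx, mutable) = Reactor::new(
    ///     channel, channel_id, unique_id, input, runtime,
    ///     memquota, padding_ctrl, padding_stream, timeouts,
    /// );
    /// // The caller keeps the senders and shared state, and spawns `reactor.run()`.
    /// ```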
629    #[allow(clippy::type_complexity, clippy::too_many_arguments)] // TODO
630    pub(super) fn new(
631        channel: Arc<Channel>,
632        channel_id: CircId,
633        unique_id: UniqId,
634        input: CircuitRxReceiver,
635        runtime: DynTimeProvider,
636        memquota: CircuitAccount,
637        padding_ctrl: PaddingController,
638        padding_stream: PaddingEventStream,
639        timeouts: Arc<dyn TimeoutEstimator + Send>,
640    ) -> (
641        Self,
642        mpsc::UnboundedSender<CtrlMsg>,
643        mpsc::UnboundedSender<CtrlCmd>,
644        oneshot::Receiver<void::Void>,
645        Arc<TunnelMutableState>,
646    ) {
647        let tunnel_id = TunnelId::next();
648        let (control_tx, control_rx) = mpsc::unbounded();
649        let (command_tx, command_rx) = mpsc::unbounded();
650        let mutable = Arc::new(MutableState::default());
651
652        let (reactor_closed_tx, reactor_closed_rx) = oneshot::channel();
653
654        let cell_handlers = CellHandlers {
655            meta_handler: None,
656            #[cfg(feature = "hs-service")]
657            incoming_stream_req_handler: None,
658        };
659
660        let unique_id = TunnelScopedCircId::new(tunnel_id, unique_id);
661        let circuit_leg = Circuit::new(
662            runtime.clone(),
663            channel,
664            channel_id,
665            unique_id,
666            input,
667            memquota,
668            Arc::clone(&mutable),
669            padding_ctrl,
670            padding_stream,
671            timeouts,
672        );
673
674        let (circuits, mutable) = ConfluxSet::new(tunnel_id, circuit_leg);
675
676        let reactor = Reactor {
677            circuits,
678            control: control_rx,
679            command: command_rx,
680            reactor_closed_tx,
681            tunnel_id,
682            cell_handlers,
683            runtime,
684            #[cfg(feature = "conflux")]
685            conflux_hs_ctx: None,
686            #[cfg(feature = "conflux")]
687            ooo_msgs: Default::default(),
688        };
689
690        (reactor, control_tx, command_tx, reactor_closed_rx, mutable)
691    }
692
693    /// Launch the reactor, and run until the circuit closes or we
694    /// encounter an error.
695    ///
696    /// Once this method returns, the circuit is dead and cannot be
697    /// used again.
698    #[instrument(level = "trace", skip_all)]
699    pub async fn run(mut self) -> Result<()> {
700        trace!(tunnel_id = %self.tunnel_id, "Running tunnel reactor");
701        let result: Result<()> = loop {
702            match self.run_once().await {
703                Ok(()) => (),
704                Err(ReactorError::Shutdown) => break Ok(()),
705                Err(ReactorError::Err(e)) => break Err(e),
706            }
707        };
708
709        // Log that the reactor stopped, possibly with the associated error as a report.
710        // May log at a higher level depending on the error kind.
711        const MSG: &str = "Tunnel reactor stopped";
712        match &result {
713            Ok(()) => trace!(tunnel_id = %self.tunnel_id, "{MSG}"),
714            Err(e) => debug_report!(e, tunnel_id = %self.tunnel_id, "{MSG}"),
715        }
716
717        result
718    }
719
720    /// Helper for run: doesn't mark the circuit closed on finish.  Only
721    /// processes one cell or control message.
722    #[instrument(level = "trace", skip_all)]
723    async fn run_once(&mut self) -> StdResult<(), ReactorError> {
724        // If all the circuits are closed, shut down the reactor
725        if self.circuits.is_empty() {
726            trace!(
727                tunnel_id = %self.tunnel_id,
728                "Tunnel reactor shutting down: all circuits have closed",
729            );
730
731            return Err(ReactorError::Shutdown);
732        }
733
734        // If this is a single-path circuit, we need to wait until the first hop
735        // is created before doing anything else.
736        let single_path_without_hops = self
737            .circuits
738            .single_leg_mut()
739            .is_ok_and(|leg| !leg.has_hops());
740        if single_path_without_hops {
741            self.wait_for_create().await?;
742
743            return Ok(());
744        }
745
746        // Prioritize the buffered messages.
747        //
748        // Note: if any of the messages are ready to be handled,
749        // this will block the reactor until we are done processing them
750        //
751        // TODO circpad: If this is a problem, we might want to re-order things so that we
752        // prioritize padding instead.  On the other hand, this should be fixed by refactoring
753        // circuit and tunnel reactors, so we might do well to just leave it alone for now.
754        #[cfg(feature = "conflux")]
755        self.try_dequeue_ooo_msgs().await?;
756
757        let mut events = select_biased! {
758            res = self.command.next() => {
759                let cmd = unwrap_or_shutdown!(self, res, "command channel drop")?;
760                return ControlHandler::new(self).handle_cmd(cmd);
761            },
762            // Check whether we've got a control message pending.
763            //
764            // Note: unfortunately, reading from control here means we might start
765            // handling control messages before our chan_senders are ready.
766            // With the current design, this is inevitable: we can't know which circuit leg
767            // a control message is meant for without first reading the control message from
768            // the channel, and at that point, we can't know for sure whether that particular
769            // circuit is ready for sending.
770            ret = self.control.next() => {
771                let msg = unwrap_or_shutdown!(self, ret, "control drop")?;
772                smallvec![CircuitEvent::HandleControl(msg)]
773            },
774            res = self.circuits.next_circ_event(&self.runtime).fuse() => res?,
775        };
776
777        // Put the events into the order that we need to execute them in.
778        //
779        // (Yes, this _does_ have to be a stable sort.  Not all events may be freely re-ordered
780        // with respect to one another.)
781        events.sort_by_key(|a| a.order_within_batch());
782
783        for event in events {
784            let cmd = match event {
785                CircuitEvent::RunCmd { leg, cmd } => Some(RunOnceCmd::Single(
786                    RunOnceCmdInner::from_circuit_cmd(leg, cmd),
787                )),
788                CircuitEvent::HandleControl(ctrl) => ControlHandler::new(self)
789                    .handle_msg(ctrl)?
790                    .map(RunOnceCmd::Single),
791                CircuitEvent::HandleCell { leg, cell } => {
792                    let circ = self
793                        .circuits
794                        .leg_mut(leg)
795                        .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
796
797                    let circ_cmds = circ.handle_cell(&mut self.cell_handlers, leg, cell)?;
798                    if circ_cmds.is_empty() {
799                        None
800                    } else {
801                        // TODO: we return RunOnceCmd::Multiple even if there's a single command.
802                        //
803                        // See the TODO on `Circuit::handle_cell`.
804                        let cmd = RunOnceCmd::Multiple(
805                            circ_cmds
806                                .into_iter()
807                                .map(|cmd| RunOnceCmdInner::from_circuit_cmd(leg, cmd))
808                                .collect(),
809                        );
810
811                        Some(cmd)
812                    }
813                }
814                CircuitEvent::RemoveLeg { leg, reason } => {
815                    Some(RunOnceCmdInner::RemoveLeg { leg, reason }.into())
816                }
817                CircuitEvent::PaddingAction { leg, padding_event } => {
818                    cfg_if! {
819                        if #[cfg(feature = "circ-padding")] {
820                            Some(RunOnceCmdInner::PaddingAction { leg, padding_event }.into())
821                        } else {
822                            // If padding isn't enabled, we never generate a padding event,
823                            // so we can be sure this case will never be called.
824                            void::unreachable(padding_event.0);
825                        }
826                    }
827                }
828                CircuitEvent::ProtoViolation { err } => {
829                    return Err(err.into());
830                }
831            };
832
833            if let Some(cmd) = cmd {
834                self.handle_run_once_cmd(cmd).await?;
835            }
836        }
837
838        Ok(())
839    }
840
841    /// Try to process the previously-out-of-order messages we might have buffered.
842    #[cfg(feature = "conflux")]
843    #[instrument(level = "trace", skip_all)]
844    async fn try_dequeue_ooo_msgs(&mut self) -> StdResult<(), ReactorError> {
845        // Check if we're ready to dequeue any of the previously out-of-order cells.
846        while let Some(entry) = self.ooo_msgs.peek() {
847            let should_pop = self.circuits.is_seqno_in_order(entry.msg.seqno);
848
849            if !should_pop {
850                break;
851            }
852
853            let entry = self.ooo_msgs.pop().expect("item just disappeared?!");
854
855            let circ = self
856                .circuits
857                .leg_mut(entry.leg_id)
858                .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
859            let handlers = &mut self.cell_handlers;
860            let cmd = circ
861                .handle_in_order_relay_msg(
862                    handlers,
863                    entry.msg.hopnum,
864                    entry.leg_id,
865                    entry.msg.cell_counts_towards_windows,
866                    entry.msg.streamid,
867                    entry.msg.msg,
868                )?
869                .map(|cmd| {
870                    RunOnceCmd::Single(RunOnceCmdInner::from_circuit_cmd(entry.leg_id, cmd))
871                });
872
873            if let Some(cmd) = cmd {
874                self.handle_run_once_cmd(cmd).await?;
875            }
876        }
877
878        Ok(())
879    }
880
881    /// Handle a [`RunOnceCmd`].
882    #[instrument(level = "trace", skip_all)]
883    async fn handle_run_once_cmd(&mut self, cmd: RunOnceCmd) -> StdResult<(), ReactorError> {
884        match cmd {
885            RunOnceCmd::Single(cmd) => return self.handle_single_run_once_cmd(cmd).await,
886            RunOnceCmd::Multiple(cmds) => {
887                // While we know `sendable` is ready to accept *one* cell,
888                // we can't be certain it will be able to accept *all* of the cells
889                // that need to be sent here. This means we *may* end up buffering
890                // in its underlying SometimesUnboundedSink! That is OK, because
891                // RunOnceCmd::Multiple is only used for handling packed cells.
892                for cmd in cmds {
893                    self.handle_single_run_once_cmd(cmd).await?;
894                }
895            }
896        }
897
898        Ok(())
899    }
900
901    /// Handle a single [`RunOnceCmdInner`].
902    #[instrument(level = "trace", skip_all)]
903    async fn handle_single_run_once_cmd(
904        &mut self,
905        cmd: RunOnceCmdInner,
906    ) -> StdResult<(), ReactorError> {
907        match cmd {
908            RunOnceCmdInner::Send { leg, cell, done } => {
909                // TODO: check the cc window
910                let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
911                if let Some(done) = done {
912                    // Don't care if the receiver goes away
913                    let _ = done.send(res.clone());
914                }
915                res?;
916            }
917            #[cfg(feature = "send-control-msg")]
918            RunOnceCmdInner::SendMsgAndInstallHandler { msg, handler, done } => {
919                let cell: Result<Option<SendRelayCell>> =
920                    self.prepare_msg_and_install_handler(msg, handler);
921
922                match cell {
923                    Ok(Some(cell)) => {
924                        // TODO(conflux): let the RunOnceCmdInner specify which leg to send the cell on
925                        let outcome = self.circuits.send_relay_cell_on_leg(cell, None).await;
926                        // don't care if receiver goes away.
927                        let _ = done.send(outcome.clone());
928                        outcome?;
929                    }
930                    Ok(None) => {
931                        // don't care if receiver goes away.
932                        let _ = done.send(Ok(()));
933                    }
934                    Err(e) => {
935                        // don't care if receiver goes away.
936                        let _ = done.send(Err(e.clone()));
937                        return Err(e.into());
938                    }
939                }
940            }
941            RunOnceCmdInner::BeginStream {
942                leg,
943                cell,
944                hop,
945                done,
946            } => {
947                match cell {
948                    Ok((cell, stream_id)) => {
949                        let circ = self
950                            .circuits
951                            .leg_mut(leg)
952                            .ok_or_else(|| internal!("leg disappeared?!"))?;
953                        let cell_hop = cell.hop.expect("missing hop in client SendRelayCell?!");
954                        let relay_format = circ
955                            .hop_mut(cell_hop)
956                            // TODO: Is this the right error type here? Or should there be a "HopDisappeared"?
957                            .ok_or(Error::NoSuchHop)?
958                            .relay_cell_format();
959
960                        let outcome = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
961                        // don't care if receiver goes away.
962                        let _ = done.send(outcome.clone().map(|_| (stream_id, hop, relay_format)));
963                        outcome?;
964                    }
965                    Err(e) => {
966                        // don't care if receiver goes away.
967                        let _ = done.send(Err(e.clone()));
968                        return Err(e.into());
969                    }
970                }
971            }
972            RunOnceCmdInner::CloseStream {
973                hop,
974                sid,
975                behav,
976                reason,
977                done,
978            } => {
979                let result = {
980                    let (leg_id, hop_num) = self
981                        .resolve_hop_location(hop)
982                        .map_err(into_bad_api_usage!("Could not resolve {hop:?}"))?;
983                    let leg = self
984                        .circuits
985                        .leg_mut(leg_id)
986                        .ok_or(bad_api_usage!("No leg for id {:?}", leg_id))?;
987                    Ok::<_, Bug>((leg, hop_num))
988                };
989
990                let (leg, hop_num) = match result {
991                    Ok(x) => x,
992                    Err(e) => {
993                        if let Some(done) = done {
994                            // don't care if the sender goes away
995                            let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
996                            let _ = done.send(Err(e.into()));
997                        }
998                        return Ok(());
999                    }
1000                };
1001
1002                let max_rtt = {
1003                    let hop = leg
1004                        .hop(hop_num)
1005                        .ok_or_else(|| internal!("the hop we resolved disappeared?!"))?;
1006                    let ccontrol = hop.ccontrol();
1007
1008                    // Note: if we have no measurements for the RTT, this will be set to 0,
1009                    // and the timeout will be 2 * CBT.
1010                    ccontrol
1011                        .rtt()
1012                        .max_rtt_usec()
1013                        .map(|rtt| Duration::from_millis(u64::from(rtt)))
1014                        .unwrap_or_default()
1015                };
1016
1017                // The length of the circuit up until the hop that has the half-stream.
1018                //
1019                // +1, because HopNums are zero-based.
1020                let circ_len = usize::from(hop_num) + 1;
1021
1022                // We double the CBT to account for rend circuits,
1023                // which are twice as long (otherwise we risk expiring
1024                // the rend half-streams too soon).
1025                let timeout = std::cmp::max(max_rtt, 2 * leg.estimate_cbt(circ_len));
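                // For example, with no RTT measurements (max_rtt == 0) and a
                // CBT estimate of ~1.5s for a 3-hop leg, the half-stream would
                // expire roughly 3s from now (numbers purely illustrative).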
1026                let expire_at = self.runtime.now() + timeout;
1027
1028                let res: Result<()> = leg
1029                    .close_stream(hop_num, sid, behav, reason, expire_at)
1030                    .await;
1031
1032                if let Some(done) = done {
1033                    // don't care if the sender goes away
1034                    let _ = done.send(res);
1035                }
1036            }
1037            RunOnceCmdInner::MaybeSendXon {
1038                rate,
1039                stream_id,
1040                hop,
1041            } => {
1042                let (leg_id, hop_num) = match self.resolve_hop_location(hop) {
1043                    Ok(x) => x,
1044                    Err(NoJoinPointError) => {
1045                        // A stream tried to send an XON message to the join point of
1046                        // a tunnel that has never had a join point. Currently in arti, only a
1047                        // `StreamTarget` asks us to send an XON message, and this tunnel
1048                        // originally created the `StreamTarget` to begin with. So this is a
1049                        // legitimate bug somewhere in the tunnel code.
1050                        return Err(
1051                            internal!(
1052                                "Could not send an XON message to a join point on a tunnel without a join point",
1053                            )
1054                            .into()
1055                        );
1056                    }
1057                };
1058
1059                let Some(leg) = self.circuits.leg_mut(leg_id) else {
1060                    // The leg has disappeared. This is fine since the stream may have ended and
1061                    // been cleaned up while this `CtrlMsg::MaybeSendXon` message was queued.
1062                    // It is possible that this is a bug and an incorrect leg number, but
1063                    // it's not currently possible to differentiate between an incorrect leg
1064                    // number and a tunnel leg that has been closed.
1065                    debug!("Could not send an XON message on a leg that does not exist. Ignoring.");
1066                    return Ok(());
1067                };
1068
1069                let Some(hop) = leg.hop_mut(hop_num) else {
1070                    // The hop has disappeared. This is fine since the circuit may have been
1071                    // truncated while the `CtrlMsg::MaybeSendXon` message was queued.
1072                    // It is possible that this is a bug and an incorrect hop number, but
1073                    // it's not currently possible to differentiate between an incorrect hop
1074                    // number and a circuit hop that has been removed.
1075                    debug!("Could not send an XON message on a hop that does not exist. Ignoring.");
1076                    return Ok(());
1077                };
1078
1079                let Some(msg) = hop.maybe_send_xon(rate, stream_id)? else {
1080                    // Nothing to do.
1081                    return Ok(());
1082                };
1083
1084                let cell = AnyRelayMsgOuter::new(Some(stream_id), msg.into());
1085                let cell = SendRelayCell {
1086                    hop: Some(hop_num),
1087                    early: false,
1088                    cell,
1089                };
1090
1091                leg.send_relay_cell(cell).await?;
1092            }
1093            RunOnceCmdInner::HandleSendMe { leg, hop, sendme } => {
1094                let leg = self
1095                    .circuits
1096                    .leg_mut(leg)
1097                    .ok_or_else(|| internal!("leg disappeared?!"))?;
1098                // NOTE: it's okay to await. We are only awaiting on the congestion_signals
1099                // future which *should* resolve immediately.
1100                let signals = leg.chan_sender.congestion_signals().await;
1101                leg.handle_sendme(hop, sendme, signals)?;
1102            }
1103            RunOnceCmdInner::FirstHopClockSkew { answer } => {
1104                let res = self.circuits.single_leg_mut().map(|leg| leg.clock_skew());
1105
1106                // don't care if the sender goes away
1107                let _ = answer.send(res.map_err(Into::into));
1108            }
1109            RunOnceCmdInner::CleanShutdown => {
1110                trace!(tunnel_id = %self.tunnel_id, "reactor shutdown due to handled cell");
1111                return Err(ReactorError::Shutdown);
1112            }
1113            RunOnceCmdInner::RemoveLeg { leg, reason } => {
1114                warn!(tunnel_id = %self.tunnel_id, reason = %reason, "removing circuit leg");
1115
1116                let circ = self.circuits.remove(leg)?;
1117                let is_conflux_pending = circ.is_conflux_pending();
1118
1119                // Drop the removed leg. This will cause it to close if it's not already closed.
1120                drop(circ);
1121
1122                // If we reach this point, it means we have more than one leg
1123                // (otherwise the .remove() would've returned a Shutdown error),
1124                // so we expect there to be a ConfluxHandshakeCtx installed.
1125
1126                #[cfg(feature = "conflux")]
1127                if is_conflux_pending {
1128                    let (error, proto_violation): (_, Option<Error>) = match &reason {
1129                        RemoveLegReason::ConfluxHandshakeTimeout => {
1130                            (ConfluxHandshakeError::Timeout, None)
1131                        }
1132                        RemoveLegReason::ConfluxHandshakeErr(e) => {
1133                            (ConfluxHandshakeError::Link(e.clone()), Some(e.clone()))
1134                        }
1135                        RemoveLegReason::ChannelClosed => {
1136                            (ConfluxHandshakeError::ChannelClosed, None)
1137                        }
1138                    };
1139
1140                    self.note_conflux_handshake_result(Err(error), proto_violation.is_some())?;
1141
1142                    if let Some(e) = proto_violation {
1143                        tor_error::warn_report!(
1144                            e,
1145                            tunnel_id = %self.tunnel_id,
1146                            "Malformed conflux handshake, tearing down tunnel",
1147                        );
1148
1149                        return Err(e.into());
1150                    }
1151                }
1152            }
1153            #[cfg(feature = "conflux")]
1154            RunOnceCmdInner::ConfluxHandshakeComplete { leg, cell } => {
1155                // Note: on the client-side, the handshake is considered complete once the
1156                // RELAY_CONFLUX_LINKED_ACK is sent (roughly upon receipt of the LINKED cell).
1157                //
1158                // We're optimistic here, and declare the handshake a success *before*
1159                // sending the LINKED_ACK response. I think this is OK though,
1160                // because if the send_relay_cell() below fails, the reactor will shut
1161                // down anyway. OTOH, marking the handshake as complete slightly early
1162                // means that on the happy path, the circuit is marked as usable sooner,
1163                // instead of blocking on the sending of the LINKED_ACK.
1164                self.note_conflux_handshake_result(Ok(()), false)?;
1165
1166                let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
1167
1168                res?;
1169            }
1170            #[cfg(feature = "conflux")]
1171            RunOnceCmdInner::Link { circuits, answer } => {
1172                // Add the specified circuits to our conflux set,
1173                // and send a LINK cell down each unlinked leg.
1174                //
1175                // NOTE: this will block the reactor until all the cells are sent.
1176                self.handle_link_circuits(circuits, answer).await?;
1177            }
1178            #[cfg(feature = "conflux")]
1179            RunOnceCmdInner::Enqueue { leg, msg } => {
1180                let entry = ConfluxHeapEntry { leg_id: leg, msg };
1181                self.ooo_msgs.push(entry);
1182            }
1183            #[cfg(feature = "circ-padding")]
1184            RunOnceCmdInner::PaddingAction { leg, padding_event } => {
1185                // TODO: If we someday move back to having a per-circuit reactor,
1186                // this event would logically belong there, not on the tunnel reactor.
1187                self.circuits.run_padding_event(leg, padding_event).await?;
1188            }
1189        }
1190
1191        Ok(())
1192    }
1193
1194    /// Wait for a [`CtrlMsg::Create`] to come along to set up the circuit.
1195    ///
1196    /// Returns an error if an unexpected `CtrlMsg` is received.
1197    #[instrument(level = "trace", skip_all)]
1198    async fn wait_for_create(&mut self) -> StdResult<(), ReactorError> {
1199        let msg = select_biased! {
1200            res = self.command.next() => {
1201                let cmd = unwrap_or_shutdown!(self, res, "shutdown channel drop")?;
1202                match cmd {
1203                    CtrlCmd::Shutdown => return self.handle_shutdown().map(|_| ()),
1204                    #[cfg(test)]
1205                    CtrlCmd::AddFakeHop {
1206                        relay_cell_format: format,
1207                        fwd_lasthop,
1208                        rev_lasthop,
1209                        peer_id,
1210                        params,
1211                        done,
1212                    } => {
1213                        let leg = self.circuits.single_leg_mut()?;
1214                        leg.handle_add_fake_hop(format, fwd_lasthop, rev_lasthop, peer_id, &params, done);
1215                        return Ok(())
1216                    },
1217                    _ => {
1218                        trace!("reactor shutdown due to unexpected command: {:?}", cmd);
1219                        return Err(Error::CircProto(format!("Unexpected control {cmd:?} on client circuit")).into());
1220                    }
1221                }
1222            },
1223            res = self.control.next() => unwrap_or_shutdown!(self, res, "control drop")?,
1224        };
1225
1226        match msg {
1227            CtrlMsg::Create {
1228                recv_created,
1229                handshake,
1230                settings,
1231                done,
1232            } => {
1233                // TODO(conflux): instead of crashing the reactor, it might be better
1234                // to send the error via the done channel.
1235                let leg = self.circuits.single_leg_mut()?;
1236                leg.handle_create(recv_created, handshake, settings, done)
1237                    .await
1238            }
1239            _ => {
1240                trace!("reactor shutdown due to unexpected cell: {:?}", msg);
1241
1242                Err(Error::CircProto(format!("Unexpected {msg:?} cell on client circuit")).into())
1243            }
1244        }
1245    }
1246
1247    /// Add the specified handshake result to our `ConfluxHandshakeContext`.
1248    ///
1249    /// If all the circuits we were waiting on have finished the conflux handshake,
1250    /// the `ConfluxHandshakeCtx` is consumed, and the results we have collected
1251    /// are sent to the handshake initiator.
1252    #[cfg(feature = "conflux")]
1253    #[instrument(level = "trace", skip_all)]
1254    fn note_conflux_handshake_result(
1255        &mut self,
1256        res: StdResult<(), ConfluxHandshakeError>,
1257        reactor_is_closing: bool,
1258    ) -> StdResult<(), ReactorError> {
1259        let tunnel_complete = match self.conflux_hs_ctx.as_mut() {
1260            Some(conflux_ctx) => {
1261                conflux_ctx.results.push(res);
1262                // Whether all the legs have finished linking:
1263                conflux_ctx.results.len() == conflux_ctx.num_legs
1264            }
1265            None => {
1266                return Err(internal!("no conflux handshake context").into());
1267            }
1268        };
1269
1270        if tunnel_complete || reactor_is_closing {
1271            // Time to remove the conflux handshake context
1272            // and extract the results we have collected
1273            let conflux_ctx = self.conflux_hs_ctx.take().expect("context disappeared?!");
1274
1275            let success_count = conflux_ctx.results.iter().filter(|res| res.is_ok()).count();
1276            let leg_count = conflux_ctx.results.len();
1277
1278            info!(
1279                tunnel_id = %self.tunnel_id,
1280                "conflux tunnel ready ({success_count}/{leg_count} circuits successfully linked)",
1281            );
1282
1283            send_conflux_outcome(conflux_ctx.answer, Ok(conflux_ctx.results))?;
1284
1285            // We don't expect to receive any more handshake results,
1286            // at least not until we get another LinkCircuits control message,
1287            // which will install a new ConfluxHandshakeCtx with a channel
1288            // for us to send updates on
1289        }
1290
1291        Ok(())
1292    }
1293
1294    /// Prepare a `SendRelayCell` request, and install the given meta-cell handler.
1295    fn prepare_msg_and_install_handler(
1296        &mut self,
1297        msg: Option<AnyRelayMsgOuter>,
1298        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
1299    ) -> Result<Option<SendRelayCell>> {
1300        let msg = msg
1301            .map(|msg| {
1302                let handlers = &mut self.cell_handlers;
1303                let handler = handler
1304                    .as_ref()
1305                    .or(handlers.meta_handler.as_ref())
1306                    .ok_or_else(|| internal!("tried to use an ended Conversation"))?;
1307                // We should always have a precise HopLocation here, so this should never fail;
1308                // but in case we somehow have a ::JoinPoint, we'll notice.
1309                let hop = handler.expected_hop().hop_num().ok_or(bad_api_usage!(
1310                    "MsgHandler doesn't have a precise HopLocation"
1311                ))?;
1312                Ok::<_, crate::Error>(SendRelayCell {
1313                    hop: Some(hop),
1314                    early: false,
1315                    cell: msg,
1316                })
1317            })
1318            .transpose()?;
1319
1320        if let Some(handler) = handler {
1321            self.cell_handlers.set_meta_handler(handler)?;
1322        }
1323
1324        Ok(msg)
1325    }
1326
1327    /// Handle a shutdown request.
1328    fn handle_shutdown(&self) -> StdResult<Option<RunOnceCmdInner>, ReactorError> {
1329        trace!(
1330            tunnel_id = %self.tunnel_id,
1331            "reactor shutdown due to explicit request",
1332        );
1333
1334        Err(ReactorError::Shutdown)
1335    }
1336
1337    /// Handle a request to shutdown the reactor and return the only [`Circuit`] in this tunnel.
1338    ///
1339    /// Returns an error over the `answer` channel if the reactor has no circuits,
1340    /// or more than one circuit. The reactor will shut down regardless.
1341    #[cfg(feature = "conflux")]
1342    fn handle_shutdown_and_return_circuit(
1343        &mut self,
1344        answer: oneshot::Sender<StdResult<Circuit, Bug>>,
1345    ) -> StdResult<(), ReactorError> {
1346        // Don't care if the receiver goes away
1347        let _ = answer.send(self.circuits.take_single_leg());
1348        self.handle_shutdown().map(|_| ())
1349    }
1350
1351    /// Resolves a [`TargetHop`] to a [`HopLocation`].
1352    ///
1353    /// After resolving a `TargetHop::LastHop`,
1354    /// the `HopLocation` can become stale if a single-path circuit is later extended or truncated.
1355    /// This means that the `HopLocation` can become stale from one reactor iteration to the next.
1356    ///
1357    /// It's generally okay to hold on to a (possibly stale) `HopLocation`
1358    /// if you need a fixed hop position in the tunnel.
1359    /// For example if we open a stream to `TargetHop::LastHop`,
1360    /// we would want to store the stream position as a `HopLocation` and not a `TargetHop::LastHop`
1361    /// as we don't want the stream position to change as the tunnel is extended or truncated.
1362    ///
1363    /// Returns [`NoHopsBuiltError`] if trying to resolve `TargetHop::LastHop`
1364    /// and the tunnel has no hops
1365    /// (either has no legs, or has legs which contain no hops).
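    ///
    /// A purely illustrative sketch (the surrounding call site and the
    /// `stream_positions` map are hypothetical) of resolving once and then
    /// storing the fixed position:
    ///
    /// ```ignore
    /// // Resolve `LastHop` once, at stream-creation time...
    /// let loc: HopLocation = self.resolve_target_hop(TargetHop::LastHop)?;
    /// // ...then keep the `HopLocation`: later tunnel extensions or truncations
    /// // won't silently retarget the stream (though `loc` may become stale,
    /// // as described above).
    /// stream_positions.insert(stream_id, loc);
    /// ```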
1366    fn resolve_target_hop(&self, hop: TargetHop) -> StdResult<HopLocation, NoHopsBuiltError> {
1367        match hop {
1368            TargetHop::Hop(hop) => Ok(hop),
1369            TargetHop::LastHop => {
1370                if let Ok(leg) = self.circuits.single_leg() {
1371                    let leg_id = leg.unique_id();
1372                    // single-path tunnel
1373                    let hop = leg.last_hop_num().ok_or(NoHopsBuiltError)?;
1374                    Ok(HopLocation::Hop((leg_id, hop)))
1375                } else if !self.circuits.is_empty() {
1376                    // multi-path tunnel
1377                    Ok(HopLocation::JoinPoint)
1378                } else {
1379                    // no legs
1380                    Err(NoHopsBuiltError)
1381                }
1382            }
1383        }
1384    }
1385
1386    /// Resolves a [`HopLocation`] to a [`UniqId`] and [`HopNum`].
1387    ///
1388    /// After resolving a `HopLocation::JoinPoint`,
1389    /// the [`UniqId`] and [`HopNum`] can become stale if the primary leg changes.
1390    ///
1391    /// You should try to only resolve to a specific [`UniqId`] and [`HopNum`] immediately before you
1392    /// need them,
1393    /// and you should not hold on to the resolved [`UniqId`] and [`HopNum`] between reactor
1394    /// iterations as the primary leg may change from one iteration to the next.
1395    ///
1396    /// Returns [`NoJoinPointError`] if trying to resolve `HopLocation::JoinPoint`
1397    /// but it does not have a join point.
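    ///
    /// A purely illustrative sketch (the send helper is hypothetical) of resolving
    /// immediately before use, within a single reactor iteration:
    ///
    /// ```ignore
    /// // Don't cache the (UniqId, HopNum) pair across iterations: the primary leg may change.
    /// let (leg_id, hop_num) = self.resolve_hop_location(loc)?;
    /// self.send_cell_on_leg(leg_id, hop_num, cell)?; // hypothetical helper
    /// ```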
1398    #[instrument(level = "trace", skip_all)]
1399    fn resolve_hop_location(
1400        &self,
1401        hop: HopLocation,
1402    ) -> StdResult<(UniqId, HopNum), NoJoinPointError> {
1403        match hop {
1404            HopLocation::Hop((leg_id, hop_num)) => Ok((leg_id, hop_num)),
1405            HopLocation::JoinPoint => {
1406                if let Some((leg_id, hop_num)) = self.circuits.primary_join_point() {
1407                    Ok((leg_id, hop_num))
1408                } else {
1409                    // Attempted to get the join point of a non-multipath tunnel.
1410                    Err(NoJoinPointError)
1411                }
1412            }
1413        }
1414    }
1415
1416    /// Resolve a [`TargetHop`] directly into a [`UniqId`] and [`HopNum`].
1417    ///
1418    /// This is a helper function that basically calls both resolve_target_hop and
1419    /// resolve_hop_location back to back.
1420    ///
1421    /// It returns None on any failure to resolve; if you want a more detailed error explaining
1422    /// why it failed, call resolve_target_hop() and resolve_hop_location() explicitly.
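    ///
    /// For example (sketch only; the surrounding function is assumed to return a
    /// `Result` with our `Error` type), a caller that does not care why resolution
    /// failed might write:
    ///
    /// ```ignore
    /// let Some((leg_id, hop_num)) = self.target_hop_to_hopnum_id(TargetHop::LastHop) else {
    ///     return Err(Error::NoSuchHop);
    /// };
    /// ```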
1423    pub(crate) fn target_hop_to_hopnum_id(&self, hop: TargetHop) -> Option<(UniqId, HopNum)> {
1424        self.resolve_target_hop(hop)
1425            .ok()
1426            .and_then(|resolved| self.resolve_hop_location(resolved).ok())
1427    }
1428
1429    /// Install or remove a padder at a given hop.
1430    #[cfg(feature = "circ-padding-manual")]
1431    fn set_padding_at_hop(
1432        &self,
1433        hop: HopLocation,
1434        padder: Option<super::circuit::padding::CircuitPadder>,
1435    ) -> Result<()> {
1436        let HopLocation::Hop((leg_id, hop_num)) = hop else {
1437            return Err(bad_api_usage!("Padding to the join point is not supported.").into());
1438        };
1439        let circ = self.circuits.leg(leg_id).ok_or(Error::NoSuchHop)?;
1440        circ.set_padding_at_hop(hop_num, padder)?;
1441        Ok(())
1442    }
1443
1444    /// Does congestion control use stream SENDMEs for the given hop?
1445    ///
1446    /// Returns `None` if either the `leg` or `hop` don't exist.
1447    fn uses_stream_sendme(&self, leg: UniqId, hop: HopNum) -> Option<bool> {
1448        self.circuits.uses_stream_sendme(leg, hop)
1449    }
1450
1451    /// Handle a request to link some extra circuits in the reactor's conflux set.
1452    ///
1453    /// The circuits are validated, and if they do not have the same length,
1454    /// or if they do not all have the same last hop, an error is returned on
1455    /// the `answer` channel, and the conflux handshake is *not* initiated.
1456    ///
1457    /// If validation succeeds, the circuits are added to this reactor's conflux set,
1458    /// and the conflux handshake is initiated (by sending a LINK cell on each leg).
1459    ///
1460    /// NOTE: this blocks the reactor main loop until all the cells are sent.
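    ///
    /// Caller-side sketch of how the outcome is awaited. This is illustrative only:
    /// the control-channel handle, message variant, and field names below are
    /// assumptions, not the crate's actual API.
    ///
    /// ```ignore
    /// let (answer_tx, answer_rx) = oneshot::channel();
    /// // Ask the reactor to link the extra legs into the tunnel.
    /// control_tx.unbounded_send(CtrlMsg::LinkCircuits {
    ///     circuits: extra_legs,
    ///     answer: answer_tx,
    /// })?;
    /// // On success, one entry per leg that attempted the conflux handshake.
    /// let per_leg: ConfluxHandshakeResult = answer_rx.await??;
    /// ```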
1461    #[cfg(feature = "conflux")]
1462    #[instrument(level = "trace", skip_all)]
1463    async fn handle_link_circuits(
1464        &mut self,
1465        circuits: Vec<Circuit>,
1466        answer: ConfluxLinkResultChannel,
1467    ) -> StdResult<(), ReactorError> {
1468        use tor_error::warn_report;
1469
1470        if self.conflux_hs_ctx.is_some() {
1471            let err = internal!("conflux linking already in progress");
1472            send_conflux_outcome(answer, Err(err.into()))?;
1473
1474            return Ok(());
1475        }
1476
1477        let unlinked_legs = self.circuits.num_unlinked();
1478
1479        // We need to send the LINK cell on each of the new circuits
1480        // and on each of the existing, unlinked legs from self.circuits.
1481        //
1482        // In reality, there can only be one such circuit
1483        // (the "initial" one from the previously single-path tunnel),
1484        // because any circuits that fail to complete the conflux handshake
1485        // get removed from the set.
1486        let num_legs = circuits.len() + unlinked_legs;
1487
1488        // Note: add_legs validates `circuits`
1489        let res = async {
1490            self.circuits.add_legs(circuits, &self.runtime)?;
1491            self.circuits.link_circuits(&self.runtime).await
1492        }
1493        .await;
1494
1495        if let Err(e) = res {
1496            warn_report!(e, "Failed to link conflux circuits");
1497
1498            send_conflux_outcome(answer, Err(e))?;
1499        } else {
1500            // Save the channel, to notify the user of completion.
1501            self.conflux_hs_ctx = Some(ConfluxHandshakeCtx {
1502                answer,
1503                num_legs,
1504                results: Default::default(),
1505            });
1506        }
1507
1508        Ok(())
1509    }
1510}
1511
1512/// Notify the conflux handshake initiator of the handshake outcome.
1513///
1514/// Returns an error if the initiator has gone away.
1515#[cfg(feature = "conflux")]
1516fn send_conflux_outcome(
1517    tx: ConfluxLinkResultChannel,
1518    res: Result<ConfluxHandshakeResult>,
1519) -> StdResult<(), ReactorError> {
1520    if tx.send(res).is_err() {
1521        tracing::warn!("conflux initiator went away before handshake completed?");
1522        return Err(ReactorError::Shutdown);
1523    }
1524
1525    Ok(())
1526}
1527
1528/// The tunnel does not have any hops.
1529#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
1530#[non_exhaustive]
1531#[error("no hops have been built for this tunnel")]
1532pub(crate) struct NoHopsBuiltError;
1533
1534/// The tunnel does not have a join point.
1535#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
1536#[non_exhaustive]
1537#[error("the tunnel does not have a join point")]
1538pub(crate) struct NoJoinPointError;
1539
1540impl CellHandlers {
1541    /// Try to install a given meta-cell handler to receive any unusual cells on
1542    /// this circuit, along with a result channel to notify on completion.
1543    fn set_meta_handler(&mut self, handler: Box<dyn MetaCellHandler + Send>) -> Result<()> {
1544        if self.meta_handler.is_none() {
1545            self.meta_handler = Some(handler);
1546            Ok(())
1547        } else {
1548            Err(Error::from(internal!(
1549                "Tried to install a meta-cell handler before the old one was gone."
1550            )))
1551        }
1552    }
1553
1554    /// Try to install a given cell handler on this circuit.
1555    #[cfg(feature = "hs-service")]
1556    fn set_incoming_stream_req_handler(
1557        &mut self,
1558        handler: IncomingStreamRequestHandler,
1559    ) -> Result<()> {
1560        if self.incoming_stream_req_handler.is_none() {
1561            self.incoming_stream_req_handler = Some(handler);
1562            Ok(())
1563        } else {
1564            Err(Error::from(internal!(
1565                "Tried to install a BEGIN cell handler before the old one was gone."
1566            )))
1567        }
1568    }
1569}
1570
1571#[cfg(test)]
1572mod test {
1573    // Tested in [`crate::client::circuit::test`].
1574}