tor_proto/client/reactor.rs

1//! Code to handle incoming cells on a circuit.
2//!
3//! ## On message validation
4//!
5//! There are three steps for validating an incoming message on a stream:
6//!
7//! 1. Is the message contextually appropriate? (e.g., no more than one
8//!    `CONNECTED` message per stream.) This is handled by calling
9//!    [`CmdChecker::check_msg`](crate::stream::cmdcheck::CmdChecker::check_msg).
10//! 2. Does the message comply with flow-control rules? (e.g., no more SENDMEs
11//!    than we've sent data for.) This is handled within the reactor by the
12//!    `StreamFlowCtrl`. For half-closed streams which don't send stream
13//!    SENDMEs, an additional receive-window check is performed in the
14//!    `halfstream` module.
15//! 3. Does the message have an acceptable command type, and is the message
16//!    well-formed? For open streams, the streams themselves handle this check.
17//!    For half-closed streams, the reactor handles it by calling
18//!    `consume_checked_msg()`.
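//!
//! As an illustrative sketch (not the exact call path), an incoming `DATA`
//! message on an open stream passes through those checks in this order:
//!
//! ```text
//! UnparsedRelayMsg
//!   --> CmdChecker::check_msg()          (1: contextually appropriate?)
//!   --> stream flow-control accounting   (2: within SENDME / XON-XOFF limits?)
//!   --> stream decodes the message body  (3: acceptable command, well-formed?)
//! ```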
19
20pub(crate) mod circuit;
21mod conflux;
22mod control;
23pub(super) mod syncview;
24
25use crate::circuit::UniqId;
26use crate::circuit::celltypes::ClientCircChanMsg;
27use crate::circuit::circhop::SendRelayCell;
28use crate::client::circuit::padding::{PaddingController, PaddingEvent, PaddingEventStream};
29use crate::client::circuit::{CircuitRxReceiver, TimeoutEstimator};
30#[cfg(feature = "hs-service")]
31use crate::client::stream::{IncomingStreamRequest, IncomingStreamRequestFilter};
32use crate::client::{HopLocation, TargetHop};
33use crate::crypto::cell::HopNum;
34use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
35use crate::memquota::{CircuitAccount, StreamAccount};
36use crate::stream::cmdcheck::AnyCmdChecker;
37use crate::stream::flow_ctrl::state::StreamRateLimit;
38use crate::stream::flow_ctrl::xon_xoff::reader::DrainRateRequest;
39use crate::stream::queue::StreamQueueReceiver;
40use crate::stream::{CloseStreamBehavior, StreamMpscSender};
41use crate::streammap;
42use crate::tunnel::{TunnelId, TunnelScopedCircId};
43use crate::util::err::ReactorError;
44use crate::util::notify::NotifyReceiver;
45use crate::util::skew::ClockSkew;
46use crate::{Error, Result};
47use circuit::Circuit;
48use conflux::ConfluxSet;
49use control::ControlHandler;
50use postage::watch;
51use std::cmp::Ordering;
52use std::collections::BinaryHeap;
53use std::mem::size_of;
54use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
55use tor_cell::relaycell::msg::{AnyRelayMsg, Sendme};
56use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
57use tor_error::{Bug, bad_api_usage, debug_report, internal, into_bad_api_usage};
58use tor_rtcompat::{DynTimeProvider, SleepProvider};
59
60use cfg_if::cfg_if;
61use futures::StreamExt;
62use futures::channel::mpsc;
63use futures::{FutureExt as _, select_biased};
64use oneshot_fused_workaround as oneshot;
65
66use std::result::Result as StdResult;
67use std::sync::Arc;
68use std::time::Duration;
69
70use crate::channel::Channel;
71use crate::conflux::msghandler::RemoveLegReason;
72use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
73use circuit::CircuitCmd;
74use derive_deftly::Deftly;
75use derive_more::From;
76use smallvec::smallvec;
77use tor_cell::chancell::CircId;
78use tor_llcrypto::pk;
79use tor_memquota::derive_deftly_template_HasMemoryCost;
80use tor_memquota::mq_queue::{self, MpscSpec};
81use tracing::{debug, info, instrument, trace, warn};
82
83use super::circuit::{MutableState, TunnelMutableState};
84
85#[cfg(feature = "conflux")]
86use {
87    crate::conflux::msghandler::{ConfluxCmd, OooRelayMsg},
88    crate::util::err::ConfluxHandshakeError,
89};
90
91pub(super) use control::{CtrlCmd, CtrlMsg, FlowCtrlMsg};
92
93/// The type of a oneshot channel used to inform the reactor of the result of an operation.
94pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;
95
96/// Contains a list of conflux handshake results.
97#[cfg(feature = "conflux")]
98pub(super) type ConfluxHandshakeResult = Vec<StdResult<(), ConfluxHandshakeError>>;
99
100/// The type of oneshot channel used to inform reactor users of the outcome
101/// of a client-side conflux handshake.
102///
103/// Contains a list of handshake results, one for each circuit that we were asked
104/// to link in the tunnel.
105#[cfg(feature = "conflux")]
106pub(super) type ConfluxLinkResultChannel = ReactorResultChannel<ConfluxHandshakeResult>;
107
108pub(crate) use circuit::{RECV_WINDOW_INIT, STREAM_READER_BUFFER};
109
110/// MPSC queue containing stream requests
111#[cfg(feature = "hs-service")]
112type StreamReqSender = mq_queue::Sender<StreamReqInfo, MpscSpec>;
113
114/// A handshake type, to be used when creating circuit hops.
115#[derive(Clone, Debug)]
116pub(crate) enum CircuitHandshake {
117    /// Use the CREATE_FAST handshake.
118    CreateFast,
119    /// Use the ntor handshake.
120    Ntor {
121        /// The public key of the relay.
122        public_key: NtorPublicKey,
123        /// The Ed25519 identity of the relay, which is verified against the
124        /// identity held in the circuit's channel.
125        ed_identity: pk::ed25519::Ed25519Identity,
126    },
127    /// Use the ntor-v3 handshake.
128    NtorV3 {
129        /// The public key of the relay.
130        public_key: NtorV3PublicKey,
131    },
132}
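
// A hedged construction sketch: the `relay_ntor_onion_key` and
// `relay_ed25519_identity` bindings are hypothetical values taken from a
// relay's published keys, not names defined in this module.
//
// ```ignore
// let handshake = CircuitHandshake::Ntor {
//     public_key: relay_ntor_onion_key,
//     ed_identity: relay_ed25519_identity,
// };
// ```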
133
134// TODO: the RunOnceCmd/RunOnceCmdInner/CircuitCmd/CircuitAction enum
135// proliferation is a bit bothersome, but unavoidable with the current design.
136//
137// We should consider getting rid of some of these enums (if possible),
138// and coming up with more intuitive names.
139
140/// One or more [`RunOnceCmdInner`] to run inside [`Reactor::run_once`].
141#[derive(From, Debug)]
142#[allow(clippy::large_enum_variant)] // TODO #2003: resolve this
143enum RunOnceCmd {
144    /// Run a single `RunOnceCmdInner` command.
145    Single(RunOnceCmdInner),
146    /// Run multiple `RunOnceCmdInner` commands.
147    //
148    // Note: this whole enum *could* be replaced with Vec<RunOnceCmdInner>,
149    // but most of the time we're only going to have *one* RunOnceCmdInner
150// to run per run_once() loop. The enum enables us to avoid the extra heap
151    // allocation for the `RunOnceCmd::Single` case.
152    Multiple(Vec<RunOnceCmdInner>),
153}
154
155/// Instructions for running something in the reactor loop.
156///
157/// Run at the end of [`Reactor::run_once`].
158//
159// TODO: many of the variants of this enum have an identical CtrlMsg counterpart.
160// We should consider making each variant a tuple variant and deduplicating the fields.
161#[derive(educe::Educe)]
162#[educe(Debug)]
163enum RunOnceCmdInner {
164    /// Send a RELAY cell.
165    Send {
166        /// The leg the cell should be sent on.
167        leg: UniqId,
168        /// The cell to send.
169        cell: SendRelayCell,
170        /// A channel for sending completion notifications.
171        done: Option<ReactorResultChannel<()>>,
172    },
173    /// Send a given control message on this circuit, and install a control-message handler to
174    /// receive responses.
175    #[cfg(feature = "send-control-msg")]
176    SendMsgAndInstallHandler {
177        /// The message to send, if any
178        msg: Option<AnyRelayMsgOuter>,
179        /// A message handler to install.
180        ///
181        /// If this is `None`, there must already be a message handler installed
182        #[educe(Debug(ignore))]
183        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
184        /// A sender that we use to tell the caller that the message was sent
185        /// and the handler installed.
186        done: oneshot::Sender<Result<()>>,
187    },
188    /// Handle a SENDME message.
189    HandleSendMe {
190        /// The leg the SENDME was received on.
191        leg: UniqId,
192        /// The hop number.
193        hop: HopNum,
194        /// The SENDME message to handle.
195        sendme: Sendme,
196    },
197    /// Begin a stream with the provided hop in this circuit.
198    ///
199    /// Uses the provided stream ID, and sends the provided message to that hop.
200    BeginStream {
201        /// The cell to send.
202        cell: Result<(SendRelayCell, StreamId)>,
203        /// The location of the hop on the tunnel. We don't use this (and `Circuit`s shouldn't need
204        /// to worry about legs anyway), but we need it so that we can pass it back in `done` to the
205        /// caller.
206        hop: HopLocation,
207        /// The circuit leg to begin the stream on.
208        leg: UniqId,
209        /// Oneshot channel to notify on completion, with the allocated stream ID.
210        done: ReactorResultChannel<(StreamId, HopLocation, RelayCellFormat)>,
211    },
212    /// Consider sending an XON message with the given `rate`.
213    MaybeSendXon {
214        /// The drain rate to advertise in the XON message.
215        rate: XonKbpsEwma,
216        /// The ID of the stream to send the message on.
217        stream_id: StreamId,
218        /// The location of the hop on the tunnel.
219        hop: HopLocation,
220    },
221    /// Close the specified stream.
222    CloseStream {
223        /// The hop number.
224        hop: HopLocation,
225        /// The ID of the stream to close.
226        sid: StreamId,
227        /// The stream-closing behavior.
228        behav: CloseStreamBehavior,
229        /// The reason for closing the stream.
230        reason: streammap::TerminateReason,
231        /// A channel for sending completion notifications.
232        done: Option<ReactorResultChannel<()>>,
233    },
234    /// Get the clock skew claimed by the first hop of the circuit.
235    FirstHopClockSkew {
236        /// Oneshot channel to return the clock skew.
237        answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
238    },
239    /// Remove a circuit leg from the conflux set.
240    RemoveLeg {
241        /// The circuit leg to remove.
242        leg: UniqId,
243        /// The reason for removal.
244        ///
245        /// This is only used for conflux circuits that get removed
246        /// before the conflux handshake is complete.
247        ///
248        /// The [`RemoveLegReason`] is mapped by the reactor to a
249        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
250        /// handshake to indicate the reason the handshake failed.
251        reason: RemoveLegReason,
252    },
253    /// A circuit has completed the conflux handshake,
254    /// and wants to send the specified cell.
255    ///
256    /// This is similar to [`RunOnceCmdInner::Send`],
257    /// but needs to remain a separate variant,
258    /// because in addition to instructing the reactor to send a cell,
259    /// it also notifies it that the conflux handshake is complete on the specified `leg`.
260    /// This enables the reactor to save the handshake result (`Ok(())`),
261    /// and, if there are no other legs still in the handshake phase,
262    /// send the result to the handshake initiator.
263    #[cfg(feature = "conflux")]
264    ConfluxHandshakeComplete {
265        /// The circuit leg that has completed the handshake.
266        /// This is the leg the cell should be sent on.
267        leg: UniqId,
268        /// The cell to send.
269        cell: SendRelayCell,
270    },
271    /// Send a LINK cell on each of the unlinked circuit legs in the conflux set of this reactor.
272    #[cfg(feature = "conflux")]
273    Link {
274        /// The circuits to link into the tunnel
275        #[educe(Debug(ignore))]
276        circuits: Vec<Circuit>,
277        /// Oneshot channel for notifying of conflux handshake completion.
278        answer: ConfluxLinkResultChannel,
279    },
280    /// Enqueue an out-of-order cell in [`Reactor::ooo_msgs`].
281    #[cfg(feature = "conflux")]
282    Enqueue {
283        /// The leg the entry originated from.
284        leg: UniqId,
285        /// The out-of-order message.
286        msg: OooRelayMsg,
287    },
288    /// Take a padding-related action on a circuit leg.
289    #[cfg(feature = "circ-padding")]
290    PaddingAction {
291        /// The leg to take the action on.
292        leg: UniqId,
293        /// The action to take.
294        padding_action: PaddingEvent,
295    },
296    /// Perform a clean shutdown on this circuit.
297    CleanShutdown,
298}
299
300impl RunOnceCmdInner {
301    /// Create a [`RunOnceCmdInner`] out of a [`CircuitCmd`] and [`UniqId`].
302    fn from_circuit_cmd(leg: UniqId, cmd: CircuitCmd) -> Self {
303        match cmd {
304            CircuitCmd::Send(cell) => Self::Send {
305                leg,
306                cell,
307                done: None,
308            },
309            CircuitCmd::HandleSendMe { hop, sendme } => Self::HandleSendMe { leg, hop, sendme },
310            CircuitCmd::CloseStream {
311                hop,
312                sid,
313                behav,
314                reason,
315            } => Self::CloseStream {
316                hop: HopLocation::Hop((leg, hop)),
317                sid,
318                behav,
319                reason,
320                done: None,
321            },
322            #[cfg(feature = "conflux")]
323            CircuitCmd::Conflux(ConfluxCmd::RemoveLeg(reason)) => Self::RemoveLeg { leg, reason },
324            #[cfg(feature = "conflux")]
325            CircuitCmd::Conflux(ConfluxCmd::HandshakeComplete { hop, early, cell }) => {
326                let cell = SendRelayCell {
327                    hop: Some(hop),
328                    early,
329                    cell,
330                };
331                Self::ConfluxHandshakeComplete { leg, cell }
332            }
333            #[cfg(feature = "conflux")]
334            CircuitCmd::Enqueue(msg) => Self::Enqueue { leg, msg },
335            CircuitCmd::CleanShutdown => Self::CleanShutdown,
336        }
337    }
338}
339
340/// A command to execute at the end of [`Reactor::run_once`].
341#[derive(From, Debug)]
342#[allow(clippy::large_enum_variant)] // TODO #2003: should we resolve this?
343enum CircuitAction {
344    /// Run a single `CircuitCmd` command.
345    RunCmd {
346        /// The unique identifier of the circuit leg to run the command on
347        leg: UniqId,
348        /// The command to run.
349        cmd: CircuitCmd,
350    },
351    /// Handle a control message
352    HandleControl(CtrlMsg),
353    /// Handle an input message.
354    HandleCell {
355        /// The unique identifier of the circuit leg the message was received on.
356        leg: UniqId,
357        /// The message to handle.
358        cell: ClientCircChanMsg,
359    },
360    /// Remove the specified circuit leg from the conflux set.
361    ///
362    /// Returned whenever a single circuit leg needs to be removed
363    /// from the reactor's conflux set, without necessarily tearing down
364    /// the whole set or shutting down the reactor.
365    ///
366    /// Note: this action *can* cause the reactor to shut down
367    /// (and the conflux set to be closed).
368    ///
369    /// See the [`ConfluxSet::remove`] docs for more on the exact behavior of this command.
370    RemoveLeg {
371        /// The leg to remove.
372        leg: UniqId,
373        /// The reason for removal.
374        ///
375        /// This is only used for conflux circuits that get removed
376        /// before the conflux handshake is complete.
377        ///
378        /// The [`RemoveLegReason`] is mapped by the reactor to a
379        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
380        /// handshake to indicate the reason the handshake failed.
381        reason: RemoveLegReason,
382    },
383    /// Take some action (blocking or unblocking a circuit, or sending padding)
384    /// based on the circuit padding backend code.
385    PaddingAction {
386        /// The leg on which to take the padding action.
387        leg: UniqId,
388        /// The action to take.
389        padding_action: PaddingEvent,
390    },
391}
392
393impl CircuitAction {
394    /// Return the ordering with which we should handle this action
395    /// within a list of actions returned by a single call to next_circ_action().
396    ///
397    /// NOTE: Please do not make this any more complicated:
398    /// It is a consequence of a kludge that we need this sorting at all.
399    /// Assuming that eventually, we switch away from the current
400    /// poll-oriented `next_circ_action` design,
401    /// we may be able to get rid of this entirely.
402    fn order_within_batch(&self) -> u8 {
403        use CircuitAction as CA;
404        use PaddingEvent as PE;
405        const EARLY: u8 = 0;
406        const NORMAL: u8 = 1;
407        const LATE: u8 = 2;
408
409        // We use this ordering to move any "StartBlocking" to the _end_ of a batch and
410        // "StopBlocking" to the start.
411        //
412        // This way, we can be sure that we will handle any "send data" operations
413        // (and tell the Padder about them) _before_ we tell the Padder
414        // that we have blocked the circuit.
415        //
416        // This keeps things a bit more logical.
417        match self {
418            CA::RunCmd { .. } => NORMAL,
419            CA::HandleControl(..) => NORMAL,
420            CA::HandleCell { .. } => NORMAL,
421            CA::RemoveLeg { .. } => NORMAL,
422            #[cfg(feature = "circ-padding")]
423            CA::PaddingAction { padding_action, .. } => match padding_action {
424                PE::StopBlocking => EARLY,
425                PE::SendPadding(..) => NORMAL,
426                PE::StartBlocking(..) => LATE,
427            },
428            #[cfg(not(feature = "circ-padding"))]
429            CA::PaddingAction { .. } => NORMAL,
430        }
431    }
432}
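
// A hedged usage note: the reactor applies this ordering with a stable sort over
// each batch returned by `next_circ_action()` (see `run_once()` below), so actions
// with equal keys keep their original relative order:
//
// ```ignore
// actions.sort_by_key(|a| a.order_within_batch());
// ```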
433
434/// An object that's waiting for a meta cell (one not associated with a stream) in order to make
435/// progress.
436///
437/// # Background
438///
439/// The `Reactor` can't have async functions that send a cell and then await the reply: driving cell
440/// I/O is the reactor's own job, so such a function would stall the reactor itself and hang forever.
441///
442/// To get around this problem, the reactor can send some cells, and then make one of these
443/// `MetaCellHandler` objects, which will be run when the reply arrives.
444pub(crate) trait MetaCellHandler: Send {
445    /// The hop we're expecting the message to come from. This is compared against the hop
446    /// from which we actually receive messages, and an error is returned if the two don't match.
447    fn expected_hop(&self) -> HopLocation;
448    /// Called when the message we were waiting for arrives.
449    ///
450    /// Gets a mutable reference to the [`Circuit`], in order to do anything it likes there.
451    ///
452    /// If this function returns an error, the reactor will shut down.
453    fn handle_msg(
454        &mut self,
455        msg: UnparsedRelayMsg,
456        reactor: &mut Circuit,
457    ) -> Result<MetaCellDisposition>;
458}
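
// A minimal illustrative implementor, as a sketch only: `ExpectExtended` is a
// hypothetical name, and real handlers (such as the `CircuitExtender` installed
// by the `ControlHandler` when extending a circuit) live elsewhere in this crate.
//
// ```ignore
// struct ExpectExtended {
//     /// The hop we expect the reply to come from.
//     hop: HopLocation,
// }
//
// impl MetaCellHandler for ExpectExtended {
//     fn expected_hop(&self) -> HopLocation {
//         self.hop
//     }
//
//     fn handle_msg(
//         &mut self,
//         msg: UnparsedRelayMsg,
//         _circ: &mut Circuit,
//     ) -> Result<MetaCellDisposition> {
//         // Inspect `msg` here; returning an error shuts the reactor down.
//         let _ = msg;
//         Ok(MetaCellDisposition::ConversationFinished)
//     }
// }
// ```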
459
460/// A possible successful outcome of giving a message to a [`MsgHandler`](super::msghandler::MsgHandler).
461#[derive(Debug, Clone)]
462#[cfg_attr(feature = "send-control-msg", visibility::make(pub))]
463#[non_exhaustive]
464pub(crate) enum MetaCellDisposition {
465    /// The message was consumed; the handler should remain installed.
466    #[cfg(feature = "send-control-msg")]
467    Consumed,
468    /// The message was consumed; the handler should be uninstalled.
469    ConversationFinished,
470    /// The message was consumed; the circuit should be closed.
471    #[cfg(feature = "send-control-msg")]
472    CloseCirc,
473    // TODO: Eventually we might want the ability to have multiple handlers
474    // installed, and to let them say "not for me, maybe for somebody else?".
475    // But right now we don't need that.
476}
477
478/// Unwrap the specified [`Option`], returning a [`ReactorError::Shutdown`] if it is `None`.
479///
480/// This is a macro instead of a function to work around borrowck errors
481/// in the select! from run_once().
482macro_rules! unwrap_or_shutdown {
483    ($self:expr, $res:expr, $reason:expr) => {{
484        match $res {
485            None => {
486                trace!(
487                    tunnel_id = %$self.tunnel_id,
488                    reason = %$reason,
489                    "reactor shutdown"
490                );
491                Err(ReactorError::Shutdown)
492            }
493            Some(v) => Ok(v),
494        }
495    }};
496}
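
// A usage sketch mirroring the shape of the calls in `run_once()` and
// `wait_for_create()` below:
//
// ```ignore
// let msg = unwrap_or_shutdown!(self, self.control.next().await, "control drop")?;
// ```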
497
498/// Object to handle incoming cells and background tasks on a circuit
499///
500/// This type is returned when you finish building a circuit; you need to spawn a
501/// new task that calls `run()` on it.
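///
/// A hedged usage sketch; the `runtime.spawn` call and the setup around it are
/// illustrative, not this crate's exact API:
///
/// ```ignore
/// let (reactor, control_tx, command_tx, closed_rx, mutable) = Reactor::new(/* … */);
/// runtime.spawn(async move {
///     // Runs until the tunnel is closed or the reactor hits an error.
///     let _ = reactor.run().await;
/// });
/// ```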
502#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
503pub struct Reactor {
504    /// Receiver for control messages for this reactor, sent by `ClientCirc` objects.
505    ///
506    /// This channel is polled in [`Reactor::run_once`], but only if the `chan_sender` sink
507    /// is ready to accept cells.
508    control: mpsc::UnboundedReceiver<CtrlMsg>,
509    /// Receiver for command messages for this reactor, sent by `ClientCirc` objects.
510    ///
511    /// This channel is polled in [`Reactor::run_once`].
512    ///
513    /// NOTE: this is a separate channel from `control`, because some messages
514    /// have higher priority and need to be handled even if the `chan_sender` is not
515    /// ready (whereas `control` messages are not read until the `chan_sender` sink
516    /// is ready to accept cells).
517    command: mpsc::UnboundedReceiver<CtrlCmd>,
518    /// A oneshot sender that is used to alert other tasks when this reactor is
519    /// finally dropped.
520    ///
521    /// It is a sender for Void because we never actually want to send anything here;
522    /// we only want to generate canceled events.
523    #[allow(dead_code)] // the only purpose of this field is to be dropped.
524    reactor_closed_tx: oneshot::Sender<void::Void>,
525    /// A set of circuits that form a tunnel.
526    ///
527    /// Contains 1 or more circuits.
528    ///
529    /// Circuits may be added to this set throughout the lifetime of the reactor.
530    ///
531    /// Sometimes, the reactor will remove circuits from this set,
532    /// for example if the `LINKED` message takes too long to arrive,
533    /// or if congestion control negotiation fails.
534    /// The reactor will continue running with the remaining circuits.
535    /// It will shut down if *all* the circuits are removed.
536    ///
537    // TODO(conflux): document all the reasons why the reactor might
538// choose to tear down a circuit or tunnel (timeouts, protocol violations, etc.)
539    circuits: ConfluxSet,
540    /// An identifier for logging about this tunnel reactor.
541    tunnel_id: TunnelId,
542    /// Handlers, shared with `Circuit`.
543    cell_handlers: CellHandlers,
544    /// The time provider, used for conflux handshake timeouts.
545    runtime: DynTimeProvider,
546    /// The conflux handshake context, if there is an on-going handshake.
547    ///
548    /// Set to `None` if this is a single-path tunnel,
549    /// or if none of the circuit legs from our conflux set
550    /// are currently in the conflux handshake phase.
551    #[cfg(feature = "conflux")]
552    conflux_hs_ctx: Option<ConfluxHandshakeCtx>,
553    /// A min-heap buffering all the out-of-order messages received so far.
554    ///
555    /// TODO(conflux): this becomes a DoS vector unless we impose a limit
556    /// on its size. We should make this participate in the memquota memory
557    /// tracking system, somehow.
558    #[cfg(feature = "conflux")]
559    ooo_msgs: BinaryHeap<ConfluxHeapEntry>,
560}
561
562/// The context for an on-going conflux handshake.
563#[cfg(feature = "conflux")]
564struct ConfluxHandshakeCtx {
565    /// A channel for notifying the caller of the outcome of a CONFLUX_LINK request.
566    answer: ConfluxLinkResultChannel,
567    /// The number of legs that are currently doing the handshake.
568    num_legs: usize,
569    /// The handshake results we have collected so far.
570    results: ConfluxHandshakeResult,
571}
572
573/// An out-of-order message buffered in [`Reactor::ooo_msgs`].
574#[derive(Debug)]
575#[cfg(feature = "conflux")]
576struct ConfluxHeapEntry {
577    /// The leg id this message came from.
578    leg_id: UniqId,
579    /// The out-of-order message.
580    msg: OooRelayMsg,
581}
582
583#[cfg(feature = "conflux")]
584impl Ord for ConfluxHeapEntry {
585    fn cmp(&self, other: &Self) -> Ordering {
586        self.msg.cmp(&other.msg)
587    }
588}
589
590#[cfg(feature = "conflux")]
591impl PartialOrd for ConfluxHeapEntry {
592    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
593        Some(self.cmp(other))
594    }
595}
596
597#[cfg(feature = "conflux")]
598impl PartialEq for ConfluxHeapEntry {
599    fn eq(&self, other: &Self) -> bool {
600        self.msg == other.msg
601    }
602}
603
604#[cfg(feature = "conflux")]
605impl Eq for ConfluxHeapEntry {}
606
607/// Cell handlers, shared between the Reactor and its underlying `Circuit`s.
608struct CellHandlers {
609    /// A handler for a meta cell, together with a result channel to notify on completion.
610    ///
611    /// NOTE(prop349): this is part of Arti's "Base Circuit Hop Handler".
612    ///
613    /// Upon sending an EXTEND cell, the [`ControlHandler`] sets this handler
614    /// to [`CircuitExtender`](circuit::extender::CircuitExtender).
615    /// The handler is then used in [`Circuit::handle_meta_cell`] for handling
616    /// all the meta cells received on the circuit that are not SENDMEs or TRUNCATE
617    /// (which are handled separately) or conflux cells
618    /// (which are handled by the conflux handlers).
619    ///
620    /// The handler is uninstalled after the receipt of the EXTENDED cell,
621    /// so any subsequent EXTENDED cells will cause the circuit to be torn down.
622    meta_handler: Option<Box<dyn MetaCellHandler + Send>>,
623    /// A handler for incoming stream requests.
624    #[cfg(feature = "hs-service")]
625    incoming_stream_req_handler: Option<IncomingStreamRequestHandler>,
626}
627
628/// Information about an incoming stream request.
629#[cfg(feature = "hs-service")]
630#[derive(Debug, Deftly)]
631#[derive_deftly(HasMemoryCost)]
632pub(crate) struct StreamReqInfo {
633    /// The [`IncomingStreamRequest`].
634    pub(crate) req: IncomingStreamRequest,
635    /// The ID of the stream being requested.
636    pub(crate) stream_id: StreamId,
637    /// The [`HopNum`].
638    //
639    // TODO: When we add support for exit relays, we need to turn this into an Option<HopNum>.
640    // (For outbound messages (towards relays), there is only one hop that can send them: the client.)
641    //
642    // TODO: For onion services, we might be able to enforce the HopNum earlier: we would never accept an
643    // incoming stream request from two separate hops.  (There is only one that's valid.)
644    pub(crate) hop: HopLocation,
645    /// The format which must be used with this stream to encode messages.
646    #[deftly(has_memory_cost(indirect_size = "0"))]
647    pub(crate) relay_cell_format: RelayCellFormat,
648    /// A channel for receiving messages from this stream.
649    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate
650    pub(crate) receiver: StreamQueueReceiver,
651    /// A channel for sending messages to be sent on this stream.
652    #[deftly(has_memory_cost(indirect_size = "size_of::<AnyRelayMsg>()"))] // estimate
653    pub(crate) msg_tx: StreamMpscSender<AnyRelayMsg>,
654    /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
655    // TODO(arti#2068): we should consider making this an `Option`
656    // the `watch::Sender` owns the indirect data
657    #[deftly(has_memory_cost(indirect_size = "0"))]
658    pub(crate) rate_limit_stream: watch::Receiver<StreamRateLimit>,
659    /// A [`Stream`](futures::Stream) that provides notifications when a new drain rate is
660    /// requested.
661    #[deftly(has_memory_cost(indirect_size = "0"))]
662    pub(crate) drain_rate_request_stream: NotifyReceiver<DrainRateRequest>,
663    /// The memory quota account to be used for this stream
664    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate (it contains an Arc)
665    pub(crate) memquota: StreamAccount,
666}
667
668/// Data required for handling an incoming stream request.
669#[cfg(feature = "hs-service")]
670#[derive(educe::Educe)]
671#[educe(Debug)]
672struct IncomingStreamRequestHandler {
673    /// A sender for sharing information about an incoming stream request.
674    incoming_sender: StreamReqSender,
675    /// An [`AnyCmdChecker`] for validating incoming stream requests.
676    cmd_checker: AnyCmdChecker,
677    /// The hop to expect incoming stream requests from.
678    hop_num: HopNum,
679    /// An [`IncomingStreamRequestFilter`] for checking whether the user wants
680    /// this request, or wants to reject it immediately.
681    #[educe(Debug(ignore))]
682    filter: Box<dyn IncomingStreamRequestFilter>,
683}
684
685impl Reactor {
686    /// Create a new circuit reactor.
687    ///
688    /// The reactor will send outbound messages on `channel`, receive incoming
689    /// messages on `input`, and identify this circuit by the channel-local
690    /// [`CircId`] provided.
691    ///
692    /// The internal unique identifier for this circuit will be `unique_id`.
693    #[allow(clippy::type_complexity, clippy::too_many_arguments)] // TODO
694    pub(super) fn new(
695        channel: Arc<Channel>,
696        channel_id: CircId,
697        unique_id: UniqId,
698        input: CircuitRxReceiver,
699        runtime: DynTimeProvider,
700        memquota: CircuitAccount,
701        padding_ctrl: PaddingController,
702        padding_stream: PaddingEventStream,
703        timeouts: Arc<dyn TimeoutEstimator + Send>,
704    ) -> (
705        Self,
706        mpsc::UnboundedSender<CtrlMsg>,
707        mpsc::UnboundedSender<CtrlCmd>,
708        oneshot::Receiver<void::Void>,
709        Arc<TunnelMutableState>,
710    ) {
711        let tunnel_id = TunnelId::next();
712        let (control_tx, control_rx) = mpsc::unbounded();
713        let (command_tx, command_rx) = mpsc::unbounded();
714        let mutable = Arc::new(MutableState::default());
715
716        let (reactor_closed_tx, reactor_closed_rx) = oneshot::channel();
717
718        let cell_handlers = CellHandlers {
719            meta_handler: None,
720            #[cfg(feature = "hs-service")]
721            incoming_stream_req_handler: None,
722        };
723
724        let unique_id = TunnelScopedCircId::new(tunnel_id, unique_id);
725        let circuit_leg = Circuit::new(
726            runtime.clone(),
727            channel,
728            channel_id,
729            unique_id,
730            input,
731            memquota,
732            Arc::clone(&mutable),
733            padding_ctrl,
734            padding_stream,
735            timeouts,
736        );
737
738        let (circuits, mutable) = ConfluxSet::new(tunnel_id, circuit_leg);
739
740        let reactor = Reactor {
741            circuits,
742            control: control_rx,
743            command: command_rx,
744            reactor_closed_tx,
745            tunnel_id,
746            cell_handlers,
747            runtime,
748            #[cfg(feature = "conflux")]
749            conflux_hs_ctx: None,
750            #[cfg(feature = "conflux")]
751            ooo_msgs: Default::default(),
752        };
753
754        (reactor, control_tx, command_tx, reactor_closed_rx, mutable)
755    }
756
757    /// Launch the reactor, and run until the circuit closes or we
758    /// encounter an error.
759    ///
760    /// Once this method returns, the circuit is dead and cannot be
761    /// used again.
762    #[instrument(level = "trace", skip_all)]
763    pub async fn run(mut self) -> Result<()> {
764        trace!(tunnel_id = %self.tunnel_id, "Running tunnel reactor");
765        let result: Result<()> = loop {
766            match self.run_once().await {
767                Ok(()) => (),
768                Err(ReactorError::Shutdown) => break Ok(()),
769                Err(ReactorError::Err(e)) => break Err(e),
770            }
771        };
772
773        // Log that the reactor stopped, possibly with the associated error as a report.
774        // May log at a higher level depending on the error kind.
775        const MSG: &str = "Tunnel reactor stopped";
776        match &result {
777            Ok(()) => trace!(tunnel_id = %self.tunnel_id, "{MSG}"),
778            Err(e) => debug_report!(e, tunnel_id = %self.tunnel_id, "{MSG}"),
779        }
780
781        result
782    }
783
784    /// Helper for run: doesn't mark the circuit closed on finish.  Only
785    /// processes one cell or control message.
786    #[instrument(level = "trace", skip_all)]
787    async fn run_once(&mut self) -> StdResult<(), ReactorError> {
788        // If all the circuits are closed, shut down the reactor
789        if self.circuits.is_empty() {
790            trace!(
791                tunnel_id = %self.tunnel_id,
792                "Tunnel reactor shutting down: all circuits have closed",
793            );
794
795            return Err(ReactorError::Shutdown);
796        }
797
798        // If this is a single path circuit, we need to wait until the first hop
799        // is created before doing anything else
800        let single_path_without_hops = self
801            .circuits
802            .single_leg_mut()
803            .is_ok_and(|leg| !leg.has_hops());
804        if single_path_without_hops {
805            self.wait_for_create().await?;
806
807            return Ok(());
808        }
809
810        // Prioritize the buffered messages.
811        //
812        // Note: if any of the messages are ready to be handled,
813        // this will block the reactor until we are done processing them
814        //
815        // TODO circpad: If this is a problem, we might want to re-order things so that we
816        // prioritize padding instead.  On the other hand, this should be fixed by refactoring
817        // circuit and tunnel reactors, so we might do well to just leave it alone for now.
818        #[cfg(feature = "conflux")]
819        self.try_dequeue_ooo_msgs().await?;
820
821        let mut actions = select_biased! {
822            res = self.command.next() => {
823                let cmd = unwrap_or_shutdown!(self, res, "command channel drop")?;
824                return ControlHandler::new(self).handle_cmd(cmd);
825            },
826            // Check whether we've got a control message pending.
827            //
828            // Note: unfortunately, reading from control here means we might start
829            // handling control messages before our chan_senders are ready.
830            // With the current design, this is inevitable: we can't know which circuit leg
831            // a control message is meant for without first reading the control message from
832            // the channel, and at that point, we can't know for sure whether that particular
833            // circuit is ready for sending.
834            ret = self.control.next() => {
835                let msg = unwrap_or_shutdown!(self, ret, "control drop")?;
836                smallvec![CircuitAction::HandleControl(msg)]
837            },
838            res = self.circuits.next_circ_action(&self.runtime).fuse() => res?,
839        };
840
841        // Put the actions into the order that we need to execute them in.
842        //
843        // (Yes, this _does_ have to be a stable sort.  Not all actions may be freely re-ordered
844        // with respect to one another.)
845        actions.sort_by_key(|a| a.order_within_batch());
846
847        for action in actions {
848            let cmd = match action {
849                CircuitAction::RunCmd { leg, cmd } => Some(RunOnceCmd::Single(
850                    RunOnceCmdInner::from_circuit_cmd(leg, cmd),
851                )),
852                CircuitAction::HandleControl(ctrl) => ControlHandler::new(self)
853                    .handle_msg(ctrl)?
854                    .map(RunOnceCmd::Single),
855                CircuitAction::HandleCell { leg, cell } => {
856                    let circ = self
857                        .circuits
858                        .leg_mut(leg)
859                        .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
860
861                    let circ_cmds = circ.handle_cell(&mut self.cell_handlers, leg, cell)?;
862                    if circ_cmds.is_empty() {
863                        None
864                    } else {
865                        // TODO: we return RunOnceCmd::Multiple even if there's a single command.
866                        //
867                        // See the TODO on `Circuit::handle_cell`.
868                        let cmd = RunOnceCmd::Multiple(
869                            circ_cmds
870                                .into_iter()
871                                .map(|cmd| RunOnceCmdInner::from_circuit_cmd(leg, cmd))
872                                .collect(),
873                        );
874
875                        Some(cmd)
876                    }
877                }
878                CircuitAction::RemoveLeg { leg, reason } => {
879                    Some(RunOnceCmdInner::RemoveLeg { leg, reason }.into())
880                }
881                CircuitAction::PaddingAction {
882                    leg,
883                    padding_action,
884                } => {
885                    cfg_if! {
886                        if #[cfg(feature = "circ-padding")] {
887                            Some(RunOnceCmdInner::PaddingAction { leg, padding_action }.into())
888                        } else {
889                            // If padding isn't enabled, we never generate a padding action,
890                            // so we can be sure this branch will never be reached.
891                            void::unreachable(padding_action.0);
892                        }
893                    }
894                }
895            };
896
897            if let Some(cmd) = cmd {
898                self.handle_run_once_cmd(cmd).await?;
899            }
900        }
901
902        Ok(())
903    }
904
905    /// Try to process the previously-out-of-order messages we might have buffered.
906    #[cfg(feature = "conflux")]
907    #[instrument(level = "trace", skip_all)]
908    async fn try_dequeue_ooo_msgs(&mut self) -> StdResult<(), ReactorError> {
909        // Check if we're ready to dequeue any of the previously out-of-order cells.
910        while let Some(entry) = self.ooo_msgs.peek() {
911            let should_pop = self.circuits.is_seqno_in_order(entry.msg.seqno);
912
913            if !should_pop {
914                break;
915            }
916
917            let entry = self.ooo_msgs.pop().expect("item just disappeared?!");
918
919            let circ = self
920                .circuits
921                .leg_mut(entry.leg_id)
922                .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
923            let handlers = &mut self.cell_handlers;
924            let cmd = circ
925                .handle_in_order_relay_msg(
926                    handlers,
927                    entry.msg.hopnum,
928                    entry.leg_id,
929                    entry.msg.cell_counts_towards_windows,
930                    entry.msg.streamid,
931                    entry.msg.msg,
932                )?
933                .map(|cmd| {
934                    RunOnceCmd::Single(RunOnceCmdInner::from_circuit_cmd(entry.leg_id, cmd))
935                });
936
937            if let Some(cmd) = cmd {
938                self.handle_run_once_cmd(cmd).await?;
939            }
940        }
941
942        Ok(())
943    }
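
    // The loop above follows the standard "peek, check, pop" heap-draining pattern.
    // A self-contained illustration of that pattern, with plain integers standing in
    // for buffered messages and `next_expected` for the sequence cursor (a sketch,
    // not the conflux data types used here):
    //
    // ```ignore
    // use std::cmp::Reverse;
    // use std::collections::BinaryHeap;
    //
    // let mut heap: BinaryHeap<Reverse<u64>> = [3, 2, 5].into_iter().map(Reverse).collect();
    // let mut next_expected = 2;
    // while let Some(Reverse(seq)) = heap.peek().copied() {
    //     if seq != next_expected {
    //         break;
    //     }
    //     let _ = heap.pop();
    //     next_expected += 1;
    // }
    // // 2 and 3 were drained in order; 5 stays buffered until 4 arrives.
    // ```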
944
945    /// Handle a [`RunOnceCmd`].
946    #[instrument(level = "trace", skip_all)]
947    async fn handle_run_once_cmd(&mut self, cmd: RunOnceCmd) -> StdResult<(), ReactorError> {
948        match cmd {
949            RunOnceCmd::Single(cmd) => return self.handle_single_run_once_cmd(cmd).await,
950            RunOnceCmd::Multiple(cmds) => {
951                // While we know the leg's `chan_sender` is ready to accept *one* cell,
952                // we can't be certain it will be able to accept *all* of the cells
953                // that need to be sent here. This means we *may* end up buffering
954                // in its underlying SometimesUnboundedSink! That is OK, because
955                // RunOnceCmd::Multiple is only used for handling packed cells.
956                for cmd in cmds {
957                    self.handle_single_run_once_cmd(cmd).await?;
958                }
959            }
960        }
961
962        Ok(())
963    }
964
965    /// Handle a [`RunOnceCmd`].
966    #[instrument(level = "trace", skip_all)]
967    async fn handle_single_run_once_cmd(
968        &mut self,
969        cmd: RunOnceCmdInner,
970    ) -> StdResult<(), ReactorError> {
971        match cmd {
972            RunOnceCmdInner::Send { leg, cell, done } => {
973                // TODO: check the cc window
974                let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
975                if let Some(done) = done {
976                    // Don't care if the receiver goes away
977                    let _ = done.send(res.clone());
978                }
979                res?;
980            }
981            #[cfg(feature = "send-control-msg")]
982            RunOnceCmdInner::SendMsgAndInstallHandler { msg, handler, done } => {
983                let cell: Result<Option<SendRelayCell>> =
984                    self.prepare_msg_and_install_handler(msg, handler);
985
986                match cell {
987                    Ok(Some(cell)) => {
988                        // TODO(conflux): let the RunOnceCmdInner specify which leg to send the cell on
989                        let outcome = self.circuits.send_relay_cell_on_leg(cell, None).await;
990                        // don't care if receiver goes away.
991                        let _ = done.send(outcome.clone());
992                        outcome?;
993                    }
994                    Ok(None) => {
995                        // don't care if receiver goes away.
996                        let _ = done.send(Ok(()));
997                    }
998                    Err(e) => {
999                        // don't care if receiver goes away.
1000                        let _ = done.send(Err(e.clone()));
1001                        return Err(e.into());
1002                    }
1003                }
1004            }
1005            RunOnceCmdInner::BeginStream {
1006                leg,
1007                cell,
1008                hop,
1009                done,
1010            } => {
1011                match cell {
1012                    Ok((cell, stream_id)) => {
1013                        let circ = self
1014                            .circuits
1015                            .leg_mut(leg)
1016                            .ok_or_else(|| internal!("leg disappeared?!"))?;
1017                        let cell_hop = cell.hop.expect("missing hop in client SendRelayCell?!");
1018                        let relay_format = circ
1019                            .hop_mut(cell_hop)
1020                            // TODO: Is this the right error type here? Or should there be a "HopDisappeared"?
1021                            .ok_or(Error::NoSuchHop)?
1022                            .relay_cell_format();
1023
1024                        let outcome = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
1025                        // don't care if receiver goes away.
1026                        let _ = done.send(outcome.clone().map(|_| (stream_id, hop, relay_format)));
1027                        outcome?;
1028                    }
1029                    Err(e) => {
1030                        // don't care if receiver goes away.
1031                        let _ = done.send(Err(e.clone()));
1032                        return Err(e.into());
1033                    }
1034                }
1035            }
1036            RunOnceCmdInner::CloseStream {
1037                hop,
1038                sid,
1039                behav,
1040                reason,
1041                done,
1042            } => {
1043                let result = {
1044                    let (leg_id, hop_num) = self
1045                        .resolve_hop_location(hop)
1046                        .map_err(into_bad_api_usage!("Could not resolve {hop:?}"))?;
1047                    let leg = self
1048                        .circuits
1049                        .leg_mut(leg_id)
1050                        .ok_or(bad_api_usage!("No leg for id {:?}", leg_id))?;
1051                    Ok::<_, Bug>((leg, hop_num))
1052                };
1053
1054                let (leg, hop_num) = match result {
1055                    Ok(x) => x,
1056                    Err(e) => {
1057                        if let Some(done) = done {
1058                            // don't care if the sender goes away
1059                            let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
1060                            let _ = done.send(Err(e.into()));
1061                        }
1062                        return Ok(());
1063                    }
1064                };
1065
1066                let max_rtt = {
1067                    let hop = leg
1068                        .hop(hop_num)
1069                        .ok_or_else(|| internal!("the hop we resolved disappeared?!"))?;
1070                    let ccontrol = hop.ccontrol();
1071
1072                    // Note: if we have no measurements for the RTT, this will be set to 0,
1073                    // and the timeout will be 2 * CBT.
1074                    ccontrol
1075                        .rtt()
1076                        .max_rtt_usec()
1077                        .map(|rtt| Duration::from_micros(u64::from(rtt)))
1078                        .unwrap_or_default()
1079                };
1080
1081                // The length of the circuit up until the hop that has the half-stream.
1082                //
1083                // +1, because HopNums are zero-based.
1084                let circ_len = usize::from(hop_num) + 1;
1085
1086                // We double the CBT to account for rend circuits,
1087                // which are twice as long (otherwise we risk expiring
1088                // the rend half-streams too soon).
1089                let timeout = std::cmp::max(max_rtt, 2 * leg.estimate_cbt(circ_len));
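                // Illustrative arithmetic (assumed numbers, not measurements): for
                // hop_num == 2 (the third hop), circ_len == 3; with no RTT estimate
                // yet, max_rtt is 0, so the timeout is 2 * estimate_cbt(3).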
1090                let expire_at = self.runtime.now() + timeout;
1091
1092                let res: Result<()> = leg
1093                    .close_stream(hop_num, sid, behav, reason, expire_at)
1094                    .await;
1095
1096                if let Some(done) = done {
1097                    // don't care if the sender goes away
1098                    let _ = done.send(res);
1099                }
1100            }
1101            RunOnceCmdInner::MaybeSendXon {
1102                rate,
1103                stream_id,
1104                hop,
1105            } => {
1106                let (leg_id, hop_num) = match self.resolve_hop_location(hop) {
1107                    Ok(x) => x,
1108                    Err(NoJoinPointError) => {
1109                        // A stream tried to send an XON message to the join point of
1110                        // a tunnel that has never had a join point. Currently in arti, only a
1111                        // `StreamTarget` asks us to send an XON message, and this tunnel
1112                        // originally created the `StreamTarget` to begin with. So this is a
1113                        // legitimate bug somewhere in the tunnel code.
1114                        return Err(
1115                            internal!(
1116                                "Could not send an XON message to a join point on a tunnel without a join point",
1117                            )
1118                            .into()
1119                        );
1120                    }
1121                };
1122
1123                let Some(leg) = self.circuits.leg_mut(leg_id) else {
1124                    // The leg has disappeared. This is fine since the stream may have ended and
1125                    // been cleaned up while this `CtrlMsg::MaybeSendXon` message was queued.
1126                    // It is possible that this is a bug and the leg number is incorrect, but
1127                    // it's not currently possible to differentiate between an incorrect leg
1128                    // number and a tunnel leg that has been closed.
1129                    debug!("Could not send an XON message on a leg that does not exist. Ignoring.");
1130                    return Ok(());
1131                };
1132
1133                let Some(hop) = leg.hop_mut(hop_num) else {
1134                    // The hop has disappeared. This is fine since the circuit may have
1135                    // been truncated while the `CtrlMsg::MaybeSendXon` message was queued.
1136                    // It is possible that this is a bug and the hop number is incorrect, but
1137                    // it's not currently possible to differentiate between an incorrect hop
1138                    // number and a circuit hop that has been removed.
1139                    debug!("Could not send an XON message on a hop that does not exist. Ignoring.");
1140                    return Ok(());
1141                };
1142
1143                let Some(msg) = hop.maybe_send_xon(rate, stream_id)? else {
1144                    // Nothing to do.
1145                    return Ok(());
1146                };
1147
1148                let cell = AnyRelayMsgOuter::new(Some(stream_id), msg.into());
1149                let cell = SendRelayCell {
1150                    hop: Some(hop_num),
1151                    early: false,
1152                    cell,
1153                };
1154
1155                leg.send_relay_cell(cell).await?;
1156            }
1157            RunOnceCmdInner::HandleSendMe { leg, hop, sendme } => {
1158                let leg = self
1159                    .circuits
1160                    .leg_mut(leg)
1161                    .ok_or_else(|| internal!("leg disappeared?!"))?;
1162                // NOTE: it's okay to await. We are only awaiting on the congestion_signals
1163                // future which *should* resolve immediately
1164                let signals = leg.chan_sender.congestion_signals().await;
1165                leg.handle_sendme(hop, sendme, signals)?;
1166            }
1167            RunOnceCmdInner::FirstHopClockSkew { answer } => {
1168                let res = self.circuits.single_leg_mut().map(|leg| leg.clock_skew());
1169
1170                // don't care if the sender goes away
1171                let _ = answer.send(res.map_err(Into::into));
1172            }
1173            RunOnceCmdInner::CleanShutdown => {
1174                trace!(tunnel_id = %self.tunnel_id, "reactor shutdown due to handled cell");
1175                return Err(ReactorError::Shutdown);
1176            }
1177            RunOnceCmdInner::RemoveLeg { leg, reason } => {
1178                warn!(tunnel_id = %self.tunnel_id, reason = %reason, "removing circuit leg");
1179
1180                let circ = self.circuits.remove(leg)?;
1181                let is_conflux_pending = circ.is_conflux_pending();
1182
1183                // Drop the removed leg. This will cause it to close if it's not already closed.
1184                drop(circ);
1185
1186                // If we reach this point, it means we have more than one leg
1187                // (otherwise the .remove() would've returned a Shutdown error),
1188                // so we expect there to be a ConfluxHandshakeCtx installed.
1189
1190                #[cfg(feature = "conflux")]
1191                if is_conflux_pending {
1192                    let (error, proto_violation): (_, Option<Error>) = match &reason {
1193                        RemoveLegReason::ConfluxHandshakeTimeout => {
1194                            (ConfluxHandshakeError::Timeout, None)
1195                        }
1196                        RemoveLegReason::ConfluxHandshakeErr(e) => {
1197                            (ConfluxHandshakeError::Link(e.clone()), Some(e.clone()))
1198                        }
1199                        RemoveLegReason::ChannelClosed => {
1200                            (ConfluxHandshakeError::ChannelClosed, None)
1201                        }
1202                    };
1203
1204                    self.note_conflux_handshake_result(Err(error), proto_violation.is_some())?;
1205
1206                    if let Some(e) = proto_violation {
1207                        tor_error::warn_report!(
1208                            e,
1209                            tunnel_id = %self.tunnel_id,
1210                            "Malformed conflux handshake, tearing down tunnel",
1211                        );
1212
1213                        return Err(e.into());
1214                    }
1215                }
1216            }
1217            #[cfg(feature = "conflux")]
1218            RunOnceCmdInner::ConfluxHandshakeComplete { leg, cell } => {
1219                // Note: on the client-side, the handshake is considered complete once the
1220                // RELAY_CONFLUX_LINKED_ACK is sent (roughly upon receipt of the LINKED cell).
1221                //
1222                // We're optimistic here, and declare the handshake a success *before*
1223                // sending the LINKED_ACK response. I think this is OK though,
1224                // because if the send_relay_cell() below fails, the reactor will shut
1225                // down anyway. OTOH, marking the handshake as complete slightly early
1226                // means that on the happy path, the circuit is marked as usable sooner,
1227                // instead of blocking on the sending of the LINKED_ACK.
1228                self.note_conflux_handshake_result(Ok(()), false)?;
1229
1230                let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
1231
1232                res?;
1233            }
1234            #[cfg(feature = "conflux")]
1235            RunOnceCmdInner::Link { circuits, answer } => {
1236                // Add the specified circuits to our conflux set,
1237                // and send a LINK cell down each unlinked leg.
1238                //
1239                // NOTE: this will block the reactor until all the cells are sent.
1240                self.handle_link_circuits(circuits, answer).await?;
1241            }
1242            #[cfg(feature = "conflux")]
1243            RunOnceCmdInner::Enqueue { leg, msg } => {
1244                let entry = ConfluxHeapEntry { leg_id: leg, msg };
1245                self.ooo_msgs.push(entry);
1246            }
1247            #[cfg(feature = "circ-padding")]
1248            RunOnceCmdInner::PaddingAction {
1249                leg,
1250                padding_action,
1251            } => {
1252                // TODO: If we someday move back to having a per-circuit reactor,
1253                // this action would logically belong there, not on the tunnel reactor.
1254                self.circuits
1255                    .run_padding_action(leg, padding_action)
1256                    .await?;
1257            }
1258        }
1259
1260        Ok(())
1261    }
1262
1263    /// Wait for a [`CtrlMsg::Create`] to come along to set up the circuit.
1264    ///
1265    /// Returns an error if an unexpected `CtrlMsg` is received.
1266    #[instrument(level = "trace", skip_all)]
1267    async fn wait_for_create(&mut self) -> StdResult<(), ReactorError> {
1268        let msg = select_biased! {
1269            res = self.command.next() => {
1270                let cmd = unwrap_or_shutdown!(self, res, "shutdown channel drop")?;
1271                match cmd {
1272                    CtrlCmd::Shutdown => return self.handle_shutdown().map(|_| ()),
1273                    #[cfg(test)]
1274                    CtrlCmd::AddFakeHop {
1275                        relay_cell_format: format,
1276                        fwd_lasthop,
1277                        rev_lasthop,
1278                        peer_id,
1279                        params,
1280                        done,
1281                    } => {
1282                        let leg = self.circuits.single_leg_mut()?;
1283                        leg.handle_add_fake_hop(format, fwd_lasthop, rev_lasthop, peer_id, &params, done);
1284                        return Ok(())
1285                    },
1286                    _ => {
1287                        trace!("reactor shutdown due to unexpected command: {:?}", cmd);
1288                        return Err(Error::CircProto(format!("Unexpected control {cmd:?} on client circuit")).into());
1289                    }
1290                }
1291            },
1292            res = self.control.next() => unwrap_or_shutdown!(self, res, "control drop")?,
1293        };
1294
1295        match msg {
1296            CtrlMsg::Create {
1297                recv_created,
1298                handshake,
1299                settings,
1300                done,
1301            } => {
1302                // TODO(conflux): instead of crashing the reactor, it might be better
1303                // to send the error via the done channel
1304                let leg = self.circuits.single_leg_mut()?;
1305                leg.handle_create(recv_created, handshake, settings, done)
1306                    .await
1307            }
1308            _ => {
1309                trace!("reactor shutdown due to unexpected cell: {:?}", msg);
1310
1311                Err(Error::CircProto(format!("Unexpected {msg:?} cell on client circuit")).into())
1312            }
1313        }
1314    }
1315
1316    /// Add the specified handshake result to our `ConfluxHandshakeCtx`.
1317    ///
1318    /// If all the circuits we were waiting on have finished the conflux handshake,
1319    /// the `ConfluxHandshakeCtx` is consumed, and the results we have collected
1320    /// are sent to the handshake initiator.
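    ///
    /// A rough sketch of the completion check performed below (illustrative only,
    /// not a doctest; it mirrors the code in this function):
    ///
    /// ```ignore
    /// // The handshake is finished once we have collected one result per leg
    /// // we were waiting on; only then do we notify the initiator.
    /// let tunnel_complete = conflux_ctx.results.len() == conflux_ctx.num_legs;
    /// ```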
1321    #[cfg(feature = "conflux")]
1322    #[instrument(level = "trace", skip_all)]
1323    fn note_conflux_handshake_result(
1324        &mut self,
1325        res: StdResult<(), ConfluxHandshakeError>,
1326        reactor_is_closing: bool,
1327    ) -> StdResult<(), ReactorError> {
1328        let tunnel_complete = match self.conflux_hs_ctx.as_mut() {
1329            Some(conflux_ctx) => {
1330                conflux_ctx.results.push(res);
1331                // Whether all the legs have finished linking:
1332                conflux_ctx.results.len() == conflux_ctx.num_legs
1333            }
1334            None => {
1335                return Err(internal!("no conflux handshake context").into());
1336            }
1337        };
1338
1339        if tunnel_complete || reactor_is_closing {
1340            // Time to remove the conflux handshake context
1341            // and extract the results we have collected
1342            let conflux_ctx = self.conflux_hs_ctx.take().expect("context disappeared?!");
1343
1344            let success_count = conflux_ctx.results.iter().filter(|res| res.is_ok()).count();
1345            let leg_count = conflux_ctx.results.len();
1346
1347            info!(
1348                tunnel_id = %self.tunnel_id,
1349                "conflux tunnel ready ({success_count}/{leg_count} circuits successfully linked)",
1350            );
1351
1352            send_conflux_outcome(conflux_ctx.answer, Ok(conflux_ctx.results))?;
1353
1354            // We don't expect to receive any more handshake results,
1355            // at least not until we get another LinkCircuits control message,
1356            // which will install a new ConfluxHandshakeCtx with a channel
1357            // for us to send updates on
1358        }
1359
1360        Ok(())
1361    }
1362
1363    /// Prepare a `SendRelayCell` request, and install the given meta-cell handler.
1364    fn prepare_msg_and_install_handler(
1365        &mut self,
1366        msg: Option<AnyRelayMsgOuter>,
1367        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
1368    ) -> Result<Option<SendRelayCell>> {
1369        let msg = msg
1370            .map(|msg| {
1371                let handlers = &mut self.cell_handlers;
1372                let handler = handler
1373                    .as_ref()
1374                    .or(handlers.meta_handler.as_ref())
1375                    .ok_or_else(|| internal!("tried to use an ended Conversation"))?;
1376                // We should always have a precise HopLocation here, so this should never fail;
1377                // but if we ever get a ::JoinPoint, we'll notice.
1378                let hop = handler.expected_hop().hop_num().ok_or(bad_api_usage!(
1379                    "MsgHandler doesn't have a precise HopLocation"
1380                ))?;
1381                Ok::<_, crate::Error>(SendRelayCell {
1382                    hop: Some(hop),
1383                    early: false,
1384                    cell: msg,
1385                })
1386            })
1387            .transpose()?;
1388
1389        if let Some(handler) = handler {
1390            self.cell_handlers.set_meta_handler(handler)?;
1391        }
1392
1393        Ok(msg)
1394    }
1395
1396    /// Handle a shutdown request.
1397    fn handle_shutdown(&self) -> StdResult<Option<RunOnceCmdInner>, ReactorError> {
1398        trace!(
1399            tunnel_id = %self.tunnel_id,
1400            "reactor shutdown due to explicit request",
1401        );
1402
1403        Err(ReactorError::Shutdown)
1404    }
1405
1406    /// Handle a request to shutdown the reactor and return the only [`Circuit`] in this tunnel.
1407    ///
1408    /// Returns an error over the `answer` channel if the reactor has no circuits,
1409    /// or more than one circuit. The reactor will shut down regardless.
1410    #[cfg(feature = "conflux")]
1411    fn handle_shutdown_and_return_circuit(
1412        &mut self,
1413        answer: oneshot::Sender<StdResult<Circuit, Bug>>,
1414    ) -> StdResult<(), ReactorError> {
1415        // Don't care if the receiver goes away
1416        let _ = answer.send(self.circuits.take_single_leg());
1417        self.handle_shutdown().map(|_| ())
1418    }
1419
1420    /// Resolves a [`TargetHop`] to a [`HopLocation`].
1421    ///
1422    /// After resolving a `TargetHop::LastHop`,
1423    /// the `HopLocation` can become stale if a single-path circuit is later extended or truncated.
1424    /// In other words, the `HopLocation` can go stale from one reactor iteration to the next.
1425    ///
1426    /// It's generally okay to hold on to a (possibly stale) `HopLocation`
1427    /// if you need a fixed hop position in the tunnel.
1428    /// For example, if we open a stream to `TargetHop::LastHop`,
1429    /// we want to store the stream position as a `HopLocation` rather than as a `TargetHop::LastHop`,
1430    /// since we don't want the stream position to change as the tunnel is extended or truncated.
1431    ///
1432    /// Returns [`NoHopsBuiltError`] if trying to resolve `TargetHop::LastHop`
1433    /// and the tunnel has no hops
1434    /// (either has no legs, or has legs which contain no hops).
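    ///
    /// A simplified sketch of the intended usage from within the reactor
    /// (illustrative only, not a doctest; error conversion is elided):
    ///
    /// ```ignore
    /// // Resolve `LastHop` once, at the time the stream is opened...
    /// let location: HopLocation = self.resolve_target_hop(TargetHop::LastHop)?;
    /// // ...and store `location` with the stream, so that extending or truncating
    /// // the tunnel later does not silently move the stream to a different hop.
    /// ```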
1435    fn resolve_target_hop(&self, hop: TargetHop) -> StdResult<HopLocation, NoHopsBuiltError> {
1436        match hop {
1437            TargetHop::Hop(hop) => Ok(hop),
1438            TargetHop::LastHop => {
1439                if let Ok(leg) = self.circuits.single_leg() {
1440                    let leg_id = leg.unique_id();
1441                    // single-path tunnel
1442                    let hop = leg.last_hop_num().ok_or(NoHopsBuiltError)?;
1443                    Ok(HopLocation::Hop((leg_id, hop)))
1444                } else if !self.circuits.is_empty() {
1445                    // multi-path tunnel
1446                    Ok(HopLocation::JoinPoint)
1447                } else {
1448                    // no legs
1449                    Err(NoHopsBuiltError)
1450                }
1451            }
1452        }
1453    }
1454
1455    /// Resolves a [`HopLocation`] to a [`UniqId`] and [`HopNum`].
1456    ///
1457    /// After resolving a `HopLocation::JoinPoint`,
1458    /// the [`UniqId`] and [`HopNum`] can become stale if the primary leg changes.
1459    ///
1460    /// You should resolve to a specific [`UniqId`] and [`HopNum`] only immediately before you
1461    /// need them,
1462    /// and you should not hold on to the resolved [`UniqId`] and [`HopNum`] between reactor
1463    /// iterations, as the primary leg may change from one iteration to the next.
1464    ///
1465    /// Returns [`NoJoinPointError`] if trying to resolve `HopLocation::JoinPoint`
1466    /// but it does not have a join point.
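    ///
    /// A minimal sketch of the recommended pattern from within the reactor
    /// (illustrative only, not a doctest; `location` is a previously stored
    /// [`HopLocation`], and error conversion is elided):
    ///
    /// ```ignore
    /// // Resolve immediately before use; don't cache the result across reactor
    /// // iterations, since the primary leg may have changed by the next one.
    /// let (leg_id, hop_num) = self.resolve_hop_location(location)?;
    /// ```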
1467    #[instrument(level = "trace", skip_all)]
1468    fn resolve_hop_location(
1469        &self,
1470        hop: HopLocation,
1471    ) -> StdResult<(UniqId, HopNum), NoJoinPointError> {
1472        match hop {
1473            HopLocation::Hop((leg_id, hop_num)) => Ok((leg_id, hop_num)),
1474            HopLocation::JoinPoint => {
1475                if let Some((leg_id, hop_num)) = self.circuits.primary_join_point() {
1476                    Ok((leg_id, hop_num))
1477                } else {
1478                    // Attempted to get the join point of a non-multipath tunnel.
1479                    Err(NoJoinPointError)
1480                }
1481            }
1482        }
1483    }
1484
1485    /// Resolve a [`TargetHop`] directly into a [`UniqId`] and [`HopNum`].
1486    ///
1487    /// This is a helper function that calls `resolve_target_hop` and then
1488    /// `resolve_hop_location`, in that order.
1489    ///
1490    /// It returns `None` if either resolution fails; if you need a more detailed error explaining
1491    /// why it failed, call `resolve_target_hop()` and `resolve_hop_location()` explicitly.
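    ///
    /// Illustrative sketch (not a doctest): a call to this helper behaves roughly
    /// like chaining the two resolution steps and discarding their errors:
    ///
    /// ```ignore
    /// let resolved = self.target_hop_to_hopnum_id(hop);
    /// // ...is roughly equivalent to:
    /// let resolved = self
    ///     .resolve_target_hop(hop)
    ///     .ok()
    ///     .and_then(|loc| self.resolve_hop_location(loc).ok());
    /// ```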
1492    pub(crate) fn target_hop_to_hopnum_id(&self, hop: TargetHop) -> Option<(UniqId, HopNum)> {
1493        self.resolve_target_hop(hop)
1494            .ok()
1495            .and_then(|resolved| self.resolve_hop_location(resolved).ok())
1496    }
1497
1498    /// Install or remove a padder at a given hop.
1499    #[cfg(feature = "circ-padding-manual")]
1500    fn set_padding_at_hop(
1501        &self,
1502        hop: HopLocation,
1503        padder: Option<super::circuit::padding::CircuitPadder>,
1504    ) -> Result<()> {
1505        let HopLocation::Hop((leg_id, hop_num)) = hop else {
1506            return Err(bad_api_usage!("Padding to the join point is not supported.").into());
1507        };
1508        let circ = self.circuits.leg(leg_id).ok_or(Error::NoSuchHop)?;
1509        circ.set_padding_at_hop(hop_num, padder)?;
1510        Ok(())
1511    }
1512
1513    /// Does congestion control use stream SENDMEs for the given hop?
1514    ///
1515    /// Returns `None` if either the `leg` or `hop` don't exist.
1516    fn uses_stream_sendme(&self, leg: UniqId, hop: HopNum) -> Option<bool> {
1517        self.circuits.uses_stream_sendme(leg, hop)
1518    }
1519
1520    /// Handle a request to link some extra circuits in the reactor's conflux set.
1521    ///
1522    /// The circuits are validated, and if they do not have the same length,
1523    /// or if they do not all have the same last hop, an error is returned on
1524    /// the `answer` channel, and the conflux handshake is *not* initiated.
1525    ///
1526    /// If validation succeeds, the circuits are added to this reactor's conflux set,
1527    /// and the conflux handshake is initiated (by sending a LINK cell on each leg).
1528    ///
1529    /// NOTE: this blocks the reactor main loop until all the cells are sent.
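    ///
    /// A rough sketch of the leg accounting done below (illustrative only, not a
    /// doctest; it mirrors the code in this function):
    ///
    /// ```ignore
    /// // Every newly added circuit, plus every existing leg that is not yet
    /// // linked, takes part in the handshake; that is how many handshake
    /// // results we expect to collect before notifying the initiator.
    /// let num_legs = circuits.len() + self.circuits.num_unlinked();
    /// ```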
1530    #[cfg(feature = "conflux")]
1531    #[instrument(level = "trace", skip_all)]
1532    async fn handle_link_circuits(
1533        &mut self,
1534        circuits: Vec<Circuit>,
1535        answer: ConfluxLinkResultChannel,
1536    ) -> StdResult<(), ReactorError> {
1537        use tor_error::warn_report;
1538
1539        if self.conflux_hs_ctx.is_some() {
1540            let err = internal!("conflux linking already in progress");
1541            send_conflux_outcome(answer, Err(err.into()))?;
1542
1543            return Ok(());
1544        }
1545
1546        let unlinked_legs = self.circuits.num_unlinked();
1547
1548        // We need to send the LINK cell on each of the new circuits
1549        // and on each of the existing, unlinked legs from self.circuits.
1550        //
1551        // In reality, there can only be one such circuit
1552        // (the "initial" one from the previously single-path tunnel),
1553        // because any circuits that fail to complete the conflux handshake
1554        // get removed from the set.
1555        let num_legs = circuits.len() + unlinked_legs;
1556
1557        // Note: add_legs validates `circuits`
1558        let res = async {
1559            self.circuits.add_legs(circuits, &self.runtime)?;
1560            self.circuits.link_circuits(&self.runtime).await
1561        }
1562        .await;
1563
1564        if let Err(e) = res {
1565            warn_report!(e, "Failed to link conflux circuits");
1566
1567            send_conflux_outcome(answer, Err(e))?;
1568        } else {
1569            // Save the channel, to notify the user of completion.
1570            self.conflux_hs_ctx = Some(ConfluxHandshakeCtx {
1571                answer,
1572                num_legs,
1573                results: Default::default(),
1574            });
1575        }
1576
1577        Ok(())
1578    }
1579}
1580
1581/// Notify the conflux handshake initiator of the handshake outcome.
1582///
1583/// Returns an error if the initiator has gone away.
1584#[cfg(feature = "conflux")]
1585fn send_conflux_outcome(
1586    tx: ConfluxLinkResultChannel,
1587    res: Result<ConfluxHandshakeResult>,
1588) -> StdResult<(), ReactorError> {
1589    if tx.send(res).is_err() {
1590        tracing::warn!("conflux initiator went away before handshake completed?");
1591        return Err(ReactorError::Shutdown);
1592    }
1593
1594    Ok(())
1595}
1596
1597/// The tunnel does not have any hops.
1598#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
1599#[non_exhaustive]
1600#[error("no hops have been built for this tunnel")]
1601pub(crate) struct NoHopsBuiltError;
1602
1603/// The tunnel does not have a join point.
1604#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
1605#[non_exhaustive]
1606#[error("the tunnel does not have a join point")]
1607pub(crate) struct NoJoinPointError;
1608
1609impl CellHandlers {
1610    /// Try to install a given meta-cell handler to receive any unusual cells on
1611    /// this circuit, along with a result channel to notify on completion.
1612    fn set_meta_handler(&mut self, handler: Box<dyn MetaCellHandler + Send>) -> Result<()> {
1613        if self.meta_handler.is_none() {
1614            self.meta_handler = Some(handler);
1615            Ok(())
1616        } else {
1617            Err(Error::from(internal!(
1618                "Tried to install a meta-cell handler before the old one was gone."
1619            )))
1620        }
1621    }
1622
1623    /// Try to install a given cell handler on this circuit.
1624    #[cfg(feature = "hs-service")]
1625    fn set_incoming_stream_req_handler(
1626        &mut self,
1627        handler: IncomingStreamRequestHandler,
1628    ) -> Result<()> {
1629        if self.incoming_stream_req_handler.is_none() {
1630            self.incoming_stream_req_handler = Some(handler);
1631            Ok(())
1632        } else {
1633            Err(Error::from(internal!(
1634                "Tried to install a BEGIN cell handler before the old one was gone."
1635            )))
1636        }
1637    }
1638}
1639
1640#[cfg(test)]
1641mod test {
1642    // Tested in [`crate::client::circuit::test`].
1643}