tor_proto/tunnel/
reactor.rs

1//! Code to handle incoming cells on a circuit.
2//!
3//! ## On message validation
4//!
5//! There are three steps for validating an incoming message on a stream:
6//!
7//! 1. Is the message contextually appropriate? (e.g., no more than one
8//!    `CONNECTED` message per stream.) This is handled by calling
9//!    [`CmdChecker::check_msg`](crate::stream::CmdChecker::check_msg).
10//! 2. Does the message comply with flow-control rules? (e.g., no more data than
11//!    we've gotten SENDMEs for.) For open streams, the stream itself handles
12//!    this; for half-closed streams, the reactor handles it using the
13//!    `halfstream` module.
14//! 3. Does the message have an acceptable command type, and is the message
15//!    well-formed? For open streams, the streams themselves handle this check.
16//!    For half-closed streams, the reactor handles it by calling
17//!    `consume_checked_msg()`.
18
19mod conflux;
20mod control;
21mod create;
22mod extender;
23pub(super) mod syncview;
24
25use super::handshake::RelayCryptLayerProtocol;
26use crate::congestion::sendme::{self, CircTag};
27use crate::congestion::{CongestionControl, CongestionSignals};
28use crate::crypto::binding::CircuitBinding;
29use crate::crypto::cell::{
30    HopNum, InboundClientCrypt, InboundClientLayer, OutboundClientCrypt, OutboundClientLayer,
31    RelayCellBody, SENDME_TAG_LEN,
32};
33use crate::crypto::handshake::fast::CreateFastClient;
34#[cfg(feature = "ntor_v3")]
35use crate::crypto::handshake::ntor_v3::{NtorV3Client, NtorV3PublicKey};
36use crate::memquota::{CircuitAccount, SpecificAccount as _, StreamAccount};
37use crate::stream::{AnyCmdChecker, StreamStatus};
38use crate::tunnel::circuit::celltypes::{ClientCircChanMsg, CreateResponse};
39use crate::tunnel::circuit::handshake::{BoxedClientLayer, HandshakeRole};
40use crate::tunnel::circuit::unique_id::UniqId;
41use crate::tunnel::circuit::MutableState;
42use crate::tunnel::circuit::{CircParameters, CircuitRxReceiver};
43use crate::tunnel::streammap::{
44    self, EndSentStreamEnt, OpenStreamEnt, ShouldSendEnd, StreamEntMut,
45};
46use crate::util::err::ReactorError;
47use crate::util::skew::ClockSkew;
48use crate::util::sometimes_unbounded_sink::SometimesUnboundedSink;
49use crate::util::SinkExt as _;
50use crate::{Error, Result};
51use conflux::ConfluxSet;
52use control::ControlHandler;
53use futures::stream::FuturesUnordered;
54use std::borrow::Borrow;
55use std::mem::size_of;
56use std::pin::Pin;
57use tor_cell::chancell::msg::{AnyChanMsg, HandshakeType, Relay};
58use tor_cell::relaycell::msg::{AnyRelayMsg, End, Sendme, Truncated};
59use tor_cell::relaycell::{
60    AnyRelayMsgOuter, RelayCellDecoder, RelayCellDecoderResult, RelayCellFormat, RelayCmd,
61    StreamId, UnparsedRelayMsg,
62};
63use tor_error::{internal, Bug};
64#[cfg(feature = "hs-service")]
65use {
66    crate::stream::{DataCmdChecker, IncomingStreamRequest, IncomingStreamRequestFilter},
67    tor_cell::relaycell::msg::Begin,
68};
69
70use futures::channel::mpsc;
71use futures::StreamExt;
72use futures::{select_biased, FutureExt as _, SinkExt as _, Stream};
73use oneshot_fused_workaround as oneshot;
74
75use std::result::Result as StdResult;
76use std::sync::{Arc, Mutex};
77use std::task::Poll;
78
79use crate::channel::{Channel, ChannelSender};
80use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
81use crate::crypto::handshake::{ClientHandshake, KeyGenerator};
82use crate::tunnel::circuit::path;
83use crate::tunnel::circuit::{StreamMpscReceiver, StreamMpscSender};
84use derive_deftly::Deftly;
85use derive_more::From;
86use safelog::sensitive as sv;
87use tor_async_utils::{SinkPrepareExt as _, SinkTrySend as _, SinkTrySendError as _};
88use tor_cell::chancell::{AnyChanCell, CircId};
89use tor_cell::chancell::{BoxedCellBody, ChanMsg};
90use tor_linkspec::RelayIds;
91use tor_llcrypto::pk;
92use tor_memquota::derive_deftly_template_HasMemoryCost;
93use tor_memquota::mq_queue::{self, ChannelSpec as _, MpscSpec};
94use tracing::{debug, trace, warn};
95
96use create::{Create2Wrap, CreateFastWrap, CreateHandshakeWrap};
97use extender::HandshakeAuxDataHandler;
98
99pub(super) use control::CtrlCmd;
100pub(super) use control::CtrlMsg;
101
/// Initial value for outbound flow-control window on streams.
pub(super) const SEND_WINDOW_INIT: u16 = 500;
/// Initial value for inbound flow-control window on streams.
pub(super) const RECV_WINDOW_INIT: u16 = 500;
/// Size of the buffer used between the reactor and a `StreamReader`.
///
/// FIXME(eta): We pick 2× the receive window, which is very conservative (we arguably shouldn't
///             get sent more than the receive window anyway!). We might exceed the window,
///             though, because of things that don't count towards it.
//
// Note: the multiplication is performed in `u16` and only then widened to `usize`.
pub(super) const STREAM_READER_BUFFER: usize = (2 * RECV_WINDOW_INIT) as usize;

/// The type of a oneshot channel used to inform reactor users of the result of an operation.
pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;

/// Sending half of the MPSC queue that carries incoming stream requests ([`StreamReqInfo`]).
#[cfg(feature = "hs-service")]
type StreamReqSender = mq_queue::Sender<StreamReqInfo, MpscSpec>;
119
/// A handshake type, to be used when creating circuit hops.
///
/// Each variant carries the key material that the corresponding handshake needs.
#[derive(Clone, Debug)]
pub(crate) enum CircuitHandshake {
    /// Use the CREATE_FAST handshake.
    ///
    /// This variant carries no key material.
    CreateFast,
    /// Use the ntor handshake.
    Ntor {
        /// The public key of the relay.
        public_key: NtorPublicKey,
        /// The Ed25519 identity of the relay, which is verified against the
        /// identity held in the circuit's channel.
        ed_identity: pk::ed25519::Ed25519Identity,
    },
    /// Use the ntor-v3 handshake.
    ///
    /// Only available when the `ntor_v3` feature is enabled.
    #[cfg(feature = "ntor_v3")]
    NtorV3 {
        /// The public key of the relay.
        public_key: NtorV3PublicKey,
    },
}
140
/// A behavior to perform when closing a stream.
///
/// We don't use `Option<End>` here, since the behavior of `SendNothing` is so surprising
/// that we shouldn't let it pass unremarked.
#[derive(Clone, Debug)]
pub(crate) enum CloseStreamBehavior {
    /// Send nothing at all, so that the other side will not realize we have
    /// closed the stream.
    ///
    /// We should only do this for incoming onion service streams when we
    /// want to black-hole the client's requests.
    SendNothing,
    /// Send an End cell, if we haven't already sent one.
    ///
    /// The wrapped [`End`] is the message to send.
    SendEnd(End),
}
156impl Default for CloseStreamBehavior {
157    fn default() -> Self {
158        Self::SendEnd(End::new_misc())
159    }
160}
161
/// Represents the reactor's view of a single hop.
///
/// Holds the per-hop state the reactor needs: the hop's stream map,
/// its congestion control object, and its inbound relay cell decoder.
pub(super) struct CircHop {
    /// Reactor unique ID. Used for logging.
    unique_id: UniqId,
    /// Hop number in the path.
    hop_num: HopNum,
    /// Map from stream IDs to streams.
    ///
    /// We store this with the reactor instead of the circuit, since the
    /// reactor needs it for every incoming cell on a stream, whereas
    /// the circuit only needs it when allocating new streams.
    ///
    /// NOTE: this is behind a mutex because the reactor polls the `StreamMap`s
    /// of all hops concurrently, in a [`FuturesUnordered`]. Without the mutex,
    /// this wouldn't be possible, because it would mean holding multiple
    /// mutable references to `self` (the reactor). Note, however,
    /// that there should never be any contention on this mutex:
    /// we never create more than one [`Circuit::ready_streams_iterator`] stream
    /// at a time, and we never clone/lock the hop's `StreamMap` outside of
    /// [`Circuit::ready_streams_iterator`].
    ///
    // TODO: encapsulate the Vec<CircHop> into a separate CircHops structure,
    // and hide its internals from the Reactor. The CircHops implementation
    // should enforce the invariant described in the note above.
    map: Arc<Mutex<streammap::StreamMap>>,
    /// Congestion control object.
    ///
    /// This object is also in charge of handling circuit level SENDME logic for this hop.
    ccontrol: CongestionControl,
    /// Decodes relay cells received from this hop.
    inbound: RelayCellDecoder,
}
194
/// One or more [`RunOnceCmdInner`] to run inside [`Reactor::run_once`].
#[derive(From, Debug)]
enum RunOnceCmd {
    /// Run a single `RunOnceCmdInner` command.
    Single(RunOnceCmdInner),
    /// Run multiple `RunOnceCmdInner` commands.
    //
    // Note: this whole enum *could* be replaced with Vec<RunOnceCmdInner>,
    // but most of the time we're only going to have *one* RunOnceCmdInner
    // to run per run_once() loop. The enum enables us to avoid the extra heap
    // allocation for the `RunOnceCmd::Single` case.
    Multiple(Vec<RunOnceCmdInner>),
}
208
/// Instructions for running something in the reactor loop.
///
/// Run at the end of [`Reactor::run_once`].
///
/// `Debug` is derived via `educe` so that fields which do not implement `Debug`
/// (such as boxed handlers) can be skipped.
//
// TODO: many of the variants of this enum have an identical CtrlMsg counterpart.
// We should consider making each variant a tuple variant and deduplicating the fields.
#[derive(educe::Educe)]
#[educe(Debug)]
enum RunOnceCmdInner {
    /// Send a RELAY cell.
    Send {
        /// The cell to send.
        cell: SendRelayCell,
        /// A channel for sending completion notifications.
        ///
        /// `None` if nobody is waiting to hear about the result.
        done: Option<ReactorResultChannel<()>>,
    },
    /// Send a given control message on this circuit, and install a control-message handler to
    /// receive responses.
    #[cfg(feature = "send-control-msg")]
    SendMsgAndInstallHandler {
        /// The message to send, if any
        msg: Option<AnyRelayMsgOuter>,
        /// A message handler to install.
        ///
        /// If this is `None`, there must already be a message handler installed
        #[educe(Debug(ignore))]
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
        /// A sender that we use to tell the caller that the message was sent
        /// and the handler installed.
        done: oneshot::Sender<Result<()>>,
    },
    /// Handle a SENDME message.
    HandleSendMe {
        /// The hop number.
        hop: HopNum,
        /// The SENDME message to handle.
        sendme: Sendme,
    },
    /// Begin a stream with the provided hop in this circuit.
    ///
    /// Uses the provided stream ID, and sends the provided message to that hop.
    BeginStream {
        /// The cell to send, paired with the ID of the stream it begins.
        ///
        /// This is a `Result`, so the command can also carry a failure to set up the stream.
        // NOTE(review): presumably an `Err` here is forwarded to `done` — confirm in the handler.
        cell: Result<(SendRelayCell, StreamId)>,
        /// Oneshot channel to notify on completion, with the allocated stream ID.
        done: ReactorResultChannel<StreamId>,
    },
    /// Close the specified stream.
    CloseStream {
        /// The hop number.
        hop_num: HopNum,
        /// The ID of the stream to close.
        sid: StreamId,
        /// The stream-closing behavior.
        behav: CloseStreamBehavior,
        /// The reason for closing the stream.
        reason: streammap::TerminateReason,
        /// A channel for sending completion notifications.
        ///
        /// `None` if nobody is waiting to hear about the result.
        done: Option<ReactorResultChannel<()>>,
    },
    /// Get the clock skew claimed by the first hop of the circuit.
    FirstHopClockSkew {
        /// Oneshot channel to return the clock skew.
        answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
    },
    /// Perform a clean shutdown on this circuit.
    CleanShutdown,
}
277
/// Cmd for sending a relay cell.
///
/// The contents of this struct are passed to `send_relay_cell`.
#[derive(educe::Educe)]
#[educe(Debug)]
pub(crate) struct SendRelayCell {
    /// The hop number.
    pub(crate) hop: HopNum,
    /// Whether to use a RELAY_EARLY cell.
    pub(crate) early: bool,
    /// The cell to send.
    pub(crate) cell: AnyRelayMsgOuter,
}
291
/// The event picked up by the `select_biased!` in [`Reactor::run_once`].
///
/// `run_once` turns each variant into an optional [`RunOnceCmd`],
/// which is then executed at the end of that function.
#[derive(From, Debug)]
enum SelectResult {
    /// Run a single `RunOnceCmdInner` command.
    Single(RunOnceCmdInner),
    /// Handle a control message
    HandleControl(CtrlMsg),
    /// Handle an input message.
    HandleCell(ClientCircChanMsg),
}
302
303impl CircHop {
304    /// Create a new hop.
305    pub(super) fn new(
306        unique_id: UniqId,
307        hop_num: HopNum,
308        format: RelayCellFormat,
309        params: &CircParameters,
310    ) -> Self {
311        CircHop {
312            unique_id,
313            hop_num,
314            map: Arc::new(Mutex::new(streammap::StreamMap::new())),
315            ccontrol: CongestionControl::new(&params.ccontrol),
316            inbound: RelayCellDecoder::new(format),
317        }
318    }
319
320    /// Start a stream. Creates an entry in the stream map with the given channels, and sends the
321    /// `message` to the provided hop.
322    fn begin_stream(
323        &mut self,
324        message: AnyRelayMsg,
325        sender: StreamMpscSender<UnparsedRelayMsg>,
326        rx: StreamMpscReceiver<AnyRelayMsg>,
327        cmd_checker: AnyCmdChecker,
328    ) -> Result<(SendRelayCell, StreamId)> {
329        let send_window = sendme::StreamSendWindow::new(SEND_WINDOW_INIT);
330        let r = self.map.lock().expect("lock poisoned").add_ent(
331            sender,
332            rx,
333            send_window,
334            cmd_checker,
335        )?;
336        let cell = AnyRelayMsgOuter::new(Some(r), message);
337        Ok((
338            SendRelayCell {
339                hop: self.hop_num,
340                early: false,
341                cell,
342            },
343            r,
344        ))
345    }
346
347    /// Close the stream associated with `id` because the stream was
348    /// dropped.
349    ///
350    /// If we have not already received an END cell on this stream, send one.
351    /// If no END cell is specified, an END cell with the reason byte set to
352    /// REASON_MISC will be sent.
353    fn close_stream(
354        &mut self,
355        id: StreamId,
356        message: CloseStreamBehavior,
357        why: streammap::TerminateReason,
358    ) -> Result<Option<SendRelayCell>> {
359        let should_send_end = self.map.lock().expect("lock poisoned").terminate(id, why)?;
360        trace!(
361            "{}: Ending stream {}; should_send_end={:?}",
362            self.unique_id,
363            id,
364            should_send_end
365        );
366        // TODO: I am about 80% sure that we only send an END cell if
367        // we didn't already get an END cell.  But I should double-check!
368        if let (ShouldSendEnd::Send, CloseStreamBehavior::SendEnd(end_message)) =
369            (should_send_end, message)
370        {
371            let end_cell = AnyRelayMsgOuter::new(Some(id), end_message.into());
372            let cell = SendRelayCell {
373                hop: self.hop_num,
374                early: false,
375                cell: end_cell,
376            };
377
378            return Ok(Some(cell));
379        }
380        Ok(None)
381    }
382}
383
/// An object that's waiting for a meta cell (one not associated with a stream) in order to make
/// progress.
///
/// # Background
///
/// The `Reactor` can't have async functions that send and receive cells, because its job is to
/// send and receive cells: if one of its functions tried to do that, it would just hang forever.
///
/// To get around this problem, the reactor can send some cells, and then make one of these
/// `MetaCellHandler` objects, which will be run when the reply arrives.
pub(crate) trait MetaCellHandler: Send {
    /// The hop we're expecting the message to come from. This is compared against the hop
    /// from which we actually receive messages, and an error is thrown if the two don't match.
    fn expected_hop(&self) -> HopNum;
    /// Called when the message we were waiting for arrives.
    ///
    /// Gets mutable access to the [`Circuit`] the message arrived on
    /// (the `reactor` argument), in order to do anything it likes there.
    ///
    /// On success, the returned [`MetaCellDisposition`] says what should happen
    /// to this handler (and to the circuit) next.
    ///
    /// If this function returns an error, the reactor will shut down.
    fn handle_msg(
        &mut self,
        msg: UnparsedRelayMsg,
        reactor: &mut Circuit,
    ) -> Result<MetaCellDisposition>;
}
409
/// A possible successful outcome of giving a message to a [`MsgHandler`](super::msghandler::MsgHandler).
///
/// This is also the success type returned by [`MetaCellHandler::handle_msg`].
///
/// The type is `pub` when the `send-control-msg` feature is enabled;
/// two of its variants exist only with that feature.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "send-control-msg", visibility::make(pub))]
#[non_exhaustive]
pub(super) enum MetaCellDisposition {
    /// The message was consumed; the handler should remain installed.
    #[cfg(feature = "send-control-msg")]
    Consumed,
    /// The message was consumed; the handler should be uninstalled.
    ConversationFinished,
    /// The message was consumed; the circuit should be closed.
    #[cfg(feature = "send-control-msg")]
    CloseCirc,
    // TODO: Eventually we might want the ability to have multiple handlers
    // installed, and to let them say "not for me, maybe for somebody else?".
    // But right now we don't need that.
}
427
/// A unique identifier for a circuit leg.
///
/// This is a thin wrapper around a [`LegIdKey`].
///
/// After the circuit is torn down, its `LegId` becomes invalid.
/// The same `LegId` won't be reused for a future circuit.
//
// TODO(#1857): make this pub
#[allow(unused)]
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub(crate) struct LegId(pub(crate) LegIdKey);

slotmap_careful::new_key_type! {
    /// A key type for the circuit leg slotmap
    ///
    /// See [`LegId`].
    pub(crate) struct LegIdKey;
}
444
/// Turn an [`Option`] into a `Result`, mapping `None` to [`ReactorError::Shutdown`].
///
/// On `None`, a trace message naming `$reason` is logged (tagged with `$self.unique_id`)
/// before the error is produced. On `Some(v)`, the macro evaluates to `Ok(v)`.
///
/// This is a macro instead of a function to work around borrowck errors
/// in the select! from run_once().
macro_rules! unwrap_or_shutdown {
    ($self:expr, $res:expr, $reason:expr) => {{
        if let Some(v) = $res {
            Ok(v)
        } else {
            trace!("{}: reactor shutdown due to {}", $self.unique_id, $reason);
            Err(ReactorError::Shutdown)
        }
    }};
}
460
/// Object to handle incoming cells and background tasks on a circuit
///
/// This type is returned when you finish a circuit; you need to spawn a
/// new task that calls `run()` on it.
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
pub struct Reactor {
    /// Receiver for control messages for this reactor, sent by `ClientCirc` objects.
    ///
    /// This channel is polled in [`Reactor::run_once`], but only if the `chan_sender` sink
    /// is ready to accept cells.
    control: mpsc::UnboundedReceiver<CtrlMsg>,
    /// Receiver for command messages for this reactor, sent by `ClientCirc` objects.
    ///
    /// This channel is polled in [`Reactor::run_once`].
    ///
    /// NOTE: this is a separate channel from `control`, because some messages
    /// have higher priority and need to be handled even if the `chan_sender` is not
    /// ready (whereas `control` messages are not read until the `chan_sender` sink
    /// is ready to accept cells).
    command: mpsc::UnboundedReceiver<CtrlCmd>,
    /// A oneshot sender that is used to alert other tasks when this reactor is
    /// finally dropped.
    ///
    /// It is a sender for Void because we never actually want to send anything here;
    /// we only want to generate canceled events.
    #[allow(dead_code)] // the only purpose of this field is to be dropped.
    reactor_closed_tx: oneshot::Sender<void::Void>,
    /// A set of circuits that form a tunnel.
    ///
    /// Contains 1 or more circuits.
    ///
    /// Circuits may be added to this set throughout the lifetime of the reactor.
    //
    // TODO(conflux): add a control command for adding a circuit leg,
    // and update these docs to explain how legs are added
    ///
    /// Sometimes, the reactor will remove circuits from this set,
    /// for example if the `LINKED` message takes too long to arrive,
    /// or if congestion control negotiation fails.
    /// The reactor will continue running with the remaining circuits.
    /// It will shut down if *all* the circuits are removed.
    ///
    // TODO(conflux): document all the reasons why the reactor might
    // choose to tear down a circuit or tunnel (timeouts, protocol violations, etc.)
    circuits: ConfluxSet,
    /// An identifier for logging about this reactor's circuit.
    unique_id: UniqId,
    /// Handlers, shared with `Circuit`.
    cell_handlers: CellHandlers,
}
511
/// Cell handlers, shared between the Reactor and its underlying `Circuit`s.
struct CellHandlers {
    /// The handler for meta cells, if one is currently installed.
    ///
    /// `None` means that no meta cell handler is installed.
    meta_handler: Option<Box<dyn MetaCellHandler + Send>>,
    /// A handler for incoming stream requests.
    ///
    /// `None` means that no such handler is installed.
    #[cfg(feature = "hs-service")]
    incoming_stream_req_handler: Option<IncomingStreamRequestHandler>,
}
520
/// A circuit "leg" from a tunnel.
///
/// Regular (non-multipath) circuits have a single leg.
/// Conflux (multipath) circuits have `N` (usually, `N = 2`).
pub(crate) struct Circuit {
    /// The channel this circuit is attached to.
    channel: Arc<Channel>,
    /// Sender object used to actually send cells.
    ///
    /// NOTE: Control messages could potentially add unboundedly to this, although that's
    ///       not likely to happen (and isn't triggerable from the network, either).
    chan_sender: SometimesUnboundedSink<AnyChanCell, ChannelSender>,
    /// Input stream, on which we receive ChanMsg objects from this circuit's
    /// channel.
    // TODO: could use a SPSC channel here instead.
    input: CircuitRxReceiver,
    /// The cryptographic state for this circuit for inbound cells.
    /// This object is divided into multiple layers, each of which is
    /// shared with one hop of the circuit.
    crypto_in: InboundClientCrypt,
    /// The cryptographic state for this circuit for outbound cells.
    crypto_out: OutboundClientCrypt,
    /// List of hops state objects used by the reactor
    ///
    /// This list is empty until the first hop has been created.
    hops: Vec<CircHop>,
    /// Mutable information about this circuit, shared with
    /// [`ClientCirc`](super::ClientCirc).
    ///
    // TODO(conflux)/TODO(#1840): this belongs in the Reactor
    mutable: Arc<Mutex<MutableState>>,
    /// This circuit's identifier on the upstream channel.
    channel_id: CircId,
    /// An identifier for logging about this reactor's circuit.
    unique_id: UniqId,
    /// Memory quota account
    #[allow(dead_code)] // Partly here to keep it alive as long as the circuit
    memquota: CircuitAccount,
}
558
/// Information about an incoming stream request.
///
/// Values of this type travel over the `StreamReqSender` queue,
/// which is why the type derives `HasMemoryCost`.
#[cfg(feature = "hs-service")]
#[derive(Debug, Deftly)]
#[derive_deftly(HasMemoryCost)]
pub(crate) struct StreamReqInfo {
    /// The [`IncomingStreamRequest`].
    pub(crate) req: IncomingStreamRequest,
    /// The ID of the stream being requested.
    pub(crate) stream_id: StreamId,
    /// The [`HopNum`].
    //
    // TODO: When we add support for exit relays, we need to turn this into an Option<HopNum>.
    // (For outbound messages (towards relays), there is only one hop that can send them: the client.)
    //
    // TODO: For onion services, we might be able to enforce the HopNum earlier: we would never accept an
    // incoming stream request from two separate hops.  (There is only one that's valid.)
    pub(crate) hop_num: HopNum,
    /// A channel for receiving messages from this stream.
    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate
    pub(crate) receiver: StreamMpscReceiver<UnparsedRelayMsg>,
    /// A channel for sending messages to be sent on this stream.
    #[deftly(has_memory_cost(indirect_size = "size_of::<AnyRelayMsg>()"))] // estimate
    pub(crate) msg_tx: StreamMpscSender<AnyRelayMsg>,
    /// The memory quota account to be used for this stream
    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate (it contains an Arc)
    pub(crate) memquota: StreamAccount,
}
586
/// Data required for handling an incoming stream request.
///
/// `Debug` is derived via `educe` so that the boxed `filter` can be skipped.
#[cfg(feature = "hs-service")]
#[derive(educe::Educe)]
#[educe(Debug)]
struct IncomingStreamRequestHandler {
    /// A sender for sharing information about an incoming stream request.
    incoming_sender: StreamReqSender,
    /// An [`AnyCmdChecker`] for validating incoming stream requests.
    cmd_checker: AnyCmdChecker,
    /// The hop to expect incoming stream requests from.
    hop_num: HopNum,
    /// An [`IncomingStreamRequestFilter`] for checking whether the user wants
    /// this request, or wants to reject it immediately.
    #[educe(Debug(ignore))]
    filter: Box<dyn IncomingStreamRequestFilter>,
}
603
604impl Reactor {
605    /// Create a new circuit reactor.
606    ///
607    /// The reactor will send outbound messages on `channel`, receive incoming
608    /// messages on `input`, and identify this circuit by the channel-local
609    /// [`CircId`] provided.
610    ///
611    /// The internal unique identifier for this circuit will be `unique_id`.
612    #[allow(clippy::type_complexity)] // TODO
613    pub(super) fn new(
614        channel: Arc<Channel>,
615        channel_id: CircId,
616        unique_id: UniqId,
617        input: CircuitRxReceiver,
618        memquota: CircuitAccount,
619    ) -> (
620        Self,
621        mpsc::UnboundedSender<CtrlMsg>,
622        mpsc::UnboundedSender<CtrlCmd>,
623        oneshot::Receiver<void::Void>,
624        Arc<Mutex<MutableState>>,
625    ) {
626        let crypto_out = OutboundClientCrypt::new();
627        let (control_tx, control_rx) = mpsc::unbounded();
628        let (command_tx, command_rx) = mpsc::unbounded();
629        let mutable = Arc::new(Mutex::new(MutableState::default()));
630
631        let (reactor_closed_tx, reactor_closed_rx) = oneshot::channel();
632
633        let chan_sender = SometimesUnboundedSink::new(channel.sender());
634
635        let cell_handlers = CellHandlers {
636            meta_handler: None,
637            #[cfg(feature = "hs-service")]
638            incoming_stream_req_handler: None,
639        };
640
641        let circuit_leg = Circuit {
642            channel,
643            chan_sender,
644            input,
645            crypto_in: InboundClientCrypt::new(),
646            hops: vec![],
647            unique_id,
648            channel_id,
649            crypto_out,
650            mutable: mutable.clone(),
651            memquota,
652        };
653
654        let reactor = Reactor {
655            circuits: ConfluxSet::new(circuit_leg),
656            control: control_rx,
657            command: command_rx,
658            reactor_closed_tx,
659            unique_id,
660            cell_handlers,
661        };
662
663        (reactor, control_tx, command_tx, reactor_closed_rx, mutable)
664    }
665
666    /// Launch the reactor, and run until the circuit closes or we
667    /// encounter an error.
668    ///
669    /// Once this method returns, the circuit is dead and cannot be
670    /// used again.
671    pub async fn run(mut self) -> Result<()> {
672        trace!("{}: Running circuit reactor", self.unique_id);
673        let result: Result<()> = loop {
674            match self.run_once().await {
675                Ok(()) => (),
676                Err(ReactorError::Shutdown) => break Ok(()),
677                Err(ReactorError::Err(e)) => break Err(e),
678            }
679        };
680        trace!("{}: Circuit reactor stopped: {:?}", self.unique_id, result);
681        result
682    }
683
    /// Helper for run: doesn't mark the circuit closed on finish.  Only
    /// processes one cell or control message.
    ///
    /// Each call performs one iteration of the reactor loop:
    ///
    /// 1. If there are no circuits left, return [`ReactorError::Shutdown`].
    /// 2. If this is a single-leg circuit with no hops yet, wait for the first
    ///    hop to be created, and return.
    /// 3. Otherwise wait for one event (a command, a control message, an
    ///    incoming cell, or a ready stream), and handle it.
    ///
    /// Returns [`ReactorError::Shutdown`] if the command, control, or input
    /// channel yields `None`. Errors from the handlers are propagated.
    async fn run_once(&mut self) -> StdResult<(), ReactorError> {
        // If all the circuits are closed, shut down the reactor
        //
        // TODO(conflux): we might need to rethink this behavior
        if self.circuits.is_empty() {
            trace!(
                "{}: Circuit reactor shutting down: all circuits have closed",
                self.unique_id
            );

            return Err(ReactorError::Shutdown);
        }

        // If this is a single path circuit, we need to wait until the first hop
        // is created before doing anything else
        if self
            .circuits
            .single_leg_mut()
            .is_ok_and(|c| c.hops.is_empty())
        {
            self.wait_for_create().await?;

            return Ok(());
        }

        // TODO(conflux): support adding and linking circuits
        // TODO(conflux): support switching the primary leg

        // TODO(conflux): read from *all* the circuits, not just the primary
        //
        // Note: this is a big TODO, and will likely involve factoring out the
        // chan_sender.prepare_send_from() call into a function on Circuit.
        // Each Circuit will have its own control channel, for handling control
        // messages meant for it (I imagine some/all CtrlMsgs will have a LegId
        // field, and that the reactor will redirect the CtrlMsg to the appropriate
        // Circuit's control channel?). Putting the control channel (which will
        // probably receive a CtrlMsgInner type) inside the Circuit should enable
        // us to lift the prepare_send_from() into a Circuit function, that will
        // get called from ConfluxSet::poll_all_name_tbd() that can be used in
        // this select_biased! to select between the channel readiness of *all*
        // underlying circuits.
        let primary_leg = self.circuits.primary_leg_mut()?;
        let mut ready_streams = primary_leg.ready_streams_iterator();

        // Note: We don't actually use the returned SinkSendable,
        // and continue writing to the SometimesUnboundedSink :(
        let (cmd, _sendable) = select_biased! {
                // Commands are listed first, outside of `prepare_send_from`: they
                // need to be handled even if `chan_sender` is not ready (see the
                // docs of the `command` field).
                res = self.command.next() => {
                    let cmd = unwrap_or_shutdown!(self, res, "command channel drop")?;
                    return ControlHandler::new(self).handle_cmd(cmd);
                },
                // Everything else is read through `prepare_send_from`; per the docs of
                // the `control` field, that means only once the sink can accept cells.
                res = primary_leg.chan_sender
                    .prepare_send_from(async {
                        select_biased! {
                            // Check whether we've got a control message pending.
                            ret = self.control.next() => {
                                let msg = unwrap_or_shutdown!(self, ret, "control drop")?;
                                Ok::<_, ReactorError>(Some(SelectResult::HandleControl(msg)))
                            },
                            // Check whether we've got an input message pending.
                            ret = primary_leg.input.next().fuse() => {
                                let cell = unwrap_or_shutdown!(self, ret, "input drop")?;
                                Ok(Some(SelectResult::HandleCell(cell)))
                            },
                            // Check whether any stream has something for us to do.
                            ret = ready_streams.next().fuse() => {
                                match ret {
                                    Some(cmd) => {
                                        let cmd = cmd?;
                                        Ok(Some(SelectResult::Single(cmd)))
                                    },
                                    None => {
                                        // There are no ready streams (for example, they may all be
                                        // blocked due to congestion control), so there is nothing
                                        // to do.
                                        Ok(None)
                                    }
                                }
                            }
                        }
                    }) => res?,
        };
        // `cmd` is the `Result` produced by the async block above;
        // this is where its error (if any) is propagated.
        let cmd = cmd?;

        // Turn what we selected into an (optional) command to run.
        let cmd = match cmd {
            None => None,
            Some(SelectResult::Single(cmd)) => Some(RunOnceCmd::Single(cmd)),
            Some(SelectResult::HandleControl(ctrl)) => ControlHandler::new(self)
                .handle_msg(ctrl)?
                .map(RunOnceCmd::Single),
            Some(SelectResult::HandleCell(cell)) => {
                // TODO(conflux): put the LegId of the circuit the cell was received on
                // inside HandleCell
                //let circ = self.circuits.leg(leg_id)?;

                let circ = self.circuits.primary_leg_mut()?;
                circ.handle_cell(&mut self.cell_handlers, cell)?
            }
        };

        if let Some(cmd) = cmd {
            self.handle_run_once_cmd(cmd).await?;
        }

        Ok(())
    }
791
792    /// Handle a [`RunOnceCmd`].
793    async fn handle_run_once_cmd(&mut self, cmd: RunOnceCmd) -> StdResult<(), ReactorError> {
794        match cmd {
795            RunOnceCmd::Single(cmd) => return self.handle_single_run_once_cmd(cmd).await,
796            RunOnceCmd::Multiple(cmds) => {
797                // While we know `sendable` is ready to accept *one* cell,
798                // we can't be certain it will be able to accept *all* of the cells
799                // that need to be sent here. This means we *may* end up buffering
800                // in its underlying SometimesUnboundedSink! That is OK, because
801                // RunOnceCmd::Multiple is only used for handling packed cells.
802                for cmd in cmds {
803                    self.handle_single_run_once_cmd(cmd).await?;
804                }
805            }
806        }
807
808        Ok(())
809    }
810
811    /// Handle a [`RunOnceCmd`].
812    async fn handle_single_run_once_cmd(
813        &mut self,
814        cmd: RunOnceCmdInner,
815    ) -> StdResult<(), ReactorError> {
816        match cmd {
817            RunOnceCmdInner::Send { cell, done } => {
818                // TODO: check the cc window
819
820                // TODO(conflux): let the RunOnceCmdInner specify which leg to send the cell on
821                let res = self.circuits.primary_leg_mut()?.send_relay_cell(cell).await;
822                if let Some(done) = done {
823                    // Don't care if the receiver goes away
824                    let _ = done.send(res.clone());
825                }
826                res?;
827            }
828            #[cfg(feature = "send-control-msg")]
829            RunOnceCmdInner::SendMsgAndInstallHandler { msg, handler, done } => {
830                let cell: Result<Option<SendRelayCell>> =
831                    self.prepare_msg_and_install_handler(msg, handler);
832
833                match cell {
834                    Ok(Some(cell)) => {
835                        // TODO(conflux): let the RunOnceCmdInner specify which leg to send the cell on
836                        let outcome = self.circuits.primary_leg_mut()?.send_relay_cell(cell).await;
837                        // don't care if receiver goes away.
838                        let _ = done.send(outcome.clone());
839                        outcome?;
840                    }
841                    Ok(None) => {
842                        // don't care if receiver goes away.
843                        let _ = done.send(Ok(()));
844                    }
845                    Err(e) => {
846                        // don't care if receiver goes away.
847                        let _ = done.send(Err(e.clone()));
848                        return Err(e.into());
849                    }
850                }
851            }
852            // TODO(conflux)/TODO(#1857): should this take a leg_id argument?
853            // Currently, we always begin streams on the primary leg
854            RunOnceCmdInner::BeginStream { cell, done } => {
855                match cell {
856                    Ok((cell, stream_id)) => {
857                        // TODO(conflux): let the RunOnceCmdInner specify which leg to send the cell on
858                        // (currently it is an error to use BeginStream on a multipath tunnel)
859                        let outcome = self.circuits.single_leg_mut()?.send_relay_cell(cell).await;
860                        // don't care if receiver goes away.
861                        let _ = done.send(outcome.clone().map(|_| stream_id));
862                        outcome?;
863                    }
864                    Err(e) => {
865                        // don't care if receiver goes away.
866                        let _ = done.send(Err(e.clone()));
867                        return Err(e.into());
868                    }
869                }
870            }
871            RunOnceCmdInner::CloseStream {
872                hop_num,
873                sid,
874                behav,
875                reason,
876                done,
877            } => {
878                // TODO(conflux): currently, it is an error to use CloseStream
879                // with a multi-path circuit.
880                let leg = self.circuits.single_leg_mut()?;
881                let res: Result<()> = leg.close_stream(hop_num, sid, behav, reason).await;
882
883                if let Some(done) = done {
884                    // don't care if the sender goes away
885                    let _ = done.send(res);
886                }
887            }
888            RunOnceCmdInner::HandleSendMe { hop, sendme } => {
889                // TODO(conflux): this should specify which leg of the circuit the SENDME
890                // came on
891                let leg = self.circuits.single_leg_mut()?;
892                // NOTE: it's okay to await. We are only awaiting on the congestion_signals
893                // future which *should* resolve immediately
894                let signals = leg.congestion_signals().await;
895                leg.handle_sendme(hop, sendme, signals)?;
896            }
897            RunOnceCmdInner::FirstHopClockSkew { answer } => {
898                let res = self
899                    .circuits
900                    .single_leg_mut()
901                    .map(|leg| leg.channel.clock_skew());
902
903                // don't care if the sender goes away
904                let _ = answer.send(res);
905            }
906            RunOnceCmdInner::CleanShutdown => {
907                trace!("{}: reactor shutdown due to handled cell", self.unique_id);
908                return Err(ReactorError::Shutdown);
909            }
910        }
911
912        Ok(())
913    }
914
915    /// Wait for a [`CtrlMsg::Create`] to come along to set up the circuit.
916    ///
917    /// Returns an error if an unexpected `CtrlMsg` is received.
918    async fn wait_for_create(&mut self) -> StdResult<(), ReactorError> {
919        let msg = select_biased! {
920            res = self.command.next() => {
921                let cmd = unwrap_or_shutdown!(self, res, "shutdown channel drop")?;
922                match cmd {
923                    CtrlCmd::Shutdown => return self.handle_shutdown().map(|_| ()),
924                    #[cfg(test)]
925                    CtrlCmd::AddFakeHop {
926                        relay_cell_format: format,
927                        fwd_lasthop,
928                        rev_lasthop,
929                        params,
930                        done,
931                    } => {
932                        self.circuits.single_leg_mut()?.handle_add_fake_hop(format, fwd_lasthop, rev_lasthop, &params, done);
933                        return Ok(())
934                    },
935                    _ => {
936                        trace!("reactor shutdown due to unexpected command: {:?}", cmd);
937                        return Err(Error::CircProto(format!("Unexpected control {cmd:?} on client circuit")).into());
938                    }
939                }
940            },
941            res = self.control.next() => unwrap_or_shutdown!(self, res, "control drop")?,
942        };
943
944        match msg {
945            CtrlMsg::Create {
946                recv_created,
947                handshake,
948                params,
949                done,
950            } => {
951                // TODO(conflux): instead of crashing the reactor, it might be better
952                // to send the error via the done channel instead
953                let leg = self.circuits.single_leg_mut()?;
954                leg.handle_create(recv_created, handshake, &params, done)
955                    .await
956            }
957            _ => {
958                trace!("reactor shutdown due to unexpected cell: {:?}", msg);
959
960                Err(Error::CircProto(format!("Unexpected {msg:?} cell on client circuit")).into())
961            }
962        }
963    }
964
965    /// Prepare a `SendRelayCell` request, and install the given meta-cell handler.
966    fn prepare_msg_and_install_handler(
967        &mut self,
968        msg: Option<AnyRelayMsgOuter>,
969        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
970    ) -> Result<Option<SendRelayCell>> {
971        let msg = msg
972            .map(|msg| {
973                let handlers = &mut self.cell_handlers;
974                let handler = handler
975                    .as_ref()
976                    .or(handlers.meta_handler.as_ref())
977                    .ok_or_else(|| internal!("tried to use an ended Conversation"))?;
978                Ok::<_, crate::Error>(SendRelayCell {
979                    hop: handler.expected_hop(),
980                    early: false,
981                    cell: msg,
982                })
983            })
984            .transpose()?;
985
986        if let Some(handler) = handler {
987            self.cell_handlers.set_meta_handler(handler)?;
988        }
989
990        Ok(msg)
991    }
992
993    /// Handle a shutdown request.
994    fn handle_shutdown(&self) -> StdResult<Option<RunOnceCmdInner>, ReactorError> {
995        trace!(
996            "{}: reactor shutdown due to explicit request",
997            self.unique_id
998        );
999
1000        Err(ReactorError::Shutdown)
1001    }
1002}
1003
1004impl Circuit {
1005    /// Handle a [`CtrlMsg::AddFakeHop`] message.
1006    #[cfg(test)]
1007    fn handle_add_fake_hop(
1008        &mut self,
1009        format: RelayCellFormat,
1010        fwd_lasthop: bool,
1011        rev_lasthop: bool,
1012        params: &CircParameters,
1013        done: ReactorResultChannel<()>,
1014    ) {
1015        use crate::tunnel::circuit::test::DummyCrypto;
1016
1017        let dummy_peer_id = tor_linkspec::OwnedChanTarget::builder()
1018            .ed_identity([4; 32].into())
1019            .rsa_identity([5; 20].into())
1020            .build()
1021            .expect("Could not construct fake hop");
1022
1023        let fwd = Box::new(DummyCrypto::new(fwd_lasthop));
1024        let rev = Box::new(DummyCrypto::new(rev_lasthop));
1025        let binding = None;
1026        self.add_hop(
1027            format,
1028            path::HopDetail::Relay(dummy_peer_id),
1029            fwd,
1030            rev,
1031            binding,
1032            params,
1033        );
1034        let _ = done.send(Ok(()));
1035    }
1036
1037    /// Encode `msg` and encrypt it, returning the resulting cell
1038    /// and tag that should be expected for an authenticated SENDME sent
1039    /// in response to that cell.
1040    fn encode_relay_cell(
1041        crypto_out: &mut OutboundClientCrypt,
1042        hop: HopNum,
1043        early: bool,
1044        msg: AnyRelayMsgOuter,
1045    ) -> Result<(AnyChanMsg, &[u8; SENDME_TAG_LEN])> {
1046        let mut body: RelayCellBody = msg
1047            .encode(&mut rand::thread_rng())
1048            .map_err(|e| Error::from_cell_enc(e, "relay cell body"))?
1049            .into();
1050        let tag = crypto_out.encrypt(&mut body, hop)?;
1051        let msg = Relay::from(BoxedCellBody::from(body));
1052        let msg = if early {
1053            AnyChanMsg::RelayEarly(msg.into())
1054        } else {
1055            AnyChanMsg::Relay(msg)
1056        };
1057
1058        Ok((msg, tag))
1059    }
1060
1061    /// Encode `msg`, encrypt it, and send it to the 'hop'th hop.
1062    ///
1063    /// If there is insufficient outgoing *circuit-level* or *stream-level*
1064    /// SENDME window, an error is returned instead.
1065    ///
1066    /// Does not check whether the cell is well-formed or reasonable.
1067    async fn send_relay_cell(&mut self, msg: SendRelayCell) -> Result<()> {
1068        let SendRelayCell {
1069            hop,
1070            early,
1071            cell: msg,
1072        } = msg;
1073
1074        let c_t_w = sendme::cmd_counts_towards_windows(msg.cmd());
1075        let stream_id = msg.stream_id();
1076        let hop_num = Into::<usize>::into(hop);
1077        let circhop = &mut self.hops[hop_num];
1078
1079        // We need to apply stream-level flow control *before* encoding the message.
1080        if c_t_w {
1081            if let Some(stream_id) = stream_id {
1082                let mut hop_map = circhop.map.lock().expect("lock poisoned");
1083                let Some(StreamEntMut::Open(ent)) = hop_map.get_mut(stream_id) else {
1084                    warn!(
1085                        "{}: sending a relay cell for non-existent or non-open stream with ID {}!",
1086                        self.unique_id, stream_id
1087                    );
1088                    return Err(Error::CircProto(format!(
1089                        "tried to send a relay cell on non-open stream {}",
1090                        sv(stream_id),
1091                    )));
1092                };
1093                ent.take_capacity_to_send(msg.msg())?;
1094            }
1095        }
1096        // NOTE(eta): Now that we've encrypted the cell, we *must* either send it or abort
1097        //            the whole circuit (e.g. by returning an error).
1098        let (msg, tag) = Self::encode_relay_cell(&mut self.crypto_out, hop, early, msg)?;
1099        // The cell counted for congestion control, inform our algorithm of such and pass down the
1100        // tag for authenticated SENDMEs.
1101        if c_t_w {
1102            circhop.ccontrol.note_data_sent(tag)?;
1103        }
1104
1105        let cell = AnyChanCell::new(Some(self.channel_id), msg);
1106        Pin::new(&mut self.chan_sender).send_unbounded(cell).await?;
1107
1108        Ok(())
1109    }
1110
1111    /// Helper: process a cell on a channel.  Most cells get ignored
1112    /// or rejected; a few get delivered to circuits.
1113    ///
1114    /// Return `CellStatus::CleanShutdown` if we should exit.
1115    fn handle_cell(
1116        &mut self,
1117        handlers: &mut CellHandlers,
1118        cell: ClientCircChanMsg,
1119    ) -> Result<Option<RunOnceCmd>> {
1120        trace!("{}: handling cell: {:?}", self.unique_id, cell);
1121        use ClientCircChanMsg::*;
1122        match cell {
1123            Relay(r) => self.handle_relay_cell(handlers, r),
1124            Destroy(d) => {
1125                let reason = d.reason();
1126                debug!(
1127                    "{}: Received DESTROY cell. Reason: {} [{}]",
1128                    self.unique_id,
1129                    reason.human_str(),
1130                    reason
1131                );
1132
1133                self.handle_destroy_cell()
1134                    .map(|c| Some(RunOnceCmd::Single(c)))
1135            }
1136        }
1137    }
1138
1139    /// Decode `cell`, returning its corresponding hop number, tag,
1140    /// and decoded body.
1141    fn decode_relay_cell(
1142        &mut self,
1143        cell: Relay,
1144    ) -> Result<(HopNum, CircTag, RelayCellDecoderResult)> {
1145        let mut body = cell.into_relay_body().into();
1146
1147        // Decrypt the cell. If it's recognized, then find the
1148        // corresponding hop.
1149        let (hopnum, tag) = self.crypto_in.decrypt(&mut body)?;
1150        // Make a copy of the authentication tag. TODO: I'd rather not
1151        // copy it, but I don't see a way around it right now.
1152        let tag = {
1153            let mut tag_copy = [0_u8; SENDME_TAG_LEN];
1154            // TODO(nickm): This could crash if the tag length changes.  We'll
1155            // have to refactor it then.
1156            tag_copy.copy_from_slice(tag);
1157            tag_copy
1158        };
1159
1160        // Decode the cell.
1161        let decode_res = self
1162            .hop_mut(hopnum)
1163            .ok_or_else(|| {
1164                Error::from(internal!(
1165                    "Trying to decode cell from nonexistent hop {:?}",
1166                    hopnum
1167                ))
1168            })?
1169            .inbound
1170            .decode(body.into())
1171            .map_err(|e| Error::from_bytes_err(e, "relay cell"))?;
1172
1173        Ok((hopnum, tag.into(), decode_res))
1174    }
1175
1176    /// React to a Relay or RelayEarly cell.
1177    fn handle_relay_cell(
1178        &mut self,
1179        handlers: &mut CellHandlers,
1180        cell: Relay,
1181    ) -> Result<Option<RunOnceCmd>> {
1182        let (hopnum, tag, decode_res) = self.decode_relay_cell(cell)?;
1183
1184        let c_t_w = decode_res.cmds().any(sendme::cmd_counts_towards_windows);
1185
1186        // Decrement the circuit sendme windows, and see if we need to
1187        // send a sendme cell.
1188        let send_circ_sendme = if c_t_w {
1189            self.hop_mut(hopnum)
1190                .ok_or_else(|| Error::CircProto("Sendme from nonexistent hop".into()))?
1191                .ccontrol
1192                .note_data_received()?
1193        } else {
1194            false
1195        };
1196
1197        let mut run_once_cmds = vec![];
1198        // If we do need to send a circuit-level SENDME cell, do so.
1199        if send_circ_sendme {
1200            // This always sends a V1 (tagged) sendme cell, and thereby assumes
1201            // that SendmeEmitMinVersion is no more than 1.  If the authorities
1202            // every increase that parameter to a higher number, this will
1203            // become incorrect.  (Higher numbers are not currently defined.)
1204            let sendme = Sendme::new_tag(tag.into());
1205            let cell = AnyRelayMsgOuter::new(None, sendme.into());
1206            run_once_cmds.push(RunOnceCmdInner::Send {
1207                cell: SendRelayCell {
1208                    hop: hopnum,
1209                    early: false,
1210                    cell,
1211                },
1212                done: None,
1213            });
1214
1215            // Inform congestion control of the SENDME we are sending. This is a circuit level one.
1216            self.hop_mut(hopnum)
1217                .ok_or_else(|| {
1218                    Error::from(internal!(
1219                        "Trying to send SENDME to nonexistent hop {:?}",
1220                        hopnum
1221                    ))
1222                })?
1223                .ccontrol
1224                .note_sendme_sent()?;
1225        }
1226
1227        let (mut msgs, incomplete) = decode_res.into_parts();
1228        while let Some(msg) = msgs.next() {
1229            let msg_status = self.handle_relay_msg(handlers, hopnum, c_t_w, msg)?;
1230
1231            match msg_status {
1232                None => continue,
1233                Some(msg @ RunOnceCmdInner::CleanShutdown) => {
1234                    for m in msgs {
1235                        debug!(
1236                            "{id}: Ignoring relay msg received after triggering shutdown: {m:?}",
1237                            id = self.unique_id
1238                        );
1239                    }
1240                    if let Some(incomplete) = incomplete {
1241                        debug!(
1242                            "{id}: Ignoring partial relay msg received after triggering shutdown: {:?}",
1243                            incomplete,
1244                            id=self.unique_id,
1245                        );
1246                    }
1247                    run_once_cmds.push(msg);
1248                    return Ok(Some(RunOnceCmd::Multiple(run_once_cmds)));
1249                }
1250                Some(msg) => {
1251                    run_once_cmds.push(msg);
1252                }
1253            }
1254        }
1255
1256        Ok(Some(RunOnceCmd::Multiple(run_once_cmds)))
1257    }
1258
1259    /// Handle a single incoming relay message.
1260    fn handle_relay_msg(
1261        &mut self,
1262        handlers: &mut CellHandlers,
1263        hopnum: HopNum,
1264        cell_counts_toward_windows: bool,
1265        msg: UnparsedRelayMsg,
1266    ) -> Result<Option<RunOnceCmdInner>> {
1267        // If this msg wants/refuses to have a Stream ID, does it
1268        // have/not have one?
1269        let streamid = msg_streamid(&msg)?;
1270
1271        // If this doesn't have a StreamId, it's a meta cell,
1272        // not meant for a particular stream.
1273        let Some(streamid) = streamid else {
1274            return self.handle_meta_cell(handlers, hopnum, msg);
1275        };
1276
1277        let hop = self
1278            .hop_mut(hopnum)
1279            .ok_or_else(|| Error::CircProto("Cell from nonexistent hop!".into()))?;
1280        let mut hop_map = hop.map.lock().expect("lock poisoned");
1281        match hop_map.get_mut(streamid) {
1282            Some(StreamEntMut::Open(ent)) => {
1283                let message_closes_stream =
1284                    Self::deliver_msg_to_stream(streamid, ent, cell_counts_toward_windows, msg)?;
1285
1286                if message_closes_stream {
1287                    hop_map.ending_msg_received(streamid)?;
1288                }
1289            }
1290            #[cfg(feature = "hs-service")]
1291            Some(StreamEntMut::EndSent(_))
1292                if matches!(
1293                    msg.cmd(),
1294                    RelayCmd::BEGIN | RelayCmd::BEGIN_DIR | RelayCmd::RESOLVE
1295                ) =>
1296            {
1297                // If the other side is sending us a BEGIN but hasn't yet acknowledged our END
1298                // message, just remove the old stream from the map and stop waiting for a
1299                // response
1300                hop_map.ending_msg_received(streamid)?;
1301                drop(hop_map);
1302                return self.handle_incoming_stream_request(handlers, msg, streamid, hopnum);
1303            }
1304            Some(StreamEntMut::EndSent(EndSentStreamEnt { half_stream, .. })) => {
1305                // We sent an end but maybe the other side hasn't heard.
1306
1307                match half_stream.handle_msg(msg)? {
1308                    StreamStatus::Open => {}
1309                    StreamStatus::Closed => {
1310                        hop_map.ending_msg_received(streamid)?;
1311                    }
1312                }
1313            }
1314            #[cfg(feature = "hs-service")]
1315            None if matches!(
1316                msg.cmd(),
1317                RelayCmd::BEGIN | RelayCmd::BEGIN_DIR | RelayCmd::RESOLVE
1318            ) =>
1319            {
1320                drop(hop_map);
1321                return self.handle_incoming_stream_request(handlers, msg, streamid, hopnum);
1322            }
1323            _ => {
1324                // No stream wants this message, or ever did.
1325                return Err(Error::CircProto(
1326                    "Cell received on nonexistent stream!?".into(),
1327                ));
1328            }
1329        }
1330        Ok(None)
1331    }
1332
1333    /// Deliver `msg` to the specified open stream entry `ent`.
1334    fn deliver_msg_to_stream(
1335        streamid: StreamId,
1336        ent: &mut OpenStreamEnt,
1337        cell_counts_toward_windows: bool,
1338        msg: UnparsedRelayMsg,
1339    ) -> Result<bool> {
1340        // The stream for this message exists, and is open.
1341
1342        if msg.cmd() == RelayCmd::SENDME {
1343            let _sendme = msg
1344                .decode::<Sendme>()
1345                .map_err(|e| Error::from_bytes_err(e, "Sendme message on stream"))?
1346                .into_msg();
1347            // We need to handle sendmes here, not in the stream's
1348            // recv() method, or else we'd never notice them if the
1349            // stream isn't reading.
1350            ent.put_for_incoming_sendme()?;
1351            return Ok(false);
1352        }
1353
1354        let message_closes_stream = ent.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;
1355
1356        if let Err(e) = Pin::new(&mut ent.sink).try_send(msg) {
1357            if e.is_full() {
1358                // If we get here, we either have a logic bug (!), or an attacker
1359                // is sending us more cells than we asked for via congestion control.
1360                return Err(Error::CircProto(format!(
1361                    "Stream sink would block; received too many cells on stream ID {}",
1362                    sv(streamid),
1363                )));
1364            }
1365            if e.is_disconnected() && cell_counts_toward_windows {
1366                // the other side of the stream has gone away; remember
1367                // that we received a cell that we couldn't queue for it.
1368                //
1369                // Later this value will be recorded in a half-stream.
1370                ent.dropped += 1;
1371            }
1372        }
1373
1374        Ok(message_closes_stream)
1375    }
1376
1377    /// A helper for handling incoming stream requests.
1378    #[cfg(feature = "hs-service")]
1379    fn handle_incoming_stream_request(
1380        &mut self,
1381        handlers: &mut CellHandlers,
1382        msg: UnparsedRelayMsg,
1383        stream_id: StreamId,
1384        hop_num: HopNum,
1385    ) -> Result<Option<RunOnceCmdInner>> {
1386        use syncview::ClientCircSyncView;
1387        use tor_cell::relaycell::msg::EndReason;
1388        use tor_error::into_internal;
1389        use tor_log_ratelim::log_ratelim;
1390
1391        // We need to construct this early so that we don't double-borrow &mut self
1392
1393        let Some(handler) = handlers.incoming_stream_req_handler.as_mut() else {
1394            return Err(Error::CircProto(
1395                "Cannot handle BEGIN cells on this circuit".into(),
1396            ));
1397        };
1398
1399        if hop_num != handler.hop_num {
1400            return Err(Error::CircProto(format!(
1401                "Expecting incoming streams from {}, but received {} cell from unexpected hop {}",
1402                handler.hop_num.display(),
1403                msg.cmd(),
1404                hop_num.display()
1405            )));
1406        }
1407
1408        let message_closes_stream = handler.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;
1409
1410        // TODO: we've already looked up the `hop` in handle_relay_cell, so we shouldn't
1411        // have to look it up again! However, we can't pass the `&mut hop` reference from
1412        // `handle_relay_cell` to this function, because that makes Rust angry (we'd be
1413        // borrowing self as mutable more than once).
1414        //
1415        // TODO: we _could_ use self.hops.get_mut(..) instead self.hop_mut(..) inside
1416        // handle_relay_cell to work around the problem described above
1417        let hop = self
1418            .hops
1419            .get_mut(Into::<usize>::into(hop_num))
1420            .ok_or(Error::CircuitClosed)?;
1421
1422        if message_closes_stream {
1423            hop.map
1424                .lock()
1425                .expect("lock poisoned")
1426                .ending_msg_received(stream_id)?;
1427
1428            return Ok(None);
1429        }
1430
1431        let begin = msg
1432            .decode::<Begin>()
1433            .map_err(|e| Error::from_bytes_err(e, "Invalid Begin message"))?
1434            .into_msg();
1435
1436        let req = IncomingStreamRequest::Begin(begin);
1437
1438        {
1439            use crate::stream::IncomingStreamRequestDisposition::*;
1440
1441            let ctx = crate::stream::IncomingStreamRequestContext { request: &req };
1442            // IMPORTANT: ClientCircSyncView::n_open_streams() (called via disposition() below)
1443            // accesses the stream map mutexes!
1444            //
1445            // This means it's very important not to call this function while any of the hop's
1446            // stream map mutex is held.
1447            let view = ClientCircSyncView::new(&self.hops);
1448
1449            match handler.filter.as_mut().disposition(&ctx, &view)? {
1450                Accept => {}
1451                CloseCircuit => return Ok(Some(RunOnceCmdInner::CleanShutdown)),
1452                RejectRequest(end) => {
1453                    let end_msg = AnyRelayMsgOuter::new(Some(stream_id), end.into());
1454                    let cell = SendRelayCell {
1455                        hop: hop_num,
1456                        early: false,
1457                        cell: end_msg,
1458                    };
1459                    return Ok(Some(RunOnceCmdInner::Send { cell, done: None }));
1460                }
1461            }
1462        }
1463
1464        // TODO: Sadly, we need to look up `&mut hop` yet again,
1465        // since we needed to pass `&self.hops` by reference to our filter above. :(
1466        let hop = self
1467            .hops
1468            .get_mut(Into::<usize>::into(hop_num))
1469            .ok_or(Error::CircuitClosed)?;
1470
1471        let memquota = StreamAccount::new(&self.memquota)?;
1472
1473        let (sender, receiver) = MpscSpec::new(STREAM_READER_BUFFER).new_mq(
1474            self.chan_sender.as_inner().time_provider().clone(),
1475            memquota.as_raw_account(),
1476        )?;
1477        let (msg_tx, msg_rx) = MpscSpec::new(super::CIRCUIT_BUFFER_SIZE).new_mq(
1478            self.chan_sender.as_inner().time_provider().clone(),
1479            memquota.as_raw_account(),
1480        )?;
1481
1482        let send_window = sendme::StreamSendWindow::new(SEND_WINDOW_INIT);
1483        let cmd_checker = DataCmdChecker::new_connected();
1484        hop.map.lock().expect("lock poisoned").add_ent_with_id(
1485            sender,
1486            msg_rx,
1487            send_window,
1488            stream_id,
1489            cmd_checker,
1490        )?;
1491
1492        let outcome = Pin::new(&mut handler.incoming_sender).try_send(StreamReqInfo {
1493            req,
1494            stream_id,
1495            hop_num,
1496            msg_tx,
1497            receiver,
1498            memquota,
1499        });
1500
1501        log_ratelim!("Delivering message to incoming stream handler"; outcome);
1502
1503        if let Err(e) = outcome {
1504            if e.is_full() {
1505                // The IncomingStreamRequestHandler's stream is full; it isn't
1506                // handling requests fast enough. So instead, we reply with an
1507                // END cell.
1508                let end_msg = AnyRelayMsgOuter::new(
1509                    Some(stream_id),
1510                    End::new_with_reason(EndReason::RESOURCELIMIT).into(),
1511                );
1512
1513                let cell = SendRelayCell {
1514                    hop: hop_num,
1515                    early: false,
1516                    cell: end_msg,
1517                };
1518                return Ok(Some(RunOnceCmdInner::Send { cell, done: None }));
1519            } else if e.is_disconnected() {
1520                // The IncomingStreamRequestHandler's stream has been dropped.
1521                // In the Tor protocol as it stands, this always means that the
1522                // circuit itself is out-of-use and should be closed. (See notes
1523                // on `allow_stream_requests.`)
1524                //
1525                // Note that we will _not_ reach this point immediately after
1526                // the IncomingStreamRequestHandler is dropped; we won't hit it
1527                // until we next get an incoming request.  Thus, if we do later
1528                // want to add early detection for a dropped
1529                // IncomingStreamRequestHandler, we need to do it elsewhere, in
1530                // a different way.
1531                debug!(
1532                    "{}: Incoming stream request receiver dropped",
1533                    self.unique_id
1534                );
1535                // This will _cause_ the circuit to get closed.
1536                return Err(Error::CircuitClosed);
1537            } else {
1538                // There are no errors like this with the current design of
1539                // futures::mpsc, but we shouldn't just ignore the possibility
1540                // that they'll be added later.
1541                return Err(Error::from((into_internal!(
1542                    "try_send failed unexpectedly"
1543                ))(e)));
1544            }
1545        }
1546
1547        Ok(None)
1548    }
1549
1550    /// Helper: process a destroy cell.
1551    #[allow(clippy::unnecessary_wraps)]
1552    fn handle_destroy_cell(&mut self) -> Result<RunOnceCmdInner> {
1553        // I think there is nothing more to do here.
1554        Ok(RunOnceCmdInner::CleanShutdown)
1555    }
1556
1557    /// Handle a [`CtrlMsg::Create`] message.
1558    async fn handle_create(
1559        &mut self,
1560        recv_created: oneshot::Receiver<CreateResponse>,
1561        handshake: CircuitHandshake,
1562        params: &CircParameters,
1563        done: ReactorResultChannel<()>,
1564    ) -> StdResult<(), ReactorError> {
1565        let ret = match handshake {
1566            CircuitHandshake::CreateFast => self.create_firsthop_fast(recv_created, params).await,
1567            CircuitHandshake::Ntor {
1568                public_key,
1569                ed_identity,
1570            } => {
1571                self.create_firsthop_ntor(recv_created, ed_identity, public_key, params)
1572                    .await
1573            }
1574            #[cfg(feature = "ntor_v3")]
1575            CircuitHandshake::NtorV3 { public_key } => {
1576                self.create_firsthop_ntor_v3(recv_created, public_key, params)
1577                    .await
1578            }
1579        };
1580        let _ = done.send(ret); // don't care if sender goes away
1581
1582        // TODO: maybe we don't need to flush here?
1583        // (we could let run_once() handle all the flushing)
1584        self.chan_sender.flush().await?;
1585
1586        Ok(())
1587    }
1588
1589    /// Helper: create the first hop of a circuit.
1590    ///
1591    /// This is parameterized not just on the RNG, but a wrapper object to
1592    /// build the right kind of create cell, and a handshake object to perform
1593    /// the cryptographic handshake.
1594    async fn create_impl<H, W, M>(
1595        &mut self,
1596        cell_protocol: RelayCryptLayerProtocol,
1597        recvcreated: oneshot::Receiver<CreateResponse>,
1598        wrap: &W,
1599        key: &H::KeyType,
1600        params: &CircParameters,
1601        msg: &M,
1602    ) -> Result<()>
1603    where
1604        H: ClientHandshake + HandshakeAuxDataHandler,
1605        W: CreateHandshakeWrap,
1606        H::KeyGen: KeyGenerator,
1607        M: Borrow<H::ClientAuxData>,
1608    {
1609        // We don't need to shut down the circuit on failure here, since this
1610        // function consumes the PendingClientCirc and only returns
1611        // a ClientCirc on success.
1612
1613        let (state, msg) = {
1614            // done like this because holding the RNG across an await boundary makes the future
1615            // non-Send
1616            let mut rng = rand::thread_rng();
1617            H::client1(&mut rng, key, msg)?
1618        };
1619        let create_cell = wrap.to_chanmsg(msg);
1620        trace!(
1621            "{}: Extending to hop 1 with {}",
1622            self.unique_id,
1623            create_cell.cmd()
1624        );
1625        self.send_msg(create_cell).await?;
1626
1627        let reply = recvcreated
1628            .await
1629            .map_err(|_| Error::CircProto("Circuit closed while waiting".into()))?;
1630
1631        let relay_handshake = wrap.decode_chanmsg(reply)?;
1632        let (server_msg, keygen) = H::client2(state, relay_handshake)?;
1633
1634        H::handle_server_aux_data(params, &server_msg)?;
1635
1636        let relay_cell_format = cell_protocol.relay_cell_format();
1637        let BoxedClientLayer { fwd, back, binding } =
1638            cell_protocol.construct_layers(HandshakeRole::Initiator, keygen)?;
1639
1640        trace!("{}: Handshake complete; circuit created.", self.unique_id);
1641
1642        let peer_id = self.channel.target().clone();
1643
1644        self.add_hop(
1645            relay_cell_format,
1646            path::HopDetail::Relay(peer_id),
1647            fwd,
1648            back,
1649            binding,
1650            params,
1651        );
1652        Ok(())
1653    }
1654
1655    /// Use the (questionable!) CREATE_FAST handshake to connect to the
1656    /// first hop of this circuit.
1657    ///
1658    /// There's no authentication in CREATE_FAST,
1659    /// so we don't need to know whom we're connecting to: we're just
1660    /// connecting to whichever relay the channel is for.
1661    async fn create_firsthop_fast(
1662        &mut self,
1663        recvcreated: oneshot::Receiver<CreateResponse>,
1664        params: &CircParameters,
1665    ) -> Result<()> {
1666        // In a CREATE_FAST handshake, we can't negotiate a format other than this.
1667        let protocol = RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0);
1668        let wrap = CreateFastWrap;
1669        self.create_impl::<CreateFastClient, _, _>(protocol, recvcreated, &wrap, &(), params, &())
1670            .await
1671    }
1672
1673    /// Use the ntor handshake to connect to the first hop of this circuit.
1674    ///
1675    /// Note that the provided keys must match the channel's target,
1676    /// or the handshake will fail.
1677    async fn create_firsthop_ntor(
1678        &mut self,
1679        recvcreated: oneshot::Receiver<CreateResponse>,
1680        ed_identity: pk::ed25519::Ed25519Identity,
1681        pubkey: NtorPublicKey,
1682        params: &CircParameters,
1683    ) -> Result<()> {
1684        // In an ntor handshake, we can't negotiate a format other than this.
1685        let relay_cell_protocol = RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0);
1686
1687        // Exit now if we have an Ed25519 or RSA identity mismatch.
1688        let target = RelayIds::builder()
1689            .ed_identity(ed_identity)
1690            .rsa_identity(pubkey.id)
1691            .build()
1692            .expect("Unable to build RelayIds");
1693        self.channel.check_match(&target)?;
1694
1695        let wrap = Create2Wrap {
1696            handshake_type: HandshakeType::NTOR,
1697        };
1698        self.create_impl::<NtorClient, _, _>(
1699            relay_cell_protocol,
1700            recvcreated,
1701            &wrap,
1702            &pubkey,
1703            params,
1704            &(),
1705        )
1706        .await
1707    }
1708
1709    /// Use the ntor-v3 handshake to connect to the first hop of this circuit.
1710    ///
1711    /// Note that the provided key must match the channel's target,
1712    /// or the handshake will fail.
1713    #[cfg(feature = "ntor_v3")]
1714    async fn create_firsthop_ntor_v3(
1715        &mut self,
1716        recvcreated: oneshot::Receiver<CreateResponse>,
1717        pubkey: NtorV3PublicKey,
1718        params: &CircParameters,
1719    ) -> Result<()> {
1720        // Exit now if we have a mismatched key.
1721        let target = RelayIds::builder()
1722            .ed_identity(pubkey.id)
1723            .build()
1724            .expect("Unable to build RelayIds");
1725        self.channel.check_match(&target)?;
1726
1727        // TODO: Add support for negotiating other formats.
1728        let relay_cell_protocol = RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0);
1729
1730        // TODO: Set client extensions. e.g. request congestion control
1731        // if specified in `params`.
1732        let client_extensions = [];
1733
1734        let wrap = Create2Wrap {
1735            handshake_type: HandshakeType::NTOR_V3,
1736        };
1737
1738        self.create_impl::<NtorV3Client, _, _>(
1739            relay_cell_protocol,
1740            recvcreated,
1741            &wrap,
1742            &pubkey,
1743            params,
1744            &client_extensions,
1745        )
1746        .await
1747    }
1748
1749    /// Add a hop to the end of this circuit.
1750    fn add_hop(
1751        &mut self,
1752        format: RelayCellFormat,
1753        peer_id: path::HopDetail,
1754        fwd: Box<dyn OutboundClientLayer + 'static + Send>,
1755        rev: Box<dyn InboundClientLayer + 'static + Send>,
1756        binding: Option<CircuitBinding>,
1757        params: &CircParameters,
1758    ) {
1759        let hop_num = (self.hops.len() as u8).into();
1760        let hop = CircHop::new(self.unique_id, hop_num, format, params);
1761        self.hops.push(hop);
1762        self.crypto_in.add_layer(rev);
1763        self.crypto_out.add_layer(fwd);
1764        let mut mutable = self.mutable.lock().expect("poisoned lock");
1765        Arc::make_mut(&mut mutable.path).push_hop(peer_id);
1766        mutable.binding.push(binding);
1767    }
1768
1769    /// Handle a RELAY cell on this circuit with stream ID 0.
1770    fn handle_meta_cell(
1771        &mut self,
1772        handlers: &mut CellHandlers,
1773        hopnum: HopNum,
1774        msg: UnparsedRelayMsg,
1775    ) -> Result<Option<RunOnceCmdInner>> {
1776        // SENDME cells and TRUNCATED get handled internally by the circuit.
1777
1778        // TODO: This pattern (Check command, try to decode, map error) occurs
1779        // several times, and would be good to extract simplify. Such
1780        // simplification is obstructed by a couple of factors: First, that
1781        // there is not currently a good way to get the RelayCmd from _type_ of
1782        // a RelayMsg.  Second, that decode() [correctly] consumes the
1783        // UnparsedRelayMsg.  I tried a macro-based approach, and didn't care
1784        // for it. -nickm
1785        if msg.cmd() == RelayCmd::SENDME {
1786            let sendme = msg
1787                .decode::<Sendme>()
1788                .map_err(|e| Error::from_bytes_err(e, "sendme message"))?
1789                .into_msg();
1790
1791            return Ok(Some(RunOnceCmdInner::HandleSendMe {
1792                hop: hopnum,
1793                sendme,
1794            }));
1795        }
1796        if msg.cmd() == RelayCmd::TRUNCATED {
1797            let truncated = msg
1798                .decode::<Truncated>()
1799                .map_err(|e| Error::from_bytes_err(e, "truncated message"))?
1800                .into_msg();
1801            let reason = truncated.reason();
1802            debug!(
1803                "{}: Truncated from hop {}. Reason: {} [{}]",
1804                self.unique_id,
1805                hopnum.display(),
1806                reason.human_str(),
1807                reason
1808            );
1809
1810            return Ok(Some(RunOnceCmdInner::CleanShutdown));
1811        }
1812
1813        trace!("{}: Received meta-cell {:?}", self.unique_id, msg);
1814
1815        // For all other command types, we'll only get them in response
1816        // to another command, which should have registered a responder.
1817        //
1818        // TODO: that means that service-introduction circuits will need
1819        // a different implementation, but that should be okay. We'll work
1820        // something out.
1821        if let Some(mut handler) = handlers.meta_handler.take() {
1822            if handler.expected_hop() == hopnum {
1823                // Somebody was waiting for a message -- maybe this message
1824                let ret = handler.handle_msg(msg, self);
1825                trace!(
1826                    "{}: meta handler completed with result: {:?}",
1827                    self.unique_id,
1828                    ret
1829                );
1830                match ret {
1831                    #[cfg(feature = "send-control-msg")]
1832                    Ok(MetaCellDisposition::Consumed) => {
1833                        handlers.meta_handler = Some(handler);
1834                        Ok(None)
1835                    }
1836                    Ok(MetaCellDisposition::ConversationFinished) => Ok(None),
1837                    #[cfg(feature = "send-control-msg")]
1838                    Ok(MetaCellDisposition::CloseCirc) => Ok(Some(RunOnceCmdInner::CleanShutdown)),
1839                    Err(e) => Err(e),
1840                }
1841            } else {
1842                // Somebody wanted a message from a different hop!  Put this
1843                // one back.
1844                handlers.meta_handler = Some(handler);
1845                Err(Error::CircProto(format!(
1846                    "Unexpected {} cell from hop {} on client circuit",
1847                    msg.cmd(),
1848                    hopnum.display(),
1849                )))
1850            }
1851        } else {
1852            // No need to call shutdown here, since this error will
1853            // propagate to the reactor shut it down.
1854            Err(Error::CircProto(format!(
1855                "Unexpected {} cell on client circuit",
1856                msg.cmd()
1857            )))
1858        }
1859    }
1860
1861    /// Handle a RELAY_SENDME cell on this circuit with stream ID 0.
1862    fn handle_sendme(
1863        &mut self,
1864        hopnum: HopNum,
1865        msg: Sendme,
1866        signals: CongestionSignals,
1867    ) -> Result<Option<RunOnceCmdInner>> {
1868        // No need to call "shutdown" on errors in this function;
1869        // it's called from the reactor task and errors will propagate there.
1870        let hop = self
1871            .hop_mut(hopnum)
1872            .ok_or_else(|| Error::CircProto(format!("Couldn't find hop {}", hopnum.display())))?;
1873
1874        let tag = match msg.into_tag() {
1875            Some(v) => CircTag::try_from(v.as_slice())
1876                .map_err(|_| Error::CircProto("malformed tag on circuit sendme".into()))?,
1877            None => {
1878                // Versions of Tor <=0.3.5 would omit a SENDME tag in this case;
1879                // but we don't support those any longer.
1880                return Err(Error::CircProto("missing tag on circuit sendme".into()));
1881            }
1882        };
1883        // Update the CC object that we received a SENDME along with possible congestion signals.
1884        hop.ccontrol.note_sendme_received(tag, signals)?;
1885        Ok(None)
1886    }
1887
1888    /// Send a message onto the circuit's channel.
1889    ///
1890    /// If the channel is ready to accept messages, it will be sent immediately. If not, the message
1891    /// will be enqueued for sending at a later iteration of the reactor loop.
1892    ///
1893    /// # Note
1894    ///
1895    /// Making use of the enqueuing capabilities of this function is discouraged! You should first
1896    /// check whether the channel is ready to receive messages (`self.channel.poll_ready`), and
1897    /// ideally use this to implement backpressure (such that you do not read from other sources
1898    /// that would send here while you know you're unable to forward the messages on).
1899    async fn send_msg(&mut self, msg: AnyChanMsg) -> Result<()> {
1900        let cell = AnyChanCell::new(Some(self.channel_id), msg);
1901        // Note: this future is always `Ready`, so await won't block.
1902        Pin::new(&mut self.chan_sender).send_unbounded(cell).await?;
1903        Ok(())
1904    }
1905
1906    /// Returns a [`Stream`] of [`RunOnceCmdInner`] to poll from the main loop.
1907    ///
1908    /// The iterator contains at most one [`RunOnceCmdInner`] for each hop,
1909    /// representing the instructions for handling the ready-item, if any,
1910    /// of its highest priority stream.
1911    ///
1912    /// IMPORTANT: this stream locks the stream map mutexes of each `CircHop`!
1913    /// To avoid contention, never create more than one [`Circuit::ready_streams_iterator`]
1914    /// stream at a time!
1915    fn ready_streams_iterator(&self) -> impl Stream<Item = Result<RunOnceCmdInner>> {
1916        self.hops
1917            .iter()
1918            .enumerate()
1919            .filter_map(|(i, hop)| {
1920                if !hop.ccontrol.can_send() {
1921                    // We can't send anything on this hop that counts towards SENDME windows.
1922                    //
1923                    // In theory we could send messages that don't count towards
1924                    // windows (like `RESOLVE`), and process end-of-stream
1925                    // events (to send an `END`), but it's probably not worth
1926                    // doing an O(N) iteration over flow-control-ready streams
1927                    // to see if that's the case.
1928                    //
1929                    // This *doesn't* block outgoing flow-control messages (e.g.
1930                    // SENDME), which are initiated via the control-message
1931                    // channel, handled above.
1932                    //
1933                    // TODO: Consider revisiting. OTOH some extra throttling when circuit-level
1934                    // congestion control has "bottomed out" might not be so bad, and the
1935                    // alternatives have complexity and/or performance costs.
1936                    return None;
1937                }
1938
1939                let hop_num = HopNum::from(i as u8);
1940                let hop_map = Arc::clone(&self.hops[i].map);
1941                Some(async move {
1942                    futures::future::poll_fn(move |cx| {
1943                        // Process an outbound message from the first ready stream on
1944                        // this hop. The stream map implements round robin scheduling to
1945                        // ensure fairness across streams.
1946                        // TODO: Consider looping here to process multiple ready
1947                        // streams. Need to be careful though to balance that with
1948                        // continuing to service incoming and control messages.
1949                        let mut hop_map = hop_map.lock().expect("lock poisoned");
1950                        let Some((sid, msg)) = hop_map.poll_ready_streams_iter(cx).next() else {
1951                            // No ready streams for this hop.
1952                            return Poll::Pending;
1953                        };
1954
1955                        if msg.is_none() {
1956                            return Poll::Ready(Ok(RunOnceCmdInner::CloseStream {
1957                                hop_num,
1958                                sid,
1959                                behav: CloseStreamBehavior::default(),
1960                                reason: streammap::TerminateReason::StreamTargetClosed,
1961                                done: None,
1962                            }));
1963                        };
1964                        let msg = hop_map.take_ready_msg(sid).expect("msg disappeared");
1965
1966                        #[allow(unused)] // unused in non-debug builds
1967                        let Some(StreamEntMut::Open(s)) = hop_map.get_mut(sid) else {
1968                            panic!("Stream {sid} disappeared");
1969                        };
1970
1971                        debug_assert!(
1972                            s.can_send(&msg),
1973                            "Stream {sid} produced a message it can't send: {msg:?}"
1974                        );
1975
1976                        let cell = SendRelayCell {
1977                            hop: hop_num,
1978                            early: false,
1979                            cell: AnyRelayMsgOuter::new(Some(sid), msg),
1980                        };
1981                        Poll::Ready(Ok(RunOnceCmdInner::Send { cell, done: None }))
1982                    })
1983                    .await
1984                })
1985            })
1986            .collect::<FuturesUnordered<_>>()
1987    }
1988
1989    /// Return the congestion signals for this reactor. This is used by congestion control module.
1990    ///
1991    /// Note: This is only async because we need a Context to check the sink for readiness.
1992    async fn congestion_signals(&mut self) -> CongestionSignals {
1993        futures::future::poll_fn(|cx| -> Poll<CongestionSignals> {
1994            Poll::Ready(CongestionSignals::new(
1995                self.chan_sender.poll_ready_unpin_bool(cx).unwrap_or(false),
1996                self.chan_sender.n_queued(),
1997            ))
1998        })
1999        .await
2000    }
2001
2002    /// Return the hop corresponding to `hopnum`, if there is one.
2003    fn hop_mut(&mut self, hopnum: HopNum) -> Option<&mut CircHop> {
2004        self.hops.get_mut(Into::<usize>::into(hopnum))
2005    }
2006
2007    /// Begin a stream with the provided hop in this circuit.
2008    fn begin_stream(
2009        &mut self,
2010        hop_num: HopNum,
2011        message: AnyRelayMsg,
2012        sender: StreamMpscSender<UnparsedRelayMsg>,
2013        rx: StreamMpscReceiver<AnyRelayMsg>,
2014        cmd_checker: AnyCmdChecker,
2015    ) -> StdResult<Result<(SendRelayCell, StreamId)>, Bug> {
2016        let Some(hop) = self.hop_mut(hop_num) else {
2017            return Err(internal!(
2018                "{}: Attempting to send a BEGIN cell to an unknown hop {hop_num:?}",
2019                self.unique_id,
2020            ));
2021        };
2022
2023        Ok(hop.begin_stream(message, sender, rx, cmd_checker))
2024    }
2025
2026    /// Close the specified stream
2027    async fn close_stream(
2028        &mut self,
2029        hop_num: HopNum,
2030        sid: StreamId,
2031        behav: CloseStreamBehavior,
2032        reason: streammap::TerminateReason,
2033    ) -> Result<()> {
2034        if let Some(hop) = self.hop_mut(hop_num) {
2035            let res = hop.close_stream(sid, behav, reason)?;
2036            if let Some(cell) = res {
2037                self.send_relay_cell(cell).await?;
2038            }
2039        }
2040        Ok(())
2041    }
2042}
2043
2044impl CellHandlers {
2045    /// Try to install a given meta-cell handler to receive any unusual cells on
2046    /// this circuit, along with a result channel to notify on completion.
2047    fn set_meta_handler(&mut self, handler: Box<dyn MetaCellHandler + Send>) -> Result<()> {
2048        if self.meta_handler.is_none() {
2049            self.meta_handler = Some(handler);
2050            Ok(())
2051        } else {
2052            Err(Error::from(internal!(
2053                "Tried to install a meta-cell handler before the old one was gone."
2054            )))
2055        }
2056    }
2057
2058    /// Try to install a given cell handler on this circuit.
2059    #[cfg(feature = "hs-service")]
2060    fn set_incoming_stream_req_handler(
2061        &mut self,
2062        handler: IncomingStreamRequestHandler,
2063    ) -> Result<()> {
2064        if self.incoming_stream_req_handler.is_none() {
2065            self.incoming_stream_req_handler = Some(handler);
2066            Ok(())
2067        } else {
2068            Err(Error::from(internal!(
2069                "Tried to install a BEGIN cell handler before the old one was gone."
2070            )))
2071        }
2072    }
2073}
2074
2075/// Return the stream ID of `msg`, if it has one.
2076///
2077/// Returns `Ok(None)` if `msg` is a meta cell.
2078fn msg_streamid(msg: &UnparsedRelayMsg) -> Result<Option<StreamId>> {
2079    let cmd = msg.cmd();
2080    let streamid = msg.stream_id();
2081    if !cmd.accepts_streamid_val(streamid) {
2082        return Err(Error::CircProto(format!(
2083            "Invalid stream ID {} for relay command {}",
2084            sv(StreamId::get_or_zero(streamid)),
2085            msg.cmd()
2086        )));
2087    }
2088
2089    Ok(streamid)
2090}
2091
2092impl Drop for Circuit {
2093    fn drop(&mut self) {
2094        let _ = self.channel.close_circuit(self.channel_id);
2095    }
2096}
2097
#[cfg(test)]
mod test {
    // Intentionally empty: this code is tested in [`crate::tunnel::circuit::test`].
}