Skip to main content

tor_proto/circuit/
circhop.rs

1//! Module exposing structures relating to a reactor's view of a circuit hop.
2
3// TODO(relay): don't import from the client module
4use crate::client::circuit::handshake::RelayCryptLayerProtocol;
5
6use crate::ccparams::CongestionControlParams;
7use crate::circuit::CircParameters;
8use crate::congestion::{CongestionControl, sendme};
9use crate::memquota::{SpecificAccount, StreamAccount};
10use crate::stream::CloseStreamBehavior;
11use crate::stream::SEND_WINDOW_INIT;
12use crate::stream::StreamMpscSender;
13use crate::stream::cmdcheck::{AnyCmdChecker, StreamStatus};
14use crate::stream::flow_ctrl::params::FlowCtrlParameters;
15use crate::stream::flow_ctrl::state::{FlowCtrlHooks, StreamFlowCtrl, StreamRateLimit};
16use crate::stream::flow_ctrl::xon_xoff::reader::DrainRateRequest;
17use crate::stream::queue::{StreamQueueReceiver, stream_queue};
18use crate::streammap::{
19    self, EndSentStreamEnt, OpenStreamEnt, ShouldSendEnd, StreamEntMut, StreamMap,
20};
21use crate::util::notify::{NotifyReceiver, NotifySender};
22use crate::{Error, HopNum, Result};
23
24use derive_deftly::Deftly;
25use postage::watch;
26use safelog::sensitive as sv;
27use tracing::{debug, trace};
28
29use tor_cell::chancell::BoxedCellBody;
30use tor_cell::relaycell::extend::{CcRequest, CircRequestExt};
31use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
32use tor_cell::relaycell::msg::AnyRelayMsg;
33use tor_cell::relaycell::{
34    AnyRelayMsgOuter, RelayCellDecoder, RelayCellDecoderResult, RelayCellFormat, RelayCmd,
35    StreamId, UnparsedRelayMsg,
36};
37use tor_error::{Bug, internal};
38use tor_memquota::derive_deftly_template_HasMemoryCost;
39use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
40use tor_protover::named;
41use tor_rtcompat::DynTimeProvider;
42
43use std::num::NonZeroU32;
44use std::pin::Pin;
45use std::result::Result as StdResult;
46use std::sync::{Arc, Mutex};
47use web_time_compat::Instant;
48
49#[cfg(test)]
50use tor_cell::relaycell::msg::SendmeTag;
51
52use cfg_if::cfg_if;
53
/// The size of the stream's outbound RELAY message queue.
///
/// This is the buffer size handed to `MpscSpec::new` when the reactor creates
/// the outbound message channel for a new stream.
// TODO(tuning): figure out if this is a good size for this buffer
const CIRCUIT_BUFFER_SIZE: usize = 128;
57
/// Type of negotiation that we'll be performing as we establish a hop.
///
/// Determines what flavor of extensions we can send and receive, which in turn
/// limits the hop settings we can negotiate.
///
// TODO-CGO: This is likely to be refactored when we finally add support for
// HsV3+CGO.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) enum HopNegotiationType {
    /// We're using a handshake in which extension-based negotiation cannot occur.
    ///
    /// `HopSettings::from_params_and_caps` gives such hops the fallback
    /// congestion control algorithm and Tor1 relay crypto with V0 relay cells.
    None,
    /// We're using the HsV3-ntor handshake, in which the client can send extensions,
    /// but the server cannot.
    ///
    /// As a special case, the default relay encryption protocol is the hsv3
    /// variant of Tor1.
    ///
    /// `HopSettings::from_params_and_caps` currently also forces the fallback
    /// congestion control algorithm for this type, since no congestion control
    /// extension is sent for it.
    //
    // We would call this "HalfDuplex" or something, but we do not expect to add
    // any more handshakes of this type.
    HsV3,
    /// We're using a handshake in which both client and relay can send extensions.
    ///
    /// This is the only type for which `HopSettings::from_params_and_caps`
    /// may select the CGO relay crypto protocol.
    Full,
}
81
/// The settings we use for a single hop of a circuit.
///
/// Unlike [`CircParameters`], this type is crate-internal.
/// We construct it based on our settings from the circuit,
/// and from the hop's actual capabilities.
/// Then, we negotiate with the hop as part of circuit
/// creation/extension to determine the actual settings that will be in use.
/// Finally, we use those settings to construct the negotiated circuit hop.
//
// TODO: Relays should probably derive an instance of this type too, as
// part of the circuit creation handshake.
#[derive(Clone, Debug)]
pub(crate) struct HopSettings {
    /// The negotiated congestion control settings for this hop.
    pub(crate) ccontrol: CongestionControlParams,

    /// Flow control parameters that will be used for streams on this hop.
    pub(crate) flow_ctrl_params: FlowCtrlParameters,

    /// Maximum number of permitted incoming relay cells for this hop.
    ///
    /// Copied verbatim from the circuit parameters when the settings are built.
    pub(crate) n_incoming_cells_permitted: Option<u32>,

    /// Maximum number of permitted outgoing relay cells for this hop.
    ///
    /// Copied verbatim from the circuit parameters when the settings are built.
    pub(crate) n_outgoing_cells_permitted: Option<u32>,

    /// The relay cell encryption algorithm and cell format for this hop.
    ///
    /// Private; read it through `HopSettings::relay_crypt_protocol()`.
    relay_crypt_protocol: RelayCryptLayerProtocol,
}
110
111impl HopSettings {
112    /// Construct a new `HopSettings` based on `params` (a set of circuit parameters)
113    /// and `caps` (a set of protocol capabilities for a circuit target).
114    ///
115    /// The resulting settings will represent what the client would prefer to negotiate
116    /// (determined by `params`),
117    /// as modified by what the target relay is believed to support (represented by `caps`).
118    ///
119    /// This represents the `HopSettings` in a pre-negotiation state:
120    /// the circuit negotiation process will modify it.
121    #[allow(clippy::unnecessary_wraps)] // likely to become fallible in the future.
122    pub(crate) fn from_params_and_caps(
123        hoptype: HopNegotiationType,
124        params: &CircParameters,
125        caps: &tor_protover::Protocols,
126    ) -> Result<Self> {
127        let mut ccontrol = params.ccontrol.clone();
128        match ccontrol.alg() {
129            crate::ccparams::Algorithm::FixedWindow(_) => {}
130            crate::ccparams::Algorithm::Vegas(_) => {
131                // If the target doesn't support FLOWCTRL_CC, we can't use Vegas.
132                if !caps.supports_named_subver(named::FLOWCTRL_CC) {
133                    ccontrol.use_fallback_alg();
134                }
135            }
136        };
137        if hoptype == HopNegotiationType::None {
138            ccontrol.use_fallback_alg();
139        } else if hoptype == HopNegotiationType::HsV3 {
140            // TODO #2037, TODO-CGO: We need a way to send congestion control extensions
141            // in this case too.  But since we aren't sending them, we
142            // should use the fallback algorithm.
143            ccontrol.use_fallback_alg();
144        }
145        let ccontrol = ccontrol; // drop mut
146
147        // Negotiate CGO if it is supported, if CC is also supported,
148        // and if CGO is available on this relay.
149        let relay_crypt_protocol = match hoptype {
150            HopNegotiationType::None => RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0),
151            HopNegotiationType::HsV3 => {
152                // TODO-CGO: Support CGO when available.
153                cfg_if! {
154                    if #[cfg(feature = "hs-common")] {
155                        RelayCryptLayerProtocol::HsV3(RelayCellFormat::V0)
156                    } else {
157                        return Err(
158                            tor_error::internal!("Unexpectedly tried to negotiate HsV3 without support!").into(),
159                        );
160                    }
161                }
162            }
163            HopNegotiationType::Full => {
164                cfg_if! {
165                    if #[cfg(all(feature = "flowctl-cc", feature = "counter-galois-onion"))] {
166                        #[allow(clippy::overly_complex_bool_expr)]
167                        if  ccontrol.alg().compatible_with_cgo()
168                            && caps.supports_named_subver(named::RELAY_NEGOTIATE_SUBPROTO)
169                            && caps.supports_named_subver(named::RELAY_CRYPT_CGO)
170                        {
171                            RelayCryptLayerProtocol::Cgo
172                        } else {
173                            RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0)
174                        }
175                    } else {
176                        RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0)
177                    }
178                }
179            }
180        };
181
182        Ok(Self {
183            ccontrol,
184            flow_ctrl_params: params.flow_ctrl.clone(),
185            relay_crypt_protocol,
186            n_incoming_cells_permitted: params.n_incoming_cells_permitted,
187            n_outgoing_cells_permitted: params.n_outgoing_cells_permitted,
188        })
189    }
190
191    /// Return the negotiated relay crypto protocol.
192    pub(crate) fn relay_crypt_protocol(&self) -> RelayCryptLayerProtocol {
193        self.relay_crypt_protocol
194    }
195
196    /// Return the client circuit-creation extensions that we should use in order to negotiate
197    /// these circuit hop parameters.
198    #[allow(clippy::unnecessary_wraps)]
199    pub(crate) fn circuit_request_extensions(&self) -> Result<Vec<CircRequestExt>> {
200        // allow 'unused_mut' because of the combinations of `cfg` conditions below
201        #[allow(unused_mut)]
202        let mut client_extensions = Vec::new();
203
204        #[allow(unused, unused_mut)]
205        let mut cc_extension_set = false;
206
207        if self.ccontrol.is_enabled() {
208            cfg_if::cfg_if! {
209                if #[cfg(feature = "flowctl-cc")] {
210                    client_extensions.push(CircRequestExt::CcRequest(CcRequest::default()));
211                    cc_extension_set = true;
212                } else {
213                    return Err(
214                        tor_error::internal!(
215                            "Congestion control is enabled on this circuit, but 'flowctl-cc' feature is not enabled"
216                        )
217                        .into()
218                    );
219                }
220            }
221        }
222
223        // See whether we need to send a list of required protocol capabilities.
224        // These aren't "negotiated" per se; they're simply demanded.
225        // The relay will refuse the circuit if it doesn't support all of them,
226        // and if any of them isn't supported in the SubprotocolRequest extension.
227        //
228        // (In other words, don't add capabilities here just because you want the
229        // relay to have them! They must be explicitly listed as supported for use
230        // with this extension. For the current list, see
231        // https://spec.torproject.org/tor-spec/create-created-cells.html#subproto-request)
232        //
233        #[allow(unused_mut)]
234        let mut required_protocol_capabilities: Vec<tor_protover::NamedSubver> = Vec::new();
235
236        #[cfg(feature = "counter-galois-onion")]
237        if matches!(self.relay_crypt_protocol(), RelayCryptLayerProtocol::Cgo) {
238            if !cc_extension_set {
239                return Err(tor_error::internal!("Tried to negotiate CGO without CC.").into());
240            }
241            required_protocol_capabilities.push(tor_protover::named::RELAY_CRYPT_CGO);
242        }
243
244        if !required_protocol_capabilities.is_empty() {
245            client_extensions.push(CircRequestExt::SubprotocolRequest(
246                required_protocol_capabilities.into_iter().collect(),
247            ));
248        }
249
250        Ok(client_extensions)
251    }
252}
253
#[cfg(test)]
impl std::default::Default for CircParameters {
    /// Test-only defaults: extend by ed25519 identity, fixed-window congestion
    /// control test parameters, test flow-control parameters, and no cell limits.
    fn default() -> Self {
        // `CircParameters::new` leaves both cell limits as `None`.
        Self::new(
            true,
            crate::congestion::test_utils::params::build_cc_fixed_params(),
            FlowCtrlParameters::defaults_for_tests(),
        )
    }
}
266
267impl CircParameters {
268    /// Constructor
269    pub fn new(
270        extend_by_ed25519_id: bool,
271        ccontrol: CongestionControlParams,
272        flow_ctrl: FlowCtrlParameters,
273    ) -> Self {
274        Self {
275            extend_by_ed25519_id,
276            ccontrol,
277            flow_ctrl,
278            n_incoming_cells_permitted: None,
279            n_outgoing_cells_permitted: None,
280        }
281    }
282}
283
/// Instructions for sending a RELAY cell.
///
/// This instructs a circuit reactor to send a RELAY cell to a given target
/// (a hop, if we are a client, or the client, if we are a relay).
#[derive(educe::Educe)]
#[educe(Debug)]
pub(crate) struct SendRelayCell {
    /// The hop number, or `None` if we are a relay.
    pub(crate) hop: Option<HopNum>,
    /// Whether to use a RELAY_EARLY cell (instead of a plain RELAY cell).
    pub(crate) early: bool,
    /// The cell to send: a relay message together with its optional stream ID.
    pub(crate) cell: AnyRelayMsgOuter,
}
298
/// The inbound state of a hop.
///
/// Holds what is needed to decode relay cells arriving from the hop and to
/// enforce the optional limit on how many of them may be received.
pub(crate) struct CircHopInbound {
    /// Decodes relay cells received from this hop.
    decoder: RelayCellDecoder,
    /// Remaining permitted incoming relay cells from this hop, plus 1.
    ///
    /// (In other words, `None` represents no limit,
    /// `Some(1)` represents an exhausted limit,
    /// and `Some(n)` means that n-1 more cells may be received.)
    ///
    /// If this ever decrements from Some(1), then the circuit must be torn down with an error.
    n_incoming_cells_permitted: Option<NonZeroU32>,
}
312
/// The outbound state of a hop.
///
/// Holds the hop's congestion control object, its stream map, and the
/// parameters used when sending relay cells and creating new streams.
pub(crate) struct CircHopOutbound {
    /// Congestion control object.
    ///
    /// This object is also in charge of handling circuit level SENDME logic for this hop.
    ccontrol: Arc<Mutex<CongestionControl>>,
    /// Map from stream IDs to streams.
    ///
    /// We store this with the reactor instead of the circuit, since the
    /// reactor needs it for every incoming cell on a stream, whereas
    /// the circuit only needs it when allocating new streams.
    ///
    /// NOTE: this is behind a mutex because the client reactor polls the `StreamMap`s
    /// of all hops concurrently, in a `FuturesUnordered`. Without the mutex,
    /// this wouldn't be possible, because it would mean holding multiple
    /// mutable references to `self` (the reactor). Note, however,
    /// that there should never be any contention on this mutex:
    /// we never create more than one
    /// `CircHopList::ready_streams_iterator()` stream
    /// at a time, and we never clone/lock the hop's `StreamMap` outside of it.
    ///
    /// Additionally, the stream map of the last hop (join point) of a conflux tunnel
    /// is shared with all the circuits in the tunnel.
    map: Arc<Mutex<StreamMap>>,
    /// Format to use for relay cells.
    //
    // When we have packed/fragmented cells, this may be replaced by a RelayCellEncoder.
    relay_format: RelayCellFormat,
    /// Flow control parameters for new streams.
    ///
    /// Shared (via `Arc`) with the flow-control state built for each stream.
    flow_ctrl_params: Arc<FlowCtrlParameters>,
    /// Remaining permitted outgoing relay cells from this hop, plus 1.
    ///
    /// `None` means that no limit was configured for this hop.
    ///
    /// If this ever decrements from Some(1), then the circuit must be torn down with an error.
    n_outgoing_cells_permitted: Option<NonZeroU32>,
}
348
349impl CircHopInbound {
350    /// Create a new [`CircHopInbound`].
351    pub(crate) fn new(decoder: RelayCellDecoder, settings: &HopSettings) -> Self {
352        Self {
353            decoder,
354            n_incoming_cells_permitted: settings.n_incoming_cells_permitted.map(cvt),
355        }
356    }
357
358    /// Parse a RELAY or RELAY_EARLY cell body.
359    ///
360    /// Requires that the cryptographic checks on the message have already been
361    /// performed
362    pub(crate) fn decode(&mut self, cell: BoxedCellBody) -> Result<RelayCellDecoderResult> {
363        self.decoder
364            .decode(cell)
365            .map_err(|e| Error::from_bytes_err(e, "relay cell"))
366    }
367
368    /// Decrement the limit of inbound cells that may be received from this hop; give
369    /// an error if it would reach zero.
370    pub(crate) fn decrement_cell_limit(&mut self) -> Result<()> {
371        try_decrement_cell_limit(&mut self.n_incoming_cells_permitted)
372            .map_err(|_| Error::ExcessInboundCells)
373    }
374}
375
376impl CircHopOutbound {
377    /// Create a new [`CircHopOutbound`].
378    pub(crate) fn new(
379        ccontrol: Arc<Mutex<CongestionControl>>,
380        relay_format: RelayCellFormat,
381        flow_ctrl_params: Arc<FlowCtrlParameters>,
382        settings: &HopSettings,
383    ) -> Self {
384        Self {
385            ccontrol,
386            map: Arc::new(Mutex::new(StreamMap::new())),
387            relay_format,
388            flow_ctrl_params,
389            n_outgoing_cells_permitted: settings.n_outgoing_cells_permitted.map(cvt),
390        }
391    }
392
393    /// Start a stream. Creates an entry in the stream map with the given channels, and sends the
394    /// `message` to the provided hop.
395    pub(crate) fn begin_stream(
396        &mut self,
397        hop: Option<HopNum>,
398        message: AnyRelayMsg,
399        time_prov: &DynTimeProvider,
400        cmd_checker: AnyCmdChecker,
401        memquota: &StreamAccount,
402    ) -> Result<(SendRelayCell, StreamId, ReactorStreamComponents)> {
403        // TODO: This has a lot of duplicated code with `Self::add_ent_with_id()`.
404
405        // A channel for the reactor to inform the writer of a new rate limit.
406        let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);
407
408        // A channel for the reactor to request a new drain rate from the reader.
409        // Typically this notification will be sent after an XOFF is sent so that the reader can
410        // send us a new drain rate when the stream data queue becomes empty.
411        let mut drain_rate_request_tx = NotifySender::new_typed();
412        let drain_rate_request_rx = drain_rate_request_tx.subscribe();
413
414        let flow_ctrl = self.build_flow_ctrl(rate_limit_tx, drain_rate_request_tx)?;
415
416        let stream_queue_max_len = flow_ctrl.inbound_queue_max_len();
417
418        // A queue for inbound RELAY messages.
419        let (sender, receiver) = stream_queue(stream_queue_max_len, memquota, time_prov)?;
420
421        // A queue for outbound RELAY messages.
422        let (msg_tx, msg_rx) = MpscSpec::new(CIRCUIT_BUFFER_SIZE)
423            .new_mq(time_prov.clone(), memquota.as_raw_account())?;
424
425        let r = self.map.lock().expect("lock poisoned").add_ent(
426            sender,
427            msg_rx,
428            flow_ctrl,
429            cmd_checker,
430        )?;
431        let cell = AnyRelayMsgOuter::new(Some(r), message);
432
433        let stream_components = ReactorStreamComponents {
434            stream_inbound_rx: receiver,
435            stream_outbound_tx: msg_tx,
436            rate_limit_rx,
437            drain_rate_request_rx,
438        };
439
440        Ok((
441            SendRelayCell {
442                hop,
443                early: false,
444                cell,
445            },
446            r,
447            stream_components,
448        ))
449    }
450
451    /// Close the stream associated with `id` because the stream was dropped.
452    ///
453    /// If we have not already received an END cell on this stream, send one.
454    /// If no END cell is specified, an END cell with the reason byte set to
455    /// REASON_MISC will be sent.
456    ///
457    // Note(relay): `circ_id` is an opaque displayable type
458    // because relays use a different circuit ID type
459    // than clients. Eventually, we should probably make
460    // them both use the same ID type, or have a nicer approach here
461    pub(crate) fn close_stream(
462        &mut self,
463        circ_id: impl std::fmt::Display,
464        id: StreamId,
465        hop: Option<HopNum>,
466        message: CloseStreamBehavior,
467        why: streammap::TerminateReason,
468        expiry: Instant,
469    ) -> Result<Option<SendRelayCell>> {
470        let should_send_end = self
471            .map
472            .lock()
473            .expect("lock poisoned")
474            .terminate(id, why, expiry)?;
475        trace!(
476            circ_id = %circ_id,
477            stream_id = %id,
478            should_send_end = ?should_send_end,
479            "Ending stream",
480        );
481        // TODO: I am about 80% sure that we only send an END cell if
482        // we didn't already get an END cell.  But I should double-check!
483        if let (ShouldSendEnd::Send, CloseStreamBehavior::SendEnd(end_message)) =
484            (should_send_end, message)
485        {
486            let end_cell = AnyRelayMsgOuter::new(Some(id), end_message.into());
487            let cell = SendRelayCell {
488                hop,
489                early: false,
490                cell: end_cell,
491            };
492
493            return Ok(Some(cell));
494        }
495        Ok(None)
496    }
497
498    /// Check if we should send an XON message.
499    ///
500    /// If we should, then returns the XON message that should be sent.
501    pub(crate) fn maybe_send_xon(
502        &mut self,
503        rate: XonKbpsEwma,
504        id: StreamId,
505    ) -> Result<Option<Xon>> {
506        // the call below will return an error if XON/XOFF aren't supported,
507        // so we check for support here
508        if !self
509            .ccontrol()
510            .lock()
511            .expect("poisoned lock")
512            .uses_xon_xoff()
513        {
514            return Ok(None);
515        }
516
517        let mut map = self.map.lock().expect("lock poisoned");
518        let Some(StreamEntMut::Open(ent)) = map.get_mut(id) else {
519            // stream went away
520            return Ok(None);
521        };
522
523        ent.maybe_send_xon(rate)
524    }
525
526    /// Check if we should send an XOFF message.
527    ///
528    /// If we should, then returns the XOFF message that should be sent.
529    pub(crate) fn maybe_send_xoff(&mut self, id: StreamId) -> Result<Option<Xoff>> {
530        // the call below will return an error if XON/XOFF aren't supported,
531        // so we check for support here
532        if !self
533            .ccontrol()
534            .lock()
535            .expect("poisoned lock")
536            .uses_xon_xoff()
537        {
538            return Ok(None);
539        }
540
541        let mut map = self.map.lock().expect("lock poisoned");
542        let Some(StreamEntMut::Open(ent)) = map.get_mut(id) else {
543            // stream went away
544            return Ok(None);
545        };
546
547        ent.maybe_send_xoff()
548    }
549
    /// Return the format that is used for relay cells sent to this hop.
    ///
    /// For the most part, this format isn't necessary to interact with a CircHop;
    /// it becomes relevant when we are deciding _what_ we can encode for the hop.
    ///
    /// This is the `relay_format` value that was passed to `CircHopOutbound::new`.
    pub(crate) fn relay_cell_format(&self) -> RelayCellFormat {
        self.relay_format
    }
557
558    /// Delegate to CongestionControl, for testing purposes
559    #[cfg(test)]
560    pub(crate) fn send_window_and_expected_tags(&self) -> (u32, Vec<SendmeTag>) {
561        self.ccontrol()
562            .lock()
563            .expect("poisoned lock")
564            .send_window_and_expected_tags()
565    }
566
567    /// Return the number of open streams on this hop.
568    ///
569    /// WARNING: because this locks the stream map mutex,
570    /// it should never be called from a context where that mutex is already locked.
571    pub(crate) fn n_open_streams(&self) -> usize {
572        self.map.lock().expect("lock poisoned").n_open_streams()
573    }
574
    /// Return a reference to our CongestionControl object.
    ///
    /// The object is shared behind an `Arc<Mutex<_>>`; callers lock it themselves.
    pub(crate) fn ccontrol(&self) -> &Arc<Mutex<CongestionControl>> {
        &self.ccontrol
    }
579
580    /// We're about to send `msg`.
581    ///
582    /// See [`OpenStreamEnt::about_to_send`](crate::streammap::OpenStreamEnt::about_to_send).
583    //
584    // TODO prop340: This should take a cell or similar, not a message.
585    //
586    // Note(relay): `circ_id` is an opaque displayable type
587    // because relays use a different circuit ID type
588    // than clients. Eventually, we should probably make
589    // them both use the same ID type, or have a nicer approach here
590    pub(crate) fn about_to_send(
591        &mut self,
592        circ_id: impl std::fmt::Display,
593        stream_id: StreamId,
594        msg: &AnyRelayMsg,
595    ) -> Result<()> {
596        let mut hop_map = self.map.lock().expect("lock poisoned");
597        let Some(StreamEntMut::Open(ent)) = hop_map.get_mut(stream_id) else {
598            // This can happen when we have outgoing data queued when we received an END.
599            // We shouldn't return an error here since it would close the circuit along with all
600            // other streams, and instead we just let the caller send this message anyways.
601            // Also the caller only calls `about_to_send()` for DATA cells,
602            // which means that other non-DATA cells don't hit this code path and are always sent,
603            // and so we should handle all cell types consistently.
604            // TODO: We should drop the message and not send it,
605            // but the caller of `about_to_send()` isn't designed to handle fallible sends
606            // so it would need some refactoring to handle this.
607            debug!(
608                circ_id = %circ_id,
609                stream_id = %stream_id,
610                "sending a relay cell for non-existent or non-open stream!",
611            );
612            return Ok(());
613        };
614
615        ent.about_to_send(msg)
616    }
617
618    /// Add an entry to this map using the specified StreamId.
619    #[cfg(any(feature = "hs-service", feature = "relay"))]
620    pub(crate) fn add_ent_with_id(
621        &self,
622        time_prov: &DynTimeProvider,
623        stream_id: StreamId,
624        cmd_checker: AnyCmdChecker,
625        memquota: &StreamAccount,
626    ) -> Result<ReactorStreamComponents> {
627        // TODO: This has a lot of duplicated code with `Self::begin_stream()`.
628
629        // A channel for the reactor to inform the writer of a new rate limit.
630        let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);
631
632        // A channel for the reactor to request a new drain rate from the reader.
633        // Typically this notification will be sent after an XOFF is sent so that the reader can
634        // send us a new drain rate when the stream data queue becomes empty.
635        let mut drain_rate_request_tx = NotifySender::new_typed();
636        let drain_rate_request_rx = drain_rate_request_tx.subscribe();
637
638        let flow_ctrl = self.build_flow_ctrl(rate_limit_tx, drain_rate_request_tx)?;
639
640        let stream_queue_max_len = flow_ctrl.inbound_queue_max_len();
641
642        // A queue for inbound RELAY messages.
643        let (sender, receiver) = stream_queue(stream_queue_max_len, memquota, time_prov)?;
644
645        // A queue for outbound RELAY messages.
646        let (msg_tx, msg_rx) = MpscSpec::new(CIRCUIT_BUFFER_SIZE)
647            .new_mq(time_prov.clone(), memquota.as_raw_account())?;
648
649        let mut hop_map = self.map.lock().expect("lock poisoned");
650        hop_map.add_ent_with_id(sender, msg_rx, flow_ctrl, stream_id, cmd_checker)?;
651
652        Ok(ReactorStreamComponents {
653            stream_inbound_rx: receiver,
654            stream_outbound_tx: msg_tx,
655            rate_limit_rx,
656            drain_rate_request_rx,
657        })
658    }
659
660    /// Builds the reactor's flow control handler for a new stream.
661    // TODO: remove the `Result` once we remove the "flowctl-cc" feature
662    #[cfg_attr(feature = "flowctl-cc", expect(clippy::unnecessary_wraps))]
663    fn build_flow_ctrl(
664        &self,
665        rate_limit_updater: watch::Sender<StreamRateLimit>,
666        drain_rate_requester: NotifySender<DrainRateRequest>,
667    ) -> Result<StreamFlowCtrl> {
668        let params = Arc::clone(&self.flow_ctrl_params);
669
670        if self
671            .ccontrol()
672            .lock()
673            .expect("poisoned lock")
674            .uses_stream_sendme()
675        {
676            let window = sendme::StreamSendWindow::new(SEND_WINDOW_INIT);
677            Ok(StreamFlowCtrl::new_window(window))
678        } else {
679            cfg_if::cfg_if! {
680                if #[cfg(feature = "flowctl-cc")] {
681                    // TODO: Currently arti only supports clients, and we don't support connecting
682                    // to onion services while using congestion control, so we hardcode this. In the
683                    // future we will need to somehow tell the `CircHop` this so that we can set it
684                    // correctly, since we don't want to enable this at exits.
685                    let use_sidechannel_mitigations = true;
686
687                    Ok(StreamFlowCtrl::new_xon_xoff(
688                        params,
689                        use_sidechannel_mitigations,
690                        rate_limit_updater,
691                        drain_rate_requester,
692                    ))
693                } else {
694                    drop(params);
695                    drop(rate_limit_updater);
696                    drop(drain_rate_requester);
697                    Err(internal!(
698                        "`CongestionControl` doesn't use sendmes, but 'flowctl-cc' feature not enabled",
699                    ).into())
700                }
701            }
702        }
703    }
704
705    /// Deliver `msg` to the specified open stream entry `ent`.
706    fn deliver_msg_to_stream(
707        streamid: StreamId,
708        ent: &mut OpenStreamEnt,
709        cell_counts_toward_windows: bool,
710        msg: UnparsedRelayMsg,
711    ) -> Result<bool> {
712        use tor_async_utils::SinkTrySend as _;
713        use tor_async_utils::SinkTrySendError as _;
714
715        // The stream for this message exists, and is open.
716
717        // We need to handle SENDME/XON/XOFF messages here, not in the stream's recv() method, or
718        // else we'd never notice them if the stream isn't reading.
719        match msg.cmd() {
720            RelayCmd::SENDME => {
721                ent.put_for_incoming_sendme(msg)?;
722                return Ok(false);
723            }
724            RelayCmd::XON => {
725                ent.handle_incoming_xon(msg)?;
726                return Ok(false);
727            }
728            RelayCmd::XOFF => {
729                ent.handle_incoming_xoff(msg)?;
730                return Ok(false);
731            }
732            _ => {}
733        }
734
735        let message_closes_stream = ent.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;
736
737        if let Err(e) = Pin::new(&mut ent.sink).try_send(msg) {
738            if e.is_full() {
739                cfg_if::cfg_if! {
740                    if #[cfg(not(feature = "flowctl-cc"))] {
741                        // If we get here, we either have a logic bug (!), or an attacker
742                        // is sending us more cells than we asked for via congestion control.
743                        return Err(Error::CircProto(format!(
744                            "Stream sink would block; received too many cells on stream ID {}",
745                            sv(streamid),
746                        )));
747                    } else {
748                        return Err(internal!(
749                            "Stream (ID {}) uses an unbounded queue, but apparently it's full?",
750                            sv(streamid),
751                        )
752                        .into());
753                    }
754                }
755            }
756            if e.is_disconnected() && cell_counts_toward_windows {
757                // the other side of the stream has gone away; remember
758                // that we received a cell that we couldn't queue for it.
759                //
760                // Later this value will be recorded in a half-stream.
761                ent.dropped += 1;
762            }
763        }
764
765        Ok(message_closes_stream)
766    }
767
768    /// Note that we received an END message (or other message indicating the end of
769    /// the stream) on the stream with `id`.
770    ///
771    /// See [`StreamMap::ending_msg_received`](crate::streammap::StreamMap::ending_msg_received).
772    #[cfg(feature = "hs-service")]
773    pub(crate) fn ending_msg_received(&self, stream_id: StreamId) -> Result<()> {
774        let mut hop_map = self.map.lock().expect("lock poisoned");
775
776        hop_map.ending_msg_received(stream_id)?;
777
778        Ok(())
779    }
780
781    /// Handle `msg`, delivering it to the stream with the specified `streamid` if appropriate.
782    ///
783    /// Returns back the provided `msg`, if the message is an incoming stream request
784    /// that needs to be handled by the calling code.
785    ///
786    // TODO: the above is a bit of a code smell -- we should try to avoid passing the msg
787    // back and forth like this.
788    pub(crate) fn handle_msg<F>(
789        &self,
790        possible_proto_violation_err: F,
791        cell_counts_toward_windows: bool,
792        streamid: StreamId,
793        msg: UnparsedRelayMsg,
794        now: Instant,
795    ) -> Result<Option<UnparsedRelayMsg>>
796    where
797        F: FnOnce(StreamId) -> Error,
798    {
799        let mut hop_map = self.map.lock().expect("lock poisoned");
800
801        match hop_map.get_mut(streamid) {
802            Some(StreamEntMut::Open(ent)) => {
803                // Can't have a stream level SENDME when congestion control is enabled.
804                let message_closes_stream =
805                    Self::deliver_msg_to_stream(streamid, ent, cell_counts_toward_windows, msg)?;
806
807                if message_closes_stream {
808                    hop_map.ending_msg_received(streamid)?;
809                }
810            }
811            Some(StreamEntMut::EndSent(EndSentStreamEnt { expiry, .. })) if now >= *expiry => {
812                return Err(possible_proto_violation_err(streamid));
813            }
814            #[cfg(feature = "hs-service")]
815            Some(StreamEntMut::EndSent(_))
816                if matches!(
817                    msg.cmd(),
818                    RelayCmd::BEGIN | RelayCmd::BEGIN_DIR | RelayCmd::RESOLVE
819                ) =>
820            {
821                // If the other side is sending us a BEGIN but hasn't yet acknowledged our END
822                // message, just remove the old stream from the map and stop waiting for a
823                // response
824                hop_map.ending_msg_received(streamid)?;
825                return Ok(Some(msg));
826            }
827            Some(StreamEntMut::EndSent(EndSentStreamEnt { half_stream, .. })) => {
828                // We sent an end but maybe the other side hasn't heard.
829
830                match half_stream.handle_msg(msg)? {
831                    StreamStatus::Open => {}
832                    StreamStatus::Closed => {
833                        hop_map.ending_msg_received(streamid)?;
834                    }
835                }
836            }
837            #[cfg(feature = "hs-service")]
838            None if matches!(
839                msg.cmd(),
840                RelayCmd::BEGIN | RelayCmd::BEGIN_DIR | RelayCmd::RESOLVE
841            ) =>
842            {
843                return Ok(Some(msg));
844            }
845            _ => {
846                // No stream wants this message, or ever did.
847                return Err(possible_proto_violation_err(streamid));
848            }
849        }
850
851        Ok(None)
852    }
853
    /// Get the stream map of this hop.
    ///
    /// This returns a borrow of the shared `Arc<Mutex<StreamMap>>` handle held by this hop:
    /// the `Arc` is not cloned and the mutex is not locked here.
    pub(crate) fn stream_map(&self) -> &Arc<Mutex<StreamMap>> {
        &self.map
    }
858
859    /// Set the stream map of this hop to `map`.
860    ///
861    /// Returns an error if the existing stream map of the hop has any open stream.
862    pub(crate) fn set_stream_map(&mut self, map: Arc<Mutex<StreamMap>>) -> StdResult<(), Bug> {
863        if self.n_open_streams() != 0 {
864            return Err(internal!("Tried to discard existing open streams?!"));
865        }
866
867        self.map = map;
868
869        Ok(())
870    }
871
872    /// Decrement the limit of outbound cells that may be sent to this hop; give
873    /// an error if it would reach zero.
874    pub(crate) fn decrement_cell_limit(&mut self) -> Result<()> {
875        try_decrement_cell_limit(&mut self.n_outgoing_cells_permitted)
876            .map_err(|_| Error::ExcessOutboundCells)
877    }
878}
879
/// Try to lower the cell limit in `val` by one.
///
/// * `None` (no limit): nothing changes, and `Ok(())` is returned.
/// * `Some(1)`: the limit cannot be lowered without reaching zero,
///   so `val` is left untouched and `Err(())` is returned.
/// * `Some(n)` with `n > 1`: `val` becomes `Some(n - 1)`, and `Ok(())` is returned.
#[inline]
fn try_decrement_cell_limit(val: &mut Option<NonZeroU32>) -> StdResult<(), ()> {
    match val.as_mut() {
        None => Ok(()),
        // `get()` is at least 1, so the subtraction cannot underflow;
        // `NonZeroU32::new` yields `None` exactly when the limit was 1.
        Some(remaining) => match NonZeroU32::new(remaining.get() - 1) {
            Some(lowered) => {
                *remaining = lowered;
                Ok(())
            }
            None => Err(()),
        },
    }
}
898
/// Translate a cell limit from the representation used in a HopSettings
/// into the more compact one used here.
///
/// The result is `limit + 1`, saturating at `u32::MAX`.
fn cvt(limit: u32) -> NonZeroU32 {
    // See "known limitations" comment on n_incoming_cells_permitted.
    let bumped = limit.saturating_add(1);
    NonZeroU32::new(bumped).expect("Adding one left it as zero?")
}
908
/// A collection of components that can be used to interact with the reactor's view of a Tor stream.
///
/// It bundles four channel endpoints: the receiver for the stream's inbound messages,
/// the sender for its outbound messages, and two receivers for notifications
/// from the reactor (rate limit updates and drain rate update requests).
//
// TODO: We also have a `StreamComponents` type that is used and built outside of the reactor.
// It's maybe confusing to have these similar type names, so a better name would be nice.
//
// TODO(arti#2068): The components we return should maybe depend on what type of flow control is
// used, so in the future we might want to make some of these fields optional.
//
// The `has_memory_cost(indirect_size = ...)` attributes below give, per field, the expression
// used as that field's indirect size by the derived `HasMemoryCost` implementation.
#[derive(Debug, Deftly)]
#[derive_deftly(HasMemoryCost)]
pub(crate) struct ReactorStreamComponents {
    /// An MPSC receiver for inbound messages that arrive on the stream.
    // Counted as having no indirect memory; this is only an estimate.
    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate
    pub(crate) stream_inbound_rx: StreamQueueReceiver,

    /// An MPSC sender for outbound messages to be sent on the stream.
    // Counted as the size of one `AnyRelayMsg`; this is only an estimate.
    #[deftly(has_memory_cost(indirect_size = "size_of::<AnyRelayMsg>()"))] // estimate
    pub(crate) stream_outbound_tx: StreamMpscSender<AnyRelayMsg>,

    /// A mechanism to allow the stream's writer to receive rate limit updates from the reactor.
    // The `watch::Sender` owns the indirect data.
    #[deftly(has_memory_cost(indirect_size = "0"))]
    pub(crate) rate_limit_rx: watch::Receiver<StreamRateLimit>,

    /// A mechanism to allow the stream's reader to receive drain rate update requests from the
    /// reactor.
    // Counted as having no indirect memory.
    #[deftly(has_memory_cost(indirect_size = "0"))]
    pub(crate) drain_rate_request_rx: NotifyReceiver<DrainRateRequest>,
}