tor_proto/circuit/circhop.rs
1//! Module exposing structures relating to a reactor's view of a circuit hop.
2
3// TODO(relay): don't import from the client module
4use crate::client::circuit::handshake::RelayCryptLayerProtocol;
5
6use crate::ccparams::CongestionControlParams;
7use crate::circuit::CircParameters;
8use crate::congestion::{CongestionControl, sendme};
9use crate::memquota::{SpecificAccount, StreamAccount};
10use crate::stream::CloseStreamBehavior;
11use crate::stream::SEND_WINDOW_INIT;
12use crate::stream::StreamMpscSender;
13use crate::stream::cmdcheck::{AnyCmdChecker, StreamStatus};
14use crate::stream::flow_ctrl::params::FlowCtrlParameters;
15use crate::stream::flow_ctrl::state::{FlowCtrlHooks, StreamFlowCtrl, StreamRateLimit};
16use crate::stream::flow_ctrl::xon_xoff::reader::DrainRateRequest;
17use crate::stream::queue::{StreamQueueReceiver, stream_queue};
18use crate::streammap::{
19 self, EndSentStreamEnt, OpenStreamEnt, ShouldSendEnd, StreamEntMut, StreamMap,
20};
21use crate::util::notify::{NotifyReceiver, NotifySender};
22use crate::{Error, HopNum, Result};
23
24use derive_deftly::Deftly;
25use postage::watch;
26use safelog::sensitive as sv;
27use tracing::{debug, trace};
28
29use tor_cell::chancell::BoxedCellBody;
30use tor_cell::relaycell::extend::{CcRequest, CircRequestExt};
31use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
32use tor_cell::relaycell::msg::AnyRelayMsg;
33use tor_cell::relaycell::{
34 AnyRelayMsgOuter, RelayCellDecoder, RelayCellDecoderResult, RelayCellFormat, RelayCmd,
35 StreamId, UnparsedRelayMsg,
36};
37use tor_error::{Bug, internal};
38use tor_memquota::derive_deftly_template_HasMemoryCost;
39use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
40use tor_protover::named;
41use tor_rtcompat::DynTimeProvider;
42
43use std::num::NonZeroU32;
44use std::pin::Pin;
45use std::result::Result as StdResult;
46use std::sync::{Arc, Mutex};
47use web_time_compat::Instant;
48
49#[cfg(test)]
50use tor_cell::relaycell::msg::SendmeTag;
51
52use cfg_if::cfg_if;
53
54/// The size of the stream's outbound RELAY message queue.
55// TODO(tuning): figure out if this is a good size for this buffer
56const CIRCUIT_BUFFER_SIZE: usize = 128;
57
58/// Type of negotiation that we'll be performing as we establish a hop.
59///
60/// Determines what flavor of extensions we can send and receive, which in turn
61/// limits the hop settings we can negotiate.
62///
63// TODO-CGO: This is likely to be refactored when we finally add support for
64// HsV3+CGO, which will require refactoring
65#[derive(Debug, Clone, Copy, Eq, PartialEq)]
66pub(crate) enum HopNegotiationType {
67 /// We're using a handshake in which extension-based negotiation cannot occur.
68 None,
69 /// We're using the HsV3-ntor handshake, in which the client can send extensions,
70 /// but the server cannot.
71 ///
72 /// As a special case, the default relay encryption protocol is the hsv3
73 /// variant of Tor1.
74 //
75 // We would call this "HalfDuplex" or something, but we do not expect to add
76 // any more handshakes of this type.
77 HsV3,
78 /// We're using a handshake in which both client and relay can send extensions.
79 Full,
80}
81
82/// The settings we use for single hop of a circuit.
83///
84/// Unlike [`CircParameters`], this type is crate-internal.
85/// We construct it based on our settings from the circuit,
86/// and from the hop's actual capabilities.
87/// Then, we negotiate with the hop as part of circuit
88/// creation/extension to determine the actual settings that will be in use.
89/// Finally, we use those settings to construct the negotiated circuit hop.
90//
91// TODO: Relays should probably derive an instance of this type too, as
92// part of the circuit creation handshake.
93#[derive(Clone, Debug)]
94pub(crate) struct HopSettings {
95 /// The negotiated congestion control settings for this hop .
96 pub(crate) ccontrol: CongestionControlParams,
97
98 /// Flow control parameters that will be used for streams on this hop.
99 pub(crate) flow_ctrl_params: FlowCtrlParameters,
100
101 /// Maximum number of permitted incoming relay cells for this hop.
102 pub(crate) n_incoming_cells_permitted: Option<u32>,
103
104 /// Maximum number of permitted outgoing relay cells for this hop.
105 pub(crate) n_outgoing_cells_permitted: Option<u32>,
106
107 /// The relay cell encryption algorithm and cell format for this hop.
108 relay_crypt_protocol: RelayCryptLayerProtocol,
109}
110
111impl HopSettings {
112 /// Construct a new `HopSettings` based on `params` (a set of circuit parameters)
113 /// and `caps` (a set of protocol capabilities for a circuit target).
114 ///
115 /// The resulting settings will represent what the client would prefer to negotiate
116 /// (determined by `params`),
117 /// as modified by what the target relay is believed to support (represented by `caps`).
118 ///
119 /// This represents the `HopSettings` in a pre-negotiation state:
120 /// the circuit negotiation process will modify it.
121 #[allow(clippy::unnecessary_wraps)] // likely to become fallible in the future.
122 pub(crate) fn from_params_and_caps(
123 hoptype: HopNegotiationType,
124 params: &CircParameters,
125 caps: &tor_protover::Protocols,
126 ) -> Result<Self> {
127 let mut ccontrol = params.ccontrol.clone();
128 match ccontrol.alg() {
129 crate::ccparams::Algorithm::FixedWindow(_) => {}
130 crate::ccparams::Algorithm::Vegas(_) => {
131 // If the target doesn't support FLOWCTRL_CC, we can't use Vegas.
132 if !caps.supports_named_subver(named::FLOWCTRL_CC) {
133 ccontrol.use_fallback_alg();
134 }
135 }
136 };
137 if hoptype == HopNegotiationType::None {
138 ccontrol.use_fallback_alg();
139 } else if hoptype == HopNegotiationType::HsV3 {
140 // TODO #2037, TODO-CGO: We need a way to send congestion control extensions
141 // in this case too. But since we aren't sending them, we
142 // should use the fallback algorithm.
143 ccontrol.use_fallback_alg();
144 }
145 let ccontrol = ccontrol; // drop mut
146
147 // Negotiate CGO if it is supported, if CC is also supported,
148 // and if CGO is available on this relay.
149 let relay_crypt_protocol = match hoptype {
150 HopNegotiationType::None => RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0),
151 HopNegotiationType::HsV3 => {
152 // TODO-CGO: Support CGO when available.
153 cfg_if! {
154 if #[cfg(feature = "hs-common")] {
155 RelayCryptLayerProtocol::HsV3(RelayCellFormat::V0)
156 } else {
157 return Err(
158 tor_error::internal!("Unexpectedly tried to negotiate HsV3 without support!").into(),
159 );
160 }
161 }
162 }
163 HopNegotiationType::Full => {
164 cfg_if! {
165 if #[cfg(all(feature = "flowctl-cc", feature = "counter-galois-onion"))] {
166 #[allow(clippy::overly_complex_bool_expr)]
167 if ccontrol.alg().compatible_with_cgo()
168 && caps.supports_named_subver(named::RELAY_NEGOTIATE_SUBPROTO)
169 && caps.supports_named_subver(named::RELAY_CRYPT_CGO)
170 {
171 RelayCryptLayerProtocol::Cgo
172 } else {
173 RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0)
174 }
175 } else {
176 RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0)
177 }
178 }
179 }
180 };
181
182 Ok(Self {
183 ccontrol,
184 flow_ctrl_params: params.flow_ctrl.clone(),
185 relay_crypt_protocol,
186 n_incoming_cells_permitted: params.n_incoming_cells_permitted,
187 n_outgoing_cells_permitted: params.n_outgoing_cells_permitted,
188 })
189 }
190
191 /// Return the negotiated relay crypto protocol.
192 pub(crate) fn relay_crypt_protocol(&self) -> RelayCryptLayerProtocol {
193 self.relay_crypt_protocol
194 }
195
196 /// Return the client circuit-creation extensions that we should use in order to negotiate
197 /// these circuit hop parameters.
198 #[allow(clippy::unnecessary_wraps)]
199 pub(crate) fn circuit_request_extensions(&self) -> Result<Vec<CircRequestExt>> {
200 // allow 'unused_mut' because of the combinations of `cfg` conditions below
201 #[allow(unused_mut)]
202 let mut client_extensions = Vec::new();
203
204 #[allow(unused, unused_mut)]
205 let mut cc_extension_set = false;
206
207 if self.ccontrol.is_enabled() {
208 cfg_if::cfg_if! {
209 if #[cfg(feature = "flowctl-cc")] {
210 client_extensions.push(CircRequestExt::CcRequest(CcRequest::default()));
211 cc_extension_set = true;
212 } else {
213 return Err(
214 tor_error::internal!(
215 "Congestion control is enabled on this circuit, but 'flowctl-cc' feature is not enabled"
216 )
217 .into()
218 );
219 }
220 }
221 }
222
223 // See whether we need to send a list of required protocol capabilities.
224 // These aren't "negotiated" per se; they're simply demanded.
225 // The relay will refuse the circuit if it doesn't support all of them,
226 // and if any of them isn't supported in the SubprotocolRequest extension.
227 //
228 // (In other words, don't add capabilities here just because you want the
229 // relay to have them! They must be explicitly listed as supported for use
230 // with this extension. For the current list, see
231 // https://spec.torproject.org/tor-spec/create-created-cells.html#subproto-request)
232 //
233 #[allow(unused_mut)]
234 let mut required_protocol_capabilities: Vec<tor_protover::NamedSubver> = Vec::new();
235
236 #[cfg(feature = "counter-galois-onion")]
237 if matches!(self.relay_crypt_protocol(), RelayCryptLayerProtocol::Cgo) {
238 if !cc_extension_set {
239 return Err(tor_error::internal!("Tried to negotiate CGO without CC.").into());
240 }
241 required_protocol_capabilities.push(tor_protover::named::RELAY_CRYPT_CGO);
242 }
243
244 if !required_protocol_capabilities.is_empty() {
245 client_extensions.push(CircRequestExt::SubprotocolRequest(
246 required_protocol_capabilities.into_iter().collect(),
247 ));
248 }
249
250 Ok(client_extensions)
251 }
252}
253
254#[cfg(test)]
255impl std::default::Default for CircParameters {
256 fn default() -> Self {
257 Self {
258 extend_by_ed25519_id: true,
259 ccontrol: crate::congestion::test_utils::params::build_cc_fixed_params(),
260 flow_ctrl: FlowCtrlParameters::defaults_for_tests(),
261 n_incoming_cells_permitted: None,
262 n_outgoing_cells_permitted: None,
263 }
264 }
265}
266
267impl CircParameters {
268 /// Constructor
269 pub fn new(
270 extend_by_ed25519_id: bool,
271 ccontrol: CongestionControlParams,
272 flow_ctrl: FlowCtrlParameters,
273 ) -> Self {
274 Self {
275 extend_by_ed25519_id,
276 ccontrol,
277 flow_ctrl,
278 n_incoming_cells_permitted: None,
279 n_outgoing_cells_permitted: None,
280 }
281 }
282}
283
284/// Instructions for sending a RELAY cell.
285///
286/// This instructs a circuit reactor to send a RELAY cell to a given target
287/// (a hop, if we are a client, or the client, if we are a relay).
288#[derive(educe::Educe)]
289#[educe(Debug)]
290pub(crate) struct SendRelayCell {
291 /// The hop number, or `None` if we are a relay.
292 pub(crate) hop: Option<HopNum>,
293 /// Whether to use a RELAY_EARLY cell.
294 pub(crate) early: bool,
295 /// The cell to send.
296 pub(crate) cell: AnyRelayMsgOuter,
297}
298
299/// The inbound state of a hop.
300pub(crate) struct CircHopInbound {
301 /// Decodes relay cells received from this hop.
302 decoder: RelayCellDecoder,
303 /// Remaining permitted incoming relay cells from this hop, plus 1.
304 ///
305 /// (In other words, `None` represents no limit,
306 /// `Some(1)` represents an exhausted limit,
307 /// and `Some(n)` means that n-1 more cells may be received.)
308 ///
309 /// If this ever decrements from Some(1), then the circuit must be torn down with an error.
310 n_incoming_cells_permitted: Option<NonZeroU32>,
311}
312
313/// The outbound state of a hop.
314pub(crate) struct CircHopOutbound {
315 /// Congestion control object.
316 ///
317 /// This object is also in charge of handling circuit level SENDME logic for this hop.
318 ccontrol: Arc<Mutex<CongestionControl>>,
319 /// Map from stream IDs to streams.
320 ///
321 /// We store this with the reactor instead of the circuit, since the
322 /// reactor needs it for every incoming cell on a stream, whereas
323 /// the circuit only needs it when allocating new streams.
324 ///
325 /// NOTE: this is behind a mutex because the client reactor polls the `StreamMap`s
326 /// of all hops concurrently, in a `FuturesUnordered`. Without the mutex,
327 /// this wouldn't be possible, because it would mean holding multiple
328 /// mutable references to `self` (the reactor). Note, however,
329 /// that there should never be any contention on this mutex:
330 /// we never create more than one
331 /// `CircHopList::ready_streams_iterator()` stream
332 /// at a time, and we never clone/lock the hop's `StreamMap` outside of it.
333 ///
334 /// Additionally, the stream map of the last hop (join point) of a conflux tunnel
335 /// is shared with all the circuits in the tunnel.
336 map: Arc<Mutex<StreamMap>>,
337 /// Format to use for relay cells.
338 //
339 // When we have packed/fragmented cells, this may be replaced by a RelayCellEncoder.
340 relay_format: RelayCellFormat,
341 /// Flow control parameters for new streams.
342 flow_ctrl_params: Arc<FlowCtrlParameters>,
343 /// Remaining permitted outgoing relay cells from this hop, plus 1.
344 ///
345 /// If this ever decrements from Some(1), then the circuit must be torn down with an error.
346 n_outgoing_cells_permitted: Option<NonZeroU32>,
347}
348
349impl CircHopInbound {
350 /// Create a new [`CircHopInbound`].
351 pub(crate) fn new(decoder: RelayCellDecoder, settings: &HopSettings) -> Self {
352 Self {
353 decoder,
354 n_incoming_cells_permitted: settings.n_incoming_cells_permitted.map(cvt),
355 }
356 }
357
358 /// Parse a RELAY or RELAY_EARLY cell body.
359 ///
360 /// Requires that the cryptographic checks on the message have already been
361 /// performed
362 pub(crate) fn decode(&mut self, cell: BoxedCellBody) -> Result<RelayCellDecoderResult> {
363 self.decoder
364 .decode(cell)
365 .map_err(|e| Error::from_bytes_err(e, "relay cell"))
366 }
367
368 /// Decrement the limit of inbound cells that may be received from this hop; give
369 /// an error if it would reach zero.
370 pub(crate) fn decrement_cell_limit(&mut self) -> Result<()> {
371 try_decrement_cell_limit(&mut self.n_incoming_cells_permitted)
372 .map_err(|_| Error::ExcessInboundCells)
373 }
374}
375
376impl CircHopOutbound {
377 /// Create a new [`CircHopOutbound`].
378 pub(crate) fn new(
379 ccontrol: Arc<Mutex<CongestionControl>>,
380 relay_format: RelayCellFormat,
381 flow_ctrl_params: Arc<FlowCtrlParameters>,
382 settings: &HopSettings,
383 ) -> Self {
384 Self {
385 ccontrol,
386 map: Arc::new(Mutex::new(StreamMap::new())),
387 relay_format,
388 flow_ctrl_params,
389 n_outgoing_cells_permitted: settings.n_outgoing_cells_permitted.map(cvt),
390 }
391 }
392
393 /// Start a stream. Creates an entry in the stream map with the given channels, and sends the
394 /// `message` to the provided hop.
395 pub(crate) fn begin_stream(
396 &mut self,
397 hop: Option<HopNum>,
398 message: AnyRelayMsg,
399 time_prov: &DynTimeProvider,
400 cmd_checker: AnyCmdChecker,
401 memquota: &StreamAccount,
402 ) -> Result<(SendRelayCell, StreamId, ReactorStreamComponents)> {
403 // TODO: This has a lot of duplicated code with `Self::add_ent_with_id()`.
404
405 // A channel for the reactor to inform the writer of a new rate limit.
406 let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);
407
408 // A channel for the reactor to request a new drain rate from the reader.
409 // Typically this notification will be sent after an XOFF is sent so that the reader can
410 // send us a new drain rate when the stream data queue becomes empty.
411 let mut drain_rate_request_tx = NotifySender::new_typed();
412 let drain_rate_request_rx = drain_rate_request_tx.subscribe();
413
414 let flow_ctrl = self.build_flow_ctrl(rate_limit_tx, drain_rate_request_tx)?;
415
416 let stream_queue_max_len = flow_ctrl.inbound_queue_max_len();
417
418 // A queue for inbound RELAY messages.
419 let (sender, receiver) = stream_queue(stream_queue_max_len, memquota, time_prov)?;
420
421 // A queue for outbound RELAY messages.
422 let (msg_tx, msg_rx) = MpscSpec::new(CIRCUIT_BUFFER_SIZE)
423 .new_mq(time_prov.clone(), memquota.as_raw_account())?;
424
425 let r = self.map.lock().expect("lock poisoned").add_ent(
426 sender,
427 msg_rx,
428 flow_ctrl,
429 cmd_checker,
430 )?;
431 let cell = AnyRelayMsgOuter::new(Some(r), message);
432
433 let stream_components = ReactorStreamComponents {
434 stream_inbound_rx: receiver,
435 stream_outbound_tx: msg_tx,
436 rate_limit_rx,
437 drain_rate_request_rx,
438 };
439
440 Ok((
441 SendRelayCell {
442 hop,
443 early: false,
444 cell,
445 },
446 r,
447 stream_components,
448 ))
449 }
450
451 /// Close the stream associated with `id` because the stream was dropped.
452 ///
453 /// If we have not already received an END cell on this stream, send one.
454 /// If no END cell is specified, an END cell with the reason byte set to
455 /// REASON_MISC will be sent.
456 ///
457 // Note(relay): `circ_id` is an opaque displayable type
458 // because relays use a different circuit ID type
459 // than clients. Eventually, we should probably make
460 // them both use the same ID type, or have a nicer approach here
461 pub(crate) fn close_stream(
462 &mut self,
463 circ_id: impl std::fmt::Display,
464 id: StreamId,
465 hop: Option<HopNum>,
466 message: CloseStreamBehavior,
467 why: streammap::TerminateReason,
468 expiry: Instant,
469 ) -> Result<Option<SendRelayCell>> {
470 let should_send_end = self
471 .map
472 .lock()
473 .expect("lock poisoned")
474 .terminate(id, why, expiry)?;
475 trace!(
476 circ_id = %circ_id,
477 stream_id = %id,
478 should_send_end = ?should_send_end,
479 "Ending stream",
480 );
481 // TODO: I am about 80% sure that we only send an END cell if
482 // we didn't already get an END cell. But I should double-check!
483 if let (ShouldSendEnd::Send, CloseStreamBehavior::SendEnd(end_message)) =
484 (should_send_end, message)
485 {
486 let end_cell = AnyRelayMsgOuter::new(Some(id), end_message.into());
487 let cell = SendRelayCell {
488 hop,
489 early: false,
490 cell: end_cell,
491 };
492
493 return Ok(Some(cell));
494 }
495 Ok(None)
496 }
497
498 /// Check if we should send an XON message.
499 ///
500 /// If we should, then returns the XON message that should be sent.
501 pub(crate) fn maybe_send_xon(
502 &mut self,
503 rate: XonKbpsEwma,
504 id: StreamId,
505 ) -> Result<Option<Xon>> {
506 // the call below will return an error if XON/XOFF aren't supported,
507 // so we check for support here
508 if !self
509 .ccontrol()
510 .lock()
511 .expect("poisoned lock")
512 .uses_xon_xoff()
513 {
514 return Ok(None);
515 }
516
517 let mut map = self.map.lock().expect("lock poisoned");
518 let Some(StreamEntMut::Open(ent)) = map.get_mut(id) else {
519 // stream went away
520 return Ok(None);
521 };
522
523 ent.maybe_send_xon(rate)
524 }
525
526 /// Check if we should send an XOFF message.
527 ///
528 /// If we should, then returns the XOFF message that should be sent.
529 pub(crate) fn maybe_send_xoff(&mut self, id: StreamId) -> Result<Option<Xoff>> {
530 // the call below will return an error if XON/XOFF aren't supported,
531 // so we check for support here
532 if !self
533 .ccontrol()
534 .lock()
535 .expect("poisoned lock")
536 .uses_xon_xoff()
537 {
538 return Ok(None);
539 }
540
541 let mut map = self.map.lock().expect("lock poisoned");
542 let Some(StreamEntMut::Open(ent)) = map.get_mut(id) else {
543 // stream went away
544 return Ok(None);
545 };
546
547 ent.maybe_send_xoff()
548 }
549
550 /// Return the format that is used for relay cells sent to this hop.
551 ///
552 /// For the most part, this format isn't necessary to interact with a CircHop;
553 /// it becomes relevant when we are deciding _what_ we can encode for the hop.
554 pub(crate) fn relay_cell_format(&self) -> RelayCellFormat {
555 self.relay_format
556 }
557
558 /// Delegate to CongestionControl, for testing purposes
559 #[cfg(test)]
560 pub(crate) fn send_window_and_expected_tags(&self) -> (u32, Vec<SendmeTag>) {
561 self.ccontrol()
562 .lock()
563 .expect("poisoned lock")
564 .send_window_and_expected_tags()
565 }
566
567 /// Return the number of open streams on this hop.
568 ///
569 /// WARNING: because this locks the stream map mutex,
570 /// it should never be called from a context where that mutex is already locked.
571 pub(crate) fn n_open_streams(&self) -> usize {
572 self.map.lock().expect("lock poisoned").n_open_streams()
573 }
574
575 /// Return a reference to our CongestionControl object.
576 pub(crate) fn ccontrol(&self) -> &Arc<Mutex<CongestionControl>> {
577 &self.ccontrol
578 }
579
580 /// We're about to send `msg`.
581 ///
582 /// See [`OpenStreamEnt::about_to_send`](crate::streammap::OpenStreamEnt::about_to_send).
583 //
584 // TODO prop340: This should take a cell or similar, not a message.
585 //
586 // Note(relay): `circ_id` is an opaque displayable type
587 // because relays use a different circuit ID type
588 // than clients. Eventually, we should probably make
589 // them both use the same ID type, or have a nicer approach here
590 pub(crate) fn about_to_send(
591 &mut self,
592 circ_id: impl std::fmt::Display,
593 stream_id: StreamId,
594 msg: &AnyRelayMsg,
595 ) -> Result<()> {
596 let mut hop_map = self.map.lock().expect("lock poisoned");
597 let Some(StreamEntMut::Open(ent)) = hop_map.get_mut(stream_id) else {
598 // This can happen when we have outgoing data queued when we received an END.
599 // We shouldn't return an error here since it would close the circuit along with all
600 // other streams, and instead we just let the caller send this message anyways.
601 // Also the caller only calls `about_to_send()` for DATA cells,
602 // which means that other non-DATA cells don't hit this code path and are always sent,
603 // and so we should handle all cell types consistently.
604 // TODO: We should drop the message and not send it,
605 // but the caller of `about_to_send()` isn't designed to handle fallible sends
606 // so it would need some refactoring to handle this.
607 debug!(
608 circ_id = %circ_id,
609 stream_id = %stream_id,
610 "sending a relay cell for non-existent or non-open stream!",
611 );
612 return Ok(());
613 };
614
615 ent.about_to_send(msg)
616 }
617
618 /// Add an entry to this map using the specified StreamId.
619 #[cfg(any(feature = "hs-service", feature = "relay"))]
620 pub(crate) fn add_ent_with_id(
621 &self,
622 time_prov: &DynTimeProvider,
623 stream_id: StreamId,
624 cmd_checker: AnyCmdChecker,
625 memquota: &StreamAccount,
626 ) -> Result<ReactorStreamComponents> {
627 // TODO: This has a lot of duplicated code with `Self::begin_stream()`.
628
629 // A channel for the reactor to inform the writer of a new rate limit.
630 let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);
631
632 // A channel for the reactor to request a new drain rate from the reader.
633 // Typically this notification will be sent after an XOFF is sent so that the reader can
634 // send us a new drain rate when the stream data queue becomes empty.
635 let mut drain_rate_request_tx = NotifySender::new_typed();
636 let drain_rate_request_rx = drain_rate_request_tx.subscribe();
637
638 let flow_ctrl = self.build_flow_ctrl(rate_limit_tx, drain_rate_request_tx)?;
639
640 let stream_queue_max_len = flow_ctrl.inbound_queue_max_len();
641
642 // A queue for inbound RELAY messages.
643 let (sender, receiver) = stream_queue(stream_queue_max_len, memquota, time_prov)?;
644
645 // A queue for outbound RELAY messages.
646 let (msg_tx, msg_rx) = MpscSpec::new(CIRCUIT_BUFFER_SIZE)
647 .new_mq(time_prov.clone(), memquota.as_raw_account())?;
648
649 let mut hop_map = self.map.lock().expect("lock poisoned");
650 hop_map.add_ent_with_id(sender, msg_rx, flow_ctrl, stream_id, cmd_checker)?;
651
652 Ok(ReactorStreamComponents {
653 stream_inbound_rx: receiver,
654 stream_outbound_tx: msg_tx,
655 rate_limit_rx,
656 drain_rate_request_rx,
657 })
658 }
659
660 /// Builds the reactor's flow control handler for a new stream.
661 // TODO: remove the `Result` once we remove the "flowctl-cc" feature
662 #[cfg_attr(feature = "flowctl-cc", expect(clippy::unnecessary_wraps))]
663 fn build_flow_ctrl(
664 &self,
665 rate_limit_updater: watch::Sender<StreamRateLimit>,
666 drain_rate_requester: NotifySender<DrainRateRequest>,
667 ) -> Result<StreamFlowCtrl> {
668 let params = Arc::clone(&self.flow_ctrl_params);
669
670 if self
671 .ccontrol()
672 .lock()
673 .expect("poisoned lock")
674 .uses_stream_sendme()
675 {
676 let window = sendme::StreamSendWindow::new(SEND_WINDOW_INIT);
677 Ok(StreamFlowCtrl::new_window(window))
678 } else {
679 cfg_if::cfg_if! {
680 if #[cfg(feature = "flowctl-cc")] {
681 // TODO: Currently arti only supports clients, and we don't support connecting
682 // to onion services while using congestion control, so we hardcode this. In the
683 // future we will need to somehow tell the `CircHop` this so that we can set it
684 // correctly, since we don't want to enable this at exits.
685 let use_sidechannel_mitigations = true;
686
687 Ok(StreamFlowCtrl::new_xon_xoff(
688 params,
689 use_sidechannel_mitigations,
690 rate_limit_updater,
691 drain_rate_requester,
692 ))
693 } else {
694 drop(params);
695 drop(rate_limit_updater);
696 drop(drain_rate_requester);
697 Err(internal!(
698 "`CongestionControl` doesn't use sendmes, but 'flowctl-cc' feature not enabled",
699 ).into())
700 }
701 }
702 }
703 }
704
705 /// Deliver `msg` to the specified open stream entry `ent`.
706 fn deliver_msg_to_stream(
707 streamid: StreamId,
708 ent: &mut OpenStreamEnt,
709 cell_counts_toward_windows: bool,
710 msg: UnparsedRelayMsg,
711 ) -> Result<bool> {
712 use tor_async_utils::SinkTrySend as _;
713 use tor_async_utils::SinkTrySendError as _;
714
715 // The stream for this message exists, and is open.
716
717 // We need to handle SENDME/XON/XOFF messages here, not in the stream's recv() method, or
718 // else we'd never notice them if the stream isn't reading.
719 match msg.cmd() {
720 RelayCmd::SENDME => {
721 ent.put_for_incoming_sendme(msg)?;
722 return Ok(false);
723 }
724 RelayCmd::XON => {
725 ent.handle_incoming_xon(msg)?;
726 return Ok(false);
727 }
728 RelayCmd::XOFF => {
729 ent.handle_incoming_xoff(msg)?;
730 return Ok(false);
731 }
732 _ => {}
733 }
734
735 let message_closes_stream = ent.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;
736
737 if let Err(e) = Pin::new(&mut ent.sink).try_send(msg) {
738 if e.is_full() {
739 cfg_if::cfg_if! {
740 if #[cfg(not(feature = "flowctl-cc"))] {
741 // If we get here, we either have a logic bug (!), or an attacker
742 // is sending us more cells than we asked for via congestion control.
743 return Err(Error::CircProto(format!(
744 "Stream sink would block; received too many cells on stream ID {}",
745 sv(streamid),
746 )));
747 } else {
748 return Err(internal!(
749 "Stream (ID {}) uses an unbounded queue, but apparently it's full?",
750 sv(streamid),
751 )
752 .into());
753 }
754 }
755 }
756 if e.is_disconnected() && cell_counts_toward_windows {
757 // the other side of the stream has gone away; remember
758 // that we received a cell that we couldn't queue for it.
759 //
760 // Later this value will be recorded in a half-stream.
761 ent.dropped += 1;
762 }
763 }
764
765 Ok(message_closes_stream)
766 }
767
768 /// Note that we received an END message (or other message indicating the end of
769 /// the stream) on the stream with `id`.
770 ///
771 /// See [`StreamMap::ending_msg_received`](crate::streammap::StreamMap::ending_msg_received).
772 #[cfg(feature = "hs-service")]
773 pub(crate) fn ending_msg_received(&self, stream_id: StreamId) -> Result<()> {
774 let mut hop_map = self.map.lock().expect("lock poisoned");
775
776 hop_map.ending_msg_received(stream_id)?;
777
778 Ok(())
779 }
780
781 /// Handle `msg`, delivering it to the stream with the specified `streamid` if appropriate.
782 ///
783 /// Returns back the provided `msg`, if the message is an incoming stream request
784 /// that needs to be handled by the calling code.
785 ///
786 // TODO: the above is a bit of a code smell -- we should try to avoid passing the msg
787 // back and forth like this.
788 pub(crate) fn handle_msg<F>(
789 &self,
790 possible_proto_violation_err: F,
791 cell_counts_toward_windows: bool,
792 streamid: StreamId,
793 msg: UnparsedRelayMsg,
794 now: Instant,
795 ) -> Result<Option<UnparsedRelayMsg>>
796 where
797 F: FnOnce(StreamId) -> Error,
798 {
799 let mut hop_map = self.map.lock().expect("lock poisoned");
800
801 match hop_map.get_mut(streamid) {
802 Some(StreamEntMut::Open(ent)) => {
803 // Can't have a stream level SENDME when congestion control is enabled.
804 let message_closes_stream =
805 Self::deliver_msg_to_stream(streamid, ent, cell_counts_toward_windows, msg)?;
806
807 if message_closes_stream {
808 hop_map.ending_msg_received(streamid)?;
809 }
810 }
811 Some(StreamEntMut::EndSent(EndSentStreamEnt { expiry, .. })) if now >= *expiry => {
812 return Err(possible_proto_violation_err(streamid));
813 }
814 #[cfg(feature = "hs-service")]
815 Some(StreamEntMut::EndSent(_))
816 if matches!(
817 msg.cmd(),
818 RelayCmd::BEGIN | RelayCmd::BEGIN_DIR | RelayCmd::RESOLVE
819 ) =>
820 {
821 // If the other side is sending us a BEGIN but hasn't yet acknowledged our END
822 // message, just remove the old stream from the map and stop waiting for a
823 // response
824 hop_map.ending_msg_received(streamid)?;
825 return Ok(Some(msg));
826 }
827 Some(StreamEntMut::EndSent(EndSentStreamEnt { half_stream, .. })) => {
828 // We sent an end but maybe the other side hasn't heard.
829
830 match half_stream.handle_msg(msg)? {
831 StreamStatus::Open => {}
832 StreamStatus::Closed => {
833 hop_map.ending_msg_received(streamid)?;
834 }
835 }
836 }
837 #[cfg(feature = "hs-service")]
838 None if matches!(
839 msg.cmd(),
840 RelayCmd::BEGIN | RelayCmd::BEGIN_DIR | RelayCmd::RESOLVE
841 ) =>
842 {
843 return Ok(Some(msg));
844 }
845 _ => {
846 // No stream wants this message, or ever did.
847 return Err(possible_proto_violation_err(streamid));
848 }
849 }
850
851 Ok(None)
852 }
853
854 /// Get the stream map of this hop.
855 pub(crate) fn stream_map(&self) -> &Arc<Mutex<StreamMap>> {
856 &self.map
857 }
858
859 /// Set the stream map of this hop to `map`.
860 ///
861 /// Returns an error if the existing stream map of the hop has any open stream.
862 pub(crate) fn set_stream_map(&mut self, map: Arc<Mutex<StreamMap>>) -> StdResult<(), Bug> {
863 if self.n_open_streams() != 0 {
864 return Err(internal!("Tried to discard existing open streams?!"));
865 }
866
867 self.map = map;
868
869 Ok(())
870 }
871
872 /// Decrement the limit of outbound cells that may be sent to this hop; give
873 /// an error if it would reach zero.
874 pub(crate) fn decrement_cell_limit(&mut self) -> Result<()> {
875 try_decrement_cell_limit(&mut self.n_outgoing_cells_permitted)
876 .map_err(|_| Error::ExcessOutboundCells)
877 }
878}
879
880/// If `val` is `Some(1)`, return Err(());
881/// otherwise decrement it (if it is Some) and return Ok(()).
882#[inline]
883fn try_decrement_cell_limit(val: &mut Option<NonZeroU32>) -> StdResult<(), ()> {
884 // This is a bit verbose, but I've confirmed that it optimizes nicely.
885 match val {
886 Some(x) => {
887 let z = u32::from(*x);
888 if z == 1 {
889 Err(())
890 } else {
891 *x = (z - 1).try_into().expect("NonZeroU32 was zero?!");
892 Ok(())
893 }
894 }
895 None => Ok(()),
896 }
897}
898
899/// Convert a limit from the form used in a HopSettings to that used here.
900/// (The format we use here is more compact.)
901fn cvt(limit: u32) -> NonZeroU32 {
902 // See "known limitations" comment on n_incoming_cells_permitted.
903 limit
904 .saturating_add(1)
905 .try_into()
906 .expect("Adding one left it as zero?")
907}
908
909/// A collection of components that can be used to interact with the reactor's view of a Tor stream.
910//
911// TODO: We also have a `StreamComponents` type that is used and built outside of the reactor.
912// It's maybe confusing to have these similar type names, so a better name would be nice.
913//
914// TODO(arti#2068): The components we return should maybe depend on what type of flow control is
915// used, so in the future we might want to make some of these fields optional.
916#[derive(Debug, Deftly)]
917#[derive_deftly(HasMemoryCost)]
918pub(crate) struct ReactorStreamComponents {
919 /// An MPSC receiver for inbound messages that arrive on the stream.
920 #[deftly(has_memory_cost(indirect_size = "0"))] // estimate
921 pub(crate) stream_inbound_rx: StreamQueueReceiver,
922
923 /// An MPSC sender for outbound messages to be sent on the stream.
924 #[deftly(has_memory_cost(indirect_size = "size_of::<AnyRelayMsg>()"))] // estimate
925 pub(crate) stream_outbound_tx: StreamMpscSender<AnyRelayMsg>,
926
927 /// A mechanism to allow the stream's writer to receive rate limit updates from the reactor.
928 // The `watch::Sender` owns the indirect data.
929 #[deftly(has_memory_cost(indirect_size = "0"))]
930 pub(crate) rate_limit_rx: watch::Receiver<StreamRateLimit>,
931
932 /// A mechanism to allow the stream's reader to receive drain rate update requests from the
933 /// reactor.
934 #[deftly(has_memory_cost(indirect_size = "0"))]
935 pub(crate) drain_rate_request_rx: NotifyReceiver<DrainRateRequest>,
936}