tor_proto/tunnel/reactor.rs
1//! Code to handle incoming cells on a circuit.
2//!
3//! ## On message validation
4//!
5//! There are three steps for validating an incoming message on a stream:
6//!
7//! 1. Is the message contextually appropriate? (e.g., no more than one
8//! `CONNECTED` message per stream.) This is handled by calling
9//! [`CmdChecker::check_msg`](crate::stream::CmdChecker::check_msg).
10//! 2. Does the message comply with flow-control rules? (e.g., no more data than
11//! we've gotten SENDMEs for.) For open streams, the stream itself handles
12//! this; for half-closed streams, the reactor handles it using the
13//! `halfstream` module.
14//! 3. Does the message have an acceptable command type, and is the message
15//! well-formed? For open streams, the streams themselves handle this check.
16//! For half-closed streams, the reactor handles it by calling
17//! `consume_checked_msg()`.
18
19mod conflux;
20mod control;
21mod create;
22mod extender;
23pub(super) mod syncview;
24
25use super::handshake::RelayCryptLayerProtocol;
26use crate::congestion::sendme::{self, CircTag};
27use crate::congestion::{CongestionControl, CongestionSignals};
28use crate::crypto::binding::CircuitBinding;
29use crate::crypto::cell::{
30 HopNum, InboundClientCrypt, InboundClientLayer, OutboundClientCrypt, OutboundClientLayer,
31 RelayCellBody, SENDME_TAG_LEN,
32};
33use crate::crypto::handshake::fast::CreateFastClient;
34#[cfg(feature = "ntor_v3")]
35use crate::crypto::handshake::ntor_v3::{NtorV3Client, NtorV3PublicKey};
36use crate::memquota::{CircuitAccount, SpecificAccount as _, StreamAccount};
37use crate::stream::{AnyCmdChecker, StreamStatus};
38use crate::tunnel::circuit::celltypes::{ClientCircChanMsg, CreateResponse};
39use crate::tunnel::circuit::handshake::{BoxedClientLayer, HandshakeRole};
40use crate::tunnel::circuit::unique_id::UniqId;
41use crate::tunnel::circuit::MutableState;
42use crate::tunnel::circuit::{CircParameters, CircuitRxReceiver};
43use crate::tunnel::streammap::{
44 self, EndSentStreamEnt, OpenStreamEnt, ShouldSendEnd, StreamEntMut,
45};
46use crate::util::err::ReactorError;
47use crate::util::skew::ClockSkew;
48use crate::util::sometimes_unbounded_sink::SometimesUnboundedSink;
49use crate::util::SinkExt as _;
50use crate::{Error, Result};
51use conflux::ConfluxSet;
52use control::ControlHandler;
53use futures::stream::FuturesUnordered;
54use std::borrow::Borrow;
55use std::mem::size_of;
56use std::pin::Pin;
57use tor_cell::chancell::msg::{AnyChanMsg, HandshakeType, Relay};
58use tor_cell::relaycell::msg::{AnyRelayMsg, End, Sendme, Truncated};
59use tor_cell::relaycell::{
60 AnyRelayMsgOuter, RelayCellDecoder, RelayCellDecoderResult, RelayCellFormat, RelayCmd,
61 StreamId, UnparsedRelayMsg,
62};
63use tor_error::{internal, Bug};
64#[cfg(feature = "hs-service")]
65use {
66 crate::stream::{DataCmdChecker, IncomingStreamRequest, IncomingStreamRequestFilter},
67 tor_cell::relaycell::msg::Begin,
68};
69
70use futures::channel::mpsc;
71use futures::StreamExt;
72use futures::{select_biased, FutureExt as _, SinkExt as _, Stream};
73use oneshot_fused_workaround as oneshot;
74
75use std::result::Result as StdResult;
76use std::sync::{Arc, Mutex};
77use std::task::Poll;
78
79use crate::channel::{Channel, ChannelSender};
80use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
81use crate::crypto::handshake::{ClientHandshake, KeyGenerator};
82use crate::tunnel::circuit::path;
83use crate::tunnel::circuit::{StreamMpscReceiver, StreamMpscSender};
84use derive_deftly::Deftly;
85use derive_more::From;
86use safelog::sensitive as sv;
87use tor_async_utils::{SinkPrepareExt as _, SinkTrySend as _, SinkTrySendError as _};
88use tor_cell::chancell::{AnyChanCell, CircId};
89use tor_cell::chancell::{BoxedCellBody, ChanMsg};
90use tor_linkspec::RelayIds;
91use tor_llcrypto::pk;
92use tor_memquota::derive_deftly_template_HasMemoryCost;
93use tor_memquota::mq_queue::{self, ChannelSpec as _, MpscSpec};
94use tracing::{debug, trace, warn};
95
96use create::{Create2Wrap, CreateFastWrap, CreateHandshakeWrap};
97use extender::HandshakeAuxDataHandler;
98
99pub(super) use control::CtrlCmd;
100pub(super) use control::CtrlMsg;
101
/// Initial value for outbound flow-control window on streams.
pub(super) const SEND_WINDOW_INIT: u16 = 500;
/// Initial value for inbound flow-control window on streams.
pub(super) const RECV_WINDOW_INIT: u16 = 500;
/// Size of the buffer used between the reactor and a `StreamReader`.
///
/// FIXME(eta): We pick 2× the receive window, which is very conservative (we arguably shouldn't
///             get sent more than the receive window anyway!). We might still receive more,
///             though, because of things that don't count towards the window.
//
// The doubling is done in `u16` and only then widened to `usize`;
// with the current window of 500 the product (1000) fits in `u16`.
pub(super) const STREAM_READER_BUFFER: usize = (2 * RECV_WINDOW_INIT) as usize;
112
/// The sending half of a oneshot channel used to inform reactor users of the
/// result of an operation.
///
/// The value sent is a [`Result<T>`](crate::Result).
pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;
115
/// Sending half of the MPSC queue on which stream requests
/// ([`StreamReqInfo`]) are delivered.
///
/// Only compiled when the `hs-service` feature is enabled.
#[cfg(feature = "hs-service")]
type StreamReqSender = mq_queue::Sender<StreamReqInfo, MpscSpec>;
119
/// A handshake type, to be used when creating circuit hops.
///
/// Each variant carries the key material needed by that handshake.
#[derive(Clone, Debug)]
pub(crate) enum CircuitHandshake {
    /// Use the CREATE_FAST handshake.
    ///
    /// This variant carries no key material.
    CreateFast,
    /// Use the ntor handshake.
    Ntor {
        /// The public key of the relay.
        public_key: NtorPublicKey,
        /// The Ed25519 identity of the relay, which is verified against the
        /// identity held in the circuit's channel.
        ed_identity: pk::ed25519::Ed25519Identity,
    },
    /// Use the ntor-v3 handshake.
    ///
    /// Only available when the `ntor_v3` feature is enabled.
    #[cfg(feature = "ntor_v3")]
    NtorV3 {
        /// The public key of the relay.
        public_key: NtorV3PublicKey,
    },
}
140
141/// A behavior to perform when closing a stream.
142///
143/// We don't use `Option<End>`` here, since the behavior of `SendNothing` is so surprising
144/// that we shouldn't let it pass unremarked.
145#[derive(Clone, Debug)]
146pub(crate) enum CloseStreamBehavior {
147 /// Send nothing at all, so that the other side will not realize we have
148 /// closed the stream.
149 ///
150 /// We should only do this for incoming onion service streams when we
151 /// want to black-hole the client's requests.
152 SendNothing,
153 /// Send an End cell, if we haven't already sent one.
154 SendEnd(End),
155}
156impl Default for CloseStreamBehavior {
157 fn default() -> Self {
158 Self::SendEnd(End::new_misc())
159 }
160}
161
/// Represents the reactor's view of a single hop.
///
/// Bundles the per-hop state kept by the reactor: the hop's stream map,
/// its congestion-control object, and its inbound relay-cell decoder.
pub(super) struct CircHop {
    /// Reactor unique ID.  Used for logging.
    unique_id: UniqId,
    /// Hop number in the path.
    hop_num: HopNum,
    /// Map from stream IDs to streams.
    ///
    /// We store this with the reactor instead of the circuit, since the
    /// reactor needs it for every incoming cell on a stream, whereas
    /// the circuit only needs it when allocating new streams.
    ///
    /// NOTE: this is behind a mutex because the reactor polls the `StreamMap`s
    /// of all hops concurrently, in a [`FuturesUnordered`]. Without the mutex,
    /// this wouldn't be possible, because it would mean holding multiple
    /// mutable references to `self` (the reactor). Note, however,
    /// that there should never be any contention on this mutex:
    /// we never create more than one [`Circuit::ready_streams_iterator`] stream
    /// at a time, and we never clone/lock the hop's `StreamMap` outside of
    /// [`Circuit::ready_streams_iterator`].
    ///
    // TODO: encapsulate the Vec<CircHop> into a separate CircHops structure,
    // and hide its internals from the Reactor. The CircHops implementation
    // should enforce the invariant described in the note above.
    map: Arc<Mutex<streammap::StreamMap>>,
    /// Congestion control object.
    ///
    /// This object is also in charge of handling circuit level SENDME logic for this hop.
    ccontrol: CongestionControl,
    /// Decodes relay cells received from this hop.
    inbound: RelayCellDecoder,
}
194
/// One or more [`RunOnceCmdInner`] to run inside [`Reactor::run_once`].
///
/// The `From` derive lets either payload be converted into this type with `.into()`.
#[derive(From, Debug)]
enum RunOnceCmd {
    /// Run a single `RunOnceCmdInner` command.
    Single(RunOnceCmdInner),
    /// Run multiple `RunOnceCmdInner` commands.
    //
    // Note: this whole enum *could* be replaced with Vec<RunOnceCmdInner>,
    // but most of the time we're only going to have *one* RunOnceCmdInner
    // to run per run_once() loop. The enum enables us to avoid the extra heap
    // allocation for the `RunOnceCmd::Single` case.
    Multiple(Vec<RunOnceCmdInner>),
}
208
/// Instructions for running something in the reactor loop.
///
/// Run at the end of [`Reactor::run_once`].
//
// TODO: many of the variants of this enum have an identical CtrlMsg counterpart.
// We should consider making each variant a tuple variant and deduplicating the fields.
#[derive(educe::Educe)]
#[educe(Debug)]
enum RunOnceCmdInner {
    /// Send a RELAY cell.
    Send {
        /// The cell to send.
        cell: SendRelayCell,
        /// A channel for sending completion notifications.
        ///
        /// If `None`, nobody is notified.
        done: Option<ReactorResultChannel<()>>,
    },
    /// Send a given control message on this circuit, and install a control-message handler to
    /// receive responses.
    #[cfg(feature = "send-control-msg")]
    SendMsgAndInstallHandler {
        /// The message to send, if any
        msg: Option<AnyRelayMsgOuter>,
        /// A message handler to install.
        ///
        /// If this is `None`, there must already be a message handler installed
        // The handler is a trait object, so it is left out of the `Debug` output.
        #[educe(Debug(ignore))]
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
        /// A sender that we use to tell the caller that the message was sent
        /// and the handler installed.
        done: oneshot::Sender<Result<()>>,
    },
    /// Handle a SENDME message.
    HandleSendMe {
        /// The hop number.
        hop: HopNum,
        /// The SENDME message to handle.
        sendme: Sendme,
    },
    /// Begin a stream with the provided hop in this circuit.
    ///
    /// Uses the provided stream ID, and sends the provided message to that hop.
    BeginStream {
        /// The cell to send, paired with the ID of the stream it opens;
        /// or the error encountered while preparing them.
        cell: Result<(SendRelayCell, StreamId)>,
        /// Oneshot channel to notify on completion, with the allocated stream ID.
        done: ReactorResultChannel<StreamId>,
    },
    /// Close the specified stream.
    CloseStream {
        /// The hop number.
        hop_num: HopNum,
        /// The ID of the stream to close.
        sid: StreamId,
        /// The stream-closing behavior.
        behav: CloseStreamBehavior,
        /// The reason for closing the stream.
        reason: streammap::TerminateReason,
        /// A channel for sending completion notifications.
        ///
        /// If `None`, nobody is notified.
        done: Option<ReactorResultChannel<()>>,
    },
    /// Get the clock skew claimed by the first hop of the circuit.
    FirstHopClockSkew {
        /// Oneshot channel to return the clock skew.
        answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
    },
    /// Perform a clean shutdown on this circuit.
    CleanShutdown,
}
277
/// Cmd for sending a relay cell.
///
/// The contents of this struct are passed to `send_relay_cell`
#[derive(educe::Educe)]
#[educe(Debug)]
pub(crate) struct SendRelayCell {
    /// The hop number.
    pub(crate) hop: HopNum,
    /// Whether to use a RELAY_EARLY cell.
    pub(crate) early: bool,
    /// The cell to send.
    pub(crate) cell: AnyRelayMsgOuter,
}
291
/// The event picked up by the `select_biased!` in [`Reactor::run_once`].
///
/// `run_once` turns this into a [`RunOnceCmd`] (if any) and executes it
/// before returning.
#[derive(From, Debug)]
enum SelectResult {
    /// Run a single `RunOnceCmdInner` command.
    Single(RunOnceCmdInner),
    /// Handle a control message
    HandleControl(CtrlMsg),
    /// Handle an input message.
    HandleCell(ClientCircChanMsg),
}
302
303impl CircHop {
304 /// Create a new hop.
305 pub(super) fn new(
306 unique_id: UniqId,
307 hop_num: HopNum,
308 format: RelayCellFormat,
309 params: &CircParameters,
310 ) -> Self {
311 CircHop {
312 unique_id,
313 hop_num,
314 map: Arc::new(Mutex::new(streammap::StreamMap::new())),
315 ccontrol: CongestionControl::new(¶ms.ccontrol),
316 inbound: RelayCellDecoder::new(format),
317 }
318 }
319
320 /// Start a stream. Creates an entry in the stream map with the given channels, and sends the
321 /// `message` to the provided hop.
322 fn begin_stream(
323 &mut self,
324 message: AnyRelayMsg,
325 sender: StreamMpscSender<UnparsedRelayMsg>,
326 rx: StreamMpscReceiver<AnyRelayMsg>,
327 cmd_checker: AnyCmdChecker,
328 ) -> Result<(SendRelayCell, StreamId)> {
329 let send_window = sendme::StreamSendWindow::new(SEND_WINDOW_INIT);
330 let r = self.map.lock().expect("lock poisoned").add_ent(
331 sender,
332 rx,
333 send_window,
334 cmd_checker,
335 )?;
336 let cell = AnyRelayMsgOuter::new(Some(r), message);
337 Ok((
338 SendRelayCell {
339 hop: self.hop_num,
340 early: false,
341 cell,
342 },
343 r,
344 ))
345 }
346
347 /// Close the stream associated with `id` because the stream was
348 /// dropped.
349 ///
350 /// If we have not already received an END cell on this stream, send one.
351 /// If no END cell is specified, an END cell with the reason byte set to
352 /// REASON_MISC will be sent.
353 fn close_stream(
354 &mut self,
355 id: StreamId,
356 message: CloseStreamBehavior,
357 why: streammap::TerminateReason,
358 ) -> Result<Option<SendRelayCell>> {
359 let should_send_end = self.map.lock().expect("lock poisoned").terminate(id, why)?;
360 trace!(
361 "{}: Ending stream {}; should_send_end={:?}",
362 self.unique_id,
363 id,
364 should_send_end
365 );
366 // TODO: I am about 80% sure that we only send an END cell if
367 // we didn't already get an END cell. But I should double-check!
368 if let (ShouldSendEnd::Send, CloseStreamBehavior::SendEnd(end_message)) =
369 (should_send_end, message)
370 {
371 let end_cell = AnyRelayMsgOuter::new(Some(id), end_message.into());
372 let cell = SendRelayCell {
373 hop: self.hop_num,
374 early: false,
375 cell: end_cell,
376 };
377
378 return Ok(Some(cell));
379 }
380 Ok(None)
381 }
382}
383
/// An object that's waiting for a meta cell (one not associated with a stream) in order to make
/// progress.
///
/// # Background
///
/// The `Reactor` can't have async functions that send and receive cells, because its job is to
/// send and receive cells: if one of its functions tried to do that, it would just hang forever.
///
/// To get around this problem, the reactor can send some cells, and then make one of these
/// `MetaCellHandler` objects, which will be run when the reply arrives.
pub(crate) trait MetaCellHandler: Send {
    /// The hop we're expecting the message to come from. This is compared against the hop
    /// from which we actually receive messages, and an error is thrown if the two don't match.
    fn expected_hop(&self) -> HopNum;
    /// Called when the message we were waiting for arrives.
    ///
    /// Receives a mutable reference to the [`Circuit`] (the `reactor` argument)
    /// in order to do anything it likes there.
    ///
    /// On success, the returned [`MetaCellDisposition`] says what should
    /// happen to the handler (or the circuit) next.
    ///
    /// If this function returns an error, the reactor will shut down.
    fn handle_msg(
        &mut self,
        msg: UnparsedRelayMsg,
        reactor: &mut Circuit,
    ) -> Result<MetaCellDisposition>;
}
409
/// A possible successful outcome of giving a message to a [`MsgHandler`](super::msghandler::MsgHandler).
///
/// This is also what [`MetaCellHandler::handle_msg`] returns on success.
///
/// The type is made `pub` when the `send-control-msg` feature is enabled;
/// two of its variants only exist with that feature.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "send-control-msg", visibility::make(pub))]
#[non_exhaustive]
pub(super) enum MetaCellDisposition {
    /// The message was consumed; the handler should remain installed.
    #[cfg(feature = "send-control-msg")]
    Consumed,
    /// The message was consumed; the handler should be uninstalled.
    ConversationFinished,
    /// The message was consumed; the circuit should be closed.
    #[cfg(feature = "send-control-msg")]
    CloseCirc,
    // TODO: Eventually we might want the ability to have multiple handlers
    // installed, and to let them say "not for me, maybe for somebody else?".
    // But right now we don't need that.
}
427
/// A unique identifier for a circuit leg.
///
/// After the circuit is torn down, its `LegId` becomes invalid.
/// The same `LegId` won't be reused for a future circuit.
///
/// This is a newtype around [`LegIdKey`], the slotmap key type.
//
// TODO(#1857): make this pub
//
// NOTE(review): the `allow(unused)` presumably exists because the type is
// not yet used everywhere it is meant to be; revisit once #1857 is done.
#[allow(unused)]
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub(crate) struct LegId(pub(crate) LegIdKey);

slotmap_careful::new_key_type! {
    /// A key type for the circuit leg slotmap
    ///
    /// See [`LegId`].
    pub(crate) struct LegIdKey;
}
444
/// Turn an [`Option`] into a `Result`, mapping `None` to [`ReactorError::Shutdown`].
///
/// On `None`, a trace message naming `$reason` is logged (tagged with
/// `$self.unique_id`) before the error is produced.
///
/// This is a macro instead of a function to work around borrowck errors
/// in the select! from run_once().
macro_rules! unwrap_or_shutdown {
    ($self:expr, $res:expr, $reason:expr) => {{
        if let Some(value) = $res {
            Ok(value)
        } else {
            trace!("{}: reactor shutdown due to {}", $self.unique_id, $reason);
            Err(ReactorError::Shutdown)
        }
    }};
}
460
/// Object to handle incoming cells and background tasks on a circuit
///
/// This type is returned when you finish a circuit; you need to spawn a
/// new task that calls `run()` on it.
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
pub struct Reactor {
    /// Receiver for control messages for this reactor, sent by `ClientCirc` objects.
    ///
    /// This channel is polled in [`Reactor::run_once`], but only if the `chan_sender` sink
    /// is ready to accept cells.
    control: mpsc::UnboundedReceiver<CtrlMsg>,
    /// Receiver for command messages for this reactor, sent by `ClientCirc` objects.
    ///
    /// This channel is polled in [`Reactor::run_once`].
    ///
    /// NOTE: this is a separate channel from `control`, because some messages
    /// have higher priority and need to be handled even if the `chan_sender` is not
    /// ready (whereas `control` messages are not read until the `chan_sender` sink
    /// is ready to accept cells).
    command: mpsc::UnboundedReceiver<CtrlCmd>,
    /// A oneshot sender that is used to alert other tasks when this reactor is
    /// finally dropped.
    ///
    /// It is a sender for Void because we never actually want to send anything here;
    /// we only want to generate canceled events.
    #[allow(dead_code)] // the only purpose of this field is to be dropped.
    reactor_closed_tx: oneshot::Sender<void::Void>,
    /// A set of circuits that form a tunnel.
    ///
    /// Contains 1 or more circuits.
    ///
    /// Circuits may be added to this set throughout the lifetime of the reactor.
    //
    // TODO(conflux): add a control command for adding a circuit leg,
    // and update these docs to explain how legs are added
    ///
    /// Sometimes, the reactor will remove circuits from this set,
    /// for example if the `LINKED` message takes too long to arrive,
    /// or if congestion control negotiation fails.
    /// The reactor will continue running with the remaining circuits.
    /// It will shut down if *all* the circuits are removed.
    ///
    // TODO(conflux): document all the reasons why the reactor might
    // choose to tear down a circuit or tunnel (timeouts, protocol violations, etc.)
    circuits: ConfluxSet,
    /// An identifier for logging about this reactor's circuit.
    unique_id: UniqId,
    /// Handlers, shared with `Circuit`.
    ///
    /// A mutable reference to these is passed to the circuit when it
    /// handles an incoming cell.
    cell_handlers: CellHandlers,
}
511
/// Cell handlers, shared between the Reactor and its underlying `Circuit`s.
struct CellHandlers {
    /// A handler for a meta cell, if one is currently installed.
    meta_handler: Option<Box<dyn MetaCellHandler + Send>>,
    /// A handler for incoming stream requests, if one is currently installed.
    ///
    /// Only present when the `hs-service` feature is enabled.
    #[cfg(feature = "hs-service")]
    incoming_stream_req_handler: Option<IncomingStreamRequestHandler>,
}
520
/// A circuit "leg" from a tunnel.
///
/// Regular (non-multipath) circuits have a single leg.
/// Conflux (multipath) circuits have `N` (usually, `N = 2`).
pub(crate) struct Circuit {
    /// The channel this circuit is attached to.
    channel: Arc<Channel>,
    /// Sender object used to actually send cells.
    ///
    /// NOTE: Control messages could potentially add unboundedly to this, although that's
    /// not likely to happen (and isn't triggerable from the network, either).
    chan_sender: SometimesUnboundedSink<AnyChanCell, ChannelSender>,
    /// Input stream, on which we receive ChanMsg objects from this circuit's
    /// channel.
    // TODO: could use a SPSC channel here instead.
    input: CircuitRxReceiver,
    /// The cryptographic state for this circuit for inbound cells.
    /// This object is divided into multiple layers, each of which is
    /// shared with one hop of the circuit.
    crypto_in: InboundClientCrypt,
    /// The cryptographic state for this circuit for outbound cells.
    crypto_out: OutboundClientCrypt,
    /// List of hops state objects used by the reactor
    ///
    /// Empty until the first hop has been created.
    hops: Vec<CircHop>,
    /// Mutable information about this circuit, shared with
    /// [`ClientCirc`](super::ClientCirc).
    ///
    // TODO(conflux)/TODO(#1840): this belongs in the Reactor
    mutable: Arc<Mutex<MutableState>>,
    /// This circuit's identifier on the upstream channel.
    channel_id: CircId,
    /// An identifier for logging about this reactor's circuit.
    unique_id: UniqId,
    /// Memory quota account
    #[allow(dead_code)] // Partly here to keep it alive as long as the circuit
    memquota: CircuitAccount,
}
558
/// Information about an incoming stream request.
///
/// Values of this type travel over the `StreamReqSender` queue.
/// Only compiled when the `hs-service` feature is enabled.
#[cfg(feature = "hs-service")]
#[derive(Debug, Deftly)]
#[derive_deftly(HasMemoryCost)]
pub(crate) struct StreamReqInfo {
    /// The [`IncomingStreamRequest`].
    pub(crate) req: IncomingStreamRequest,
    /// The ID of the stream being requested.
    pub(crate) stream_id: StreamId,
    /// The [`HopNum`].
    //
    // TODO: When we add support for exit relays, we need to turn this into an Option<HopNum>.
    // (For outbound messages (towards relays), there is only one hop that can send them: the client.)
    //
    // TODO: For onion services, we might be able to enforce the HopNum earlier: we would never accept an
    // incoming stream request from two separate hops.  (There is only one that's valid.)
    pub(crate) hop_num: HopNum,
    /// A channel for receiving messages from this stream.
    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate
    pub(crate) receiver: StreamMpscReceiver<UnparsedRelayMsg>,
    /// A channel for sending messages to be sent on this stream.
    #[deftly(has_memory_cost(indirect_size = "size_of::<AnyRelayMsg>()"))] // estimate
    pub(crate) msg_tx: StreamMpscSender<AnyRelayMsg>,
    /// The memory quota account to be used for this stream
    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate (it contains an Arc)
    pub(crate) memquota: StreamAccount,
}
586
/// Data required for handling an incoming stream request.
///
/// Only compiled when the `hs-service` feature is enabled.
#[cfg(feature = "hs-service")]
#[derive(educe::Educe)]
#[educe(Debug)]
struct IncomingStreamRequestHandler {
    /// A sender for sharing information about an incoming stream request.
    incoming_sender: StreamReqSender,
    /// A [`AnyCmdChecker`] for validating incoming stream requests.
    cmd_checker: AnyCmdChecker,
    /// The hop to expect incoming stream requests from.
    hop_num: HopNum,
    /// An [`IncomingStreamRequestFilter`] for checking whether the user wants
    /// this request, or wants to reject it immediately.
    // The filter is a trait object, so it is left out of the `Debug` output.
    #[educe(Debug(ignore))]
    filter: Box<dyn IncomingStreamRequestFilter>,
}
603
604impl Reactor {
605 /// Create a new circuit reactor.
606 ///
607 /// The reactor will send outbound messages on `channel`, receive incoming
608 /// messages on `input`, and identify this circuit by the channel-local
609 /// [`CircId`] provided.
610 ///
611 /// The internal unique identifier for this circuit will be `unique_id`.
612 #[allow(clippy::type_complexity)] // TODO
613 pub(super) fn new(
614 channel: Arc<Channel>,
615 channel_id: CircId,
616 unique_id: UniqId,
617 input: CircuitRxReceiver,
618 memquota: CircuitAccount,
619 ) -> (
620 Self,
621 mpsc::UnboundedSender<CtrlMsg>,
622 mpsc::UnboundedSender<CtrlCmd>,
623 oneshot::Receiver<void::Void>,
624 Arc<Mutex<MutableState>>,
625 ) {
626 let crypto_out = OutboundClientCrypt::new();
627 let (control_tx, control_rx) = mpsc::unbounded();
628 let (command_tx, command_rx) = mpsc::unbounded();
629 let mutable = Arc::new(Mutex::new(MutableState::default()));
630
631 let (reactor_closed_tx, reactor_closed_rx) = oneshot::channel();
632
633 let chan_sender = SometimesUnboundedSink::new(channel.sender());
634
635 let cell_handlers = CellHandlers {
636 meta_handler: None,
637 #[cfg(feature = "hs-service")]
638 incoming_stream_req_handler: None,
639 };
640
641 let circuit_leg = Circuit {
642 channel,
643 chan_sender,
644 input,
645 crypto_in: InboundClientCrypt::new(),
646 hops: vec![],
647 unique_id,
648 channel_id,
649 crypto_out,
650 mutable: mutable.clone(),
651 memquota,
652 };
653
654 let reactor = Reactor {
655 circuits: ConfluxSet::new(circuit_leg),
656 control: control_rx,
657 command: command_rx,
658 reactor_closed_tx,
659 unique_id,
660 cell_handlers,
661 };
662
663 (reactor, control_tx, command_tx, reactor_closed_rx, mutable)
664 }
665
666 /// Launch the reactor, and run until the circuit closes or we
667 /// encounter an error.
668 ///
669 /// Once this method returns, the circuit is dead and cannot be
670 /// used again.
671 pub async fn run(mut self) -> Result<()> {
672 trace!("{}: Running circuit reactor", self.unique_id);
673 let result: Result<()> = loop {
674 match self.run_once().await {
675 Ok(()) => (),
676 Err(ReactorError::Shutdown) => break Ok(()),
677 Err(ReactorError::Err(e)) => break Err(e),
678 }
679 };
680 trace!("{}: Circuit reactor stopped: {:?}", self.unique_id, result);
681 result
682 }
683
/// Helper for run: doesn't mark the circuit closed on finish.  Only
/// processes one cell or control message.
///
/// One call does, in order:
///
/// 1. Shut down if there are no circuits left.
/// 2. If the (single) leg has no hops yet, wait for the circuit to be
///    created and return.
/// 3. Otherwise wait for one event (a command, a control message, an
///    incoming cell, or a ready stream) and handle it.
///
/// Returns `Err(ReactorError::Shutdown)` when the reactor should stop
/// cleanly, and `Err(ReactorError::Err(_))` on failure.
async fn run_once(&mut self) -> StdResult<(), ReactorError> {
    // If all the circuits are closed, shut down the reactor
    //
    // TODO(conflux): we might need to rethink this behavior
    if self.circuits.is_empty() {
        trace!(
            "{}: Circuit reactor shutting down: all circuits have closed",
            self.unique_id
        );

        return Err(ReactorError::Shutdown);
    }

    // If this is a single path circuit, we need to wait until the first hop
    // is created before doing anything else
    if self
        .circuits
        .single_leg_mut()
        .is_ok_and(|c| c.hops.is_empty())
    {
        self.wait_for_create().await?;

        return Ok(());
    }

    // TODO(conflux): support adding and linking circuits
    // TODO(conflux): support switching the primary leg

    // TODO(conflux): read from *all* the circuits, not just the primary
    //
    // Note: this is a big TODO, and will likely involve factoring out the
    // chan_sender.prepare_send_from() call into a function on Circuit.
    // Each Circuit will have its own control channel, for handling control
    // messages meant for it (I imagine some/all CtrlMsgs will have a LegId
    // field, and that the reactor will redirect the CtrlMsg to the appropriate
    // Circuit's control channel?). Putting the control channel (which will
    // probably receive a CtrlMsgInner type) inside the Circuit should enable
    // us to lift the prepare_send_from() into a Circuit function, that will
    // get called from ConfluxSet::poll_all_name_tbd() that can be used in
    // this select_biased! to select between the channel readiness of *all*
    // underlying circuits.
    let primary_leg = self.circuits.primary_leg_mut()?;
    let mut ready_streams = primary_leg.ready_streams_iterator();

    // Note: We don't actually use the returned SinkSendable,
    // and continue writing to the SometimesUnboundedSink :(
    let (cmd, _sendable) = select_biased! {
        // Commands are listed first and are not gated on `chan_sender`.
        // This branch returns from `run_once` directly.
        res = self.command.next() => {
            let cmd = unwrap_or_shutdown!(self, res, "command channel drop")?;
            return ControlHandler::new(self).handle_cmd(cmd);
        },
        // Everything else goes through `prepare_send_from()` on the
        // primary leg's `chan_sender`.
        res = primary_leg.chan_sender
            .prepare_send_from(async {
                select_biased! {
                    // Check whether we've got a control message pending.
                    ret = self.control.next() => {
                        let msg = unwrap_or_shutdown!(self, ret, "control drop")?;
                        Ok::<_, ReactorError>(Some(SelectResult::HandleControl(msg)))
                    },
                    // Check whether we've got an input message pending.
                    ret = primary_leg.input.next().fuse() => {
                        let cell = unwrap_or_shutdown!(self, ret, "input drop")?;
                        Ok(Some(SelectResult::HandleCell(cell)))
                    },
                    // Check whether any stream has something ready for us.
                    ret = ready_streams.next().fuse() => {
                        match ret {
                            Some(cmd) => {
                                let cmd = cmd?;
                                Ok(Some(SelectResult::Single(cmd)))
                            },
                            None => {
                                // There are no ready streams (for example, they may all be
                                // blocked due to congestion control), so there is nothing
                                // to do.
                                Ok(None)
                            }
                        }
                    }
                }
            }) => res?,
    };
    // The inner async block produced its own `Result`; unwrap that one too.
    let cmd = cmd?;

    // Translate what we selected into the command (if any) to run below.
    let cmd = match cmd {
        None => None,
        Some(SelectResult::Single(cmd)) => Some(RunOnceCmd::Single(cmd)),
        Some(SelectResult::HandleControl(ctrl)) => ControlHandler::new(self)
            .handle_msg(ctrl)?
            .map(RunOnceCmd::Single),
        Some(SelectResult::HandleCell(cell)) => {
            // TODO(conflux): put the LegId of the circuit the cell was received on
            // inside HandleCell
            //let circ = self.circuits.leg(leg_id)?;

            let circ = self.circuits.primary_leg_mut()?;
            circ.handle_cell(&mut self.cell_handlers, cell)?
        }
    };

    if let Some(cmd) = cmd {
        self.handle_run_once_cmd(cmd).await?;
    }

    Ok(())
}
791
792 /// Handle a [`RunOnceCmd`].
793 async fn handle_run_once_cmd(&mut self, cmd: RunOnceCmd) -> StdResult<(), ReactorError> {
794 match cmd {
795 RunOnceCmd::Single(cmd) => return self.handle_single_run_once_cmd(cmd).await,
796 RunOnceCmd::Multiple(cmds) => {
797 // While we know `sendable` is ready to accept *one* cell,
798 // we can't be certain it will be able to accept *all* of the cells
799 // that need to be sent here. This means we *may* end up buffering
800 // in its underlying SometimesUnboundedSink! That is OK, because
801 // RunOnceCmd::Multiple is only used for handling packed cells.
802 for cmd in cmds {
803 self.handle_single_run_once_cmd(cmd).await?;
804 }
805 }
806 }
807
808 Ok(())
809 }
810
    /// Handle a single [`RunOnceCmdInner`].
    ///
    /// For the commands that send a relay cell (`Send`,
    /// `SendMsgAndInstallHandler`, `BeginStream`), the outcome is reported on
    /// the command's `done` channel (when it has one) and any failure is
    /// *also* returned to the caller.
    ///
    /// The result of closing a stream (`CloseStream`) is only reported on
    /// `done`, and the result of the clock skew lookup (`FirstHopClockSkew`)
    /// is only reported on `answer`; neither is returned from this function.
    ///
    /// Returns [`ReactorError::Shutdown`] for `CleanShutdown`.
    async fn handle_single_run_once_cmd(
        &mut self,
        cmd: RunOnceCmdInner,
    ) -> StdResult<(), ReactorError> {
        match cmd {
            RunOnceCmdInner::Send { cell, done } => {
                // TODO: check the cc window

                // TODO(conflux): let the RunOnceCmdInner specify which leg to send the cell on
                let res = self.circuits.primary_leg_mut()?.send_relay_cell(cell).await;
                if let Some(done) = done {
                    // Don't care if the receiver goes away.
                    // (The result is cloned so that it can still be propagated below.)
                    let _ = done.send(res.clone());
                }
                res?;
            }
            #[cfg(feature = "send-control-msg")]
            RunOnceCmdInner::SendMsgAndInstallHandler { msg, handler, done } => {
                // Build the cell (if there is a message to send) and install the handler
                // (if one was provided) before anything is sent.
                let cell: Result<Option<SendRelayCell>> =
                    self.prepare_msg_and_install_handler(msg, handler);

                match cell {
                    Ok(Some(cell)) => {
                        // TODO(conflux): let the RunOnceCmdInner specify which leg to send the cell on
                        let outcome = self.circuits.primary_leg_mut()?.send_relay_cell(cell).await;
                        // don't care if receiver goes away.
                        let _ = done.send(outcome.clone());
                        outcome?;
                    }
                    Ok(None) => {
                        // There was no message to send: only the handler was installed.
                        //
                        // don't care if receiver goes away.
                        let _ = done.send(Ok(()));
                    }
                    Err(e) => {
                        // don't care if receiver goes away.
                        let _ = done.send(Err(e.clone()));
                        return Err(e.into());
                    }
                }
            }
            // TODO(conflux)/TODO(#1857): should this take a leg_id argument?
            // Currently, we always begin streams on the primary leg
            RunOnceCmdInner::BeginStream { cell, done } => {
                match cell {
                    Ok((cell, stream_id)) => {
                        // TODO(conflux): let the RunOnceCmdInner specify which leg to send the cell on
                        // (currently it is an error to use BeginStream on a multipath tunnel)
                        let outcome = self.circuits.single_leg_mut()?.send_relay_cell(cell).await;
                        // On success, the requester is told the ID of the new stream.
                        //
                        // don't care if receiver goes away.
                        let _ = done.send(outcome.clone().map(|_| stream_id));
                        outcome?;
                    }
                    Err(e) => {
                        // don't care if receiver goes away.
                        let _ = done.send(Err(e.clone()));
                        return Err(e.into());
                    }
                }
            }
            RunOnceCmdInner::CloseStream {
                hop_num,
                sid,
                behav,
                reason,
                done,
            } => {
                // TODO(conflux): currently, it is an error to use CloseStream
                // with a multi-path circuit.
                let leg = self.circuits.single_leg_mut()?;
                let res: Result<()> = leg.close_stream(hop_num, sid, behav, reason).await;

                // NOTE: `res` is not propagated to our caller. If there is no
                // `done` channel, a failure to close the stream is dropped here.
                if let Some(done) = done {
                    // don't care if the sender goes away
                    let _ = done.send(res);
                }
            }
            RunOnceCmdInner::HandleSendMe { hop, sendme } => {
                // TODO(conflux): this should specify which leg of the circuit the SENDME
                // came on
                let leg = self.circuits.single_leg_mut()?;
                // NOTE: it's okay to await. We are only awaiting on the congestion_signals
                // future which *should* resolve immediately
                let signals = leg.congestion_signals().await;
                leg.handle_sendme(hop, sendme, signals)?;
            }
            RunOnceCmdInner::FirstHopClockSkew { answer } => {
                // A failure to find the (single) leg is sent to the requester
                // rather than returned from here.
                let res = self
                    .circuits
                    .single_leg_mut()
                    .map(|leg| leg.channel.clock_skew());

                // don't care if the sender goes away
                let _ = answer.send(res);
            }
            RunOnceCmdInner::CleanShutdown => {
                trace!("{}: reactor shutdown due to handled cell", self.unique_id);
                return Err(ReactorError::Shutdown);
            }
        }

        Ok(())
    }
914
915 /// Wait for a [`CtrlMsg::Create`] to come along to set up the circuit.
916 ///
917 /// Returns an error if an unexpected `CtrlMsg` is received.
918 async fn wait_for_create(&mut self) -> StdResult<(), ReactorError> {
919 let msg = select_biased! {
920 res = self.command.next() => {
921 let cmd = unwrap_or_shutdown!(self, res, "shutdown channel drop")?;
922 match cmd {
923 CtrlCmd::Shutdown => return self.handle_shutdown().map(|_| ()),
924 #[cfg(test)]
925 CtrlCmd::AddFakeHop {
926 relay_cell_format: format,
927 fwd_lasthop,
928 rev_lasthop,
929 params,
930 done,
931 } => {
932 self.circuits.single_leg_mut()?.handle_add_fake_hop(format, fwd_lasthop, rev_lasthop, ¶ms, done);
933 return Ok(())
934 },
935 _ => {
936 trace!("reactor shutdown due to unexpected command: {:?}", cmd);
937 return Err(Error::CircProto(format!("Unexpected control {cmd:?} on client circuit")).into());
938 }
939 }
940 },
941 res = self.control.next() => unwrap_or_shutdown!(self, res, "control drop")?,
942 };
943
944 match msg {
945 CtrlMsg::Create {
946 recv_created,
947 handshake,
948 params,
949 done,
950 } => {
951 // TODO(conflux): instead of crashing the reactor, it might be better
952 // to send the error via the done channel instead
953 let leg = self.circuits.single_leg_mut()?;
954 leg.handle_create(recv_created, handshake, ¶ms, done)
955 .await
956 }
957 _ => {
958 trace!("reactor shutdown due to unexpected cell: {:?}", msg);
959
960 Err(Error::CircProto(format!("Unexpected {msg:?} cell on client circuit")).into())
961 }
962 }
963 }
964
965 /// Prepare a `SendRelayCell` request, and install the given meta-cell handler.
966 fn prepare_msg_and_install_handler(
967 &mut self,
968 msg: Option<AnyRelayMsgOuter>,
969 handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
970 ) -> Result<Option<SendRelayCell>> {
971 let msg = msg
972 .map(|msg| {
973 let handlers = &mut self.cell_handlers;
974 let handler = handler
975 .as_ref()
976 .or(handlers.meta_handler.as_ref())
977 .ok_or_else(|| internal!("tried to use an ended Conversation"))?;
978 Ok::<_, crate::Error>(SendRelayCell {
979 hop: handler.expected_hop(),
980 early: false,
981 cell: msg,
982 })
983 })
984 .transpose()?;
985
986 if let Some(handler) = handler {
987 self.cell_handlers.set_meta_handler(handler)?;
988 }
989
990 Ok(msg)
991 }
992
993 /// Handle a shutdown request.
994 fn handle_shutdown(&self) -> StdResult<Option<RunOnceCmdInner>, ReactorError> {
995 trace!(
996 "{}: reactor shutdown due to explicit request",
997 self.unique_id
998 );
999
1000 Err(ReactorError::Shutdown)
1001 }
1002}
1003
1004impl Circuit {
1005 /// Handle a [`CtrlMsg::AddFakeHop`] message.
1006 #[cfg(test)]
1007 fn handle_add_fake_hop(
1008 &mut self,
1009 format: RelayCellFormat,
1010 fwd_lasthop: bool,
1011 rev_lasthop: bool,
1012 params: &CircParameters,
1013 done: ReactorResultChannel<()>,
1014 ) {
1015 use crate::tunnel::circuit::test::DummyCrypto;
1016
1017 let dummy_peer_id = tor_linkspec::OwnedChanTarget::builder()
1018 .ed_identity([4; 32].into())
1019 .rsa_identity([5; 20].into())
1020 .build()
1021 .expect("Could not construct fake hop");
1022
1023 let fwd = Box::new(DummyCrypto::new(fwd_lasthop));
1024 let rev = Box::new(DummyCrypto::new(rev_lasthop));
1025 let binding = None;
1026 self.add_hop(
1027 format,
1028 path::HopDetail::Relay(dummy_peer_id),
1029 fwd,
1030 rev,
1031 binding,
1032 params,
1033 );
1034 let _ = done.send(Ok(()));
1035 }
1036
1037 /// Encode `msg` and encrypt it, returning the resulting cell
1038 /// and tag that should be expected for an authenticated SENDME sent
1039 /// in response to that cell.
1040 fn encode_relay_cell(
1041 crypto_out: &mut OutboundClientCrypt,
1042 hop: HopNum,
1043 early: bool,
1044 msg: AnyRelayMsgOuter,
1045 ) -> Result<(AnyChanMsg, &[u8; SENDME_TAG_LEN])> {
1046 let mut body: RelayCellBody = msg
1047 .encode(&mut rand::thread_rng())
1048 .map_err(|e| Error::from_cell_enc(e, "relay cell body"))?
1049 .into();
1050 let tag = crypto_out.encrypt(&mut body, hop)?;
1051 let msg = Relay::from(BoxedCellBody::from(body));
1052 let msg = if early {
1053 AnyChanMsg::RelayEarly(msg.into())
1054 } else {
1055 AnyChanMsg::Relay(msg)
1056 };
1057
1058 Ok((msg, tag))
1059 }
1060
1061 /// Encode `msg`, encrypt it, and send it to the 'hop'th hop.
1062 ///
1063 /// If there is insufficient outgoing *circuit-level* or *stream-level*
1064 /// SENDME window, an error is returned instead.
1065 ///
1066 /// Does not check whether the cell is well-formed or reasonable.
1067 async fn send_relay_cell(&mut self, msg: SendRelayCell) -> Result<()> {
1068 let SendRelayCell {
1069 hop,
1070 early,
1071 cell: msg,
1072 } = msg;
1073
1074 let c_t_w = sendme::cmd_counts_towards_windows(msg.cmd());
1075 let stream_id = msg.stream_id();
1076 let hop_num = Into::<usize>::into(hop);
1077 let circhop = &mut self.hops[hop_num];
1078
1079 // We need to apply stream-level flow control *before* encoding the message.
1080 if c_t_w {
1081 if let Some(stream_id) = stream_id {
1082 let mut hop_map = circhop.map.lock().expect("lock poisoned");
1083 let Some(StreamEntMut::Open(ent)) = hop_map.get_mut(stream_id) else {
1084 warn!(
1085 "{}: sending a relay cell for non-existent or non-open stream with ID {}!",
1086 self.unique_id, stream_id
1087 );
1088 return Err(Error::CircProto(format!(
1089 "tried to send a relay cell on non-open stream {}",
1090 sv(stream_id),
1091 )));
1092 };
1093 ent.take_capacity_to_send(msg.msg())?;
1094 }
1095 }
1096 // NOTE(eta): Now that we've encrypted the cell, we *must* either send it or abort
1097 // the whole circuit (e.g. by returning an error).
1098 let (msg, tag) = Self::encode_relay_cell(&mut self.crypto_out, hop, early, msg)?;
1099 // The cell counted for congestion control, inform our algorithm of such and pass down the
1100 // tag for authenticated SENDMEs.
1101 if c_t_w {
1102 circhop.ccontrol.note_data_sent(tag)?;
1103 }
1104
1105 let cell = AnyChanCell::new(Some(self.channel_id), msg);
1106 Pin::new(&mut self.chan_sender).send_unbounded(cell).await?;
1107
1108 Ok(())
1109 }
1110
1111 /// Helper: process a cell on a channel. Most cells get ignored
1112 /// or rejected; a few get delivered to circuits.
1113 ///
1114 /// Return `CellStatus::CleanShutdown` if we should exit.
1115 fn handle_cell(
1116 &mut self,
1117 handlers: &mut CellHandlers,
1118 cell: ClientCircChanMsg,
1119 ) -> Result<Option<RunOnceCmd>> {
1120 trace!("{}: handling cell: {:?}", self.unique_id, cell);
1121 use ClientCircChanMsg::*;
1122 match cell {
1123 Relay(r) => self.handle_relay_cell(handlers, r),
1124 Destroy(d) => {
1125 let reason = d.reason();
1126 debug!(
1127 "{}: Received DESTROY cell. Reason: {} [{}]",
1128 self.unique_id,
1129 reason.human_str(),
1130 reason
1131 );
1132
1133 self.handle_destroy_cell()
1134 .map(|c| Some(RunOnceCmd::Single(c)))
1135 }
1136 }
1137 }
1138
1139 /// Decode `cell`, returning its corresponding hop number, tag,
1140 /// and decoded body.
1141 fn decode_relay_cell(
1142 &mut self,
1143 cell: Relay,
1144 ) -> Result<(HopNum, CircTag, RelayCellDecoderResult)> {
1145 let mut body = cell.into_relay_body().into();
1146
1147 // Decrypt the cell. If it's recognized, then find the
1148 // corresponding hop.
1149 let (hopnum, tag) = self.crypto_in.decrypt(&mut body)?;
1150 // Make a copy of the authentication tag. TODO: I'd rather not
1151 // copy it, but I don't see a way around it right now.
1152 let tag = {
1153 let mut tag_copy = [0_u8; SENDME_TAG_LEN];
1154 // TODO(nickm): This could crash if the tag length changes. We'll
1155 // have to refactor it then.
1156 tag_copy.copy_from_slice(tag);
1157 tag_copy
1158 };
1159
1160 // Decode the cell.
1161 let decode_res = self
1162 .hop_mut(hopnum)
1163 .ok_or_else(|| {
1164 Error::from(internal!(
1165 "Trying to decode cell from nonexistent hop {:?}",
1166 hopnum
1167 ))
1168 })?
1169 .inbound
1170 .decode(body.into())
1171 .map_err(|e| Error::from_bytes_err(e, "relay cell"))?;
1172
1173 Ok((hopnum, tag.into(), decode_res))
1174 }
1175
    /// React to a Relay or RelayEarly cell.
    ///
    /// Decodes the cell, updates the congestion control state of the hop it
    /// came from, and handles each relay message that the cell contains.
    ///
    /// On success, always returns a `RunOnceCmd::Multiple` (which may be
    /// empty) holding the commands to run as a result: a circuit-level SENDME
    /// if one is due, followed by whatever the individual messages produced.
    ///
    /// If a message produces `RunOnceCmdInner::CleanShutdown`, the rest of the
    /// messages in the cell are logged and ignored.
    fn handle_relay_cell(
        &mut self,
        handlers: &mut CellHandlers,
        cell: Relay,
    ) -> Result<Option<RunOnceCmd>> {
        let (hopnum, tag, decode_res) = self.decode_relay_cell(cell)?;

        // The cell counts towards the windows if *any* of the messages in it does.
        let c_t_w = decode_res.cmds().any(sendme::cmd_counts_towards_windows);

        // Decrement the circuit sendme windows, and see if we need to
        // send a sendme cell.
        let send_circ_sendme = if c_t_w {
            self.hop_mut(hopnum)
                .ok_or_else(|| Error::CircProto("Sendme from nonexistent hop".into()))?
                .ccontrol
                .note_data_received()?
        } else {
            false
        };

        let mut run_once_cmds = vec![];
        // If we do need to send a circuit-level SENDME cell, do so.
        if send_circ_sendme {
            // This always sends a V1 (tagged) sendme cell, and thereby assumes
            // that SendmeEmitMinVersion is no more than 1. If the authorities
            // ever increase that parameter to a higher number, this will
            // become incorrect. (Higher numbers are not currently defined.)
            let sendme = Sendme::new_tag(tag.into());
            // Circuit-level SENDMEs have no stream ID.
            let cell = AnyRelayMsgOuter::new(None, sendme.into());
            run_once_cmds.push(RunOnceCmdInner::Send {
                cell: SendRelayCell {
                    hop: hopnum,
                    early: false,
                    cell,
                },
                done: None,
            });

            // Inform congestion control of the SENDME we are sending. This is a circuit level one.
            self.hop_mut(hopnum)
                .ok_or_else(|| {
                    Error::from(internal!(
                        "Trying to send SENDME to nonexistent hop {:?}",
                        hopnum
                    ))
                })?
                .ccontrol
                .note_sendme_sent()?;
        }

        let (mut msgs, incomplete) = decode_res.into_parts();
        // NOTE: this has to be a `while let` rather than a `for` loop, because
        // the shutdown case below consumes the rest of `msgs` from inside the loop.
        while let Some(msg) = msgs.next() {
            let msg_status = self.handle_relay_msg(handlers, hopnum, c_t_w, msg)?;

            match msg_status {
                None => continue,
                Some(msg @ RunOnceCmdInner::CleanShutdown) => {
                    // We are shutting down: don't handle anything else from this cell.
                    for m in msgs {
                        debug!(
                            "{id}: Ignoring relay msg received after triggering shutdown: {m:?}",
                            id = self.unique_id
                        );
                    }
                    if let Some(incomplete) = incomplete {
                        debug!(
                            "{id}: Ignoring partial relay msg received after triggering shutdown: {:?}",
                            incomplete,
                            id=self.unique_id,
                        );
                    }
                    run_once_cmds.push(msg);
                    return Ok(Some(RunOnceCmd::Multiple(run_once_cmds)));
                }
                Some(msg) => {
                    run_once_cmds.push(msg);
                }
            }
        }

        Ok(Some(RunOnceCmd::Multiple(run_once_cmds)))
    }
1258
    /// Handle a single incoming relay message.
    ///
    /// Messages without a stream ID are passed to `handle_meta_cell()`.
    /// Messages with a stream ID are dispatched according to the state of that
    /// stream in the stream map of hop `hopnum`:
    ///
    ///  * open stream: the message is delivered to the stream;
    ///  * stream on which we have sent an END: the message is checked by the
    ///    half-stream;
    ///  * (with `hs-service`) BEGIN, BEGIN_DIR, or RESOLVE on an unknown stream,
    ///    or on a stream on which we have sent an END: the message is treated
    ///    as an incoming stream request;
    ///  * anything else: an error is returned.
    ///
    /// `cell_counts_toward_windows` says whether the cell containing `msg`
    /// counts towards the SENDME windows.
    ///
    /// Returns the command (if any) that needs to be run as a result.
    fn handle_relay_msg(
        &mut self,
        handlers: &mut CellHandlers,
        hopnum: HopNum,
        cell_counts_toward_windows: bool,
        msg: UnparsedRelayMsg,
    ) -> Result<Option<RunOnceCmdInner>> {
        // If this msg wants/refuses to have a Stream ID, does it
        // have/not have one?
        let streamid = msg_streamid(&msg)?;

        // If this doesn't have a StreamId, it's a meta cell,
        // not meant for a particular stream.
        let Some(streamid) = streamid else {
            return self.handle_meta_cell(handlers, hopnum, msg);
        };

        let hop = self
            .hop_mut(hopnum)
            .ok_or_else(|| Error::CircProto("Cell from nonexistent hop!".into()))?;
        let mut hop_map = hop.map.lock().expect("lock poisoned");
        match hop_map.get_mut(streamid) {
            Some(StreamEntMut::Open(ent)) => {
                let message_closes_stream =
                    Self::deliver_msg_to_stream(streamid, ent, cell_counts_toward_windows, msg)?;

                if message_closes_stream {
                    hop_map.ending_msg_received(streamid)?;
                }
            }
            #[cfg(feature = "hs-service")]
            Some(StreamEntMut::EndSent(_))
                if matches!(
                    msg.cmd(),
                    RelayCmd::BEGIN | RelayCmd::BEGIN_DIR | RelayCmd::RESOLVE
                ) =>
            {
                // If the other side is sending us a BEGIN but hasn't yet acknowledged our END
                // message, just remove the old stream from the map and stop waiting for a
                // response
                hop_map.ending_msg_received(streamid)?;
                // The stream map lock must be released before handling the request.
                drop(hop_map);
                return self.handle_incoming_stream_request(handlers, msg, streamid, hopnum);
            }
            Some(StreamEntMut::EndSent(EndSentStreamEnt { half_stream, .. })) => {
                // We sent an end but maybe the other side hasn't heard.

                match half_stream.handle_msg(msg)? {
                    StreamStatus::Open => {}
                    StreamStatus::Closed => {
                        hop_map.ending_msg_received(streamid)?;
                    }
                }
            }
            #[cfg(feature = "hs-service")]
            None if matches!(
                msg.cmd(),
                RelayCmd::BEGIN | RelayCmd::BEGIN_DIR | RelayCmd::RESOLVE
            ) =>
            {
                // The stream map lock must be released before handling the request.
                drop(hop_map);
                return self.handle_incoming_stream_request(handlers, msg, streamid, hopnum);
            }
            _ => {
                // No stream wants this message, or ever did.
                return Err(Error::CircProto(
                    "Cell received on nonexistent stream!?".into(),
                ));
            }
        }
        Ok(None)
    }
1332
1333 /// Deliver `msg` to the specified open stream entry `ent`.
1334 fn deliver_msg_to_stream(
1335 streamid: StreamId,
1336 ent: &mut OpenStreamEnt,
1337 cell_counts_toward_windows: bool,
1338 msg: UnparsedRelayMsg,
1339 ) -> Result<bool> {
1340 // The stream for this message exists, and is open.
1341
1342 if msg.cmd() == RelayCmd::SENDME {
1343 let _sendme = msg
1344 .decode::<Sendme>()
1345 .map_err(|e| Error::from_bytes_err(e, "Sendme message on stream"))?
1346 .into_msg();
1347 // We need to handle sendmes here, not in the stream's
1348 // recv() method, or else we'd never notice them if the
1349 // stream isn't reading.
1350 ent.put_for_incoming_sendme()?;
1351 return Ok(false);
1352 }
1353
1354 let message_closes_stream = ent.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;
1355
1356 if let Err(e) = Pin::new(&mut ent.sink).try_send(msg) {
1357 if e.is_full() {
1358 // If we get here, we either have a logic bug (!), or an attacker
1359 // is sending us more cells than we asked for via congestion control.
1360 return Err(Error::CircProto(format!(
1361 "Stream sink would block; received too many cells on stream ID {}",
1362 sv(streamid),
1363 )));
1364 }
1365 if e.is_disconnected() && cell_counts_toward_windows {
1366 // the other side of the stream has gone away; remember
1367 // that we received a cell that we couldn't queue for it.
1368 //
1369 // Later this value will be recorded in a half-stream.
1370 ent.dropped += 1;
1371 }
1372 }
1373
1374 Ok(message_closes_stream)
1375 }
1376
    /// A helper for handling incoming stream requests.
    ///
    /// Checks that this circuit accepts incoming stream requests, and that the
    /// request came from the expected hop; asks the handler's filter what to
    /// do with the request; and, if the request is accepted, adds an entry for
    /// `stream_id` to the stream map of `hop_num` and passes the request on to
    /// the incoming stream request handler.
    ///
    /// Returns the command (if any) to run as a result: a `Send` with an END
    /// message if the request is rejected by the filter or if the handler's
    /// queue is full, or a `CleanShutdown` if the filter asks for the circuit
    /// to be closed.
    ///
    /// Must not be called while any of the stream map mutexes are held
    /// (see the note about `ClientCircSyncView` below).
    #[cfg(feature = "hs-service")]
    fn handle_incoming_stream_request(
        &mut self,
        handlers: &mut CellHandlers,
        msg: UnparsedRelayMsg,
        stream_id: StreamId,
        hop_num: HopNum,
    ) -> Result<Option<RunOnceCmdInner>> {
        use syncview::ClientCircSyncView;
        use tor_cell::relaycell::msg::EndReason;
        use tor_error::into_internal;
        use tor_log_ratelim::log_ratelim;

        // We need to construct this early so that we don't double-borrow &mut self
        //
        // NOTE(review): nothing is constructed at this point any more;
        // this comment looks stale.

        let Some(handler) = handlers.incoming_stream_req_handler.as_mut() else {
            return Err(Error::CircProto(
                "Cannot handle BEGIN cells on this circuit".into(),
            ));
        };

        if hop_num != handler.hop_num {
            return Err(Error::CircProto(format!(
                "Expecting incoming streams from {}, but received {} cell from unexpected hop {}",
                handler.hop_num.display(),
                msg.cmd(),
                hop_num.display()
            )));
        }

        let message_closes_stream = handler.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;

        // TODO: we've already looked up the `hop` in handle_relay_cell, so we shouldn't
        // have to look it up again! However, we can't pass the `&mut hop` reference from
        // `handle_relay_cell` to this function, because that makes Rust angry (we'd be
        // borrowing self as mutable more than once).
        //
        // TODO: we _could_ use self.hops.get_mut(..) instead self.hop_mut(..) inside
        // handle_relay_cell to work around the problem described above
        let hop = self
            .hops
            .get_mut(Into::<usize>::into(hop_num))
            .ok_or(Error::CircuitClosed)?;

        if message_closes_stream {
            hop.map
                .lock()
                .expect("lock poisoned")
                .ending_msg_received(stream_id)?;

            return Ok(None);
        }

        let begin = msg
            .decode::<Begin>()
            .map_err(|e| Error::from_bytes_err(e, "Invalid Begin message"))?
            .into_msg();

        let req = IncomingStreamRequest::Begin(begin);

        {
            use crate::stream::IncomingStreamRequestDisposition::*;

            let ctx = crate::stream::IncomingStreamRequestContext { request: &req };
            // IMPORTANT: ClientCircSyncView::n_open_streams() (called via disposition() below)
            // accesses the stream map mutexes!
            //
            // This means it's very important not to call this function while any of the hop's
            // stream map mutex is held.
            let view = ClientCircSyncView::new(&self.hops);

            match handler.filter.as_mut().disposition(&ctx, &view)? {
                Accept => {}
                CloseCircuit => return Ok(Some(RunOnceCmdInner::CleanShutdown)),
                RejectRequest(end) => {
                    // Reply with the END message that the filter gave us.
                    let end_msg = AnyRelayMsgOuter::new(Some(stream_id), end.into());
                    let cell = SendRelayCell {
                        hop: hop_num,
                        early: false,
                        cell: end_msg,
                    };
                    return Ok(Some(RunOnceCmdInner::Send { cell, done: None }));
                }
            }
        }

        // TODO: Sadly, we need to look up `&mut hop` yet again,
        // since we needed to pass `&self.hops` by reference to our filter above. :(
        let hop = self
            .hops
            .get_mut(Into::<usize>::into(hop_num))
            .ok_or(Error::CircuitClosed)?;

        let memquota = StreamAccount::new(&self.memquota)?;

        // Two queues, both accounted against the new stream's memory quota
        // account. One end of each goes into the stream map entry; the other
        // ends are handed over in the `StreamReqInfo` below.
        let (sender, receiver) = MpscSpec::new(STREAM_READER_BUFFER).new_mq(
            self.chan_sender.as_inner().time_provider().clone(),
            memquota.as_raw_account(),
        )?;
        let (msg_tx, msg_rx) = MpscSpec::new(super::CIRCUIT_BUFFER_SIZE).new_mq(
            self.chan_sender.as_inner().time_provider().clone(),
            memquota.as_raw_account(),
        )?;

        let send_window = sendme::StreamSendWindow::new(SEND_WINDOW_INIT);
        let cmd_checker = DataCmdChecker::new_connected();
        hop.map.lock().expect("lock poisoned").add_ent_with_id(
            sender,
            msg_rx,
            send_window,
            stream_id,
            cmd_checker,
        )?;

        // NOTE: the stream map entry has already been added by the time we
        // try to queue the request, so it exists even if queueing fails below.
        let outcome = Pin::new(&mut handler.incoming_sender).try_send(StreamReqInfo {
            req,
            stream_id,
            hop_num,
            msg_tx,
            receiver,
            memquota,
        });

        log_ratelim!("Delivering message to incoming stream handler"; outcome);

        if let Err(e) = outcome {
            if e.is_full() {
                // The IncomingStreamRequestHandler's stream is full; it isn't
                // handling requests fast enough. So instead, we reply with an
                // END cell.
                let end_msg = AnyRelayMsgOuter::new(
                    Some(stream_id),
                    End::new_with_reason(EndReason::RESOURCELIMIT).into(),
                );

                let cell = SendRelayCell {
                    hop: hop_num,
                    early: false,
                    cell: end_msg,
                };
                return Ok(Some(RunOnceCmdInner::Send { cell, done: None }));
            } else if e.is_disconnected() {
                // The IncomingStreamRequestHandler's stream has been dropped.
                // In the Tor protocol as it stands, this always means that the
                // circuit itself is out-of-use and should be closed. (See notes
                // on `allow_stream_requests.`)
                //
                // Note that we will _not_ reach this point immediately after
                // the IncomingStreamRequestHandler is dropped; we won't hit it
                // until we next get an incoming request. Thus, if we do later
                // want to add early detection for a dropped
                // IncomingStreamRequestHandler, we need to do it elsewhere, in
                // a different way.
                debug!(
                    "{}: Incoming stream request receiver dropped",
                    self.unique_id
                );
                // This will _cause_ the circuit to get closed.
                return Err(Error::CircuitClosed);
            } else {
                // There are no errors like this with the current design of
                // futures::mpsc, but we shouldn't just ignore the possibility
                // that they'll be added later.
                return Err(Error::from((into_internal!(
                    "try_send failed unexpectedly"
                ))(e)));
            }
        }

        Ok(None)
    }
1549
1550 /// Helper: process a destroy cell.
1551 #[allow(clippy::unnecessary_wraps)]
1552 fn handle_destroy_cell(&mut self) -> Result<RunOnceCmdInner> {
1553 // I think there is nothing more to do here.
1554 Ok(RunOnceCmdInner::CleanShutdown)
1555 }
1556
1557 /// Handle a [`CtrlMsg::Create`] message.
1558 async fn handle_create(
1559 &mut self,
1560 recv_created: oneshot::Receiver<CreateResponse>,
1561 handshake: CircuitHandshake,
1562 params: &CircParameters,
1563 done: ReactorResultChannel<()>,
1564 ) -> StdResult<(), ReactorError> {
1565 let ret = match handshake {
1566 CircuitHandshake::CreateFast => self.create_firsthop_fast(recv_created, params).await,
1567 CircuitHandshake::Ntor {
1568 public_key,
1569 ed_identity,
1570 } => {
1571 self.create_firsthop_ntor(recv_created, ed_identity, public_key, params)
1572 .await
1573 }
1574 #[cfg(feature = "ntor_v3")]
1575 CircuitHandshake::NtorV3 { public_key } => {
1576 self.create_firsthop_ntor_v3(recv_created, public_key, params)
1577 .await
1578 }
1579 };
1580 let _ = done.send(ret); // don't care if sender goes away
1581
1582 // TODO: maybe we don't need to flush here?
1583 // (we could let run_once() handle all the flushing)
1584 self.chan_sender.flush().await?;
1585
1586 Ok(())
1587 }
1588
1589 /// Helper: create the first hop of a circuit.
1590 ///
1591 /// This is parameterized not just on the RNG, but a wrapper object to
1592 /// build the right kind of create cell, and a handshake object to perform
1593 /// the cryptographic handshake.
1594 async fn create_impl<H, W, M>(
1595 &mut self,
1596 cell_protocol: RelayCryptLayerProtocol,
1597 recvcreated: oneshot::Receiver<CreateResponse>,
1598 wrap: &W,
1599 key: &H::KeyType,
1600 params: &CircParameters,
1601 msg: &M,
1602 ) -> Result<()>
1603 where
1604 H: ClientHandshake + HandshakeAuxDataHandler,
1605 W: CreateHandshakeWrap,
1606 H::KeyGen: KeyGenerator,
1607 M: Borrow<H::ClientAuxData>,
1608 {
1609 // We don't need to shut down the circuit on failure here, since this
1610 // function consumes the PendingClientCirc and only returns
1611 // a ClientCirc on success.
1612
1613 let (state, msg) = {
1614 // done like this because holding the RNG across an await boundary makes the future
1615 // non-Send
1616 let mut rng = rand::thread_rng();
1617 H::client1(&mut rng, key, msg)?
1618 };
1619 let create_cell = wrap.to_chanmsg(msg);
1620 trace!(
1621 "{}: Extending to hop 1 with {}",
1622 self.unique_id,
1623 create_cell.cmd()
1624 );
1625 self.send_msg(create_cell).await?;
1626
1627 let reply = recvcreated
1628 .await
1629 .map_err(|_| Error::CircProto("Circuit closed while waiting".into()))?;
1630
1631 let relay_handshake = wrap.decode_chanmsg(reply)?;
1632 let (server_msg, keygen) = H::client2(state, relay_handshake)?;
1633
1634 H::handle_server_aux_data(params, &server_msg)?;
1635
1636 let relay_cell_format = cell_protocol.relay_cell_format();
1637 let BoxedClientLayer { fwd, back, binding } =
1638 cell_protocol.construct_layers(HandshakeRole::Initiator, keygen)?;
1639
1640 trace!("{}: Handshake complete; circuit created.", self.unique_id);
1641
1642 let peer_id = self.channel.target().clone();
1643
1644 self.add_hop(
1645 relay_cell_format,
1646 path::HopDetail::Relay(peer_id),
1647 fwd,
1648 back,
1649 binding,
1650 params,
1651 );
1652 Ok(())
1653 }
1654
1655 /// Use the (questionable!) CREATE_FAST handshake to connect to the
1656 /// first hop of this circuit.
1657 ///
1658 /// There's no authentication in CREATE_FAST,
1659 /// so we don't need to know whom we're connecting to: we're just
1660 /// connecting to whichever relay the channel is for.
1661 async fn create_firsthop_fast(
1662 &mut self,
1663 recvcreated: oneshot::Receiver<CreateResponse>,
1664 params: &CircParameters,
1665 ) -> Result<()> {
1666 // In a CREATE_FAST handshake, we can't negotiate a format other than this.
1667 let protocol = RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0);
1668 let wrap = CreateFastWrap;
1669 self.create_impl::<CreateFastClient, _, _>(protocol, recvcreated, &wrap, &(), params, &())
1670 .await
1671 }
1672
1673 /// Use the ntor handshake to connect to the first hop of this circuit.
1674 ///
1675 /// Note that the provided keys must match the channel's target,
1676 /// or the handshake will fail.
1677 async fn create_firsthop_ntor(
1678 &mut self,
1679 recvcreated: oneshot::Receiver<CreateResponse>,
1680 ed_identity: pk::ed25519::Ed25519Identity,
1681 pubkey: NtorPublicKey,
1682 params: &CircParameters,
1683 ) -> Result<()> {
1684 // In an ntor handshake, we can't negotiate a format other than this.
1685 let relay_cell_protocol = RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0);
1686
1687 // Exit now if we have an Ed25519 or RSA identity mismatch.
1688 let target = RelayIds::builder()
1689 .ed_identity(ed_identity)
1690 .rsa_identity(pubkey.id)
1691 .build()
1692 .expect("Unable to build RelayIds");
1693 self.channel.check_match(&target)?;
1694
1695 let wrap = Create2Wrap {
1696 handshake_type: HandshakeType::NTOR,
1697 };
1698 self.create_impl::<NtorClient, _, _>(
1699 relay_cell_protocol,
1700 recvcreated,
1701 &wrap,
1702 &pubkey,
1703 params,
1704 &(),
1705 )
1706 .await
1707 }
1708
1709 /// Use the ntor-v3 handshake to connect to the first hop of this circuit.
1710 ///
1711 /// Note that the provided key must match the channel's target,
1712 /// or the handshake will fail.
1713 #[cfg(feature = "ntor_v3")]
1714 async fn create_firsthop_ntor_v3(
1715 &mut self,
1716 recvcreated: oneshot::Receiver<CreateResponse>,
1717 pubkey: NtorV3PublicKey,
1718 params: &CircParameters,
1719 ) -> Result<()> {
1720 // Exit now if we have a mismatched key.
1721 let target = RelayIds::builder()
1722 .ed_identity(pubkey.id)
1723 .build()
1724 .expect("Unable to build RelayIds");
1725 self.channel.check_match(&target)?;
1726
1727 // TODO: Add support for negotiating other formats.
1728 let relay_cell_protocol = RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0);
1729
1730 // TODO: Set client extensions. e.g. request congestion control
1731 // if specified in `params`.
1732 let client_extensions = [];
1733
1734 let wrap = Create2Wrap {
1735 handshake_type: HandshakeType::NTOR_V3,
1736 };
1737
1738 self.create_impl::<NtorV3Client, _, _>(
1739 relay_cell_protocol,
1740 recvcreated,
1741 &wrap,
1742 &pubkey,
1743 params,
1744 &client_extensions,
1745 )
1746 .await
1747 }
1748
1749 /// Add a hop to the end of this circuit.
1750 fn add_hop(
1751 &mut self,
1752 format: RelayCellFormat,
1753 peer_id: path::HopDetail,
1754 fwd: Box<dyn OutboundClientLayer + 'static + Send>,
1755 rev: Box<dyn InboundClientLayer + 'static + Send>,
1756 binding: Option<CircuitBinding>,
1757 params: &CircParameters,
1758 ) {
1759 let hop_num = (self.hops.len() as u8).into();
1760 let hop = CircHop::new(self.unique_id, hop_num, format, params);
1761 self.hops.push(hop);
1762 self.crypto_in.add_layer(rev);
1763 self.crypto_out.add_layer(fwd);
1764 let mut mutable = self.mutable.lock().expect("poisoned lock");
1765 Arc::make_mut(&mut mutable.path).push_hop(peer_id);
1766 mutable.binding.push(binding);
1767 }
1768
    /// Handle a RELAY cell on this circuit with stream ID 0.
    ///
    /// SENDME and TRUNCATED messages are handled here: a SENDME is turned into
    /// a `RunOnceCmdInner::HandleSendMe`, and a TRUNCATED results in a
    /// `RunOnceCmdInner::CleanShutdown`.
    ///
    /// Any other message is given to the meta-cell handler installed in
    /// `handlers`, provided that the handler expects messages from `hopnum`.
    ///
    /// Returns an error if there is no handler, or if the handler expects
    /// messages from a different hop.
    fn handle_meta_cell(
        &mut self,
        handlers: &mut CellHandlers,
        hopnum: HopNum,
        msg: UnparsedRelayMsg,
    ) -> Result<Option<RunOnceCmdInner>> {
        // SENDME cells and TRUNCATED get handled internally by the circuit.

        // TODO: This pattern (Check command, try to decode, map error) occurs
        // several times, and would be good to extract simplify. Such
        // simplification is obstructed by a couple of factors: First, that
        // there is not currently a good way to get the RelayCmd from _type_ of
        // a RelayMsg.  Second, that decode() [correctly] consumes the
        // UnparsedRelayMsg.  I tried a macro-based approach, and didn't care
        // for it. -nickm
        if msg.cmd() == RelayCmd::SENDME {
            let sendme = msg
                .decode::<Sendme>()
                .map_err(|e| Error::from_bytes_err(e, "sendme message"))?
                .into_msg();

            return Ok(Some(RunOnceCmdInner::HandleSendMe {
                hop: hopnum,
                sendme,
            }));
        }
        if msg.cmd() == RelayCmd::TRUNCATED {
            let truncated = msg
                .decode::<Truncated>()
                .map_err(|e| Error::from_bytes_err(e, "truncated message"))?
                .into_msg();
            let reason = truncated.reason();
            debug!(
                "{}: Truncated from hop {}. Reason: {} [{}]",
                self.unique_id,
                hopnum.display(),
                reason.human_str(),
                reason
            );

            return Ok(Some(RunOnceCmdInner::CleanShutdown));
        }

        trace!("{}: Received meta-cell {:?}", self.unique_id, msg);

        // For all other command types, we'll only get them in response
        // to another command, which should have registered a responder.
        //
        // TODO: that means that service-introduction circuits will need
        // a different implementation, but that should be okay. We'll work
        // something out.
        //
        // The handler is taken out of `handlers` here, and has to be put back
        // explicitly in every case where it should stay installed.
        if let Some(mut handler) = handlers.meta_handler.take() {
            if handler.expected_hop() == hopnum {
                // Somebody was waiting for a message -- maybe this message
                let ret = handler.handle_msg(msg, self);
                trace!(
                    "{}: meta handler completed with result: {:?}",
                    self.unique_id,
                    ret
                );
                match ret {
                    #[cfg(feature = "send-control-msg")]
                    Ok(MetaCellDisposition::Consumed) => {
                        // The conversation continues: keep the handler installed.
                        handlers.meta_handler = Some(handler);
                        Ok(None)
                    }
                    // In every other case, the handler is not put back.
                    Ok(MetaCellDisposition::ConversationFinished) => Ok(None),
                    #[cfg(feature = "send-control-msg")]
                    Ok(MetaCellDisposition::CloseCirc) => Ok(Some(RunOnceCmdInner::CleanShutdown)),
                    Err(e) => Err(e),
                }
            } else {
                // Somebody wanted a message from a different hop!  Put this
                // one back.
                handlers.meta_handler = Some(handler);
                Err(Error::CircProto(format!(
                    "Unexpected {} cell from hop {} on client circuit",
                    msg.cmd(),
                    hopnum.display(),
                )))
            }
        } else {
            // No need to call shutdown here, since this error will
            // propagate to the reactor shut it down.
            Err(Error::CircProto(format!(
                "Unexpected {} cell on client circuit",
                msg.cmd()
            )))
        }
    }
1860
1861 /// Handle a RELAY_SENDME cell on this circuit with stream ID 0.
1862 fn handle_sendme(
1863 &mut self,
1864 hopnum: HopNum,
1865 msg: Sendme,
1866 signals: CongestionSignals,
1867 ) -> Result<Option<RunOnceCmdInner>> {
1868 // No need to call "shutdown" on errors in this function;
1869 // it's called from the reactor task and errors will propagate there.
1870 let hop = self
1871 .hop_mut(hopnum)
1872 .ok_or_else(|| Error::CircProto(format!("Couldn't find hop {}", hopnum.display())))?;
1873
1874 let tag = match msg.into_tag() {
1875 Some(v) => CircTag::try_from(v.as_slice())
1876 .map_err(|_| Error::CircProto("malformed tag on circuit sendme".into()))?,
1877 None => {
1878 // Versions of Tor <=0.3.5 would omit a SENDME tag in this case;
1879 // but we don't support those any longer.
1880 return Err(Error::CircProto("missing tag on circuit sendme".into()));
1881 }
1882 };
1883 // Update the CC object that we received a SENDME along with possible congestion signals.
1884 hop.ccontrol.note_sendme_received(tag, signals)?;
1885 Ok(None)
1886 }
1887
1888 /// Send a message onto the circuit's channel.
1889 ///
1890 /// If the channel is ready to accept messages, it will be sent immediately. If not, the message
1891 /// will be enqueued for sending at a later iteration of the reactor loop.
1892 ///
1893 /// # Note
1894 ///
1895 /// Making use of the enqueuing capabilities of this function is discouraged! You should first
1896 /// check whether the channel is ready to receive messages (`self.channel.poll_ready`), and
1897 /// ideally use this to implement backpressure (such that you do not read from other sources
1898 /// that would send here while you know you're unable to forward the messages on).
1899 async fn send_msg(&mut self, msg: AnyChanMsg) -> Result<()> {
1900 let cell = AnyChanCell::new(Some(self.channel_id), msg);
1901 // Note: this future is always `Ready`, so await won't block.
1902 Pin::new(&mut self.chan_sender).send_unbounded(cell).await?;
1903 Ok(())
1904 }
1905
1906 /// Returns a [`Stream`] of [`RunOnceCmdInner`] to poll from the main loop.
1907 ///
1908 /// The iterator contains at most one [`RunOnceCmdInner`] for each hop,
1909 /// representing the instructions for handling the ready-item, if any,
1910 /// of its highest priority stream.
1911 ///
1912 /// IMPORTANT: this stream locks the stream map mutexes of each `CircHop`!
1913 /// To avoid contention, never create more than one [`Circuit::ready_streams_iterator`]
1914 /// stream at a time!
1915 fn ready_streams_iterator(&self) -> impl Stream<Item = Result<RunOnceCmdInner>> {
1916 self.hops
1917 .iter()
1918 .enumerate()
1919 .filter_map(|(i, hop)| {
1920 if !hop.ccontrol.can_send() {
1921 // We can't send anything on this hop that counts towards SENDME windows.
1922 //
1923 // In theory we could send messages that don't count towards
1924 // windows (like `RESOLVE`), and process end-of-stream
1925 // events (to send an `END`), but it's probably not worth
1926 // doing an O(N) iteration over flow-control-ready streams
1927 // to see if that's the case.
1928 //
1929 // This *doesn't* block outgoing flow-control messages (e.g.
1930 // SENDME), which are initiated via the control-message
1931 // channel, handled above.
1932 //
1933 // TODO: Consider revisiting. OTOH some extra throttling when circuit-level
1934 // congestion control has "bottomed out" might not be so bad, and the
1935 // alternatives have complexity and/or performance costs.
1936 return None;
1937 }
1938
1939 let hop_num = HopNum::from(i as u8);
1940 let hop_map = Arc::clone(&self.hops[i].map);
1941 Some(async move {
1942 futures::future::poll_fn(move |cx| {
1943 // Process an outbound message from the first ready stream on
1944 // this hop. The stream map implements round robin scheduling to
1945 // ensure fairness across streams.
1946 // TODO: Consider looping here to process multiple ready
1947 // streams. Need to be careful though to balance that with
1948 // continuing to service incoming and control messages.
1949 let mut hop_map = hop_map.lock().expect("lock poisoned");
1950 let Some((sid, msg)) = hop_map.poll_ready_streams_iter(cx).next() else {
1951 // No ready streams for this hop.
1952 return Poll::Pending;
1953 };
1954
1955 if msg.is_none() {
1956 return Poll::Ready(Ok(RunOnceCmdInner::CloseStream {
1957 hop_num,
1958 sid,
1959 behav: CloseStreamBehavior::default(),
1960 reason: streammap::TerminateReason::StreamTargetClosed,
1961 done: None,
1962 }));
1963 };
1964 let msg = hop_map.take_ready_msg(sid).expect("msg disappeared");
1965
1966 #[allow(unused)] // unused in non-debug builds
1967 let Some(StreamEntMut::Open(s)) = hop_map.get_mut(sid) else {
1968 panic!("Stream {sid} disappeared");
1969 };
1970
1971 debug_assert!(
1972 s.can_send(&msg),
1973 "Stream {sid} produced a message it can't send: {msg:?}"
1974 );
1975
1976 let cell = SendRelayCell {
1977 hop: hop_num,
1978 early: false,
1979 cell: AnyRelayMsgOuter::new(Some(sid), msg),
1980 };
1981 Poll::Ready(Ok(RunOnceCmdInner::Send { cell, done: None }))
1982 })
1983 .await
1984 })
1985 })
1986 .collect::<FuturesUnordered<_>>()
1987 }
1988
1989 /// Return the congestion signals for this reactor. This is used by congestion control module.
1990 ///
1991 /// Note: This is only async because we need a Context to check the sink for readiness.
1992 async fn congestion_signals(&mut self) -> CongestionSignals {
1993 futures::future::poll_fn(|cx| -> Poll<CongestionSignals> {
1994 Poll::Ready(CongestionSignals::new(
1995 self.chan_sender.poll_ready_unpin_bool(cx).unwrap_or(false),
1996 self.chan_sender.n_queued(),
1997 ))
1998 })
1999 .await
2000 }
2001
2002 /// Return the hop corresponding to `hopnum`, if there is one.
2003 fn hop_mut(&mut self, hopnum: HopNum) -> Option<&mut CircHop> {
2004 self.hops.get_mut(Into::<usize>::into(hopnum))
2005 }
2006
2007 /// Begin a stream with the provided hop in this circuit.
2008 fn begin_stream(
2009 &mut self,
2010 hop_num: HopNum,
2011 message: AnyRelayMsg,
2012 sender: StreamMpscSender<UnparsedRelayMsg>,
2013 rx: StreamMpscReceiver<AnyRelayMsg>,
2014 cmd_checker: AnyCmdChecker,
2015 ) -> StdResult<Result<(SendRelayCell, StreamId)>, Bug> {
2016 let Some(hop) = self.hop_mut(hop_num) else {
2017 return Err(internal!(
2018 "{}: Attempting to send a BEGIN cell to an unknown hop {hop_num:?}",
2019 self.unique_id,
2020 ));
2021 };
2022
2023 Ok(hop.begin_stream(message, sender, rx, cmd_checker))
2024 }
2025
2026 /// Close the specified stream
2027 async fn close_stream(
2028 &mut self,
2029 hop_num: HopNum,
2030 sid: StreamId,
2031 behav: CloseStreamBehavior,
2032 reason: streammap::TerminateReason,
2033 ) -> Result<()> {
2034 if let Some(hop) = self.hop_mut(hop_num) {
2035 let res = hop.close_stream(sid, behav, reason)?;
2036 if let Some(cell) = res {
2037 self.send_relay_cell(cell).await?;
2038 }
2039 }
2040 Ok(())
2041 }
2042}
2043
2044impl CellHandlers {
2045 /// Try to install a given meta-cell handler to receive any unusual cells on
2046 /// this circuit, along with a result channel to notify on completion.
2047 fn set_meta_handler(&mut self, handler: Box<dyn MetaCellHandler + Send>) -> Result<()> {
2048 if self.meta_handler.is_none() {
2049 self.meta_handler = Some(handler);
2050 Ok(())
2051 } else {
2052 Err(Error::from(internal!(
2053 "Tried to install a meta-cell handler before the old one was gone."
2054 )))
2055 }
2056 }
2057
2058 /// Try to install a given cell handler on this circuit.
2059 #[cfg(feature = "hs-service")]
2060 fn set_incoming_stream_req_handler(
2061 &mut self,
2062 handler: IncomingStreamRequestHandler,
2063 ) -> Result<()> {
2064 if self.incoming_stream_req_handler.is_none() {
2065 self.incoming_stream_req_handler = Some(handler);
2066 Ok(())
2067 } else {
2068 Err(Error::from(internal!(
2069 "Tried to install a BEGIN cell handler before the old one was gone."
2070 )))
2071 }
2072 }
2073}
2074
2075/// Return the stream ID of `msg`, if it has one.
2076///
2077/// Returns `Ok(None)` if `msg` is a meta cell.
2078fn msg_streamid(msg: &UnparsedRelayMsg) -> Result<Option<StreamId>> {
2079 let cmd = msg.cmd();
2080 let streamid = msg.stream_id();
2081 if !cmd.accepts_streamid_val(streamid) {
2082 return Err(Error::CircProto(format!(
2083 "Invalid stream ID {} for relay command {}",
2084 sv(StreamId::get_or_zero(streamid)),
2085 msg.cmd()
2086 )));
2087 }
2088
2089 Ok(streamid)
2090}
2091
2092impl Drop for Circuit {
2093 fn drop(&mut self) {
2094 let _ = self.channel.close_circuit(self.channel_id);
2095 }
2096}
2097
#[cfg(test)]
mod test {
    // Intentionally empty: this code is tested in
    // [`crate::tunnel::circuit::test`].
}