tor_proto/client/reactor.rs
1//! Code to handle incoming cells on a circuit.
2//!
3//! ## On message validation
4//!
5//! There are three steps for validating an incoming message on a stream:
6//!
7//! 1. Is the message contextually appropriate? (e.g., no more than one
8//! `CONNECTED` message per stream.) This is handled by calling
9//! [`CmdChecker::check_msg`](crate::stream::cmdcheck::CmdChecker::check_msg).
10//! 2. Does the message comply with flow-control rules? (e.g., no more SENDMEs
11//! than we've sent data for.) This is handled within the reactor by the
12//! `StreamFlowCtrl`. For half-closed streams which don't send stream
13//! SENDMEs, an additional receive-window check is performed in the
14//! `halfstream` module.
15//! 3. Does the message have an acceptable command type, and is the message
16//! well-formed? For open streams, the streams themselves handle this check.
17//! For half-closed streams, the reactor handles it by calling
18//! `consume_checked_msg()`.
19
20pub(crate) mod circuit;
21mod conflux;
22mod control;
23pub(super) mod syncview;
24
25use crate::circuit::UniqId;
26use crate::circuit::celltypes::ClientCircChanMsg;
27use crate::circuit::circhop::SendRelayCell;
28use crate::client::circuit::padding::{PaddingController, PaddingEvent, PaddingEventStream};
29use crate::client::circuit::{CircuitRxReceiver, TimeoutEstimator};
30#[cfg(feature = "hs-service")]
31use crate::client::stream::{IncomingStreamRequest, IncomingStreamRequestFilter};
32use crate::client::{HopLocation, TargetHop};
33use crate::crypto::cell::HopNum;
34use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
35use crate::memquota::{CircuitAccount, StreamAccount};
36use crate::stream::cmdcheck::AnyCmdChecker;
37use crate::stream::flow_ctrl::state::StreamRateLimit;
38use crate::stream::flow_ctrl::xon_xoff::reader::DrainRateRequest;
39use crate::stream::queue::StreamQueueReceiver;
40use crate::stream::{CloseStreamBehavior, StreamMpscSender};
41use crate::streammap;
42use crate::tunnel::{TunnelId, TunnelScopedCircId};
43use crate::util::err::ReactorError;
44use crate::util::notify::NotifyReceiver;
45use crate::util::skew::ClockSkew;
46use crate::{Error, Result};
47use circuit::Circuit;
48use conflux::ConfluxSet;
49use control::ControlHandler;
50use postage::watch;
51use std::cmp::Ordering;
52use std::collections::BinaryHeap;
53use std::mem::size_of;
54use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
55use tor_cell::relaycell::msg::{AnyRelayMsg, Sendme};
56use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
57use tor_error::{Bug, bad_api_usage, debug_report, internal, into_bad_api_usage};
58use tor_rtcompat::{DynTimeProvider, SleepProvider};
59
60use cfg_if::cfg_if;
61use futures::StreamExt;
62use futures::channel::mpsc;
63use futures::{FutureExt as _, select_biased};
64use oneshot_fused_workaround as oneshot;
65
66use std::result::Result as StdResult;
67use std::sync::Arc;
68use std::time::Duration;
69
70use crate::channel::Channel;
71use crate::conflux::msghandler::RemoveLegReason;
72use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
73use circuit::CircuitCmd;
74use derive_deftly::Deftly;
75use derive_more::From;
76use smallvec::smallvec;
77use tor_cell::chancell::CircId;
78use tor_llcrypto::pk;
79use tor_memquota::derive_deftly_template_HasMemoryCost;
80use tor_memquota::mq_queue::{self, MpscSpec};
81use tracing::{debug, info, instrument, trace, warn};
82
83use super::circuit::{MutableState, TunnelMutableState};
84
85#[cfg(feature = "conflux")]
86use {
87 crate::conflux::msghandler::{ConfluxCmd, OooRelayMsg},
88 crate::util::err::ConfluxHandshakeError,
89};
90
91pub(super) use control::{CtrlCmd, CtrlMsg, FlowCtrlMsg};
92
93/// The type of a oneshot channel used to inform reactor of the result of an operation.
94pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;
95
96/// Contains a list of conflux handshake results.
97#[cfg(feature = "conflux")]
98pub(super) type ConfluxHandshakeResult = Vec<StdResult<(), ConfluxHandshakeError>>;
99
100/// The type of oneshot channel used to inform reactor users of the outcome
101/// of a client-side conflux handshake.
102///
103/// Contains a list of handshake results, one for each circuit that we were asked
104/// to link in the tunnel.
105#[cfg(feature = "conflux")]
106pub(super) type ConfluxLinkResultChannel = ReactorResultChannel<ConfluxHandshakeResult>;
107
108pub(crate) use circuit::{RECV_WINDOW_INIT, STREAM_READER_BUFFER};
109
110/// MPSC queue containing stream requests
111#[cfg(feature = "hs-service")]
112type StreamReqSender = mq_queue::Sender<StreamReqInfo, MpscSpec>;
113
/// A handshake type, to be used when creating circuit hops.
///
/// Each variant carries the relay key material (if any) needed by that handshake.
#[derive(Clone, Debug)]
pub(crate) enum CircuitHandshake {
    /// Use the CREATE_FAST handshake.
    ///
    /// This variant carries no relay keys.
    CreateFast,
    /// Use the ntor handshake.
    Ntor {
        /// The public key of the relay.
        public_key: NtorPublicKey,
        /// The Ed25519 identity of the relay, which is verified against the
        /// identity held in the circuit's channel.
        ed_identity: pk::ed25519::Ed25519Identity,
    },
    /// Use the ntor-v3 handshake.
    NtorV3 {
        /// The public key of the relay.
        public_key: NtorV3PublicKey,
    },
}
133
134// TODO: the RunOnceCmd/RunOnceCmdInner/CircuitCmd/CircuitAction enum
135// proliferation is a bit bothersome, but unavoidable with the current design.
136//
137// We should consider getting rid of some of these enums (if possible),
138// and coming up with more intuitive names.
139
/// One or more [`RunOnceCmdInner`] to run inside [`Reactor::run_once`].
///
/// The derived `From` impls allow converting a single [`RunOnceCmdInner`],
/// or a `Vec` of them, into this type with `.into()`.
#[derive(From, Debug)]
#[allow(clippy::large_enum_variant)] // TODO #2003: resolve this
enum RunOnceCmd {
    /// Run a single `RunOnceCmdInner` command.
    Single(RunOnceCmdInner),
    /// Run multiple `RunOnceCmdInner` commands.
    //
    // Note: this whole enum *could* be replaced with Vec<RunOnceCmdInner>,
    // but most of the time we're only going to have *one* RunOnceCmdInner
    // to run per run_once() loop. The enum enables us to avoid the extra heap
    // allocation for the `RunOnceCmd::Single` case.
    Multiple(Vec<RunOnceCmdInner>),
}
154
/// Instructions for running something in the reactor loop.
///
/// Run at the end of [`Reactor::run_once`].
///
/// Most variants carry the [`UniqId`] of the circuit leg they apply to.
//
// TODO: many of the variants of this enum have an identical CtrlMsg counterpart.
// We should consider making each variant a tuple variant and deduplicating the fields.
#[derive(educe::Educe)]
#[educe(Debug)]
enum RunOnceCmdInner {
    /// Send a RELAY cell.
    Send {
        /// The leg the cell should be sent on.
        leg: UniqId,
        /// The cell to send.
        cell: SendRelayCell,
        /// A channel for sending completion notifications.
        ///
        /// `None` if nobody is waiting for a notification.
        done: Option<ReactorResultChannel<()>>,
    },
    /// Send a given control message on this circuit, and install a control-message handler to
    /// receive responses.
    #[cfg(feature = "send-control-msg")]
    SendMsgAndInstallHandler {
        /// The message to send, if any
        msg: Option<AnyRelayMsgOuter>,
        /// A message handler to install.
        ///
        /// If this is `None`, there must already be a message handler installed
        #[educe(Debug(ignore))]
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
        /// A sender that we use to tell the caller that the message was sent
        /// and the handler installed.
        done: oneshot::Sender<Result<()>>,
    },
    /// Handle a SENDME message.
    HandleSendMe {
        /// The leg the SENDME was received on.
        leg: UniqId,
        /// The hop number.
        hop: HopNum,
        /// The SENDME message to handle.
        sendme: Sendme,
    },
    /// Begin a stream with the provided hop in this circuit.
    ///
    /// Uses the provided stream ID, and sends the provided message to that hop.
    BeginStream {
        /// The cell to send together with its stream ID, wrapped in a `Result`.
        cell: Result<(SendRelayCell, StreamId)>,
        /// The location of the hop on the tunnel. We don't use this (and `Circuit`s shouldn't need
        /// to worry about legs anyways), but need it so that we can pass it back in `done` to the
        /// caller.
        hop: HopLocation,
        /// The circuit leg to begin the stream on.
        leg: UniqId,
        /// Oneshot channel to notify on completion, with the allocated stream ID.
        done: ReactorResultChannel<(StreamId, HopLocation, RelayCellFormat)>,
    },
    /// Consider sending an XON message with the given `rate`.
    MaybeSendXon {
        /// The drain rate to advertise in the XON message.
        rate: XonKbpsEwma,
        /// The ID of the stream to send the message on.
        stream_id: StreamId,
        /// The location of the hop on the tunnel.
        hop: HopLocation,
    },
    /// Close the specified stream.
    CloseStream {
        /// The location of the hop on the tunnel.
        hop: HopLocation,
        /// The ID of the stream to close.
        sid: StreamId,
        /// The stream-closing behavior.
        behav: CloseStreamBehavior,
        /// The reason for closing the stream.
        reason: streammap::TerminateReason,
        /// A channel for sending completion notifications.
        ///
        /// `None` if nobody is waiting for a notification.
        done: Option<ReactorResultChannel<()>>,
    },
    /// Get the clock skew claimed by the first hop of the circuit.
    FirstHopClockSkew {
        /// Oneshot channel to return the clock skew.
        answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
    },
    /// Remove a circuit leg from the conflux set.
    RemoveLeg {
        /// The circuit leg to remove.
        leg: UniqId,
        /// The reason for removal.
        ///
        /// This is only used for conflux circuits that get removed
        /// before the conflux handshake is complete.
        ///
        /// The [`RemoveLegReason`] is mapped by the reactor to a
        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
        /// handshake to indicate the reason the handshake failed.
        reason: RemoveLegReason,
    },
    /// A circuit has completed the conflux handshake,
    /// and wants to send the specified cell.
    ///
    /// This is similar to [`RunOnceCmdInner::Send`],
    /// but needs to remain a separate variant,
    /// because in addition to instructing the reactor to send a cell,
    /// it also notifies it that the conflux handshake is complete on the specified `leg`.
    /// This enables the reactor to save the handshake result (`Ok(())`),
    /// and, if there are no other legs still in the handshake phase,
    /// send the result to the handshake initiator.
    #[cfg(feature = "conflux")]
    ConfluxHandshakeComplete {
        /// The circuit leg that has completed the handshake.
        /// This is the leg the cell should be sent on.
        leg: UniqId,
        /// The cell to send.
        cell: SendRelayCell,
    },
    /// Send a LINK cell on each of the unlinked circuit legs in the conflux set of this reactor.
    #[cfg(feature = "conflux")]
    Link {
        /// The circuits to link into the tunnel
        #[educe(Debug(ignore))]
        circuits: Vec<Circuit>,
        /// Oneshot channel for notifying of conflux handshake completion.
        answer: ConfluxLinkResultChannel,
    },
    /// Enqueue an out-of-order cell in `ooo_msgs`.
    #[cfg(feature = "conflux")]
    Enqueue {
        /// The leg the entry originated from.
        leg: UniqId,
        /// The out-of-order message.
        msg: OooRelayMsg,
    },
    /// Take a padding-related action on a circuit leg.
    #[cfg(feature = "circ-padding")]
    PaddingAction {
        /// The leg to take the action on.
        leg: UniqId,
        /// The action to take.
        padding_action: PaddingEvent,
    },
    /// Perform a clean shutdown on this circuit.
    CleanShutdown,
}
299
300impl RunOnceCmdInner {
301 /// Create a [`RunOnceCmdInner`] out of a [`CircuitCmd`] and [`UniqId`].
302 fn from_circuit_cmd(leg: UniqId, cmd: CircuitCmd) -> Self {
303 match cmd {
304 CircuitCmd::Send(cell) => Self::Send {
305 leg,
306 cell,
307 done: None,
308 },
309 CircuitCmd::HandleSendMe { hop, sendme } => Self::HandleSendMe { leg, hop, sendme },
310 CircuitCmd::CloseStream {
311 hop,
312 sid,
313 behav,
314 reason,
315 } => Self::CloseStream {
316 hop: HopLocation::Hop((leg, hop)),
317 sid,
318 behav,
319 reason,
320 done: None,
321 },
322 #[cfg(feature = "conflux")]
323 CircuitCmd::Conflux(ConfluxCmd::RemoveLeg(reason)) => Self::RemoveLeg { leg, reason },
324 #[cfg(feature = "conflux")]
325 CircuitCmd::Conflux(ConfluxCmd::HandshakeComplete { hop, early, cell }) => {
326 let cell = SendRelayCell {
327 hop: Some(hop),
328 early,
329 cell,
330 };
331 Self::ConfluxHandshakeComplete { leg, cell }
332 }
333 #[cfg(feature = "conflux")]
334 CircuitCmd::Enqueue(msg) => Self::Enqueue { leg, msg },
335 CircuitCmd::CleanShutdown => Self::CleanShutdown,
336 }
337 }
338}
339
/// A command to execute at the end of [`Reactor::run_once`].
///
/// A batch of these is gathered by `run_once` and then executed
/// in the order determined by `CircuitAction::order_within_batch`.
#[derive(From, Debug)]
#[allow(clippy::large_enum_variant)] // TODO #2003: should we resolve this?
enum CircuitAction {
    /// Run a single `CircuitCmd` command.
    RunCmd {
        /// The unique identifier of the circuit leg to run the command on
        leg: UniqId,
        /// The command to run.
        cmd: CircuitCmd,
    },
    /// Handle a control message
    HandleControl(CtrlMsg),
    /// Handle an input message.
    HandleCell {
        /// The unique identifier of the circuit leg the message was received on.
        leg: UniqId,
        /// The message to handle.
        cell: ClientCircChanMsg,
    },
    /// Remove the specified circuit leg from the conflux set.
    ///
    /// Returned whenever a single circuit leg needs to be removed
    /// from the reactor's conflux set, without necessarily tearing down
    /// the whole set or shutting down the reactor.
    ///
    /// Note: this action *can* cause the reactor to shut down
    /// (and the conflux set to be closed).
    ///
    /// See the [`ConfluxSet::remove`] docs for more on the exact behavior of this command.
    RemoveLeg {
        /// The leg to remove.
        leg: UniqId,
        /// The reason for removal.
        ///
        /// This is only used for conflux circuits that get removed
        /// before the conflux handshake is complete.
        ///
        /// The [`RemoveLegReason`] is mapped by the reactor to a
        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
        /// handshake to indicate the reason the handshake failed.
        reason: RemoveLegReason,
    },
    /// Take some action (blocking or unblocking a circuit, or sending padding)
    /// based on the circuit padding backend code.
    //
    // Note: unlike `RunOnceCmdInner::PaddingAction`, this variant is not gated
    // on the "circ-padding" feature.
    PaddingAction {
        /// The leg on which to take the padding action.
        leg: UniqId,
        /// The action to take.
        padding_action: PaddingEvent,
    },
}
392
393impl CircuitAction {
394 /// Return the ordering with which we should handle this action
395 /// within a list of actions returned by a single call to next_circ_action().
396 ///
397 /// NOTE: Please do not make this any more complicated:
398 /// It is a consequence of a kludge that we need this sorting at all.
399 /// Assuming that eventually, we switch away from the current
400 /// poll-oriented `next_circ_action` design,
401 /// we may be able to get rid of this entirely.
402 fn order_within_batch(&self) -> u8 {
403 use CircuitAction as CA;
404 use PaddingEvent as PE;
405 const EARLY: u8 = 0;
406 const NORMAL: u8 = 1;
407 const LATE: u8 = 2;
408
409 // We use this ordering to move any "StartBlocking" to the _end_ of a batch and
410 // "StopBlocking" to the start.
411 //
412 // This way, we can be sure that we will handle any "send data" operations
413 // (and tell the Padder about them) _before_ we tell the Padder
414 // that we have blocked the circuit.
415 //
416 // This keeps things a bit more logical.
417 match self {
418 CA::RunCmd { .. } => NORMAL,
419 CA::HandleControl(..) => NORMAL,
420 CA::HandleCell { .. } => NORMAL,
421 CA::RemoveLeg { .. } => NORMAL,
422 #[cfg(feature = "circ-padding")]
423 CA::PaddingAction { padding_action, .. } => match padding_action {
424 PE::StopBlocking => EARLY,
425 PE::SendPadding(..) => NORMAL,
426 PE::StartBlocking(..) => LATE,
427 },
428 #[cfg(not(feature = "circ-padding"))]
429 CA::PaddingAction { .. } => NORMAL,
430 }
431 }
432}
433
/// An object that's waiting for a meta cell (one not associated with a stream) in order to make
/// progress.
///
/// # Background
///
/// The `Reactor` can't have async functions that send and receive cells, because its job is to
/// send and receive cells: if one of its functions tried to do that, it would just hang forever.
///
/// To get around this problem, the reactor can send some cells, and then make one of these
/// `MetaCellHandler` objects, which will be run when the reply arrives.
pub(crate) trait MetaCellHandler: Send {
    /// The hop we're expecting the message to come from. This is compared against the hop
    /// from which we actually receive messages, and an error is thrown if the two don't match.
    fn expected_hop(&self) -> HopLocation;
    /// Called when the message we were waiting for arrives.
    ///
    /// Gets mutable access to a [`Circuit`] (the `reactor` argument)
    /// in order to do anything it likes there.
    ///
    /// On success, the returned [`MetaCellDisposition`] says what should happen next
    /// (for example, whether the handler should be uninstalled).
    ///
    /// If this function returns an error, the reactor will shut down.
    fn handle_msg(
        &mut self,
        msg: UnparsedRelayMsg,
        reactor: &mut Circuit,
    ) -> Result<MetaCellDisposition>;
}
459
/// A possible successful outcome of giving a message to a [`MsgHandler`](super::msghandler::MsgHandler).
///
/// This is also the success type returned by [`MetaCellHandler::handle_msg`].
///
/// This type is `pub(crate)` by default,
/// and is made `pub` when the "send-control-msg" feature is enabled.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "send-control-msg", visibility::make(pub))]
#[non_exhaustive]
pub(crate) enum MetaCellDisposition {
    /// The message was consumed; the handler should remain installed.
    #[cfg(feature = "send-control-msg")]
    Consumed,
    /// The message was consumed; the handler should be uninstalled.
    ConversationFinished,
    /// The message was consumed; the circuit should be closed.
    #[cfg(feature = "send-control-msg")]
    CloseCirc,
    // TODO: Eventually we might want the ability to have multiple handlers
    // installed, and to let them say "not for me, maybe for somebody else?".
    // But right now we don't need that.
}
477
/// Turn the specified [`Option`] into a `Result`,
/// mapping `None` to [`ReactorError::Shutdown`].
///
/// In the `None` case, a trace-level "reactor shutdown" event is logged,
/// carrying the `tunnel_id` of `$self` and the given `$reason`.
///
/// This is a macro instead of a function to work around borrowck errors
/// in the select! from run_once().
macro_rules! unwrap_or_shutdown {
    ($self:expr, $res:expr, $reason:expr) => {{
        if let Some(v) = $res {
            Ok(v)
        } else {
            trace!(
                tunnel_id = %$self.tunnel_id,
                reason = %$reason,
                "reactor shutdown"
            );
            Err(ReactorError::Shutdown)
        }
    }};
}
497
/// Object to handle incoming cells and background tasks on a circuit
///
/// This type is returned when you finish a circuit; you need to spawn a
/// new task that calls `run()` on it.
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
pub struct Reactor {
    /// Receiver for control messages for this reactor, sent by `ClientCirc` objects.
    ///
    /// This channel is polled in [`Reactor::run_once`], but only if the `chan_sender` sink
    /// is ready to accept cells.
    //
    // NOTE(review): the comment next to `self.control.next()` in `run_once` says that
    // control messages may be read before the chan_senders are ready.
    // The readiness claim above looks stale -- confirm.
    control: mpsc::UnboundedReceiver<CtrlMsg>,
    /// Receiver for command messages for this reactor, sent by `ClientCirc` objects.
    ///
    /// This channel is polled in [`Reactor::run_once`].
    ///
    /// NOTE: this is a separate channel from `control`, because some messages
    /// have higher priority and need to be handled even if the `chan_sender` is not
    /// ready (whereas `control` messages are not read until the `chan_sender` sink
    /// is ready to accept cells).
    command: mpsc::UnboundedReceiver<CtrlCmd>,
    /// A oneshot sender that is used to alert other tasks when this reactor is
    /// finally dropped.
    ///
    /// It is a sender for Void because we never actually want to send anything here;
    /// we only want to generate canceled events.
    #[allow(dead_code)] // the only purpose of this field is to be dropped.
    reactor_closed_tx: oneshot::Sender<void::Void>,
    /// A set of circuits that form a tunnel.
    ///
    /// Contains 1 or more circuits.
    ///
    /// Circuits may be added to this set throughout the lifetime of the reactor.
    ///
    /// Sometimes, the reactor will remove circuits from this set,
    /// for example if the `LINKED` message takes too long to arrive,
    /// or if congestion control negotiation fails.
    /// The reactor will continue running with the remaining circuits.
    /// It will shut down if *all* the circuits are removed.
    ///
    // TODO(conflux): document all the reasons why the reactor might
    // choose to tear down a circuit or tunnel (timeouts, protocol violations, etc.)
    circuits: ConfluxSet,
    /// An identifier for logging about this tunnel reactor.
    tunnel_id: TunnelId,
    /// Handlers, shared with `Circuit`.
    cell_handlers: CellHandlers,
    /// The time provider, used for conflux handshake timeouts.
    runtime: DynTimeProvider,
    /// The conflux handshake context, if there is an on-going handshake.
    ///
    /// Set to `None` if this is a single-path tunnel,
    /// or if none of the circuit legs from our conflux set
    /// are currently in the conflux handshake phase.
    #[cfg(feature = "conflux")]
    conflux_hs_ctx: Option<ConfluxHandshakeCtx>,
    /// A min-heap buffering all the out-of-order messages received so far.
    ///
    /// TODO(conflux): this becomes a DoS vector unless we impose a limit
    /// on its size. We should make this participate in the memquota memory
    /// tracking system, somehow.
    //
    // NOTE(review): `BinaryHeap` is a max-heap, so the "min-heap" behavior presumably
    // comes from the `Ord` impl of `OooRelayMsg` (not visible here) -- confirm.
    #[cfg(feature = "conflux")]
    ooo_msgs: BinaryHeap<ConfluxHeapEntry>,
}
561
/// The context for an on-going conflux handshake.
///
/// Stored in `Reactor::conflux_hs_ctx` while a handshake is in progress.
#[cfg(feature = "conflux")]
struct ConfluxHandshakeCtx {
    /// A channel for notifying the caller of the outcome of a CONFLUX_LINK request.
    answer: ConfluxLinkResultChannel,
    /// The number of legs that are currently doing the handshake.
    num_legs: usize,
    /// The handshake results we have collected so far.
    ///
    /// Holds one `Result` per recorded handshake outcome.
    results: ConfluxHandshakeResult,
}
572
/// An out-of-order message buffered in [`Reactor::ooo_msgs`].
///
/// Entries are compared (for both ordering and equality) by their `msg` only;
/// the `leg_id` does not take part in comparisons.
#[derive(Debug)]
#[cfg(feature = "conflux")]
struct ConfluxHeapEntry {
    /// The leg id this message came from.
    leg_id: UniqId,
    /// The out of order message
    msg: OooRelayMsg,
}
582
#[cfg(feature = "conflux")]
impl Ord for ConfluxHeapEntry {
    /// Order entries by their buffered message alone; `leg_id` is ignored.
    fn cmp(&self, other: &Self) -> Ordering {
        Ord::cmp(&self.msg, &other.msg)
    }
}
589
#[cfg(feature = "conflux")]
impl PartialOrd for ConfluxHeapEntry {
    /// Always returns `Some`, deferring to the total order from the [`Ord`] impl.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
596
#[cfg(feature = "conflux")]
impl PartialEq for ConfluxHeapEntry {
    /// Two entries are equal when their buffered messages are equal; `leg_id` is ignored.
    fn eq(&self, other: &Self) -> bool {
        PartialEq::eq(&self.msg, &other.msg)
    }
}
603
// Marker impl: `Eq` is a supertrait of `Ord`, which `BinaryHeap` requires of its elements.
#[cfg(feature = "conflux")]
impl Eq for ConfluxHeapEntry {}
606
/// Cell handlers, shared between the Reactor and its underlying `Circuit`s.
///
/// The reactor owns this struct and lends it (by `&mut`) to a circuit leg
/// when asking that leg to handle an incoming cell.
struct CellHandlers {
    /// A handler for a meta cell, together with a result channel to notify on completion.
    ///
    /// NOTE(prop349): this is part of Arti's "Base Circuit Hop Handler".
    ///
    /// Upon sending an EXTEND cell, the [`ControlHandler`] sets this handler
    /// to [`CircuitExtender`](circuit::extender::CircuitExtender).
    /// The handler is then used in [`Circuit::handle_meta_cell`] for handling
    /// all the meta cells received on the circuit that are not SENDMEs or TRUNCATE
    /// (which are handled separately) or conflux cells
    /// (which are handled by the conflux handlers).
    ///
    /// The handler is uninstalled after the receipt of the EXTENDED cell,
    /// so any subsequent EXTENDED cells will cause the circuit to be torn down.
    ///
    /// `None` when no handler is installed.
    meta_handler: Option<Box<dyn MetaCellHandler + Send>>,
    /// A handler for incoming stream requests.
    ///
    /// `None` when no handler is installed.
    #[cfg(feature = "hs-service")]
    incoming_stream_req_handler: Option<IncomingStreamRequestHandler>,
}
627
/// Information about an incoming stream request.
///
/// Values of this type are sent over the `StreamReqSender` queue.
#[cfg(feature = "hs-service")]
#[derive(Debug, Deftly)]
#[derive_deftly(HasMemoryCost)]
pub(crate) struct StreamReqInfo {
    /// The [`IncomingStreamRequest`].
    pub(crate) req: IncomingStreamRequest,
    /// The ID of the stream being requested.
    pub(crate) stream_id: StreamId,
    /// The [`HopLocation`] associated with this request.
    //
    // TODO: When we add support for exit relays, we need to turn this into an Option<HopNum>.
    // (For outbound messages (towards relays), there is only one hop that can send them: the client.)
    //
    // TODO: For onion services, we might be able to enforce the HopNum earlier: we would never accept an
    // incoming stream request from two separate hops. (There is only one that's valid.)
    pub(crate) hop: HopLocation,
    /// The format which must be used with this stream to encode messages.
    #[deftly(has_memory_cost(indirect_size = "0"))]
    pub(crate) relay_cell_format: RelayCellFormat,
    /// A channel for receiving messages from this stream.
    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate
    pub(crate) receiver: StreamQueueReceiver,
    /// A channel for sending messages to be sent on this stream.
    #[deftly(has_memory_cost(indirect_size = "size_of::<AnyRelayMsg>()"))] // estimate
    pub(crate) msg_tx: StreamMpscSender<AnyRelayMsg>,
    /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
    // TODO(arti#2068): we should consider making this an `Option`
    // the `watch::Sender` owns the indirect data
    #[deftly(has_memory_cost(indirect_size = "0"))]
    pub(crate) rate_limit_stream: watch::Receiver<StreamRateLimit>,
    /// A [`Stream`](futures::Stream) that provides notifications when a new drain rate is
    /// requested.
    #[deftly(has_memory_cost(indirect_size = "0"))]
    pub(crate) drain_rate_request_stream: NotifyReceiver<DrainRateRequest>,
    /// The memory quota account to be used for this stream
    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate (it contains an Arc)
    pub(crate) memquota: StreamAccount,
}
667
/// Data required for handling an incoming stream request.
///
/// Stored in `CellHandlers::incoming_stream_req_handler` when a handler is installed.
#[cfg(feature = "hs-service")]
#[derive(educe::Educe)]
#[educe(Debug)]
struct IncomingStreamRequestHandler {
    /// A sender for sharing information about an incoming stream request.
    incoming_sender: StreamReqSender,
    /// A [`AnyCmdChecker`] for validating incoming stream requests.
    cmd_checker: AnyCmdChecker,
    /// The hop to expect incoming stream requests from.
    hop_num: HopNum,
    /// An [`IncomingStreamRequestFilter`] for checking whether the user wants
    /// this request, or wants to reject it immediately.
    ///
    /// Excluded from the `Debug` output.
    #[educe(Debug(ignore))]
    filter: Box<dyn IncomingStreamRequestFilter>,
}
684
685impl Reactor {
686 /// Create a new circuit reactor.
687 ///
688 /// The reactor will send outbound messages on `channel`, receive incoming
689 /// messages on `input`, and identify this circuit by the channel-local
690 /// [`CircId`] provided.
691 ///
692 /// The internal unique identifier for this circuit will be `unique_id`.
693 #[allow(clippy::type_complexity, clippy::too_many_arguments)] // TODO
694 pub(super) fn new(
695 channel: Arc<Channel>,
696 channel_id: CircId,
697 unique_id: UniqId,
698 input: CircuitRxReceiver,
699 runtime: DynTimeProvider,
700 memquota: CircuitAccount,
701 padding_ctrl: PaddingController,
702 padding_stream: PaddingEventStream,
703 timeouts: Arc<dyn TimeoutEstimator + Send>,
704 ) -> (
705 Self,
706 mpsc::UnboundedSender<CtrlMsg>,
707 mpsc::UnboundedSender<CtrlCmd>,
708 oneshot::Receiver<void::Void>,
709 Arc<TunnelMutableState>,
710 ) {
711 let tunnel_id = TunnelId::next();
712 let (control_tx, control_rx) = mpsc::unbounded();
713 let (command_tx, command_rx) = mpsc::unbounded();
714 let mutable = Arc::new(MutableState::default());
715
716 let (reactor_closed_tx, reactor_closed_rx) = oneshot::channel();
717
718 let cell_handlers = CellHandlers {
719 meta_handler: None,
720 #[cfg(feature = "hs-service")]
721 incoming_stream_req_handler: None,
722 };
723
724 let unique_id = TunnelScopedCircId::new(tunnel_id, unique_id);
725 let circuit_leg = Circuit::new(
726 runtime.clone(),
727 channel,
728 channel_id,
729 unique_id,
730 input,
731 memquota,
732 Arc::clone(&mutable),
733 padding_ctrl,
734 padding_stream,
735 timeouts,
736 );
737
738 let (circuits, mutable) = ConfluxSet::new(tunnel_id, circuit_leg);
739
740 let reactor = Reactor {
741 circuits,
742 control: control_rx,
743 command: command_rx,
744 reactor_closed_tx,
745 tunnel_id,
746 cell_handlers,
747 runtime,
748 #[cfg(feature = "conflux")]
749 conflux_hs_ctx: None,
750 #[cfg(feature = "conflux")]
751 ooo_msgs: Default::default(),
752 };
753
754 (reactor, control_tx, command_tx, reactor_closed_rx, mutable)
755 }
756
757 /// Launch the reactor, and run until the circuit closes or we
758 /// encounter an error.
759 ///
760 /// Once this method returns, the circuit is dead and cannot be
761 /// used again.
762 #[instrument(level = "trace", skip_all)]
763 pub async fn run(mut self) -> Result<()> {
764 trace!(tunnel_id = %self.tunnel_id, "Running tunnel reactor");
765 let result: Result<()> = loop {
766 match self.run_once().await {
767 Ok(()) => (),
768 Err(ReactorError::Shutdown) => break Ok(()),
769 Err(ReactorError::Err(e)) => break Err(e),
770 }
771 };
772
773 // Log that the reactor stopped, possibly with the associated error as a report.
774 // May log at a higher level depending on the error kind.
775 const MSG: &str = "Tunnel reactor stopped";
776 match &result {
777 Ok(()) => trace!(tunnel_id = %self.tunnel_id, "{MSG}"),
778 Err(e) => debug_report!(e, tunnel_id = %self.tunnel_id, "{MSG}"),
779 }
780
781 result
782 }
783
784 /// Helper for run: doesn't mark the circuit closed on finish. Only
785 /// processes one cell or control message.
786 #[instrument(level = "trace", skip_all)]
787 async fn run_once(&mut self) -> StdResult<(), ReactorError> {
788 // If all the circuits are closed, shut down the reactor
789 if self.circuits.is_empty() {
790 trace!(
791 tunnel_id = %self.tunnel_id,
792 "Tunnel reactor shutting down: all circuits have closed",
793 );
794
795 return Err(ReactorError::Shutdown);
796 }
797
798 // If this is a single path circuit, we need to wait until the first hop
799 // is created before doing anything else
800 let single_path_with_hops = self
801 .circuits
802 .single_leg_mut()
803 .is_ok_and(|leg| !leg.has_hops());
804 if single_path_with_hops {
805 self.wait_for_create().await?;
806
807 return Ok(());
808 }
809
810 // Prioritize the buffered messages.
811 //
812 // Note: if any of the messages are ready to be handled,
813 // this will block the reactor until we are done processing them
814 //
815 // TODO circpad: If this is a problem, we might want to re-order things so that we
816 // prioritize padding instead. On the other hand, this should be fixed by refactoring
817 // circuit and tunnel reactors, so we might do well to just leave it alone for now.
818 #[cfg(feature = "conflux")]
819 self.try_dequeue_ooo_msgs().await?;
820
821 let mut actions = select_biased! {
822 res = self.command.next() => {
823 let cmd = unwrap_or_shutdown!(self, res, "command channel drop")?;
824 return ControlHandler::new(self).handle_cmd(cmd);
825 },
826 // Check whether we've got a control message pending.
827 //
828 // Note: unfortunately, reading from control here means we might start
829 // handling control messages before our chan_senders are ready.
830 // With the current design, this is inevitable: we can't know which circuit leg
831 // a control message is meant for without first reading the control message from
832 // the channel, and at that point, we can't know for sure whether that particular
833 // circuit is ready for sending.
834 ret = self.control.next() => {
835 let msg = unwrap_or_shutdown!(self, ret, "control drop")?;
836 smallvec![CircuitAction::HandleControl(msg)]
837 },
838 res = self.circuits.next_circ_action(&self.runtime).fuse() => res?,
839 };
840
841 // Put the actions into the order that we need to execute them in.
842 //
843 // (Yes, this _does_ have to be a stable sort. Not all actions may be freely re-ordered
844 // with respect to one another.)
845 actions.sort_by_key(|a| a.order_within_batch());
846
847 for action in actions {
848 let cmd = match action {
849 CircuitAction::RunCmd { leg, cmd } => Some(RunOnceCmd::Single(
850 RunOnceCmdInner::from_circuit_cmd(leg, cmd),
851 )),
852 CircuitAction::HandleControl(ctrl) => ControlHandler::new(self)
853 .handle_msg(ctrl)?
854 .map(RunOnceCmd::Single),
855 CircuitAction::HandleCell { leg, cell } => {
856 let circ = self
857 .circuits
858 .leg_mut(leg)
859 .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
860
861 let circ_cmds = circ.handle_cell(&mut self.cell_handlers, leg, cell)?;
862 if circ_cmds.is_empty() {
863 None
864 } else {
865 // TODO: we return RunOnceCmd::Multiple even if there's a single command.
866 //
867 // See the TODO on `Circuit::handle_cell`.
868 let cmd = RunOnceCmd::Multiple(
869 circ_cmds
870 .into_iter()
871 .map(|cmd| RunOnceCmdInner::from_circuit_cmd(leg, cmd))
872 .collect(),
873 );
874
875 Some(cmd)
876 }
877 }
878 CircuitAction::RemoveLeg { leg, reason } => {
879 Some(RunOnceCmdInner::RemoveLeg { leg, reason }.into())
880 }
881 CircuitAction::PaddingAction {
882 leg,
883 padding_action,
884 } => {
885 cfg_if! {
886 if #[cfg(feature = "circ-padding")] {
887 Some(RunOnceCmdInner::PaddingAction { leg, padding_action }.into())
888 } else {
889 // If padding isn't enabled, we never generate a padding action,
890 // so we can be sure this case will never be called.
891 void::unreachable(padding_action.0);
892 }
893 }
894 }
895 };
896
897 if let Some(cmd) = cmd {
898 self.handle_run_once_cmd(cmd).await?;
899 }
900 }
901
902 Ok(())
903 }
904
/// Drain every buffered out-of-order message that has now become deliverable.
///
/// Messages are taken from the head of `ooo_msgs` for as long as the head's sequence
/// number is reported as in-order by the circuit set. Each one is handed to the leg it
/// arrived on, and any command that results is executed before the next message is looked at.
#[cfg(feature = "conflux")]
#[instrument(level = "trace", skip_all)]
async fn try_dequeue_ooo_msgs(&mut self) -> StdResult<(), ReactorError> {
    loop {
        // Stop as soon as the buffer is empty, or its head is not yet in order.
        let head_is_ready = self
            .ooo_msgs
            .peek()
            .is_some_and(|head| self.circuits.is_seqno_in_order(head.msg.seqno));
        if !head_is_ready {
            return Ok(());
        }

        let entry = self.ooo_msgs.pop().expect("item just disappeared?!");
        let leg_id = entry.leg_id;

        let circ = self
            .circuits
            .leg_mut(leg_id)
            .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
        let maybe_circ_cmd = circ.handle_in_order_relay_msg(
            &mut self.cell_handlers,
            entry.msg.hopnum,
            leg_id,
            entry.msg.cell_counts_towards_windows,
            entry.msg.streamid,
            entry.msg.msg,
        )?;

        if let Some(circ_cmd) = maybe_circ_cmd {
            let run_once =
                RunOnceCmd::Single(RunOnceCmdInner::from_circuit_cmd(leg_id, circ_cmd));
            self.handle_run_once_cmd(run_once).await?;
        }
    }
}
944
945 /// Handle a [`RunOnceCmd`].
946 #[instrument(level = "trace", skip_all)]
947 async fn handle_run_once_cmd(&mut self, cmd: RunOnceCmd) -> StdResult<(), ReactorError> {
948 match cmd {
949 RunOnceCmd::Single(cmd) => return self.handle_single_run_once_cmd(cmd).await,
950 RunOnceCmd::Multiple(cmds) => {
951 // While we know `sendable` is ready to accept *one* cell,
952 // we can't be certain it will be able to accept *all* of the cells
953 // that need to be sent here. This means we *may* end up buffering
954 // in its underlying SometimesUnboundedSink! That is OK, because
955 // RunOnceCmd::Multiple is only used for handling packed cells.
956 for cmd in cmds {
957 self.handle_single_run_once_cmd(cmd).await?;
958 }
959 }
960 }
961
962 Ok(())
963 }
964
965 /// Handle a [`RunOnceCmd`].
966 #[instrument(level = "trace", skip_all)]
967 async fn handle_single_run_once_cmd(
968 &mut self,
969 cmd: RunOnceCmdInner,
970 ) -> StdResult<(), ReactorError> {
971 match cmd {
972 RunOnceCmdInner::Send { leg, cell, done } => {
973 // TODO: check the cc window
974 let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
975 if let Some(done) = done {
976 // Don't care if the receiver goes away
977 let _ = done.send(res.clone());
978 }
979 res?;
980 }
981 #[cfg(feature = "send-control-msg")]
982 RunOnceCmdInner::SendMsgAndInstallHandler { msg, handler, done } => {
983 let cell: Result<Option<SendRelayCell>> =
984 self.prepare_msg_and_install_handler(msg, handler);
985
986 match cell {
987 Ok(Some(cell)) => {
988 // TODO(conflux): let the RunOnceCmdInner specify which leg to send the cell on
989 let outcome = self.circuits.send_relay_cell_on_leg(cell, None).await;
990 // don't care if receiver goes away.
991 let _ = done.send(outcome.clone());
992 outcome?;
993 }
994 Ok(None) => {
995 // don't care if receiver goes away.
996 let _ = done.send(Ok(()));
997 }
998 Err(e) => {
999 // don't care if receiver goes away.
1000 let _ = done.send(Err(e.clone()));
1001 return Err(e.into());
1002 }
1003 }
1004 }
1005 RunOnceCmdInner::BeginStream {
1006 leg,
1007 cell,
1008 hop,
1009 done,
1010 } => {
1011 match cell {
1012 Ok((cell, stream_id)) => {
1013 let circ = self
1014 .circuits
1015 .leg_mut(leg)
1016 .ok_or_else(|| internal!("leg disappeared?!"))?;
1017 let cell_hop = cell.hop.expect("missing hop in client SendRelayCell?!");
1018 let relay_format = circ
1019 .hop_mut(cell_hop)
1020 // TODO: Is this the right error type here? Or should there be a "HopDisappeared"?
1021 .ok_or(Error::NoSuchHop)?
1022 .relay_cell_format();
1023
1024 let outcome = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
1025 // don't care if receiver goes away.
1026 let _ = done.send(outcome.clone().map(|_| (stream_id, hop, relay_format)));
1027 outcome?;
1028 }
1029 Err(e) => {
1030 // don't care if receiver goes away.
1031 let _ = done.send(Err(e.clone()));
1032 return Err(e.into());
1033 }
1034 }
1035 }
1036 RunOnceCmdInner::CloseStream {
1037 hop,
1038 sid,
1039 behav,
1040 reason,
1041 done,
1042 } => {
1043 let result = {
1044 let (leg_id, hop_num) = self
1045 .resolve_hop_location(hop)
1046 .map_err(into_bad_api_usage!("Could not resolve {hop:?}"))?;
1047 let leg = self
1048 .circuits
1049 .leg_mut(leg_id)
1050 .ok_or(bad_api_usage!("No leg for id {:?}", leg_id))?;
1051 Ok::<_, Bug>((leg, hop_num))
1052 };
1053
1054 let (leg, hop_num) = match result {
1055 Ok(x) => x,
1056 Err(e) => {
1057 if let Some(done) = done {
1058 // don't care if the sender goes away
1059 let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
1060 let _ = done.send(Err(e.into()));
1061 }
1062 return Ok(());
1063 }
1064 };
1065
1066 let max_rtt = {
1067 let hop = leg
1068 .hop(hop_num)
1069 .ok_or_else(|| internal!("the hop we resolved disappeared?!"))?;
1070 let ccontrol = hop.ccontrol();
1071
1072 // Note: if we have no measurements for the RTT, this will be set to 0,
1073 // and the timeout will be 2 * CBT.
1074 ccontrol
1075 .rtt()
1076 .max_rtt_usec()
1077 .map(|rtt| Duration::from_millis(u64::from(rtt)))
1078 .unwrap_or_default()
1079 };
1080
1081 // The length of the circuit up until the hop that has the half-streeam.
1082 //
1083 // +1, because HopNums are zero-based.
1084 let circ_len = usize::from(hop_num) + 1;
1085
1086 // We double the CBT to account for rend circuits,
1087 // which are twice as long (otherwise we risk expiring
1088 // the rend half-streams too soon).
1089 let timeout = std::cmp::max(max_rtt, 2 * leg.estimate_cbt(circ_len));
1090 let expire_at = self.runtime.now() + timeout;
1091
1092 let res: Result<()> = leg
1093 .close_stream(hop_num, sid, behav, reason, expire_at)
1094 .await;
1095
1096 if let Some(done) = done {
1097 // don't care if the sender goes away
1098 let _ = done.send(res);
1099 }
1100 }
1101 RunOnceCmdInner::MaybeSendXon {
1102 rate,
1103 stream_id,
1104 hop,
1105 } => {
1106 let (leg_id, hop_num) = match self.resolve_hop_location(hop) {
1107 Ok(x) => x,
1108 Err(NoJoinPointError) => {
1109 // A stream tried to send an XON message message to the join point of
1110 // a tunnel that has never had a join point. Currently in arti, only a
1111 // `StreamTarget` asks us to send an XON message, and this tunnel
1112 // originally created the `StreamTarget` to begin with. So this is a
1113 // legitimate bug somewhere in the tunnel code.
1114 return Err(
1115 internal!(
1116 "Could not send an XON message to a join point on a tunnel without a join point",
1117 )
1118 .into()
1119 );
1120 }
1121 };
1122
1123 let Some(leg) = self.circuits.leg_mut(leg_id) else {
1124 // The leg has disappeared. This is fine since the stream may have ended and
1125 // been cleaned up while this `CtrlMsg::MaybeSendXon` message was queued.
1126 // It is possible that is a bug and this is an incorrect leg number, but
1127 // it's not currently possible to differentiate between an incorrect leg
1128 // number and a tunnel leg that has been closed.
1129 debug!("Could not send an XON message on a leg that does not exist. Ignoring.");
1130 return Ok(());
1131 };
1132
1133 let Some(hop) = leg.hop_mut(hop_num) else {
1134 // The hop has disappeared. This is fine since the circuit may have been
1135 // been truncated while the `CtrlMsg::MaybeSendXon` message was queued.
1136 // It is possible that is a bug and this is an incorrect hop number, but
1137 // it's not currently possible to differentiate between an incorrect hop
1138 // number and a circuit hop that has been removed.
1139 debug!("Could not send an XON message on a hop that does not exist. Ignoring.");
1140 return Ok(());
1141 };
1142
1143 let Some(msg) = hop.maybe_send_xon(rate, stream_id)? else {
1144 // Nothing to do.
1145 return Ok(());
1146 };
1147
1148 let cell = AnyRelayMsgOuter::new(Some(stream_id), msg.into());
1149 let cell = SendRelayCell {
1150 hop: Some(hop_num),
1151 early: false,
1152 cell,
1153 };
1154
1155 leg.send_relay_cell(cell).await?;
1156 }
1157 RunOnceCmdInner::HandleSendMe { leg, hop, sendme } => {
1158 let leg = self
1159 .circuits
1160 .leg_mut(leg)
1161 .ok_or_else(|| internal!("leg disappeared?!"))?;
1162 // NOTE: it's okay to await. We are only awaiting on the congestion_signals
1163 // future which *should* resolve immediately
1164 let signals = leg.chan_sender.congestion_signals().await;
1165 leg.handle_sendme(hop, sendme, signals)?;
1166 }
1167 RunOnceCmdInner::FirstHopClockSkew { answer } => {
1168 let res = self.circuits.single_leg_mut().map(|leg| leg.clock_skew());
1169
1170 // don't care if the sender goes away
1171 let _ = answer.send(res.map_err(Into::into));
1172 }
1173 RunOnceCmdInner::CleanShutdown => {
1174 trace!(tunnel_id = %self.tunnel_id, "reactor shutdown due to handled cell");
1175 return Err(ReactorError::Shutdown);
1176 }
1177 RunOnceCmdInner::RemoveLeg { leg, reason } => {
1178 warn!(tunnel_id = %self.tunnel_id, reason = %reason, "removing circuit leg");
1179
1180 let circ = self.circuits.remove(leg)?;
1181 let is_conflux_pending = circ.is_conflux_pending();
1182
1183 // Drop the removed leg. This will cause it to close if it's not already closed.
1184 drop(circ);
1185
1186 // If we reach this point, it means we have more than one leg
1187 // (otherwise the .remove() would've returned a Shutdown error),
1188 // so we expect there to be a ConfluxHandshakeContext installed.
1189
1190 #[cfg(feature = "conflux")]
1191 if is_conflux_pending {
1192 let (error, proto_violation): (_, Option<Error>) = match &reason {
1193 RemoveLegReason::ConfluxHandshakeTimeout => {
1194 (ConfluxHandshakeError::Timeout, None)
1195 }
1196 RemoveLegReason::ConfluxHandshakeErr(e) => {
1197 (ConfluxHandshakeError::Link(e.clone()), Some(e.clone()))
1198 }
1199 RemoveLegReason::ChannelClosed => {
1200 (ConfluxHandshakeError::ChannelClosed, None)
1201 }
1202 };
1203
1204 self.note_conflux_handshake_result(Err(error), proto_violation.is_some())?;
1205
1206 if let Some(e) = proto_violation {
1207 tor_error::warn_report!(
1208 e,
1209 tunnel_id = %self.tunnel_id,
1210 "Malformed conflux handshake, tearing down tunnel",
1211 );
1212
1213 return Err(e.into());
1214 }
1215 }
1216 }
1217 #[cfg(feature = "conflux")]
1218 RunOnceCmdInner::ConfluxHandshakeComplete { leg, cell } => {
1219 // Note: on the client-side, the handshake is considered complete once the
1220 // RELAY_CONFLUX_LINKED_ACK is sent (roughly upon receipt of the LINKED cell).
1221 //
1222 // We're optimistic here, and declare the handshake a success *before*
1223 // sending the LINKED_ACK response. I think this is OK though,
1224 // because if the send_relay_cell() below fails, the reactor will shut
1225 // down anyway. OTOH, marking the handshake as complete slightly early
1226 // means that on the happy path, the circuit is marked as usable sooner,
1227 // instead of blocking on the sending of the LINKED_ACK.
1228 self.note_conflux_handshake_result(Ok(()), false)?;
1229
1230 let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
1231
1232 res?;
1233 }
1234 #[cfg(feature = "conflux")]
1235 RunOnceCmdInner::Link { circuits, answer } => {
1236 // Add the specified circuits to our conflux set,
1237 // and send a LINK cell down each unlinked leg.
1238 //
1239 // NOTE: this will block the reactor until all the cells are sent.
1240 self.handle_link_circuits(circuits, answer).await?;
1241 }
1242 #[cfg(feature = "conflux")]
1243 RunOnceCmdInner::Enqueue { leg, msg } => {
1244 let entry = ConfluxHeapEntry { leg_id: leg, msg };
1245 self.ooo_msgs.push(entry);
1246 }
1247 #[cfg(feature = "circ-padding")]
1248 RunOnceCmdInner::PaddingAction {
1249 leg,
1250 padding_action,
1251 } => {
1252 // TODO: If we someday move back to having a per-circuit reactor,
1253 // this action would logically belong there, not on the tunnel reactor.
1254 self.circuits
1255 .run_padding_action(leg, padding_action)
1256 .await?;
1257 }
1258 }
1259
1260 Ok(())
1261 }
1262
1263 /// Wait for a [`CtrlMsg::Create`] to come along to set up the circuit.
1264 ///
1265 /// Returns an error if an unexpected `CtrlMsg` is received.
1266 #[instrument(level = "trace", skip_all)]
1267 async fn wait_for_create(&mut self) -> StdResult<(), ReactorError> {
1268 let msg = select_biased! {
1269 res = self.command.next() => {
1270 let cmd = unwrap_or_shutdown!(self, res, "shutdown channel drop")?;
1271 match cmd {
1272 CtrlCmd::Shutdown => return self.handle_shutdown().map(|_| ()),
1273 #[cfg(test)]
1274 CtrlCmd::AddFakeHop {
1275 relay_cell_format: format,
1276 fwd_lasthop,
1277 rev_lasthop,
1278 peer_id,
1279 params,
1280 done,
1281 } => {
1282 let leg = self.circuits.single_leg_mut()?;
1283 leg.handle_add_fake_hop(format, fwd_lasthop, rev_lasthop, peer_id, ¶ms, done);
1284 return Ok(())
1285 },
1286 _ => {
1287 trace!("reactor shutdown due to unexpected command: {:?}", cmd);
1288 return Err(Error::CircProto(format!("Unexpected control {cmd:?} on client circuit")).into());
1289 }
1290 }
1291 },
1292 res = self.control.next() => unwrap_or_shutdown!(self, res, "control drop")?,
1293 };
1294
1295 match msg {
1296 CtrlMsg::Create {
1297 recv_created,
1298 handshake,
1299 settings,
1300 done,
1301 } => {
1302 // TODO(conflux): instead of crashing the reactor, it might be better
1303 // to send the error via the done channel instead
1304 let leg = self.circuits.single_leg_mut()?;
1305 leg.handle_create(recv_created, handshake, settings, done)
1306 .await
1307 }
1308 _ => {
1309 trace!("reactor shutdown due to unexpected cell: {:?}", msg);
1310
1311 Err(Error::CircProto(format!("Unexpected {msg:?} cell on client circuit")).into())
1312 }
1313 }
1314 }
1315
/// Record the outcome of one leg's conflux handshake in the pending `ConfluxHandshakeCtx`.
///
/// Once a result has been recorded for every expected leg, or if `reactor_is_closing`
/// is set, the context is removed and the results collected so far are sent to the
/// handshake initiator.
///
/// Returns an internal error if no handshake context is installed.
#[cfg(feature = "conflux")]
#[instrument(level = "trace", skip_all)]
fn note_conflux_handshake_result(
    &mut self,
    res: StdResult<(), ConfluxHandshakeError>,
    reactor_is_closing: bool,
) -> StdResult<(), ReactorError> {
    let Some(pending) = self.conflux_hs_ctx.as_mut() else {
        return Err(internal!("no conflux handshake context").into());
    };
    pending.results.push(res);

    // Have all the legs finished linking?
    let tunnel_complete = pending.results.len() == pending.num_legs;
    if !tunnel_complete && !reactor_is_closing {
        // Still waiting to hear from the remaining legs.
        return Ok(());
    }

    // Time to retire the handshake context and report what we have collected.
    let finished = self.conflux_hs_ctx.take().expect("context disappeared?!");
    let leg_count = finished.results.len();
    let success_count = finished
        .results
        .iter()
        .filter(|outcome| outcome.is_ok())
        .count();

    info!(
        tunnel_id = %self.tunnel_id,
        "conflux tunnel ready ({success_count}/{leg_count} circuits successfully linked)",
    );

    // No further handshake results are expected until another LinkCircuits
    // control message installs a fresh ConfluxHandshakeCtx (with a new
    // channel to report on).
    send_conflux_outcome(finished.answer, Ok(finished.results))?;

    Ok(())
}
1362
1363 /// Prepare a `SendRelayCell` request, and install the given meta-cell handler.
1364 fn prepare_msg_and_install_handler(
1365 &mut self,
1366 msg: Option<AnyRelayMsgOuter>,
1367 handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
1368 ) -> Result<Option<SendRelayCell>> {
1369 let msg = msg
1370 .map(|msg| {
1371 let handlers = &mut self.cell_handlers;
1372 let handler = handler
1373 .as_ref()
1374 .or(handlers.meta_handler.as_ref())
1375 .ok_or_else(|| internal!("tried to use an ended Conversation"))?;
1376 // We should always have a precise HopLocation here so this should never fails but
1377 // in case we have a ::JointPoint, we'll notice.
1378 let hop = handler.expected_hop().hop_num().ok_or(bad_api_usage!(
1379 "MsgHandler doesn't have a precise HopLocation"
1380 ))?;
1381 Ok::<_, crate::Error>(SendRelayCell {
1382 hop: Some(hop),
1383 early: false,
1384 cell: msg,
1385 })
1386 })
1387 .transpose()?;
1388
1389 if let Some(handler) = handler {
1390 self.cell_handlers.set_meta_handler(handler)?;
1391 }
1392
1393 Ok(msg)
1394 }
1395
1396 /// Handle a shutdown request.
1397 fn handle_shutdown(&self) -> StdResult<Option<RunOnceCmdInner>, ReactorError> {
1398 trace!(
1399 tunnel_id = %self.tunnel_id,
1400 "reactor shutdown due to explicit request",
1401 );
1402
1403 Err(ReactorError::Shutdown)
1404 }
1405
/// Shut the reactor down, handing its only [`Circuit`] back to the requester.
///
/// The result of taking the single leg out of the circuit set, whether that is the
/// circuit or an error, is sent over `answer`. The reactor shuts down regardless.
#[cfg(feature = "conflux")]
fn handle_shutdown_and_return_circuit(
    &mut self,
    answer: oneshot::Sender<StdResult<Circuit, Bug>>,
) -> StdResult<(), ReactorError> {
    let only_leg = self.circuits.take_single_leg();
    // Don't care if the receiver goes away
    let _ = answer.send(only_leg);

    self.handle_shutdown().map(drop)
}
1419
1420 /// Resolves a [`TargetHop`] to a [`HopLocation`].
1421 ///
1422 /// After resolving a `TargetHop::LastHop`,
1423 /// the `HopLocation` can become stale if a single-path circuit is later extended or truncated.
1424 /// This means that the `HopLocation` can become stale from one reactor iteration to the next.
1425 ///
1426 /// It's generally okay to hold on to a (possibly stale) `HopLocation`
1427 /// if you need a fixed hop position in the tunnel.
1428 /// For example if we open a stream to `TargetHop::LastHop`,
1429 /// we would want to store the stream position as a `HopLocation` and not a `TargetHop::LastHop`
1430 /// as we don't want the stream position to change as the tunnel is extended or truncated.
1431 ///
1432 /// Returns [`NoHopsBuiltError`] if trying to resolve `TargetHop::LastHop`
1433 /// and the tunnel has no hops
1434 /// (either has no legs, or has legs which contain no hops).
1435 fn resolve_target_hop(&self, hop: TargetHop) -> StdResult<HopLocation, NoHopsBuiltError> {
1436 match hop {
1437 TargetHop::Hop(hop) => Ok(hop),
1438 TargetHop::LastHop => {
1439 if let Ok(leg) = self.circuits.single_leg() {
1440 let leg_id = leg.unique_id();
1441 // single-path tunnel
1442 let hop = leg.last_hop_num().ok_or(NoHopsBuiltError)?;
1443 Ok(HopLocation::Hop((leg_id, hop)))
1444 } else if !self.circuits.is_empty() {
1445 // multi-path tunnel
1446 Ok(HopLocation::JoinPoint)
1447 } else {
1448 // no legs
1449 Err(NoHopsBuiltError)
1450 }
1451 }
1452 }
1453 }
1454
1455 /// Resolves a [`HopLocation`] to a [`UniqId`] and [`HopNum`].
1456 ///
1457 /// After resolving a `HopLocation::JoinPoint`,
1458 /// the [`UniqId`] and [`HopNum`] can become stale if the primary leg changes.
1459 ///
1460 /// You should try to only resolve to a specific [`UniqId`] and [`HopNum`] immediately before you
1461 /// need them,
1462 /// and you should not hold on to the resolved [`UniqId`] and [`HopNum`] between reactor
1463 /// iterations as the primary leg may change from one iteration to the next.
1464 ///
1465 /// Returns [`NoJoinPointError`] if trying to resolve `HopLocation::JoinPoint`
1466 /// but it does not have a join point.
1467 #[instrument(level = "trace", skip_all)]
1468 fn resolve_hop_location(
1469 &self,
1470 hop: HopLocation,
1471 ) -> StdResult<(UniqId, HopNum), NoJoinPointError> {
1472 match hop {
1473 HopLocation::Hop((leg_id, hop_num)) => Ok((leg_id, hop_num)),
1474 HopLocation::JoinPoint => {
1475 if let Some((leg_id, hop_num)) = self.circuits.primary_join_point() {
1476 Ok((leg_id, hop_num))
1477 } else {
1478 // Attempted to get the join point of a non-multipath tunnel.
1479 Err(NoJoinPointError)
1480 }
1481 }
1482 }
1483 }
1484
1485 /// Resolve a [`TargetHop`] directly into a [`UniqId`] and [`HopNum`].
1486 ///
1487 /// This is a helper function that basically calls both resolve_target_hop and
1488 /// resolve_hop_location back to back.
1489 ///
1490 /// It returns None on failure to resolve meaning that if you want more detailed error on why
1491 /// it failed, explicitly use the resolve_hop_location() and resolve_target_hop() functions.
1492 pub(crate) fn target_hop_to_hopnum_id(&self, hop: TargetHop) -> Option<(UniqId, HopNum)> {
1493 self.resolve_target_hop(hop)
1494 .ok()
1495 .and_then(|resolved| self.resolve_hop_location(resolved).ok())
1496 }
1497
/// Install or remove a padder at a given hop.
///
/// Only a concrete `HopLocation::Hop` is accepted; the join point is rejected as bad
/// API usage. Fails with `Error::NoSuchHop` if the leg does not exist.
#[cfg(feature = "circ-padding-manual")]
fn set_padding_at_hop(
    &self,
    hop: HopLocation,
    padder: Option<super::circuit::padding::CircuitPadder>,
) -> Result<()> {
    match hop {
        HopLocation::Hop((leg_id, hop_num)) => {
            self.circuits
                .leg(leg_id)
                .ok_or(Error::NoSuchHop)?
                .set_padding_at_hop(hop_num, padder)?;
            Ok(())
        }
        HopLocation::JoinPoint => {
            Err(bad_api_usage!("Padding to the join point is not supported.").into())
        }
    }
}
1512
/// Does congestion control use stream SENDMEs for the given hop?
///
/// This simply forwards the query to the circuit set held in `self.circuits`.
///
/// Returns `None` if either the `leg` or `hop` don't exist.
fn uses_stream_sendme(&self, leg: UniqId, hop: HopNum) -> Option<bool> {
    self.circuits.uses_stream_sendme(leg, hop)
}
1519
/// Handle a request to link some extra circuits in the reactor's conflux set.
///
/// If a conflux handshake is already in progress, an error is sent on `answer`
/// and nothing else happens.
///
/// Otherwise the circuits are added to this reactor's conflux set (which validates
/// them) and the circuits are linked. If either step fails, the error is reported on
/// `answer`. On success, a handshake context holding `answer` is installed, so that
/// the initiator can be notified once the handshake results are in.
///
/// NOTE: this blocks the reactor main loop until all the cells are sent.
#[cfg(feature = "conflux")]
#[instrument(level = "trace", skip_all)]
async fn handle_link_circuits(
    &mut self,
    circuits: Vec<Circuit>,
    answer: ConfluxLinkResultChannel,
) -> StdResult<(), ReactorError> {
    use tor_error::warn_report;

    // Only one handshake may be in flight at a time.
    if self.conflux_hs_ctx.is_some() {
        let err = internal!("conflux linking already in progress");
        return send_conflux_outcome(answer, Err(err.into()));
    }

    // A LINK cell goes out on each of the new circuits, and on each of the
    // existing, unlinked legs from self.circuits.
    //
    // In reality, there can only be one such existing circuit
    // (the "initial" one from the previously single-path tunnel),
    // because any circuits that fail to complete the conflux handshake
    // get removed from the set.
    let num_legs = self.circuits.num_unlinked() + circuits.len();

    // Note: add_legs validates `circuits`
    let link_outcome = async {
        self.circuits.add_legs(circuits, &self.runtime)?;
        self.circuits.link_circuits(&self.runtime).await
    }
    .await;

    match link_outcome {
        Ok(_) => {
            // Save the channel, to notify the user of completion.
            self.conflux_hs_ctx = Some(ConfluxHandshakeCtx {
                answer,
                num_legs,
                results: Default::default(),
            });

            Ok(())
        }
        Err(e) => {
            warn_report!(e, "Failed to link conflux circuits");

            send_conflux_outcome(answer, Err(e))
        }
    }
}
1579}
1580
/// Notify the conflux handshake initiator of the handshake outcome.
///
/// Returns [`ReactorError::Shutdown`] if the initiator has gone away,
/// i.e. if sending on `tx` fails.
#[cfg(feature = "conflux")]
fn send_conflux_outcome(
    tx: ConfluxLinkResultChannel,
    res: Result<ConfluxHandshakeResult>,
) -> StdResult<(), ReactorError> {
    match tx.send(res) {
        Ok(_) => Ok(()),
        Err(_) => {
            tracing::warn!("conflux initiator went away before handshake completed?");
            Err(ReactorError::Shutdown)
        }
    }
}
1596
/// The tunnel does not have any hops.
///
/// Returned by `resolve_target_hop` when asked to resolve `TargetHop::LastHop`
/// on a tunnel that has no legs, or whose single leg has no hops.
#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
#[non_exhaustive]
#[error("no hops have been built for this tunnel")]
pub(crate) struct NoHopsBuiltError;
1602
/// The tunnel does not have a join point.
///
/// Returned by `resolve_hop_location` when asked to resolve `HopLocation::JoinPoint`
/// and the circuit set reports no primary join point.
#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
#[non_exhaustive]
#[error("the tunnel does not have a join point")]
pub(crate) struct NoJoinPointError;
1608
1609impl CellHandlers {
1610 /// Try to install a given meta-cell handler to receive any unusual cells on
1611 /// this circuit, along with a result channel to notify on completion.
1612 fn set_meta_handler(&mut self, handler: Box<dyn MetaCellHandler + Send>) -> Result<()> {
1613 if self.meta_handler.is_none() {
1614 self.meta_handler = Some(handler);
1615 Ok(())
1616 } else {
1617 Err(Error::from(internal!(
1618 "Tried to install a meta-cell handler before the old one was gone."
1619 )))
1620 }
1621 }
1622
1623 /// Try to install a given cell handler on this circuit.
1624 #[cfg(feature = "hs-service")]
1625 fn set_incoming_stream_req_handler(
1626 &mut self,
1627 handler: IncomingStreamRequestHandler,
1628 ) -> Result<()> {
1629 if self.incoming_stream_req_handler.is_none() {
1630 self.incoming_stream_req_handler = Some(handler);
1631 Ok(())
1632 } else {
1633 Err(Error::from(internal!(
1634 "Tried to install a BEGIN cell handler before the old one was gone."
1635 )))
1636 }
1637 }
1638}
1639
#[cfg(test)]
mod test {
    // Intentionally empty: this module is tested in [`crate::client::circuit::test`].
}