tor_proto/client/reactor.rs
1//! Code to handle incoming cells on a circuit.
2//!
3//! ## On message validation
4//!
5//! There are three steps for validating an incoming message on a stream:
6//!
7//! 1. Is the message contextually appropriate? (e.g., no more than one
8//! `CONNECTED` message per stream.) This is handled by calling
9//! [`CmdChecker::check_msg`](crate::stream::cmdcheck::CmdChecker::check_msg).
10//! 2. Does the message comply with flow-control rules? (e.g., no more SENDMEs
11//! than we've sent data for.) This is handled within the reactor by the
12//! `StreamFlowCtrl`. For half-closed streams which don't send stream
13//! SENDMEs, an additional receive-window check is performed in the
14//! `halfstream` module.
15//! 3. Does the message have an acceptable command type, and is the message
16//! well-formed? For open streams, the streams themselves handle this check.
17//! For half-closed streams, the reactor handles it by calling
18//! `consume_checked_msg()`.
19
20pub(crate) mod circuit;
21mod conflux;
22mod control;
23
24use crate::circuit::circhop::{ReactorStreamComponents, SendRelayCell};
25use crate::circuit::{CircuitRxReceiver, UniqId};
26use crate::client::circuit::ClientCircChanMsg;
27use crate::client::circuit::padding::{PaddingController, PaddingEvent, PaddingEventStream};
28use crate::client::{HopLocation, TargetHop};
29use crate::crypto::cell::HopNum;
30use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
31use crate::memquota::CircuitAccount;
32use crate::stream::CloseStreamBehavior;
33use crate::streammap;
34use crate::tunnel::{TunnelId, TunnelScopedCircId};
35use crate::util::err::ReactorError;
36use crate::util::skew::ClockSkew;
37use crate::util::timeout::TimeoutEstimator;
38use crate::{Error, Result};
39use circuit::Circuit;
40use conflux::ConfluxSet;
41use control::ControlHandler;
42use std::cmp::Ordering;
43use std::collections::BinaryHeap;
44use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
45use tor_cell::relaycell::msg::Sendme;
46use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
47use tor_error::{Bug, bad_api_usage, debug_report, internal, into_bad_api_usage};
48use tor_rtcompat::{DynTimeProvider, SleepProvider};
49
50use cfg_if::cfg_if;
51use futures::StreamExt;
52use futures::channel::mpsc;
53use futures::{FutureExt as _, select_biased};
54use oneshot_fused_workaround as oneshot;
55
56use std::result::Result as StdResult;
57use std::sync::Arc;
58use std::time::Duration;
59
60use crate::channel::Channel;
61use crate::conflux::msghandler::RemoveLegReason;
62use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
63use circuit::CircuitCmd;
64use derive_more::From;
65use smallvec::smallvec;
66use tor_cell::chancell::CircId;
67use tor_llcrypto::pk;
68use tracing::{debug, info, instrument, trace, warn};
69
70use super::circuit::{MutableState, TunnelMutableState};
71use crate::circuit::reactor::ReactorResultChannel;
72
73#[cfg(feature = "hs-service")]
74use crate::stream::incoming::IncomingStreamRequestHandler;
75
76#[cfg(feature = "conflux")]
77use {
78 crate::conflux::msghandler::{ConfluxCmd, OooRelayMsg},
79 crate::util::err::ConfluxHandshakeError,
80};
81
82pub(super) use control::{CtrlCmd, CtrlMsg, FlowCtrlMsg};
83
/// Contains a list of conflux handshake results.
///
/// Each element is either `Ok(())` or the [`ConfluxHandshakeError`]
/// describing why that handshake failed.
#[cfg(feature = "conflux")]
pub(super) type ConfluxHandshakeResult = Vec<StdResult<(), ConfluxHandshakeError>>;
87
/// The type of oneshot channel used to inform reactor users of the outcome
/// of a client-side conflux handshake.
///
/// Carries a [`ConfluxHandshakeResult`]: a list of handshake results,
/// one for each circuit that we were asked to link in the tunnel.
#[cfg(feature = "conflux")]
pub(super) type ConfluxLinkResultChannel = ReactorResultChannel<ConfluxHandshakeResult>;
95
/// A handshake type, to be used when creating circuit hops.
///
/// Each variant carries the key material that particular handshake needs.
#[derive(Clone, Debug)]
pub(crate) enum CircuitHandshake {
    /// Use the CREATE_FAST handshake.
    ///
    /// This variant carries no key material.
    CreateFast,
    /// Use the ntor handshake.
    Ntor {
        /// The public key of the relay.
        public_key: NtorPublicKey,
        /// The Ed25519 identity of the relay, which is verified against the
        /// identity held in the circuit's channel.
        ed_identity: pk::ed25519::Ed25519Identity,
    },
    /// Use the ntor-v3 handshake.
    NtorV3 {
        /// The public key of the relay.
        public_key: NtorV3PublicKey,
    },
}
115
116// TODO: the RunOnceCmd/RunOnceCmdInner/CircuitCmd/CircuitEvent enum
117// proliferation is a bit bothersome, but unavoidable with the current design.
118//
119// We should consider getting rid of some of these enums (if possible),
120// and coming up with more intuitive names.
121
/// One or more [`RunOnceCmdInner`] to run inside [`Reactor::run_once`].
///
/// The derived `From` impls allow building this from either a single
/// `RunOnceCmdInner` or a `Vec<RunOnceCmdInner>` with `.into()`.
#[derive(From, Debug)]
#[allow(clippy::large_enum_variant)] // TODO #2003: resolve this
enum RunOnceCmd {
    /// Run a single `RunOnceCmdInner` command.
    Single(RunOnceCmdInner),
    /// Run multiple `RunOnceCmdInner` commands.
    //
    // Note: this whole enum *could* be replaced with Vec<RunOnceCmdInner>,
    // but most of the time we're only going to have *one* RunOnceCmdInner
    // to run per run_once() loop. The enum enables us to avoid the extra heap
    // allocation for the `RunOnceCmd::Single` case.
    Multiple(Vec<RunOnceCmdInner>),
}
136
/// Instructions for running something in the reactor loop.
///
/// Run at the end of [`Reactor::run_once`].
///
/// Most variants are built from a [`CircuitCmd`] via
/// [`RunOnceCmdInner::from_circuit_cmd`], which attaches the leg the command
/// applies to.
//
// TODO: many of the variants of this enum have an identical CtrlMsg counterpart.
// We should consider making each variant a tuple variant and deduplicating the fields.
#[derive(educe::Educe)]
#[educe(Debug)]
enum RunOnceCmdInner {
    /// Send a RELAY cell.
    Send {
        /// The leg the cell should be sent on.
        leg: UniqId,
        /// The cell to send.
        cell: SendRelayCell,
        /// A channel for sending completion notifications.
        ///
        /// `None` if nobody is waiting for the outcome.
        done: Option<ReactorResultChannel<()>>,
    },
    /// Send a given control message on this circuit, and install a control-message handler to
    /// receive responses.
    #[cfg(feature = "send-control-msg")]
    SendMsgAndInstallHandler {
        /// The message to send, if any
        msg: Option<AnyRelayMsgOuter>,
        /// A message handler to install.
        ///
        /// If this is `None`, there must already be a message handler installed
        // The handler is a trait object without a `Debug` impl requirement,
        // so it is skipped in the derived `Debug` output.
        #[educe(Debug(ignore))]
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
        /// A sender that we use to tell the caller that the message was sent
        /// and the handler installed.
        done: oneshot::Sender<Result<()>>,
    },
    /// Handle a SENDME message.
    HandleSendMe {
        /// The leg the SENDME was received on.
        leg: UniqId,
        /// The hop number.
        hop: HopNum,
        /// The SENDME message to handle.
        sendme: Sendme,
    },
    /// Begin a stream with the provided hop in this circuit.
    ///
    /// Uses the provided stream ID, and sends the provided message to that hop.
    BeginStream {
        /// The cell to send.
        cell: SendRelayCell,
        /// The ID of the stream to return on the oneshot channel.
        stream_id: StreamId,
        /// The location of the hop on the tunnel. We don't use this (and `Circuit`s shouldn't need
        /// to worry about legs anyways), but need it so that we can pass it back in `done` to the
        /// caller.
        hop: HopLocation,
        /// The circuit leg to begin the stream on.
        leg: UniqId,
        /// Components that are needed to interact with the new stream.
        stream_components: ReactorStreamComponents,
        /// Oneshot channel to notify on completion, with the allocated stream ID.
        ///
        /// The payload also carries the hop location, the relay cell format,
        /// and the stream components.
        done: ReactorResultChannel<(
            StreamId,
            HopLocation,
            RelayCellFormat,
            ReactorStreamComponents,
        )>,
    },
    /// Consider sending an XON message with the given `rate`.
    MaybeSendXon {
        /// The drain rate to advertise in the XON message.
        rate: XonKbpsEwma,
        /// The ID of the stream to send the message on.
        stream_id: StreamId,
        /// The location of the hop on the tunnel.
        hop: HopLocation,
    },
    /// Close the specified stream.
    CloseStream {
        /// The location of the hop on the tunnel.
        hop: HopLocation,
        /// The ID of the stream to close.
        sid: StreamId,
        /// The stream-closing behavior.
        behav: CloseStreamBehavior,
        /// The reason for closing the stream.
        reason: streammap::TerminateReason,
        /// A channel for sending completion notifications.
        ///
        /// `None` if nobody is waiting for the outcome.
        done: Option<ReactorResultChannel<()>>,
    },
    /// Get the clock skew claimed by the first hop of the circuit.
    FirstHopClockSkew {
        /// Oneshot channel to return the clock skew.
        answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
    },
    /// Remove a circuit leg from the conflux set.
    RemoveLeg {
        /// The circuit leg to remove.
        leg: UniqId,
        /// The reason for removal.
        ///
        /// This is only used for conflux circuits that get removed
        /// before the conflux handshake is complete.
        ///
        /// The [`RemoveLegReason`] is mapped by the reactor to a
        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
        /// handshake to indicate the reason the handshake failed.
        reason: RemoveLegReason,
    },
    /// A circuit has completed the conflux handshake,
    /// and wants to send the specified cell.
    ///
    /// This is similar to [`RunOnceCmdInner::Send`],
    /// but needs to remain a separate variant,
    /// because in addition to instructing the reactor to send a cell,
    /// it also notifies it that the conflux handshake is complete on the specified `leg`.
    /// This enables the reactor to save the handshake result (`Ok(())`),
    /// and, if there are no other legs still in the handshake phase,
    /// send the result to the handshake initiator.
    #[cfg(feature = "conflux")]
    ConfluxHandshakeComplete {
        /// The circuit leg that has completed the handshake.
        /// This is the leg the cell should be sent on.
        leg: UniqId,
        /// The cell to send.
        cell: SendRelayCell,
    },
    /// Send a LINK cell on each of the unlinked circuit legs in the conflux set of this reactor.
    #[cfg(feature = "conflux")]
    Link {
        /// The circuits to link into the tunnel
        // Skipped in the derived `Debug` output.
        #[educe(Debug(ignore))]
        circuits: Vec<Circuit>,
        /// Oneshot channel for notifying of conflux handshake completion.
        answer: ConfluxLinkResultChannel,
    },
    /// Enqueue an out-of-order cell in ooo_msg.
    #[cfg(feature = "conflux")]
    Enqueue {
        /// The leg the entry originated from.
        leg: UniqId,
        /// The out-of-order message.
        msg: OooRelayMsg,
    },
    /// Take a padding-related event on a circuit leg.
    #[cfg(feature = "circ-padding")]
    PaddingAction {
        /// The leg on which to take the padding event.
        leg: UniqId,
        /// The event to take.
        padding_event: PaddingEvent,
    },
    /// Perform a clean shutdown on this circuit.
    CleanShutdown,
}
290
291impl RunOnceCmdInner {
292 /// Create a [`RunOnceCmdInner`] out of a [`CircuitCmd`] and [`UniqId`].
293 fn from_circuit_cmd(leg: UniqId, cmd: CircuitCmd) -> Self {
294 match cmd {
295 CircuitCmd::Send(cell) => Self::Send {
296 leg,
297 cell,
298 done: None,
299 },
300 CircuitCmd::HandleSendMe { hop, sendme } => Self::HandleSendMe { leg, hop, sendme },
301 CircuitCmd::CloseStream {
302 hop,
303 sid,
304 behav,
305 reason,
306 } => Self::CloseStream {
307 hop: HopLocation::Hop((leg, hop)),
308 sid,
309 behav,
310 reason,
311 done: None,
312 },
313 #[cfg(feature = "conflux")]
314 CircuitCmd::Conflux(ConfluxCmd::RemoveLeg(reason)) => Self::RemoveLeg { leg, reason },
315 #[cfg(feature = "conflux")]
316 CircuitCmd::Conflux(ConfluxCmd::HandshakeComplete { hop, early, cell }) => {
317 let cell = SendRelayCell {
318 hop: Some(hop),
319 early,
320 cell,
321 };
322 Self::ConfluxHandshakeComplete { leg, cell }
323 }
324 #[cfg(feature = "conflux")]
325 CircuitCmd::Enqueue(msg) => Self::Enqueue { leg, msg },
326 CircuitCmd::CleanShutdown => Self::CleanShutdown,
327 }
328 }
329}
330
/// A command to execute at the end of [`Reactor::run_once`].
///
/// Events gathered in a single batch are sorted by
/// [`CircuitEvent::order_within_batch`] before being handled.
#[derive(From, Debug)]
#[allow(clippy::large_enum_variant)] // TODO #2003: should we resolve this?
enum CircuitEvent {
    /// Run a single `CircuitCmd` command.
    RunCmd {
        /// The unique identifier of the circuit leg to run the command on
        leg: UniqId,
        /// The command to run.
        cmd: CircuitCmd,
    },
    /// Handle a control message
    HandleControl(CtrlMsg),
    /// Handle an input message.
    HandleCell {
        /// The unique identifier of the circuit leg the message was received on.
        leg: UniqId,
        /// The message to handle.
        cell: ClientCircChanMsg,
    },
    /// Remove the specified circuit leg from the conflux set.
    ///
    /// Returned whenever a single circuit leg needs to be removed
    /// from the reactor's conflux set, without necessarily tearing down
    /// the whole set or shutting down the reactor.
    ///
    /// Note: this event *can* cause the reactor to shut down
    /// (and the conflux set to be closed).
    ///
    /// See the [`ConfluxSet::remove`] docs for more on the exact behavior of this command.
    RemoveLeg {
        /// The leg to remove.
        leg: UniqId,
        /// The reason for removal.
        ///
        /// This is only used for conflux circuits that get removed
        /// before the conflux handshake is complete.
        ///
        /// The [`RemoveLegReason`] is mapped by the reactor to a
        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
        /// handshake to indicate the reason the handshake failed.
        reason: RemoveLegReason,
    },
    /// Take some event (blocking or unblocking a circuit, or sending padding)
    /// based on the circuit padding backend code.
    PaddingAction {
        /// The leg on which to take the padding event.
        leg: UniqId,
        /// The event to take.
        padding_event: PaddingEvent,
    },
    /// Protocol violation.
    ///
    /// For now, this causes the circuit reactor to shut down with the
    /// error stored in `err`.
    ProtoViolation {
        /// The error that causes this protocol violation.
        err: crate::Error,
    },
}
389
390impl CircuitEvent {
391 /// Return the ordering with which we should handle this event
392 /// within a list of events returned by a single call to next_circ_event().
393 ///
394 /// NOTE: Please do not make this any more complicated:
395 /// It is a consequence of a kludge that we need this sorting at all.
396 /// Assuming that eventually, we switch away from the current
397 /// poll-oriented `next_circ_event` design,
398 /// we may be able to get rid of this entirely.
399 fn order_within_batch(&self) -> u8 {
400 use CircuitEvent as CA;
401 use PaddingEvent as PE;
402 // This immediate state MUST NOT be used for events emitting cells. At the moment, it is
403 // only used by the protocol violation event which leads to a shutdown of the reactor.
404 const IMMEDIATE: u8 = 0;
405 const EARLY: u8 = 1;
406 const NORMAL: u8 = 2;
407 const LATE: u8 = 3;
408
409 // We use this ordering to move any "StartBlocking" to the _end_ of a batch and
410 // "StopBlocking" to the start.
411 //
412 // This way, we can be sure that we will handle any "send data" operations
413 // (and tell the Padder about them) _before_ we tell the Padder
414 // that we have blocked the circuit.
415 //
416 // This keeps things a bit more logical.
417 match self {
418 CA::RunCmd { .. } => NORMAL,
419 CA::HandleControl(..) => NORMAL,
420 CA::HandleCell { .. } => NORMAL,
421 CA::RemoveLeg { .. } => NORMAL,
422 #[cfg(feature = "circ-padding")]
423 CA::PaddingAction { padding_event, .. } => match padding_event {
424 PE::StopBlocking => EARLY,
425 PE::SendPadding(..) => NORMAL,
426 PE::StartBlocking(..) => LATE,
427 },
428 #[cfg(not(feature = "circ-padding"))]
429 CA::PaddingAction { .. } => NORMAL,
430 CA::ProtoViolation { .. } => IMMEDIATE,
431 }
432 }
433}
434
/// An object that's waiting for a meta cell (one not associated with a stream) in order to make
/// progress.
///
/// # Background
///
/// The `Reactor` can't have async functions that send and receive cells, because its job is to
/// send and receive cells: if one of its functions tried to do that, it would just hang forever.
///
/// To get around this problem, the reactor can send some cells, and then make one of these
/// `MetaCellHandler` objects, which will be run when the reply arrives.
pub(crate) trait MetaCellHandler: Send {
    /// The hop we're expecting the message to come from. This is compared against the hop
    /// from which we actually receive messages, and an error is thrown if the two don't match.
    fn expected_hop(&self) -> HopLocation;
    /// Called when the message we were waiting for arrives.
    ///
    /// Receives the still-unparsed message, plus a mutable reference to a
    /// [`Circuit`] (the `reactor` argument) in order to do anything it likes there.
    ///
    /// On success, the returned [`MetaCellDisposition`] says what should
    /// happen to this handler (and to the circuit) next.
    ///
    /// If this function returns an error, the reactor will shut down.
    fn handle_msg(
        &mut self,
        msg: UnparsedRelayMsg,
        reactor: &mut Circuit,
    ) -> Result<MetaCellDisposition>;
}
460
/// A possible successful outcome of giving a message to a [`MsgHandler`](super::msghandler::MsgHandler).
///
/// This is also the success type returned by [`MetaCellHandler::handle_msg`].
///
/// The type is made `pub` when the `send-control-msg` feature is enabled;
/// two of its variants only exist with that feature.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "send-control-msg", visibility::make(pub))]
#[non_exhaustive]
pub(crate) enum MetaCellDisposition {
    /// The message was consumed; the handler should remain installed.
    #[cfg(feature = "send-control-msg")]
    Consumed,
    /// The message was consumed; the handler should be uninstalled.
    ConversationFinished,
    /// The message was consumed; the circuit should be closed.
    #[cfg(feature = "send-control-msg")]
    CloseCirc,
    // TODO: Eventually we might want the ability to have multiple handlers
    // installed, and to let them say "not for me, maybe for somebody else?".
    // But right now we don't need that.
}
478
/// Turn an [`Option`] into a `Result`, mapping `None` to [`ReactorError::Shutdown`].
///
/// When the option is `None`, a trace-level message is logged with the
/// tunnel ID of `$self` and the given `$reason` before the error is produced.
///
/// This is a macro instead of a function to work around borrowck errors
/// in the select! from run_once().
macro_rules! unwrap_or_shutdown {
    ($self:expr, $res:expr, $reason:expr) => {{
        if let Some(v) = $res {
            Ok(v)
        } else {
            trace!(
                tunnel_id = %$self.tunnel_id,
                reason = %$reason,
                "reactor shutdown"
            );
            Err(ReactorError::Shutdown)
        }
    }};
}
498
/// Object to handle incoming cells and background tasks on a circuit
///
/// This type is returned when you finish a circuit; you need to spawn a
/// new task that calls `run()` on it.
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
pub struct Reactor {
    /// Receiver for control messages for this reactor, sent by `ClientCirc` objects.
    ///
    /// This channel is polled in [`Reactor::run_once`], but only if the `chan_sender` sink
    /// is ready to accept cells.
    //
    // NOTE(review): `run_once` polls this channel directly in its `select_biased!`,
    // and the comment there says control messages may be handled before the
    // `chan_sender`s are ready. The "only if ... ready" statement above may be
    // stale; confirm.
    control: mpsc::UnboundedReceiver<CtrlMsg>,
    /// Receiver for command messages for this reactor, sent by `ClientCirc` objects.
    ///
    /// This channel is polled in [`Reactor::run_once`].
    ///
    /// NOTE: this is a separate channel from `control`, because some messages
    /// have higher priority and need to be handled even if the `chan_sender` is not
    /// ready (whereas `control` messages are not read until the `chan_sender` sink
    /// is ready to accept cells).
    command: mpsc::UnboundedReceiver<CtrlCmd>,
    /// A oneshot sender that is used to alert other tasks when this reactor is
    /// finally dropped.
    ///
    /// It is a sender for Void because we never actually want to send anything here;
    /// we only want to generate canceled events.
    #[allow(dead_code)] // the only purpose of this field is to be dropped.
    reactor_closed_tx: oneshot::Sender<void::Void>,
    /// A set of circuits that form a tunnel.
    ///
    /// Contains 1 or more circuits.
    ///
    /// Circuits may be added to this set throughout the lifetime of the reactor.
    ///
    /// Sometimes, the reactor will remove circuits from this set,
    /// for example if the `LINKED` message takes too long to arrive,
    /// or if congestion control negotiation fails.
    /// The reactor will continue running with the remaining circuits.
    /// It will shut down if *all* the circuits are removed.
    ///
    // TODO(conflux): document all the reasons why the reactor might
    // choose to tear down a circuit or tunnel (timeouts, protocol violations, etc.)
    circuits: ConfluxSet,
    /// An identifier for logging about this tunnel reactor.
    tunnel_id: TunnelId,
    /// Handlers, shared with `Circuit`.
    cell_handlers: CellHandlers,
    /// The time provider, used for conflux handshake timeouts.
    runtime: DynTimeProvider,
    /// The conflux handshake context, if there is an on-going handshake.
    ///
    /// Set to `None` if this is a single-path tunnel,
    /// or if none of the circuit legs from our conflux set
    /// are currently in the conflux handshake phase.
    #[cfg(feature = "conflux")]
    conflux_hs_ctx: Option<ConfluxHandshakeCtx>,
    /// A min-heap buffering all the out-of-order messages received so far.
    ///
    /// TODO(conflux): this becomes a DoS vector unless we impose a limit
    /// on its size. We should make this participate in the memquota memory
    /// tracking system, somehow.
    #[cfg(feature = "conflux")]
    ooo_msgs: BinaryHeap<ConfluxHeapEntry>,
}
562
/// The context for an on-going conflux handshake.
///
/// Stored in [`Reactor::conflux_hs_ctx`] while a handshake is in progress.
#[cfg(feature = "conflux")]
struct ConfluxHandshakeCtx {
    /// A channel for notifying the caller of the outcome of a CONFLUX_LINK request.
    answer: ConfluxLinkResultChannel,
    /// The number of legs that are currently doing the handshake.
    num_legs: usize,
    /// The handshake results we have collected so far.
    results: ConfluxHandshakeResult,
}
573
/// An out-of-order message buffered in [`Reactor::ooo_msgs`].
///
/// Comparison and equality of entries look only at `msg`;
/// `leg_id` does not take part in either.
#[derive(Debug)]
#[cfg(feature = "conflux")]
struct ConfluxHeapEntry {
    /// The leg id this message came from.
    leg_id: UniqId,
    /// The out of order message
    msg: OooRelayMsg,
}
583
#[cfg(feature = "conflux")]
impl Ord for ConfluxHeapEntry {
    /// Order entries by their buffered message alone; `leg_id` is ignored.
    //
    // NOTE(review): `BinaryHeap` is a max-heap, while `Reactor::ooo_msgs` is
    // documented as a min-heap. Presumably the ordering of `OooRelayMsg` is
    // reversed to make that work; confirm.
    fn cmp(&self, other: &Self) -> Ordering {
        Ord::cmp(&self.msg, &other.msg)
    }
}

#[cfg(feature = "conflux")]
impl PartialOrd for ConfluxHeapEntry {
    /// Always `Some`: delegates to the total order defined by `Ord`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
597
#[cfg(feature = "conflux")]
impl PartialEq for ConfluxHeapEntry {
    /// Two entries are equal when their buffered messages are equal;
    /// `leg_id` is ignored.
    fn eq(&self, other: &Self) -> bool {
        PartialEq::eq(&self.msg, &other.msg)
    }
}

// Marker impl: required by `Ord`.
#[cfg(feature = "conflux")]
impl Eq for ConfluxHeapEntry {}
607
/// Cell handlers, shared between the Reactor and its underlying `Circuit`s.
///
/// The reactor owns this struct and lends it mutably to a circuit
/// whenever that circuit handles an incoming message.
struct CellHandlers {
    /// A handler for a meta cell, together with a result channel to notify on completion.
    ///
    /// NOTE(prop349): this is part of Arti's "Base Circuit Hop Handler".
    ///
    /// Upon sending an EXTEND cell, the [`ControlHandler`] sets this handler
    /// to [`CircuitExtender`](circuit::extender::CircuitExtender).
    /// The handler is then used in [`Circuit::handle_meta_cell`] for handling
    /// all the meta cells received on the circuit that are not SENDMEs or TRUNCATE
    /// (which are handled separately) or conflux cells
    /// (which are handled by the conflux handlers).
    ///
    /// The handler is uninstalled after the receipt of the EXTENDED cell,
    /// so any subsequent EXTENDED cells will cause the circuit to be torn down.
    ///
    /// `None` when no handler is installed.
    meta_handler: Option<Box<dyn MetaCellHandler + Send>>,
    /// A handler for incoming stream requests.
    ///
    /// `None` when no handler is installed.
    #[cfg(feature = "hs-service")]
    incoming_stream_req_handler: Option<IncomingStreamRequestHandler>,
}
628
629impl Reactor {
630 /// Create a new circuit reactor.
631 ///
632 /// The reactor will send outbound messages on `channel`, receive incoming
633 /// messages on `input`, and identify this circuit by the channel-local
634 /// [`CircId`] provided.
635 ///
636 /// The internal unique identifier for this circuit will be `unique_id`.
637 #[allow(clippy::type_complexity, clippy::too_many_arguments)] // TODO
638 pub(super) fn new(
639 channel: Arc<Channel>,
640 channel_id: CircId,
641 unique_id: UniqId,
642 input: CircuitRxReceiver,
643 runtime: DynTimeProvider,
644 memquota: CircuitAccount,
645 padding_ctrl: PaddingController,
646 padding_stream: PaddingEventStream,
647 timeouts: Arc<dyn TimeoutEstimator + Send>,
648 ) -> (
649 Self,
650 mpsc::UnboundedSender<CtrlMsg>,
651 mpsc::UnboundedSender<CtrlCmd>,
652 oneshot::Receiver<void::Void>,
653 Arc<TunnelMutableState>,
654 ) {
655 let tunnel_id = TunnelId::next();
656 let (control_tx, control_rx) = mpsc::unbounded();
657 let (command_tx, command_rx) = mpsc::unbounded();
658 let mutable = Arc::new(MutableState::default());
659
660 let (reactor_closed_tx, reactor_closed_rx) = oneshot::channel();
661
662 let cell_handlers = CellHandlers {
663 meta_handler: None,
664 #[cfg(feature = "hs-service")]
665 incoming_stream_req_handler: None,
666 };
667
668 let unique_id = TunnelScopedCircId::new(tunnel_id, unique_id);
669 let circuit_leg = Circuit::new(
670 runtime.clone(),
671 channel,
672 channel_id,
673 unique_id,
674 input,
675 memquota,
676 Arc::clone(&mutable),
677 padding_ctrl,
678 padding_stream,
679 timeouts,
680 );
681
682 let (circuits, mutable) = ConfluxSet::new(tunnel_id, circuit_leg);
683
684 let reactor = Reactor {
685 circuits,
686 control: control_rx,
687 command: command_rx,
688 reactor_closed_tx,
689 tunnel_id,
690 cell_handlers,
691 runtime,
692 #[cfg(feature = "conflux")]
693 conflux_hs_ctx: None,
694 #[cfg(feature = "conflux")]
695 ooo_msgs: Default::default(),
696 };
697
698 (reactor, control_tx, command_tx, reactor_closed_rx, mutable)
699 }
700
701 /// Launch the reactor, and run until the circuit closes or we
702 /// encounter an error.
703 ///
704 /// Once this method returns, the circuit is dead and cannot be
705 /// used again.
706 #[instrument(level = "trace", skip_all)]
707 pub async fn run(mut self) -> Result<()> {
708 trace!(tunnel_id = %self.tunnel_id, "Running tunnel reactor");
709 let result: Result<()> = loop {
710 match self.run_once().await {
711 Ok(()) => (),
712 Err(ReactorError::Shutdown) => break Ok(()),
713 Err(ReactorError::Err(e)) => break Err(e),
714 }
715 };
716
717 // Log that the reactor stopped, possibly with the associated error as a report.
718 // May log at a higher level depending on the error kind.
719 const MSG: &str = "Tunnel reactor stopped";
720 match &result {
721 Ok(()) => trace!(tunnel_id = %self.tunnel_id, "{MSG}"),
722 Err(e) => debug_report!(e, tunnel_id = %self.tunnel_id, "{MSG}"),
723 }
724
725 result
726 }
727
728 /// Helper for run: doesn't mark the circuit closed on finish. Only
729 /// processes one cell or control message.
730 #[instrument(level = "trace", skip_all)]
731 async fn run_once(&mut self) -> StdResult<(), ReactorError> {
732 // If all the circuits are closed, shut down the reactor
733 if self.circuits.is_empty() {
734 trace!(
735 tunnel_id = %self.tunnel_id,
736 "Tunnel reactor shutting down: all circuits have closed",
737 );
738
739 return Err(ReactorError::Shutdown);
740 }
741
742 // If this is a single path circuit, we need to wait until the first hop
743 // is created before doing anything else
744 let single_path_with_hops = self
745 .circuits
746 .single_leg_mut()
747 .is_ok_and(|leg| !leg.has_hops());
748 if single_path_with_hops {
749 self.wait_for_create().await?;
750
751 return Ok(());
752 }
753
754 // Prioritize the buffered messages.
755 //
756 // Note: if any of the messages are ready to be handled,
757 // this will block the reactor until we are done processing them
758 //
759 // TODO circpad: If this is a problem, we might want to re-order things so that we
760 // prioritize padding instead. On the other hand, this should be fixed by refactoring
761 // circuit and tunnel reactors, so we might do well to just leave it alone for now.
762 #[cfg(feature = "conflux")]
763 self.try_dequeue_ooo_msgs().await?;
764
765 let mut events = select_biased! {
766 res = self.command.next() => {
767 let cmd = unwrap_or_shutdown!(self, res, "command channel drop")?;
768 return ControlHandler::new(self).handle_cmd(cmd);
769 },
770 // Check whether we've got a control message pending.
771 //
772 // Note: unfortunately, reading from control here means we might start
773 // handling control messages before our chan_senders are ready.
774 // With the current design, this is inevitable: we can't know which circuit leg
775 // a control message is meant for without first reading the control message from
776 // the channel, and at that point, we can't know for sure whether that particular
777 // circuit is ready for sending.
778 ret = self.control.next() => {
779 let msg = unwrap_or_shutdown!(self, ret, "control drop")?;
780 smallvec![CircuitEvent::HandleControl(msg)]
781 },
782 res = self.circuits.next_circ_event(&self.runtime).fuse() => res?,
783 };
784
785 // Put the events into the order that we need to execute them in.
786 //
787 // (Yes, this _does_ have to be a stable sort. Not all events may be freely re-ordered
788 // with respect to one another.)
789 events.sort_by_key(|a| a.order_within_batch());
790
791 for event in events {
792 let cmd = match event {
793 CircuitEvent::RunCmd { leg, cmd } => Some(RunOnceCmd::Single(
794 RunOnceCmdInner::from_circuit_cmd(leg, cmd),
795 )),
796 CircuitEvent::HandleControl(ctrl) => ControlHandler::new(self)
797 .handle_msg(ctrl)?
798 .map(RunOnceCmd::Single),
799 CircuitEvent::HandleCell { leg, cell } => {
800 let circ = self
801 .circuits
802 .leg_mut(leg)
803 .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
804
805 let circ_cmds = circ.handle_cell(&mut self.cell_handlers, leg, cell)?;
806 if circ_cmds.is_empty() {
807 None
808 } else {
809 // TODO: we return RunOnceCmd::Multiple even if there's a single command.
810 //
811 // See the TODO on `Circuit::handle_cell`.
812 let cmd = RunOnceCmd::Multiple(
813 circ_cmds
814 .into_iter()
815 .map(|cmd| RunOnceCmdInner::from_circuit_cmd(leg, cmd))
816 .collect(),
817 );
818
819 Some(cmd)
820 }
821 }
822 CircuitEvent::RemoveLeg { leg, reason } => {
823 Some(RunOnceCmdInner::RemoveLeg { leg, reason }.into())
824 }
825 CircuitEvent::PaddingAction { leg, padding_event } => {
826 cfg_if! {
827 if #[cfg(feature = "circ-padding")] {
828 Some(RunOnceCmdInner::PaddingAction { leg, padding_event }.into())
829 } else {
830 // If padding isn't enabled, we never generate a padding event,
831 // so we can be sure this case will never be called.
832 void::unreachable(padding_event.0);
833 }
834 }
835 }
836 CircuitEvent::ProtoViolation { err } => {
837 return Err(err.into());
838 }
839 };
840
841 if let Some(cmd) = cmd {
842 self.handle_run_once_cmd(cmd).await?;
843 }
844 }
845
846 Ok(())
847 }
848
/// Deliver every buffered out-of-order message that is now in order.
///
/// The head of `ooo_msgs` is inspected repeatedly: for as long as
/// `is_seqno_in_order()` accepts its sequence number, the entry is popped,
/// handed to the leg recorded in the entry, and any command produced by that
/// leg is executed before looking at the next entry.
///
/// Returns an error if the recorded leg no longer exists, if the leg rejects
/// the message, or if running the resulting command fails.
#[cfg(feature = "conflux")]
#[instrument(level = "trace", skip_all)]
async fn try_dequeue_ooo_msgs(&mut self) -> StdResult<(), ReactorError> {
    loop {
        // Only the head of the heap is ever a candidate for delivery.
        let head_is_ready = match self.ooo_msgs.peek() {
            Some(head) => self.circuits.is_seqno_in_order(head.msg.seqno),
            None => false,
        };

        if !head_is_ready {
            return Ok(());
        }

        let popped = self.ooo_msgs.pop().expect("item just disappeared?!");
        let leg_id = popped.leg_id;
        let ooo = popped.msg;

        let leg = self
            .circuits
            .leg_mut(leg_id)
            .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;

        let circ_cmd = leg.handle_in_order_relay_msg(
            &mut self.cell_handlers,
            ooo.hopnum,
            leg_id,
            ooo.cell_counts_towards_windows,
            ooo.streamid,
            ooo.msg,
        )?;

        // The leg may or may not want the reactor to do something in response.
        if let Some(circ_cmd) = circ_cmd {
            let run_once =
                RunOnceCmd::Single(RunOnceCmdInner::from_circuit_cmd(leg_id, circ_cmd));
            self.handle_run_once_cmd(run_once).await?;
        }
    }
}
888
889 /// Handle a [`RunOnceCmd`].
890 #[instrument(level = "trace", skip_all)]
891 async fn handle_run_once_cmd(&mut self, cmd: RunOnceCmd) -> StdResult<(), ReactorError> {
892 match cmd {
893 RunOnceCmd::Single(cmd) => return self.handle_single_run_once_cmd(cmd).await,
894 RunOnceCmd::Multiple(cmds) => {
895 // While we know `sendable` is ready to accept *one* cell,
896 // we can't be certain it will be able to accept *all* of the cells
897 // that need to be sent here. This means we *may* end up buffering
898 // in its underlying SometimesUnboundedSink! That is OK, because
899 // RunOnceCmd::Multiple is only used for handling packed cells.
900 for cmd in cmds {
901 self.handle_single_run_once_cmd(cmd).await?;
902 }
903 }
904 }
905
906 Ok(())
907 }
908
909 /// Handle a [`RunOnceCmd`].
910 #[instrument(level = "trace", skip_all)]
911 async fn handle_single_run_once_cmd(
912 &mut self,
913 cmd: RunOnceCmdInner,
914 ) -> StdResult<(), ReactorError> {
915 match cmd {
916 RunOnceCmdInner::Send { leg, cell, done } => {
917 // TODO: check the cc window
918 let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
919 if let Some(done) = done {
920 // Don't care if the receiver goes away
921 let _ = done.send(res.clone());
922 }
923 res?;
924 }
925 #[cfg(feature = "send-control-msg")]
926 RunOnceCmdInner::SendMsgAndInstallHandler { msg, handler, done } => {
927 let cell: Result<Option<SendRelayCell>> =
928 self.prepare_msg_and_install_handler(msg, handler);
929
930 match cell {
931 Ok(Some(cell)) => {
932 // TODO(conflux): let the RunOnceCmdInner specify which leg to send the cell on
933 let outcome = self.circuits.send_relay_cell_on_leg(cell, None).await;
934 // don't care if receiver goes away.
935 let _ = done.send(outcome.clone());
936 outcome?;
937 }
938 Ok(None) => {
939 // don't care if receiver goes away.
940 let _ = done.send(Ok(()));
941 }
942 Err(e) => {
943 // don't care if receiver goes away.
944 let _ = done.send(Err(e.clone()));
945 return Err(e.into());
946 }
947 }
948 }
949 RunOnceCmdInner::BeginStream {
950 leg,
951 cell,
952 stream_id,
953 hop,
954 stream_components,
955 done,
956 } => {
957 let circ = self
958 .circuits
959 .leg_mut(leg)
960 .ok_or_else(|| internal!("leg disappeared?!"))?;
961 let cell_hop = cell.hop.expect("missing hop in client SendRelayCell?!");
962 let relay_format = circ
963 .hop_mut(cell_hop)
964 // TODO: Is this the right error type here? Or should there be a "HopDisappeared"?
965 .ok_or(Error::NoSuchHop)?
966 .relay_cell_format();
967
968 let outcome = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
969 // don't care if receiver goes away.
970 let _ = done.send(
971 outcome
972 .clone()
973 .map(|_| (stream_id, hop, relay_format, stream_components)),
974 );
975 outcome?;
976 }
977 RunOnceCmdInner::CloseStream {
978 hop,
979 sid,
980 behav,
981 reason,
982 done,
983 } => {
984 let result = {
985 let (leg_id, hop_num) = self
986 .resolve_hop_location(hop)
987 .map_err(into_bad_api_usage!("Could not resolve {hop:?}"))?;
988 let leg = self
989 .circuits
990 .leg_mut(leg_id)
991 .ok_or(bad_api_usage!("No leg for id {:?}", leg_id))?;
992 Ok::<_, Bug>((leg, hop_num))
993 };
994
995 let (leg, hop_num) = match result {
996 Ok(x) => x,
997 Err(e) => {
998 if let Some(done) = done {
999 // don't care if the sender goes away
1000 let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
1001 let _ = done.send(Err(e.into()));
1002 }
1003 return Ok(());
1004 }
1005 };
1006
1007 let max_rtt = {
1008 let hop = leg
1009 .hop(hop_num)
1010 .ok_or_else(|| internal!("the hop we resolved disappeared?!"))?;
1011 let ccontrol = hop.ccontrol();
1012
1013 // Note: if we have no measurements for the RTT, this will be set to 0,
1014 // and the timeout will be 2 * CBT.
1015 ccontrol
1016 .rtt()
1017 .max_rtt_usec()
1018 .map(|rtt| Duration::from_millis(u64::from(rtt)))
1019 .unwrap_or_default()
1020 };
1021
1022 // The length of the circuit up until the hop that has the half-streeam.
1023 //
1024 // +1, because HopNums are zero-based.
1025 let circ_len = usize::from(hop_num) + 1;
1026
1027 // We double the CBT to account for rend circuits,
1028 // which are twice as long (otherwise we risk expiring
1029 // the rend half-streams too soon).
1030 let timeout = std::cmp::max(max_rtt, 2 * leg.estimate_cbt(circ_len));
1031 let expire_at = self.runtime.now() + timeout;
1032
1033 let res: Result<()> = leg
1034 .close_stream(hop_num, sid, behav, reason, expire_at)
1035 .await;
1036
1037 if let Some(done) = done {
1038 // don't care if the sender goes away
1039 let _ = done.send(res);
1040 }
1041 }
1042 RunOnceCmdInner::MaybeSendXon {
1043 rate,
1044 stream_id,
1045 hop,
1046 } => {
1047 let (leg_id, hop_num) = match self.resolve_hop_location(hop) {
1048 Ok(x) => x,
1049 Err(NoJoinPointError) => {
1050 // A stream tried to send an XON message message to the join point of
1051 // a tunnel that has never had a join point. Currently in arti, only a
1052 // `StreamTarget` asks us to send an XON message, and this tunnel
1053 // originally created the `StreamTarget` to begin with. So this is a
1054 // legitimate bug somewhere in the tunnel code.
1055 return Err(
1056 internal!(
1057 "Could not send an XON message to a join point on a tunnel without a join point",
1058 )
1059 .into()
1060 );
1061 }
1062 };
1063
1064 let Some(leg) = self.circuits.leg_mut(leg_id) else {
1065 // The leg has disappeared. This is fine since the stream may have ended and
1066 // been cleaned up while this `CtrlMsg::MaybeSendXon` message was queued.
1067 // It is possible that is a bug and this is an incorrect leg number, but
1068 // it's not currently possible to differentiate between an incorrect leg
1069 // number and a tunnel leg that has been closed.
1070 debug!("Could not send an XON message on a leg that does not exist. Ignoring.");
1071 return Ok(());
1072 };
1073
1074 let Some(hop) = leg.hop_mut(hop_num) else {
1075 // The hop has disappeared. This is fine since the circuit may have been
1076 // been truncated while the `CtrlMsg::MaybeSendXon` message was queued.
1077 // It is possible that is a bug and this is an incorrect hop number, but
1078 // it's not currently possible to differentiate between an incorrect hop
1079 // number and a circuit hop that has been removed.
1080 debug!("Could not send an XON message on a hop that does not exist. Ignoring.");
1081 return Ok(());
1082 };
1083
1084 let Some(msg) = hop.maybe_send_xon(rate, stream_id)? else {
1085 // Nothing to do.
1086 return Ok(());
1087 };
1088
1089 let cell = AnyRelayMsgOuter::new(Some(stream_id), msg.into());
1090 let cell = SendRelayCell {
1091 hop: Some(hop_num),
1092 early: false,
1093 cell,
1094 };
1095
1096 leg.send_relay_cell(cell).await?;
1097 }
1098 RunOnceCmdInner::HandleSendMe { leg, hop, sendme } => {
1099 let leg = self
1100 .circuits
1101 .leg_mut(leg)
1102 .ok_or_else(|| internal!("leg disappeared?!"))?;
1103 // NOTE: it's okay to await. We are only awaiting on the congestion_signals
1104 // future which *should* resolve immediately
1105 let signals = leg.chan_sender.congestion_signals().await;
1106 leg.handle_sendme(hop, sendme, signals)?;
1107 }
1108 RunOnceCmdInner::FirstHopClockSkew { answer } => {
1109 let res = self.circuits.single_leg_mut().map(|leg| leg.clock_skew());
1110
1111 // don't care if the sender goes away
1112 let _ = answer.send(res.map_err(Into::into));
1113 }
1114 RunOnceCmdInner::CleanShutdown => {
1115 trace!(tunnel_id = %self.tunnel_id, "reactor shutdown due to handled cell");
1116 return Err(ReactorError::Shutdown);
1117 }
1118 RunOnceCmdInner::RemoveLeg { leg, reason } => {
1119 debug!(tunnel_id = %self.tunnel_id, reason = %reason, "removing circuit leg");
1120
1121 let circ = self.circuits.remove(leg)?;
1122 let is_conflux_pending = circ.is_conflux_pending();
1123
1124 // Drop the removed leg. This will cause it to close if it's not already closed.
1125 drop(circ);
1126
1127 // If we reach this point, it means we have more than one leg
1128 // (otherwise the .remove() would've returned a Shutdown error),
1129 // so we expect there to be a ConfluxHandshakeContext installed.
1130
1131 #[cfg(feature = "conflux")]
1132 if is_conflux_pending {
1133 let (error, proto_violation): (_, Option<Error>) = match &reason {
1134 RemoveLegReason::ConfluxHandshakeTimeout => {
1135 (ConfluxHandshakeError::Timeout, None)
1136 }
1137 RemoveLegReason::ConfluxHandshakeErr(e) => {
1138 (ConfluxHandshakeError::Link(e.clone()), Some(e.clone()))
1139 }
1140 RemoveLegReason::ChannelClosed => {
1141 (ConfluxHandshakeError::ChannelClosed, None)
1142 }
1143 };
1144
1145 self.note_conflux_handshake_result(Err(error), proto_violation.is_some())?;
1146
1147 if let Some(e) = proto_violation {
1148 tor_error::warn_report!(
1149 e,
1150 tunnel_id = %self.tunnel_id,
1151 "Malformed conflux handshake, tearing down tunnel",
1152 );
1153
1154 return Err(e.into());
1155 }
1156 }
1157 }
1158 #[cfg(feature = "conflux")]
1159 RunOnceCmdInner::ConfluxHandshakeComplete { leg, cell } => {
1160 // Note: on the client-side, the handshake is considered complete once the
1161 // RELAY_CONFLUX_LINKED_ACK is sent (roughly upon receipt of the LINKED cell).
1162 //
1163 // We're optimistic here, and declare the handshake a success *before*
1164 // sending the LINKED_ACK response. I think this is OK though,
1165 // because if the send_relay_cell() below fails, the reactor will shut
1166 // down anyway. OTOH, marking the handshake as complete slightly early
1167 // means that on the happy path, the circuit is marked as usable sooner,
1168 // instead of blocking on the sending of the LINKED_ACK.
1169 self.note_conflux_handshake_result(Ok(()), false)?;
1170
1171 let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
1172
1173 res?;
1174 }
1175 #[cfg(feature = "conflux")]
1176 RunOnceCmdInner::Link { circuits, answer } => {
1177 // Add the specified circuits to our conflux set,
1178 // and send a LINK cell down each unlinked leg.
1179 //
1180 // NOTE: this will block the reactor until all the cells are sent.
1181 self.handle_link_circuits(circuits, answer).await?;
1182 }
1183 #[cfg(feature = "conflux")]
1184 RunOnceCmdInner::Enqueue { leg, msg } => {
1185 let entry = ConfluxHeapEntry { leg_id: leg, msg };
1186 self.ooo_msgs.push(entry);
1187 }
1188 #[cfg(feature = "circ-padding")]
1189 RunOnceCmdInner::PaddingAction { leg, padding_event } => {
1190 // TODO: If we someday move back to having a per-circuit reactor,
1191 // this event would logically belong there, not on the tunnel reactor.
1192 self.circuits.run_padding_event(leg, padding_event).await?;
1193 }
1194 }
1195
1196 Ok(())
1197 }
1198
1199 /// Wait for a [`CtrlMsg::Create`] to come along to set up the circuit.
1200 ///
1201 /// Returns an error if an unexpected `CtrlMsg` is received.
1202 #[instrument(level = "trace", skip_all)]
1203 async fn wait_for_create(&mut self) -> StdResult<(), ReactorError> {
1204 let msg = select_biased! {
1205 res = self.command.next() => {
1206 let cmd = unwrap_or_shutdown!(self, res, "shutdown channel drop")?;
1207 match cmd {
1208 CtrlCmd::Shutdown => return self.handle_shutdown().map(|_| ()),
1209 #[cfg(test)]
1210 CtrlCmd::AddFakeHop {
1211 relay_cell_format: format,
1212 fwd_lasthop,
1213 rev_lasthop,
1214 peer_id,
1215 params,
1216 done,
1217 } => {
1218 let leg = self.circuits.single_leg_mut()?;
1219 leg.handle_add_fake_hop(format, fwd_lasthop, rev_lasthop, peer_id, ¶ms, done);
1220 return Ok(())
1221 },
1222 _ => {
1223 trace!("reactor shutdown due to unexpected command: {:?}", cmd);
1224 return Err(Error::CircProto(format!("Unexpected control {cmd:?} on client circuit")).into());
1225 }
1226 }
1227 },
1228 res = self.control.next() => unwrap_or_shutdown!(self, res, "control drop")?,
1229 };
1230
1231 match msg {
1232 CtrlMsg::Create {
1233 recv_created,
1234 handshake,
1235 settings,
1236 done,
1237 } => {
1238 // TODO(conflux): instead of crashing the reactor, it might be better
1239 // to send the error via the done channel instead
1240 let leg = self.circuits.single_leg_mut()?;
1241 leg.handle_create(recv_created, handshake, settings, done)
1242 .await
1243 }
1244 _ => {
1245 trace!("reactor shutdown due to unexpected cell: {:?}", msg);
1246
1247 Err(Error::CircProto(format!("Unexpected {msg:?} cell on client circuit")).into())
1248 }
1249 }
1250 }
1251
/// Record one leg's handshake result in the pending `ConfluxHandshakeContext`.
///
/// Once a result has been recorded for every expected leg, or when
/// `reactor_is_closing` is set, the context is removed and everything
/// collected so far is sent to the handshake initiator.
///
/// Returns an internal error if no handshake context is installed, and
/// propagates the error from `send_conflux_outcome` if the initiator is gone.
#[cfg(feature = "conflux")]
#[instrument(level = "trace", skip_all)]
fn note_conflux_handshake_result(
    &mut self,
    res: StdResult<(), ConfluxHandshakeError>,
    reactor_is_closing: bool,
) -> StdResult<(), ReactorError> {
    let Some(pending) = self.conflux_hs_ctx.as_mut() else {
        return Err(internal!("no conflux handshake context").into());
    };

    pending.results.push(res);

    // Have all the legs finished linking?
    let all_legs_reported = pending.results.len() == pending.num_legs;

    if !all_legs_reported && !reactor_is_closing {
        // Still waiting on at least one leg.
        return Ok(());
    }

    // The handshake is over: take the context out and report what we collected.
    let finished = self.conflux_hs_ctx.take().expect("context disappeared?!");

    let leg_count = finished.results.len();
    let success_count = finished
        .results
        .iter()
        .filter(|outcome| outcome.is_ok())
        .count();

    info!(
        tunnel_id = %self.tunnel_id,
        "conflux tunnel ready ({success_count}/{leg_count} circuits successfully linked)",
    );

    // No further handshake results are expected until another LinkCircuits
    // control message installs a fresh ConfluxHandshakeCtx (and with it a new
    // channel to report on).
    send_conflux_outcome(finished.answer, Ok(finished.results))
}
1298
1299 /// Prepare a `SendRelayCell` request, and install the given meta-cell handler.
1300 fn prepare_msg_and_install_handler(
1301 &mut self,
1302 msg: Option<AnyRelayMsgOuter>,
1303 handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
1304 ) -> Result<Option<SendRelayCell>> {
1305 let msg = msg
1306 .map(|msg| {
1307 let handlers = &mut self.cell_handlers;
1308 let handler = handler
1309 .as_ref()
1310 .or(handlers.meta_handler.as_ref())
1311 .ok_or_else(|| internal!("tried to use an ended Conversation"))?;
1312 // We should always have a precise HopLocation here so this should never fails but
1313 // in case we have a ::JointPoint, we'll notice.
1314 let hop = handler.expected_hop().hop_num().ok_or(bad_api_usage!(
1315 "MsgHandler doesn't have a precise HopLocation"
1316 ))?;
1317 Ok::<_, crate::Error>(SendRelayCell {
1318 hop: Some(hop),
1319 early: false,
1320 cell: msg,
1321 })
1322 })
1323 .transpose()?;
1324
1325 if let Some(handler) = handler {
1326 self.cell_handlers.set_meta_handler(handler)?;
1327 }
1328
1329 Ok(msg)
1330 }
1331
1332 /// Handle a shutdown request.
1333 fn handle_shutdown(&self) -> StdResult<Option<RunOnceCmdInner>, ReactorError> {
1334 trace!(
1335 tunnel_id = %self.tunnel_id,
1336 "reactor shutdown due to explicit request",
1337 );
1338
1339 Err(ReactorError::Shutdown)
1340 }
1341
/// Shut the reactor down, handing its only [`Circuit`] back over `answer`.
///
/// If the tunnel does not contain exactly one circuit, an error is sent over
/// `answer` instead. Either way the reactor shuts down.
#[cfg(feature = "conflux")]
fn handle_shutdown_and_return_circuit(
    &mut self,
    answer: oneshot::Sender<StdResult<Circuit, Bug>>,
) -> StdResult<(), ReactorError> {
    let only_leg = self.circuits.take_single_leg();

    // A vanished receiver is not our problem.
    let _ = answer.send(only_leg);

    self.handle_shutdown()?;
    Ok(())
}
1355
1356 /// Resolves a [`TargetHop`] to a [`HopLocation`].
1357 ///
1358 /// After resolving a `TargetHop::LastHop`,
1359 /// the `HopLocation` can become stale if a single-path circuit is later extended or truncated.
1360 /// This means that the `HopLocation` can become stale from one reactor iteration to the next.
1361 ///
1362 /// It's generally okay to hold on to a (possibly stale) `HopLocation`
1363 /// if you need a fixed hop position in the tunnel.
1364 /// For example if we open a stream to `TargetHop::LastHop`,
1365 /// we would want to store the stream position as a `HopLocation` and not a `TargetHop::LastHop`
1366 /// as we don't want the stream position to change as the tunnel is extended or truncated.
1367 ///
1368 /// Returns [`NoHopsBuiltError`] if trying to resolve `TargetHop::LastHop`
1369 /// and the tunnel has no hops
1370 /// (either has no legs, or has legs which contain no hops).
1371 fn resolve_target_hop(&self, hop: TargetHop) -> StdResult<HopLocation, NoHopsBuiltError> {
1372 match hop {
1373 TargetHop::Hop(hop) => Ok(hop),
1374 TargetHop::LastHop => {
1375 if let Ok(leg) = self.circuits.single_leg() {
1376 let leg_id = leg.unique_id();
1377 // single-path tunnel
1378 let hop = leg.last_hop_num().ok_or(NoHopsBuiltError)?;
1379 Ok(HopLocation::Hop((leg_id, hop)))
1380 } else if !self.circuits.is_empty() {
1381 // multi-path tunnel
1382 Ok(HopLocation::JoinPoint)
1383 } else {
1384 // no legs
1385 Err(NoHopsBuiltError)
1386 }
1387 }
1388 }
1389 }
1390
1391 /// Resolves a [`HopLocation`] to a [`UniqId`] and [`HopNum`].
1392 ///
1393 /// After resolving a `HopLocation::JoinPoint`,
1394 /// the [`UniqId`] and [`HopNum`] can become stale if the primary leg changes.
1395 ///
1396 /// You should try to only resolve to a specific [`UniqId`] and [`HopNum`] immediately before you
1397 /// need them,
1398 /// and you should not hold on to the resolved [`UniqId`] and [`HopNum`] between reactor
1399 /// iterations as the primary leg may change from one iteration to the next.
1400 ///
1401 /// Returns [`NoJoinPointError`] if trying to resolve `HopLocation::JoinPoint`
1402 /// but it does not have a join point.
1403 #[instrument(level = "trace", skip_all)]
1404 fn resolve_hop_location(
1405 &self,
1406 hop: HopLocation,
1407 ) -> StdResult<(UniqId, HopNum), NoJoinPointError> {
1408 match hop {
1409 HopLocation::Hop((leg_id, hop_num)) => Ok((leg_id, hop_num)),
1410 HopLocation::JoinPoint => {
1411 if let Some((leg_id, hop_num)) = self.circuits.primary_join_point() {
1412 Ok((leg_id, hop_num))
1413 } else {
1414 // Attempted to get the join point of a non-multipath tunnel.
1415 Err(NoJoinPointError)
1416 }
1417 }
1418 }
1419 }
1420
1421 /// Resolve a [`TargetHop`] directly into a [`UniqId`] and [`HopNum`].
1422 ///
1423 /// This is a helper function that basically calls both resolve_target_hop and
1424 /// resolve_hop_location back to back.
1425 ///
1426 /// It returns None on failure to resolve meaning that if you want more detailed error on why
1427 /// it failed, explicitly use the resolve_hop_location() and resolve_target_hop() functions.
1428 pub(crate) fn target_hop_to_hopnum_id(&self, hop: TargetHop) -> Option<(UniqId, HopNum)> {
1429 self.resolve_target_hop(hop)
1430 .ok()
1431 .and_then(|resolved| self.resolve_hop_location(resolved).ok())
1432 }
1433
/// Install a padder at the given hop, or remove it when `padder` is `None`.
///
/// Only a precise hop is accepted; targeting the join point is rejected as
/// an API misuse. Returns `Error::NoSuchHop` if the leg does not exist.
#[cfg(feature = "circ-padding-manual")]
fn set_padding_at_hop(
    &self,
    hop: HopLocation,
    padder: Option<super::circuit::padding::CircuitPadder>,
) -> Result<()> {
    match hop {
        HopLocation::Hop((leg_id, hop_num)) => {
            let leg = self.circuits.leg(leg_id).ok_or(Error::NoSuchHop)?;
            leg.set_padding_at_hop(hop_num, padder)?;
            Ok(())
        }
        HopLocation::JoinPoint => {
            Err(bad_api_usage!("Padding to the join point is not supported.").into())
        }
    }
}
1448
1449 /// Does congestion control use stream SENDMEs for the given hop?
1450 ///
1451 /// Returns `None` if either the `leg` or `hop` don't exist.
1452 fn uses_stream_sendme(&self, leg: UniqId, hop: HopNum) -> Option<bool> {
1453 self.circuits.uses_stream_sendme(leg, hop)
1454 }
1455
/// Link extra circuits into this reactor's conflux set.
///
/// The circuits are validated first. If validation fails (for example because
/// they differ in length or do not share the same last hop), the error is sent
/// on the `answer` channel and no handshake is started.
///
/// On success the circuits join the conflux set and the handshake begins with
/// a LINK cell on each leg. `answer` is then kept so that the initiator can be
/// told when the handshake completes.
///
/// NOTE: the reactor main loop is blocked until all the cells are sent.
#[cfg(feature = "conflux")]
#[instrument(level = "trace", skip_all)]
async fn handle_link_circuits(
    &mut self,
    circuits: Vec<Circuit>,
    answer: ConfluxLinkResultChannel,
) -> StdResult<(), ReactorError> {
    use tor_error::warn_report;

    // Only one linking operation may be in flight at a time.
    if self.conflux_hs_ctx.is_some() {
        return send_conflux_outcome(
            answer,
            Err(internal!("conflux linking already in progress").into()),
        );
    }

    // A LINK cell goes out on every new circuit, and on every leg already in
    // self.circuits that is not linked yet.
    //
    // In practice there is at most one such existing leg (the "initial" one
    // from the formerly single-path tunnel), since circuits that do not
    // complete the conflux handshake are removed from the set.
    let already_unlinked = self.circuits.num_unlinked();
    let num_legs = circuits.len() + already_unlinked;

    // add_legs is what validates `circuits`.
    let link_outcome = async {
        self.circuits.add_legs(circuits, &self.runtime)?;
        self.circuits.link_circuits(&self.runtime).await
    }
    .await;

    match link_outcome {
        Ok(_) => {
            // Hold on to the channel so completion can be reported later.
            self.conflux_hs_ctx = Some(ConfluxHandshakeCtx {
                answer,
                num_legs,
                results: Default::default(),
            });

            Ok(())
        }
        Err(e) => {
            warn_report!(e, "Failed to link conflux circuits");

            send_conflux_outcome(answer, Err(e))
        }
    }
}
1515}
1516
/// Deliver the outcome of a conflux handshake to whoever initiated it.
///
/// If the initiator has gone away, a warning is logged and
/// [`ReactorError::Shutdown`] is returned.
#[cfg(feature = "conflux")]
fn send_conflux_outcome(
    tx: ConfluxLinkResultChannel,
    res: Result<ConfluxHandshakeResult>,
) -> StdResult<(), ReactorError> {
    match tx.send(res) {
        Ok(_) => Ok(()),
        Err(_) => {
            tracing::warn!("conflux initiator went away before handshake completed?");
            Err(ReactorError::Shutdown)
        }
    }
}
1532
/// The tunnel does not have any hops.
///
/// Returned when a `TargetHop::LastHop` needs to be resolved but the tunnel
/// has no legs, or its single leg has no hops.
#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
#[non_exhaustive]
#[error("no hops have been built for this tunnel")]
pub(crate) struct NoHopsBuiltError;
1538
/// The tunnel does not have a join point.
///
/// Returned when a `HopLocation::JoinPoint` needs to be resolved but the
/// conflux set reports no primary join point.
#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
#[non_exhaustive]
#[error("the tunnel does not have a join point")]
pub(crate) struct NoJoinPointError;
1544
1545impl CellHandlers {
1546 /// Try to install a given meta-cell handler to receive any unusual cells on
1547 /// this circuit, along with a result channel to notify on completion.
1548 fn set_meta_handler(&mut self, handler: Box<dyn MetaCellHandler + Send>) -> Result<()> {
1549 if self.meta_handler.is_none() {
1550 self.meta_handler = Some(handler);
1551 Ok(())
1552 } else {
1553 Err(Error::from(internal!(
1554 "Tried to install a meta-cell handler before the old one was gone."
1555 )))
1556 }
1557 }
1558
1559 /// Try to install a given cell handler on this circuit.
1560 #[cfg(feature = "hs-service")]
1561 fn set_incoming_stream_req_handler(
1562 &mut self,
1563 handler: IncomingStreamRequestHandler,
1564 ) -> Result<()> {
1565 if self.incoming_stream_req_handler.is_none() {
1566 self.incoming_stream_req_handler = Some(handler);
1567 Ok(())
1568 } else {
1569 Err(Error::from(internal!(
1570 "Tried to install a BEGIN cell handler before the old one was gone."
1571 )))
1572 }
1573 }
1574}
1575
#[cfg(test)]
mod test {
    // Intentionally empty: this module is tested in [`crate::client::circuit::test`].
}