tor-proto 0.41.0

Asynchronous client-side implementation of the central Tor network protocols
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
//! Module exposing the circuit reactor subsystem.
//!
//! This module implements the new [multi-reactor circuit subsystem].
//!
// Note: this is currently only used for the relay side,
// but we plan to eventually rewrite client circuit implementation
// to use these new reactor types as well.
//!
//! The entry point of the reactor is [`Reactor::run`], which launches the
//! reactor background tasks, and begins listening for inbound cells on the provided
//! inbound Tor channel.
//!
//! ### Architecture
//!
//! Internally, the circuit reactor consists of multiple reactors,
//! each running in a separate task:
//!
//!   * [`StreamReactor`] (one per hop): handles all messages arriving to,
//!     and coming from the streams of a given hop. The ready stream messages
//!     are sent to the [`BackwardReactor`]
//!   * [`ForwardReactor`]: handles incoming cells arriving on the
//!     "inbound" Tor channel (towards the guard, if we are a client, or towards
//!     the client, if we are a relay). If we are a client, it moves stream messages
//!     towards the corresponding [`StreamReactor`]. If we are a relay,
//!     in addition to sending any stream messages to the `StreamReactor`,
//!     this reactor also moves cells in the forward direction
//!     (from the client towards the exit)
//!   * [`BackwardReactor`]: writes cells to the "inbound" Tor channel:
//!     towards the client if we are a relay, or towards the exit
//!     if we are a client.
//!
// TODO: the forward/backward terminology no longer makes sense! Come up with better terms...
//!
//! If we are an exit relay, the cell flow looks roughly like this:
//!
//! ```text
//!                             <stream_tx
//!                              MPSC (0)>
//!   +--------------> FWD -------------------------+
//!   |                 |                           |
//!   |                 |                           |
//!   |                 |                           |
//!   |                 |                           v
//! relay      BackwardReactorCmd            StreamReactor
//!   ^             <MPSC (0)>                      |
//!   |                 |                           |
//!   |                 |                           |
//!   |                 |                           |
//!   |                 v                           |
//!   +--------------- BWD <------------------------+
//!     application stream data    <stream_rx
//!                                 MPSC (0)>
//! ```
//!
//! For a middle relay (the `StreamReactor` is omitted for brevity,
//! but middle relays can have one too, if leaky pipe is in use):
//!
//! ```text
//!                           unrecognized cell
//!   +--------------> FWD -------------------------+
//!   |                 |                           |
//!   |                 |                           |
//!   |                 |                           |
//!   |                 |                           v
//! client      BackwardReactorCmd                relay
//! or relay        <MPSC (0)>                      |
//!   ^                 |                           |
//!   |                 |                           |
//!   |                 |                           |
//!   |                 |                           |
//!   |                 v                           |
//!   +--------------- BWD <------------------------+
//! ```
//!
//! On the client-side the `ForwardReactor` reads cells from the Tor channel to the guard,
//! and the `BackwardReactor` writes to it.
//!
//! ```text
//!   +--------------- FWD <--------------------+
//!   |                 |                       |
//!   |                 |                       |
//!   |                 |                       |
//!   v                 |                       |
//! StreamReactor  BackwardReactorCmd         guard
//!   |               <MPSC (0)>                ^
//!   |                 |                       |
//!   |                 |                       |
//!   |                 |                       |
//!   |                 v                       |
//!   +--------------> BWD ---------------------+
//! ```
//!
//! Client with leaky pipe (`SR` = `StreamReactor`):
//!
//! ```text
//!   +------------------------------+
//!   |       +--------------------+ | (1 MPSC TX per SR)
//!   |       |                    | |
//!   |       |       +----------- FWD <------------------+
//!   |       |       |             |                     |
//!   |       |       |             |                     |
//!   |       |       |             |                     |
//!   v       v       v             |                     |
//!  SR      SR      SR           BackwardReactorCmd    guard
//! (hop 4) (hop 3)  (hop 2)      <MPSC (0)>              ^
//!   |       |       |             |                     |
//!   |       |       |             |                     |
//!   |       |       |             |                     |
//!   |       |       |             v                     |
//!   |       |       |            BWD -------------------+
//!   |       |       |             ^
//!   |       |       |             |
//!   |       |       |             | <stream_rx
//!   |       |       |             |  MPSC (0)>
//!   +-------+-------+-------------+
//! ```
//!
// TODO(tuning): The inter-reactor MPSC channels have no buffering,
// which is likely going to be bad for performance,
// so we will need to tune the sizes of these MPSC buffers.
//!
//! The read and write ends of the inbound and outbound Tor channels are "split",
//! such that each reactor holds an `inbound_chan_rx` stream (for reading)
//! and an `inbound_chan_tx` sink (for writing):
//!
//!  * `ForwardReactor` holds the reading end of the inbound
//!    (coming from the client, if we are a relay, or coming from the guard, if we are a client)
//!    Tor channel, and the writing end of the outbound (towards the exit, if we are a middle relay)
//!    Tor channel, if there is one
//!  * `BackwardReactor` holds the reading end of the outbound channel, if there is one,
//!    and the writing end of the inbound channel, if there is one
//!
//! #### `ForwardReactor`
//!
//! It handles forward cells, by delegating to the implementation-dependent
//! [`ForwardHandler::handle_forward_cell`], which decides
//! whether the cell needs to be handled in `ForwardReactor`,
//! or in the `ForwardHandler` itself.
//!
//! More concretely:
//!
//! ```text
//!
//! Legend: `F` = "forward reactor", `H` = "ForwardHandler"
//!
//! | Message           | Received in | Handled in | Description                            |
//! |-------------------|-------------|------------|----------------------------------------|
//! | DESTROY           | F           | H          | Handled internally by ForwardHandler   |
//! |-------------------|-------------|------------|----------------------------------------|
//! | PADDING_NEGOTIATE | F           | H          | Handled internally by ForwardHandler   |
//! |-------------------|-------------|------------|----------------------------------------|
//! | *unrecognized*    | F           | H          | Unrecognized relay cell handling is    |
//! | RELAY OR          |             |            | implementation-dependent so these are  |
//! | RELAY_EARLY       |             |            | handled in the ForwardHandler.         |
//! |                   |             |            |                                        |
//! |                   |             |            | The relay ForwardHandler will handle   |
//! |                   |             |            | these by forwarding them to the next   |
//! |                   |             |            | hop, if there is one.                  |
//! |                   |             |            |                                        |
//! |                   |             |            | Clients don't yet implement            |
//! |                   |             |            | ForwardHandler, but when they do,      |
//! |                   |             |            | its implementation will simply reject  |
//! |                   |             |            | any messages that can't be decrypted   |
//! |-------------------|-------------|------------|----------------------------------------|
//! | *recognized*      | F           | see table  | Handling depends on the cmd            |
//! | RELAY OR          |             | below      |                                        |
//! | RELAY_EARLY       |             |            |                                        |
//! ```
//!
//! Recognized relay cells are handled by splitting each cell into individual messages,
//! and handling each message individually as described in the table below
//! (Note: since prop340 is not yet implemented, in practice there is only 1 message per cell):
//!
//! ```text
//!
//! Legend: `F` = "forward reactor", `B` = "backward reactor", `S` = "stream reactor"
//!
//! | RELAY cmd         | Received in | Handled in | Description                            |
//! |-------------------|-------------|------------|----------------------------------------|
//! | SENDME            | F           | B          | Sent to BackwardReactor for handling   |
//! |                   |             |            | (BackwardReactorCmd::HandleSendme)     |
//! |                   |             |            | because the forward reactor doesn't    |
//! |                   |             |            | have access to the inbound_chan_tx part|
//! |                   |             |            | of the inbound (towards the client)    |
//! |                   |             |            | Tor channel, and so cannot obtain the  |
//! |                   |             |            | congestion signals needed for SENDME   |
//! |                   |             |            | handling                               |
//! |-------------------|-------------|------------|----------------------------------------|
//! | Other             | F           | F          | Passed to impl-dependent handler       |
//! | (StreamId = 0)    |             |            |  `ForwardHandler::handle_meta_msg()`   |
//! |-------------------|-------------|------------|----------------------------------------|
//! | Other             | F           | S          | All messages with a non-zero stream ID |
//! | (StreamId != 0)   |             |            | are forwarded to the stream reactor    |
//! |-------------------|-------------|------------|----------------------------------------|
//! ```
//!
//! #### `BackwardReactor`
//!
//! It handles
//!
//!  * the packaging and delivery of all cells that need to be written to the "inbound" Tor channel
//!    (it writes them to the towards-the-client Tor channel sink) (**partially implemented**)
//!  * incoming cells coming over the "outbound" Tor channel. This channel only exists
//!    if we are a middle relay. These cells are relayed to the "inbound" Tor channel (**not implemented**).
//!  * the sending of padding cells, according to the PaddingController's instructions
//!
//! This multi-reactor architecture should, in theory, have better performance than
//! a single reactor system, because it enables us to parallelize some of the work:
//! the forward and backward directions share little state,
//! because they read from, and write to, different sinks/streams,
//! so they can be run in parallel (as separate tasks).
//! With a single reactor architecture, the reactor would need to drive
//! both the forward and the backward direction, and on each iteration
//! would need to decide which to prioritize, which might prove tricky
//! (though prioritizing one of them at random would've probably been good enough).
//!
//! The monolithic single reactor alternative would also have been significantly
//! more convoluted, and so more difficult to maintain in the long run.
//!
//
// NOTE: The FWD and BWD currently share the hop list containing the per-hop state,
// (including the congestion control object, which is behind a mutex).
//
//! [multi-reactor circuit subsystem]: https://gitlab.torproject.org/tpo/core/arti/-/blob/main/doc/dev/notes/relay-conflux.md
//! [`StreamReactor`]: stream::StreamReactor

// TODO(DEDUP): this will replace CircHopList when we rewrite the client reactor
// to use the new reactor architecture
pub(crate) mod circhop;

pub(crate) mod backward;
pub(crate) mod forward;
pub(crate) mod hop_mgr;
pub(crate) mod macros;
pub(crate) mod stream;

use std::result::Result as StdResult;
use std::sync::Arc;

use derive_deftly::Deftly;
use futures::channel::mpsc;
use futures::{FutureExt as _, StreamExt as _, select_biased};
use oneshot_fused_workaround as oneshot;
use tracing::trace;

use tor_cell::chancell::CircId;
use tor_rtcompat::{DynTimeProvider, Runtime};

use crate::channel::Channel;
use crate::circuit::reactor::backward::BackwardHandler;
use crate::circuit::reactor::forward::ForwardHandler;
use crate::circuit::reactor::hop_mgr::HopMgr;
use crate::circuit::reactor::stream::ReadyStreamMsg;
use crate::circuit::{CircuitRxReceiver, UniqId};
use crate::memquota::CircuitAccount;
use crate::util::err::ReactorError;

// TODO(circpad): once padding is stabilized, the padding module will be moved out of client.
use crate::client::circuit::padding::{PaddingController, PaddingEventStream};

use backward::BackwardReactor;
use forward::ForwardReactor;
use macros::derive_deftly_template_CircuitReactor;

/// The type of a oneshot channel sender used to inform the reactor of the result of an operation.
///
/// The value sent over the channel is a `crate::Result<T>`.
pub(crate) type ReactorResultChannel<T> = oneshot::Sender<crate::Result<T>>;

/// A handle for interacting with a circuit reactor.
///
/// Returned by [`Reactor::new`] together with the reactor itself.
///
/// `F` and `B` are the forward and backward handler implementations;
/// their associated types determine the control message and command types
/// carried by the channels below.
#[derive(derive_more::Debug)]
pub(crate) struct CircReactorHandle<F: ForwardHandler, B: BackwardHandler> {
    /// Sender for reactor control messages ([`CtrlMsg`]).
    #[debug(skip)]
    pub(crate) control: mpsc::UnboundedSender<CtrlMsg<F::CtrlMsg, B::CtrlMsg>>,
    /// Sender for reactor control commands ([`CtrlCmd`]).
    #[debug(skip)]
    pub(crate) command: mpsc::UnboundedSender<CtrlCmd<F::CtrlCmd, B::CtrlCmd>>,
    /// The time provider.
    pub(crate) time_provider: DynTimeProvider,
    /// Memory quota account.
    pub(crate) memquota: CircuitAccount,
}

/// A control command.
///
/// The difference between this and [`CtrlMsg`] is that `CtrlCmd`s
/// never cause cells to be sent on the Tor channel,
/// while `CtrlMsg`s potentially do.
///
/// `F` and `B` are the implementation-specific command types wrapped by
/// the forward and backward reactor commands, respectively.
#[allow(unused)] // TODO(relay)
pub(crate) enum CtrlCmd<F, B> {
    /// A control command for the forward reactor.
    Forward(forward::CtrlCmd<F>),
    /// A control command for the backward reactor.
    Backward(backward::CtrlCmd<B>),
    /// Shut down the reactor.
    Shutdown,
}

/// A control message.
///
/// Unlike a [`CtrlCmd`], handling a `CtrlMsg` can potentially
/// cause cells to be sent on the Tor channel.
///
/// `F` and `B` are the implementation-specific message types wrapped by
/// the forward and backward reactor messages, respectively.
#[allow(unused)] // TODO(relay)
pub(crate) enum CtrlMsg<F, B> {
    /// A control message for the forward reactor.
    Forward(forward::CtrlMsg<F>),
    /// A control message for the backward reactor.
    Backward(backward::CtrlMsg<B>),
}

/// The entry point of the circuit reactor subsystem.
///
/// Owns the forward and backward sub-reactors until it is run,
/// and relays the control messages and commands received from
/// [`CircReactorHandle`] to the appropriate sub-reactor.
#[derive(Deftly)]
#[derive_deftly(CircuitReactor)]
#[deftly(reactor_name = "circuit reactor")]
#[deftly(only_run_once)]
#[deftly(run_inner_fn = "Self::run_inner")]
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
pub(crate) struct Reactor<R: Runtime, F: ForwardHandler, B: BackwardHandler> {
    /// The process-unique identifier of this circuit.
    ///
    /// Used for logging.
    unique_id: UniqId,
    /// The reactor for handling
    ///
    ///   * cells moving in the forward direction (from the client towards exit), if we are a relay
    ///   * incoming cells (coming from the guard), if we are a client
    ///
    /// Optional so we can move it out of self in run().
    forward: Option<ForwardReactor<R, F>>,
    /// The reactor for handling
    ///
    ///   * cells moving in the backward direction (from the exit towards client), if we are a relay
    ///   * outgoing cells (moving towards the guard), if we are a client
    ///
    /// Optional so we can move it out of self in run().
    backward: Option<BackwardReactor<B>>,
    /// Receiver for control messages for this reactor, sent by reactor handle objects.
    ///
    /// Each message received here is passed on to the forward or backward reactor
    /// via `fwd_ctrl` or `bwd_ctrl`.
    control: mpsc::UnboundedReceiver<CtrlMsg<F::CtrlMsg, B::CtrlMsg>>,
    /// Receiver for command messages for this reactor, sent by reactor handle objects.
    ///
    /// This MPSC channel is polled in [`run`](Self::run).
    ///
    /// NOTE: this is a separate channel from `control`, because some messages
    /// have higher priority and need to be handled even if the `inbound_chan_tx` is not
    /// ready (whereas `control` messages are not read until the `inbound_chan_tx` sink
    /// is ready to accept cells).
    command: mpsc::UnboundedReceiver<CtrlCmd<F::CtrlCmd, B::CtrlCmd>>,
    /// Control channels for the [`ForwardReactor`].
    ///
    /// Handles [`CtrlCmd::Forward`] and [`CtrlMsg::Forward`] messages.
    fwd_ctrl: ReactorCtrl<forward::CtrlCmd<F::CtrlCmd>, forward::CtrlMsg<F::CtrlMsg>>,
    /// Control channels for the [`BackwardReactor`].
    ///
    /// Handles [`CtrlCmd::Backward`] and [`CtrlMsg::Backward`] messages.
    bwd_ctrl: ReactorCtrl<backward::CtrlCmd<B::CtrlCmd>, backward::CtrlMsg<B::CtrlMsg>>,
}

/// A handle for sending control/command messages to a FWD or BWD.
///
/// `C` is the type of the control commands, and `M` the type of the control messages,
/// carried by the two underlying channels.
struct ReactorCtrl<C, M> {
    /// Sender for control commands.
    command_tx: mpsc::UnboundedSender<C>,
    /// Sender for control messages.
    control_tx: mpsc::UnboundedSender<M>,
}

impl<C, M> ReactorCtrl<C, M> {
    /// Create a new sender handle.
    fn new(command_tx: mpsc::UnboundedSender<C>, control_tx: mpsc::UnboundedSender<M>) -> Self {
        Self {
            command_tx,
            control_tx,
        }
    }

    /// Send a control command.
    fn send_cmd(&mut self, cmd: C) -> Result<(), ReactorError> {
        self.command_tx
            .unbounded_send(cmd)
            .map_err(|_| ReactorError::Shutdown)
    }

    /// Send a control message.
    fn send_msg(&mut self, msg: M) -> Result<(), ReactorError> {
        self.control_tx
            .unbounded_send(msg)
            .map_err(|_| ReactorError::Shutdown)
    }
}

/// Trait implemented by types that can handle control messages and commands.
///
/// Both the forward and the backward handler of a [`Reactor`] are required
/// to implement this trait.
pub(crate) trait ControlHandler {
    /// The type of control message accepted by this handler.
    type CtrlMsg;

    /// The type of control command accepted by this handler.
    type CtrlCmd;

    // TODO(DEDUP): do these APIs make sense?
    // What should we return here, maybe some instructions for the base reactor
    // to do something?

    /// Handle a control command.
    fn handle_cmd(&mut self, cmd: Self::CtrlCmd) -> StdResult<(), ReactorError>;

    /// Handle a control message.
    fn handle_msg(&mut self, msg: Self::CtrlMsg) -> StdResult<(), ReactorError>;
}

#[allow(unused)] // TODO(relay)
impl<R: Runtime, F: ForwardHandler + ControlHandler, B: BackwardHandler + ControlHandler>
    Reactor<R, F, B>
{
    /// Create a new circuit reactor.
    ///
    /// The reactor will send outbound messages on `channel`, receive incoming
    /// messages on `inbound_chan_rx`, and identify this circuit by the channel-local
    /// [`CircId`] provided.
    ///
    /// The internal unique identifier for this circuit will be `unique_id`.
    #[allow(clippy::too_many_arguments)] // TODO
    pub(crate) fn new(
        runtime: R,
        channel: &Arc<Channel>,
        circ_id: CircId,
        unique_id: UniqId,
        inbound_chan_rx: CircuitRxReceiver,
        forward_impl: F,
        backward_impl: B,
        hop_mgr: HopMgr<R>,
        padding_ctrl: PaddingController,
        padding_event_stream: PaddingEventStream,
        // The sending end of this channel should be in HopMgr
        bwd_rx: mpsc::Receiver<ReadyStreamMsg>,
        fwd_events: mpsc::Receiver<F::CircEvent>,
        memquota: &CircuitAccount,
    ) -> (Self, CircReactorHandle<F, B>) {
        // NOTE: not registering this channel with the memquota subsystem is okay,
        // because it has no buffering (if ever decide to make the size of this buffer
        // non-zero for whatever reason, we must remember to register it with memquota
        // so that it counts towards the total memory usage for the circuit.
        #[allow(clippy::disallowed_methods)]
        let (backward_reactor_tx, forward_reactor_rx) = mpsc::channel(0);

        // TODO: channels galore
        let (control_tx, control_rx) = mpsc::unbounded();
        let (command_tx, command_rx) = mpsc::unbounded();

        let (fwd_control_tx, fwd_control_rx) = mpsc::unbounded();
        let (fwd_command_tx, fwd_command_rx) = mpsc::unbounded();
        let (bwd_control_tx, bwd_control_rx) = mpsc::unbounded();
        let (bwd_command_tx, bwd_command_rx) = mpsc::unbounded();

        let fwd_ctrl = ReactorCtrl::new(fwd_command_tx, fwd_control_tx);
        let bwd_ctrl = ReactorCtrl::new(bwd_command_tx, bwd_control_tx);

        let handle = CircReactorHandle {
            control: control_tx,
            command: command_tx,
            time_provider: DynTimeProvider::new(runtime.clone()),
            memquota: memquota.clone(),
        };

        /// Grab a handle to the hop list (it's needed by the BWD)
        let hops = Arc::clone(hop_mgr.hops());
        let forward = ForwardReactor::new(
            runtime.clone(),
            unique_id,
            forward_impl,
            hop_mgr,
            inbound_chan_rx,
            fwd_control_rx,
            fwd_command_rx,
            backward_reactor_tx,
            fwd_events,
            padding_ctrl.clone(),
        );

        let backward = BackwardReactor::new(
            runtime,
            channel,
            circ_id,
            unique_id,
            backward_impl,
            hops,
            forward_reactor_rx,
            bwd_control_rx,
            bwd_command_rx,
            padding_ctrl,
            padding_event_stream,
            bwd_rx,
        );

        let reactor = Reactor {
            unique_id,
            forward: Some(forward),
            backward: Some(backward),
            control: control_rx,
            command: command_rx,
            fwd_ctrl,
            bwd_ctrl,
        };

        (reactor, handle)
    }

    /// Helper for [`run`](Self::run).
    ///
    /// Takes the forward and backward reactors out of `self`, polls the futures
    /// returned by their `run()` methods, and dispatches the commands and control
    /// messages received on the `command` and `control` channels, until one of the
    /// sub-reactors completes, one of the channels is closed, or a handler fails.
    ///
    /// # Errors
    ///
    /// Returns [`ReactorError::Shutdown`] if the command or control channel is closed,
    /// or if a [`CtrlCmd::Shutdown`] is received. Errors from the command/control
    /// handlers and from the sub-reactors are propagated to the caller.
    ///
    /// # Panics
    ///
    /// Panics if the forward or backward reactor was already taken out of `self`
    /// (that is, if this function is called more than once).
    pub(crate) async fn run_inner(&mut self) -> StdResult<(), ReactorError> {
        // Take both sub-reactors out of self; if either is missing, we've already run.
        let (forward, backward) = (|| Some((self.forward.take()?, self.backward.take()?)))()
            .expect("relay reactor spawned twice?!");

        let mut forward = Box::pin(forward.run()).fuse();
        let mut backward = Box::pin(backward.run()).fuse();
        loop {
            // If either of these completes, this function returns,
            // dropping fwd_ctrl/bwd_ctrl channels, which will, in turn,
            // cause the remaining reactor, if there is one, to shut down too
            //
            // NOTE: select_biased! polls its branches in the order they are listed,
            // so commands take precedence over control messages, which in turn
            // take precedence over the sub-reactor futures.
            select_biased! {
                res = self.command.next() => {
                    let Some(cmd) = res else {
                        trace!(
                            circ_id = %self.unique_id,
                            reason = "command channel drop",
                            "reactor shutdown",
                        );

                        return Err(ReactorError::Shutdown);
                    };

                    self.handle_command(cmd)?;
                },
                res = self.control.next() => {
                    let Some(msg) = res else {
                        trace!(
                            circ_id = %self.unique_id,
                            reason = "control channel drop",
                            "reactor shutdown",
                        );

                        return Err(ReactorError::Shutdown);
                    };

                    self.handle_control(msg)?;
                },
                // No need to log the error here, because it was already logged
                // by the reactor that shut down
                res = forward => return Ok(res?),
                res = backward => return Ok(res?),
            }
        }
    }

    /// Handle a shutdown request.
    ///
    /// Logs the request and always returns [`ReactorError::Shutdown`],
    /// which causes the reactor's main loop to exit.
    fn handle_shutdown(&self) -> StdResult<(), ReactorError> {
        // Use the same field name as the other shutdown traces emitted by this reactor,
        // so that all of them can be found with a single filter.
        trace!(
            circ_id = %self.unique_id,
            "reactor shutdown due to explicit request",
        );

        Err(ReactorError::Shutdown)
    }

    /// Dispatch a [`CtrlCmd`] received from the reactor handle.
    ///
    /// Direction-specific commands are passed on to the corresponding sub-reactor
    /// over its command channel, while `Shutdown` is handled by this reactor.
    fn handle_command(
        &mut self,
        cmd: CtrlCmd<F::CtrlCmd, B::CtrlCmd>,
    ) -> StdResult<(), ReactorError> {
        match cmd {
            CtrlCmd::Shutdown => self.handle_shutdown(),
            CtrlCmd::Backward(bwd_cmd) => self.bwd_ctrl.send_cmd(bwd_cmd),
            CtrlCmd::Forward(fwd_cmd) => self.fwd_ctrl.send_cmd(fwd_cmd),
        }
    }

    /// Handle a [`CtrlMsg`].
    ///
    /// The message is passed on to the forward or backward reactor
    /// over the corresponding control channel.
    ///
    /// Returns [`ReactorError::Shutdown`] if the message could not be sent.
    fn handle_control(
        &mut self,
        msg: CtrlMsg<F::CtrlMsg, B::CtrlMsg>,
    ) -> StdResult<(), ReactorError> {
        match msg {
            CtrlMsg::Forward(m) => self.fwd_ctrl.send_msg(m),
            CtrlMsg::Backward(m) => self.bwd_ctrl.send_msg(m),
        }
    }
}

#[cfg(test)]
pub(crate) mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use tor_basic_utils::test_rng::testing_rng;
    use tor_cell::chancell::{BoxedCellBody, msg as chanmsg};
    use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, msg as relaymsg};

    use chanmsg::AnyChanMsg;

    // NOTE: this must be gated on the same features as the `AllowAllStreamsFilter`
    // impl below, which is the only user of this import.
    #[cfg(any(feature = "hs-service", feature = "relay"))]
    use crate::client::stream::IncomingStreamRequestFilter;

    /// Encode the relay message `msg` (with optional stream ID `id`) into a channel message.
    ///
    /// The result is a `RELAY_EARLY` message if `early` is true, and a `RELAY` message otherwise.
    pub(crate) fn rmsg_to_ccmsg(
        id: Option<StreamId>,
        msg: relaymsg::AnyRelayMsg,
        early: bool,
    ) -> AnyChanMsg {
        // TODO #1947: test other formats.
        let rfmt = RelayCellFormat::V0;
        let body: BoxedCellBody = AnyRelayMsgOuter::new(id, msg)
            .encode(rfmt, &mut testing_rng())
            .unwrap();
        let chanmsg = chanmsg::Relay::from(body);

        if early {
            let chanmsg = chanmsg::RelayEarly::from(chanmsg);
            AnyChanMsg::RelayEarly(chanmsg)
        } else {
            AnyChanMsg::Relay(chanmsg)
        }
    }

    /// An incoming stream request filter that accepts every request.
    #[cfg(any(feature = "hs-service", feature = "relay"))]
    pub(crate) struct AllowAllStreamsFilter;
    #[cfg(any(feature = "hs-service", feature = "relay"))]
    impl IncomingStreamRequestFilter for AllowAllStreamsFilter {
        fn disposition(
            &mut self,
            _ctx: &crate::client::stream::IncomingStreamRequestContext<'_>,
            _circ: &crate::circuit::CircHopSyncView<'_>,
        ) -> crate::Result<crate::client::stream::IncomingStreamRequestDisposition> {
            Ok(crate::client::stream::IncomingStreamRequestDisposition::Accept)
        }
    }
}