tor-proto 0.41.0

Asynchronous client-side implementation of the central Tor network protocols
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
//! Client-specific types and implementation.

pub mod channel;
pub mod circuit;
pub mod stream;

#[cfg(feature = "send-control-msg")]
pub(crate) mod msghandler;
pub(crate) mod reactor;

use derive_deftly::Deftly;
use oneshot_fused_workaround as oneshot;
use std::net::IpAddr;
use std::sync::Arc;
use tracing::instrument;

use crate::circuit::UniqId;
#[cfg(feature = "circ-padding-manual")]
pub use crate::client::circuit::padding::{
    CircuitPadder, CircuitPadderConfig, CircuitPadderConfigError,
};
use crate::client::stream::{
    DataStream, OutboundDataCmdChecker, ResolveCmdChecker, ResolveStream, StreamParameters,
    StreamReceiver,
};
use crate::congestion::sendme::StreamRecvWindow;
use crate::crypto::cell::HopNum;
use crate::memquota::{SpecificAccount as _, StreamAccount};
use crate::stream::STREAM_READER_BUFFER;
use crate::stream::cmdcheck::AnyCmdChecker;
use crate::stream::flow_ctrl::state::StreamRateLimit;
use crate::stream::flow_ctrl::xon_xoff::reader::XonXoffReaderCtrl;
use crate::stream::queue::stream_queue;
use crate::stream::{RECV_WINDOW_INIT, StreamComponents, StreamTarget, Tunnel};
use crate::util::notify::NotifySender;
use crate::{Error, ResolveError, Result};
use circuit::{CIRCUIT_BUFFER_SIZE, ClientCirc, Path};
use reactor::{CtrlCmd, CtrlMsg, FlowCtrlMsg, MetaCellHandler};

use postage::watch;
use tor_cell::relaycell::StreamId;
use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
use tor_cell::relaycell::msg::{AnyRelayMsg, Begin, Resolve, Resolved, ResolvedVal};
use tor_error::bad_api_usage;
use tor_linkspec::OwnedChanTarget;
use tor_memquota::derive_deftly_template_HasMemoryCost;
use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};

#[cfg(feature = "hs-service")]
use crate::stream::incoming::StreamReqInfo;

#[cfg(feature = "hs-service")]
use crate::client::stream::{IncomingCmdChecker, IncomingStream};

#[cfg(feature = "send-control-msg")]
use msghandler::{MsgHandler, UserMsgHandler};

/// Handle to use during an ongoing protocol exchange with a circuit's last hop
///
/// This is obtained from [`ClientTunnel::start_conversation`],
/// and used to send messages to the last hop relay.
///
/// The wrapped `&ClientTunnel` ties each `Conversation` to the lifetime of
/// the tunnel it communicates through.
//
// TODO(conflux): this should use ClientTunnel, and it should be moved into
// the tunnel module.
#[cfg(feature = "send-control-msg")]
pub struct Conversation<'r>(&'r ClientTunnel);

#[cfg(feature = "send-control-msg")]
impl Conversation<'_> {
    /// Send a protocol message as part of an ad-hoc exchange
    ///
    /// Responses are handled by the `UserMsgHandler` set up
    /// when the `Conversation` was created.
    pub async fn send_message(&self, msg: tor_cell::relaycell::msg::AnyRelayMsg) -> Result<()> {
        self.send_internal(Some(msg), None).await
    }

    /// Send a `SendMsgAndInstallHandler` to the reactor and wait for the outcome
    ///
    /// The guts of `start_conversation` and `Conversation::send_msg`
    pub(crate) async fn send_internal(
        &self,
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
    ) -> Result<()> {
        // Wrap the outgoing message (if any) for relay delivery; no stream ID here.
        let outer = msg.map(|m| tor_cell::relaycell::AnyRelayMsgOuter::new(None, m));

        // One-shot channel on which the reactor reports the outcome.
        let (done_tx, done_rx) = oneshot::channel();

        self.0
            .circ
            .control
            .unbounded_send(CtrlMsg::SendMsgAndInstallHandler {
                msg: outer,
                handler,
                sender: done_tx,
            })
            .map_err(|_| Error::CircuitClosed)?;

        done_rx.await.map_err(|_| Error::CircuitClosed)?
    }
}

/// A low-level client tunnel API.
///
/// This is a communication channel to the tunnel reactor, which manages 1 or more circuits.
///
/// Note: the tor-circmgr crates wrap this type in specialized *Tunnel types exposing only the
/// desired subset of functionality depending on purpose and path size.
///
/// Some API calls are for single path and some for multi path. A check with the underlying reactor
/// is done preventing for instance multi path calls to be used on a single path. Top level types
/// should prevent this and thus this object should never be used directly.
#[derive(Debug)]
#[allow(dead_code)] // TODO(conflux)
pub struct ClientTunnel {
    /// The underlying handle to the reactor.
    //
    // NOTE(review): despite the field name, this handle may front a multi-path
    // tunnel — see `as_single_circ`, which checks `is_multi_path`.
    circ: ClientCirc,
}

impl ClientTunnel {
    /// Return a handle to the `ClientCirc` of this `ClientTunnel`, if the tunnel is a single
    /// circuit tunnel.
    ///
    /// Returns an error if the tunnel has more than one circuit.
    pub fn as_single_circ(&self) -> Result<&ClientCirc> {
        if self.circ.is_multi_path {
            // `bad_api_usage!` produces an internal error type; convert it into
            // the public `Error` explicitly instead of the confusing
            // `return Err(..)?` double-propagation idiom.
            return Err(bad_api_usage!("Single circuit getter on multi path tunnel").into());
        }
        Ok(&self.circ)
    }

    /// Return the channel target of the first hop.
    ///
    /// Can only be used for single path tunnel.
    pub fn first_hop(&self) -> Result<OwnedChanTarget> {
        let circ = self.as_single_circ()?;
        circ.first_hop()
    }

    /// Return true if the circuit reactor is closed meaning the circuit is unusable for both
    /// receiving or sending.
    //
    // NOTE(review): this reports `is_closing`, i.e. it may become true while
    // shutdown is still in progress — confirm the intended distinction.
    pub fn is_closed(&self) -> bool {
        self.circ.is_closing()
    }

    /// Return a [`TargetHop`] representing precisely the last hop of the circuit,
    /// i.e. a `HopLocation` built from the circuit's unique ID and the hop number.
    ///
    /// Return an error if there is no last hop.
    pub fn last_hop(&self) -> Result<TargetHop> {
        let uniq_id = self.unique_id();
        let hop_num = self
            .circ
            .mutable
            .last_hop_num(uniq_id)?
            .ok_or_else(|| bad_api_usage!("no last hop"))?;
        Ok((uniq_id, hop_num).into())
    }

    /// Return a description of the last hop of the tunnel.
    ///
    /// Return None if the last hop is virtual; return an error
    /// if the tunnel has no circuits, or all of its circuits are zero length.
    ///
    /// # Panics
    ///
    /// Panics if there is no last hop.  (This should be impossible outside of
    /// the tor-proto crate, but within the crate it's possible to have a
    /// circuit with no hops.)
    //
    // NOTE(review): the "# Panics" note and the "return an error" sentence above
    // look partially contradictory; confirm against `ClientCirc::last_hop_info`.
    pub fn last_hop_info(&self) -> Result<Option<OwnedChanTarget>> {
        self.circ.last_hop_info()
    }

    /// Return the number of hops this tunnel has. Fails for a multi path tunnel.
    pub fn n_hops(&self) -> Result<usize> {
        let circ = self.as_single_circ()?;
        circ.n_hops()
    }

    /// Return the [`Path`] objects describing all the hops
    /// of all the circuits in this tunnel.
    pub fn all_paths(&self) -> Vec<Arc<Path>> {
        // Presumably one `Path` per circuit leg — a single-path tunnel
        // would yield one element.
        self.circ.all_paths()
    }

    /// Return a process-unique identifier for this tunnel.
    ///
    /// Returns the reactor unique ID of the main reactor.
    pub fn unique_id(&self) -> UniqId {
        // Delegates to the underlying circuit handle's reactor ID.
        self.circ.unique_id()
    }

    /// Return the time at which this tunnel last had any open streams.
    ///
    /// Returns `None` if this tunnel has never had any open streams,
    /// or if it currently has open streams.
    ///
    /// NOTE that the Instant returned by this method is not affected by
    /// any runtime mocking; it is the output of an ordinary call to
    /// `Instant::now()`.
    pub async fn disused_since(&self) -> Result<Option<web_time_compat::Instant>> {
        // Forwarded to the circuit handle; presumably async because the
        // answer lives with the reactor — confirm in `ClientCirc`.
        self.circ.disused_since().await
    }

    /// Return a future that will resolve once the underlying circuit reactor has closed.
    ///
    /// Note that this method does not itself cause the tunnel to shut down.
    pub fn wait_for_close(
        self: &Arc<Self>,
    ) -> impl futures::Future<Output = ()> + Send + Sync + 'static + use<> {
        // `use<>` + `'static`: the returned future captures no borrows from
        // `self`, so it may outlive this handle.
        self.circ.wait_for_close()
    }

    /// Single-path tunnel only. Multi path onion service is not supported yet.
    ///
    /// Tell this tunnel to begin allowing the final hop of the tunnel to try
    /// to create new Tor streams, and to return those pending requests in an
    /// asynchronous stream.
    ///
    /// Ordinarily, these requests are rejected.
    ///
    /// There can only be one [`Stream`](futures::Stream) of this type created on a given tunnel.
    /// If such a [`Stream`](futures::Stream) already exists, this method will return
    /// an error.
    ///
    /// After this method has been called on a tunnel, the tunnel is expected
    /// to receive requests of this type indefinitely, until it is finally closed.
    /// If the `Stream` is dropped, the next request on this tunnel will cause it to close.
    ///
    /// Only onion services (and eventually exit relays) should call this
    /// method.
    //
    // TODO: Someday, we might want to allow a stream request handler to be
    // un-registered.  However, nothing in the Tor protocol requires it.
    //
    // Any incoming request handlers installed on the other circuits
    // (which are shutdown using CtrlCmd::ShutdownAndReturnCircuit)
    // will be discarded (along with the reactor of that circuit)
    #[cfg(feature = "hs-service")]
    #[allow(unreachable_code, unused_variables)] // TODO(conflux)
    pub async fn allow_stream_requests<'a, FILT>(
        self: &Arc<Self>,
        allow_commands: &'a [tor_cell::relaycell::RelayCmd],
        hop: TargetHop,
        filter: FILT,
    ) -> Result<impl futures::Stream<Item = IncomingStream> + use<'a, FILT>>
    where
        FILT: crate::client::stream::IncomingStreamRequestFilter + 'a,
    {
        use futures::stream::StreamExt;

        /// The size of the channel receiving IncomingStreamRequestContexts.
        const INCOMING_BUFFER: usize = STREAM_READER_BUFFER;

        // TODO(#2002): support onion service conflux
        let circ = self.as_single_circ().map_err(tor_error::into_internal!(
            "Cannot allow stream requests on a multi-path tunnel"
        ))?;

        let time_prov = circ.time_provider.clone();
        let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
        let (incoming_sender, incoming_receiver) = MpscSpec::new(INCOMING_BUFFER)
            .new_mq(time_prov.clone(), circ.memquota.as_raw_account())?;
        let (tx, rx) = oneshot::channel();

        // Ask the reactor to start delivering incoming stream requests for `hop`.
        circ.command
            .unbounded_send(CtrlCmd::AwaitStreamRequest {
                cmd_checker,
                incoming_sender,
                hop,
                done: tx,
                filter: Box::new(filter),
            })
            .map_err(|_| Error::CircuitClosed)?;

        // Check whether the AwaitStreamRequest was processed successfully.
        rx.await.map_err(|_| Error::CircuitClosed)??;

        // Only a fully-specified hop location is acceptable here; the catch-all
        // arm rejects e.g. `TargetHop::LastHop`.
        let allowed_hop_loc: HopLocation = match hop {
            TargetHop::Hop(loc) => Some(loc),
            _ => None,
        }
        .ok_or_else(|| bad_api_usage!("Expected TargetHop with HopLocation"))?;

        let tunnel = self.clone();
        Ok(incoming_receiver.map(move |req_ctx| {
            let StreamReqInfo {
                req,
                stream_id,
                hop,
                receiver,
                msg_tx,
                rate_limit_stream,
                drain_rate_request_stream,
                memquota,
                relay_cell_format,
            } = req_ctx;

            // We already enforce this in handle_incoming_stream_request; this
            // assertion is just here to make sure that we don't ever
            // accidentally remove or fail to enforce that check, since it is
            // security-critical.
            assert_eq!(Some(allowed_hop_loc), hop);

            // TODO(#2002): figure out what this is going to look like
            // for onion services (perhaps we should forbid this function
            // from being called on a multipath circuit?)
            //
            // See also:
            // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3002#note_3200937
            let target = StreamTarget {
                tunnel: Tunnel::Client(Arc::clone(&tunnel)),
                tx: msg_tx,
                hop: Some(allowed_hop_loc),
                stream_id,
                relay_cell_format,
                rate_limit_stream,
            };

            // can be used to build a reader that supports XON/XOFF flow control
            let xon_xoff_reader_ctrl =
                XonXoffReaderCtrl::new(drain_rate_request_stream, target.clone());

            let reader = StreamReceiver {
                target: target.clone(),
                receiver,
                recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
                ended: false,
            };

            let components = StreamComponents {
                stream_receiver: reader,
                target,
                memquota,
                xon_xoff_reader_ctrl,
            };

            IncomingStream::new(time_prov.clone(), req, components)
        }))
    }

    /// Single and Multi path helper, used to begin a stream.
    ///
    /// This function allocates a stream ID, and sends the message
    /// (like a BEGIN or RESOLVE), but doesn't wait for a response.
    ///
    /// The caller will typically want to see the first cell in response,
    /// to see whether it is e.g. an END or a CONNECTED.
    async fn begin_stream_impl(
        self: &Arc<Self>,
        begin_msg: AnyRelayMsg,
        cmd_checker: AnyCmdChecker,
    ) -> Result<StreamComponents> {
        // TODO: Possibly this should take a hop, rather than just
        // assuming it's the last hop.
        let hop = TargetHop::LastHop;

        let time_prov = self.circ.time_provider.clone();

        // Track this stream's memory usage against the circuit's account.
        let memquota = StreamAccount::new(self.circ.mq_account())?;
        let (sender, receiver) = stream_queue(
            // The explicit buffer size is passed only when congestion control
            // ("flowctl-cc") is compiled out; with the feature enabled,
            // `stream_queue` takes no size argument.
            #[cfg(not(feature = "flowctl-cc"))]
            STREAM_READER_BUFFER,
            &memquota,
            &time_prov,
        )?;
        // One-shot on which the reactor reports the allocated stream ID,
        // resolved hop, and relay cell format (or failure).
        let (tx, rx) = oneshot::channel();
        let (msg_tx, msg_rx) =
            MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(time_prov, memquota.as_raw_account())?;

        let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);

        // A channel for the reactor to request a new drain rate from the reader.
        // Typically this notification will be sent after an XOFF is sent so that the reader can
        // send us a new drain rate when the stream data queue becomes empty.
        let mut drain_rate_request_tx = NotifySender::new_typed();
        let drain_rate_request_rx = drain_rate_request_tx.subscribe();

        self.circ
            .control
            .unbounded_send(CtrlMsg::BeginStream {
                hop,
                message: begin_msg,
                sender,
                rx: msg_rx,
                rate_limit_notifier: rate_limit_tx,
                drain_rate_requester: drain_rate_request_tx,
                done: tx,
                cmd_checker,
            })
            .map_err(|_| Error::CircuitClosed)?;

        let (stream_id, hop, relay_cell_format) = rx.await.map_err(|_| Error::CircuitClosed)??;

        let target = StreamTarget {
            tunnel: Tunnel::Client(self.clone()),
            tx: msg_tx,
            hop: Some(hop),
            stream_id,
            relay_cell_format,
            rate_limit_stream: rate_limit_rx,
        };

        // can be used to build a reader that supports XON/XOFF flow control
        let xon_xoff_reader_ctrl = XonXoffReaderCtrl::new(drain_rate_request_rx, target.clone());

        let stream_receiver = StreamReceiver {
            target: target.clone(),
            receiver,
            recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
            ended: false,
        };

        let components = StreamComponents {
            stream_receiver,
            target,
            memquota,
            xon_xoff_reader_ctrl,
        };

        Ok(components)
    }

    /// Install a [`CircuitPadder`] at the listed `hop`.
    ///
    /// Replaces any previous padder installed at that hop.
    #[cfg(feature = "circ-padding-manual")]
    pub async fn start_padding_at_hop(
        self: &Arc<Self>,
        hop: HopLocation,
        padder: CircuitPadder,
    ) -> Result<()> {
        // `Some(padder)` installs or replaces; `stop_padding_at_hop` passes
        // `None` to the same helper to remove.
        self.circ.set_padder_impl(hop, Some(padder)).await
    }

    /// Remove any [`CircuitPadder`] at the listed `hop`.
    ///
    /// Does nothing if there was not a padder installed there.
    #[cfg(feature = "circ-padding-manual")]
    pub async fn stop_padding_at_hop(self: &Arc<Self>, hop: HopLocation) -> Result<()> {
        // `None` removes; `start_padding_at_hop` passes `Some(..)` to install.
        self.circ.set_padder_impl(hop, None).await
    }

    /// Start a DataStream (anonymized connection) to the given
    /// address and port, using a BEGIN cell.
    async fn begin_data_stream(
        self: &Arc<Self>,
        msg: AnyRelayMsg,
        optimistic: bool,
    ) -> Result<DataStream> {
        // Open the stream and destructure its parts in one step.
        let StreamComponents {
            stream_receiver,
            target,
            memquota,
            xon_xoff_reader_ctrl,
        } = self
            .begin_stream_impl(msg, OutboundDataCmdChecker::new_any())
            .await?;

        let mut stream = DataStream::new(
            self.circ.time_provider.clone(),
            stream_receiver,
            xon_xoff_reader_ctrl,
            target,
            memquota,
        );

        // A non-optimistic caller waits for the far end to confirm the
        // connection before getting the stream back.
        if !optimistic {
            stream.wait_for_connection().await?;
        }
        Ok(stream)
    }

    /// Single and multi path helper.
    ///
    /// Start a stream to the given address and port, using a BEGIN
    /// cell.
    ///
    /// The use of a string for the address is intentional: you should let
    /// the remote Tor relay do the hostname lookup for you.
    #[instrument(level = "trace", skip_all)]
    pub async fn begin_stream(
        self: &Arc<Self>,
        target: &str,
        port: u16,
        parameters: Option<StreamParameters>,
    ) -> Result<DataStream> {
        let parameters = parameters.unwrap_or_default();
        // If the caller asked to suppress the hostname, send an empty address.
        let addr = if parameters.suppressing_hostname() {
            ""
        } else {
            target
        };
        let msg = Begin::new(addr, port, parameters.begin_flags())
            .map_err(|e| Error::from_cell_enc(e, "begin message"))?;
        self.begin_data_stream(msg.into(), parameters.is_optimistic())
            .await
    }

    /// Start a new stream to the last relay in the tunnel, using
    /// a BEGIN_DIR cell.
    ///
    /// (Takes `self` by `Arc` value, consuming the caller's reference.)
    pub async fn begin_dir_stream(self: Arc<Self>) -> Result<DataStream> {
        // Note that we always open begindir connections optimistically.
        // Since they are local to a relay that we've already authenticated
        // with and built a tunnel to, there should be no additional checks
        // we need to perform to see whether the BEGINDIR will succeed.
        self.begin_data_stream(AnyRelayMsg::BeginDir(Default::default()), true)
            .await
    }

    /// Perform a DNS lookup, using a RESOLVE cell with the last relay
    /// in this tunnel.
    ///
    /// Note that this function does not check for timeouts; that's
    /// the caller's responsibility.
    pub async fn resolve(self: &Arc<Self>, hostname: &str) -> Result<Vec<IpAddr>> {
        let resolved = self.try_resolve(Resolve::new(hostname)).await?;

        let mut addrs = Vec::new();
        for (val, _ttl) in resolved.into_answers() {
            // An error answer aborts the whole lookup; non-IP answers are skipped.
            if let ResolvedVal::Ip(ip) = resolvedval_to_result(val)? {
                addrs.push(ip);
            }
        }
        Ok(addrs)
    }

    /// Perform a reverse DNS lookup, by sending a RESOLVE cell with
    /// the last relay on this tunnel.
    ///
    /// Note that this function does not check for timeouts; that's
    /// the caller's responsibility.
    pub async fn resolve_ptr(self: &Arc<Self>, addr: IpAddr) -> Result<Vec<String>> {
        let resolve_ptr_msg = Resolve::new_reverse(&addr);

        let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;

        resolved_msg
            .into_answers()
            .into_iter()
            .filter_map(|(val, _)| match resolvedval_to_result(val) {
                Ok(ResolvedVal::Hostname(v)) => Some(
                    String::from_utf8(v)
                        .map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
                ),
                Ok(_) => None,
                Err(e) => Some(Err(e)),
            })
            .collect()
    }

    /// Send an ad-hoc message to a given hop on the circuit, without expecting
    /// a reply.
    ///
    /// (If you want to handle one or more possible replies, see
    /// [`ClientTunnel::start_conversation`].)
    // TODO(conflux): Change this to use the ReactorHandle for the control commands.
    #[cfg(feature = "send-control-msg")]
    pub async fn send_raw_msg(
        &self,
        msg: tor_cell::relaycell::msg::AnyRelayMsg,
        hop: TargetHop,
    ) -> Result<()> {
        // One-shot on which the reactor reports the outcome of the send.
        let (done_tx, done_rx) = oneshot::channel();
        self.circ
            .control
            .unbounded_send(CtrlMsg::SendMsg {
                hop,
                msg,
                sender: done_tx,
            })
            .map_err(|_| Error::CircuitClosed)?;

        done_rx.await.map_err(|_| Error::CircuitClosed)?
    }

    /// Start an ad-hoc protocol exchange to the specified hop on this tunnel.
    ///
    /// To use this:
    ///
    ///  0. Create an inter-task channel you'll use to receive
    ///     the outcome of your conversation,
    ///     and bundle it into a [`UserMsgHandler`].
    ///
    ///  1. Call `start_conversation`.
    ///     This will install your handler for incoming messages,
    ///     and send the outgoing message (if you provided one).
    ///     After that, each message on the circuit
    ///     that isn't handled by the core machinery
    ///     is passed to your provided `reply_handler`.
    ///
    ///  2. Possibly call `send_msg` on the [`Conversation`],
    ///     from the call site of `start_conversation`,
    ///     possibly multiple times, from time to time,
    ///     to send further desired messages to the peer.
    ///
    ///  3. In your [`UserMsgHandler`], process the incoming messages.
    ///     You may respond by
    ///     sending additional messages.
    ///     When the protocol exchange is finished,
    ///     `UserMsgHandler::handle_msg` should return
    ///     [`ConversationFinished`](reactor::MetaCellDisposition::ConversationFinished).
    ///
    /// If you don't need the `Conversation` to send followup messages,
    /// you may simply drop it,
    /// and rely on the responses you get from your handler,
    /// on the channel from step 0 above.
    /// Your handler will remain installed and able to process incoming messages
    /// until it returns `ConversationFinished`.
    ///
    /// (If you don't want to accept any replies at all, it may be
    /// simpler to use [`ClientTunnel::send_raw_msg`].)
    ///
    /// Note that it is quite possible to use this function to violate the tor
    /// protocol; most users of this API will not need to call it.  It is used
    /// to implement most of the onion service handshake.
    ///
    /// # Limitations
    ///
    /// Only one conversation may be active at any one time,
    /// for any one circuit.
    /// This generally means that this function should not be called
    /// on a tunnel which might be shared with anyone else.
    ///
    /// Likewise, it is forbidden to try to extend the tunnel,
    /// while the conversation is in progress.
    ///
    /// After the conversation has finished, the tunnel may be extended.
    /// Or, `start_conversation` may be called again;
    /// but, in that case there will be a gap between the two conversations,
    /// during which no `UserMsgHandler` is installed,
    /// and unexpected incoming messages would close the tunnel.
    ///
    /// If these restrictions are violated, the tunnel will be closed with an error.
    ///
    /// ## Precise definition of the lifetime of a conversation
    ///
    /// A conversation is in progress from entry to `start_conversation`,
    /// until entry to the body of the [`UserMsgHandler::handle_msg`](MsgHandler::handle_msg)
    /// call which returns [`ConversationFinished`](reactor::MetaCellDisposition::ConversationFinished).
    /// (*Entry* since `handle_msg` is synchronously embedded
    /// into the incoming message processing.)
    /// So you may start a new conversation as soon as you have the final response
    /// via your inter-task channel from (0) above.
    ///
    /// The lifetime relationship of the [`Conversation`],
    /// vs the handler returning `ConversationFinished`
    /// is not enforced by the type system.
    // Doing so without still leaving plenty of scope for runtime errors doesn't seem possible,
    // at least while allowing sending followup messages from outside the handler.
    #[cfg(feature = "send-control-msg")]
    pub async fn start_conversation(
        &self,
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
        reply_handler: impl MsgHandler + Send + 'static,
        hop: TargetHop,
    ) -> Result<Conversation<'_>> {
        // We need to resolve the TargetHop into a precise HopLocation so our msg handler can match
        // the right Leg/Hop with inbound cell.
        let (sender, receiver) = oneshot::channel();
        self.circ
            .command
            .unbounded_send(CtrlCmd::ResolveTargetHop { hop, done: sender })
            .map_err(|_| Error::CircuitClosed)?;
        let hop_location = receiver.await.map_err(|_| Error::CircuitClosed)??;
        let handler = Box::new(UserMsgHandler::new(hop_location, reply_handler));
        let conversation = Conversation(self);
        conversation.send_internal(msg, Some(handler)).await?;
        Ok(conversation)
    }

    /// Shut down this tunnel, along with all streams that are using it. Happens asynchronously
    /// (i.e. the tunnel won't necessarily be done shutting down immediately after this function
    /// returns!).
    ///
    /// Note that other references to this tunnel may exist. If they do, they will stop working
    /// after you call this function.
    ///
    /// It's not necessary to call this method if you're just done with a tunnel: the tunnel should
    /// close on its own once nothing is using it any more.
    // TODO(conflux): This should use the ReactorHandle instead.
    pub fn terminate(&self) {
        // Best-effort: if the reactor is already gone, the send fails, and
        // there is nothing left to shut down anyway.
        let _ = self.circ.command.unbounded_send(CtrlCmd::Shutdown);
    }

    /// Helper: Send the resolve message, and read resolved message from
    /// resolve stream.
    async fn try_resolve(self: &Arc<Self>, msg: Resolve) -> Result<Resolved> {
        // Only the receiver half and memory account are needed; the rest of
        // the stream components are dropped.
        let StreamComponents {
            stream_receiver,
            memquota,
            ..
        } = self
            .begin_stream_impl(msg.into(), ResolveCmdChecker::new_any())
            .await?;

        let mut stream = ResolveStream::new(stream_receiver, memquota);
        stream.read_msg().await
    }

    // TODO(conflux)
}

// TODO(conflux): We will likely need to enforce some invariants here, for example that the `circ`
// has the expected (non-zero) number of hops.
/// Construct a single-circuit `ClientTunnel` from an existing `ClientCirc`.
///
/// Currently infallible, but kept as `TryFrom` so that invariant checks
/// (see TODO above) can be added without changing the API.
impl TryFrom<ClientCirc> for ClientTunnel {
    type Error = Error;

    fn try_from(circ: ClientCirc) -> std::result::Result<Self, Self::Error> {
        Ok(Self { circ })
    }
}

/// Convert a [`ResolvedVal`] into a Result, based on whether or not
/// it represents an error.
fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
    match val {
        ResolvedVal::TransientError => Err(Error::ResolveError(ResolveError::Transient)),
        ResolvedVal::NontransientError => Err(Error::ResolveError(ResolveError::Nontransient)),
        ResolvedVal::Unrecognized(_, _) => Err(Error::ResolveError(ResolveError::Unrecognized)),
        _ => Ok(val),
    }
}

/// A precise position in a tunnel.
#[derive(Debug, Deftly, Copy, Clone, PartialEq, Eq)]
#[derive_deftly(HasMemoryCost)]
#[non_exhaustive]
pub enum HopLocation {
    /// A specific position in a tunnel: a circuit's `UniqId` paired with a
    /// hop number within that circuit.
    Hop((UniqId, HopNum)),
    /// The join point of a multi-path tunnel.
    JoinPoint,
}

/// A position in a tunnel.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum TargetHop {
    /// A specific position in a tunnel (see [`HopLocation`]).
    Hop(HopLocation),
    /// The last hop of a tunnel.
    ///
    /// This should be used only when you don't care about what specific hop is used.
    /// Some tunnels may be extended or truncated,
    /// which means that the "last hop" may change at any time.
    LastHop,
}

impl From<(UniqId, HopNum)> for HopLocation {
    /// Treat a (circuit ID, hop number) pair as a precise [`HopLocation`].
    fn from(v: (UniqId, HopNum)) -> Self {
        HopLocation::Hop(v)
    }
}

impl From<(UniqId, HopNum)> for TargetHop {
    /// Treat a (circuit ID, hop number) pair as a precise [`TargetHop::Hop`].
    fn from(v: (UniqId, HopNum)) -> Self {
        TargetHop::Hop(v.into())
    }
}

impl HopLocation {
    /// Return the hop number, or `None` if this is a `JoinPoint`.
    pub fn hop_num(&self) -> Option<HopNum> {
        if let Self::Hop((_, num)) = self {
            Some(*num)
        } else {
            None
        }
    }
}

impl ClientTunnel {
    /// Close the pending stream that owns this StreamTarget, delivering the specified
    /// END message (if any)
    ///
    /// See [`StreamTarget::close_pending`].
    ///
    /// Returns a receiver that resolves once the reactor has processed the request.
    #[cfg(feature = "hs-service")]
    pub(crate) fn close_pending(
        &self,
        stream_id: StreamId,
        hop: Option<HopLocation>,
        message: crate::stream::CloseStreamBehavior,
    ) -> Result<oneshot::Receiver<Result<()>>> {
        let (tx, rx) = oneshot::channel();

        self.circ
            .control
            .unbounded_send(CtrlMsg::ClosePendingStream {
                stream_id,
                // A stream on a client tunnel is expected to always carry its
                // hop; a `None` here is a caller bug.
                hop: hop.expect("missing stream hop for client tunnel"),
                message,
                done: tx,
            })
            .map_err(|_| Error::CircuitClosed)?;

        Ok(rx)
    }

    /// Request to send a SENDME cell for this stream.
    ///
    /// See [`StreamTarget::send_sendme`].
    pub(crate) fn send_sendme(&self, stream_id: StreamId, hop: Option<HopLocation>) -> Result<()> {
        // A stream on a client tunnel is expected to always carry its hop.
        let hop = hop.expect("missing stream hop for client tunnel");
        let update = CtrlMsg::FlowCtrlUpdate {
            msg: FlowCtrlMsg::Sendme,
            stream_id,
            hop,
        };
        self.circ
            .control
            .unbounded_send(update)
            .map_err(|_| Error::CircuitClosed)
    }

    /// Inform the circuit reactor that there has been a change in the drain rate for this stream.
    ///
    /// See [`StreamTarget::drain_rate_update`].
    pub(crate) fn drain_rate_update(
        &self,
        stream_id: StreamId,
        hop: Option<HopLocation>,
        rate: XonKbpsEwma,
    ) -> Result<()> {
        // A stream on a client tunnel is expected to always carry its hop.
        let hop = hop.expect("missing stream hop for client tunnel");
        let update = CtrlMsg::FlowCtrlUpdate {
            msg: FlowCtrlMsg::Xon(rate),
            stream_id,
            hop,
        };
        self.circ
            .control
            .unbounded_send(update)
            .map_err(|_| Error::CircuitClosed)
    }
}