Skip to main content

tor_proto/
client.rs

1//! Client-specific types and implementation.
2
3pub mod channel;
4pub mod circuit;
5pub mod stream;
6
7#[cfg(feature = "send-control-msg")]
8pub(crate) mod msghandler;
9pub(crate) mod reactor;
10
11use derive_deftly::Deftly;
12use oneshot_fused_workaround as oneshot;
13use std::net::IpAddr;
14use std::sync::Arc;
15use tracing::instrument;
16
17use crate::circuit::UniqId;
18#[cfg(feature = "circ-padding-manual")]
19pub use crate::client::circuit::padding::{
20    CircuitPadder, CircuitPadderConfig, CircuitPadderConfigError,
21};
22use crate::client::stream::{
23    DataStream, OutboundDataCmdChecker, ResolveCmdChecker, ResolveStream, StreamParameters,
24    StreamReceiver,
25};
26use crate::congestion::sendme::StreamRecvWindow;
27use crate::crypto::cell::HopNum;
28use crate::memquota::{SpecificAccount as _, StreamAccount};
29use crate::stream::STREAM_READER_BUFFER;
30use crate::stream::cmdcheck::AnyCmdChecker;
31use crate::stream::flow_ctrl::state::StreamRateLimit;
32use crate::stream::flow_ctrl::xon_xoff::reader::XonXoffReaderCtrl;
33use crate::stream::queue::stream_queue;
34use crate::stream::{RECV_WINDOW_INIT, StreamComponents, StreamTarget, Tunnel};
35use crate::util::notify::NotifySender;
36use crate::{Error, ResolveError, Result};
37use circuit::{CIRCUIT_BUFFER_SIZE, ClientCirc, Path};
38use reactor::{CtrlCmd, CtrlMsg, FlowCtrlMsg, MetaCellHandler};
39
40use postage::watch;
41use tor_cell::relaycell::StreamId;
42use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
43use tor_cell::relaycell::msg::{AnyRelayMsg, Begin, Resolve, Resolved, ResolvedVal};
44use tor_error::bad_api_usage;
45use tor_linkspec::OwnedChanTarget;
46use tor_memquota::derive_deftly_template_HasMemoryCost;
47use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
48
49#[cfg(feature = "hs-service")]
50use crate::stream::incoming::StreamReqInfo;
51
52#[cfg(feature = "hs-service")]
53use crate::client::stream::{IncomingCmdChecker, IncomingStream};
54
55#[cfg(feature = "send-control-msg")]
56use msghandler::{MsgHandler, UserMsgHandler};
57
58/// Handle to use during an ongoing protocol exchange with a circuit's last hop
59///
60/// This is obtained from [`ClientTunnel::start_conversation`],
61/// and used to send messages to the last hop relay.
62//
63// TODO(conflux): this should use ClientTunnel, and it should be moved into
64// the tunnel module.
65#[cfg(feature = "send-control-msg")]
66pub struct Conversation<'r>(&'r ClientTunnel);
67
68#[cfg(feature = "send-control-msg")]
69impl Conversation<'_> {
70    /// Send a protocol message as part of an ad-hoc exchange
71    ///
72    /// Responses are handled by the `UserMsgHandler` set up
73    /// when the `Conversation` was created.
74    pub async fn send_message(&self, msg: tor_cell::relaycell::msg::AnyRelayMsg) -> Result<()> {
75        self.send_internal(Some(msg), None).await
76    }
77
78    /// Send a `SendMsgAndInstallHandler` to the reactor and wait for the outcome
79    ///
80    /// The guts of `start_conversation` and `Conversation::send_msg`
81    pub(crate) async fn send_internal(
82        &self,
83        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
84        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
85    ) -> Result<()> {
86        let msg = msg.map(|msg| tor_cell::relaycell::AnyRelayMsgOuter::new(None, msg));
87        let (sender, receiver) = oneshot::channel();
88
89        let ctrl_msg = CtrlMsg::SendMsgAndInstallHandler {
90            msg,
91            handler,
92            sender,
93        };
94        self.0
95            .circ
96            .control
97            .unbounded_send(ctrl_msg)
98            .map_err(|_| Error::CircuitClosed)?;
99
100        receiver.await.map_err(|_| Error::CircuitClosed)?
101    }
102}
103
104/// A low-level client tunnel API.
105///
106/// This is a communication channel to the tunnel reactor, which manages 1 or more circuits.
107///
108/// Note: the tor-circmgr crates wrap this type in specialized *Tunnel types exposing only the
109/// desired subset of functionality depending on purpose and path size.
110///
111/// Some API calls are for single path and some for multi path. A check with the underlying reactor
112/// is done preventing for instance multi path calls to be used on a single path. Top level types
113/// should prevent this and thus this object should never be used directly.
114#[derive(Debug)]
115#[allow(dead_code)] // TODO(conflux)
116pub struct ClientTunnel {
117    /// The underlying handle to the reactor.
118    circ: ClientCirc,
119}
120
121impl ClientTunnel {
122    /// Return a handle to the `ClientCirc` of this `ClientTunnel`, if the tunnel is a single
123    /// circuit tunnel.
124    ///
125    /// Returns an error if the tunnel has more than one circuit.
126    pub fn as_single_circ(&self) -> Result<&ClientCirc> {
127        if self.circ.is_multi_path {
128            return Err(bad_api_usage!("Single circuit getter on multi path tunnel"))?;
129        }
130        Ok(&self.circ)
131    }
132
133    /// Return the channel target of the first hop.
134    ///
135    /// Can only be used for single path tunnel.
136    pub fn first_hop(&self) -> Result<OwnedChanTarget> {
137        self.as_single_circ()?.first_hop()
138    }
139
140    /// Return true if the circuit reactor is closed meaning the circuit is unusable for both
141    /// receiving or sending.
142    pub fn is_closed(&self) -> bool {
143        self.circ.is_closing()
144    }
145
146    /// Return a [`TargetHop`] representing precisely the last hop of the circuit as in set as a
147    /// HopLocation with its id and hop number.
148    ///
149    /// Return an error if there is no last hop.
150    pub fn last_hop(&self) -> Result<TargetHop> {
151        let uniq_id = self.unique_id();
152        let hop_num = self
153            .circ
154            .mutable
155            .last_hop_num(uniq_id)?
156            .ok_or_else(|| bad_api_usage!("no last hop"))?;
157        Ok((uniq_id, hop_num).into())
158    }
159
160    /// Return a description of the last hop of the tunnel.
161    ///
162    /// Return None if the last hop is virtual; return an error
163    /// if the tunnel has no circuits, or all of its circuits are zero length.
164    ///
165    ///
166    /// # Panics
167    ///
168    /// Panics if there is no last hop.  (This should be impossible outside of
169    /// the tor-proto crate, but within the crate it's possible to have a
170    /// circuit with no hops.)
171    pub fn last_hop_info(&self) -> Result<Option<OwnedChanTarget>> {
172        self.circ.last_hop_info()
173    }
174
175    /// Return the number of hops this tunnel as. Fail for a multi path.
176    pub fn n_hops(&self) -> Result<usize> {
177        self.as_single_circ()?.n_hops()
178    }
179
180    /// Return the [`Path`] objects describing all the hops
181    /// of all the circuits in this tunnel.
182    pub fn all_paths(&self) -> Vec<Arc<Path>> {
183        self.circ.all_paths()
184    }
185
186    /// Return a process-unique identifier for this tunnel.
187    ///
188    /// Returns the reactor unique ID of the main reactor.
189    pub fn unique_id(&self) -> UniqId {
190        self.circ.unique_id()
191    }
192
193    /// Return the time at which this tunnel last had any open streams.
194    ///
195    /// Returns `None` if this tunnel has never had any open streams,
196    /// or if it currently has open streams.
197    ///
198    /// NOTE that the Instant returned by this method is not affected by
199    /// any runtime mocking; it is the output of an ordinary call to
200    /// `Instant::now()`.
201    pub async fn disused_since(&self) -> Result<Option<std::time::Instant>> {
202        self.circ.disused_since().await
203    }
204
205    /// Return a future that will resolve once the underlying circuit reactor has closed.
206    ///
207    /// Note that this method does not itself cause the tunnel to shut down.
208    pub fn wait_for_close(
209        self: &Arc<Self>,
210    ) -> impl futures::Future<Output = ()> + Send + Sync + 'static + use<> {
211        self.circ.wait_for_close()
212    }
213
214    /// Single-path tunnel only. Multi path onion service is not supported yet.
215    ///
216    /// Tell this tunnel to begin allowing the final hop of the tunnel to try
217    /// to create new Tor streams, and to return those pending requests in an
218    /// asynchronous stream.
219    ///
220    /// Ordinarily, these requests are rejected.
221    ///
222    /// There can only be one [`Stream`](futures::Stream) of this type created on a given tunnel.
223    /// If a such a [`Stream`](futures::Stream) already exists, this method will return
224    /// an error.
225    ///
226    /// After this method has been called on a tunnel, the tunnel is expected
227    /// to receive requests of this type indefinitely, until it is finally closed.
228    /// If the `Stream` is dropped, the next request on this tunnel will cause it to close.
229    ///
230    /// Only onion services (and eventually) exit relays should call this
231    /// method.
232    //
233    // TODO: Someday, we might want to allow a stream request handler to be
234    // un-registered.  However, nothing in the Tor protocol requires it.
235    //
236    // Any incoming request handlers installed on the other circuits
237    // (which are are shutdown using CtrlCmd::ShutdownAndReturnCircuit)
238    // will be discarded (along with the reactor of that circuit)
239    #[cfg(feature = "hs-service")]
240    #[allow(unreachable_code, unused_variables)] // TODO(conflux)
241    pub async fn allow_stream_requests<'a, FILT>(
242        self: &Arc<Self>,
243        allow_commands: &'a [tor_cell::relaycell::RelayCmd],
244        hop: TargetHop,
245        filter: FILT,
246    ) -> Result<impl futures::Stream<Item = IncomingStream> + use<'a, FILT>>
247    where
248        FILT: crate::client::stream::IncomingStreamRequestFilter + 'a,
249    {
250        use futures::stream::StreamExt;
251
252        /// The size of the channel receiving IncomingStreamRequestContexts.
253        const INCOMING_BUFFER: usize = STREAM_READER_BUFFER;
254
255        // TODO(#2002): support onion service conflux
256        let circ = self.as_single_circ().map_err(tor_error::into_internal!(
257            "Cannot allow stream requests on a multi-path tunnel"
258        ))?;
259
260        let time_prov = circ.time_provider.clone();
261        let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
262        let (incoming_sender, incoming_receiver) = MpscSpec::new(INCOMING_BUFFER)
263            .new_mq(time_prov.clone(), circ.memquota.as_raw_account())?;
264        let (tx, rx) = oneshot::channel();
265
266        circ.command
267            .unbounded_send(CtrlCmd::AwaitStreamRequest {
268                cmd_checker,
269                incoming_sender,
270                hop,
271                done: tx,
272                filter: Box::new(filter),
273            })
274            .map_err(|_| Error::CircuitClosed)?;
275
276        // Check whether the AwaitStreamRequest was processed successfully.
277        rx.await.map_err(|_| Error::CircuitClosed)??;
278
279        let allowed_hop_loc: HopLocation = match hop {
280            TargetHop::Hop(loc) => Some(loc),
281            _ => None,
282        }
283        .ok_or_else(|| bad_api_usage!("Expected TargetHop with HopLocation"))?;
284
285        let tunnel = self.clone();
286        Ok(incoming_receiver.map(move |req_ctx| {
287            let StreamReqInfo {
288                req,
289                stream_id,
290                hop,
291                receiver,
292                msg_tx,
293                rate_limit_stream,
294                drain_rate_request_stream,
295                memquota,
296                relay_cell_format,
297            } = req_ctx;
298
299            // We already enforce this in handle_incoming_stream_request; this
300            // assertion is just here to make sure that we don't ever
301            // accidentally remove or fail to enforce that check, since it is
302            // security-critical.
303            assert_eq!(Some(allowed_hop_loc), hop);
304
305            // TODO(#2002): figure out what this is going to look like
306            // for onion services (perhaps we should forbid this function
307            // from being called on a multipath circuit?)
308            //
309            // See also:
310            // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3002#note_3200937
311            let target = StreamTarget {
312                tunnel: Tunnel::Client(Arc::clone(&tunnel)),
313                tx: msg_tx,
314                hop: Some(allowed_hop_loc),
315                stream_id,
316                relay_cell_format,
317                rate_limit_stream,
318            };
319
320            // can be used to build a reader that supports XON/XOFF flow control
321            let xon_xoff_reader_ctrl =
322                XonXoffReaderCtrl::new(drain_rate_request_stream, target.clone());
323
324            let reader = StreamReceiver {
325                target: target.clone(),
326                receiver,
327                recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
328                ended: false,
329            };
330
331            let components = StreamComponents {
332                stream_receiver: reader,
333                target,
334                memquota,
335                xon_xoff_reader_ctrl,
336            };
337
338            IncomingStream::new(time_prov.clone(), req, components)
339        }))
340    }
341
342    /// Single and Multi path helper, used to begin a stream.
343    ///
344    /// This function allocates a stream ID, and sends the message
345    /// (like a BEGIN or RESOLVE), but doesn't wait for a response.
346    ///
347    /// The caller will typically want to see the first cell in response,
348    /// to see whether it is e.g. an END or a CONNECTED.
349    async fn begin_stream_impl(
350        self: &Arc<Self>,
351        begin_msg: AnyRelayMsg,
352        cmd_checker: AnyCmdChecker,
353    ) -> Result<StreamComponents> {
354        // TODO: Possibly this should take a hop, rather than just
355        // assuming it's the last hop.
356        let hop = TargetHop::LastHop;
357
358        let time_prov = self.circ.time_provider.clone();
359
360        let memquota = StreamAccount::new(self.circ.mq_account())?;
361        let (sender, receiver) = stream_queue(
362            #[cfg(not(feature = "flowctl-cc"))]
363            STREAM_READER_BUFFER,
364            &memquota,
365            &time_prov,
366        )?;
367        let (tx, rx) = oneshot::channel();
368        let (msg_tx, msg_rx) =
369            MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(time_prov, memquota.as_raw_account())?;
370
371        let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);
372
373        // A channel for the reactor to request a new drain rate from the reader.
374        // Typically this notification will be sent after an XOFF is sent so that the reader can
375        // send us a new drain rate when the stream data queue becomes empty.
376        let mut drain_rate_request_tx = NotifySender::new_typed();
377        let drain_rate_request_rx = drain_rate_request_tx.subscribe();
378
379        self.circ
380            .control
381            .unbounded_send(CtrlMsg::BeginStream {
382                hop,
383                message: begin_msg,
384                sender,
385                rx: msg_rx,
386                rate_limit_notifier: rate_limit_tx,
387                drain_rate_requester: drain_rate_request_tx,
388                done: tx,
389                cmd_checker,
390            })
391            .map_err(|_| Error::CircuitClosed)?;
392
393        let (stream_id, hop, relay_cell_format) = rx.await.map_err(|_| Error::CircuitClosed)??;
394
395        let target = StreamTarget {
396            tunnel: Tunnel::Client(self.clone()),
397            tx: msg_tx,
398            hop: Some(hop),
399            stream_id,
400            relay_cell_format,
401            rate_limit_stream: rate_limit_rx,
402        };
403
404        // can be used to build a reader that supports XON/XOFF flow control
405        let xon_xoff_reader_ctrl = XonXoffReaderCtrl::new(drain_rate_request_rx, target.clone());
406
407        let stream_receiver = StreamReceiver {
408            target: target.clone(),
409            receiver,
410            recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
411            ended: false,
412        };
413
414        let components = StreamComponents {
415            stream_receiver,
416            target,
417            memquota,
418            xon_xoff_reader_ctrl,
419        };
420
421        Ok(components)
422    }
423
424    /// Install a [`CircuitPadder`] at the listed `hop`.
425    ///
426    /// Replaces any previous padder installed at that hop.
427    #[cfg(feature = "circ-padding-manual")]
428    pub async fn start_padding_at_hop(
429        self: &Arc<Self>,
430        hop: HopLocation,
431        padder: CircuitPadder,
432    ) -> Result<()> {
433        self.circ.set_padder_impl(hop, Some(padder)).await
434    }
435
436    /// Remove any [`CircuitPadder`] at the listed `hop`.
437    ///
438    /// Does nothing if there was not a padder installed there.
439    #[cfg(feature = "circ-padding-manual")]
440    pub async fn stop_padding_at_hop(self: &Arc<Self>, hop: HopLocation) -> Result<()> {
441        self.circ.set_padder_impl(hop, None).await
442    }
443
444    /// Start a DataStream (anonymized connection) to the given
445    /// address and port, using a BEGIN cell.
446    async fn begin_data_stream(
447        self: &Arc<Self>,
448        msg: AnyRelayMsg,
449        optimistic: bool,
450    ) -> Result<DataStream> {
451        let components = self
452            .begin_stream_impl(msg, OutboundDataCmdChecker::new_any())
453            .await?;
454
455        let StreamComponents {
456            stream_receiver,
457            target,
458            memquota,
459            xon_xoff_reader_ctrl,
460        } = components;
461
462        let mut stream = DataStream::new(
463            self.circ.time_provider.clone(),
464            stream_receiver,
465            xon_xoff_reader_ctrl,
466            target,
467            memquota,
468        );
469        if !optimistic {
470            stream.wait_for_connection().await?;
471        }
472        Ok(stream)
473    }
474
475    /// Single and multi path helper.
476    ///
477    /// Start a stream to the given address and port, using a BEGIN
478    /// cell.
479    ///
480    /// The use of a string for the address is intentional: you should let
481    /// the remote Tor relay do the hostname lookup for you.
482    #[instrument(level = "trace", skip_all)]
483    pub async fn begin_stream(
484        self: &Arc<Self>,
485        target: &str,
486        port: u16,
487        parameters: Option<StreamParameters>,
488    ) -> Result<DataStream> {
489        let parameters = parameters.unwrap_or_default();
490        let begin_flags = parameters.begin_flags();
491        let optimistic = parameters.is_optimistic();
492        let target = if parameters.suppressing_hostname() {
493            ""
494        } else {
495            target
496        };
497        let beginmsg = Begin::new(target, port, begin_flags)
498            .map_err(|e| Error::from_cell_enc(e, "begin message"))?;
499        self.begin_data_stream(beginmsg.into(), optimistic).await
500    }
501
502    /// Start a new stream to the last relay in the tunnel, using
503    /// a BEGIN_DIR cell.
504    pub async fn begin_dir_stream(self: Arc<Self>) -> Result<DataStream> {
505        // Note that we always open begindir connections optimistically.
506        // Since they are local to a relay that we've already authenticated
507        // with and built a tunnel to, there should be no additional checks
508        // we need to perform to see whether the BEGINDIR will succeed.
509        self.begin_data_stream(AnyRelayMsg::BeginDir(Default::default()), true)
510            .await
511    }
512
513    /// Perform a DNS lookup, using a RESOLVE cell with the last relay
514    /// in this tunnel.
515    ///
516    /// Note that this function does not check for timeouts; that's
517    /// the caller's responsibility.
518    pub async fn resolve(self: &Arc<Self>, hostname: &str) -> Result<Vec<IpAddr>> {
519        let resolve_msg = Resolve::new(hostname);
520
521        let resolved_msg = self.try_resolve(resolve_msg).await?;
522
523        resolved_msg
524            .into_answers()
525            .into_iter()
526            .filter_map(|(val, _)| match resolvedval_to_result(val) {
527                Ok(ResolvedVal::Ip(ip)) => Some(Ok(ip)),
528                Ok(_) => None,
529                Err(e) => Some(Err(e)),
530            })
531            .collect()
532    }
533
534    /// Perform a reverse DNS lookup, by sending a RESOLVE cell with
535    /// the last relay on this tunnel.
536    ///
537    /// Note that this function does not check for timeouts; that's
538    /// the caller's responsibility.
539    pub async fn resolve_ptr(self: &Arc<Self>, addr: IpAddr) -> Result<Vec<String>> {
540        let resolve_ptr_msg = Resolve::new_reverse(&addr);
541
542        let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;
543
544        resolved_msg
545            .into_answers()
546            .into_iter()
547            .filter_map(|(val, _)| match resolvedval_to_result(val) {
548                Ok(ResolvedVal::Hostname(v)) => Some(
549                    String::from_utf8(v)
550                        .map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
551                ),
552                Ok(_) => None,
553                Err(e) => Some(Err(e)),
554            })
555            .collect()
556    }
557
558    /// Send an ad-hoc message to a given hop on the circuit, without expecting
559    /// a reply.
560    ///
561    /// (If you want to handle one or more possible replies, see
562    /// [`ClientTunnel::start_conversation`].)
563    // TODO(conflux): Change this to use the ReactorHandle for the control commands.
564    #[cfg(feature = "send-control-msg")]
565    pub async fn send_raw_msg(
566        &self,
567        msg: tor_cell::relaycell::msg::AnyRelayMsg,
568        hop: TargetHop,
569    ) -> Result<()> {
570        let (sender, receiver) = oneshot::channel();
571        let ctrl_msg = CtrlMsg::SendMsg { hop, msg, sender };
572        self.circ
573            .control
574            .unbounded_send(ctrl_msg)
575            .map_err(|_| Error::CircuitClosed)?;
576
577        receiver.await.map_err(|_| Error::CircuitClosed)?
578    }
579
580    /// Start an ad-hoc protocol exchange to the specified hop on this tunnel.
581    ///
582    /// To use this:
583    ///
584    ///  0. Create an inter-task channel you'll use to receive
585    ///     the outcome of your conversation,
586    ///     and bundle it into a [`UserMsgHandler`].
587    ///
588    ///  1. Call `start_conversation`.
589    ///     This will install a your handler, for incoming messages,
590    ///     and send the outgoing message (if you provided one).
591    ///     After that, each message on the circuit
592    ///     that isn't handled by the core machinery
593    ///     is passed to your provided `reply_handler`.
594    ///
595    ///  2. Possibly call `send_msg` on the [`Conversation`],
596    ///     from the call site of `start_conversation`,
597    ///     possibly multiple times, from time to time,
598    ///     to send further desired messages to the peer.
599    ///
600    ///  3. In your [`UserMsgHandler`], process the incoming messages.
601    ///     You may respond by
602    ///     sending additional messages
603    ///     When the protocol exchange is finished,
604    ///     `UserMsgHandler::handle_msg` should return
605    ///     [`ConversationFinished`](reactor::MetaCellDisposition::ConversationFinished).
606    ///
607    /// If you don't need the `Conversation` to send followup messages,
608    /// you may simply drop it,
609    /// and rely on the responses you get from your handler,
610    /// on the channel from step 0 above.
611    /// Your handler will remain installed and able to process incoming messages
612    /// until it returns `ConversationFinished`.
613    ///
614    /// (If you don't want to accept any replies at all, it may be
615    /// simpler to use [`ClientTunnel::send_raw_msg`].)
616    ///
617    /// Note that it is quite possible to use this function to violate the tor
618    /// protocol; most users of this API will not need to call it.  It is used
619    /// to implement most of the onion service handshake.
620    ///
621    /// # Limitations
622    ///
623    /// Only one conversation may be active at any one time,
624    /// for any one circuit.
625    /// This generally means that this function should not be called
626    /// on a tunnel which might be shared with anyone else.
627    ///
628    /// Likewise, it is forbidden to try to extend the tunnel,
629    /// while the conversation is in progress.
630    ///
631    /// After the conversation has finished, the tunnel may be extended.
632    /// Or, `start_conversation` may be called again;
633    /// but, in that case there will be a gap between the two conversations,
634    /// during which no `UserMsgHandler` is installed,
635    /// and unexpected incoming messages would close the tunnel.
636    ///
637    /// If these restrictions are violated, the tunnel will be closed with an error.
638    ///
639    /// ## Precise definition of the lifetime of a conversation
640    ///
641    /// A conversation is in progress from entry to `start_conversation`,
642    /// until entry to the body of the [`UserMsgHandler::handle_msg`](MsgHandler::handle_msg)
643    /// call which returns [`ConversationFinished`](reactor::MetaCellDisposition::ConversationFinished).
644    /// (*Entry* since `handle_msg` is synchronously embedded
645    /// into the incoming message processing.)
646    /// So you may start a new conversation as soon as you have the final response
647    /// via your inter-task channel from (0) above.
648    ///
649    /// The lifetime relationship of the [`Conversation`],
650    /// vs the handler returning `ConversationFinished`
651    /// is not enforced by the type system.
652    // Doing so without still leaving plenty of scope for runtime errors doesn't seem possible,
653    // at least while allowing sending followup messages from outside the handler.
654    #[cfg(feature = "send-control-msg")]
655    pub async fn start_conversation(
656        &self,
657        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
658        reply_handler: impl MsgHandler + Send + 'static,
659        hop: TargetHop,
660    ) -> Result<Conversation<'_>> {
661        // We need to resolve the TargetHop into a precise HopLocation so our msg handler can match
662        // the right Leg/Hop with inbound cell.
663        let (sender, receiver) = oneshot::channel();
664        self.circ
665            .command
666            .unbounded_send(CtrlCmd::ResolveTargetHop { hop, done: sender })
667            .map_err(|_| Error::CircuitClosed)?;
668        let hop_location = receiver.await.map_err(|_| Error::CircuitClosed)??;
669        let handler = Box::new(UserMsgHandler::new(hop_location, reply_handler));
670        let conversation = Conversation(self);
671        conversation.send_internal(msg, Some(handler)).await?;
672        Ok(conversation)
673    }
674
675    /// Shut down this tunnel, along with all streams that are using it. Happens asynchronously
676    /// (i.e. the tunnel won't necessarily be done shutting down immediately after this function
677    /// returns!).
678    ///
679    /// Note that other references to this tunnel may exist. If they do, they will stop working
680    /// after you call this function.
681    ///
682    /// It's not necessary to call this method if you're just done with a tunnel: the tunnel should
683    /// close on its own once nothing is using it any more.
684    // TODO(conflux): This should use the ReactorHandle instead.
685    pub fn terminate(&self) {
686        let _ = self.circ.command.unbounded_send(CtrlCmd::Shutdown);
687    }
688
689    /// Helper: Send the resolve message, and read resolved message from
690    /// resolve stream.
691    async fn try_resolve(self: &Arc<Self>, msg: Resolve) -> Result<Resolved> {
692        let components = self
693            .begin_stream_impl(msg.into(), ResolveCmdChecker::new_any())
694            .await?;
695
696        let StreamComponents {
697            stream_receiver,
698            target: _,
699            memquota,
700            xon_xoff_reader_ctrl: _,
701        } = components;
702
703        let mut resolve_stream = ResolveStream::new(stream_receiver, memquota);
704        resolve_stream.read_msg().await
705    }
706
707    // TODO(conflux)
708}
709
710// TODO(conflux): We will likely need to enforce some invariants here, for example that the `circ`
711// has the expected (non-zero) number of hops.
712impl TryFrom<ClientCirc> for ClientTunnel {
713    type Error = Error;
714
715    fn try_from(circ: ClientCirc) -> std::result::Result<Self, Self::Error> {
716        Ok(Self { circ })
717    }
718}
719
720/// Convert a [`ResolvedVal`] into a Result, based on whether or not
721/// it represents an error.
722fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
723    match val {
724        ResolvedVal::TransientError => Err(Error::ResolveError(ResolveError::Transient)),
725        ResolvedVal::NontransientError => Err(Error::ResolveError(ResolveError::Nontransient)),
726        ResolvedVal::Unrecognized(_, _) => Err(Error::ResolveError(ResolveError::Unrecognized)),
727        _ => Ok(val),
728    }
729}
730
731/// A precise position in a tunnel.
732#[derive(Debug, Deftly, Copy, Clone, PartialEq, Eq)]
733#[derive_deftly(HasMemoryCost)]
734#[non_exhaustive]
735pub enum HopLocation {
736    /// A specific position in a tunnel.
737    Hop((UniqId, HopNum)),
738    /// The join point of a multi-path tunnel.
739    JoinPoint,
740}
741
742/// A position in a tunnel.
743#[derive(Debug, Copy, Clone, PartialEq, Eq)]
744#[non_exhaustive]
745pub enum TargetHop {
746    /// A specific position in a tunnel.
747    Hop(HopLocation),
748    /// The last hop of a tunnel.
749    ///
750    /// This should be used only when you don't care about what specific hop is used.
751    /// Some tunnels may be extended or truncated,
752    /// which means that the "last hop" may change at any time.
753    LastHop,
754}
755
756impl From<(UniqId, HopNum)> for HopLocation {
757    fn from(v: (UniqId, HopNum)) -> Self {
758        HopLocation::Hop(v)
759    }
760}
761
762impl From<(UniqId, HopNum)> for TargetHop {
763    fn from(v: (UniqId, HopNum)) -> Self {
764        TargetHop::Hop(v.into())
765    }
766}
767
768impl HopLocation {
769    /// Return the hop number if not a JointPoint.
770    pub fn hop_num(&self) -> Option<HopNum> {
771        match self {
772            Self::Hop((_, hop_num)) => Some(*hop_num),
773            Self::JoinPoint => None,
774        }
775    }
776}
777
778impl ClientTunnel {
779    /// Close the pending stream that owns this StreamTarget, delivering the specified
780    /// END message (if any)
781    ///
782    /// See [`StreamTarget::close_pending`].
783    #[cfg(feature = "hs-service")]
784    pub(crate) fn close_pending(
785        &self,
786        stream_id: StreamId,
787        hop: Option<HopLocation>,
788        message: crate::stream::CloseStreamBehavior,
789    ) -> Result<oneshot::Receiver<Result<()>>> {
790        let (tx, rx) = oneshot::channel();
791
792        self.circ
793            .control
794            .unbounded_send(CtrlMsg::ClosePendingStream {
795                stream_id,
796                hop: hop.expect("missing stream hop for client tunnel"),
797                message,
798                done: tx,
799            })
800            .map_err(|_| Error::CircuitClosed)?;
801
802        Ok(rx)
803    }
804
805    /// Request to send a SENDME cell for this stream.
806    ///
807    /// See [`StreamTarget::send_sendme`].
808    pub(crate) fn send_sendme(&self, stream_id: StreamId, hop: Option<HopLocation>) -> Result<()> {
809        self.circ
810            .control
811            .unbounded_send(CtrlMsg::FlowCtrlUpdate {
812                msg: FlowCtrlMsg::Sendme,
813                stream_id,
814                hop: hop.expect("missing stream hop for client tunnel"),
815            })
816            .map_err(|_| Error::CircuitClosed)
817    }
818
819    /// Inform the circuit reactor that there has been a change in the drain rate for this stream.
820    ///
821    /// See [`StreamTarget::drain_rate_update`].
822    pub(crate) fn drain_rate_update(
823        &self,
824        stream_id: StreamId,
825        hop: Option<HopLocation>,
826        rate: XonKbpsEwma,
827    ) -> Result<()> {
828        self.circ
829            .control
830            .unbounded_send(CtrlMsg::FlowCtrlUpdate {
831                msg: FlowCtrlMsg::Xon(rate),
832                stream_id,
833                hop: hop.expect("missing stream hop for client tunnel"),
834            })
835            .map_err(|_| Error::CircuitClosed)
836    }
837}