Skip to main content

tor_proto/
client.rs

1//! Client-specific types and implementation.
2
3pub mod channel;
4pub mod circuit;
5pub mod stream;
6
7#[cfg(feature = "rpc")]
8pub mod rpc;
9
10#[cfg(feature = "send-control-msg")]
11pub(crate) mod msghandler;
12pub(crate) mod reactor;
13
14use derive_deftly::Deftly;
15use oneshot_fused_workaround as oneshot;
16use std::net::IpAddr;
17use std::sync::Arc;
18use tracing::instrument;
19
20use crate::circuit::UniqId;
21use crate::circuit::circhop::ReactorStreamComponents;
22#[cfg(feature = "circ-padding-manual")]
23pub use crate::client::circuit::padding::{
24    CircuitPadder, CircuitPadderConfig, CircuitPadderConfigError,
25};
26use crate::client::stream::{
27    DataStream, OutboundDataCmdChecker, ResolveCmdChecker, ResolveStream, StreamParameters,
28};
29use crate::congestion::sendme::StreamRecvWindow;
30use crate::crypto::cell::HopNum;
31use crate::memquota::{SpecificAccount as _, StreamAccount};
32use crate::stream::STREAM_READER_BUFFER;
33use crate::stream::cmdcheck::AnyCmdChecker;
34use crate::stream::flow_ctrl::xon_xoff::reader::XonXoffReaderCtrl;
35use crate::stream::{RECV_WINDOW_INIT, StreamComponents, StreamReceiver, StreamTarget, Tunnel};
36use crate::{Error, ResolveError, Result};
37use circuit::{ClientCirc, Path};
38use reactor::{CtrlCmd, CtrlMsg, FlowCtrlMsg, MetaCellHandler};
39
40use tor_cell::relaycell::StreamId;
41use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
42use tor_cell::relaycell::msg::{AnyRelayMsg, Begin, Resolve, Resolved, ResolvedVal};
43use tor_error::bad_api_usage;
44use tor_linkspec::OwnedChanTarget;
45use tor_memquota::derive_deftly_template_HasMemoryCost;
46use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
47
48#[cfg(feature = "hs-service")]
49use crate::stream::{IncomingCmdChecker, IncomingStream, StreamReqInfo};
50
51#[cfg(feature = "send-control-msg")]
52use msghandler::{MsgHandler, UserMsgHandler};
53
54/// Handle to use during an ongoing protocol exchange with a circuit's last hop
55///
56/// This is obtained from [`ClientTunnel::start_conversation`],
57/// and used to send messages to the last hop relay.
58//
59// TODO(conflux): this should use ClientTunnel, and it should be moved into
60// the tunnel module.
61#[cfg(feature = "send-control-msg")]
62pub struct Conversation<'r>(&'r ClientTunnel);
63
64#[cfg(feature = "send-control-msg")]
65impl Conversation<'_> {
66    /// Send a protocol message as part of an ad-hoc exchange
67    ///
68    /// Responses are handled by the `UserMsgHandler` set up
69    /// when the `Conversation` was created.
70    pub async fn send_message(&self, msg: tor_cell::relaycell::msg::AnyRelayMsg) -> Result<()> {
71        self.send_internal(Some(msg), None).await
72    }
73
74    /// Send a `SendMsgAndInstallHandler` to the reactor and wait for the outcome
75    ///
76    /// The guts of `start_conversation` and `Conversation::send_msg`
77    pub(crate) async fn send_internal(
78        &self,
79        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
80        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
81    ) -> Result<()> {
82        let msg = msg.map(|msg| tor_cell::relaycell::AnyRelayMsgOuter::new(None, msg));
83        let (sender, receiver) = oneshot::channel();
84
85        let ctrl_msg = CtrlMsg::SendMsgAndInstallHandler {
86            msg,
87            handler,
88            sender,
89        };
90        self.0
91            .circ
92            .control
93            .unbounded_send(ctrl_msg)
94            .map_err(|_| Error::CircuitClosed)?;
95
96        receiver.await.map_err(|_| Error::CircuitClosed)?
97    }
98}
99
100/// A low-level client tunnel API.
101///
102/// This is a communication channel to the tunnel reactor, which manages 1 or more circuits.
103///
104/// Note: the tor-circmgr crates wrap this type in specialized *Tunnel types exposing only the
105/// desired subset of functionality depending on purpose and path size.
106///
107/// Some API calls are for single path and some for multi path. A check with the underlying reactor
108/// is done preventing for instance multi path calls to be used on a single path. Top level types
109/// should prevent this and thus this object should never be used directly.
110#[derive(Debug)]
111#[cfg_attr(
112    feature = "rpc",
113    derive(derive_deftly::Deftly),
114    derive_deftly(tor_rpcbase::templates::Object)
115)]
116#[allow(dead_code)] // TODO(conflux)
117pub struct ClientTunnel {
118    /// The underlying handle to the reactor.
119    circ: ClientCirc,
120}
121
122impl ClientTunnel {
123    /// Return a handle to the `ClientCirc` of this `ClientTunnel`, if the tunnel is a single
124    /// circuit tunnel.
125    ///
126    /// Returns an error if the tunnel has more than one circuit.
127    pub fn as_single_circ(&self) -> Result<&ClientCirc> {
128        if self.circ.is_multi_path {
129            return Err(bad_api_usage!("Single circuit getter on multi path tunnel"))?;
130        }
131        Ok(&self.circ)
132    }
133
134    /// Return the channel target of the first hop.
135    ///
136    /// Can only be used for single path tunnel.
137    pub fn first_hop(&self) -> Result<OwnedChanTarget> {
138        self.as_single_circ()?.first_hop()
139    }
140
141    /// Return true if the circuit reactor is closed meaning the circuit is unusable for both
142    /// receiving or sending.
143    pub fn is_closed(&self) -> bool {
144        self.circ.is_closing()
145    }
146
147    /// Return a [`TargetHop`] representing precisely the last hop of the circuit as in set as a
148    /// HopLocation with its id and hop number.
149    ///
150    /// Return an error if there is no last hop.
151    pub fn last_hop(&self) -> Result<TargetHop> {
152        let uniq_id = self.unique_id();
153        let hop_num = self
154            .circ
155            .mutable
156            .last_hop_num(uniq_id)?
157            .ok_or_else(|| bad_api_usage!("no last hop"))?;
158        Ok((uniq_id, hop_num).into())
159    }
160
161    /// Return a description of the last hop of the tunnel.
162    ///
163    /// Return None if the last hop is virtual; return an error
164    /// if the tunnel has no circuits, or all of its circuits are zero length.
165    ///
166    ///
167    /// # Panics
168    ///
169    /// Panics if there is no last hop.  (This should be impossible outside of
170    /// the tor-proto crate, but within the crate it's possible to have a
171    /// circuit with no hops.)
172    pub fn last_hop_info(&self) -> Result<Option<OwnedChanTarget>> {
173        self.circ.last_hop_info()
174    }
175
176    /// Return the number of hops this tunnel as. Fail for a multi path.
177    pub fn n_hops(&self) -> Result<usize> {
178        self.as_single_circ()?.n_hops()
179    }
180
181    /// Return the [`Path`] objects describing all the hops
182    /// of all the circuits in this tunnel.
183    pub fn all_paths(&self) -> Vec<Arc<Path>> {
184        self.circ.all_paths()
185    }
186
187    /// Return a representation of the Paths for all the circuits in this tunnel,
188    /// as a map from each circuits' UniqId to its path.
189    ///
190    /// This is only exposed for the RPC subsystem, where it is documented that the
191    /// format of `UniqId` is not stable.
192    #[cfg(feature = "rpc")]
193    pub(crate) fn tagged_paths(&self) -> std::collections::HashMap<UniqId, Arc<Path>> {
194        self.circ.mutable.tagged_paths()
195    }
196
197    /// Return a process-unique identifier for this tunnel.
198    ///
199    /// Returns the reactor unique ID of the main reactor.
200    pub fn unique_id(&self) -> UniqId {
201        self.circ.unique_id()
202    }
203
204    /// Return the time at which this tunnel last had any open streams.
205    ///
206    /// Returns `None` if this tunnel has never had any open streams,
207    /// or if it currently has open streams.
208    ///
209    /// NOTE that the Instant returned by this method is not affected by
210    /// any runtime mocking; it is the output of an ordinary call to
211    /// `Instant::now()`.
212    pub async fn disused_since(&self) -> Result<Option<web_time_compat::Instant>> {
213        self.circ.disused_since().await
214    }
215
216    /// Return a future that will resolve once the underlying circuit reactor has closed.
217    ///
218    /// Note that this method does not itself cause the tunnel to shut down.
219    pub fn wait_for_close(
220        self: &Arc<Self>,
221    ) -> impl futures::Future<Output = ()> + Send + Sync + 'static + use<> {
222        self.circ.wait_for_close()
223    }
224
225    /// Single-path tunnel only. Multi path onion service is not supported yet.
226    ///
227    /// Tell this tunnel to begin allowing the final hop of the tunnel to try
228    /// to create new Tor streams, and to return those pending requests in an
229    /// asynchronous stream.
230    ///
231    /// Ordinarily, these requests are rejected.
232    ///
233    /// There can only be one [`Stream`](futures::Stream) of this type created on a given tunnel.
234    /// If a such a [`Stream`](futures::Stream) already exists, this method will return
235    /// an error.
236    ///
237    /// After this method has been called on a tunnel, the tunnel is expected
238    /// to receive requests of this type indefinitely, until it is finally closed.
239    /// If the `Stream` is dropped, the next request on this tunnel will cause it to close.
240    ///
241    /// Only onion services (and eventually) exit relays should call this
242    /// method.
243    //
244    // TODO: Someday, we might want to allow a stream request handler to be
245    // un-registered.  However, nothing in the Tor protocol requires it.
246    //
247    // Any incoming request handlers installed on the other circuits
248    // (which are shutdown using CtrlCmd::ShutdownAndReturnCircuit)
249    // will be discarded (along with the reactor of that circuit)
250    #[cfg(feature = "hs-service")]
251    #[allow(unreachable_code, unused_variables)] // TODO(conflux)
252    pub async fn allow_stream_requests<'a, FILT>(
253        self: &Arc<Self>,
254        allow_commands: &'a [tor_cell::relaycell::RelayCmd],
255        hop: TargetHop,
256        filter: FILT,
257    ) -> Result<impl futures::Stream<Item = IncomingStream> + use<'a, FILT>>
258    where
259        FILT: crate::stream::IncomingStreamRequestFilter + 'a,
260    {
261        use futures::stream::StreamExt;
262
263        /// The size of the channel receiving IncomingStreamRequestContexts.
264        const INCOMING_BUFFER: usize = STREAM_READER_BUFFER;
265
266        // TODO(#2002): support onion service conflux
267        let circ = self.as_single_circ().map_err(tor_error::into_internal!(
268            "Cannot allow stream requests on a multi-path tunnel"
269        ))?;
270
271        let time_prov = circ.time_provider.clone();
272        let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
273        let (incoming_sender, incoming_receiver) = MpscSpec::new(INCOMING_BUFFER)
274            .new_mq(time_prov.clone(), circ.memquota.as_raw_account())?;
275        let (tx, rx) = oneshot::channel();
276
277        circ.command
278            .unbounded_send(CtrlCmd::AwaitStreamRequest {
279                cmd_checker,
280                incoming_sender,
281                hop,
282                done: tx,
283                filter: Box::new(filter),
284            })
285            .map_err(|_| Error::CircuitClosed)?;
286
287        // Check whether the AwaitStreamRequest was processed successfully.
288        rx.await.map_err(|_| Error::CircuitClosed)??;
289
290        let allowed_hop_loc: HopLocation = match hop {
291            TargetHop::Hop(loc) => Some(loc),
292            _ => None,
293        }
294        .ok_or_else(|| bad_api_usage!("Expected TargetHop with HopLocation"))?;
295
296        let tunnel = self.clone();
297        Ok(incoming_receiver.map(move |req_ctx| {
298            let StreamReqInfo {
299                req,
300                stream_id,
301                hop,
302                stream_components:
303                    ReactorStreamComponents {
304                        stream_inbound_rx,
305                        stream_outbound_tx,
306                        rate_limit_rx,
307                        drain_rate_request_rx,
308                    },
309                memquota,
310                relay_cell_format,
311            } = req_ctx;
312
313            // We already enforce this in handle_incoming_stream_request; this
314            // assertion is just here to make sure that we don't ever
315            // accidentally remove or fail to enforce that check, since it is
316            // security-critical.
317            assert_eq!(Some(allowed_hop_loc), hop);
318
319            // TODO(#2002): figure out what this is going to look like
320            // for onion services (perhaps we should forbid this function
321            // from being called on a multipath circuit?)
322            //
323            // See also:
324            // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3002#note_3200937
325            let target = StreamTarget {
326                tunnel: Tunnel::Client(Arc::clone(&tunnel)),
327                tx: stream_outbound_tx,
328                hop: Some(allowed_hop_loc),
329                stream_id,
330                relay_cell_format,
331                rate_limit_stream: rate_limit_rx,
332            };
333
334            // can be used to build a reader that supports XON/XOFF flow control
335            let xon_xoff_reader_ctrl =
336                XonXoffReaderCtrl::new(drain_rate_request_rx, target.clone());
337
338            let reader = StreamReceiver {
339                target: target.clone(),
340                receiver: stream_inbound_rx,
341                recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
342                ended: false,
343            };
344
345            let components = StreamComponents {
346                stream_receiver: reader,
347                target,
348                memquota,
349                xon_xoff_reader_ctrl,
350            };
351
352            IncomingStream::new(time_prov.clone(), req, components)
353        }))
354    }
355
356    /// Single and Multi path helper, used to begin a stream.
357    ///
358    /// This function allocates a stream ID, and sends the message
359    /// (like a BEGIN or RESOLVE), but doesn't wait for a response.
360    ///
361    /// The caller will typically want to see the first cell in response,
362    /// to see whether it is e.g. an END or a CONNECTED.
363    async fn begin_stream_impl(
364        self: &Arc<Self>,
365        begin_msg: AnyRelayMsg,
366        cmd_checker: AnyCmdChecker,
367    ) -> Result<StreamComponents> {
368        // TODO: Possibly this should take a hop, rather than just
369        // assuming it's the last hop.
370        let hop = TargetHop::LastHop;
371
372        let memquota = StreamAccount::new(self.circ.mq_account())?;
373        let (tx, rx) = oneshot::channel();
374
375        self.circ
376            .control
377            .unbounded_send(CtrlMsg::BeginStream {
378                hop,
379                message: begin_msg,
380                memquota: memquota.clone(),
381                done: tx,
382                cmd_checker,
383            })
384            .map_err(|_| Error::CircuitClosed)?;
385
386        let (stream_id, hop, relay_cell_format, stream_components) =
387            rx.await.map_err(|_| Error::CircuitClosed)??;
388
389        // Destructure so that we don't forget to use any fields.
390        let ReactorStreamComponents {
391            stream_inbound_rx,
392            stream_outbound_tx,
393            rate_limit_rx,
394            drain_rate_request_rx,
395        } = stream_components;
396
397        let target = StreamTarget {
398            tunnel: Tunnel::Client(self.clone()),
399            tx: stream_outbound_tx,
400            hop: Some(hop),
401            stream_id,
402            relay_cell_format,
403            rate_limit_stream: rate_limit_rx,
404        };
405
406        // can be used to build a reader that supports XON/XOFF flow control
407        let xon_xoff_reader_ctrl = XonXoffReaderCtrl::new(drain_rate_request_rx, target.clone());
408
409        let stream_receiver = StreamReceiver {
410            target: target.clone(),
411            receiver: stream_inbound_rx,
412            recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
413            ended: false,
414        };
415
416        let components = StreamComponents {
417            stream_receiver,
418            target,
419            memquota,
420            xon_xoff_reader_ctrl,
421        };
422
423        Ok(components)
424    }
425
426    /// Install a [`CircuitPadder`] at the listed `hop`.
427    ///
428    /// Replaces any previous padder installed at that hop.
429    #[cfg(feature = "circ-padding-manual")]
430    pub async fn start_padding_at_hop(
431        self: &Arc<Self>,
432        hop: HopLocation,
433        padder: CircuitPadder,
434    ) -> Result<()> {
435        self.circ.set_padder_impl(hop, Some(padder)).await
436    }
437
438    /// Remove any [`CircuitPadder`] at the listed `hop`.
439    ///
440    /// Does nothing if there was not a padder installed there.
441    #[cfg(feature = "circ-padding-manual")]
442    pub async fn stop_padding_at_hop(self: &Arc<Self>, hop: HopLocation) -> Result<()> {
443        self.circ.set_padder_impl(hop, None).await
444    }
445
446    /// Start a DataStream (anonymized connection) to the given
447    /// address and port, using a BEGIN cell.
448    async fn begin_data_stream(
449        self: &Arc<Self>,
450        msg: AnyRelayMsg,
451        optimistic: bool,
452    ) -> Result<DataStream> {
453        let components = self
454            .begin_stream_impl(msg, OutboundDataCmdChecker::new_any())
455            .await?;
456
457        let StreamComponents {
458            stream_receiver,
459            target,
460            memquota,
461            xon_xoff_reader_ctrl,
462        } = components;
463
464        let mut stream = DataStream::new(
465            self.circ.time_provider.clone(),
466            stream_receiver,
467            xon_xoff_reader_ctrl,
468            target,
469            memquota,
470        );
471        if !optimistic {
472            stream.wait_for_connection().await?;
473        }
474        Ok(stream)
475    }
476
477    /// Single and multi path helper.
478    ///
479    /// Start a stream to the given address and port, using a BEGIN
480    /// cell.
481    ///
482    /// The use of a string for the address is intentional: you should let
483    /// the remote Tor relay do the hostname lookup for you.
484    #[instrument(level = "trace", skip_all)]
485    pub async fn begin_stream(
486        self: &Arc<Self>,
487        target: &str,
488        port: u16,
489        parameters: Option<StreamParameters>,
490    ) -> Result<DataStream> {
491        let parameters = parameters.unwrap_or_default();
492        let begin_flags = parameters.begin_flags();
493        let optimistic = parameters.is_optimistic();
494        let target = if parameters.suppressing_hostname() {
495            ""
496        } else {
497            target
498        };
499        let beginmsg = Begin::new(target, port, begin_flags)
500            .map_err(|e| Error::from_cell_enc(e, "begin message"))?;
501        self.begin_data_stream(beginmsg.into(), optimistic).await
502    }
503
504    /// Start a new stream to the last relay in the tunnel, using
505    /// a BEGIN_DIR cell.
506    pub async fn begin_dir_stream(self: Arc<Self>) -> Result<DataStream> {
507        // Note that we always open begindir connections optimistically.
508        // Since they are local to a relay that we've already authenticated
509        // with and built a tunnel to, there should be no additional checks
510        // we need to perform to see whether the BEGINDIR will succeed.
511        self.begin_data_stream(AnyRelayMsg::BeginDir(Default::default()), true)
512            .await
513    }
514
515    /// Perform a DNS lookup, using a RESOLVE cell with the last relay
516    /// in this tunnel.
517    ///
518    /// Note that this function does not check for timeouts; that's
519    /// the caller's responsibility.
520    pub async fn resolve(self: &Arc<Self>, hostname: &str) -> Result<Vec<IpAddr>> {
521        let resolve_msg = Resolve::new(hostname);
522
523        let resolved_msg = self.try_resolve(resolve_msg).await?;
524
525        resolved_msg
526            .into_answers()
527            .into_iter()
528            .filter_map(|(val, _)| match resolvedval_to_result(val) {
529                Ok(ResolvedVal::Ip(ip)) => Some(Ok(ip)),
530                Ok(_) => None,
531                Err(e) => Some(Err(e)),
532            })
533            .collect()
534    }
535
536    /// Perform a reverse DNS lookup, by sending a RESOLVE cell with
537    /// the last relay on this tunnel.
538    ///
539    /// Note that this function does not check for timeouts; that's
540    /// the caller's responsibility.
541    pub async fn resolve_ptr(self: &Arc<Self>, addr: IpAddr) -> Result<Vec<String>> {
542        let resolve_ptr_msg = Resolve::new_reverse(&addr);
543
544        let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;
545
546        resolved_msg
547            .into_answers()
548            .into_iter()
549            .filter_map(|(val, _)| match resolvedval_to_result(val) {
550                Ok(ResolvedVal::Hostname(v)) => Some(
551                    String::from_utf8(v)
552                        .map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
553                ),
554                Ok(_) => None,
555                Err(e) => Some(Err(e)),
556            })
557            .collect()
558    }
559
560    /// Send an ad-hoc message to a given hop on the circuit, without expecting
561    /// a reply.
562    ///
563    /// (If you want to handle one or more possible replies, see
564    /// [`ClientTunnel::start_conversation`].)
565    // TODO(conflux): Change this to use the ReactorHandle for the control commands.
566    #[cfg(feature = "send-control-msg")]
567    pub async fn send_raw_msg(
568        &self,
569        msg: tor_cell::relaycell::msg::AnyRelayMsg,
570        hop: TargetHop,
571    ) -> Result<()> {
572        let (sender, receiver) = oneshot::channel();
573        let ctrl_msg = CtrlMsg::SendMsg { hop, msg, sender };
574        self.circ
575            .control
576            .unbounded_send(ctrl_msg)
577            .map_err(|_| Error::CircuitClosed)?;
578
579        receiver.await.map_err(|_| Error::CircuitClosed)?
580    }
581
582    /// Start an ad-hoc protocol exchange to the specified hop on this tunnel.
583    ///
584    /// To use this:
585    ///
586    ///  0. Create an inter-task channel you'll use to receive
587    ///     the outcome of your conversation,
588    ///     and bundle it into a [`UserMsgHandler`].
589    ///
590    ///  1. Call `start_conversation`.
591    ///     This will install a your handler, for incoming messages,
592    ///     and send the outgoing message (if you provided one).
593    ///     After that, each message on the circuit
594    ///     that isn't handled by the core machinery
595    ///     is passed to your provided `reply_handler`.
596    ///
597    ///  2. Possibly call `send_msg` on the [`Conversation`],
598    ///     from the call site of `start_conversation`,
599    ///     possibly multiple times, from time to time,
600    ///     to send further desired messages to the peer.
601    ///
602    ///  3. In your [`UserMsgHandler`], process the incoming messages.
603    ///     You may respond by
604    ///     sending additional messages
605    ///     When the protocol exchange is finished,
606    ///     `UserMsgHandler::handle_msg` should return
607    ///     [`ConversationFinished`](reactor::MetaCellDisposition::ConversationFinished).
608    ///
609    /// If you don't need the `Conversation` to send followup messages,
610    /// you may simply drop it,
611    /// and rely on the responses you get from your handler,
612    /// on the channel from step 0 above.
613    /// Your handler will remain installed and able to process incoming messages
614    /// until it returns `ConversationFinished`.
615    ///
616    /// (If you don't want to accept any replies at all, it may be
617    /// simpler to use [`ClientTunnel::send_raw_msg`].)
618    ///
619    /// Note that it is quite possible to use this function to violate the tor
620    /// protocol; most users of this API will not need to call it.  It is used
621    /// to implement most of the onion service handshake.
622    ///
623    /// # Limitations
624    ///
625    /// Only one conversation may be active at any one time,
626    /// for any one circuit.
627    /// This generally means that this function should not be called
628    /// on a tunnel which might be shared with anyone else.
629    ///
630    /// Likewise, it is forbidden to try to extend the tunnel,
631    /// while the conversation is in progress.
632    ///
633    /// After the conversation has finished, the tunnel may be extended.
634    /// Or, `start_conversation` may be called again;
635    /// but, in that case there will be a gap between the two conversations,
636    /// during which no `UserMsgHandler` is installed,
637    /// and unexpected incoming messages would close the tunnel.
638    ///
639    /// If these restrictions are violated, the tunnel will be closed with an error.
640    ///
641    /// ## Precise definition of the lifetime of a conversation
642    ///
643    /// A conversation is in progress from entry to `start_conversation`,
644    /// until entry to the body of the [`UserMsgHandler::handle_msg`](MsgHandler::handle_msg)
645    /// call which returns [`ConversationFinished`](reactor::MetaCellDisposition::ConversationFinished).
646    /// (*Entry* since `handle_msg` is synchronously embedded
647    /// into the incoming message processing.)
648    /// So you may start a new conversation as soon as you have the final response
649    /// via your inter-task channel from (0) above.
650    ///
651    /// The lifetime relationship of the [`Conversation`],
652    /// vs the handler returning `ConversationFinished`
653    /// is not enforced by the type system.
654    // Doing so without still leaving plenty of scope for runtime errors doesn't seem possible,
655    // at least while allowing sending followup messages from outside the handler.
656    #[cfg(feature = "send-control-msg")]
657    pub async fn start_conversation(
658        &self,
659        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
660        reply_handler: impl MsgHandler + Send + 'static,
661        hop: TargetHop,
662    ) -> Result<Conversation<'_>> {
663        // We need to resolve the TargetHop into a precise HopLocation so our msg handler can match
664        // the right Leg/Hop with inbound cell.
665        let (sender, receiver) = oneshot::channel();
666        self.circ
667            .command
668            .unbounded_send(CtrlCmd::ResolveTargetHop { hop, done: sender })
669            .map_err(|_| Error::CircuitClosed)?;
670        let hop_location = receiver.await.map_err(|_| Error::CircuitClosed)??;
671        let handler = Box::new(UserMsgHandler::new(hop_location, reply_handler));
672        let conversation = Conversation(self);
673        conversation.send_internal(msg, Some(handler)).await?;
674        Ok(conversation)
675    }
676
677    /// Shut down this tunnel, along with all streams that are using it. Happens asynchronously
678    /// (i.e. the tunnel won't necessarily be done shutting down immediately after this function
679    /// returns!).
680    ///
681    /// Note that other references to this tunnel may exist. If they do, they will stop working
682    /// after you call this function.
683    ///
684    /// It's not necessary to call this method if you're just done with a tunnel: the tunnel should
685    /// close on its own once nothing is using it any more.
686    // TODO(conflux): This should use the ReactorHandle instead.
687    pub fn terminate(&self) {
688        let _ = self.circ.command.unbounded_send(CtrlCmd::Shutdown);
689    }
690
691    /// Helper: Send the resolve message, and read resolved message from
692    /// resolve stream.
693    async fn try_resolve(self: &Arc<Self>, msg: Resolve) -> Result<Resolved> {
694        let components = self
695            .begin_stream_impl(msg.into(), ResolveCmdChecker::new_any())
696            .await?;
697
698        let StreamComponents {
699            stream_receiver,
700            target: _,
701            memquota,
702            xon_xoff_reader_ctrl: _,
703        } = components;
704
705        let mut resolve_stream = ResolveStream::new(stream_receiver, memquota);
706        resolve_stream.read_msg().await
707    }
708
709    // TODO(conflux)
710}
711
712// TODO(conflux): We will likely need to enforce some invariants here, for example that the `circ`
713// has the expected (non-zero) number of hops.
714impl TryFrom<ClientCirc> for ClientTunnel {
715    type Error = Error;
716
717    fn try_from(circ: ClientCirc) -> std::result::Result<Self, Self::Error> {
718        Ok(Self { circ })
719    }
720}
721
722/// Convert a [`ResolvedVal`] into a Result, based on whether or not
723/// it represents an error.
724fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
725    match val {
726        ResolvedVal::TransientError => Err(Error::ResolveError(ResolveError::Transient)),
727        ResolvedVal::NontransientError => Err(Error::ResolveError(ResolveError::Nontransient)),
728        ResolvedVal::Unrecognized(_, _) => Err(Error::ResolveError(ResolveError::Unrecognized)),
729        _ => Ok(val),
730    }
731}
732
733/// A precise position in a tunnel.
734#[derive(Debug, Deftly, Copy, Clone, PartialEq, Eq)]
735#[derive_deftly(HasMemoryCost)]
736#[non_exhaustive]
737pub enum HopLocation {
738    /// A specific position in a tunnel.
739    Hop((UniqId, HopNum)),
740    /// The join point of a multi-path tunnel.
741    JoinPoint,
742}
743
744/// A position in a tunnel.
745#[derive(Debug, Copy, Clone, PartialEq, Eq)]
746#[non_exhaustive]
747pub enum TargetHop {
748    /// A specific position in a tunnel.
749    Hop(HopLocation),
750    /// The last hop of a tunnel.
751    ///
752    /// This should be used only when you don't care about what specific hop is used.
753    /// Some tunnels may be extended or truncated,
754    /// which means that the "last hop" may change at any time.
755    LastHop,
756}
757
758impl From<(UniqId, HopNum)> for HopLocation {
759    fn from(v: (UniqId, HopNum)) -> Self {
760        HopLocation::Hop(v)
761    }
762}
763
764impl From<(UniqId, HopNum)> for TargetHop {
765    fn from(v: (UniqId, HopNum)) -> Self {
766        TargetHop::Hop(v.into())
767    }
768}
769
770impl HopLocation {
771    /// Return the hop number if not a JointPoint.
772    pub fn hop_num(&self) -> Option<HopNum> {
773        match self {
774            Self::Hop((_, hop_num)) => Some(*hop_num),
775            Self::JoinPoint => None,
776        }
777    }
778}
779
780impl ClientTunnel {
781    /// Close the pending stream that owns this StreamTarget, delivering the specified
782    /// END message (if any)
783    ///
784    /// See [`StreamTarget::close_pending`].
785    #[cfg(feature = "hs-service")]
786    pub(crate) fn close_pending(
787        &self,
788        stream_id: StreamId,
789        hop: Option<HopLocation>,
790        message: crate::stream::CloseStreamBehavior,
791    ) -> Result<oneshot::Receiver<Result<()>>> {
792        let (tx, rx) = oneshot::channel();
793
794        self.circ
795            .control
796            .unbounded_send(CtrlMsg::ClosePendingStream {
797                stream_id,
798                hop: hop.expect("missing stream hop for client tunnel"),
799                message,
800                done: tx,
801            })
802            .map_err(|_| Error::CircuitClosed)?;
803
804        Ok(rx)
805    }
806
807    /// Request to send a SENDME cell for this stream.
808    ///
809    /// See [`StreamTarget::send_sendme`].
810    pub(crate) fn send_sendme(&self, stream_id: StreamId, hop: Option<HopLocation>) -> Result<()> {
811        self.circ
812            .control
813            .unbounded_send(CtrlMsg::FlowCtrlUpdate {
814                msg: FlowCtrlMsg::Sendme,
815                stream_id,
816                hop: hop.expect("missing stream hop for client tunnel"),
817            })
818            .map_err(|_| Error::CircuitClosed)
819    }
820
821    /// Inform the circuit reactor that there has been a change in the drain rate for this stream.
822    ///
823    /// See [`StreamTarget::drain_rate_update`].
824    pub(crate) fn drain_rate_update(
825        &self,
826        stream_id: StreamId,
827        hop: Option<HopLocation>,
828        rate: XonKbpsEwma,
829    ) -> Result<()> {
830        self.circ
831            .control
832            .unbounded_send(CtrlMsg::FlowCtrlUpdate {
833                msg: FlowCtrlMsg::Xon(rate),
834                stream_id,
835                hop: hop.expect("missing stream hop for client tunnel"),
836            })
837            .map_err(|_| Error::CircuitClosed)
838    }
839}