tor_proto/client.rs

//! Client-specific types and implementation.

pub mod channel;
pub mod circuit;
pub mod stream;

#[cfg(feature = "send-control-msg")]
pub(crate) mod msghandler;
pub(crate) mod reactor;

use derive_deftly::Deftly;
use futures::SinkExt as _;
use oneshot_fused_workaround as oneshot;
use std::net::IpAddr;
use std::pin::Pin;
use std::sync::Arc;
use tracing::instrument;

use crate::circuit::UniqId;
#[cfg(feature = "circ-padding-manual")]
#[cfg_attr(docsrs, doc(cfg(feature = "circ-padding-manual")))]
pub use crate::client::circuit::padding::{
    CircuitPadder, CircuitPadderConfig, CircuitPadderConfigError,
};
use crate::client::stream::{
    DataStream, OutboundDataCmdChecker, ResolveCmdChecker, ResolveStream, StreamParameters,
    StreamReceiver,
};
use crate::congestion::sendme::StreamRecvWindow;
use crate::crypto::cell::HopNum;
use crate::memquota::{SpecificAccount as _, StreamAccount};
use crate::stream::StreamMpscSender;
use crate::stream::cmdcheck::AnyCmdChecker;
use crate::stream::flow_ctrl::state::StreamRateLimit;
use crate::stream::flow_ctrl::xon_xoff::reader::XonXoffReaderCtrl;
use crate::stream::queue::stream_queue;
use crate::util::notify::NotifySender;
use crate::{Error, ResolveError, Result};
use circuit::{CIRCUIT_BUFFER_SIZE, ClientCirc, Path};
use reactor::{
    CtrlCmd, CtrlMsg, FlowCtrlMsg, MetaCellHandler, RECV_WINDOW_INIT, STREAM_READER_BUFFER,
};

use postage::watch;
use tor_async_utils::SinkCloseChannel as _;
use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
use tor_cell::relaycell::msg::{AnyRelayMsg, Begin, Resolve, Resolved, ResolvedVal};
use tor_cell::relaycell::{RelayCellFormat, StreamId};
use tor_error::bad_api_usage;
use tor_linkspec::OwnedChanTarget;
use tor_memquota::derive_deftly_template_HasMemoryCost;
use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};

#[cfg(feature = "hs-service")]
use {
    crate::client::reactor::StreamReqInfo,
    crate::client::stream::{IncomingCmdChecker, IncomingStream},
};

#[cfg(feature = "send-control-msg")]
use msghandler::{MsgHandler, UserMsgHandler};

/// Handle to use during an ongoing protocol exchange with a circuit's last hop
///
/// This is obtained from [`ClientTunnel::start_conversation`],
/// and used to send messages to the last hop relay.
//
// TODO(conflux): this should use ClientTunnel, and it should be moved into
// the tunnel module.
#[cfg(feature = "send-control-msg")]
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
pub struct Conversation<'r>(&'r ClientTunnel);

#[cfg(feature = "send-control-msg")]
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
impl Conversation<'_> {
    /// Send a protocol message as part of an ad-hoc exchange
    ///
    /// Responses are handled by the `UserMsgHandler` set up
    /// when the `Conversation` was created.
    pub async fn send_message(&self, msg: tor_cell::relaycell::msg::AnyRelayMsg) -> Result<()> {
        self.send_internal(Some(msg), None).await
    }

    /// Send a `SendMsgAndInstallHandler` to the reactor and wait for the outcome
    ///
    /// The guts of `start_conversation` and `Conversation::send_message`
    pub(crate) async fn send_internal(
        &self,
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
    ) -> Result<()> {
        let msg = msg.map(|msg| tor_cell::relaycell::AnyRelayMsgOuter::new(None, msg));
        let (sender, receiver) = oneshot::channel();

        let ctrl_msg = CtrlMsg::SendMsgAndInstallHandler {
            msg,
            handler,
            sender,
        };
        self.0
            .circ
            .control
            .unbounded_send(ctrl_msg)
            .map_err(|_| Error::CircuitClosed)?;

        receiver.await.map_err(|_| Error::CircuitClosed)?
    }
}

/// A low-level client tunnel API.
///
/// This is a communication channel to the tunnel reactor, which manages one or more circuits.
///
/// Note: the tor-circmgr crate wraps this type in specialized `*Tunnel` types that expose only
/// the subset of functionality appropriate to the tunnel's purpose and path size.
///
/// Some API calls apply only to single-path tunnels, and some only to multi-path tunnels.
/// The underlying reactor checks for misuse (for instance, a multi-path call on a single-path
/// tunnel), but the higher-level types should prevent this in the first place, so this object
/// should never be used directly.
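///
/// # Example
///
/// A minimal sketch of obtaining and inspecting a tunnel (not a working doctest;
/// it assumes a `circ: ClientCirc` already built by higher-level code):
///
/// ```ignore
/// // Wrap a single circuit in the tunnel API.
/// let tunnel = ClientTunnel::try_from(circ)?;
///
/// // Inspect it.
/// let n_hops = tunnel.n_hops()?;
/// let last = tunnel.last_hop()?;
/// ```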
#[derive(Debug)]
#[allow(dead_code)] // TODO(conflux)
pub struct ClientTunnel {
    /// The underlying handle to the reactor.
    circ: ClientCirc,
}

impl ClientTunnel {
    /// Return a handle to the `ClientCirc` of this `ClientTunnel`, if the tunnel is a
    /// single-circuit tunnel.
    ///
    /// Returns an error if the tunnel has more than one circuit.
    pub fn as_single_circ(&self) -> Result<&ClientCirc> {
        if self.circ.is_multi_path {
            return Err(bad_api_usage!("Single circuit getter on multi path tunnel"))?;
        }
        Ok(&self.circ)
    }

    /// Return the channel target of the first hop.
    ///
    /// Can only be used for a single-path tunnel.
    pub fn first_hop(&self) -> Result<OwnedChanTarget> {
        self.as_single_circ()?.first_hop()
    }

    /// Return true if the circuit reactor is closed, meaning the circuit is unusable for both
    /// sending and receiving.
    pub fn is_closed(&self) -> bool {
        self.circ.is_closing()
    }

    /// Return a [`TargetHop`] identifying precisely the current last hop of the circuit,
    /// as a `HopLocation` holding its circuit unique ID and hop number.
    ///
    /// Return an error if there is no last hop.
    pub fn last_hop(&self) -> Result<TargetHop> {
        let uniq_id = self.unique_id();
        let hop_num = self
            .circ
            .mutable
            .last_hop_num(uniq_id)?
            .ok_or_else(|| bad_api_usage!("no last hop"))?;
        Ok((uniq_id, hop_num).into())
    }

    /// Return a description of the last hop of the tunnel.
    ///
    /// Return None if the last hop is virtual; return an error
    /// if the tunnel has no circuits, or all of its circuits are zero length.
    ///
    /// # Panics
    ///
    /// Panics if there is no last hop.  (This should be impossible outside of
    /// the tor-proto crate, but within the crate it's possible to have a
    /// circuit with no hops.)
    pub fn last_hop_info(&self) -> Result<Option<OwnedChanTarget>> {
        self.circ.last_hop_info()
    }

    /// Return the number of hops in this tunnel. Fails for a multi-path tunnel.
    pub fn n_hops(&self) -> Result<usize> {
        self.as_single_circ()?.n_hops()
    }

    /// Return the [`Path`] objects describing all the hops
    /// of all the circuits in this tunnel.
    pub fn all_paths(&self) -> Vec<Arc<Path>> {
        self.circ.all_paths()
    }

    /// Return a process-unique identifier for this tunnel.
    ///
    /// Returns the reactor unique ID of the main reactor.
    pub fn unique_id(&self) -> UniqId {
        self.circ.unique_id()
    }

    /// Return the time at which this tunnel last had any open streams.
    ///
    /// Returns `None` if this tunnel has never had any open streams,
    /// or if it currently has open streams.
    ///
    /// NOTE that the Instant returned by this method is not affected by
    /// any runtime mocking; it is the output of an ordinary call to
    /// `Instant::now()`.
    pub async fn disused_since(&self) -> Result<Option<std::time::Instant>> {
        self.circ.disused_since().await
    }

    /// Return a future that will resolve once the underlying circuit reactor has closed.
    ///
    /// Note that this method does not itself cause the tunnel to shut down.
    pub fn wait_for_close(
        self: &Arc<Self>,
    ) -> impl futures::Future<Output = ()> + Send + Sync + 'static + use<> {
        self.circ.wait_for_close()
    }

    /// Single-path tunnels only. Multi-path onion services are not supported yet.
    ///
    /// Tell this tunnel to begin allowing the final hop of the tunnel to try
    /// to create new Tor streams, and to return those pending requests in an
    /// asynchronous stream.
    ///
    /// Ordinarily, these requests are rejected.
    ///
    /// There can only be one [`Stream`](futures::Stream) of this type created on a given tunnel.
    /// If such a [`Stream`](futures::Stream) already exists, this method will return
    /// an error.
    ///
    /// After this method has been called on a tunnel, the tunnel is expected
    /// to receive requests of this type indefinitely, until it is finally closed.
    /// If the `Stream` is dropped, the next request on this tunnel will cause it to close.
    ///
    /// Only onion services (and, eventually, exit relays) should call this
    /// method.
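    ///
    /// # Example
    ///
    /// A rough sketch of how an onion service might use this method. It is not a working
    /// doctest; it assumes a `tunnel: Arc<ClientTunnel>` and some `filter` implementing
    /// [`IncomingStreamRequestFilter`](crate::client::stream::IncomingStreamRequestFilter)
    /// provided by the caller.
    ///
    /// ```ignore
    /// use futures::stream::StreamExt as _;
    /// use tor_cell::relaycell::RelayCmd;
    ///
    /// // Accept BEGIN requests arriving at the tunnel's last hop.
    /// let hop = tunnel.last_hop()?;
    /// let mut requests = tunnel
    ///     .allow_stream_requests(&[RelayCmd::BEGIN], hop, filter)
    ///     .await?;
    ///
    /// while let Some(request) = requests.next().await {
    ///     // Inspect the request, then accept or reject it.
    /// }
    /// ```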
    //
    // TODO: Someday, we might want to allow a stream request handler to be
    // un-registered.  However, nothing in the Tor protocol requires it.
    //
    // Any incoming request handlers installed on the other circuits
    // (which are shut down using CtrlCmd::ShutdownAndReturnCircuit)
    // will be discarded (along with the reactor of that circuit).
    #[cfg(feature = "hs-service")]
    #[allow(unreachable_code, unused_variables)] // TODO(conflux)
    pub async fn allow_stream_requests<'a, FILT>(
        self: &Arc<Self>,
        allow_commands: &'a [tor_cell::relaycell::RelayCmd],
        hop: TargetHop,
        filter: FILT,
    ) -> Result<impl futures::Stream<Item = IncomingStream> + use<'a, FILT>>
    where
        FILT: crate::client::stream::IncomingStreamRequestFilter + 'a,
    {
        use futures::stream::StreamExt;

        /// The size of the channel receiving IncomingStreamRequestContexts.
        const INCOMING_BUFFER: usize = STREAM_READER_BUFFER;

        // TODO(#2002): support onion service conflux
        let circ = self.as_single_circ().map_err(tor_error::into_internal!(
            "Cannot allow stream requests on a multi-path tunnel"
        ))?;

        let time_prov = circ.time_provider.clone();
        let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
        let (incoming_sender, incoming_receiver) = MpscSpec::new(INCOMING_BUFFER)
            .new_mq(time_prov.clone(), circ.memquota.as_raw_account())?;
        let (tx, rx) = oneshot::channel();

        circ.command
            .unbounded_send(CtrlCmd::AwaitStreamRequest {
                cmd_checker,
                incoming_sender,
                hop,
                done: tx,
                filter: Box::new(filter),
            })
            .map_err(|_| Error::CircuitClosed)?;

        // Check whether the AwaitStreamRequest was processed successfully.
        rx.await.map_err(|_| Error::CircuitClosed)??;

        let allowed_hop_loc: HopLocation = match hop {
            TargetHop::Hop(loc) => Some(loc),
            _ => None,
        }
        .ok_or_else(|| bad_api_usage!("Expected TargetHop with HopLocation"))?;

        let tunnel = self.clone();
        Ok(incoming_receiver.map(move |req_ctx| {
            let StreamReqInfo {
                req,
                stream_id,
                hop,
                receiver,
                msg_tx,
                rate_limit_stream,
                drain_rate_request_stream,
                memquota,
                relay_cell_format,
            } = req_ctx;

            // We already enforce this in handle_incoming_stream_request; this
            // assertion is just here to make sure that we don't ever
            // accidentally remove or fail to enforce that check, since it is
            // security-critical.
            assert_eq!(allowed_hop_loc, hop);

            // TODO(#2002): figure out what this is going to look like
            // for onion services (perhaps we should forbid this function
            // from being called on a multipath circuit?)
            //
            // See also:
            // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3002#note_3200937
            let target = StreamTarget {
                tunnel: tunnel.clone(),
                tx: msg_tx,
                hop: allowed_hop_loc,
                stream_id,
                relay_cell_format,
                rate_limit_stream,
            };

            // can be used to build a reader that supports XON/XOFF flow control
            let xon_xoff_reader_ctrl =
                XonXoffReaderCtrl::new(drain_rate_request_stream, target.clone());

            let reader = StreamReceiver {
                target: target.clone(),
                receiver,
                recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
                ended: false,
            };

            let components = StreamComponents {
                stream_receiver: reader,
                target,
                memquota,
                xon_xoff_reader_ctrl,
            };

            IncomingStream::new(time_prov.clone(), req, components)
        }))
    }

    /// Single- and multi-path helper, used to begin a stream.
    ///
    /// This function allocates a stream ID, and sends the message
    /// (like a BEGIN or RESOLVE), but doesn't wait for a response.
    ///
    /// The caller will typically want to see the first cell in response,
    /// to see whether it is e.g. an END or a CONNECTED.
    async fn begin_stream_impl(
        self: &Arc<Self>,
        begin_msg: AnyRelayMsg,
        cmd_checker: AnyCmdChecker,
    ) -> Result<StreamComponents> {
        // TODO: Possibly this should take a hop, rather than just
        // assuming it's the last hop.
        let hop = TargetHop::LastHop;

        let time_prov = self.circ.time_provider.clone();

        let memquota = StreamAccount::new(self.circ.mq_account())?;
        let (sender, receiver) = stream_queue(
            #[cfg(not(feature = "flowctl-cc"))]
            STREAM_READER_BUFFER,
            &memquota,
            &time_prov,
        )?;
        let (tx, rx) = oneshot::channel();
        let (msg_tx, msg_rx) =
            MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(time_prov, memquota.as_raw_account())?;

        let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);

        // A channel for the reactor to request a new drain rate from the reader.
        // Typically this notification will be sent after an XOFF is sent so that the reader can
        // send us a new drain rate when the stream data queue becomes empty.
        let mut drain_rate_request_tx = NotifySender::new_typed();
        let drain_rate_request_rx = drain_rate_request_tx.subscribe();

        self.circ
            .control
            .unbounded_send(CtrlMsg::BeginStream {
                hop,
                message: begin_msg,
                sender,
                rx: msg_rx,
                rate_limit_notifier: rate_limit_tx,
                drain_rate_requester: drain_rate_request_tx,
                done: tx,
                cmd_checker,
            })
            .map_err(|_| Error::CircuitClosed)?;

        let (stream_id, hop, relay_cell_format) = rx.await.map_err(|_| Error::CircuitClosed)??;

        let target = StreamTarget {
            tunnel: self.clone(),
            tx: msg_tx,
            hop,
            stream_id,
            relay_cell_format,
            rate_limit_stream: rate_limit_rx,
        };

        // can be used to build a reader that supports XON/XOFF flow control
        let xon_xoff_reader_ctrl = XonXoffReaderCtrl::new(drain_rate_request_rx, target.clone());

        let stream_receiver = StreamReceiver {
            target: target.clone(),
            receiver,
            recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
            ended: false,
        };

        let components = StreamComponents {
            stream_receiver,
            target,
            memquota,
            xon_xoff_reader_ctrl,
        };

        Ok(components)
    }

    /// Install a [`CircuitPadder`] at the listed `hop`.
    ///
    /// Replaces any previous padder installed at that hop.
    #[cfg(feature = "circ-padding-manual")]
    pub async fn start_padding_at_hop(
        self: &Arc<Self>,
        hop: HopLocation,
        padder: CircuitPadder,
    ) -> Result<()> {
        self.circ.set_padder_impl(hop, Some(padder)).await
    }

    /// Remove any [`CircuitPadder`] at the listed `hop`.
    ///
    /// Does nothing if there was not a padder installed there.
    #[cfg(feature = "circ-padding-manual")]
    pub async fn stop_padding_at_hop(self: &Arc<Self>, hop: HopLocation) -> Result<()> {
        self.circ.set_padder_impl(hop, None).await
    }

    /// Start a DataStream (anonymized connection) to the given
    /// address and port, using a BEGIN cell.
    async fn begin_data_stream(
        self: &Arc<Self>,
        msg: AnyRelayMsg,
        optimistic: bool,
    ) -> Result<DataStream> {
        let components = self
            .begin_stream_impl(msg, OutboundDataCmdChecker::new_any())
            .await?;

        let StreamComponents {
            stream_receiver,
            target,
            memquota,
            xon_xoff_reader_ctrl,
        } = components;

        let mut stream = DataStream::new(
            self.circ.time_provider.clone(),
            stream_receiver,
            xon_xoff_reader_ctrl,
            target,
            memquota,
        );
        if !optimistic {
            stream.wait_for_connection().await?;
        }
        Ok(stream)
    }

    /// Single- and multi-path helper.
    ///
    /// Start a stream to the given address and port, using a BEGIN
    /// cell.
    ///
    /// The use of a string for the address is intentional: you should let
    /// the remote Tor relay do the hostname lookup for you.
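    ///
    /// # Example
    ///
    /// A minimal sketch (not a working doctest; it assumes a `tunnel: Arc<ClientTunnel>`
    /// obtained from higher-level code):
    ///
    /// ```ignore
    /// // Open an anonymized connection to example.com:443 with default parameters.
    /// let stream = tunnel.begin_stream("example.com", 443, None).await?;
    /// ```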
    #[instrument(level = "trace", skip_all)]
    pub async fn begin_stream(
        self: &Arc<Self>,
        target: &str,
        port: u16,
        parameters: Option<StreamParameters>,
    ) -> Result<DataStream> {
        let parameters = parameters.unwrap_or_default();
        let begin_flags = parameters.begin_flags();
        let optimistic = parameters.is_optimistic();
        let target = if parameters.suppressing_hostname() {
            ""
        } else {
            target
        };
        let beginmsg = Begin::new(target, port, begin_flags)
            .map_err(|e| Error::from_cell_enc(e, "begin message"))?;
        self.begin_data_stream(beginmsg.into(), optimistic).await
    }

    /// Start a new stream to the last relay in the tunnel, using
    /// a BEGIN_DIR cell.
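    ///
    /// # Example
    ///
    /// A minimal sketch (not a working doctest; a `tunnel: Arc<ClientTunnel>` is assumed):
    ///
    /// ```ignore
    /// // BEGIN_DIR streams are opened optimistically, so this returns without
    /// // waiting for a CONNECTED cell.
    /// let dir_stream = Arc::clone(&tunnel).begin_dir_stream().await?;
    /// ```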
    pub async fn begin_dir_stream(self: Arc<Self>) -> Result<DataStream> {
        // Note that we always open begindir connections optimistically.
        // Since they are local to a relay that we've already authenticated
        // with and built a tunnel to, there should be no additional checks
        // we need to perform to see whether the BEGINDIR will succeed.
        self.begin_data_stream(AnyRelayMsg::BeginDir(Default::default()), true)
            .await
    }

    /// Perform a DNS lookup, using a RESOLVE cell with the last relay
    /// in this tunnel.
    ///
    /// Note that this function does not check for timeouts; that's
    /// the caller's responsibility.
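    ///
    /// # Example
    ///
    /// A minimal sketch (not a working doctest; a `tunnel: Arc<ClientTunnel>` is assumed):
    ///
    /// ```ignore
    /// // Ask the exit relay to resolve a hostname for us.
    /// let addrs = tunnel.resolve("example.com").await?;
    /// for addr in addrs {
    ///     println!("resolved to {addr}");
    /// }
    /// ```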
    pub async fn resolve(self: &Arc<Self>, hostname: &str) -> Result<Vec<IpAddr>> {
        let resolve_msg = Resolve::new(hostname);

        let resolved_msg = self.try_resolve(resolve_msg).await?;

        resolved_msg
            .into_answers()
            .into_iter()
            .filter_map(|(val, _)| match resolvedval_to_result(val) {
                Ok(ResolvedVal::Ip(ip)) => Some(Ok(ip)),
                Ok(_) => None,
                Err(e) => Some(Err(e)),
            })
            .collect()
    }

    /// Perform a reverse DNS lookup, by sending a RESOLVE cell to
    /// the last relay on this tunnel.
    ///
    /// Note that this function does not check for timeouts; that's
    /// the caller's responsibility.
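    ///
    /// # Example
    ///
    /// A minimal sketch (not a working doctest; a `tunnel: Arc<ClientTunnel>` is assumed):
    ///
    /// ```ignore
    /// use std::net::IpAddr;
    ///
    /// // Ask the exit relay for the hostnames associated with an address.
    /// let names = tunnel.resolve_ptr(IpAddr::from([192, 0, 2, 1])).await?;
    /// ```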
    pub async fn resolve_ptr(self: &Arc<Self>, addr: IpAddr) -> Result<Vec<String>> {
        let resolve_ptr_msg = Resolve::new_reverse(&addr);

        let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;

        resolved_msg
            .into_answers()
            .into_iter()
            .filter_map(|(val, _)| match resolvedval_to_result(val) {
                Ok(ResolvedVal::Hostname(v)) => Some(
                    String::from_utf8(v)
                        .map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
                ),
                Ok(_) => None,
                Err(e) => Some(Err(e)),
            })
            .collect()
    }

    /// Send an ad-hoc message to a given hop on the circuit, without expecting
    /// a reply.
    ///
    /// (If you want to handle one or more possible replies, see
    /// [`ClientTunnel::start_conversation`].)
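    ///
    /// # Example
    ///
    /// A minimal sketch (not a working doctest; it assumes a `tunnel: ClientTunnel` and an
    /// already-constructed `msg: AnyRelayMsg` appropriate to send to the chosen hop):
    ///
    /// ```ignore
    /// // Fire-and-forget: send the message to the last hop and expect no reply.
    /// tunnel.send_raw_msg(msg, TargetHop::LastHop).await?;
    /// ```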
    // TODO(conflux): Change this to use the ReactorHandle for the control commands.
    #[cfg(feature = "send-control-msg")]
    pub async fn send_raw_msg(
        &self,
        msg: tor_cell::relaycell::msg::AnyRelayMsg,
        hop: TargetHop,
    ) -> Result<()> {
        let (sender, receiver) = oneshot::channel();
        let ctrl_msg = CtrlMsg::SendMsg { hop, msg, sender };
        self.circ
            .control
            .unbounded_send(ctrl_msg)
            .map_err(|_| Error::CircuitClosed)?;

        receiver.await.map_err(|_| Error::CircuitClosed)?
    }

    /// Start an ad-hoc protocol exchange to the specified hop on this tunnel.
    ///
    /// To use this:
    ///
    ///  0. Create an inter-task channel you'll use to receive
    ///     the outcome of your conversation,
    ///     and bundle it into a [`UserMsgHandler`].
    ///
    ///  1. Call `start_conversation`.
    ///     This will install your handler for incoming messages,
    ///     and send the outgoing message (if you provided one).
    ///     After that, each message on the circuit
    ///     that isn't handled by the core machinery
    ///     is passed to your provided `reply_handler`.
    ///
    ///  2. Possibly call `send_message` on the [`Conversation`],
    ///     from the call site of `start_conversation`,
    ///     possibly multiple times, from time to time,
    ///     to send further desired messages to the peer.
    ///
    ///  3. In your [`UserMsgHandler`], process the incoming messages.
    ///     You may respond by
    ///     sending additional messages.
    ///     When the protocol exchange is finished,
    ///     `UserMsgHandler::handle_msg` should return
    ///     [`ConversationFinished`](reactor::MetaCellDisposition::ConversationFinished).
    ///
    /// If you don't need the `Conversation` to send followup messages,
    /// you may simply drop it,
    /// and rely on the responses you get from your handler,
    /// on the channel from step 0 above.
    /// Your handler will remain installed and able to process incoming messages
    /// until it returns `ConversationFinished`.
    ///
    /// (If you don't want to accept any replies at all, it may be
    /// simpler to use [`ClientTunnel::send_raw_msg`].)
    ///
    /// Note that it is quite possible to use this function to violate the Tor
    /// protocol; most users of this API will not need to call it.  It is used
    /// to implement most of the onion service handshake.
    ///
    /// # Limitations
    ///
    /// Only one conversation may be active at any one time,
    /// for any one circuit.
    /// This generally means that this function should not be called
    /// on a tunnel which might be shared with anyone else.
    ///
    /// Likewise, it is forbidden to try to extend the tunnel
    /// while the conversation is in progress.
    ///
    /// After the conversation has finished, the tunnel may be extended.
    /// Or, `start_conversation` may be called again;
    /// but, in that case there will be a gap between the two conversations,
    /// during which no `UserMsgHandler` is installed,
    /// and unexpected incoming messages would close the tunnel.
    ///
    /// If these restrictions are violated, the tunnel will be closed with an error.
    ///
    /// ## Precise definition of the lifetime of a conversation
    ///
    /// A conversation is in progress from entry to `start_conversation`,
    /// until entry to the body of the [`UserMsgHandler::handle_msg`](MsgHandler::handle_msg)
    /// call which returns [`ConversationFinished`](reactor::MetaCellDisposition::ConversationFinished).
    /// (*Entry* since `handle_msg` is synchronously embedded
    /// into the incoming message processing.)
    /// So you may start a new conversation as soon as you have the final response
    /// via your inter-task channel from (0) above.
    ///
    /// The lifetime relationship of the [`Conversation`],
    /// vs the handler returning `ConversationFinished`,
    /// is not enforced by the type system.
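    ///
    /// # Example
    ///
    /// A rough sketch (not a working doctest). It assumes a `tunnel: ClientTunnel`, a
    /// `handler` implementing [`MsgHandler`], and already-constructed relay messages
    /// `first_msg` and `followup_msg`:
    ///
    /// ```ignore
    /// // Install the handler and send an optional first message.
    /// let conversation = tunnel
    ///     .start_conversation(Some(first_msg), handler, TargetHop::LastHop)
    ///     .await?;
    ///
    /// // Later, send a follow-up message as part of the same exchange.
    /// conversation.send_message(followup_msg).await?;
    /// ```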
    // Doing so without still leaving plenty of scope for runtime errors doesn't seem possible,
    // at least while allowing sending followup messages from outside the handler.
    #[cfg(feature = "send-control-msg")]
    pub async fn start_conversation(
        &self,
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
        reply_handler: impl MsgHandler + Send + 'static,
        hop: TargetHop,
    ) -> Result<Conversation<'_>> {
        // We need to resolve the TargetHop into a precise HopLocation so our msg handler can match
        // the right Leg/Hop with the inbound cell.
        let (sender, receiver) = oneshot::channel();
        self.circ
            .command
            .unbounded_send(CtrlCmd::ResolveTargetHop { hop, done: sender })
            .map_err(|_| Error::CircuitClosed)?;
        let hop_location = receiver.await.map_err(|_| Error::CircuitClosed)??;
        let handler = Box::new(UserMsgHandler::new(hop_location, reply_handler));
        let conversation = Conversation(self);
        conversation.send_internal(msg, Some(handler)).await?;
        Ok(conversation)
    }

    /// Shut down this tunnel, along with all streams that are using it. Happens asynchronously
    /// (i.e. the tunnel won't necessarily be done shutting down immediately after this function
    /// returns!).
    ///
    /// Note that other references to this tunnel may exist. If they do, they will stop working
    /// after you call this function.
    ///
    /// It's not necessary to call this method if you're just done with a tunnel: the tunnel should
    /// close on its own once nothing is using it any more.
    // TODO(conflux): This should use the ReactorHandle instead.
    pub fn terminate(&self) {
        let _ = self.circ.command.unbounded_send(CtrlCmd::Shutdown);
    }

    /// Helper: Send the resolve message, and read the resolved message from
    /// the resolve stream.
    async fn try_resolve(self: &Arc<Self>, msg: Resolve) -> Result<Resolved> {
        let components = self
            .begin_stream_impl(msg.into(), ResolveCmdChecker::new_any())
            .await?;

        let StreamComponents {
            stream_receiver,
            target: _,
            memquota,
            xon_xoff_reader_ctrl: _,
        } = components;

        let mut resolve_stream = ResolveStream::new(stream_receiver, memquota);
        resolve_stream.read_msg().await
    }

    // TODO(conflux)
}

// TODO(conflux): We will likely need to enforce some invariants here, for example that the `circ`
// has the expected (non-zero) number of hops.
impl TryFrom<ClientCirc> for ClientTunnel {
    type Error = Error;

    fn try_from(circ: ClientCirc) -> std::result::Result<Self, Self::Error> {
        Ok(Self { circ })
    }
}

/// A collection of components that can be combined to implement a Tor stream,
/// or anything that requires a stream ID.
///
/// Not all components may be needed, depending on the purpose of the "stream".
/// For example, we build `RELAY_RESOLVE` requests the same way we build data streams,
/// but they won't use the `StreamTarget`, since they don't need to send additional
/// messages.
#[derive(Debug)]
pub(crate) struct StreamComponents {
    /// A [`Stream`](futures::Stream) of incoming relay messages for this Tor stream.
    pub(crate) stream_receiver: StreamReceiver,
    /// A handle that can communicate messages to the circuit reactor for this stream.
    pub(crate) target: StreamTarget,
    /// The memquota [account](tor_memquota::Account) to use for data on this stream.
    pub(crate) memquota: StreamAccount,
    /// The control information needed to add XON/XOFF flow control to the stream.
    pub(crate) xon_xoff_reader_ctrl: XonXoffReaderCtrl,
}

/// Convert a [`ResolvedVal`] into a Result, based on whether or not
/// it represents an error.
fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
    match val {
        ResolvedVal::TransientError => Err(Error::ResolveError(ResolveError::Transient)),
        ResolvedVal::NontransientError => Err(Error::ResolveError(ResolveError::Nontransient)),
        ResolvedVal::Unrecognized(_, _) => Err(Error::ResolveError(ResolveError::Unrecognized)),
        _ => Ok(val),
    }
}

/// A precise position in a tunnel.
#[derive(Debug, Deftly, Copy, Clone, PartialEq, Eq)]
#[derive_deftly(HasMemoryCost)]
#[non_exhaustive]
pub enum HopLocation {
    /// A specific position in a tunnel.
    Hop((UniqId, HopNum)),
    /// The join point of a multi-path tunnel.
    JoinPoint,
}

/// A position in a tunnel.
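///
/// # Example
///
/// A small sketch of building a [`TargetHop`] (the `uniq_id` and `hop_num` values are
/// assumed to come from elsewhere, e.g. from the circuit reactor):
///
/// ```ignore
/// // Target a precise hop, identified by circuit unique ID and hop number...
/// let precise: TargetHop = (uniq_id, hop_num).into();
/// // ...or just whatever hop is currently last.
/// let last = TargetHop::LastHop;
/// ```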
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum TargetHop {
    /// A specific position in a tunnel.
    Hop(HopLocation),
    /// The last hop of a tunnel.
    ///
    /// This should be used only when you don't care about what specific hop is used.
    /// Some tunnels may be extended or truncated,
    /// which means that the "last hop" may change at any time.
    LastHop,
}

impl From<(UniqId, HopNum)> for HopLocation {
    fn from(v: (UniqId, HopNum)) -> Self {
        HopLocation::Hop(v)
    }
}

impl From<(UniqId, HopNum)> for TargetHop {
    fn from(v: (UniqId, HopNum)) -> Self {
        TargetHop::Hop(v.into())
    }
}

impl HopLocation {
    /// Return the hop number if this is not a `JoinPoint`.
    pub fn hop_num(&self) -> Option<HopNum> {
        match self {
            Self::Hop((_, hop_num)) => Some(*hop_num),
            Self::JoinPoint => None,
        }
    }
}

/// Internal handle, used to implement a stream on a particular tunnel.
///
/// The reader and the writer for a stream should hold a `StreamTarget` for the stream;
/// the reader should additionally hold an `mpsc::Receiver` to get
/// relay messages for the stream.
///
/// When all the `StreamTarget`s for a stream are dropped, the Reactor will
/// close the stream by sending an END message to the other side.
/// You can close a stream earlier by using [`StreamTarget::close`]
/// or [`StreamTarget::close_pending`].
#[derive(Clone, Debug)]
pub(crate) struct StreamTarget {
    /// Which hop of the circuit this stream is with.
    hop: HopLocation,
    /// Stream ID for this stream.
    stream_id: StreamId,
    /// Encoding to use for relay cells sent on this stream.
    ///
    /// This is mostly irrelevant, except when deciding
    /// how many bytes we can pack in a DATA message.
    relay_cell_format: RelayCellFormat,
    /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
    // TODO(arti#2068): we should consider making this an `Option`
    rate_limit_stream: watch::Receiver<StreamRateLimit>,
    /// Channel to send cells down.
    tx: StreamMpscSender<AnyRelayMsg>,
    /// Reference to the tunnel that this stream is on.
    tunnel: Arc<ClientTunnel>,
}

impl StreamTarget {
    /// Deliver a relay message for the stream that owns this StreamTarget.
    ///
    /// The StreamTarget will set the correct stream ID and pick the
    /// right hop, but will not validate that the message is well-formed
    /// or meaningful in context.
    pub(crate) async fn send(&mut self, msg: AnyRelayMsg) -> Result<()> {
        self.tx.send(msg).await.map_err(|_| Error::CircuitClosed)?;
        Ok(())
    }

    /// Close the pending stream that owns this StreamTarget, delivering the specified
    /// END message (if any)
    ///
    /// The stream is closed by sending a [`CtrlMsg::ClosePendingStream`] message to the reactor.
    ///
    /// Returns a [`oneshot::Receiver`] that can be used to await the reactor's response.
    ///
    /// The StreamTarget will set the correct stream ID and pick the
    /// right hop, but will not validate that the message is well-formed
    /// or meaningful in context.
    ///
    /// Note that in many cases, the actual contents of an END message can leak unwanted
    /// information. Please consider carefully before sending anything but an
    /// [`End::new_misc()`](tor_cell::relaycell::msg::End::new_misc) message over a `ClientTunnel`.
    /// (For onion services, we send [`DONE`](tor_cell::relaycell::msg::EndReason::DONE).)
    ///
    /// In addition to sending the END message, this function also ensures
    /// the state of the stream map entry of this stream is updated
    /// accordingly.
    ///
    /// Normally, you shouldn't need to call this function, as streams are implicitly closed by the
    /// reactor when their corresponding `StreamTarget` is dropped. The only valid use of this
    /// function is for closing pending incoming streams (a stream is said to be pending if we have
    /// received the message initiating the stream but have not responded to it yet).
    ///
    /// **NOTE**: This function should be called at most once per request.
    /// Calling it twice is an error.
    #[cfg(feature = "hs-service")]
    pub(crate) fn close_pending(
        &self,
        message: crate::stream::CloseStreamBehavior,
    ) -> Result<oneshot::Receiver<Result<()>>> {
        let (tx, rx) = oneshot::channel();

        self.tunnel
            .circ
            .control
            .unbounded_send(CtrlMsg::ClosePendingStream {
                stream_id: self.stream_id,
                hop: self.hop,
                message,
                done: tx,
            })
            .map_err(|_| Error::CircuitClosed)?;

        Ok(rx)
    }

    /// Queue a "close" for the stream corresponding to this StreamTarget.
    ///
    /// Unlike `close_pending`, this method does not allow the caller to provide an `END` message.
    ///
    /// Once this method has been called, no more messages may be sent with [`StreamTarget::send`]
    /// on this `StreamTarget` or any clone of it.
    /// The reactor *will* try to flush any already-sent messages before it closes the stream.
    ///
    /// You don't need to call this method if the stream is closing because all of its StreamTargets
    /// have been dropped.
    pub(crate) fn close(&mut self) {
        Pin::new(&mut self.tx).close_channel();
    }

    /// Called when a circuit-level protocol error has occurred and the
    /// tunnel needs to shut down.
    pub(crate) fn protocol_error(&mut self) {
        self.tunnel.terminate();
    }

    /// Request to send a SENDME cell for this stream.
    ///
    /// This sends a request to the circuit reactor to send a stream-level SENDME, but it does not
    /// block or wait for a response from the circuit reactor.
    /// An error is only returned if we are unable to send the request.
    /// This means that if the circuit reactor is unable to send the SENDME, we are not notified of
    /// this here and an error will not be returned.
    pub(crate) fn send_sendme(&mut self) -> Result<()> {
        self.tunnel
            .circ
            .control
            .unbounded_send(CtrlMsg::FlowCtrlUpdate {
                msg: FlowCtrlMsg::Sendme,
                stream_id: self.stream_id,
                hop: self.hop,
            })
            .map_err(|_| Error::CircuitClosed)
    }

    /// Inform the circuit reactor that there has been a change in the drain rate for this stream.
    ///
    /// Typically the circuit reactor would send this new rate in an XON message to the other end of
    /// the stream.
    /// But it may decide not to, and may discard this update.
    /// For example the stream may have a large amount of buffered data, and the reactor may not
    /// want to send an XON while the buffer is large.
    ///
    /// This sends a message to inform the circuit reactor of the new drain rate,
    /// but it does not block or wait for a response from the reactor.
    /// An error is only returned if we are unable to send the update.
    pub(crate) fn drain_rate_update(&mut self, rate: XonKbpsEwma) -> Result<()> {
        self.tunnel
            .circ
            .control
            .unbounded_send(CtrlMsg::FlowCtrlUpdate {
                msg: FlowCtrlMsg::Xon(rate),
                stream_id: self.stream_id,
                hop: self.hop,
            })
            .map_err(|_| Error::CircuitClosed)
    }

    /// Return a reference to the tunnel that this `StreamTarget` is using.
    #[cfg(any(feature = "experimental-api", feature = "stream-ctrl"))]
    pub(crate) fn tunnel(&self) -> &Arc<ClientTunnel> {
        &self.tunnel
    }

    /// Return the kind of relay cell in use on this `StreamTarget`.
    pub(crate) fn relay_cell_format(&self) -> RelayCellFormat {
        self.relay_cell_format
    }

    /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
    pub(crate) fn rate_limit_stream(&self) -> &watch::Receiver<StreamRateLimit> {
        &self.rate_limit_stream
    }
}