tor_proto/client.rs
1//! Client-specific types and implementation.
2
3pub mod channel;
4pub mod circuit;
5pub mod stream;
6
7#[cfg(feature = "rpc")]
8pub mod rpc;
9
10#[cfg(feature = "send-control-msg")]
11pub(crate) mod msghandler;
12pub(crate) mod reactor;
13
14use derive_deftly::Deftly;
15use oneshot_fused_workaround as oneshot;
16use std::net::IpAddr;
17use std::sync::Arc;
18use tracing::instrument;
19
20use crate::circuit::UniqId;
21use crate::circuit::circhop::ReactorStreamComponents;
22#[cfg(feature = "circ-padding-manual")]
23pub use crate::client::circuit::padding::{
24 CircuitPadder, CircuitPadderConfig, CircuitPadderConfigError,
25};
26use crate::client::stream::{
27 DataStream, OutboundDataCmdChecker, ResolveCmdChecker, ResolveStream, StreamParameters,
28};
29use crate::congestion::sendme::StreamRecvWindow;
30use crate::crypto::cell::HopNum;
31use crate::memquota::{SpecificAccount as _, StreamAccount};
32use crate::stream::STREAM_READER_BUFFER;
33use crate::stream::cmdcheck::AnyCmdChecker;
34use crate::stream::flow_ctrl::xon_xoff::reader::XonXoffReaderCtrl;
35use crate::stream::{RECV_WINDOW_INIT, StreamComponents, StreamReceiver, StreamTarget, Tunnel};
36use crate::{Error, ResolveError, Result};
37use circuit::{ClientCirc, Path};
38use reactor::{CtrlCmd, CtrlMsg, FlowCtrlMsg, MetaCellHandler};
39
40use tor_cell::relaycell::StreamId;
41use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
42use tor_cell::relaycell::msg::{AnyRelayMsg, Begin, Resolve, Resolved, ResolvedVal};
43use tor_error::bad_api_usage;
44use tor_linkspec::OwnedChanTarget;
45use tor_memquota::derive_deftly_template_HasMemoryCost;
46use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
47
48#[cfg(feature = "hs-service")]
49use crate::stream::{IncomingCmdChecker, IncomingStream, StreamReqInfo};
50
51#[cfg(feature = "send-control-msg")]
52use msghandler::{MsgHandler, UserMsgHandler};
53
/// Handle to use during an ongoing protocol exchange with a circuit's last hop
///
/// This is obtained from [`ClientTunnel::start_conversation`],
/// and used to send messages to the last hop relay.
///
/// The handle holds a shared borrow of the [`ClientTunnel`] it was created
/// from (lifetime `'r`), so it cannot outlive that borrow.
//
// TODO(conflux): this should use ClientTunnel, and it should be moved into
// the tunnel module.
#[cfg(feature = "send-control-msg")]
pub struct Conversation<'r>(&'r ClientTunnel);
63
#[cfg(feature = "send-control-msg")]
impl Conversation<'_> {
    /// Send one more protocol message as part of this ad-hoc exchange.
    ///
    /// Any responses are handled by the `UserMsgHandler` that was set up
    /// when this `Conversation` was created; no new handler is installed.
    pub async fn send_message(&self, msg: tor_cell::relaycell::msg::AnyRelayMsg) -> Result<()> {
        let no_new_handler = None;
        self.send_internal(Some(msg), no_new_handler).await
    }

    /// Send a `SendMsgAndInstallHandler` to the reactor and wait for the outcome
    ///
    /// This is the shared implementation behind `start_conversation` and
    /// [`Conversation::send_message`].
    ///
    /// Both `msg` and `handler` are optional; whatever is provided is handed
    /// to the reactor in a single control message.
    ///
    /// # Errors
    ///
    /// Returns [`Error::CircuitClosed`] if the control message cannot be
    /// queued or if the reply channel is dropped before an answer arrives.
    /// Otherwise returns whatever result the reactor reported.
    pub(crate) async fn send_internal(
        &self,
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
    ) -> Result<()> {
        let msg = match msg {
            Some(inner) => Some(tor_cell::relaycell::AnyRelayMsgOuter::new(None, inner)),
            None => None,
        };
        let (sender, outcome) = oneshot::channel();

        let request = CtrlMsg::SendMsgAndInstallHandler {
            msg,
            handler,
            sender,
        };
        if self.0.circ.control.unbounded_send(request).is_err() {
            return Err(Error::CircuitClosed);
        }

        // A dropped sender means the reactor went away before answering.
        match outcome.await {
            Ok(result) => result,
            Err(_) => Err(Error::CircuitClosed),
        }
    }
}
99
/// A low-level client tunnel API.
///
/// This is a communication channel to the tunnel reactor, which manages one or more circuits.
///
/// Note: the tor-circmgr crates wrap this type in specialized *Tunnel types exposing only the
/// desired subset of functionality depending on purpose and path size.
///
/// Some API calls are for single-path tunnels and some are for multi-path tunnels. A check is
/// made to prevent, for instance, single-path calls from being used on a multi-path tunnel
/// (see [`ClientTunnel::as_single_circ`]). Top level types should prevent such misuse, and thus
/// this object should never be used directly.
#[derive(Debug)]
#[cfg_attr(
    feature = "rpc",
    derive(derive_deftly::Deftly),
    derive_deftly(tor_rpcbase::templates::Object)
)]
#[allow(dead_code)] // TODO(conflux)
pub struct ClientTunnel {
    /// The underlying handle to the reactor.
    ///
    /// Every method on this type goes through this handle.
    circ: ClientCirc,
}
121
122impl ClientTunnel {
123 /// Return a handle to the `ClientCirc` of this `ClientTunnel`, if the tunnel is a single
124 /// circuit tunnel.
125 ///
126 /// Returns an error if the tunnel has more than one circuit.
127 pub fn as_single_circ(&self) -> Result<&ClientCirc> {
128 if self.circ.is_multi_path {
129 return Err(bad_api_usage!("Single circuit getter on multi path tunnel"))?;
130 }
131 Ok(&self.circ)
132 }
133
134 /// Return the channel target of the first hop.
135 ///
136 /// Can only be used for single path tunnel.
137 pub fn first_hop(&self) -> Result<OwnedChanTarget> {
138 self.as_single_circ()?.first_hop()
139 }
140
141 /// Return true if the circuit reactor is closed meaning the circuit is unusable for both
142 /// receiving or sending.
143 pub fn is_closed(&self) -> bool {
144 self.circ.is_closing()
145 }
146
147 /// Return a [`TargetHop`] representing precisely the last hop of the circuit as in set as a
148 /// HopLocation with its id and hop number.
149 ///
150 /// Return an error if there is no last hop.
151 pub fn last_hop(&self) -> Result<TargetHop> {
152 let uniq_id = self.unique_id();
153 let hop_num = self
154 .circ
155 .mutable
156 .last_hop_num(uniq_id)?
157 .ok_or_else(|| bad_api_usage!("no last hop"))?;
158 Ok((uniq_id, hop_num).into())
159 }
160
161 /// Return a description of the last hop of the tunnel.
162 ///
163 /// Return None if the last hop is virtual; return an error
164 /// if the tunnel has no circuits, or all of its circuits are zero length.
165 ///
166 ///
167 /// # Panics
168 ///
169 /// Panics if there is no last hop. (This should be impossible outside of
170 /// the tor-proto crate, but within the crate it's possible to have a
171 /// circuit with no hops.)
172 pub fn last_hop_info(&self) -> Result<Option<OwnedChanTarget>> {
173 self.circ.last_hop_info()
174 }
175
176 /// Return the number of hops this tunnel as. Fail for a multi path.
177 pub fn n_hops(&self) -> Result<usize> {
178 self.as_single_circ()?.n_hops()
179 }
180
181 /// Return the [`Path`] objects describing all the hops
182 /// of all the circuits in this tunnel.
183 pub fn all_paths(&self) -> Vec<Arc<Path>> {
184 self.circ.all_paths()
185 }
186
/// Return the paths of all the circuits in this tunnel,
/// as a map from each circuit's `UniqId` to its path.
///
/// This is only exposed for the RPC subsystem, where it is documented that the
/// format of `UniqId` is not stable.
#[cfg(feature = "rpc")]
pub(crate) fn tagged_paths(&self) -> std::collections::HashMap<UniqId, Arc<Path>> {
    let mutable = &self.circ.mutable;
    mutable.tagged_paths()
}
196
197 /// Return a process-unique identifier for this tunnel.
198 ///
199 /// Returns the reactor unique ID of the main reactor.
200 pub fn unique_id(&self) -> UniqId {
201 self.circ.unique_id()
202 }
203
204 /// Return the time at which this tunnel last had any open streams.
205 ///
206 /// Returns `None` if this tunnel has never had any open streams,
207 /// or if it currently has open streams.
208 ///
209 /// NOTE that the Instant returned by this method is not affected by
210 /// any runtime mocking; it is the output of an ordinary call to
211 /// `Instant::now()`.
212 pub async fn disused_since(&self) -> Result<Option<web_time_compat::Instant>> {
213 self.circ.disused_since().await
214 }
215
216 /// Return a future that will resolve once the underlying circuit reactor has closed.
217 ///
218 /// Note that this method does not itself cause the tunnel to shut down.
219 pub fn wait_for_close(
220 self: &Arc<Self>,
221 ) -> impl futures::Future<Output = ()> + Send + Sync + 'static + use<> {
222 self.circ.wait_for_close()
223 }
224
/// Single-path tunnel only. Multi path onion service is not supported yet.
///
/// Tell this tunnel to begin allowing the final hop of the tunnel to try
/// to create new Tor streams, and to return those pending requests in an
/// asynchronous stream.
///
/// Ordinarily, these requests are rejected.
///
/// There can only be one [`Stream`](futures::Stream) of this type created on a given tunnel.
/// If such a [`Stream`](futures::Stream) already exists, this method will return
/// an error.
///
/// After this method has been called on a tunnel, the tunnel is expected
/// to receive requests of this type indefinitely, until it is finally closed.
/// If the `Stream` is dropped, the next request on this tunnel will cause it to close.
///
/// Only onion services (and eventually exit relays) should call this
/// method.
///
/// # Errors
///
/// * An internal error if the tunnel is multi-path.
/// * A "bad API usage" error if `hop` is not a [`TargetHop::Hop`]. This is
///   checked *before* anything is sent to the reactor, so a rejected call
///   leaves the tunnel untouched.
/// * [`Error::CircuitClosed`] if the reactor cannot be reached or goes away
///   before answering.
/// * Any error the reactor reports while processing the request.
///
/// # Panics
///
/// The returned stream panics if the reactor ever hands over a request for a
/// hop other than the one given here; see the comment on the assertion below.
//
// TODO: Someday, we might want to allow a stream request handler to be
// un-registered. However, nothing in the Tor protocol requires it.
//
// Any incoming request handlers installed on the other circuits
// (which are shutdown using CtrlCmd::ShutdownAndReturnCircuit)
// will be discarded (along with the reactor of that circuit)
#[cfg(feature = "hs-service")]
#[allow(unreachable_code, unused_variables)] // TODO(conflux)
pub async fn allow_stream_requests<'a, FILT>(
    self: &Arc<Self>,
    allow_commands: &'a [tor_cell::relaycell::RelayCmd],
    hop: TargetHop,
    filter: FILT,
) -> Result<impl futures::Stream<Item = IncomingStream> + use<'a, FILT>>
where
    FILT: crate::stream::IncomingStreamRequestFilter + 'a,
{
    use futures::stream::StreamExt;

    /// The size of the channel receiving IncomingStreamRequestContexts.
    const INCOMING_BUFFER: usize = STREAM_READER_BUFFER;

    // TODO(#2002): support onion service conflux
    let circ = self.as_single_circ().map_err(tor_error::into_internal!(
        "Cannot allow stream requests on a multi-path tunnel"
    ))?;

    // Reject an imprecise hop up front. Doing this only after the reactor has
    // accepted the request would return an error to the caller while leaving
    // a handler installed whose receiver is dropped straight away.
    let TargetHop::Hop(allowed_hop_loc) = hop else {
        return Err(Error::from(bad_api_usage!(
            "Expected TargetHop with HopLocation"
        )));
    };

    let time_prov = circ.time_provider.clone();
    let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
    let (incoming_sender, incoming_receiver) = MpscSpec::new(INCOMING_BUFFER)
        .new_mq(time_prov.clone(), circ.memquota.as_raw_account())?;
    let (tx, rx) = oneshot::channel();

    circ.command
        .unbounded_send(CtrlCmd::AwaitStreamRequest {
            cmd_checker,
            incoming_sender,
            hop,
            done: tx,
            filter: Box::new(filter),
        })
        .map_err(|_| Error::CircuitClosed)?;

    // Check whether the AwaitStreamRequest was processed successfully.
    rx.await.map_err(|_| Error::CircuitClosed)??;

    let tunnel = Arc::clone(self);
    Ok(incoming_receiver.map(move |req_ctx| {
        // Destructure so that we don't forget to use any fields.
        let StreamReqInfo {
            req,
            stream_id,
            hop,
            stream_components:
                ReactorStreamComponents {
                    stream_inbound_rx,
                    stream_outbound_tx,
                    rate_limit_rx,
                    drain_rate_request_rx,
                },
            memquota,
            relay_cell_format,
        } = req_ctx;

        // We already enforce this in handle_incoming_stream_request; this
        // assertion is just here to make sure that we don't ever
        // accidentally remove or fail to enforce that check, since it is
        // security-critical.
        assert_eq!(Some(allowed_hop_loc), hop);

        // TODO(#2002): figure out what this is going to look like
        // for onion services (perhaps we should forbid this function
        // from being called on a multipath circuit?)
        //
        // See also:
        // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3002#note_3200937
        let target = StreamTarget {
            tunnel: Tunnel::Client(Arc::clone(&tunnel)),
            tx: stream_outbound_tx,
            hop: Some(allowed_hop_loc),
            stream_id,
            relay_cell_format,
            rate_limit_stream: rate_limit_rx,
        };

        // can be used to build a reader that supports XON/XOFF flow control
        let xon_xoff_reader_ctrl =
            XonXoffReaderCtrl::new(drain_rate_request_rx, target.clone());

        let reader = StreamReceiver {
            target: target.clone(),
            receiver: stream_inbound_rx,
            recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
            ended: false,
        };

        let components = StreamComponents {
            stream_receiver: reader,
            target,
            memquota,
            xon_xoff_reader_ctrl,
        };

        IncomingStream::new(time_prov.clone(), req, components)
    }))
}
355
356 /// Single and Multi path helper, used to begin a stream.
357 ///
358 /// This function allocates a stream ID, and sends the message
359 /// (like a BEGIN or RESOLVE), but doesn't wait for a response.
360 ///
361 /// The caller will typically want to see the first cell in response,
362 /// to see whether it is e.g. an END or a CONNECTED.
363 async fn begin_stream_impl(
364 self: &Arc<Self>,
365 begin_msg: AnyRelayMsg,
366 cmd_checker: AnyCmdChecker,
367 ) -> Result<StreamComponents> {
368 // TODO: Possibly this should take a hop, rather than just
369 // assuming it's the last hop.
370 let hop = TargetHop::LastHop;
371
372 let memquota = StreamAccount::new(self.circ.mq_account())?;
373 let (tx, rx) = oneshot::channel();
374
375 self.circ
376 .control
377 .unbounded_send(CtrlMsg::BeginStream {
378 hop,
379 message: begin_msg,
380 memquota: memquota.clone(),
381 done: tx,
382 cmd_checker,
383 })
384 .map_err(|_| Error::CircuitClosed)?;
385
386 let (stream_id, hop, relay_cell_format, stream_components) =
387 rx.await.map_err(|_| Error::CircuitClosed)??;
388
389 // Destructure so that we don't forget to use any fields.
390 let ReactorStreamComponents {
391 stream_inbound_rx,
392 stream_outbound_tx,
393 rate_limit_rx,
394 drain_rate_request_rx,
395 } = stream_components;
396
397 let target = StreamTarget {
398 tunnel: Tunnel::Client(self.clone()),
399 tx: stream_outbound_tx,
400 hop: Some(hop),
401 stream_id,
402 relay_cell_format,
403 rate_limit_stream: rate_limit_rx,
404 };
405
406 // can be used to build a reader that supports XON/XOFF flow control
407 let xon_xoff_reader_ctrl = XonXoffReaderCtrl::new(drain_rate_request_rx, target.clone());
408
409 let stream_receiver = StreamReceiver {
410 target: target.clone(),
411 receiver: stream_inbound_rx,
412 recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
413 ended: false,
414 };
415
416 let components = StreamComponents {
417 stream_receiver,
418 target,
419 memquota,
420 xon_xoff_reader_ctrl,
421 };
422
423 Ok(components)
424 }
425
/// Install a [`CircuitPadder`] at the listed `hop`.
///
/// Replaces any previous padder installed at that hop.
#[cfg(feature = "circ-padding-manual")]
pub async fn start_padding_at_hop(
    self: &Arc<Self>,
    hop: HopLocation,
    padder: CircuitPadder,
) -> Result<()> {
    let new_padder = Some(padder);
    self.circ.set_padder_impl(hop, new_padder).await
}
437
/// Remove any [`CircuitPadder`] at the listed `hop`.
///
/// Does nothing if there was not a padder installed there.
#[cfg(feature = "circ-padding-manual")]
pub async fn stop_padding_at_hop(self: &Arc<Self>, hop: HopLocation) -> Result<()> {
    // Passing no padder is what clears the slot.
    let no_padder = None;
    self.circ.set_padder_impl(hop, no_padder).await
}
445
446 /// Start a DataStream (anonymized connection) to the given
447 /// address and port, using a BEGIN cell.
448 async fn begin_data_stream(
449 self: &Arc<Self>,
450 msg: AnyRelayMsg,
451 optimistic: bool,
452 ) -> Result<DataStream> {
453 let components = self
454 .begin_stream_impl(msg, OutboundDataCmdChecker::new_any())
455 .await?;
456
457 let StreamComponents {
458 stream_receiver,
459 target,
460 memquota,
461 xon_xoff_reader_ctrl,
462 } = components;
463
464 let mut stream = DataStream::new(
465 self.circ.time_provider.clone(),
466 stream_receiver,
467 xon_xoff_reader_ctrl,
468 target,
469 memquota,
470 );
471 if !optimistic {
472 stream.wait_for_connection().await?;
473 }
474 Ok(stream)
475 }
476
477 /// Single and multi path helper.
478 ///
479 /// Start a stream to the given address and port, using a BEGIN
480 /// cell.
481 ///
482 /// The use of a string for the address is intentional: you should let
483 /// the remote Tor relay do the hostname lookup for you.
484 #[instrument(level = "trace", skip_all)]
485 pub async fn begin_stream(
486 self: &Arc<Self>,
487 target: &str,
488 port: u16,
489 parameters: Option<StreamParameters>,
490 ) -> Result<DataStream> {
491 let parameters = parameters.unwrap_or_default();
492 let begin_flags = parameters.begin_flags();
493 let optimistic = parameters.is_optimistic();
494 let target = if parameters.suppressing_hostname() {
495 ""
496 } else {
497 target
498 };
499 let beginmsg = Begin::new(target, port, begin_flags)
500 .map_err(|e| Error::from_cell_enc(e, "begin message"))?;
501 self.begin_data_stream(beginmsg.into(), optimistic).await
502 }
503
504 /// Start a new stream to the last relay in the tunnel, using
505 /// a BEGIN_DIR cell.
506 pub async fn begin_dir_stream(self: Arc<Self>) -> Result<DataStream> {
507 // Note that we always open begindir connections optimistically.
508 // Since they are local to a relay that we've already authenticated
509 // with and built a tunnel to, there should be no additional checks
510 // we need to perform to see whether the BEGINDIR will succeed.
511 self.begin_data_stream(AnyRelayMsg::BeginDir(Default::default()), true)
512 .await
513 }
514
515 /// Perform a DNS lookup, using a RESOLVE cell with the last relay
516 /// in this tunnel.
517 ///
518 /// Note that this function does not check for timeouts; that's
519 /// the caller's responsibility.
520 pub async fn resolve(self: &Arc<Self>, hostname: &str) -> Result<Vec<IpAddr>> {
521 let resolve_msg = Resolve::new(hostname);
522
523 let resolved_msg = self.try_resolve(resolve_msg).await?;
524
525 resolved_msg
526 .into_answers()
527 .into_iter()
528 .filter_map(|(val, _)| match resolvedval_to_result(val) {
529 Ok(ResolvedVal::Ip(ip)) => Some(Ok(ip)),
530 Ok(_) => None,
531 Err(e) => Some(Err(e)),
532 })
533 .collect()
534 }
535
536 /// Perform a reverse DNS lookup, by sending a RESOLVE cell with
537 /// the last relay on this tunnel.
538 ///
539 /// Note that this function does not check for timeouts; that's
540 /// the caller's responsibility.
541 pub async fn resolve_ptr(self: &Arc<Self>, addr: IpAddr) -> Result<Vec<String>> {
542 let resolve_ptr_msg = Resolve::new_reverse(&addr);
543
544 let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;
545
546 resolved_msg
547 .into_answers()
548 .into_iter()
549 .filter_map(|(val, _)| match resolvedval_to_result(val) {
550 Ok(ResolvedVal::Hostname(v)) => Some(
551 String::from_utf8(v)
552 .map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
553 ),
554 Ok(_) => None,
555 Err(e) => Some(Err(e)),
556 })
557 .collect()
558 }
559
/// Send an ad-hoc message to a given hop on the circuit, without expecting
/// a reply.
///
/// (If you want to handle one or more possible replies, see
/// [`ClientTunnel::start_conversation`].)
///
/// # Errors
///
/// Returns [`Error::CircuitClosed`] if the reactor cannot be reached or
/// drops the reply channel; otherwise returns the result the reactor reported.
// TODO(conflux): Change this to use the ReactorHandle for the control commands.
#[cfg(feature = "send-control-msg")]
pub async fn send_raw_msg(
    &self,
    msg: tor_cell::relaycell::msg::AnyRelayMsg,
    hop: TargetHop,
) -> Result<()> {
    let (sender, outcome) = oneshot::channel();
    let request = CtrlMsg::SendMsg { hop, msg, sender };
    if self.circ.control.unbounded_send(request).is_err() {
        return Err(Error::CircuitClosed);
    }

    match outcome.await {
        Ok(result) => result,
        Err(_) => Err(Error::CircuitClosed),
    }
}
581
/// Start an ad-hoc protocol exchange to the specified hop on this tunnel.
///
/// To use this:
///
///  0. Create an inter-task channel you'll use to receive
///     the outcome of your conversation,
///     and bundle it into a [`UserMsgHandler`].
///
///  1. Call `start_conversation`.
///     This will install your handler for incoming messages,
///     and send the outgoing message (if you provided one).
///     After that, each message on the circuit
///     that isn't handled by the core machinery
///     is passed to your provided `reply_handler`.
///
///  2. Possibly call [`send_message`](Conversation::send_message)
///     on the [`Conversation`],
///     from the call site of `start_conversation`,
///     possibly multiple times, from time to time,
///     to send further desired messages to the peer.
///
///  3. In your [`UserMsgHandler`], process the incoming messages.
///     You may respond by
///     sending additional messages.
///     When the protocol exchange is finished,
///     `UserMsgHandler::handle_msg` should return
///     [`ConversationFinished`](reactor::MetaCellDisposition::ConversationFinished).
///
/// If you don't need the `Conversation` to send followup messages,
/// you may simply drop it,
/// and rely on the responses you get from your handler,
/// on the channel from step 0 above.
/// Your handler will remain installed and able to process incoming messages
/// until it returns `ConversationFinished`.
///
/// (If you don't want to accept any replies at all, it may be
/// simpler to use [`ClientTunnel::send_raw_msg`].)
///
/// Note that it is quite possible to use this function to violate the tor
/// protocol; most users of this API will not need to call it.  It is used
/// to implement most of the onion service handshake.
///
/// # Limitations
///
/// Only one conversation may be active at any one time,
/// for any one circuit.
/// This generally means that this function should not be called
/// on a tunnel which might be shared with anyone else.
///
/// Likewise, it is forbidden to try to extend the tunnel,
/// while the conversation is in progress.
///
/// After the conversation has finished, the tunnel may be extended.
/// Or, `start_conversation` may be called again;
/// but, in that case there will be a gap between the two conversations,
/// during which no `UserMsgHandler` is installed,
/// and unexpected incoming messages would close the tunnel.
///
/// If these restrictions are violated, the tunnel will be closed with an error.
///
/// ## Precise definition of the lifetime of a conversation
///
/// A conversation is in progress from entry to `start_conversation`,
/// until entry to the body of the [`UserMsgHandler::handle_msg`](MsgHandler::handle_msg)
/// call which returns [`ConversationFinished`](reactor::MetaCellDisposition::ConversationFinished).
/// (*Entry* since `handle_msg` is synchronously embedded
/// into the incoming message processing.)
/// So you may start a new conversation as soon as you have the final response
/// via your inter-task channel from (0) above.
///
/// The lifetime relationship of the [`Conversation`],
/// vs the handler returning `ConversationFinished`
/// is not enforced by the type system.
// Doing so without still leaving plenty of scope for runtime errors doesn't seem possible,
// at least while allowing sending followup messages from outside the handler.
#[cfg(feature = "send-control-msg")]
pub async fn start_conversation(
    &self,
    msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
    reply_handler: impl MsgHandler + Send + 'static,
    hop: TargetHop,
) -> Result<Conversation<'_>> {
    // We need to resolve the TargetHop into a precise HopLocation so our msg handler can match
    // the right Leg/Hop with inbound cell.
    let (resolved_tx, resolved_rx) = oneshot::channel();
    let resolve_cmd = CtrlCmd::ResolveTargetHop {
        hop,
        done: resolved_tx,
    };
    if self.circ.command.unbounded_send(resolve_cmd).is_err() {
        return Err(Error::CircuitClosed);
    }
    let hop_location = match resolved_rx.await {
        Ok(resolved) => resolved?,
        Err(_) => return Err(Error::CircuitClosed),
    };

    // Install the handler and send the opening message (if any) in one step.
    let handler = Box::new(UserMsgHandler::new(hop_location, reply_handler));
    let conversation = Conversation(self);
    conversation.send_internal(msg, Some(handler)).await?;
    Ok(conversation)
}
676
677 /// Shut down this tunnel, along with all streams that are using it. Happens asynchronously
678 /// (i.e. the tunnel won't necessarily be done shutting down immediately after this function
679 /// returns!).
680 ///
681 /// Note that other references to this tunnel may exist. If they do, they will stop working
682 /// after you call this function.
683 ///
684 /// It's not necessary to call this method if you're just done with a tunnel: the tunnel should
685 /// close on its own once nothing is using it any more.
686 // TODO(conflux): This should use the ReactorHandle instead.
687 pub fn terminate(&self) {
688 let _ = self.circ.command.unbounded_send(CtrlCmd::Shutdown);
689 }
690
691 /// Helper: Send the resolve message, and read resolved message from
692 /// resolve stream.
693 async fn try_resolve(self: &Arc<Self>, msg: Resolve) -> Result<Resolved> {
694 let components = self
695 .begin_stream_impl(msg.into(), ResolveCmdChecker::new_any())
696 .await?;
697
698 let StreamComponents {
699 stream_receiver,
700 target: _,
701 memquota,
702 xon_xoff_reader_ctrl: _,
703 } = components;
704
705 let mut resolve_stream = ResolveStream::new(stream_receiver, memquota);
706 resolve_stream.read_msg().await
707 }
708
709 // TODO(conflux)
710}
711
712// TODO(conflux): We will likely need to enforce some invariants here, for example that the `circ`
713// has the expected (non-zero) number of hops.
714impl TryFrom<ClientCirc> for ClientTunnel {
715 type Error = Error;
716
717 fn try_from(circ: ClientCirc) -> std::result::Result<Self, Self::Error> {
718 Ok(Self { circ })
719 }
720}
721
722/// Convert a [`ResolvedVal`] into a Result, based on whether or not
723/// it represents an error.
724fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
725 match val {
726 ResolvedVal::TransientError => Err(Error::ResolveError(ResolveError::Transient)),
727 ResolvedVal::NontransientError => Err(Error::ResolveError(ResolveError::Nontransient)),
728 ResolvedVal::Unrecognized(_, _) => Err(Error::ResolveError(ResolveError::Unrecognized)),
729 _ => Ok(val),
730 }
731}
732
/// A precise position in a tunnel.
///
/// Every variant names one concrete location, in contrast to
/// [`TargetHop::LastHop`], whose meaning can change over time.
#[derive(Debug, Deftly, Copy, Clone, PartialEq, Eq)]
#[derive_deftly(HasMemoryCost)]
#[non_exhaustive]
pub enum HopLocation {
    /// A specific position in a tunnel,
    /// given as a [`UniqId`] paired with a [`HopNum`].
    Hop((UniqId, HopNum)),
    /// The join point of a multi-path tunnel.
    JoinPoint,
}
743
/// A position in a tunnel.
///
/// This is either a precise [`HopLocation`] or the relative notion of
/// "whatever the last hop is".
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum TargetHop {
    /// A specific position in a tunnel.
    Hop(HopLocation),
    /// The last hop of a tunnel.
    ///
    /// This should be used only when you don't care about what specific hop is used.
    /// Some tunnels may be extended or truncated,
    /// which means that the "last hop" may change at any time.
    LastHop,
}
757
758impl From<(UniqId, HopNum)> for HopLocation {
759 fn from(v: (UniqId, HopNum)) -> Self {
760 HopLocation::Hop(v)
761 }
762}
763
764impl From<(UniqId, HopNum)> for TargetHop {
765 fn from(v: (UniqId, HopNum)) -> Self {
766 TargetHop::Hop(v.into())
767 }
768}
769
770impl HopLocation {
771 /// Return the hop number if not a JointPoint.
772 pub fn hop_num(&self) -> Option<HopNum> {
773 match self {
774 Self::Hop((_, hop_num)) => Some(*hop_num),
775 Self::JoinPoint => None,
776 }
777 }
778}
779
780impl ClientTunnel {
781 /// Close the pending stream that owns this StreamTarget, delivering the specified
782 /// END message (if any)
783 ///
784 /// See [`StreamTarget::close_pending`].
785 #[cfg(feature = "hs-service")]
786 pub(crate) fn close_pending(
787 &self,
788 stream_id: StreamId,
789 hop: Option<HopLocation>,
790 message: crate::stream::CloseStreamBehavior,
791 ) -> Result<oneshot::Receiver<Result<()>>> {
792 let (tx, rx) = oneshot::channel();
793
794 self.circ
795 .control
796 .unbounded_send(CtrlMsg::ClosePendingStream {
797 stream_id,
798 hop: hop.expect("missing stream hop for client tunnel"),
799 message,
800 done: tx,
801 })
802 .map_err(|_| Error::CircuitClosed)?;
803
804 Ok(rx)
805 }
806
807 /// Request to send a SENDME cell for this stream.
808 ///
809 /// See [`StreamTarget::send_sendme`].
810 pub(crate) fn send_sendme(&self, stream_id: StreamId, hop: Option<HopLocation>) -> Result<()> {
811 self.circ
812 .control
813 .unbounded_send(CtrlMsg::FlowCtrlUpdate {
814 msg: FlowCtrlMsg::Sendme,
815 stream_id,
816 hop: hop.expect("missing stream hop for client tunnel"),
817 })
818 .map_err(|_| Error::CircuitClosed)
819 }
820
821 /// Inform the circuit reactor that there has been a change in the drain rate for this stream.
822 ///
823 /// See [`StreamTarget::drain_rate_update`].
824 pub(crate) fn drain_rate_update(
825 &self,
826 stream_id: StreamId,
827 hop: Option<HopLocation>,
828 rate: XonKbpsEwma,
829 ) -> Result<()> {
830 self.circ
831 .control
832 .unbounded_send(CtrlMsg::FlowCtrlUpdate {
833 msg: FlowCtrlMsg::Xon(rate),
834 stream_id,
835 hop: hop.expect("missing stream hop for client tunnel"),
836 })
837 .map_err(|_| Error::CircuitClosed)
838 }
839}