tor_proto/client.rs
1//! Client-specific types and implementation.
2
3pub mod channel;
4pub mod circuit;
5pub mod stream;
6
7#[cfg(feature = "send-control-msg")]
8pub(crate) mod msghandler;
9pub(crate) mod reactor;
10
11use derive_deftly::Deftly;
12use oneshot_fused_workaround as oneshot;
13use std::net::IpAddr;
14use std::sync::Arc;
15use tracing::instrument;
16
17use crate::circuit::UniqId;
18#[cfg(feature = "circ-padding-manual")]
19pub use crate::client::circuit::padding::{
20 CircuitPadder, CircuitPadderConfig, CircuitPadderConfigError,
21};
22use crate::client::stream::{
23 DataStream, OutboundDataCmdChecker, ResolveCmdChecker, ResolveStream, StreamParameters,
24 StreamReceiver,
25};
26use crate::congestion::sendme::StreamRecvWindow;
27use crate::crypto::cell::HopNum;
28use crate::memquota::{SpecificAccount as _, StreamAccount};
29use crate::stream::STREAM_READER_BUFFER;
30use crate::stream::cmdcheck::AnyCmdChecker;
31use crate::stream::flow_ctrl::state::StreamRateLimit;
32use crate::stream::flow_ctrl::xon_xoff::reader::XonXoffReaderCtrl;
33use crate::stream::queue::stream_queue;
34use crate::stream::{RECV_WINDOW_INIT, StreamComponents, StreamTarget, Tunnel};
35use crate::util::notify::NotifySender;
36use crate::{Error, ResolveError, Result};
37use circuit::{CIRCUIT_BUFFER_SIZE, ClientCirc, Path};
38use reactor::{CtrlCmd, CtrlMsg, FlowCtrlMsg, MetaCellHandler};
39
40use postage::watch;
41use tor_cell::relaycell::StreamId;
42use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
43use tor_cell::relaycell::msg::{AnyRelayMsg, Begin, Resolve, Resolved, ResolvedVal};
44use tor_error::bad_api_usage;
45use tor_linkspec::OwnedChanTarget;
46use tor_memquota::derive_deftly_template_HasMemoryCost;
47use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
48
49#[cfg(feature = "hs-service")]
50use crate::stream::incoming::StreamReqInfo;
51
52#[cfg(feature = "hs-service")]
53use crate::client::stream::{IncomingCmdChecker, IncomingStream};
54
55#[cfg(feature = "send-control-msg")]
56use msghandler::{MsgHandler, UserMsgHandler};
57
/// Handle to use during an ongoing protocol exchange with a circuit's last hop
///
/// This is obtained from [`ClientTunnel::start_conversation`],
/// and used to send messages to the last hop relay.
///
/// The handle holds a shared borrow of the [`ClientTunnel`] it was created from,
/// so it cannot outlive that borrow.
//
// TODO(conflux): this should use ClientTunnel, and it should be moved into
// the tunnel module.
#[cfg(feature = "send-control-msg")]
pub struct Conversation<'r>(&'r ClientTunnel);
67
#[cfg(feature = "send-control-msg")]
impl Conversation<'_> {
    /// Send one more protocol message within this ad-hoc exchange.
    ///
    /// No new handler is installed by this call: responses are handled by
    /// the `UserMsgHandler` that was set up when the `Conversation` was created.
    pub async fn send_message(&self, msg: tor_cell::relaycell::msg::AnyRelayMsg) -> Result<()> {
        let no_new_handler = None;
        self.send_internal(Some(msg), no_new_handler).await
    }

    /// Send a `SendMsgAndInstallHandler` to the reactor and wait for the outcome.
    ///
    /// This is the shared implementation behind
    /// [`ClientTunnel::start_conversation`] and [`Conversation::send_message`].
    ///
    /// Both `msg` and `handler` are optional and are forwarded to the reactor as given.
    ///
    /// # Errors
    ///
    /// Returns [`Error::CircuitClosed`] if the control message cannot be queued,
    /// or if the reactor drops the reply channel without answering.
    /// Otherwise returns whatever outcome the reactor reported.
    pub(crate) async fn send_internal(
        &self,
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
    ) -> Result<()> {
        let (sender, outcome_rx) = oneshot::channel();

        let request = CtrlMsg::SendMsgAndInstallHandler {
            // The message is wrapped without a stream ID.
            msg: msg.map(|body| tor_cell::relaycell::AnyRelayMsgOuter::new(None, body)),
            handler,
            sender,
        };

        if self.0.circ.control.unbounded_send(request).is_err() {
            return Err(Error::CircuitClosed);
        }

        match outcome_rx.await {
            Ok(outcome) => outcome,
            Err(_) => Err(Error::CircuitClosed),
        }
    }
}
103
/// A low-level client tunnel API.
///
/// This is a communication channel to the tunnel reactor, which manages one or more circuits.
///
/// Note: the tor-circmgr crates wrap this type in specialized *Tunnel types exposing only the
/// desired subset of functionality depending on purpose and path size.
///
/// Some API calls are for single-path tunnels and some for multi-path tunnels. A check with the
/// underlying reactor is done to prevent, for instance, multi-path calls from being used on a
/// single-path tunnel. Top-level types should prevent this, and thus this object should never
/// be used directly.
#[derive(Debug)]
#[allow(dead_code)] // TODO(conflux)
pub struct ClientTunnel {
    /// The underlying handle to the reactor.
    circ: ClientCirc,
}
120
121impl ClientTunnel {
122 /// Return a handle to the `ClientCirc` of this `ClientTunnel`, if the tunnel is a single
123 /// circuit tunnel.
124 ///
125 /// Returns an error if the tunnel has more than one circuit.
126 pub fn as_single_circ(&self) -> Result<&ClientCirc> {
127 if self.circ.is_multi_path {
128 return Err(bad_api_usage!("Single circuit getter on multi path tunnel"))?;
129 }
130 Ok(&self.circ)
131 }
132
    /// Return the channel target of the first hop.
    ///
    /// Can only be used for a single-path tunnel.
    ///
    /// # Errors
    ///
    /// Returns the error from [`ClientTunnel::as_single_circ`] if this is a multi-path tunnel,
    /// or any error reported by the underlying circuit's `first_hop`.
    pub fn first_hop(&self) -> Result<OwnedChanTarget> {
        self.as_single_circ()?.first_hop()
    }
139
    /// Return true if the circuit reactor is closed, meaning the circuit is unusable for both
    /// receiving and sending.
    ///
    /// This reports whatever the underlying circuit's `is_closing()` returns.
    pub fn is_closed(&self) -> bool {
        self.circ.is_closing()
    }
145
146 /// Return a [`TargetHop`] representing precisely the last hop of the circuit as in set as a
147 /// HopLocation with its id and hop number.
148 ///
149 /// Return an error if there is no last hop.
150 pub fn last_hop(&self) -> Result<TargetHop> {
151 let uniq_id = self.unique_id();
152 let hop_num = self
153 .circ
154 .mutable
155 .last_hop_num(uniq_id)?
156 .ok_or_else(|| bad_api_usage!("no last hop"))?;
157 Ok((uniq_id, hop_num).into())
158 }
159
    /// Return a description of the last hop of the tunnel.
    ///
    /// Return None if the last hop is virtual; return an error
    /// if the tunnel has no circuits, or all of its circuits are zero length.
    ///
    /// # Panics
    ///
    /// Panics if there is no last hop.  (This should be impossible outside of
    /// the tor-proto crate, but within the crate it's possible to have a
    /// circuit with no hops.)
    //
    // NOTE(review): the "Panics" section above sits oddly next to the documented error
    // return for zero-length circuits; confirm against `ClientCirc::last_hop_info`
    // whether a panic is still possible.
    pub fn last_hop_info(&self) -> Result<Option<OwnedChanTarget>> {
        self.circ.last_hop_info()
    }
174
    /// Return the number of hops this tunnel has.
    ///
    /// # Errors
    ///
    /// Fails for a multi-path tunnel (see [`ClientTunnel::as_single_circ`]),
    /// or if the underlying circuit's `n_hops` reports an error.
    pub fn n_hops(&self) -> Result<usize> {
        self.as_single_circ()?.n_hops()
    }
179
    /// Return the [`Path`] objects describing all the hops
    /// of all the circuits in this tunnel.
    ///
    /// Each path is returned behind an [`Arc`].
    pub fn all_paths(&self) -> Vec<Arc<Path>> {
        self.circ.all_paths()
    }
185
    /// Return a process-unique identifier for this tunnel.
    ///
    /// Returns the reactor unique ID of the main reactor
    /// (as reported by the underlying circuit handle).
    pub fn unique_id(&self) -> UniqId {
        self.circ.unique_id()
    }
192
    /// Return the time at which this tunnel last had any open streams.
    ///
    /// Returns `None` if this tunnel has never had any open streams,
    /// or if it currently has open streams.
    ///
    /// NOTE that the Instant returned by this method is not affected by
    /// any runtime mocking; it is the output of an ordinary call to
    /// `Instant::now()`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the underlying circuit's `disused_since`.
    pub async fn disused_since(&self) -> Result<Option<std::time::Instant>> {
        self.circ.disused_since().await
    }
204
    /// Return a future that will resolve once the underlying circuit reactor has closed.
    ///
    /// Note that this method does not itself cause the tunnel to shut down.
    ///
    /// The returned future is `'static` and captures no generic or lifetime parameters
    /// (`use<>`), so it does not keep a borrow of this tunnel.
    pub fn wait_for_close(
        self: &Arc<Self>,
    ) -> impl futures::Future<Output = ()> + Send + Sync + 'static + use<> {
        self.circ.wait_for_close()
    }
213
214 /// Single-path tunnel only. Multi path onion service is not supported yet.
215 ///
216 /// Tell this tunnel to begin allowing the final hop of the tunnel to try
217 /// to create new Tor streams, and to return those pending requests in an
218 /// asynchronous stream.
219 ///
220 /// Ordinarily, these requests are rejected.
221 ///
222 /// There can only be one [`Stream`](futures::Stream) of this type created on a given tunnel.
223 /// If a such a [`Stream`](futures::Stream) already exists, this method will return
224 /// an error.
225 ///
226 /// After this method has been called on a tunnel, the tunnel is expected
227 /// to receive requests of this type indefinitely, until it is finally closed.
228 /// If the `Stream` is dropped, the next request on this tunnel will cause it to close.
229 ///
230 /// Only onion services (and eventually) exit relays should call this
231 /// method.
232 //
233 // TODO: Someday, we might want to allow a stream request handler to be
234 // un-registered. However, nothing in the Tor protocol requires it.
235 //
236 // Any incoming request handlers installed on the other circuits
237 // (which are are shutdown using CtrlCmd::ShutdownAndReturnCircuit)
238 // will be discarded (along with the reactor of that circuit)
239 #[cfg(feature = "hs-service")]
240 #[allow(unreachable_code, unused_variables)] // TODO(conflux)
241 pub async fn allow_stream_requests<'a, FILT>(
242 self: &Arc<Self>,
243 allow_commands: &'a [tor_cell::relaycell::RelayCmd],
244 hop: TargetHop,
245 filter: FILT,
246 ) -> Result<impl futures::Stream<Item = IncomingStream> + use<'a, FILT>>
247 where
248 FILT: crate::client::stream::IncomingStreamRequestFilter + 'a,
249 {
250 use futures::stream::StreamExt;
251
252 /// The size of the channel receiving IncomingStreamRequestContexts.
253 const INCOMING_BUFFER: usize = STREAM_READER_BUFFER;
254
255 // TODO(#2002): support onion service conflux
256 let circ = self.as_single_circ().map_err(tor_error::into_internal!(
257 "Cannot allow stream requests on a multi-path tunnel"
258 ))?;
259
260 let time_prov = circ.time_provider.clone();
261 let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
262 let (incoming_sender, incoming_receiver) = MpscSpec::new(INCOMING_BUFFER)
263 .new_mq(time_prov.clone(), circ.memquota.as_raw_account())?;
264 let (tx, rx) = oneshot::channel();
265
266 circ.command
267 .unbounded_send(CtrlCmd::AwaitStreamRequest {
268 cmd_checker,
269 incoming_sender,
270 hop,
271 done: tx,
272 filter: Box::new(filter),
273 })
274 .map_err(|_| Error::CircuitClosed)?;
275
276 // Check whether the AwaitStreamRequest was processed successfully.
277 rx.await.map_err(|_| Error::CircuitClosed)??;
278
279 let allowed_hop_loc: HopLocation = match hop {
280 TargetHop::Hop(loc) => Some(loc),
281 _ => None,
282 }
283 .ok_or_else(|| bad_api_usage!("Expected TargetHop with HopLocation"))?;
284
285 let tunnel = self.clone();
286 Ok(incoming_receiver.map(move |req_ctx| {
287 let StreamReqInfo {
288 req,
289 stream_id,
290 hop,
291 receiver,
292 msg_tx,
293 rate_limit_stream,
294 drain_rate_request_stream,
295 memquota,
296 relay_cell_format,
297 } = req_ctx;
298
299 // We already enforce this in handle_incoming_stream_request; this
300 // assertion is just here to make sure that we don't ever
301 // accidentally remove or fail to enforce that check, since it is
302 // security-critical.
303 assert_eq!(Some(allowed_hop_loc), hop);
304
305 // TODO(#2002): figure out what this is going to look like
306 // for onion services (perhaps we should forbid this function
307 // from being called on a multipath circuit?)
308 //
309 // See also:
310 // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3002#note_3200937
311 let target = StreamTarget {
312 tunnel: Tunnel::Client(Arc::clone(&tunnel)),
313 tx: msg_tx,
314 hop: Some(allowed_hop_loc),
315 stream_id,
316 relay_cell_format,
317 rate_limit_stream,
318 };
319
320 // can be used to build a reader that supports XON/XOFF flow control
321 let xon_xoff_reader_ctrl =
322 XonXoffReaderCtrl::new(drain_rate_request_stream, target.clone());
323
324 let reader = StreamReceiver {
325 target: target.clone(),
326 receiver,
327 recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
328 ended: false,
329 };
330
331 let components = StreamComponents {
332 stream_receiver: reader,
333 target,
334 memquota,
335 xon_xoff_reader_ctrl,
336 };
337
338 IncomingStream::new(time_prov.clone(), req, components)
339 }))
340 }
341
342 /// Single and Multi path helper, used to begin a stream.
343 ///
344 /// This function allocates a stream ID, and sends the message
345 /// (like a BEGIN or RESOLVE), but doesn't wait for a response.
346 ///
347 /// The caller will typically want to see the first cell in response,
348 /// to see whether it is e.g. an END or a CONNECTED.
349 async fn begin_stream_impl(
350 self: &Arc<Self>,
351 begin_msg: AnyRelayMsg,
352 cmd_checker: AnyCmdChecker,
353 ) -> Result<StreamComponents> {
354 // TODO: Possibly this should take a hop, rather than just
355 // assuming it's the last hop.
356 let hop = TargetHop::LastHop;
357
358 let time_prov = self.circ.time_provider.clone();
359
360 let memquota = StreamAccount::new(self.circ.mq_account())?;
361 let (sender, receiver) = stream_queue(
362 #[cfg(not(feature = "flowctl-cc"))]
363 STREAM_READER_BUFFER,
364 &memquota,
365 &time_prov,
366 )?;
367 let (tx, rx) = oneshot::channel();
368 let (msg_tx, msg_rx) =
369 MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(time_prov, memquota.as_raw_account())?;
370
371 let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);
372
373 // A channel for the reactor to request a new drain rate from the reader.
374 // Typically this notification will be sent after an XOFF is sent so that the reader can
375 // send us a new drain rate when the stream data queue becomes empty.
376 let mut drain_rate_request_tx = NotifySender::new_typed();
377 let drain_rate_request_rx = drain_rate_request_tx.subscribe();
378
379 self.circ
380 .control
381 .unbounded_send(CtrlMsg::BeginStream {
382 hop,
383 message: begin_msg,
384 sender,
385 rx: msg_rx,
386 rate_limit_notifier: rate_limit_tx,
387 drain_rate_requester: drain_rate_request_tx,
388 done: tx,
389 cmd_checker,
390 })
391 .map_err(|_| Error::CircuitClosed)?;
392
393 let (stream_id, hop, relay_cell_format) = rx.await.map_err(|_| Error::CircuitClosed)??;
394
395 let target = StreamTarget {
396 tunnel: Tunnel::Client(self.clone()),
397 tx: msg_tx,
398 hop: Some(hop),
399 stream_id,
400 relay_cell_format,
401 rate_limit_stream: rate_limit_rx,
402 };
403
404 // can be used to build a reader that supports XON/XOFF flow control
405 let xon_xoff_reader_ctrl = XonXoffReaderCtrl::new(drain_rate_request_rx, target.clone());
406
407 let stream_receiver = StreamReceiver {
408 target: target.clone(),
409 receiver,
410 recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
411 ended: false,
412 };
413
414 let components = StreamComponents {
415 stream_receiver,
416 target,
417 memquota,
418 xon_xoff_reader_ctrl,
419 };
420
421 Ok(components)
422 }
423
    /// Install a [`CircuitPadder`] at the listed `hop`.
    ///
    /// Replaces any previous padder installed at that hop.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the underlying circuit's `set_padder_impl`.
    #[cfg(feature = "circ-padding-manual")]
    pub async fn start_padding_at_hop(
        self: &Arc<Self>,
        hop: HopLocation,
        padder: CircuitPadder,
    ) -> Result<()> {
        self.circ.set_padder_impl(hop, Some(padder)).await
    }
435
    /// Remove any [`CircuitPadder`] at the listed `hop`.
    ///
    /// Does nothing if there was not a padder installed there.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the underlying circuit's `set_padder_impl`.
    #[cfg(feature = "circ-padding-manual")]
    pub async fn stop_padding_at_hop(self: &Arc<Self>, hop: HopLocation) -> Result<()> {
        // Passing `None` as the padder is how removal is requested.
        self.circ.set_padder_impl(hop, None).await
    }
443
444 /// Start a DataStream (anonymized connection) to the given
445 /// address and port, using a BEGIN cell.
446 async fn begin_data_stream(
447 self: &Arc<Self>,
448 msg: AnyRelayMsg,
449 optimistic: bool,
450 ) -> Result<DataStream> {
451 let components = self
452 .begin_stream_impl(msg, OutboundDataCmdChecker::new_any())
453 .await?;
454
455 let StreamComponents {
456 stream_receiver,
457 target,
458 memquota,
459 xon_xoff_reader_ctrl,
460 } = components;
461
462 let mut stream = DataStream::new(
463 self.circ.time_provider.clone(),
464 stream_receiver,
465 xon_xoff_reader_ctrl,
466 target,
467 memquota,
468 );
469 if !optimistic {
470 stream.wait_for_connection().await?;
471 }
472 Ok(stream)
473 }
474
475 /// Single and multi path helper.
476 ///
477 /// Start a stream to the given address and port, using a BEGIN
478 /// cell.
479 ///
480 /// The use of a string for the address is intentional: you should let
481 /// the remote Tor relay do the hostname lookup for you.
482 #[instrument(level = "trace", skip_all)]
483 pub async fn begin_stream(
484 self: &Arc<Self>,
485 target: &str,
486 port: u16,
487 parameters: Option<StreamParameters>,
488 ) -> Result<DataStream> {
489 let parameters = parameters.unwrap_or_default();
490 let begin_flags = parameters.begin_flags();
491 let optimistic = parameters.is_optimistic();
492 let target = if parameters.suppressing_hostname() {
493 ""
494 } else {
495 target
496 };
497 let beginmsg = Begin::new(target, port, begin_flags)
498 .map_err(|e| Error::from_cell_enc(e, "begin message"))?;
499 self.begin_data_stream(beginmsg.into(), optimistic).await
500 }
501
    /// Start a new stream to the last relay in the tunnel, using
    /// a BEGIN_DIR cell.
    ///
    /// Unlike the other stream-opening methods on this type, this one takes
    /// `self` as an owned `Arc<Self>` rather than `&Arc<Self>`.
    ///
    /// # Errors
    ///
    /// Returns any error from opening the data stream.
    pub async fn begin_dir_stream(self: Arc<Self>) -> Result<DataStream> {
        // Note that we always open begindir connections optimistically.
        // Since they are local to a relay that we've already authenticated
        // with and built a tunnel to, there should be no additional checks
        // we need to perform to see whether the BEGINDIR will succeed.
        self.begin_data_stream(AnyRelayMsg::BeginDir(Default::default()), true)
            .await
    }
512
513 /// Perform a DNS lookup, using a RESOLVE cell with the last relay
514 /// in this tunnel.
515 ///
516 /// Note that this function does not check for timeouts; that's
517 /// the caller's responsibility.
518 pub async fn resolve(self: &Arc<Self>, hostname: &str) -> Result<Vec<IpAddr>> {
519 let resolve_msg = Resolve::new(hostname);
520
521 let resolved_msg = self.try_resolve(resolve_msg).await?;
522
523 resolved_msg
524 .into_answers()
525 .into_iter()
526 .filter_map(|(val, _)| match resolvedval_to_result(val) {
527 Ok(ResolvedVal::Ip(ip)) => Some(Ok(ip)),
528 Ok(_) => None,
529 Err(e) => Some(Err(e)),
530 })
531 .collect()
532 }
533
534 /// Perform a reverse DNS lookup, by sending a RESOLVE cell with
535 /// the last relay on this tunnel.
536 ///
537 /// Note that this function does not check for timeouts; that's
538 /// the caller's responsibility.
539 pub async fn resolve_ptr(self: &Arc<Self>, addr: IpAddr) -> Result<Vec<String>> {
540 let resolve_ptr_msg = Resolve::new_reverse(&addr);
541
542 let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;
543
544 resolved_msg
545 .into_answers()
546 .into_iter()
547 .filter_map(|(val, _)| match resolvedval_to_result(val) {
548 Ok(ResolvedVal::Hostname(v)) => Some(
549 String::from_utf8(v)
550 .map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
551 ),
552 Ok(_) => None,
553 Err(e) => Some(Err(e)),
554 })
555 .collect()
556 }
557
558 /// Send an ad-hoc message to a given hop on the circuit, without expecting
559 /// a reply.
560 ///
561 /// (If you want to handle one or more possible replies, see
562 /// [`ClientTunnel::start_conversation`].)
563 // TODO(conflux): Change this to use the ReactorHandle for the control commands.
564 #[cfg(feature = "send-control-msg")]
565 pub async fn send_raw_msg(
566 &self,
567 msg: tor_cell::relaycell::msg::AnyRelayMsg,
568 hop: TargetHop,
569 ) -> Result<()> {
570 let (sender, receiver) = oneshot::channel();
571 let ctrl_msg = CtrlMsg::SendMsg { hop, msg, sender };
572 self.circ
573 .control
574 .unbounded_send(ctrl_msg)
575 .map_err(|_| Error::CircuitClosed)?;
576
577 receiver.await.map_err(|_| Error::CircuitClosed)?
578 }
579
580 /// Start an ad-hoc protocol exchange to the specified hop on this tunnel.
581 ///
582 /// To use this:
583 ///
584 /// 0. Create an inter-task channel you'll use to receive
585 /// the outcome of your conversation,
586 /// and bundle it into a [`UserMsgHandler`].
587 ///
588 /// 1. Call `start_conversation`.
589 /// This will install a your handler, for incoming messages,
590 /// and send the outgoing message (if you provided one).
591 /// After that, each message on the circuit
592 /// that isn't handled by the core machinery
593 /// is passed to your provided `reply_handler`.
594 ///
595 /// 2. Possibly call `send_msg` on the [`Conversation`],
596 /// from the call site of `start_conversation`,
597 /// possibly multiple times, from time to time,
598 /// to send further desired messages to the peer.
599 ///
600 /// 3. In your [`UserMsgHandler`], process the incoming messages.
601 /// You may respond by
602 /// sending additional messages
603 /// When the protocol exchange is finished,
604 /// `UserMsgHandler::handle_msg` should return
605 /// [`ConversationFinished`](reactor::MetaCellDisposition::ConversationFinished).
606 ///
607 /// If you don't need the `Conversation` to send followup messages,
608 /// you may simply drop it,
609 /// and rely on the responses you get from your handler,
610 /// on the channel from step 0 above.
611 /// Your handler will remain installed and able to process incoming messages
612 /// until it returns `ConversationFinished`.
613 ///
614 /// (If you don't want to accept any replies at all, it may be
615 /// simpler to use [`ClientTunnel::send_raw_msg`].)
616 ///
617 /// Note that it is quite possible to use this function to violate the tor
618 /// protocol; most users of this API will not need to call it. It is used
619 /// to implement most of the onion service handshake.
620 ///
621 /// # Limitations
622 ///
623 /// Only one conversation may be active at any one time,
624 /// for any one circuit.
625 /// This generally means that this function should not be called
626 /// on a tunnel which might be shared with anyone else.
627 ///
628 /// Likewise, it is forbidden to try to extend the tunnel,
629 /// while the conversation is in progress.
630 ///
631 /// After the conversation has finished, the tunnel may be extended.
632 /// Or, `start_conversation` may be called again;
633 /// but, in that case there will be a gap between the two conversations,
634 /// during which no `UserMsgHandler` is installed,
635 /// and unexpected incoming messages would close the tunnel.
636 ///
637 /// If these restrictions are violated, the tunnel will be closed with an error.
638 ///
639 /// ## Precise definition of the lifetime of a conversation
640 ///
641 /// A conversation is in progress from entry to `start_conversation`,
642 /// until entry to the body of the [`UserMsgHandler::handle_msg`](MsgHandler::handle_msg)
643 /// call which returns [`ConversationFinished`](reactor::MetaCellDisposition::ConversationFinished).
644 /// (*Entry* since `handle_msg` is synchronously embedded
645 /// into the incoming message processing.)
646 /// So you may start a new conversation as soon as you have the final response
647 /// via your inter-task channel from (0) above.
648 ///
649 /// The lifetime relationship of the [`Conversation`],
650 /// vs the handler returning `ConversationFinished`
651 /// is not enforced by the type system.
652 // Doing so without still leaving plenty of scope for runtime errors doesn't seem possible,
653 // at least while allowing sending followup messages from outside the handler.
654 #[cfg(feature = "send-control-msg")]
655 pub async fn start_conversation(
656 &self,
657 msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
658 reply_handler: impl MsgHandler + Send + 'static,
659 hop: TargetHop,
660 ) -> Result<Conversation<'_>> {
661 // We need to resolve the TargetHop into a precise HopLocation so our msg handler can match
662 // the right Leg/Hop with inbound cell.
663 let (sender, receiver) = oneshot::channel();
664 self.circ
665 .command
666 .unbounded_send(CtrlCmd::ResolveTargetHop { hop, done: sender })
667 .map_err(|_| Error::CircuitClosed)?;
668 let hop_location = receiver.await.map_err(|_| Error::CircuitClosed)??;
669 let handler = Box::new(UserMsgHandler::new(hop_location, reply_handler));
670 let conversation = Conversation(self);
671 conversation.send_internal(msg, Some(handler)).await?;
672 Ok(conversation)
673 }
674
    /// Shut down this tunnel, along with all streams that are using it. Happens asynchronously
    /// (i.e. the tunnel won't necessarily be done shutting down immediately after this function
    /// returns!).
    ///
    /// Note that other references to this tunnel may exist. If they do, they will stop working
    /// after you call this function.
    ///
    /// It's not necessary to call this method if you're just done with a tunnel: the tunnel should
    /// close on its own once nothing is using it any more.
    // TODO(conflux): This should use the ReactorHandle instead.
    pub fn terminate(&self) {
        // The result of queueing the shutdown command is deliberately discarded.
        // NOTE(review): presumably a failed send means the reactor is already gone,
        // in which case there is nothing left to shut down — confirm.
        let _ = self.circ.command.unbounded_send(CtrlCmd::Shutdown);
    }
688
    /// Helper: Send the resolve message, and read resolved message from
    /// resolve stream.
    ///
    /// Opens a stream for `msg`, wraps the receiving half in a [`ResolveStream`],
    /// and returns the single message read from it.
    ///
    /// # Errors
    ///
    /// Returns any error from opening the stream or from reading the reply.
    async fn try_resolve(self: &Arc<Self>, msg: Resolve) -> Result<Resolved> {
        let components = self
            .begin_stream_impl(msg.into(), ResolveCmdChecker::new_any())
            .await?;

        // The `_` patterns do not move `target` and `xon_xoff_reader_ctrl` out of
        // `components`, so those two fields are only dropped when this function returns,
        // i.e. after the reply has been read.
        let StreamComponents {
            stream_receiver,
            target: _,
            memquota,
            xon_xoff_reader_ctrl: _,
        } = components;

        let mut resolve_stream = ResolveStream::new(stream_receiver, memquota);
        resolve_stream.read_msg().await
    }
706
707 // TODO(conflux)
708}
709
// TODO(conflux): We will likely need to enforce some invariants here, for example that the `circ`
// has the expected (non-zero) number of hops.
//
// This conversion currently performs no checks and always succeeds; it is declared
// fallible (`TryFrom`) so that the checks described above can be added later.
impl TryFrom<ClientCirc> for ClientTunnel {
    type Error = Error;

    fn try_from(circ: ClientCirc) -> std::result::Result<Self, Self::Error> {
        Ok(Self { circ })
    }
}
719
720/// Convert a [`ResolvedVal`] into a Result, based on whether or not
721/// it represents an error.
722fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
723 match val {
724 ResolvedVal::TransientError => Err(Error::ResolveError(ResolveError::Transient)),
725 ResolvedVal::NontransientError => Err(Error::ResolveError(ResolveError::Nontransient)),
726 ResolvedVal::Unrecognized(_, _) => Err(Error::ResolveError(ResolveError::Unrecognized)),
727 _ => Ok(val),
728 }
729}
730
/// A precise position in a tunnel.
///
/// Unlike [`TargetHop`], this has no "whatever the last hop is" variant.
#[derive(Debug, Deftly, Copy, Clone, PartialEq, Eq)]
#[derive_deftly(HasMemoryCost)]
#[non_exhaustive]
pub enum HopLocation {
    /// A specific position in a tunnel.
    ///
    /// The pair is a [`UniqId`] together with the hop number.
    Hop((UniqId, HopNum)),
    /// The join point of a multi-path tunnel.
    JoinPoint,
}
741
/// A position in a tunnel.
///
/// This is either a precise [`HopLocation`], or the symbolic "last hop".
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum TargetHop {
    /// A specific position in a tunnel.
    Hop(HopLocation),
    /// The last hop of a tunnel.
    ///
    /// This should be used only when you don't care about what specific hop is used.
    /// Some tunnels may be extended or truncated,
    /// which means that the "last hop" may change at any time.
    LastHop,
}
755
756impl From<(UniqId, HopNum)> for HopLocation {
757 fn from(v: (UniqId, HopNum)) -> Self {
758 HopLocation::Hop(v)
759 }
760}
761
762impl From<(UniqId, HopNum)> for TargetHop {
763 fn from(v: (UniqId, HopNum)) -> Self {
764 TargetHop::Hop(v.into())
765 }
766}
767
768impl HopLocation {
769 /// Return the hop number if not a JointPoint.
770 pub fn hop_num(&self) -> Option<HopNum> {
771 match self {
772 Self::Hop((_, hop_num)) => Some(*hop_num),
773 Self::JoinPoint => None,
774 }
775 }
776}
777
778impl ClientTunnel {
779 /// Close the pending stream that owns this StreamTarget, delivering the specified
780 /// END message (if any)
781 ///
782 /// See [`StreamTarget::close_pending`].
783 #[cfg(feature = "hs-service")]
784 pub(crate) fn close_pending(
785 &self,
786 stream_id: StreamId,
787 hop: Option<HopLocation>,
788 message: crate::stream::CloseStreamBehavior,
789 ) -> Result<oneshot::Receiver<Result<()>>> {
790 let (tx, rx) = oneshot::channel();
791
792 self.circ
793 .control
794 .unbounded_send(CtrlMsg::ClosePendingStream {
795 stream_id,
796 hop: hop.expect("missing stream hop for client tunnel"),
797 message,
798 done: tx,
799 })
800 .map_err(|_| Error::CircuitClosed)?;
801
802 Ok(rx)
803 }
804
805 /// Request to send a SENDME cell for this stream.
806 ///
807 /// See [`StreamTarget::send_sendme`].
808 pub(crate) fn send_sendme(&self, stream_id: StreamId, hop: Option<HopLocation>) -> Result<()> {
809 self.circ
810 .control
811 .unbounded_send(CtrlMsg::FlowCtrlUpdate {
812 msg: FlowCtrlMsg::Sendme,
813 stream_id,
814 hop: hop.expect("missing stream hop for client tunnel"),
815 })
816 .map_err(|_| Error::CircuitClosed)
817 }
818
819 /// Inform the circuit reactor that there has been a change in the drain rate for this stream.
820 ///
821 /// See [`StreamTarget::drain_rate_update`].
822 pub(crate) fn drain_rate_update(
823 &self,
824 stream_id: StreamId,
825 hop: Option<HopLocation>,
826 rate: XonKbpsEwma,
827 ) -> Result<()> {
828 self.circ
829 .control
830 .unbounded_send(CtrlMsg::FlowCtrlUpdate {
831 msg: FlowCtrlMsg::Xon(rate),
832 stream_id,
833 hop: hop.expect("missing stream hop for client tunnel"),
834 })
835 .map_err(|_| Error::CircuitClosed)
836 }
837}