tor_proto/client.rs
1//! Client-specific types and implementation.
2
3pub mod channel;
4pub mod circuit;
5pub mod stream;
6
7#[cfg(feature = "send-control-msg")]
8pub(crate) mod msghandler;
9pub(crate) mod reactor;
10
11use derive_deftly::Deftly;
12use futures::SinkExt as _;
13use oneshot_fused_workaround as oneshot;
14use std::net::IpAddr;
15use std::pin::Pin;
16use std::sync::Arc;
17use tracing::instrument;
18
19use crate::circuit::UniqId;
20#[cfg(feature = "circ-padding-manual")]
21#[cfg_attr(docsrs, doc(cfg(feature = "circ-padding-manual")))]
22pub use crate::client::circuit::padding::{
23 CircuitPadder, CircuitPadderConfig, CircuitPadderConfigError,
24};
25use crate::client::stream::{
26 DataStream, OutboundDataCmdChecker, ResolveCmdChecker, ResolveStream, StreamParameters,
27 StreamReceiver,
28};
29use crate::congestion::sendme::StreamRecvWindow;
30use crate::crypto::cell::HopNum;
31use crate::memquota::{SpecificAccount as _, StreamAccount};
32use crate::stream::StreamMpscSender;
33use crate::stream::cmdcheck::AnyCmdChecker;
34use crate::stream::flow_ctrl::state::StreamRateLimit;
35use crate::stream::flow_ctrl::xon_xoff::reader::XonXoffReaderCtrl;
36use crate::stream::queue::stream_queue;
37use crate::util::notify::NotifySender;
38use crate::{Error, ResolveError, Result};
39use circuit::{CIRCUIT_BUFFER_SIZE, ClientCirc, Path};
40use reactor::{
41 CtrlCmd, CtrlMsg, FlowCtrlMsg, MetaCellHandler, RECV_WINDOW_INIT, STREAM_READER_BUFFER,
42};
43
44use postage::watch;
45use tor_async_utils::SinkCloseChannel as _;
46use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
47use tor_cell::relaycell::msg::{AnyRelayMsg, Begin, Resolve, Resolved, ResolvedVal};
48use tor_cell::relaycell::{RelayCellFormat, StreamId};
49use tor_error::bad_api_usage;
50use tor_linkspec::OwnedChanTarget;
51use tor_memquota::derive_deftly_template_HasMemoryCost;
52use tor_memquota::mq_queue::{ChannelSpec as _, MpscSpec};
53
54#[cfg(feature = "hs-service")]
55use {
56 crate::client::reactor::StreamReqInfo,
57 crate::client::stream::{IncomingCmdChecker, IncomingStream},
58};
59
60#[cfg(feature = "send-control-msg")]
61use msghandler::{MsgHandler, UserMsgHandler};
62
/// A handle for an in-progress, ad-hoc protocol exchange with the last hop of a circuit.
///
/// One of these is returned by [`ClientTunnel::start_conversation`];
/// use it to send further messages to the last hop relay.
//
// TODO(conflux): this should use ClientTunnel, and it should be moved into
// the tunnel module.
#[cfg(feature = "send-control-msg")]
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
pub struct Conversation<'r>(&'r ClientTunnel);
73
#[cfg(feature = "send-control-msg")]
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
impl Conversation<'_> {
    /// Send one protocol message as part of this ad-hoc exchange.
    ///
    /// Any responses are handled by the `UserMsgHandler` that was installed
    /// when this `Conversation` was created; they are not returned here.
    pub async fn send_message(&self, msg: tor_cell::relaycell::msg::AnyRelayMsg) -> Result<()> {
        self.send_internal(Some(msg), None).await
    }

    /// Hand a `SendMsgAndInstallHandler` request to the reactor, then wait for its outcome.
    ///
    /// Both `msg` and `handler` are optional.
    /// Returns [`Error::CircuitClosed`] if the request cannot be queued,
    /// or if the reactor drops the reply channel without answering.
    ///
    /// This is the shared implementation behind `start_conversation`
    /// and `Conversation::send_message`.
    pub(crate) async fn send_internal(
        &self,
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
    ) -> Result<()> {
        let (sender, receiver) = oneshot::channel();

        let request = CtrlMsg::SendMsgAndInstallHandler {
            // The outer wrapper is built without a stream ID.
            msg: msg.map(|body| tor_cell::relaycell::AnyRelayMsgOuter::new(None, body)),
            handler,
            sender,
        };
        if self.0.circ.control.unbounded_send(request).is_err() {
            return Err(Error::CircuitClosed);
        }

        match receiver.await {
            Ok(outcome) => outcome,
            Err(_) => Err(Error::CircuitClosed),
        }
    }
}
110
111/// A low-level client tunnel API.
112///
113/// This is a communication channel to the tunnel reactor, which manages 1 or more circuits.
114///
115/// Note: the tor-circmgr crates wrap this type in specialized *Tunnel types exposing only the
116/// desired subset of functionality depending on purpose and path size.
117///
118/// Some API calls are for single path and some for multi path. A check with the underlying reactor
119/// is done preventing for instance multi path calls to be used on a single path. Top level types
120/// should prevent this and thus this object should never be used directly.
121#[derive(Debug)]
122#[allow(dead_code)] // TODO(conflux)
123pub struct ClientTunnel {
124 /// The underlying handle to the reactor.
125 circ: ClientCirc,
126}
127
128impl ClientTunnel {
129 /// Return a handle to the `ClientCirc` of this `ClientTunnel`, if the tunnel is a single
130 /// circuit tunnel.
131 ///
132 /// Returns an error if the tunnel has more than one circuit.
133 pub fn as_single_circ(&self) -> Result<&ClientCirc> {
134 if self.circ.is_multi_path {
135 return Err(bad_api_usage!("Single circuit getter on multi path tunnel"))?;
136 }
137 Ok(&self.circ)
138 }
139
140 /// Return the channel target of the first hop.
141 ///
142 /// Can only be used for single path tunnel.
143 pub fn first_hop(&self) -> Result<OwnedChanTarget> {
144 self.as_single_circ()?.first_hop()
145 }
146
147 /// Return true if the circuit reactor is closed meaning the circuit is unusable for both
148 /// receiving or sending.
149 pub fn is_closed(&self) -> bool {
150 self.circ.is_closing()
151 }
152
153 /// Return a [`TargetHop`] representing precisely the last hop of the circuit as in set as a
154 /// HopLocation with its id and hop number.
155 ///
156 /// Return an error if there is no last hop.
157 pub fn last_hop(&self) -> Result<TargetHop> {
158 let uniq_id = self.unique_id();
159 let hop_num = self
160 .circ
161 .mutable
162 .last_hop_num(uniq_id)?
163 .ok_or_else(|| bad_api_usage!("no last hop"))?;
164 Ok((uniq_id, hop_num).into())
165 }
166
167 /// Return a description of the last hop of the tunnel.
168 ///
169 /// Return None if the last hop is virtual; return an error
170 /// if the tunnel has no circuits, or all of its circuits are zero length.
171 ///
172 ///
173 /// # Panics
174 ///
175 /// Panics if there is no last hop. (This should be impossible outside of
176 /// the tor-proto crate, but within the crate it's possible to have a
177 /// circuit with no hops.)
178 pub fn last_hop_info(&self) -> Result<Option<OwnedChanTarget>> {
179 self.circ.last_hop_info()
180 }
181
182 /// Return the number of hops this tunnel as. Fail for a multi path.
183 pub fn n_hops(&self) -> Result<usize> {
184 self.as_single_circ()?.n_hops()
185 }
186
187 /// Return the [`Path`] objects describing all the hops
188 /// of all the circuits in this tunnel.
189 pub fn all_paths(&self) -> Vec<Arc<Path>> {
190 self.circ.all_paths()
191 }
192
193 /// Return a process-unique identifier for this tunnel.
194 ///
195 /// Returns the reactor unique ID of the main reactor.
196 pub fn unique_id(&self) -> UniqId {
197 self.circ.unique_id()
198 }
199
200 /// Return the time at which this tunnel last had any open streams.
201 ///
202 /// Returns `None` if this tunnel has never had any open streams,
203 /// or if it currently has open streams.
204 ///
205 /// NOTE that the Instant returned by this method is not affected by
206 /// any runtime mocking; it is the output of an ordinary call to
207 /// `Instant::now()`.
208 pub async fn disused_since(&self) -> Result<Option<std::time::Instant>> {
209 self.circ.disused_since().await
210 }
211
212 /// Return a future that will resolve once the underlying circuit reactor has closed.
213 ///
214 /// Note that this method does not itself cause the tunnel to shut down.
215 pub fn wait_for_close(
216 self: &Arc<Self>,
217 ) -> impl futures::Future<Output = ()> + Send + Sync + 'static + use<> {
218 self.circ.wait_for_close()
219 }
220
/// Single-path tunnel only. Multi path onion service is not supported yet.
///
/// Tell this tunnel to begin allowing the final hop of the tunnel to try
/// to create new Tor streams, and to return those pending requests in an
/// asynchronous stream.
///
/// Ordinarily, these requests are rejected.
///
/// There can only be one [`Stream`](futures::Stream) of this type created on a given tunnel.
/// If such a [`Stream`](futures::Stream) already exists, this method will return
/// an error.
///
/// After this method has been called on a tunnel, the tunnel is expected
/// to receive requests of this type indefinitely, until it is finally closed.
/// If the `Stream` is dropped, the next request on this tunnel will cause it to close.
///
/// Only onion services (and eventually exit relays) should call this
/// method.
///
/// # Errors
///
/// Returns an internal error if this is a multi-path tunnel,
/// a "bad API usage" error if `hop` is not a [`TargetHop::Hop`],
/// and [`Error::CircuitClosed`] if the reactor cannot be reached.
/// Errors reported by the reactor while installing the handler,
/// or by the memory-quota queue setup, are passed through.
//
// TODO: Someday, we might want to allow a stream request handler to be
// un-registered. However, nothing in the Tor protocol requires it.
//
// Any incoming request handlers installed on the other circuits
// (which are shut down using CtrlCmd::ShutdownAndReturnCircuit)
// will be discarded (along with the reactor of that circuit)
#[cfg(feature = "hs-service")]
#[allow(unreachable_code, unused_variables)] // TODO(conflux)
pub async fn allow_stream_requests<'a, FILT>(
    self: &Arc<Self>,
    allow_commands: &'a [tor_cell::relaycell::RelayCmd],
    hop: TargetHop,
    filter: FILT,
) -> Result<impl futures::Stream<Item = IncomingStream> + use<'a, FILT>>
where
    FILT: crate::client::stream::IncomingStreamRequestFilter + 'a,
{
    use futures::stream::StreamExt;

    /// The size of the channel receiving IncomingStreamRequestContexts.
    const INCOMING_BUFFER: usize = STREAM_READER_BUFFER;

    // TODO(#2002): support onion service conflux
    let circ = self.as_single_circ().map_err(tor_error::into_internal!(
        "Cannot allow stream requests on a multi-path tunnel"
    ))?;

    // Reject an unusable `hop` *before* touching the reactor.
    // Otherwise a bad call would still install the request handler,
    // and then return an error while dropping the receiving end.
    let allowed_hop_loc: HopLocation = match hop {
        TargetHop::Hop(loc) => loc,
        TargetHop::LastHop => {
            return Err(bad_api_usage!("Expected TargetHop with HopLocation").into());
        }
    };

    let time_prov = circ.time_provider.clone();
    let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
    let (incoming_sender, incoming_receiver) = MpscSpec::new(INCOMING_BUFFER)
        .new_mq(time_prov.clone(), circ.memquota.as_raw_account())?;
    let (tx, rx) = oneshot::channel();

    circ.command
        .unbounded_send(CtrlCmd::AwaitStreamRequest {
            cmd_checker,
            incoming_sender,
            hop,
            done: tx,
            filter: Box::new(filter),
        })
        .map_err(|_| Error::CircuitClosed)?;

    // Check whether the AwaitStreamRequest was processed successfully.
    rx.await.map_err(|_| Error::CircuitClosed)??;

    let tunnel = self.clone();
    Ok(incoming_receiver.map(move |req_ctx| {
        let StreamReqInfo {
            req,
            stream_id,
            hop,
            receiver,
            msg_tx,
            rate_limit_stream,
            drain_rate_request_stream,
            memquota,
            relay_cell_format,
        } = req_ctx;

        // We already enforce this in handle_incoming_stream_request; this
        // assertion is just here to make sure that we don't ever
        // accidentally remove or fail to enforce that check, since it is
        // security-critical.
        assert_eq!(allowed_hop_loc, hop);

        // TODO(#2002): figure out what this is going to look like
        // for onion services (perhaps we should forbid this function
        // from being called on a multipath circuit?)
        //
        // See also:
        // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3002#note_3200937
        let target = StreamTarget {
            tunnel: tunnel.clone(),
            tx: msg_tx,
            hop: allowed_hop_loc,
            stream_id,
            relay_cell_format,
            rate_limit_stream,
        };

        // can be used to build a reader that supports XON/XOFF flow control
        let xon_xoff_reader_ctrl =
            XonXoffReaderCtrl::new(drain_rate_request_stream, target.clone());

        let reader = StreamReceiver {
            target: target.clone(),
            receiver,
            recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
            ended: false,
        };

        let components = StreamComponents {
            stream_receiver: reader,
            target,
            memquota,
            xon_xoff_reader_ctrl,
        };

        IncomingStream::new(time_prov.clone(), req, components)
    }))
}
348
349 /// Single and Multi path helper, used to begin a stream.
350 ///
351 /// This function allocates a stream ID, and sends the message
352 /// (like a BEGIN or RESOLVE), but doesn't wait for a response.
353 ///
354 /// The caller will typically want to see the first cell in response,
355 /// to see whether it is e.g. an END or a CONNECTED.
356 async fn begin_stream_impl(
357 self: &Arc<Self>,
358 begin_msg: AnyRelayMsg,
359 cmd_checker: AnyCmdChecker,
360 ) -> Result<StreamComponents> {
361 // TODO: Possibly this should take a hop, rather than just
362 // assuming it's the last hop.
363 let hop = TargetHop::LastHop;
364
365 let time_prov = self.circ.time_provider.clone();
366
367 let memquota = StreamAccount::new(self.circ.mq_account())?;
368 let (sender, receiver) = stream_queue(
369 #[cfg(not(feature = "flowctl-cc"))]
370 STREAM_READER_BUFFER,
371 &memquota,
372 &time_prov,
373 )?;
374 let (tx, rx) = oneshot::channel();
375 let (msg_tx, msg_rx) =
376 MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(time_prov, memquota.as_raw_account())?;
377
378 let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);
379
380 // A channel for the reactor to request a new drain rate from the reader.
381 // Typically this notification will be sent after an XOFF is sent so that the reader can
382 // send us a new drain rate when the stream data queue becomes empty.
383 let mut drain_rate_request_tx = NotifySender::new_typed();
384 let drain_rate_request_rx = drain_rate_request_tx.subscribe();
385
386 self.circ
387 .control
388 .unbounded_send(CtrlMsg::BeginStream {
389 hop,
390 message: begin_msg,
391 sender,
392 rx: msg_rx,
393 rate_limit_notifier: rate_limit_tx,
394 drain_rate_requester: drain_rate_request_tx,
395 done: tx,
396 cmd_checker,
397 })
398 .map_err(|_| Error::CircuitClosed)?;
399
400 let (stream_id, hop, relay_cell_format) = rx.await.map_err(|_| Error::CircuitClosed)??;
401
402 let target = StreamTarget {
403 tunnel: self.clone(),
404 tx: msg_tx,
405 hop,
406 stream_id,
407 relay_cell_format,
408 rate_limit_stream: rate_limit_rx,
409 };
410
411 // can be used to build a reader that supports XON/XOFF flow control
412 let xon_xoff_reader_ctrl = XonXoffReaderCtrl::new(drain_rate_request_rx, target.clone());
413
414 let stream_receiver = StreamReceiver {
415 target: target.clone(),
416 receiver,
417 recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
418 ended: false,
419 };
420
421 let components = StreamComponents {
422 stream_receiver,
423 target,
424 memquota,
425 xon_xoff_reader_ctrl,
426 };
427
428 Ok(components)
429 }
430
/// Install a [`CircuitPadder`] at the listed `hop`.
///
/// If a padder was already installed at that hop, it is replaced.
#[cfg(feature = "circ-padding-manual")]
pub async fn start_padding_at_hop(
    self: &Arc<Self>,
    hop: HopLocation,
    padder: CircuitPadder,
) -> Result<()> {
    let new_padder = Some(padder);
    self.circ.set_padder_impl(hop, new_padder).await
}
442
/// Remove any [`CircuitPadder`] at the listed `hop`.
///
/// This is a no-op if no padder was installed there.
#[cfg(feature = "circ-padding-manual")]
pub async fn stop_padding_at_hop(self: &Arc<Self>, hop: HopLocation) -> Result<()> {
    // Passing `None` clears whatever padder is currently set.
    self.circ.set_padder_impl(hop, None).await
}
450
451 /// Start a DataStream (anonymized connection) to the given
452 /// address and port, using a BEGIN cell.
453 async fn begin_data_stream(
454 self: &Arc<Self>,
455 msg: AnyRelayMsg,
456 optimistic: bool,
457 ) -> Result<DataStream> {
458 let components = self
459 .begin_stream_impl(msg, OutboundDataCmdChecker::new_any())
460 .await?;
461
462 let StreamComponents {
463 stream_receiver,
464 target,
465 memquota,
466 xon_xoff_reader_ctrl,
467 } = components;
468
469 let mut stream = DataStream::new(
470 self.circ.time_provider.clone(),
471 stream_receiver,
472 xon_xoff_reader_ctrl,
473 target,
474 memquota,
475 );
476 if !optimistic {
477 stream.wait_for_connection().await?;
478 }
479 Ok(stream)
480 }
481
482 /// Single and multi path helper.
483 ///
484 /// Start a stream to the given address and port, using a BEGIN
485 /// cell.
486 ///
487 /// The use of a string for the address is intentional: you should let
488 /// the remote Tor relay do the hostname lookup for you.
489 #[instrument(level = "trace", skip_all)]
490 pub async fn begin_stream(
491 self: &Arc<Self>,
492 target: &str,
493 port: u16,
494 parameters: Option<StreamParameters>,
495 ) -> Result<DataStream> {
496 let parameters = parameters.unwrap_or_default();
497 let begin_flags = parameters.begin_flags();
498 let optimistic = parameters.is_optimistic();
499 let target = if parameters.suppressing_hostname() {
500 ""
501 } else {
502 target
503 };
504 let beginmsg = Begin::new(target, port, begin_flags)
505 .map_err(|e| Error::from_cell_enc(e, "begin message"))?;
506 self.begin_data_stream(beginmsg.into(), optimistic).await
507 }
508
509 /// Start a new stream to the last relay in the tunnel, using
510 /// a BEGIN_DIR cell.
511 pub async fn begin_dir_stream(self: Arc<Self>) -> Result<DataStream> {
512 // Note that we always open begindir connections optimistically.
513 // Since they are local to a relay that we've already authenticated
514 // with and built a tunnel to, there should be no additional checks
515 // we need to perform to see whether the BEGINDIR will succeed.
516 self.begin_data_stream(AnyRelayMsg::BeginDir(Default::default()), true)
517 .await
518 }
519
520 /// Perform a DNS lookup, using a RESOLVE cell with the last relay
521 /// in this tunnel.
522 ///
523 /// Note that this function does not check for timeouts; that's
524 /// the caller's responsibility.
525 pub async fn resolve(self: &Arc<Self>, hostname: &str) -> Result<Vec<IpAddr>> {
526 let resolve_msg = Resolve::new(hostname);
527
528 let resolved_msg = self.try_resolve(resolve_msg).await?;
529
530 resolved_msg
531 .into_answers()
532 .into_iter()
533 .filter_map(|(val, _)| match resolvedval_to_result(val) {
534 Ok(ResolvedVal::Ip(ip)) => Some(Ok(ip)),
535 Ok(_) => None,
536 Err(e) => Some(Err(e)),
537 })
538 .collect()
539 }
540
541 /// Perform a reverse DNS lookup, by sending a RESOLVE cell with
542 /// the last relay on this tunnel.
543 ///
544 /// Note that this function does not check for timeouts; that's
545 /// the caller's responsibility.
546 pub async fn resolve_ptr(self: &Arc<Self>, addr: IpAddr) -> Result<Vec<String>> {
547 let resolve_ptr_msg = Resolve::new_reverse(&addr);
548
549 let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;
550
551 resolved_msg
552 .into_answers()
553 .into_iter()
554 .filter_map(|(val, _)| match resolvedval_to_result(val) {
555 Ok(ResolvedVal::Hostname(v)) => Some(
556 String::from_utf8(v)
557 .map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
558 ),
559 Ok(_) => None,
560 Err(e) => Some(Err(e)),
561 })
562 .collect()
563 }
564
/// Send an ad-hoc message to a given hop on the circuit, without expecting
/// a reply.
///
/// Returns [`Error::CircuitClosed`] if the request cannot be handed to the
/// reactor, or if the reactor goes away before reporting an outcome.
///
/// (If you want to handle one or more possible replies, see
/// [`ClientTunnel::start_conversation`].)
// TODO(conflux): Change this to use the ReactorHandle for the control commands.
#[cfg(feature = "send-control-msg")]
pub async fn send_raw_msg(
    &self,
    msg: tor_cell::relaycell::msg::AnyRelayMsg,
    hop: TargetHop,
) -> Result<()> {
    let (sender, receiver) = oneshot::channel();
    let request = CtrlMsg::SendMsg { hop, msg, sender };
    if self.circ.control.unbounded_send(request).is_err() {
        return Err(Error::CircuitClosed);
    }

    match receiver.await {
        Ok(outcome) => outcome,
        Err(_) => Err(Error::CircuitClosed),
    }
}
586
/// Start an ad-hoc protocol exchange to the specified hop on this tunnel.
///
/// To use this:
///
///  0. Create an inter-task channel you'll use to receive
///     the outcome of your conversation,
///     and bundle it into a [`UserMsgHandler`].
///
///  1. Call `start_conversation`.
///     This will install your handler for incoming messages,
///     and send the outgoing message (if you provided one).
///     After that, each message on the circuit
///     that isn't handled by the core machinery
///     is passed to your provided `reply_handler`.
///
///  2. Possibly call [`send_message`](Conversation::send_message) on the [`Conversation`],
///     from the call site of `start_conversation`,
///     possibly multiple times, from time to time,
///     to send further desired messages to the peer.
///
///  3. In your [`UserMsgHandler`], process the incoming messages.
///     You may respond by sending additional messages.
///     When the protocol exchange is finished,
///     `UserMsgHandler::handle_msg` should return
///     [`ConversationFinished`](reactor::MetaCellDisposition::ConversationFinished).
///
/// If you don't need the `Conversation` to send followup messages,
/// you may simply drop it,
/// and rely on the responses you get from your handler,
/// on the channel from step 0 above.
/// Your handler will remain installed and able to process incoming messages
/// until it returns `ConversationFinished`.
///
/// (If you don't want to accept any replies at all, it may be
/// simpler to use [`ClientTunnel::send_raw_msg`].)
///
/// Note that it is quite possible to use this function to violate the tor
/// protocol; most users of this API will not need to call it.  It is used
/// to implement most of the onion service handshake.
///
/// # Limitations
///
/// Only one conversation may be active at any one time,
/// for any one circuit.
/// This generally means that this function should not be called
/// on a tunnel which might be shared with anyone else.
///
/// Likewise, it is forbidden to try to extend the tunnel
/// while the conversation is in progress.
///
/// After the conversation has finished, the tunnel may be extended.
/// Or, `start_conversation` may be called again;
/// but, in that case there will be a gap between the two conversations,
/// during which no `UserMsgHandler` is installed,
/// and unexpected incoming messages would close the tunnel.
///
/// If these restrictions are violated, the tunnel will be closed with an error.
///
/// ## Precise definition of the lifetime of a conversation
///
/// A conversation is in progress from entry to `start_conversation`,
/// until entry to the body of the [`UserMsgHandler::handle_msg`](MsgHandler::handle_msg)
/// call which returns [`ConversationFinished`](reactor::MetaCellDisposition::ConversationFinished).
/// (*Entry* since `handle_msg` is synchronously embedded
/// into the incoming message processing.)
/// So you may start a new conversation as soon as you have the final response
/// via your inter-task channel from (0) above.
///
/// The lifetime relationship of the [`Conversation`],
/// vs the handler returning `ConversationFinished`
/// is not enforced by the type system.
// Doing so without still leaving plenty of scope for runtime errors doesn't seem possible,
// at least while allowing sending followup messages from outside the handler.
#[cfg(feature = "send-control-msg")]
pub async fn start_conversation(
    &self,
    msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
    reply_handler: impl MsgHandler + Send + 'static,
    hop: TargetHop,
) -> Result<Conversation<'_>> {
    // We need to resolve the TargetHop into a precise HopLocation so our msg handler can match
    // the right Leg/Hop with inbound cell.
    let (done_tx, done_rx) = oneshot::channel();
    let resolve_cmd = CtrlCmd::ResolveTargetHop { hop, done: done_tx };
    self.circ
        .command
        .unbounded_send(resolve_cmd)
        .map_err(|_| Error::CircuitClosed)?;
    let hop_location = done_rx.await.map_err(|_| Error::CircuitClosed)??;

    // Install the handler (and send the initial message, if any) in one step.
    let handler = Box::new(UserMsgHandler::new(hop_location, reply_handler));
    let conversation = Conversation(self);
    conversation.send_internal(msg, Some(handler)).await?;
    Ok(conversation)
}
681
682 /// Shut down this tunnel, along with all streams that are using it. Happens asynchronously
683 /// (i.e. the tunnel won't necessarily be done shutting down immediately after this function
684 /// returns!).
685 ///
686 /// Note that other references to this tunnel may exist. If they do, they will stop working
687 /// after you call this function.
688 ///
689 /// It's not necessary to call this method if you're just done with a tunnel: the tunnel should
690 /// close on its own once nothing is using it any more.
691 // TODO(conflux): This should use the ReactorHandle instead.
692 pub fn terminate(&self) {
693 let _ = self.circ.command.unbounded_send(CtrlCmd::Shutdown);
694 }
695
696 /// Helper: Send the resolve message, and read resolved message from
697 /// resolve stream.
698 async fn try_resolve(self: &Arc<Self>, msg: Resolve) -> Result<Resolved> {
699 let components = self
700 .begin_stream_impl(msg.into(), ResolveCmdChecker::new_any())
701 .await?;
702
703 let StreamComponents {
704 stream_receiver,
705 target: _,
706 memquota,
707 xon_xoff_reader_ctrl: _,
708 } = components;
709
710 let mut resolve_stream = ResolveStream::new(stream_receiver, memquota);
711 resolve_stream.read_msg().await
712 }
713
714 // TODO(conflux)
715}
716
717// TODO(conflux): We will likely need to enforce some invariants here, for example that the `circ`
718// has the expected (non-zero) number of hops.
719impl TryFrom<ClientCirc> for ClientTunnel {
720 type Error = Error;
721
722 fn try_from(circ: ClientCirc) -> std::result::Result<Self, Self::Error> {
723 Ok(Self { circ })
724 }
725}
726
727/// A collection of components that can be combined to implement a Tor stream,
728/// or anything that requires a stream ID.
729///
730/// Not all components may be needed, depending on the purpose of the "stream".
731/// For example we build `RELAY_RESOLVE` requests like we do data streams,
732/// but they won't use the `StreamTarget` as they don't need to send additional
733/// messages.
734#[derive(Debug)]
735pub(crate) struct StreamComponents {
736 /// A [`Stream`](futures::Stream) of incoming relay messages for this Tor stream.
737 pub(crate) stream_receiver: StreamReceiver,
738 /// A handle that can communicate messages to the circuit reactor for this stream.
739 pub(crate) target: StreamTarget,
740 /// The memquota [account](tor_memquota::Account) to use for data on this stream.
741 pub(crate) memquota: StreamAccount,
742 /// The control information needed to add XON/XOFF flow control to the stream.
743 pub(crate) xon_xoff_reader_ctrl: XonXoffReaderCtrl,
744}
745
746/// Convert a [`ResolvedVal`] into a Result, based on whether or not
747/// it represents an error.
748fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
749 match val {
750 ResolvedVal::TransientError => Err(Error::ResolveError(ResolveError::Transient)),
751 ResolvedVal::NontransientError => Err(Error::ResolveError(ResolveError::Nontransient)),
752 ResolvedVal::Unrecognized(_, _) => Err(Error::ResolveError(ResolveError::Unrecognized)),
753 _ => Ok(val),
754 }
755}
756
757/// A precise position in a tunnel.
758#[derive(Debug, Deftly, Copy, Clone, PartialEq, Eq)]
759#[derive_deftly(HasMemoryCost)]
760#[non_exhaustive]
761pub enum HopLocation {
762 /// A specific position in a tunnel.
763 Hop((UniqId, HopNum)),
764 /// The join point of a multi-path tunnel.
765 JoinPoint,
766}
767
768/// A position in a tunnel.
769#[derive(Debug, Copy, Clone, PartialEq, Eq)]
770#[non_exhaustive]
771pub enum TargetHop {
772 /// A specific position in a tunnel.
773 Hop(HopLocation),
774 /// The last hop of a tunnel.
775 ///
776 /// This should be used only when you don't care about what specific hop is used.
777 /// Some tunnels may be extended or truncated,
778 /// which means that the "last hop" may change at any time.
779 LastHop,
780}
781
782impl From<(UniqId, HopNum)> for HopLocation {
783 fn from(v: (UniqId, HopNum)) -> Self {
784 HopLocation::Hop(v)
785 }
786}
787
788impl From<(UniqId, HopNum)> for TargetHop {
789 fn from(v: (UniqId, HopNum)) -> Self {
790 TargetHop::Hop(v.into())
791 }
792}
793
794impl HopLocation {
795 /// Return the hop number if not a JointPoint.
796 pub fn hop_num(&self) -> Option<HopNum> {
797 match self {
798 Self::Hop((_, hop_num)) => Some(*hop_num),
799 Self::JoinPoint => None,
800 }
801 }
802}
803
804/// Internal handle, used to implement a stream on a particular tunnel.
805///
806/// The reader and the writer for a stream should hold a `StreamTarget` for the stream;
807/// the reader should additionally hold an `mpsc::Receiver` to get
808/// relay messages for the stream.
809///
810/// When all the `StreamTarget`s for a stream are dropped, the Reactor will
811/// close the stream by sending an END message to the other side.
812/// You can close a stream earlier by using [`StreamTarget::close`]
813/// or [`StreamTarget::close_pending`].
814#[derive(Clone, Debug)]
815pub(crate) struct StreamTarget {
816 /// Which hop of the circuit this stream is with.
817 hop: HopLocation,
818 /// Reactor ID for this stream.
819 stream_id: StreamId,
820 /// Encoding to use for relay cells sent on this stream.
821 ///
822 /// This is mostly irrelevant, except when deciding
823 /// how many bytes we can pack in a DATA message.
824 relay_cell_format: RelayCellFormat,
825 /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
826 // TODO(arti#2068): we should consider making this an `Option`
827 rate_limit_stream: watch::Receiver<StreamRateLimit>,
828 /// Channel to send cells down.
829 tx: StreamMpscSender<AnyRelayMsg>,
830 /// Reference to the tunnel that this stream is on.
831 tunnel: Arc<ClientTunnel>,
832}
833
834impl StreamTarget {
835 /// Deliver a relay message for the stream that owns this StreamTarget.
836 ///
837 /// The StreamTarget will set the correct stream ID and pick the
838 /// right hop, but will not validate that the message is well-formed
839 /// or meaningful in context.
840 pub(crate) async fn send(&mut self, msg: AnyRelayMsg) -> Result<()> {
841 self.tx.send(msg).await.map_err(|_| Error::CircuitClosed)?;
842 Ok(())
843 }
844
/// Close the pending stream that owns this StreamTarget, delivering the specified
/// END message (if any)
///
/// The stream is closed by sending a [`CtrlMsg::ClosePendingStream`] message to the reactor.
///
/// Returns a [`oneshot::Receiver`] that can be used to await the reactor's response,
/// or [`Error::CircuitClosed`] if the request cannot be queued.
///
/// The StreamTarget will set the correct stream ID and pick the
/// right hop, but will not validate that the message is well-formed
/// or meaningful in context.
///
/// Note that in many cases, the actual contents of an END message can leak unwanted
/// information. Please consider carefully before sending anything but an
/// [`End::new_misc()`](tor_cell::relaycell::msg::End::new_misc) message over a `ClientTunnel`.
/// (For onion services, we send [`DONE`](tor_cell::relaycell::msg::EndReason::DONE) )
///
/// In addition to sending the END message, this function also ensures
/// the state of the stream map entry of this stream is updated
/// accordingly.
///
/// Normally, you shouldn't need to call this function, as streams are implicitly closed by the
/// reactor when their corresponding `StreamTarget` is dropped. The only valid use of this
/// function is for closing pending incoming streams (a stream is said to be pending if we have
/// received the message initiating the stream but have not responded to it yet).
///
/// **NOTE**: This function should be called at most once per request.
/// Calling it twice is an error.
#[cfg(feature = "hs-service")]
pub(crate) fn close_pending(
    &self,
    message: crate::stream::CloseStreamBehavior,
) -> Result<oneshot::Receiver<Result<()>>> {
    let (done, outcome_rx) = oneshot::channel();

    let request = CtrlMsg::ClosePendingStream {
        stream_id: self.stream_id,
        hop: self.hop,
        message,
        done,
    };
    if self.tunnel.circ.control.unbounded_send(request).is_err() {
        return Err(Error::CircuitClosed);
    }

    Ok(outcome_rx)
}
892
893 /// Queue a "close" for the stream corresponding to this StreamTarget.
894 ///
895 /// Unlike `close_pending`, this method does not allow the caller to provide an `END` message.
896 ///
897 /// Once this method has been called, no more messages may be sent with [`StreamTarget::send`],
898 /// on this `StreamTarget`` or any clone of it.
899 /// The reactor *will* try to flush any already-send messages before it closes the stream.
900 ///
901 /// You don't need to call this method if the stream is closing because all of its StreamTargets
902 /// have been dropped.
903 pub(crate) fn close(&mut self) {
904 Pin::new(&mut self.tx).close_channel();
905 }
906
907 /// Called when a circuit-level protocol error has occurred and the
908 /// tunnel needs to shut down.
909 pub(crate) fn protocol_error(&mut self) {
910 self.tunnel.terminate();
911 }
912
913 /// Request to send a SENDME cell for this stream.
914 ///
915 /// This sends a request to the circuit reactor to send a stream-level SENDME, but it does not
916 /// block or wait for a response from the circuit reactor.
917 /// An error is only returned if we are unable to send the request.
918 /// This means that if the circuit reactor is unable to send the SENDME, we are not notified of
919 /// this here and an error will not be returned.
920 pub(crate) fn send_sendme(&mut self) -> Result<()> {
921 self.tunnel
922 .circ
923 .control
924 .unbounded_send(CtrlMsg::FlowCtrlUpdate {
925 msg: FlowCtrlMsg::Sendme,
926 stream_id: self.stream_id,
927 hop: self.hop,
928 })
929 .map_err(|_| Error::CircuitClosed)
930 }
931
932 /// Inform the circuit reactor that there has been a change in the drain rate for this stream.
933 ///
934 /// Typically the circuit reactor would send this new rate in an XON message to the other end of
935 /// the stream.
936 /// But it may decide not to, and may discard this update.
937 /// For example the stream may have a large amount of buffered data, and the reactor may not
938 /// want to send an XON while the buffer is large.
939 ///
940 /// This sends a message to inform the circuit reactor of the new drain rate,
941 /// but it does not block or wait for a response from the reactor.
942 /// An error is only returned if we are unable to send the update.
943 pub(crate) fn drain_rate_update(&mut self, rate: XonKbpsEwma) -> Result<()> {
944 self.tunnel
945 .circ
946 .control
947 .unbounded_send(CtrlMsg::FlowCtrlUpdate {
948 msg: FlowCtrlMsg::Xon(rate),
949 stream_id: self.stream_id,
950 hop: self.hop,
951 })
952 .map_err(|_| Error::CircuitClosed)
953 }
954
/// Return a reference to the tunnel that this `StreamTarget` is using.
#[cfg(any(feature = "experimental-api", feature = "stream-ctrl"))]
pub(crate) fn tunnel(&self) -> &Arc<ClientTunnel> {
    let Self { tunnel, .. } = self;
    tunnel
}
960
961 /// Return the kind of relay cell in use on this `StreamTarget`.
962 pub(crate) fn relay_cell_format(&self) -> RelayCellFormat {
963 self.relay_cell_format
964 }
965
966 /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
967 pub(crate) fn rate_limit_stream(&self) -> &watch::Receiver<StreamRateLimit> {
968 &self.rate_limit_stream
969 }
970}