tor_proto/tunnel/
circuit.rs

1//! Multi-hop paths over the Tor network.
2//!
3//! Right now, we only implement "client circuits" -- also sometimes
//! called "origin circuits".  A client circuit is one that is
//! constructed by this Tor instance, and used on its own behalf to
//! send data over the Tor network.
7//!
8//! Each circuit has multiple hops over the Tor network: each hop
9//! knows only the hop before and the hop after.  The client shares a
10//! separate set of keys with each hop.
11//!
12//! To build a circuit, first create a [crate::channel::Channel], then
13//! call its [crate::channel::Channel::new_circ] method.  This yields
14//! a [PendingClientCirc] object that won't become live until you call
15//! one of the methods that extends it to its first hop.  After you've
16//! done that, you can call [ClientCirc::extend_ntor] on the circuit to
17//! build it into a multi-hop circuit.  Finally, you can use
18//! [ClientCirc::begin_stream] to get a Stream object that can be used
19//! for anonymized data.
20//!
21//! # Implementation
22//!
23//! Each open circuit has a corresponding Reactor object that runs in
24//! an asynchronous task, and manages incoming cells from the
25//! circuit's upstream channel.  These cells are either RELAY cells or
26//! DESTROY cells.  DESTROY cells are handled immediately.
27//! RELAY cells are either for a particular stream, in which case they
28//! get forwarded to a RawCellStream object, or for no particular stream,
29//! in which case they are considered "meta" cells (like EXTENDED2)
30//! that should only get accepted if something is waiting for them.
31//!
32//! # Limitations
33//!
34//! This is client-only.
35
36pub(crate) mod celltypes;
37pub(crate) mod halfcirc;
38
39#[cfg(feature = "hs-common")]
40pub mod handshake;
41#[cfg(not(feature = "hs-common"))]
42pub(crate) mod handshake;
43
44pub(super) mod path;
45pub(crate) mod unique_id;
46
47use crate::channel::Channel;
48use crate::congestion::params::CongestionControlParams;
49use crate::crypto::cell::HopNum;
50#[cfg(feature = "ntor_v3")]
51use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
52use crate::memquota::{CircuitAccount, SpecificAccount as _};
53use crate::stream::{
54    AnyCmdChecker, DataCmdChecker, DataStream, ResolveCmdChecker, ResolveStream, StreamParameters,
55    StreamReader,
56};
57use crate::tunnel::circuit::celltypes::*;
58use crate::tunnel::reactor::CtrlCmd;
59use crate::tunnel::reactor::{
60    CircuitHandshake, CtrlMsg, Reactor, RECV_WINDOW_INIT, STREAM_READER_BUFFER,
61};
62use crate::tunnel::StreamTarget;
63use crate::util::skew::ClockSkew;
64use crate::{Error, ResolveError, Result};
65use educe::Educe;
66use tor_cell::{
67    chancell::CircId,
68    relaycell::msg::{AnyRelayMsg, Begin, Resolve, Resolved, ResolvedVal},
69};
70
71use tor_error::{internal, into_internal};
72use tor_linkspec::{CircTarget, LinkSpecType, OwnedChanTarget, RelayIdType};
73
74pub use crate::crypto::binding::CircuitBinding;
75pub use crate::memquota::StreamAccount;
76pub use crate::tunnel::circuit::unique_id::UniqId;
77
78#[cfg(feature = "hs-service")]
79use {
80    crate::stream::{IncomingCmdChecker, IncomingStream},
81    crate::tunnel::reactor::StreamReqInfo,
82};
83
84use futures::channel::mpsc;
85use oneshot_fused_workaround as oneshot;
86
87use crate::congestion::sendme::StreamRecvWindow;
88use crate::DynTimeProvider;
89use futures::FutureExt as _;
90use std::net::IpAddr;
91use std::sync::{Arc, Mutex};
92use tor_memquota::mq_queue::{self, ChannelSpec as _, MpscSpec};
93
94use crate::crypto::handshake::ntor::NtorPublicKey;
95
96pub use path::{Path, PathEntry};
97
/// The size of the buffer for communication between `ClientCirc` and its reactor.
///
/// Within this file, this is the capacity given to the per-stream message
/// queue that is created whenever a new stream is begun.
pub const CIRCUIT_BUFFER_SIZE: usize = 128;
100
101#[cfg(feature = "send-control-msg")]
102use {crate::tunnel::msghandler::UserMsgHandler, crate::tunnel::reactor::MetaCellHandler};
103
104pub use crate::tunnel::reactor::syncview::ClientCircSyncView;
105#[cfg(feature = "send-control-msg")]
106#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
107pub use {crate::tunnel::msghandler::MsgHandler, crate::tunnel::reactor::MetaCellDisposition};
108
/// Sending half of an MPSC queue relating to a stream
/// (either inbound or outbound).
pub(crate) type StreamMpscSender<T> = mq_queue::Sender<T, MpscSpec>;
/// Receiving half of an MPSC queue relating to a stream
/// (either inbound or outbound).
pub(crate) type StreamMpscReceiver<T> = mq_queue::Receiver<T, MpscSpec>;

/// Sending half of the MPSC queue that carries inbound data
/// (as [`ClientCircChanMsg`]s) on its way from the channel to the circuit.
pub(crate) type CircuitRxSender = mq_queue::Sender<ClientCircChanMsg, MpscSpec>;
/// Receiving half of the MPSC queue that carries inbound data
/// (as [`ClientCircChanMsg`]s) on its way from the channel to the circuit.
pub(crate) type CircuitRxReceiver = mq_queue::Receiver<ClientCircChanMsg, MpscSpec>;
118
#[derive(Debug)]
/// A circuit that we have constructed over the Tor network.
///
/// # Circuit life cycle
///
/// `ClientCirc`s are created in an initially unusable state using [`Channel::new_circ`],
/// which returns a [`PendingClientCirc`].  To get a real (one-hop) circuit from
/// one of these, you invoke one of its `create_firsthop` methods (currently
/// [`create_firsthop_fast()`](PendingClientCirc::create_firsthop_fast) or
/// [`create_firsthop_ntor()`](PendingClientCirc::create_firsthop_ntor)).
/// Then, to add more hops to the circuit, you can call
/// [`extend_ntor()`](ClientCirc::extend_ntor) on it.
///
/// For higher-level APIs, see the `tor-circmgr` crate: the ones here in
/// `tor-proto` are probably not what you need.
///
/// After a circuit is created, it will persist until it is closed in one of
/// five ways:
///    1. A remote error occurs.
///    2. Some hop on the circuit sends a `DESTROY` message to tear down the
///       circuit.
///    3. The circuit's channel is closed.
///    4. Someone calls [`ClientCirc::terminate`] on the circuit.
///    5. The last reference to the `ClientCirc` is dropped. (Note that every stream
///       on a `ClientCirc` keeps a reference to it, which will in turn keep the
///       circuit from closing until all those streams have gone away.)
///
/// Note that in cases 1-4 the [`ClientCirc`] object itself will still exist: it
/// will just be unusable for most purposes.  Most operations on it will fail
/// with an error.
//
// Effectively, this struct contains two Arcs: one for `path` and one for
// `control` (which surely has something Arc-like in it).  We cannot unify
// these by putting a single Arc around the whole struct, and passing
// an Arc strong reference to the `Reactor`, because then `control` would
// not be dropped when the last user of the circuit goes away.  We could
// make the reactor have a weak reference but weak references are more
// expensive to dereference.
//
// Because of the above, cloning this struct is always going to involve
// two atomic refcount changes/checks.  Wrapping it in another Arc would
// be overkill.
//
pub struct ClientCirc {
    /// Mutable state shared with the `Reactor`.
    ///
    /// This holds the circuit's path and per-hop binding keys; the accessors
    /// on `ClientCirc` lock it only briefly to copy information out.
    mutable: Arc<Mutex<MutableState>>,
    /// A unique identifier for this circuit.
    unique_id: UniqId,
    /// Channel to send control messages to the reactor.
    ///
    /// Used for [`CtrlMsg`] requests such as extending the circuit,
    /// beginning a stream, or sending a raw message.
    pub(super) control: mpsc::UnboundedSender<CtrlMsg>,
    /// Channel to send commands to the reactor.
    ///
    /// Used for [`CtrlCmd`] requests such as adding a virtual hop or
    /// registering a handler for incoming stream requests.
    pub(super) command: mpsc::UnboundedSender<CtrlCmd>,
    /// A future that resolves to Cancelled once the reactor is shut down,
    /// meaning that the circuit is closed.
    // Only read when the "experimental-api" feature is enabled, hence the
    // conditional `allow(dead_code)`.
    #[cfg_attr(not(feature = "experimental-api"), allow(dead_code))]
    reactor_closed_rx: futures::future::Shared<oneshot::Receiver<void::Void>>,
    /// For testing purposes: the CircId, for use in peek_circid().
    #[cfg(test)]
    circid: CircId,
    /// Memory quota account
    ///
    /// Stream accounts and incoming-request queues created through this
    /// circuit are charged against this account.
    memquota: CircuitAccount,
    /// Time provider
    ///
    /// Cloned and handed to every memory-tracked queue created for this
    /// circuit.
    time_provider: DynTimeProvider,
}
183
/// Mutable state shared by [`ClientCirc`] and [`Reactor`].
///
/// A `ClientCirc` keeps this behind an `Arc<Mutex<_>>` and only reads it;
/// see the individual fields for what is stored.
#[derive(Educe, Default)]
#[educe(Debug)]
pub(super) struct MutableState {
    /// Information about this circuit's path.
    ///
    /// This is stored in an Arc so that we can cheaply give a copy of it to
    /// client code; when we need to add a hop (which is less frequent) we use
    /// [`Arc::make_mut()`].
    pub(super) path: Arc<path::Path>,

    /// Circuit binding keys [q.v.][`CircuitBinding`] information for each hop
    /// in the circuit's path.
    ///
    /// The vector is indexed by hop number.
    ///
    /// NOTE: Right now, there is a `CircuitBinding` for every hop.  There's a
    /// fair chance that this will change in the future, and I don't want other
    /// code to assume that a `CircuitBinding` _must_ exist, so I'm making this
    /// an `Option`.
    // Excluded from the `Debug` output.
    #[educe(Debug(ignore))]
    pub(super) binding: Vec<Option<CircuitBinding>>,
}
205
/// A ClientCirc that needs to send a create cell and receive a created* cell.
///
/// To use one of these, call `create_firsthop_fast()` or `create_firsthop_ntor()`
/// to negotiate the cryptographic handshake with the first hop.
pub struct PendingClientCirc {
    /// A oneshot receiver on which we'll receive a CREATED* cell,
    /// or a DESTROY cell.
    recvcreated: oneshot::Receiver<CreateResponse>,
    /// The ClientCirc object that we can expose on success.
    circ: Arc<ClientCirc>,
}
217
/// Description of the network's current rules for building circuits.
///
/// A value of this type is passed to the circuit-extension methods
/// (for example [`ClientCirc::extend_ntor`]).
#[non_exhaustive]
#[derive(Clone, Debug)]
pub struct CircParameters {
    /// Whether we should include ed25519 identities when we send
    /// EXTEND2 cells.
    ///
    /// When this is false, the extension methods strip the ed25519 identity
    /// link specifier before asking the reactor to extend.
    pub extend_by_ed25519_id: bool,
    /// Congestion control parameters for this circuit.
    pub ccontrol: CongestionControlParams,
}
228
#[cfg(test)]
impl Default for CircParameters {
    /// Test-only defaults: extend by ed25519 identity, with the fixed
    /// congestion-control parameters supplied by the test utilities.
    fn default() -> Self {
        let ccontrol = crate::congestion::test_utils::params::build_cc_fixed_params();
        CircParameters {
            extend_by_ed25519_id: true,
            ccontrol,
        }
    }
}
238
239impl CircParameters {
240    /// Constructor
241    pub fn new(extend_by_ed25519_id: bool, ccontrol: CongestionControlParams) -> Self {
242        Self {
243            extend_by_ed25519_id,
244            ccontrol,
245        }
246    }
247}
248
249impl ClientCirc {
250    /// Return a description of the first hop of this circuit.
251    ///
252    /// # Panics
253    ///
254    /// Panics if there is no first hop.  (This should be impossible outside of
255    /// the tor-proto crate, but within the crate it's possible to have a
256    /// circuit with no hops.)
257    pub fn first_hop(&self) -> OwnedChanTarget {
258        let first_hop = self
259            .mutable
260            .lock()
261            .expect("poisoned lock")
262            .path
263            .first_hop()
264            .expect("called first_hop on an un-constructed circuit");
265        match first_hop {
266            path::HopDetail::Relay(r) => r,
267            #[cfg(feature = "hs-common")]
268            path::HopDetail::Virtual => {
269                panic!("somehow made a circuit with a virtual first hop.")
270            }
271        }
272    }
273
274    /// Return the [`HopNum`] of the last hop of this circuit.
275    ///
276    /// Returns an error if there is no last hop.  (This should be impossible outside of the
277    /// tor-proto crate, but within the crate it's possible to have a circuit with no hops.)
278    pub fn last_hop_num(&self) -> Result<HopNum> {
279        Ok(self
280            .mutable
281            .lock()
282            .expect("poisoned lock")
283            .path
284            .last_hop_num()
285            .ok_or_else(|| internal!("no last hop index"))?)
286    }
287
288    /// Return a description of all the hops in this circuit.
289    ///
290    /// This method is **deprecated** for several reasons:
291    ///   * It performs a deep copy.
292    ///   * It ignores virtual hops.
293    ///   * It's not so extensible.
294    ///
295    /// Use [`ClientCirc::path_ref()`] instead.
296    #[deprecated(since = "0.11.1", note = "Use path_ref() instead.")]
297    pub fn path(&self) -> Vec<OwnedChanTarget> {
298        #[allow(clippy::unnecessary_filter_map)] // clippy is blind to the cfg
299        self.mutable
300            .lock()
301            .expect("poisoned lock")
302            .path
303            .all_hops()
304            .into_iter()
305            .filter_map(|hop| match hop {
306                path::HopDetail::Relay(r) => Some(r),
307                #[cfg(feature = "hs-common")]
308                path::HopDetail::Virtual => None,
309            })
310            .collect()
311    }
312
313    /// Return a [`Path`] object describing all the hops in this circuit.
314    ///
315    /// Note that this `Path` is not automatically updated if the circuit is
316    /// extended.
317    pub fn path_ref(&self) -> Arc<Path> {
318        self.mutable.lock().expect("poisoned_lock").path.clone()
319    }
320
321    /// Get the clock skew claimed by the first hop of the circuit.
322    ///
323    /// See [`Channel::clock_skew()`].
324    pub async fn first_hop_clock_skew(&self) -> Result<ClockSkew> {
325        let (tx, rx) = oneshot::channel();
326
327        self.control
328            .unbounded_send(CtrlMsg::FirstHopClockSkew { answer: tx })
329            .map_err(|_| Error::CircuitClosed)?;
330
331        Ok(rx.await.map_err(|_| Error::CircuitClosed)??)
332    }
333
334    /// Return a reference to this circuit's memory quota account
335    pub fn mq_account(&self) -> &CircuitAccount {
336        &self.memquota
337    }
338
339    /// Return the cryptographic material used to prove knowledge of a shared
340    /// secret with with `hop`.
341    ///
342    /// See [`CircuitBinding`] for more information on how this is used.
343    ///
344    /// Return None if we have no circuit binding information for the hop, or if
345    /// the hop does not exist.
346    pub fn binding_key(&self, hop: HopNum) -> Option<CircuitBinding> {
347        self.mutable
348            .lock()
349            .expect("poisoned lock")
350            .binding
351            .get::<usize>(hop.into())
352            .cloned()
353            .flatten()
354        // NOTE: I'm not thrilled to have to copy this information, but we use
355        // it very rarely, so it's not _that_ bad IMO.
356    }
357
358    /// Start an ad-hoc protocol exchange to the specified hop on this circuit
359    ///
360    /// To use this:
361    ///
362    ///  0. Create an inter-task channel you'll use to receive
363    ///     the outcome of your conversation,
364    ///     and bundle it into a [`MsgHandler`].
365    ///
366    ///  1. Call `start_conversation`.
367    ///     This will install a your handler, for incoming messages,
368    ///     and send the outgoing message (if you provided one).
369    ///     After that, each message on the circuit
370    ///     that isn't handled by the core machinery
371    ///     is passed to your provided `reply_handler`.
372    ///
373    ///  2. Possibly call `send_msg` on the [`Conversation`],
374    ///     from the call site of `start_conversation`,
375    ///     possibly multiple times, from time to time,
376    ///     to send further desired messages to the peer.
377    ///
378    ///  3. In your [`MsgHandler`], process the incoming messages.
379    ///     You may respond by
380    ///     sending additional messages
381    ///     When the protocol exchange is finished,
382    ///     `MsgHandler::handle_msg` should return
383    ///     [`ConversationFinished`](MetaCellDisposition::ConversationFinished).
384    ///
385    /// If you don't need the `Conversation` to send followup messages,
386    /// you may simply drop it,
387    /// and rely on the responses you get from your handler,
388    /// on the channel from step 0 above.
389    /// Your handler will remain installed and able to process incoming messages
390    /// until it returns `ConversationFinished`.
391    ///
392    /// (If you don't want to accept any replies at all, it may be
393    /// simpler to use [`ClientCirc::send_raw_msg`].)
394    ///
395    /// Note that it is quite possible to use this function to violate the tor
396    /// protocol; most users of this API will not need to call it.  It is used
397    /// to implement most of the onion service handshake.
398    ///
399    /// # Limitations
400    ///
401    /// Only one conversation may be active at any one time,
402    /// for any one circuit.
403    /// This generally means that this function should not be called
404    /// on a circuit which might be shared with anyone else.
405    ///
406    /// Likewise, it is forbidden to try to extend the circuit,
407    /// while the conversation is in progress.
408    ///
409    /// After the conversation has finished, the circuit may be extended.
410    /// Or, `start_conversation` may be called again;
411    /// but, in that case there will be a gap between the two conversations,
412    /// during which no `MsgHandler` is installed,
413    /// and unexpected incoming messages would close the circuit.
414    ///
415    /// If these restrictions are violated, the circuit will be closed with an error.
416    ///
417    /// ## Precise definition of the lifetime of a conversation
418    ///
419    /// A conversation is in progress from entry to `start_conversation`,
420    /// until entry to the body of the [`MsgHandler::handle_msg`]
421    /// call which returns [`ConversationFinished`](MetaCellDisposition::ConversationFinished).
422    /// (*Entry* since `handle_msg` is synchronously embedded
423    /// into the incoming message processing.)
424    /// So you may start a new conversation as soon as you have the final response
425    /// via your inter-task channel from (0) above.
426    ///
427    /// The lifetime relationship of the [`Conversation`],
428    /// vs the handler returning `ConversationFinished`
429    /// is not enforced by the type system.
430    // Doing so without still leaving plenty of scope for runtime errors doesn't seem possible,
431    // at least while allowing sending followup messages from outside the handler.
432    //
433    // TODO hs: it might be nice to avoid exposing tor-cell APIs in the
434    //   tor-proto interface.
435    #[cfg(feature = "send-control-msg")]
436    pub async fn start_conversation(
437        &self,
438        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
439        reply_handler: impl MsgHandler + Send + 'static,
440        hop_num: HopNum,
441    ) -> Result<Conversation<'_>> {
442        let handler = Box::new(UserMsgHandler::new(hop_num, reply_handler));
443        let conversation = Conversation(self);
444        conversation.send_internal(msg, Some(handler)).await?;
445        Ok(conversation)
446    }
447
448    /// Start an ad-hoc protocol exchange to the final hop on this circuit
449    ///
450    /// See the [`ClientCirc::start_conversation`] docs for more information.
451    #[cfg(feature = "send-control-msg")]
452    #[deprecated(since = "0.13.0", note = "Use start_conversation instead.")]
453    pub async fn start_conversation_last_hop(
454        &self,
455        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
456        reply_handler: impl MsgHandler + Send + 'static,
457    ) -> Result<Conversation<'_>> {
458        let last_hop = self
459            .mutable
460            .lock()
461            .expect("poisoned lock")
462            .path
463            .last_hop_num()
464            .ok_or_else(|| internal!("no last hop index"))?;
465
466        self.start_conversation(msg, reply_handler, last_hop).await
467    }
468
469    /// Send an ad-hoc message to a given hop on the circuit, without expecting
470    /// a reply.
471    ///
472    /// (If you want to handle one or more possible replies, see
473    /// [`ClientCirc::start_conversation`].)
474    #[cfg(feature = "send-control-msg")]
475    pub async fn send_raw_msg(
476        &self,
477        msg: tor_cell::relaycell::msg::AnyRelayMsg,
478        hop_num: HopNum,
479    ) -> Result<()> {
480        let (sender, receiver) = oneshot::channel();
481        let ctrl_msg = CtrlMsg::SendMsg {
482            hop_num,
483            msg,
484            sender,
485        };
486        self.control
487            .unbounded_send(ctrl_msg)
488            .map_err(|_| Error::CircuitClosed)?;
489
490        receiver.await.map_err(|_| Error::CircuitClosed)?
491    }
492
493    /// Tell this circuit to begin allowing the final hop of the circuit to try
494    /// to create new Tor streams, and to return those pending requests in an
495    /// asynchronous stream.
496    ///
497    /// Ordinarily, these requests are rejected.
498    ///
499    /// There can only be one [`Stream`](futures::Stream) of this type created on a given circuit.
500    /// If a such a [`Stream`](futures::Stream) already exists, this method will return
501    /// an error.
502    ///
503    /// After this method has been called on a circuit, the circuit is expected
504    /// to receive requests of this type indefinitely, until it is finally closed.
505    /// If the `Stream` is dropped, the next request on this circuit will cause it to close.
506    ///
507    /// Only onion services (and eventually) exit relays should call this
508    /// method.
509    //
510    // TODO: Someday, we might want to allow a stream request handler to be
511    // un-registered.  However, nothing in the Tor protocol requires it.
512    #[cfg(feature = "hs-service")]
513    pub async fn allow_stream_requests(
514        self: &Arc<ClientCirc>,
515        allow_commands: &[tor_cell::relaycell::RelayCmd],
516        hop_num: HopNum,
517        filter: impl crate::stream::IncomingStreamRequestFilter,
518    ) -> Result<impl futures::Stream<Item = IncomingStream>> {
519        use futures::stream::StreamExt;
520
521        /// The size of the channel receiving IncomingStreamRequestContexts.
522        const INCOMING_BUFFER: usize = STREAM_READER_BUFFER;
523
524        let time_prov = self.time_provider.clone();
525        let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
526        let (incoming_sender, incoming_receiver) =
527            MpscSpec::new(INCOMING_BUFFER).new_mq(time_prov, self.memquota.as_raw_account())?;
528        let (tx, rx) = oneshot::channel();
529
530        self.command
531            .unbounded_send(CtrlCmd::AwaitStreamRequest {
532                cmd_checker,
533                incoming_sender,
534                hop_num,
535                done: tx,
536                filter: Box::new(filter),
537            })
538            .map_err(|_| Error::CircuitClosed)?;
539
540        // Check whether the AwaitStreamRequest was processed successfully.
541        rx.await.map_err(|_| Error::CircuitClosed)??;
542
543        let allowed_hop_num = hop_num;
544
545        let circ = Arc::clone(self);
546        Ok(incoming_receiver.map(move |req_ctx| {
547            let StreamReqInfo {
548                req,
549                stream_id,
550                hop_num,
551                receiver,
552                msg_tx,
553                memquota,
554            } = req_ctx;
555
556            // We already enforce this in handle_incoming_stream_request; this
557            // assertion is just here to make sure that we don't ever
558            // accidentally remove or fail to enforce that check, since it is
559            // security-critical.
560            assert_eq!(allowed_hop_num, hop_num);
561
562            let target = StreamTarget {
563                circ: Arc::clone(&circ),
564                tx: msg_tx,
565                hop_num,
566                stream_id,
567            };
568
569            let reader = StreamReader {
570                target: target.clone(),
571                receiver,
572                recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
573                ended: false,
574            };
575
576            IncomingStream::new(req, target, reader, memquota)
577        }))
578    }
579
580    /// Extend the circuit via the ntor handshake to a new target last
581    /// hop.
582    pub async fn extend_ntor<Tg>(&self, target: &Tg, params: &CircParameters) -> Result<()>
583    where
584        Tg: CircTarget,
585    {
586        let key = NtorPublicKey {
587            id: *target
588                .rsa_identity()
589                .ok_or(Error::MissingId(RelayIdType::Rsa))?,
590            pk: *target.ntor_onion_key(),
591        };
592        let mut linkspecs = target
593            .linkspecs()
594            .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
595        if !params.extend_by_ed25519_id {
596            linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
597        }
598
599        let (tx, rx) = oneshot::channel();
600
601        let peer_id = OwnedChanTarget::from_chan_target(target);
602        self.control
603            .unbounded_send(CtrlMsg::ExtendNtor {
604                peer_id,
605                public_key: key,
606                linkspecs,
607                params: params.clone(),
608                done: tx,
609            })
610            .map_err(|_| Error::CircuitClosed)?;
611
612        rx.await.map_err(|_| Error::CircuitClosed)??;
613
614        Ok(())
615    }
616
617    /// Extend the circuit via the ntor handshake to a new target last
618    /// hop.
619    #[cfg(feature = "ntor_v3")]
620    pub async fn extend_ntor_v3<Tg>(&self, target: &Tg, params: &CircParameters) -> Result<()>
621    where
622        Tg: CircTarget,
623    {
624        let key = NtorV3PublicKey {
625            id: *target
626                .ed_identity()
627                .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
628            pk: *target.ntor_onion_key(),
629        };
630        let mut linkspecs = target
631            .linkspecs()
632            .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
633        if !params.extend_by_ed25519_id {
634            linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
635        }
636
637        let (tx, rx) = oneshot::channel();
638
639        let peer_id = OwnedChanTarget::from_chan_target(target);
640        self.control
641            .unbounded_send(CtrlMsg::ExtendNtorV3 {
642                peer_id,
643                public_key: key,
644                linkspecs,
645                params: params.clone(),
646                done: tx,
647            })
648            .map_err(|_| Error::CircuitClosed)?;
649
650        rx.await.map_err(|_| Error::CircuitClosed)??;
651
652        Ok(())
653    }
654
655    /// Extend this circuit by a single, "virtual" hop.
656    ///
657    /// A virtual hop is one for which we do not add an actual network connection
658    /// between separate hosts (such as Relays).  We only add a layer of
659    /// cryptography.
660    ///
661    /// This is used to implement onion services: the client and the service
662    /// both build a circuit to a single rendezvous point, and tell the
663    /// rendezvous point to relay traffic between their two circuits.  Having
664    /// completed a [`handshake`] out of band[^1], the parties each extend their
665    /// circuits by a single "virtual" encryption hop that represents their
666    /// shared cryptographic context.
667    ///
668    /// Once a circuit has been extended in this way, it is an error to try to
669    /// extend it in any other way.
670    ///
671    /// [^1]: Technically, the handshake is only _mostly_ out of band: the
672    ///     client sends their half of the handshake in an ` message, and the
673    ///     service's response is inline in its `RENDEZVOUS2` message.
674    //
675    // TODO hs: let's try to enforce the "you can't extend a circuit again once
676    // it has been extended this way" property.  We could do that with internal
677    // state, or some kind of a type state pattern.
678    //
679    // TODO hs: possibly we should take a set of Protovers, and not just `Params`.
680    #[cfg(feature = "hs-common")]
681    pub async fn extend_virtual(
682        &self,
683        protocol: handshake::RelayProtocol,
684        role: handshake::HandshakeRole,
685        seed: impl handshake::KeyGenerator,
686        params: CircParameters,
687    ) -> Result<()> {
688        use self::handshake::BoxedClientLayer;
689
690        let protocol = handshake::RelayCryptLayerProtocol::from(protocol);
691        let relay_cell_format = protocol.relay_cell_format();
692
693        let BoxedClientLayer { fwd, back, binding } = protocol.construct_layers(role, seed)?;
694
695        let (tx, rx) = oneshot::channel();
696        let message = CtrlCmd::ExtendVirtual {
697            relay_cell_format,
698            cell_crypto: (fwd, back, binding),
699            params,
700            done: tx,
701        };
702
703        self.command
704            .unbounded_send(message)
705            .map_err(|_| Error::CircuitClosed)?;
706
707        rx.await.map_err(|_| Error::CircuitClosed)?
708    }
709
710    /// Helper, used to begin a stream.
711    ///
712    /// This function allocates a stream ID, and sends the message
713    /// (like a BEGIN or RESOLVE), but doesn't wait for a response.
714    ///
715    /// The caller will typically want to see the first cell in response,
716    /// to see whether it is e.g. an END or a CONNECTED.
717    async fn begin_stream_impl(
718        self: &Arc<ClientCirc>,
719        begin_msg: AnyRelayMsg,
720        cmd_checker: AnyCmdChecker,
721    ) -> Result<(StreamReader, StreamTarget, StreamAccount)> {
722        // TODO: Possibly this should take a hop, rather than just
723        // assuming it's the last hop.
724
725        let time_prov = self.time_provider.clone();
726
727        let hop_num = self
728            .mutable
729            .lock()
730            .expect("poisoned lock")
731            .path
732            .last_hop_num()
733            .ok_or_else(|| Error::from(internal!("Can't begin a stream at the 0th hop")))?;
734
735        let memquota = StreamAccount::new(self.mq_account())?;
736        let (sender, receiver) = MpscSpec::new(STREAM_READER_BUFFER)
737            .new_mq(time_prov.clone(), memquota.as_raw_account())?;
738        let (tx, rx) = oneshot::channel();
739        let (msg_tx, msg_rx) =
740            MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(time_prov, memquota.as_raw_account())?;
741
742        self.control
743            .unbounded_send(CtrlMsg::BeginStream {
744                hop_num,
745                message: begin_msg,
746                sender,
747                rx: msg_rx,
748                done: tx,
749                cmd_checker,
750            })
751            .map_err(|_| Error::CircuitClosed)?;
752
753        let stream_id = rx.await.map_err(|_| Error::CircuitClosed)??;
754
755        let target = StreamTarget {
756            circ: self.clone(),
757            tx: msg_tx,
758            hop_num,
759            stream_id,
760        };
761
762        let reader = StreamReader {
763            target: target.clone(),
764            receiver,
765            recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
766            ended: false,
767        };
768
769        Ok((reader, target, memquota))
770    }
771
772    /// Start a DataStream (anonymized connection) to the given
773    /// address and port, using a BEGIN cell.
774    async fn begin_data_stream(
775        self: &Arc<ClientCirc>,
776        msg: AnyRelayMsg,
777        optimistic: bool,
778    ) -> Result<DataStream> {
779        let (reader, target, memquota) = self
780            .begin_stream_impl(msg, DataCmdChecker::new_any())
781            .await?;
782        let mut stream = DataStream::new(reader, target, memquota);
783        if !optimistic {
784            stream.wait_for_connection().await?;
785        }
786        Ok(stream)
787    }
788
789    /// Start a stream to the given address and port, using a BEGIN
790    /// cell.
791    ///
792    /// The use of a string for the address is intentional: you should let
793    /// the remote Tor relay do the hostname lookup for you.
794    pub async fn begin_stream(
795        self: &Arc<ClientCirc>,
796        target: &str,
797        port: u16,
798        parameters: Option<StreamParameters>,
799    ) -> Result<DataStream> {
800        let parameters = parameters.unwrap_or_default();
801        let begin_flags = parameters.begin_flags();
802        let optimistic = parameters.is_optimistic();
803        let target = if parameters.suppressing_hostname() {
804            ""
805        } else {
806            target
807        };
808        let beginmsg = Begin::new(target, port, begin_flags)
809            .map_err(|e| Error::from_cell_enc(e, "begin message"))?;
810        self.begin_data_stream(beginmsg.into(), optimistic).await
811    }
812
813    /// Start a new stream to the last relay in the circuit, using
814    /// a BEGIN_DIR cell.
815    pub async fn begin_dir_stream(self: Arc<ClientCirc>) -> Result<DataStream> {
816        // Note that we always open begindir connections optimistically.
817        // Since they are local to a relay that we've already authenticated
818        // with and built a circuit to, there should be no additional checks
819        // we need to perform to see whether the BEGINDIR will succeed.
820        self.begin_data_stream(AnyRelayMsg::BeginDir(Default::default()), true)
821            .await
822    }
823
824    /// Perform a DNS lookup, using a RESOLVE cell with the last relay
825    /// in this circuit.
826    ///
827    /// Note that this function does not check for timeouts; that's
828    /// the caller's responsibility.
829    pub async fn resolve(self: &Arc<ClientCirc>, hostname: &str) -> Result<Vec<IpAddr>> {
830        let resolve_msg = Resolve::new(hostname);
831
832        let resolved_msg = self.try_resolve(resolve_msg).await?;
833
834        resolved_msg
835            .into_answers()
836            .into_iter()
837            .filter_map(|(val, _)| match resolvedval_to_result(val) {
838                Ok(ResolvedVal::Ip(ip)) => Some(Ok(ip)),
839                Ok(_) => None,
840                Err(e) => Some(Err(e)),
841            })
842            .collect()
843    }
844
845    /// Perform a reverse DNS lookup, by sending a RESOLVE cell with
846    /// the last relay on this circuit.
847    ///
848    /// Note that this function does not check for timeouts; that's
849    /// the caller's responsibility.
850    pub async fn resolve_ptr(self: &Arc<ClientCirc>, addr: IpAddr) -> Result<Vec<String>> {
851        let resolve_ptr_msg = Resolve::new_reverse(&addr);
852
853        let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;
854
855        resolved_msg
856            .into_answers()
857            .into_iter()
858            .filter_map(|(val, _)| match resolvedval_to_result(val) {
859                Ok(ResolvedVal::Hostname(v)) => Some(
860                    String::from_utf8(v)
861                        .map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
862                ),
863                Ok(_) => None,
864                Err(e) => Some(Err(e)),
865            })
866            .collect()
867    }
868
869    /// Helper: Send the resolve message, and read resolved message from
870    /// resolve stream.
871    async fn try_resolve(self: &Arc<ClientCirc>, msg: Resolve) -> Result<Resolved> {
872        let (reader, _target, memquota) = self
873            .begin_stream_impl(msg.into(), ResolveCmdChecker::new_any())
874            .await?;
875        let mut resolve_stream = ResolveStream::new(reader, memquota);
876        resolve_stream.read_msg().await
877    }
878
    /// Shut down this circuit, along with all streams that are using it.
    /// Happens asynchronously (i.e. the circuit won't necessarily be done shutting down
    /// immediately after this function returns!).
    ///
    /// Note that other references to this circuit may exist.  If they
    /// do, they will stop working after you call this function.
    ///
    /// It's not necessary to call this method if you're just done
    /// with a circuit: the circuit should close on its own once nothing
    /// is using it any more.
    pub fn terminate(&self) {
        // Queue a shutdown command on the command channel.  The result of the
        // send is deliberately discarded, so a failure to queue is not reported.
        let _ = self.command.unbounded_send(CtrlCmd::Shutdown);
    }
892
    /// Called when a circuit-level protocol error has occurred and the
    /// circuit needs to shut down.
    ///
    /// This is a separate function because we may eventually want to have
    /// it do more than just shut down.
    ///
    /// As with `terminate`, this function is asynchronous.
    pub(crate) fn protocol_error(&self) {
        // For now a protocol error is handled exactly like an explicit
        // shutdown request.
        self.terminate();
    }
903
    /// Return true if this circuit is closed and therefore unusable.
    ///
    /// The answer is taken from the state of the control channel: the
    /// circuit is reported as closed once that channel is closed.
    pub fn is_closing(&self) -> bool {
        self.control.is_closed()
    }
908
    /// Return a process-unique identifier for this circuit.
    ///
    /// This is a copy of the identifier stored in the circuit handle.
    pub fn unique_id(&self) -> UniqId {
        self.unique_id
    }
913
914    /// Return the number of hops in this circuit.
915    ///
916    /// NOTE: This function will currently return only the number of hops
917    /// _currently_ in the circuit. If there is an extend operation in progress,
918    /// the currently pending hop may or may not be counted, depending on whether
919    /// the extend operation finishes before this call is done.
920    pub fn n_hops(&self) -> usize {
921        self.mutable.lock().expect("poisoned lock").path.n_hops()
922    }
923
    /// Return a future that will resolve once this circuit has closed.
    ///
    /// Note that this method does not itself cause the circuit to shut down.
    ///
    /// TODO: Perhaps this should return some kind of status indication instead
    /// of just ()
    #[cfg(feature = "experimental-api")]
    pub fn wait_for_close(&self) -> impl futures::Future<Output = ()> + Send + Sync + 'static {
        // Hand out a clone of the shared "reactor closed" future, with its
        // output discarded so that callers only observe completion.
        self.reactor_closed_rx.clone().map(|_| ())
    }
934}
935
/// Handle to use during an ongoing protocol exchange with a circuit's last hop
///
/// This is obtained from [`ClientCirc::start_conversation`],
/// and used to send messages to the last hop relay.
///
/// The handle only borrows the circuit (for the lifetime `'r`);
/// it does not own it.
//
// TODO(conflux): this should use ClientTunnel, and it should be moved into
// the tunnel module.
#[cfg(feature = "send-control-msg")]
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
pub struct Conversation<'r>(&'r ClientCirc);
946
#[cfg(feature = "send-control-msg")]
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
impl Conversation<'_> {
    /// Send a protocol message as part of an ad-hoc exchange
    ///
    /// Responses are handled by the `MsgHandler` set up
    /// when the `Conversation` was created.
    pub async fn send_message(&self, msg: tor_cell::relaycell::msg::AnyRelayMsg) -> Result<()> {
        // Only a message is sent here; no new handler is installed.
        self.send_internal(Some(msg), None).await
    }

    /// Send a `SendMsgAndInstallHandler` to the reactor and wait for the outcome
    ///
    /// The guts of `start_conversation` and `Conversation::send_message`.
    ///
    /// `msg`, if present, is wrapped as a relay message with no stream ID.
    /// Returns `Error::CircuitClosed` if the request cannot be queued on the
    /// control channel, or if the reply channel is dropped before an outcome
    /// arrives; otherwise returns the outcome reported by the reactor.
    pub(crate) async fn send_internal(
        &self,
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
    ) -> Result<()> {
        let msg = msg.map(|inner| tor_cell::relaycell::AnyRelayMsgOuter::new(None, inner));
        let (sender, receiver) = oneshot::channel();

        let request = CtrlMsg::SendMsgAndInstallHandler {
            msg,
            handler,
            sender,
        };
        if self.0.control.unbounded_send(request).is_err() {
            return Err(Error::CircuitClosed);
        }

        match receiver.await {
            Ok(outcome) => outcome,
            Err(_) => Err(Error::CircuitClosed),
        }
    }
}
982
983impl PendingClientCirc {
984    /// Instantiate a new circuit object: used from Channel::new_circ().
985    ///
986    /// Does not send a CREATE* cell on its own.
987    ///
988    ///
989    pub(crate) fn new(
990        id: CircId,
991        channel: Arc<Channel>,
992        createdreceiver: oneshot::Receiver<CreateResponse>,
993        input: CircuitRxReceiver,
994        unique_id: UniqId,
995        memquota: CircuitAccount,
996    ) -> (PendingClientCirc, crate::tunnel::reactor::Reactor) {
997        let time_provider = channel.time_provider().clone();
998        let (reactor, control_tx, command_tx, reactor_closed_rx, mutable) =
999            Reactor::new(channel, id, unique_id, input, memquota.clone());
1000
1001        let circuit = ClientCirc {
1002            mutable,
1003            unique_id,
1004            control: control_tx,
1005            command: command_tx,
1006            reactor_closed_rx: reactor_closed_rx.shared(),
1007            #[cfg(test)]
1008            circid: id,
1009            memquota,
1010            time_provider,
1011        };
1012
1013        let pending = PendingClientCirc {
1014            recvcreated: createdreceiver,
1015            circ: Arc::new(circuit),
1016        };
1017        (pending, reactor)
1018    }
1019
1020    /// Extract the process-unique identifier for this pending circuit.
1021    pub fn peek_unique_id(&self) -> UniqId {
1022        self.circ.unique_id
1023    }
1024
1025    /// Use the (questionable!) CREATE_FAST handshake to connect to the
1026    /// first hop of this circuit.
1027    ///
1028    /// There's no authentication in CRATE_FAST,
1029    /// so we don't need to know whom we're connecting to: we're just
1030    /// connecting to whichever relay the channel is for.
1031    pub async fn create_firsthop_fast(self, params: &CircParameters) -> Result<Arc<ClientCirc>> {
1032        let (tx, rx) = oneshot::channel();
1033        self.circ
1034            .control
1035            .unbounded_send(CtrlMsg::Create {
1036                recv_created: self.recvcreated,
1037                handshake: CircuitHandshake::CreateFast,
1038                params: params.clone(),
1039                done: tx,
1040            })
1041            .map_err(|_| Error::CircuitClosed)?;
1042
1043        rx.await.map_err(|_| Error::CircuitClosed)??;
1044
1045        Ok(self.circ)
1046    }
1047
1048    /// Use the ntor handshake to connect to the first hop of this circuit.
1049    ///
1050    /// Note that the provided 'target' must match the channel's target,
1051    /// or the handshake will fail.
1052    pub async fn create_firsthop_ntor<Tg>(
1053        self,
1054        target: &Tg,
1055        params: CircParameters,
1056    ) -> Result<Arc<ClientCirc>>
1057    where
1058        Tg: tor_linkspec::CircTarget,
1059    {
1060        let (tx, rx) = oneshot::channel();
1061
1062        self.circ
1063            .control
1064            .unbounded_send(CtrlMsg::Create {
1065                recv_created: self.recvcreated,
1066                handshake: CircuitHandshake::Ntor {
1067                    public_key: NtorPublicKey {
1068                        id: *target
1069                            .rsa_identity()
1070                            .ok_or(Error::MissingId(RelayIdType::Rsa))?,
1071                        pk: *target.ntor_onion_key(),
1072                    },
1073                    ed_identity: *target
1074                        .ed_identity()
1075                        .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
1076                },
1077                params: params.clone(),
1078                done: tx,
1079            })
1080            .map_err(|_| Error::CircuitClosed)?;
1081
1082        rx.await.map_err(|_| Error::CircuitClosed)??;
1083
1084        Ok(self.circ)
1085    }
1086
1087    /// Use the ntor_v3 handshake to connect to the first hop of this circuit.
1088    ///
1089    /// Assumes that the target supports ntor_v3. The caller should verify
1090    /// this before calling this function, e.g. by validating that the target
1091    /// has advertised ["Relay=4"](https://spec.torproject.org/tor-spec/subprotocol-versioning.html#relay).
1092    ///
1093    /// Note that the provided 'target' must match the channel's target,
1094    /// or the handshake will fail.
1095    #[cfg(feature = "ntor_v3")]
1096    pub async fn create_firsthop_ntor_v3<Tg>(
1097        self,
1098        target: &Tg,
1099        params: CircParameters,
1100    ) -> Result<Arc<ClientCirc>>
1101    where
1102        Tg: tor_linkspec::CircTarget,
1103    {
1104        let (tx, rx) = oneshot::channel();
1105
1106        self.circ
1107            .control
1108            .unbounded_send(CtrlMsg::Create {
1109                recv_created: self.recvcreated,
1110                handshake: CircuitHandshake::NtorV3 {
1111                    public_key: NtorV3PublicKey {
1112                        id: *target
1113                            .ed_identity()
1114                            .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
1115                        pk: *target.ntor_onion_key(),
1116                    },
1117                },
1118                params: params.clone(),
1119                done: tx,
1120            })
1121            .map_err(|_| Error::CircuitClosed)?;
1122
1123        rx.await.map_err(|_| Error::CircuitClosed)??;
1124
1125        Ok(self.circ)
1126    }
1127}
1128
1129/// Convert a [`ResolvedVal`] into a Result, based on whether or not
1130/// it represents an error.
1131fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
1132    match val {
1133        ResolvedVal::TransientError => Err(Error::ResolveError(ResolveError::Transient)),
1134        ResolvedVal::NontransientError => Err(Error::ResolveError(ResolveError::Nontransient)),
1135        ResolvedVal::Unrecognized(_, _) => Err(Error::ResolveError(ResolveError::Unrecognized)),
1136        _ => Ok(val),
1137    }
1138}
1139
1140#[cfg(test)]
1141pub(crate) mod test {
1142    // @@ begin test lint list maintained by maint/add_warning @@
1143    #![allow(clippy::bool_assert_comparison)]
1144    #![allow(clippy::clone_on_copy)]
1145    #![allow(clippy::dbg_macro)]
1146    #![allow(clippy::mixed_attributes_style)]
1147    #![allow(clippy::print_stderr)]
1148    #![allow(clippy::print_stdout)]
1149    #![allow(clippy::single_char_pattern)]
1150    #![allow(clippy::unwrap_used)]
1151    #![allow(clippy::unchecked_duration_subtraction)]
1152    #![allow(clippy::useless_vec)]
1153    #![allow(clippy::needless_pass_by_value)]
1154    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1155
1156    use super::*;
1157    use crate::channel::OpenChanCellS2C;
1158    use crate::channel::{test::new_reactor, CodecError};
1159    use crate::congestion::sendme;
1160    use crate::crypto::cell::RelayCellBody;
1161    #[cfg(feature = "ntor_v3")]
1162    use crate::crypto::handshake::ntor_v3::NtorV3Server;
1163    #[cfg(feature = "hs-service")]
1164    use crate::stream::IncomingStreamRequestFilter;
1165    use chanmsg::{AnyChanMsg, Created2, CreatedFast};
1166    use futures::channel::mpsc::{Receiver, Sender};
1167    use futures::io::{AsyncReadExt, AsyncWriteExt};
1168    use futures::sink::SinkExt;
1169    use futures::stream::StreamExt;
1170    use futures::task::SpawnExt;
1171    use hex_literal::hex;
1172    use std::collections::{HashMap, VecDeque};
1173    use std::fmt::Debug;
1174    use std::time::Duration;
1175    use tor_basic_utils::test_rng::testing_rng;
1176    use tor_cell::chancell::{msg as chanmsg, AnyChanCell, BoxedCellBody};
1177    use tor_cell::relaycell::extend::NtorV3Extension;
1178    use tor_cell::relaycell::{
1179        msg as relaymsg, AnyRelayMsgOuter, RelayCellFormat, RelayCmd, RelayMsg as _, StreamId,
1180    };
1181    use tor_linkspec::OwnedCircTarget;
1182    use tor_memquota::HasMemoryCost;
1183    use tor_rtcompat::Runtime;
1184    use tracing::trace;
1185    use tracing_test::traced_test;
1186
    impl PendingClientCirc {
        /// Testing only: Extract the circuit ID for this pending circuit.
        ///
        /// Reads the `circid` field of the wrapped circuit, which is only
        /// present in test builds.
        pub(crate) fn peek_circid(&self) -> CircId {
            self.circ.circid
        }
    }
1193
    impl ClientCirc {
        /// Testing only: Extract the circuit ID of this circuit.
        ///
        /// Reads the `circid` field, which is only present in test builds.
        pub(crate) fn peek_circid(&self) -> CircId {
            self.circid
        }
    }
1200
1201    fn rmsg_to_ccmsg(id: Option<StreamId>, msg: relaymsg::AnyRelayMsg) -> ClientCircChanMsg {
1202        let body: BoxedCellBody = AnyRelayMsgOuter::new(id, msg)
1203            .encode(&mut testing_rng())
1204            .unwrap();
1205        let chanmsg = chanmsg::Relay::from(body);
1206        ClientCircChanMsg::Relay(chanmsg)
1207    }
1208
    // Example relay IDs and keys
    //
    // The helpers below combine these into an example circuit target and the
    // secret keys used by the simulated relay.

    /// First key argument given to the example ntor / ntor_v3 secret keys.
    // NOTE(review): presumably the secret half matching EXAMPLE_PK; confirm.
    const EXAMPLE_SK: [u8; 32] =
        hex!("7789d92a89711a7e2874c61ea495452cfd48627b3ca2ea9546aafa5bf7b55803");
    /// Used as the ntor onion key of the example target, and as the second
    /// key argument given to the example secret keys.
    const EXAMPLE_PK: [u8; 32] =
        hex!("395cb26b83b3cd4b91dba9913e562ae87d21ecdd56843da7ca939a6a69001253");
    /// Ed25519 identity of the example target (32 bytes, all set to 6).
    const EXAMPLE_ED_ID: [u8; 32] = [6; 32];
    /// RSA identity of the example target (20 bytes, all set to 10).
    const EXAMPLE_RSA_ID: [u8; 20] = [10; 20];
1216
    /// Make an MPSC queue, of the type we use in Channels, but a fake one for testing
    ///
    /// Thin wrapper that forwards `buffer` to `crate::fake_mpsc`.
    #[cfg(test)]
    pub(crate) fn fake_mpsc<T: HasMemoryCost + Debug + Send>(
        buffer: usize,
    ) -> (StreamMpscSender<T>, StreamMpscReceiver<T>) {
        crate::fake_mpsc(buffer)
    }
1224
    /// Return an example OwnedCircTarget that can get used for an ntor handshake.
    ///
    /// The target is built from the `EXAMPLE_*` constants: it carries
    /// `EXAMPLE_ED_ID` and `EXAMPLE_RSA_ID` as its identities and
    /// `EXAMPLE_PK` as its ntor onion key, and advertises the protocol
    /// string "FlowCtrl=1".
    fn example_target() -> OwnedCircTarget {
        let mut builder = OwnedCircTarget::builder();
        // Identities are set on the nested channel-target builder.
        builder
            .chan_target()
            .ed_identity(EXAMPLE_ED_ID.into())
            .rsa_identity(EXAMPLE_RSA_ID.into());
        builder
            .ntor_onion_key(EXAMPLE_PK.into())
            .protocols("FlowCtrl=1".parse().unwrap())
            .build()
            .unwrap()
    }
1238    fn example_ntor_key() -> crate::crypto::handshake::ntor::NtorSecretKey {
1239        crate::crypto::handshake::ntor::NtorSecretKey::new(
1240            EXAMPLE_SK.into(),
1241            EXAMPLE_PK.into(),
1242            EXAMPLE_RSA_ID.into(),
1243        )
1244    }
1245    #[cfg(feature = "ntor_v3")]
1246    fn example_ntor_v3_key() -> crate::crypto::handshake::ntor_v3::NtorV3SecretKey {
1247        crate::crypto::handshake::ntor_v3::NtorV3SecretKey::new(
1248            EXAMPLE_SK.into(),
1249            EXAMPLE_PK.into(),
1250            EXAMPLE_ED_ID.into(),
1251        )
1252    }
1253
1254    fn working_fake_channel<R: Runtime>(
1255        rt: &R,
1256    ) -> (
1257        Arc<Channel>,
1258        Receiver<AnyChanCell>,
1259        Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
1260    ) {
1261        let (channel, chan_reactor, rx, tx) = new_reactor(rt.clone());
1262        rt.spawn(async {
1263            let _ignore = chan_reactor.run().await;
1264        })
1265        .unwrap();
1266        (channel, rx, tx)
1267    }
1268
    /// Which handshake type to use.
    ///
    /// Selects the handshake exercised by the `test_create` and
    /// `test_extend` helpers.
    #[derive(Copy, Clone)]
    enum HandshakeType {
        /// The CREATE_FAST handshake (first hop only; extending with it panics).
        Fast,
        /// The ntor handshake.
        Ntor,
        /// The ntor_v3 handshake.
        #[cfg(feature = "ntor_v3")]
        NtorV3,
    }
1277
    /// Build a pending circuit over a fake channel, then complete its first
    /// hop with the handshake selected by `handshake_type`.
    ///
    /// One future plays the client and another plays the relay at the far
    /// end of the channel; they are driven together.  Afterwards the circuit
    /// must report exactly one hop.
    async fn test_create<R: Runtime>(rt: &R, handshake_type: HandshakeType) {
        // We want to try progressing from a pending circuit to a circuit
        // via the handshake selected by `handshake_type`.

        use crate::crypto::handshake::{fast::CreateFastServer, ntor::NtorServer, ServerHandshake};

        let (chan, mut rx, _sink) = working_fake_channel(rt);
        let circid = CircId::new(128).unwrap();
        let (created_send, created_recv) = oneshot::channel();
        let (_circmsg_send, circmsg_recv) = fake_mpsc(64);
        let unique_id = UniqId::new(23, 17);

        let (pending, reactor) = PendingClientCirc::new(
            circid,
            chan,
            created_recv,
            circmsg_recv,
            unique_id,
            CircuitAccount::new_noop(),
        );

        // Run the circuit reactor in the background; its result is ignored.
        rt.spawn(async {
            let _ignore = reactor.run().await;
        })
        .unwrap();

        // Future to pretend to be a relay on the other end of the circuit.
        let simulate_relay_fut = async move {
            let mut rng = testing_rng();
            // The first cell on the channel should be the create request
            // for our circuit.
            let create_cell = rx.next().await.unwrap();
            assert_eq!(create_cell.circid(), Some(circid));
            let reply = match handshake_type {
                HandshakeType::Fast => {
                    let cf = match create_cell.msg() {
                        AnyChanMsg::CreateFast(cf) => cf,
                        other => panic!("{:?}", other),
                    };
                    let (_, rep) = CreateFastServer::server(
                        &mut rng,
                        &mut |_: &()| Some(()),
                        &[()],
                        cf.handshake(),
                    )
                    .unwrap();
                    CreateResponse::CreatedFast(CreatedFast::new(rep))
                }
                HandshakeType::Ntor => {
                    let c2 = match create_cell.msg() {
                        AnyChanMsg::Create2(c2) => c2,
                        other => panic!("{:?}", other),
                    };
                    let (_, rep) = NtorServer::server(
                        &mut rng,
                        &mut |_: &()| Some(()),
                        &[example_ntor_key()],
                        c2.body(),
                    )
                    .unwrap();
                    CreateResponse::Created2(Created2::new(rep))
                }
                #[cfg(feature = "ntor_v3")]
                HandshakeType::NtorV3 => {
                    let c2 = match create_cell.msg() {
                        AnyChanMsg::Create2(c2) => c2,
                        other => panic!("{:?}", other),
                    };
                    let (_, rep) = NtorV3Server::server(
                        &mut rng,
                        &mut |_: &_| Some(vec![]),
                        &[example_ntor_v3_key()],
                        c2.body(),
                    )
                    .unwrap();
                    CreateResponse::Created2(Created2::new(rep))
                }
            };
            // Deliver the relay's answer through the "created" channel.
            created_send.send(reply).unwrap();
        };
        // Future to pretend to be a client.
        let client_fut = async move {
            let target = example_target();
            let params = CircParameters::default();
            let ret = match handshake_type {
                HandshakeType::Fast => {
                    trace!("doing fast create");
                    pending.create_firsthop_fast(&params).await
                }
                HandshakeType::Ntor => {
                    trace!("doing ntor create");
                    pending.create_firsthop_ntor(&target, params).await
                }
                #[cfg(feature = "ntor_v3")]
                HandshakeType::NtorV3 => {
                    trace!("doing ntor_v3 create");
                    pending.create_firsthop_ntor_v3(&target, params).await
                }
            };
            trace!("create done: result {:?}", ret);
            ret
        };

        // Drive the client and the simulated relay together.
        let (circ, _) = futures::join!(client_fut, simulate_relay_fut);

        let _circ = circ.unwrap();

        // Phew!  We've built a circuit!  Let's make sure it has one hop.
        assert_eq!(_circ.n_hops(), 1);
    }
1386
    /// Run `test_create` with the CREATE_FAST handshake, via
    /// `test_with_all_runtimes!`.
    #[traced_test]
    #[test]
    fn test_create_fast() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::Fast).await;
        });
    }
    /// Run `test_create` with the ntor handshake, via
    /// `test_with_all_runtimes!`.
    #[traced_test]
    #[test]
    fn test_create_ntor() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::Ntor).await;
        });
    }
    /// Run `test_create` with the ntor_v3 handshake, via
    /// `test_with_all_runtimes!`.
    #[cfg(feature = "ntor_v3")]
    #[traced_test]
    #[test]
    fn test_create_ntor_v3() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::NtorV3).await;
        });
    }
1409
1410    // An encryption layer that doesn't do any crypto.   Can be used
1411    // as inbound or outbound, but not both at once.
1412    pub(crate) struct DummyCrypto {
1413        counter_tag: [u8; 20],
1414        counter: u32,
1415        lasthop: bool,
1416    }
1417    impl DummyCrypto {
1418        fn next_tag(&mut self) -> &[u8; 20] {
1419            #![allow(clippy::identity_op)]
1420            self.counter_tag[0] = ((self.counter >> 0) & 255) as u8;
1421            self.counter_tag[1] = ((self.counter >> 8) & 255) as u8;
1422            self.counter_tag[2] = ((self.counter >> 16) & 255) as u8;
1423            self.counter_tag[3] = ((self.counter >> 24) & 255) as u8;
1424            self.counter += 1;
1425            &self.counter_tag
1426        }
1427    }
1428
    impl crate::crypto::cell::OutboundClientLayer for DummyCrypto {
        /// Leave the cell untouched and return the next counter-based tag.
        fn originate_for(&mut self, _cell: &mut RelayCellBody) -> &[u8] {
            self.next_tag()
        }
        /// No-op: this dummy layer does not modify the cell.
        fn encrypt_outbound(&mut self, _cell: &mut RelayCellBody) {}
    }
1435    impl crate::crypto::cell::InboundClientLayer for DummyCrypto {
1436        fn decrypt_inbound(&mut self, _cell: &mut RelayCellBody) -> Option<&[u8]> {
1437            if self.lasthop {
1438                Some(self.next_tag())
1439            } else {
1440                None
1441            }
1442        }
1443    }
1444    impl DummyCrypto {
1445        pub(crate) fn new(lasthop: bool) -> Self {
1446            DummyCrypto {
1447                counter_tag: [0; 20],
1448                counter: 0,
1449                lasthop,
1450            }
1451        }
1452    }
1453
1454    // Helper: set up a 3-hop circuit with no encryption, where the
1455    // next inbound message seems to come from hop next_msg_from
1456    async fn newcirc_ext<R: Runtime>(
1457        rt: &R,
1458        chan: Arc<Channel>,
1459        next_msg_from: HopNum,
1460    ) -> (Arc<ClientCirc>, CircuitRxSender) {
1461        let circid = CircId::new(128).unwrap();
1462        let (_created_send, created_recv) = oneshot::channel();
1463        let (circmsg_send, circmsg_recv) = fake_mpsc(64);
1464        let unique_id = UniqId::new(23, 17);
1465
1466        let (pending, reactor) = PendingClientCirc::new(
1467            circid,
1468            chan,
1469            created_recv,
1470            circmsg_recv,
1471            unique_id,
1472            CircuitAccount::new_noop(),
1473        );
1474
1475        rt.spawn(async {
1476            let _ignore = reactor.run().await;
1477        })
1478        .unwrap();
1479
1480        let PendingClientCirc {
1481            circ,
1482            recvcreated: _,
1483        } = pending;
1484
1485        // TODO #1067: Support other formats
1486        let relay_cell_format = RelayCellFormat::V0;
1487        for idx in 0_u8..3 {
1488            let params = CircParameters::default();
1489            let (tx, rx) = oneshot::channel();
1490            circ.command
1491                .unbounded_send(CtrlCmd::AddFakeHop {
1492                    relay_cell_format,
1493                    fwd_lasthop: idx == 2,
1494                    rev_lasthop: idx == u8::from(next_msg_from),
1495                    params,
1496                    done: tx,
1497                })
1498                .unwrap();
1499            rx.await.unwrap().unwrap();
1500        }
1501
1502        (circ, circmsg_send)
1503    }
1504
1505    // Helper: set up a 3-hop circuit with no encryption, where the
1506    // next inbound message seems to come from hop next_msg_from
1507    async fn newcirc<R: Runtime>(rt: &R, chan: Arc<Channel>) -> (Arc<ClientCirc>, CircuitRxSender) {
1508        newcirc_ext(rt, chan, 2.into()).await
1509    }
1510
    /// Extend a fake 3-hop circuit by one hop, using the handshake selected
    /// by `handshake_type`, and check the resulting path.
    ///
    /// One future performs the client-side extend; another plays the relay,
    /// reading the EXTEND2 request from the channel and answering with an
    /// EXTENDED2 message.  `HandshakeType::Fast` is not supported here and
    /// panics.
    async fn test_extend<R: Runtime>(rt: &R, handshake_type: HandshakeType) {
        use crate::crypto::handshake::{ntor::NtorServer, ServerHandshake};

        let (chan, mut rx, _sink) = working_fake_channel(rt);
        let (circ, mut sink) = newcirc(rt, chan).await;
        let circid = circ.peek_circid();
        let params = CircParameters::default();

        // Future to pretend to be a client extending the circuit.
        let extend_fut = async move {
            let target = example_target();
            match handshake_type {
                HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
                HandshakeType::Ntor => circ.extend_ntor(&target, &params).await.unwrap(),
                #[cfg(feature = "ntor_v3")]
                HandshakeType::NtorV3 => circ.extend_ntor_v3(&target, &params).await.unwrap(),
            };
            circ // gotta keep the circ alive, or the reactor would exit.
        };
        // Future to pretend to be the relay answering the extend request.
        let reply_fut = async move {
            // We've disabled encryption on this circuit, so we can just
            // read the extend2 cell.
            let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
            assert_eq!(id, Some(circid));
            // The request is expected to arrive in a RELAY_EARLY cell.
            let rmsg = match chmsg {
                AnyChanMsg::RelayEarly(r) => {
                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                        .unwrap()
                }
                other => panic!("{:?}", other),
            };
            let e2 = match rmsg.msg() {
                AnyRelayMsg::Extend2(e2) => e2,
                other => panic!("{:?}", other),
            };
            let mut rng = testing_rng();
            // Run the server side of the handshake to produce the reply body.
            let reply = match handshake_type {
                HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
                HandshakeType::Ntor => {
                    let (_keygen, reply) = NtorServer::server(
                        &mut rng,
                        &mut |_: &()| Some(()),
                        &[example_ntor_key()],
                        e2.handshake(),
                    )
                    .unwrap();
                    reply
                }
                #[cfg(feature = "ntor_v3")]
                HandshakeType::NtorV3 => {
                    let (_keygen, reply) = NtorV3Server::server(
                        &mut rng,
                        &mut |_: &[NtorV3Extension]| Some(vec![]),
                        &[example_ntor_v3_key()],
                        e2.handshake(),
                    )
                    .unwrap();
                    reply
                }
            };

            let extended2 = relaymsg::Extended2::new(reply).into();
            sink.send(rmsg_to_ccmsg(None, extended2)).await.unwrap();
            (sink, rx) // gotta keep the sink and receiver alive, or the reactor will exit.
        };

        // Drive the client and the simulated relay together.
        let (circ, _) = futures::join!(extend_fut, reply_fut);

        // Did we really add another hop?
        assert_eq!(circ.n_hops(), 4);

        // Do the path accessors report a reasonable outcome?
        #[allow(deprecated)]
        {
            let path = circ.path();
            assert_eq!(path.len(), 4);
            use tor_linkspec::HasRelayIds;
            assert_eq!(path[3].ed_identity(), example_target().ed_identity());
            assert_ne!(path[0].ed_identity(), example_target().ed_identity());
        }
        {
            let path = circ.path_ref();
            assert_eq!(path.n_hops(), 4);
            use tor_linkspec::HasRelayIds;
            assert_eq!(
                path.hops()[3].as_chan_target().unwrap().ed_identity(),
                example_target().ed_identity()
            );
            assert_ne!(
                path.hops()[0].as_chan_target().unwrap().ed_identity(),
                example_target().ed_identity()
            );
        }
    }
1604
1605    #[traced_test]
1606    #[test]
1607    fn test_extend_ntor() {
1608        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1609            test_extend(&rt, HandshakeType::Ntor).await;
1610        });
1611    }
1612
1613    #[cfg(feature = "ntor_v3")]
1614    #[traced_test]
1615    #[test]
1616    fn test_extend_ntor_v3() {
1617        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1618            test_extend(&rt, HandshakeType::NtorV3).await;
1619        });
1620    }
1621
1622    async fn bad_extend_test_impl<R: Runtime>(
1623        rt: &R,
1624        reply_hop: HopNum,
1625        bad_reply: ClientCircChanMsg,
1626    ) -> Error {
1627        let (chan, _rx, _sink) = working_fake_channel(rt);
1628        let (circ, mut sink) = newcirc_ext(rt, chan, reply_hop).await;
1629        let params = CircParameters::default();
1630
1631        let target = example_target();
1632        #[allow(clippy::clone_on_copy)]
1633        let rtc = rt.clone();
1634        let sink_handle = rt
1635            .spawn_with_handle(async move {
1636                rtc.sleep(Duration::from_millis(100)).await;
1637                sink.send(bad_reply).await.unwrap();
1638                sink
1639            })
1640            .unwrap();
1641        let outcome = circ.extend_ntor(&target, &params).await;
1642        let _sink = sink_handle.await;
1643
1644        assert_eq!(circ.n_hops(), 3);
1645        assert!(outcome.is_err());
1646        outcome.unwrap_err()
1647    }
1648
1649    #[traced_test]
1650    #[test]
1651    fn bad_extend_wronghop() {
1652        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1653            let extended2 = relaymsg::Extended2::new(vec![]).into();
1654            let cc = rmsg_to_ccmsg(None, extended2);
1655
1656            let error = bad_extend_test_impl(&rt, 1.into(), cc).await;
1657            // This case shows up as a CircDestroy, since a message sent
1658            // from the wrong hop won't even be delivered to the extend
1659            // code's meta-handler.  Instead the unexpected message will cause
1660            // the circuit to get torn down.
1661            match error {
1662                Error::CircuitClosed => {}
1663                x => panic!("got other error: {}", x),
1664            }
1665        });
1666    }
1667
1668    #[traced_test]
1669    #[test]
1670    fn bad_extend_wrongtype() {
1671        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1672            let extended = relaymsg::Extended::new(vec![7; 200]).into();
1673            let cc = rmsg_to_ccmsg(None, extended);
1674
1675            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1676            match error {
1677                Error::BytesErr {
1678                    err: tor_bytes::Error::InvalidMessage(_),
1679                    object: "extended2 message",
1680                } => {}
1681                other => panic!("{:?}", other),
1682            }
1683        });
1684    }
1685
1686    #[traced_test]
1687    #[test]
1688    fn bad_extend_destroy() {
1689        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1690            let cc = ClientCircChanMsg::Destroy(chanmsg::Destroy::new(4.into()));
1691            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1692            match error {
1693                Error::CircuitClosed => {}
1694                other => panic!("{:?}", other),
1695            }
1696        });
1697    }
1698
1699    #[traced_test]
1700    #[test]
1701    fn bad_extend_crypto() {
1702        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1703            let extended2 = relaymsg::Extended2::new(vec![99; 256]).into();
1704            let cc = rmsg_to_ccmsg(None, extended2);
1705            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1706            assert!(matches!(error, Error::BadCircHandshakeAuth));
1707        });
1708    }
1709
1710    #[traced_test]
1711    #[test]
1712    fn begindir() {
1713        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1714            let (chan, mut rx, _sink) = working_fake_channel(&rt);
1715            let (circ, mut sink) = newcirc(&rt, chan).await;
1716            let circid = circ.peek_circid();
1717
1718            let begin_and_send_fut = async move {
1719                // Here we'll say we've got a circuit, and we want to
1720                // make a simple BEGINDIR request with it.
1721                let mut stream = circ.begin_dir_stream().await.unwrap();
1722                stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap();
1723                stream.flush().await.unwrap();
1724                let mut buf = [0_u8; 1024];
1725                let n = stream.read(&mut buf).await.unwrap();
1726                assert_eq!(&buf[..n], b"HTTP/1.0 404 Not found\r\n");
1727                let n = stream.read(&mut buf).await.unwrap();
1728                assert_eq!(n, 0);
1729                stream
1730            };
1731            let reply_fut = async move {
1732                // We've disabled encryption on this circuit, so we can just
1733                // read the begindir cell.
1734                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1735                assert_eq!(id, Some(circid));
1736                let rmsg = match chmsg {
1737                    AnyChanMsg::Relay(r) => {
1738                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1739                            .unwrap()
1740                    }
1741                    other => panic!("{:?}", other),
1742                };
1743                let (streamid, rmsg) = rmsg.into_streamid_and_msg();
1744                assert!(matches!(rmsg, AnyRelayMsg::BeginDir(_)));
1745
1746                // Reply with a Connected cell to indicate success.
1747                let connected = relaymsg::Connected::new_empty().into();
1748                sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
1749
1750                // Now read a DATA cell...
1751                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1752                assert_eq!(id, Some(circid));
1753                let rmsg = match chmsg {
1754                    AnyChanMsg::Relay(r) => {
1755                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1756                            .unwrap()
1757                    }
1758                    other => panic!("{:?}", other),
1759                };
1760                let (streamid_2, rmsg) = rmsg.into_streamid_and_msg();
1761                assert_eq!(streamid_2, streamid);
1762                if let AnyRelayMsg::Data(d) = rmsg {
1763                    assert_eq!(d.as_ref(), &b"HTTP/1.0 GET /\r\n"[..]);
1764                } else {
1765                    panic!();
1766                }
1767
1768                // Write another data cell in reply!
1769                let data = relaymsg::Data::new(b"HTTP/1.0 404 Not found\r\n")
1770                    .unwrap()
1771                    .into();
1772                sink.send(rmsg_to_ccmsg(streamid, data)).await.unwrap();
1773
1774                // Send an END cell to say that the conversation is over.
1775                let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
1776                sink.send(rmsg_to_ccmsg(streamid, end)).await.unwrap();
1777
1778                (rx, sink) // gotta keep these alive, or the reactor will exit.
1779            };
1780
1781            let (_stream, (_rx, _sink)) = futures::join!(begin_and_send_fut, reply_fut);
1782        });
1783    }
1784
1785    // Test: close a stream, either by dropping it or by calling AsyncWriteExt::close.
1786    fn close_stream_helper(by_drop: bool) {
1787        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1788            let (chan, mut rx, _sink) = working_fake_channel(&rt);
1789            let (circ, mut sink) = newcirc(&rt, chan).await;
1790
1791            let stream_fut = async move {
1792                let stream = circ
1793                    .begin_stream("www.example.com", 80, None)
1794                    .await
1795                    .unwrap();
1796
1797                let (r, mut w) = stream.split();
1798                if by_drop {
1799                    // Drop the writer and the reader, which should close the stream.
1800                    drop(r);
1801                    drop(w);
1802                    (None, circ) // make sure to keep the circuit alive
1803                } else {
1804                    // Call close on the writer, while keeping the reader alive.
1805                    w.close().await.unwrap();
1806                    (Some(r), circ)
1807                }
1808            };
1809            let handler_fut = async {
1810                // Read the BEGIN message.
1811                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
1812                let rmsg = match msg {
1813                    AnyChanMsg::Relay(r) => {
1814                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1815                            .unwrap()
1816                    }
1817                    other => panic!("{:?}", other),
1818                };
1819                let (streamid, rmsg) = rmsg.into_streamid_and_msg();
1820                assert_eq!(rmsg.cmd(), RelayCmd::BEGIN);
1821
1822                // Reply with a CONNECTED.
1823                let connected =
1824                    relaymsg::Connected::new_with_addr("10.0.0.1".parse().unwrap(), 1234).into();
1825                sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
1826
1827                // Expect an END.
1828                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
1829                let rmsg = match msg {
1830                    AnyChanMsg::Relay(r) => {
1831                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1832                            .unwrap()
1833                    }
1834                    other => panic!("{:?}", other),
1835                };
1836                let (_, rmsg) = rmsg.into_streamid_and_msg();
1837                assert_eq!(rmsg.cmd(), RelayCmd::END);
1838
1839                (rx, sink) // keep these alive or the reactor will exit.
1840            };
1841
1842            let ((_opt_reader, _circ), (_rx, _sink)) = futures::join!(stream_fut, handler_fut);
1843        });
1844    }
1845
1846    #[traced_test]
1847    #[test]
1848    fn drop_stream() {
1849        close_stream_helper(true);
1850    }
1851
1852    #[traced_test]
1853    #[test]
1854    fn close_stream() {
1855        close_stream_helper(false);
1856    }
1857
1858    // Set up a circuit and stream that expects some incoming SENDMEs.
1859    async fn setup_incoming_sendme_case<R: Runtime>(
1860        rt: &R,
1861        n_to_send: usize,
1862    ) -> (
1863        Arc<ClientCirc>,
1864        DataStream,
1865        CircuitRxSender,
1866        Option<StreamId>,
1867        usize,
1868        Receiver<AnyChanCell>,
1869        Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
1870    ) {
1871        let (chan, mut rx, sink2) = working_fake_channel(rt);
1872        let (circ, mut sink) = newcirc(rt, chan).await;
1873        let circid = circ.peek_circid();
1874
1875        let begin_and_send_fut = {
1876            let circ = circ.clone();
1877            async move {
1878                // Take our circuit and make a stream on it.
1879                let mut stream = circ
1880                    .begin_stream("www.example.com", 443, None)
1881                    .await
1882                    .unwrap();
1883                let junk = [0_u8; 1024];
1884                let mut remaining = n_to_send;
1885                while remaining > 0 {
1886                    let n = std::cmp::min(remaining, junk.len());
1887                    stream.write_all(&junk[..n]).await.unwrap();
1888                    remaining -= n;
1889                }
1890                stream.flush().await.unwrap();
1891                stream
1892            }
1893        };
1894
1895        let receive_fut = async move {
1896            // Read the begin cell.
1897            let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1898            let rmsg = match chmsg {
1899                AnyChanMsg::Relay(r) => {
1900                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1901                        .unwrap()
1902                }
1903                other => panic!("{:?}", other),
1904            };
1905            let (streamid, rmsg) = rmsg.into_streamid_and_msg();
1906            assert!(matches!(rmsg, AnyRelayMsg::Begin(_)));
1907            // Reply with a connected cell...
1908            let connected = relaymsg::Connected::new_empty().into();
1909            sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
1910            // Now read bytes from the stream until we have them all.
1911            let mut bytes_received = 0_usize;
1912            let mut cells_received = 0_usize;
1913            while bytes_received < n_to_send {
1914                // Read a data cell, and remember how much we got.
1915                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1916                assert_eq!(id, Some(circid));
1917
1918                let rmsg = match chmsg {
1919                    AnyChanMsg::Relay(r) => {
1920                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1921                            .unwrap()
1922                    }
1923                    other => panic!("{:?}", other),
1924                };
1925                let (streamid2, rmsg) = rmsg.into_streamid_and_msg();
1926                assert_eq!(streamid2, streamid);
1927                if let AnyRelayMsg::Data(dat) = rmsg {
1928                    cells_received += 1;
1929                    bytes_received += dat.as_ref().len();
1930                } else {
1931                    panic!();
1932                }
1933            }
1934
1935            (sink, streamid, cells_received, rx)
1936        };
1937
1938        let (stream, (sink, streamid, cells_received, rx)) =
1939            futures::join!(begin_and_send_fut, receive_fut);
1940
1941        (circ, stream, sink, streamid, cells_received, rx, sink2)
1942    }
1943
1944    #[traced_test]
1945    #[test]
1946    fn accept_valid_sendme() {
1947        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
1948            let (circ, _stream, mut sink, streamid, cells_received, _rx, _sink2) =
1949                setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
1950
1951            assert_eq!(cells_received, 301);
1952
1953            // Make sure that the circuit is indeed expecting the right sendmes
1954            {
1955                let (tx, rx) = oneshot::channel();
1956                circ.command
1957                    .unbounded_send(CtrlCmd::QuerySendWindow {
1958                        hop: 2.into(),
1959                        done: tx,
1960                    })
1961                    .unwrap();
1962                let (window, tags) = rx.await.unwrap().unwrap();
1963                assert_eq!(window, 1000 - 301);
1964                assert_eq!(tags.len(), 3);
1965                // 100
1966                assert_eq!(
1967                    tags[0],
1968                    sendme::CircTag::from(hex!("6400000000000000000000000000000000000000"))
1969                );
1970                // 200
1971                assert_eq!(
1972                    tags[1],
1973                    sendme::CircTag::from(hex!("c800000000000000000000000000000000000000"))
1974                );
1975                // 300
1976                assert_eq!(
1977                    tags[2],
1978                    sendme::CircTag::from(hex!("2c01000000000000000000000000000000000000"))
1979                );
1980            }
1981
1982            let reply_with_sendme_fut = async move {
1983                // make and send a circuit-level sendme.
1984                let c_sendme =
1985                    relaymsg::Sendme::new_tag(hex!("6400000000000000000000000000000000000000"))
1986                        .into();
1987                sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();
1988
1989                // Make and send a stream-level sendme.
1990                let s_sendme = relaymsg::Sendme::new_empty().into();
1991                sink.send(rmsg_to_ccmsg(streamid, s_sendme)).await.unwrap();
1992
1993                sink
1994            };
1995
1996            let _sink = reply_with_sendme_fut.await;
1997
1998            rt.advance_until_stalled().await;
1999
2000            // Now make sure that the circuit is still happy, and its
2001            // window is updated.
2002            {
2003                let (tx, rx) = oneshot::channel();
2004                circ.command
2005                    .unbounded_send(CtrlCmd::QuerySendWindow {
2006                        hop: 2.into(),
2007                        done: tx,
2008                    })
2009                    .unwrap();
2010                let (window, _tags) = rx.await.unwrap().unwrap();
2011                assert_eq!(window, 1000 - 201);
2012            }
2013        });
2014    }
2015
2016    #[traced_test]
2017    #[test]
2018    fn invalid_circ_sendme() {
2019        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2020            // Same setup as accept_valid_sendme() test above but try giving
2021            // a sendme with the wrong tag.
2022
2023            let (circ, _stream, mut sink, _streamid, _cells_received, _rx, _sink2) =
2024                setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
2025
2026            let reply_with_sendme_fut = async move {
2027                // make and send a circuit-level sendme with a bad tag.
2028                let c_sendme =
2029                    relaymsg::Sendme::new_tag(hex!("FFFF0000000000000000000000000000000000FF"))
2030                        .into();
2031                sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();
2032                sink
2033            };
2034
2035            let _sink = reply_with_sendme_fut.await;
2036
2037            // Check whether the reactor dies as a result of receiving invalid data.
2038            rt.advance_until_stalled().await;
2039            assert!(circ.is_closing());
2040        });
2041    }
2042
2043    #[traced_test]
2044    #[test]
2045    fn test_busy_stream_fairness() {
2046        // Number of streams to use.
2047        const N_STREAMS: usize = 3;
2048        // Number of cells (roughly) for each stream to send.
2049        const N_CELLS: usize = 20;
2050        // Number of bytes that *each* stream will send, and that we'll read
2051        // from the channel.
2052        const N_BYTES: usize = relaymsg::Data::MAXLEN * N_CELLS;
2053        // Ignoring cell granularity, with perfect fairness we'd expect
2054        // `N_BYTES/N_STREAMS` bytes from each stream.
2055        //
2056        // We currently allow for up to a full cell less than that.  This is
2057        // somewhat arbitrary and can be changed as needed, since we don't
2058        // provide any specific fairness guarantees.
2059        const MIN_EXPECTED_BYTES_PER_STREAM: usize = N_BYTES / N_STREAMS - relaymsg::Data::MAXLEN;
2060
2061        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2062            let (chan, mut rx, _sink) = working_fake_channel(&rt);
2063            let (circ, mut sink) = newcirc(&rt, chan).await;
2064
2065            // Run clients in a single task, doing our own round-robin
2066            // scheduling of writes to the reactor. Conversely, if we were to
2067            // put each client in its own task, we would be at the the mercy of
2068            // how fairly the runtime schedules the client tasks, which is outside
2069            // the scope of this test.
2070            rt.spawn({
2071                // Clone the circuit to keep it alive after writers have
2072                // finished with it.
2073                let circ = circ.clone();
2074                async move {
2075                    let mut clients = VecDeque::new();
2076                    struct Client {
2077                        stream: DataStream,
2078                        to_write: &'static [u8],
2079                    }
2080                    for _ in 0..N_STREAMS {
2081                        clients.push_back(Client {
2082                            stream: circ
2083                                .begin_stream("www.example.com", 80, None)
2084                                .await
2085                                .unwrap(),
2086                            to_write: &[0_u8; N_BYTES][..],
2087                        });
2088                    }
2089                    while let Some(mut client) = clients.pop_front() {
2090                        if client.to_write.is_empty() {
2091                            // Client is done. Don't put back in queue.
2092                            continue;
2093                        }
2094                        let written = client.stream.write(client.to_write).await.unwrap();
2095                        client.to_write = &client.to_write[written..];
2096                        clients.push_back(client);
2097                    }
2098                }
2099            })
2100            .unwrap();
2101
2102            let channel_handler_fut = async {
2103                let mut stream_bytes_received = HashMap::<StreamId, usize>::new();
2104                let mut total_bytes_received = 0;
2105
2106                loop {
2107                    let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2108                    let rmsg = match msg {
2109                        AnyChanMsg::Relay(r) => AnyRelayMsgOuter::decode_singleton(
2110                            RelayCellFormat::V0,
2111                            r.into_relay_body(),
2112                        )
2113                        .unwrap(),
2114                        other => panic!("Unexpected chanmsg: {other:?}"),
2115                    };
2116                    let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2117                    match rmsg.cmd() {
2118                        RelayCmd::BEGIN => {
2119                            // Add an entry for this stream.
2120                            let prev = stream_bytes_received.insert(streamid.unwrap(), 0);
2121                            assert_eq!(prev, None);
2122                            // Reply with a CONNECTED.
2123                            let connected = relaymsg::Connected::new_with_addr(
2124                                "10.0.0.1".parse().unwrap(),
2125                                1234,
2126                            )
2127                            .into();
2128                            sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2129                        }
2130                        RelayCmd::DATA => {
2131                            let data_msg = relaymsg::Data::try_from(rmsg).unwrap();
2132                            let nbytes = data_msg.as_ref().len();
2133                            total_bytes_received += nbytes;
2134                            let streamid = streamid.unwrap();
2135                            let stream_bytes = stream_bytes_received.get_mut(&streamid).unwrap();
2136                            *stream_bytes += nbytes;
2137                            if total_bytes_received >= N_BYTES {
2138                                break;
2139                            }
2140                        }
2141                        RelayCmd::END => {
2142                            // Stream is done. If fair scheduling is working as
2143                            // expected we *probably* shouldn't get here, but we
2144                            // can ignore it and save the failure until we
2145                            // actually have the final stats.
2146                            continue;
2147                        }
2148                        other => {
2149                            panic!("Unexpected command {other:?}");
2150                        }
2151                    }
2152                }
2153
2154                // Return our stats, along with the `rx` and `sink` to keep the
2155                // reactor alive (since clients could still be writing).
2156                (total_bytes_received, stream_bytes_received, rx, sink)
2157            };
2158
2159            let (total_bytes_received, stream_bytes_received, _rx, _sink) =
2160                channel_handler_fut.await;
2161            assert_eq!(stream_bytes_received.len(), N_STREAMS);
2162            for (sid, stream_bytes) in stream_bytes_received {
2163                assert!(
2164                    stream_bytes >= MIN_EXPECTED_BYTES_PER_STREAM,
2165                    "Only {stream_bytes} of {total_bytes_received} bytes received from {N_STREAMS} came from {sid:?}; expected at least {MIN_EXPECTED_BYTES_PER_STREAM}"
2166                );
2167            }
2168        });
2169    }
2170
2171    #[test]
2172    fn basic_params() {
2173        use super::CircParameters;
2174        let mut p = CircParameters::default();
2175        assert!(p.extend_by_ed25519_id);
2176
2177        p.extend_by_ed25519_id = false;
2178        assert!(!p.extend_by_ed25519_id);
2179    }
2180
2181    #[cfg(feature = "hs-service")]
2182    struct AllowAllStreamsFilter;
2183    #[cfg(feature = "hs-service")]
2184    impl IncomingStreamRequestFilter for AllowAllStreamsFilter {
2185        fn disposition(
2186            &mut self,
2187            _ctx: &crate::stream::IncomingStreamRequestContext<'_>,
2188            _circ: &crate::tunnel::reactor::syncview::ClientCircSyncView<'_>,
2189        ) -> Result<crate::stream::IncomingStreamRequestDisposition> {
2190            Ok(crate::stream::IncomingStreamRequestDisposition::Accept)
2191        }
2192    }
2193
2194    #[traced_test]
2195    #[test]
2196    #[cfg(feature = "hs-service")]
2197    fn allow_stream_requests_twice() {
2198        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2199            let (chan, _rx, _sink) = working_fake_channel(&rt);
2200            let (circ, _send) = newcirc(&rt, chan).await;
2201
2202            let _incoming = circ
2203                .allow_stream_requests(
2204                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2205                    circ.last_hop_num().unwrap(),
2206                    AllowAllStreamsFilter,
2207                )
2208                .await
2209                .unwrap();
2210
2211            let incoming = circ
2212                .allow_stream_requests(
2213                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2214                    circ.last_hop_num().unwrap(),
2215                    AllowAllStreamsFilter,
2216                )
2217                .await;
2218
2219            // There can only be one IncomingStream at a time on any given circuit.
2220            assert!(incoming.is_err());
2221        });
2222    }
2223
2224    #[traced_test]
2225    #[test]
2226    #[cfg(feature = "hs-service")]
2227    fn allow_stream_requests() {
2228        use tor_cell::relaycell::msg::BeginFlags;
2229
2230        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2231            const TEST_DATA: &[u8] = b"ping";
2232
2233            let (chan, _rx, _sink) = working_fake_channel(&rt);
2234            let (circ, mut send) = newcirc(&rt, chan).await;
2235
2236            // A helper channel for coordinating the "client"/"service" interaction
2237            let (tx, rx) = oneshot::channel();
2238            let mut incoming = circ
2239                .allow_stream_requests(
2240                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2241                    circ.last_hop_num().unwrap(),
2242                    AllowAllStreamsFilter,
2243                )
2244                .await
2245                .unwrap();
2246
2247            let simulate_service = async move {
2248                let stream = incoming.next().await.unwrap();
2249                let mut data_stream = stream
2250                    .accept_data(relaymsg::Connected::new_empty())
2251                    .await
2252                    .unwrap();
2253                // Notify the client task we're ready to accept DATA cells
2254                tx.send(()).unwrap();
2255
2256                // Read the data the client sent us
2257                let mut buf = [0_u8; TEST_DATA.len()];
2258                data_stream.read_exact(&mut buf).await.unwrap();
2259                assert_eq!(&buf, TEST_DATA);
2260
2261                circ
2262            };
2263
2264            let simulate_client = async move {
2265                let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2266                let body: BoxedCellBody =
2267                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2268                        .encode(&mut testing_rng())
2269                        .unwrap();
2270                let begin_msg = chanmsg::Relay::from(body);
2271
2272                // Pretend to be a client at the other end of the circuit sending a begin cell
2273                send.send(ClientCircChanMsg::Relay(begin_msg))
2274                    .await
2275                    .unwrap();
2276
2277                // Wait until the service is ready to accept data
2278                // TODO: we shouldn't need to wait! This is needed because the service will reject
2279                // any DATA cells that aren't associated with a known stream. We need to wait until
2280                // the service receives our BEGIN cell (and the reactor updates hop.map with the
2281                // new stream).
2282                rx.await.unwrap();
2283                // Now send some data along the newly established circuit..
2284                let data = relaymsg::Data::new(TEST_DATA).unwrap();
2285                let body: BoxedCellBody =
2286                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
2287                        .encode(&mut testing_rng())
2288                        .unwrap();
2289                let data_msg = chanmsg::Relay::from(body);
2290
2291                send.send(ClientCircChanMsg::Relay(data_msg)).await.unwrap();
2292                send
2293            };
2294
2295            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2296        });
2297    }
2298
2299    #[traced_test]
2300    #[test]
2301    #[cfg(feature = "hs-service")]
2302    fn accept_stream_after_reject() {
2303        use tor_cell::relaycell::msg::BeginFlags;
2304        use tor_cell::relaycell::msg::EndReason;
2305
2306        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2307            const TEST_DATA: &[u8] = b"ping";
2308            const STREAM_COUNT: usize = 2;
2309
2310            let (chan, _rx, _sink) = working_fake_channel(&rt);
2311            let (circ, mut send) = newcirc(&rt, chan).await;
2312
2313            // A helper channel for coordinating the "client"/"service" interaction
2314            let (mut tx, mut rx) = mpsc::channel(STREAM_COUNT);
2315
2316            let mut incoming = circ
2317                .allow_stream_requests(
2318                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2319                    circ.last_hop_num().unwrap(),
2320                    AllowAllStreamsFilter,
2321                )
2322                .await
2323                .unwrap();
2324
2325            let simulate_service = async move {
2326                // Process 2 incoming streams
2327                for i in 0..STREAM_COUNT {
2328                    let stream = incoming.next().await.unwrap();
2329
2330                    // Reject the first one
2331                    if i == 0 {
2332                        stream
2333                            .reject(relaymsg::End::new_with_reason(EndReason::INTERNAL))
2334                            .await
2335                            .unwrap();
2336                        // Notify the client
2337                        tx.send(()).await.unwrap();
2338                        continue;
2339                    }
2340
2341                    let mut data_stream = stream
2342                        .accept_data(relaymsg::Connected::new_empty())
2343                        .await
2344                        .unwrap();
2345                    // Notify the client task we're ready to accept DATA cells
2346                    tx.send(()).await.unwrap();
2347
2348                    // Read the data the client sent us
2349                    let mut buf = [0_u8; TEST_DATA.len()];
2350                    data_stream.read_exact(&mut buf).await.unwrap();
2351                    assert_eq!(&buf, TEST_DATA);
2352                }
2353
2354                circ
2355            };
2356
2357            let simulate_client = async move {
2358                let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2359                let body: BoxedCellBody =
2360                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2361                        .encode(&mut testing_rng())
2362                        .unwrap();
2363                let begin_msg = chanmsg::Relay::from(body);
2364
2365                // Pretend to be a client at the other end of the circuit sending 2 identical begin
2366                // cells (the first one will be rejected by the test service).
2367                for _ in 0..STREAM_COUNT {
2368                    send.send(ClientCircChanMsg::Relay(begin_msg.clone()))
2369                        .await
2370                        .unwrap();
2371
2372                    // Wait until the service rejects our request
2373                    rx.next().await.unwrap();
2374                }
2375
2376                // Now send some data along the newly established circuit..
2377                let data = relaymsg::Data::new(TEST_DATA).unwrap();
2378                let body: BoxedCellBody =
2379                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
2380                        .encode(&mut testing_rng())
2381                        .unwrap();
2382                let data_msg = chanmsg::Relay::from(body);
2383
2384                send.send(ClientCircChanMsg::Relay(data_msg)).await.unwrap();
2385                send
2386            };
2387
2388            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2389        });
2390    }
2391
2392    #[traced_test]
2393    #[test]
2394    #[cfg(feature = "hs-service")]
2395    fn incoming_stream_bad_hop() {
2396        use tor_cell::relaycell::msg::BeginFlags;
2397
2398        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2399            /// Expect the originator of the BEGIN cell to be hop 1.
2400            const EXPECTED_HOP: u8 = 1;
2401
2402            let (chan, _rx, _sink) = working_fake_channel(&rt);
2403            let (circ, mut send) = newcirc(&rt, chan).await;
2404
2405            // Expect to receive incoming streams from hop EXPECTED_HOP
2406            let mut incoming = circ
2407                .allow_stream_requests(
2408                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2409                    EXPECTED_HOP.into(),
2410                    AllowAllStreamsFilter,
2411                )
2412                .await
2413                .unwrap();
2414
2415            let simulate_service = async move {
2416                // The originator of the cell is actually the last hop on the circuit, not hop 1,
2417                // so we expect the reactor to shut down.
2418                assert!(incoming.next().await.is_none());
2419                circ
2420            };
2421
2422            let simulate_client = async move {
2423                let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2424                let body: BoxedCellBody =
2425                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2426                        .encode(&mut testing_rng())
2427                        .unwrap();
2428                let begin_msg = chanmsg::Relay::from(body);
2429
2430                // Pretend to be a client at the other end of the circuit sending a begin cell
2431                send.send(ClientCircChanMsg::Relay(begin_msg))
2432                    .await
2433                    .unwrap();
2434
2435                send
2436            };
2437
2438            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2439        });
2440    }
2441}