Skip to main content

tor_proto/client/
circuit.rs

1//! Multi-hop paths over the Tor network.
2//!
3//! Right now, we only implement "client circuits" -- also sometimes
4//! called "origin circuits".  A client circuit is one that is
5//! constructed by this Tor instance, and used in its own behalf to
6//! send data over the Tor network.
7//!
8//! Each circuit has multiple hops over the Tor network: each hop
9//! knows only the hop before and the hop after.  The client shares a
10//! separate set of keys with each hop.
11//!
12//! To build a circuit, first create a [crate::channel::Channel], then
13//! call its [crate::channel::Channel::new_tunnel] method.  This yields
14//! a [PendingClientTunnel] object that won't become live until you call
15//! one of the methods
16//! (typically [`PendingClientTunnel::create_firsthop`])
17//! that extends it to its first hop.  After you've
18//! done that, you can call [`ClientCirc::extend`] on the tunnel to
19//! build it into a multi-hop tunnel.  Finally, you can use
20//! [ClientTunnel::begin_stream] to get a Stream object that can be used
21//! for anonymized data.
22//!
23//! # Implementation
24//!
25//! Each open circuit has a corresponding Reactor object that runs in
26//! an asynchronous task, and manages incoming cells from the
27//! circuit's upstream channel.  These cells are either RELAY cells or
28//! DESTROY cells.  DESTROY cells are handled immediately.
29//! RELAY cells are either for a particular stream, in which case they
30//! get forwarded to a StreamReceiver object, or for no particular stream,
31//! in which case they are considered "meta" cells (like EXTENDED2)
32//! that should only get accepted if something is waiting for them.
33//!
34//! # Limitations
35//!
36//! This is client-only.
37
38pub(crate) mod halfcirc;
39
40#[cfg(feature = "hs-common")]
41pub mod handshake;
42#[cfg(not(feature = "hs-common"))]
43pub(crate) mod handshake;
44
45pub(crate) mod padding;
46
47pub(super) mod path;
48
49use crate::channel::Channel;
50use crate::circuit::circhop::{HopNegotiationType, HopSettings};
51use crate::circuit::{CircuitRxReceiver, celltypes::*};
52#[cfg(feature = "circ-padding-manual")]
53use crate::client::CircuitPadder;
54use crate::client::circuit::padding::{PaddingController, PaddingEventStream};
55use crate::client::reactor::{CircuitHandshake, CtrlCmd, CtrlMsg, Reactor};
56use crate::crypto::cell::HopNum;
57use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
58use crate::memquota::CircuitAccount;
59use crate::util::skew::ClockSkew;
60use crate::{Error, Result};
61use derive_deftly::Deftly;
62use educe::Educe;
63use path::HopDetail;
64use tor_cell::chancell::{
65    CircId,
66    msg::{self as chanmsg},
67};
68use tor_error::{bad_api_usage, internal, into_internal};
69use tor_linkspec::{CircTarget, LinkSpecType, OwnedChanTarget, RelayIdType};
70use tor_protover::named;
71use tor_rtcompat::DynTimeProvider;
72use web_time_compat::Instant;
73
74use crate::circuit::UniqId;
75
76use super::{ClientTunnel, TargetHop};
77
78use futures::channel::mpsc;
79use oneshot_fused_workaround as oneshot;
80
81use futures::FutureExt as _;
82use std::collections::HashMap;
83use std::sync::{Arc, Mutex};
84use tor_memquota::derive_deftly_template_HasMemoryCost;
85
86use crate::crypto::handshake::ntor::NtorPublicKey;
87
88pub use crate::crypto::binding::CircuitBinding;
89pub use path::{Path, PathEntry};
90
91// TODO: export this from the top-level instead (it's not client-specific).
92pub use crate::circuit::CircParameters;
93
94// TODO(relay): reexport this from somewhere else (it's not client-specific)
95pub use crate::util::timeout::TimeoutEstimator;
96
97/// A subclass of ChanMsg that can correctly arrive on a live client
98/// circuit (one where a CREATED* has been received).
99#[derive(Debug, Deftly)]
100#[allow(unreachable_pub)] // Only `pub` with feature `testing`; otherwise, visible in crate
101#[derive_deftly(HasMemoryCost)]
102#[derive_deftly(RestrictedChanMsgSet)]
103#[deftly(usage = "on an open client circuit")]
104pub(super) enum ClientCircChanMsg {
105    /// A relay cell telling us some kind of remote command from some
106    /// party on the circuit.
107    Relay(chanmsg::Relay),
108    /// A cell telling us to destroy the circuit.
109    Destroy(chanmsg::Destroy),
110    // Note: RelayEarly is not valid for clients!
111}
112
113#[derive(Debug)]
114/// A circuit that we have constructed over the Tor network.
115///
116/// # Circuit life cycle
117///
118/// `ClientCirc`s are created in an initially unusable state using [`Channel::new_tunnel`],
119/// which returns a [`PendingClientTunnel`].  To get a real (one-hop) tunnel from
120/// one of these, you invoke one of its `create_firsthop` methods (typically
121/// [`create_firsthop_fast()`](PendingClientTunnel::create_firsthop_fast) or
122/// [`create_firsthop()`](PendingClientTunnel::create_firsthop)).
123/// Then, to add more hops to the circuit, you can call
124/// [`extend()`](ClientCirc::extend) on it.
125///
126/// For higher-level APIs, see the `tor-circmgr` crate: the ones here in
127/// `tor-proto` are probably not what you need.
128///
129/// After a circuit is created, it will persist until it is closed in one of
130/// five ways:
131///    1. A remote error occurs.
132///    2. Some hop on the circuit sends a `DESTROY` message to tear down the
133///       circuit.
134///    3. The circuit's channel is closed.
135///    4. Someone calls [`ClientTunnel::terminate`] on the tunnel owning the circuit.
136///    5. The last reference to the `ClientCirc` is dropped. (Note that every stream
137///       on a `ClientCirc` keeps a reference to it, which will in turn keep the
138///       circuit from closing until all those streams have gone away.)
139///
140/// Note that in cases 1-4 the [`ClientCirc`] object itself will still exist: it
141/// will just be unusable for most purposes.  Most operations on it will fail
142/// with an error.
143//
144// Effectively, this struct contains two Arcs: one for `path` and one for
145// `control` (which surely has something Arc-like in it).  We cannot unify
146// these by putting a single Arc around the whole struct, and passing
147// an Arc strong reference to the `Reactor`, because then `control` would
148// not be dropped when the last user of the circuit goes away.  We could
149// make the reactor have a weak reference but weak references are more
150// expensive to dereference.
151//
152// Because of the above, cloning this struct is always going to involve
153// two atomic refcount changes/checks.  Wrapping it in another Arc would
154// be overkill.
155//
156pub struct ClientCirc {
157    /// Mutable state shared with the `Reactor`.
158    pub(super) mutable: Arc<TunnelMutableState>,
159    /// A unique identifier for this circuit.
160    unique_id: UniqId,
161    /// Channel to send control messages to the reactor.
162    pub(super) control: mpsc::UnboundedSender<CtrlMsg>,
163    /// Channel to send commands to the reactor.
164    pub(super) command: mpsc::UnboundedSender<CtrlCmd>,
165    /// A future that resolves to Cancelled once the reactor is shut down,
166    /// meaning that the circuit is closed.
167    #[cfg_attr(not(feature = "experimental-api"), allow(dead_code))]
168    reactor_closed_rx: futures::future::Shared<oneshot::Receiver<void::Void>>,
169    /// For testing purposes: the CircId, for use in peek_circid().
170    #[cfg(test)]
171    circid: CircId,
172    /// Memory quota account
173    pub(super) memquota: CircuitAccount,
174    /// Time provider
175    pub(super) time_provider: DynTimeProvider,
176    /// Indicate if this reactor is a multi path or not. This is flagged at the very first
177    /// LinkCircuit seen and never changed after.
178    ///
179    /// We can't just look at the number of legs because a multi path tunnel could have 1 leg only
180    /// because the other(s) have collapsed.
181    ///
182    /// This is very important because it allows to make a quick efficient safety check by the
183    /// circmgr higher level tunnel type without locking the mutable state or using the command
184    /// channel.
185    pub(super) is_multi_path: bool,
186}
187
188/// The mutable state of a tunnel, shared between [`ClientCirc`] and [`Reactor`].
189///
190/// NOTE(gabi): this mutex-inside-a-mutex might look suspicious,
191/// but it is currently the best option we have for sharing
192/// the circuit state with `ClientCirc` (and soon, with `ClientTunnel`).
193/// In practice, these mutexes won't be accessed very often
194/// (they're accessed for writing when a circuit is extended,
195/// and for reading by the various `ClientCirc` APIs),
196/// so they shouldn't really impact performance.
197///
198/// Alternatively, the circuit state information could be shared
199/// outside the reactor through a channel (passed to the reactor via a `CtrlCmd`),
200/// but in #1840 @opara notes that involves making the `ClientCirc` accessors
201/// (`ClientCirc::path`, `ClientCirc::binding_key`, etc.)
202/// asynchronous, which will significantly complicate their callsites,
203/// which would in turn need to be made async too.
204///
205/// We should revisit this decision at some point, and decide whether an async API
206/// would be preferable.
207#[derive(Debug, Default)]
208pub(super) struct TunnelMutableState(Mutex<HashMap<UniqId, Arc<MutableState>>>);
209
210impl TunnelMutableState {
211    /// Add the [`MutableState`] of a circuit.
212    pub(super) fn insert(&self, unique_id: UniqId, mutable: Arc<MutableState>) {
213        #[allow(unused)] // unused in non-debug builds
214        let state = self
215            .0
216            .lock()
217            .expect("lock poisoned")
218            .insert(unique_id, mutable);
219
220        debug_assert!(state.is_none());
221    }
222
223    /// Remove the [`MutableState`] of a circuit.
224    pub(super) fn remove(&self, unique_id: UniqId) {
225        #[allow(unused)] // unused in non-debug builds
226        let state = self.0.lock().expect("lock poisoned").remove(&unique_id);
227
228        debug_assert!(state.is_some());
229    }
230
231    /// Return a [`Path`] object describing all the circuits in this tunnel.
232    fn all_paths(&self) -> Vec<Arc<Path>> {
233        let lock = self.0.lock().expect("lock poisoned");
234        lock.values().map(|mutable| mutable.path()).collect()
235    }
236
237    /// Return a representation of the Paths for all the circuits in this tunnel,
238    /// as a map from each circuits' UniqId to its path.
239    ///
240    /// This is only exposed for the RPC subsystem, where it is documented that the
241    /// format of `UniqId` is not stable.
242    #[cfg(feature = "rpc")]
243    pub(super) fn tagged_paths(&self) -> HashMap<UniqId, Arc<Path>> {
244        let lock = self.0.lock().expect("lock poisoned");
245        lock.iter()
246            .map(|(id, mutable)| (*id, mutable.path()))
247            .collect()
248    }
249
250    /// Return a list of [`Path`] objects describing the only circuit in this tunnel.
251    ///
252    /// Returns an error if the tunnel has more than one tunnel.
253    //
254    // TODO: replace Itertools::exactly_one() with a stdlib equivalent when there is one.
255    //
256    // See issue #48919 <https://github.com/rust-lang/rust/issues/48919>
257    #[allow(unstable_name_collisions)]
258    fn single_path(&self) -> Result<Arc<Path>> {
259        use itertools::Itertools as _;
260
261        self.all_paths().into_iter().exactly_one().map_err(|_| {
262            bad_api_usage!("requested the single path of a multi-path tunnel?!").into()
263        })
264    }
265
266    /// Return a description of the first hop of this circuit.
267    ///
268    /// Returns an error if a circuit with the specified [`UniqId`] doesn't exist.
269    /// Returns `Ok(None)` if the specified circuit doesn't have any hops.
270    fn first_hop(&self, unique_id: UniqId) -> Result<Option<OwnedChanTarget>> {
271        let lock = self.0.lock().expect("lock poisoned");
272        let mutable = lock
273            .get(&unique_id)
274            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
275
276        let first_hop = mutable.first_hop().map(|first_hop| match first_hop {
277            path::HopDetail::Relay(r) => r,
278            #[cfg(feature = "hs-common")]
279            path::HopDetail::Virtual => {
280                panic!("somehow made a circuit with a virtual first hop.")
281            }
282        });
283
284        Ok(first_hop)
285    }
286
287    /// Return the [`HopNum`] of the last hop of the specified circuit.
288    ///
289    /// Returns an error if a circuit with the specified [`UniqId`] doesn't exist.
290    ///
291    /// See [`MutableState::last_hop_num`].
292    pub(super) fn last_hop_num(&self, unique_id: UniqId) -> Result<Option<HopNum>> {
293        let lock = self.0.lock().expect("lock poisoned");
294        let mutable = lock
295            .get(&unique_id)
296            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
297
298        Ok(mutable.last_hop_num())
299    }
300
301    /// Return the number of hops in the specified circuit.
302    ///
303    /// See [`MutableState::n_hops`].
304    fn n_hops(&self, unique_id: UniqId) -> Result<usize> {
305        let lock = self.0.lock().expect("lock poisoned");
306        let mutable = lock
307            .get(&unique_id)
308            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
309
310        Ok(mutable.n_hops())
311    }
312}
313
314/// The mutable state of a circuit.
315#[derive(Educe, Default)]
316#[educe(Debug)]
317pub(super) struct MutableState(Mutex<CircuitState>);
318
319impl MutableState {
320    /// Add a hop to the path of this circuit.
321    pub(super) fn add_hop(&self, peer_id: HopDetail, binding: Option<CircuitBinding>) {
322        let mut mutable = self.0.lock().expect("poisoned lock");
323        Arc::make_mut(&mut mutable.path).push_hop(peer_id);
324        mutable.binding.push(binding);
325    }
326
327    /// Get a copy of the circuit's current [`path::Path`].
328    pub(super) fn path(&self) -> Arc<path::Path> {
329        let mutable = self.0.lock().expect("poisoned lock");
330        Arc::clone(&mutable.path)
331    }
332
333    /// Return the cryptographic material used to prove knowledge of a shared
334    /// secret with with `hop`.
335    pub(super) fn binding_key(&self, hop: HopNum) -> Option<CircuitBinding> {
336        let mutable = self.0.lock().expect("poisoned lock");
337
338        mutable.binding.get::<usize>(hop.into()).cloned().flatten()
339        // NOTE: I'm not thrilled to have to copy this information, but we use
340        // it very rarely, so it's not _that_ bad IMO.
341    }
342
343    /// Return a description of the first hop of this circuit.
344    fn first_hop(&self) -> Option<HopDetail> {
345        let mutable = self.0.lock().expect("poisoned lock");
346        mutable.path.first_hop()
347    }
348
349    /// Return the [`HopNum`] of the last hop of this circuit.
350    ///
351    /// NOTE: This function will return the [`HopNum`] of the hop
352    /// that is _currently_ the last. If there is an extend operation in progress,
353    /// the currently pending hop may or may not be counted, depending on whether
354    /// the extend operation finishes before this call is done.
355    fn last_hop_num(&self) -> Option<HopNum> {
356        let mutable = self.0.lock().expect("poisoned lock");
357        mutable.path.last_hop_num()
358    }
359
360    /// Return the number of hops in this circuit.
361    ///
362    /// NOTE: This function will currently return only the number of hops
363    /// _currently_ in the circuit. If there is an extend operation in progress,
364    /// the currently pending hop may or may not be counted, depending on whether
365    /// the extend operation finishes before this call is done.
366    fn n_hops(&self) -> usize {
367        let mutable = self.0.lock().expect("poisoned lock");
368        mutable.path.n_hops()
369    }
370}
371
372/// The shared state of a circuit.
373#[derive(Educe, Default)]
374#[educe(Debug)]
375pub(super) struct CircuitState {
376    /// Information about this circuit's path.
377    ///
378    /// This is stored in an Arc so that we can cheaply give a copy of it to
379    /// client code; when we need to add a hop (which is less frequent) we use
380    /// [`Arc::make_mut()`].
381    path: Arc<path::Path>,
382
383    /// Circuit binding keys [q.v.][`CircuitBinding`] information for each hop
384    /// in the circuit's path.
385    ///
386    /// NOTE: Right now, there is a `CircuitBinding` for every hop.  There's a
387    /// fair chance that this will change in the future, and I don't want other
388    /// code to assume that a `CircuitBinding` _must_ exist, so I'm making this
389    /// an `Option`.
390    #[educe(Debug(ignore))]
391    binding: Vec<Option<CircuitBinding>>,
392}
393
394/// A ClientCirc that needs to send a create cell and receive a created* cell.
395///
396/// To use one of these, call `create_firsthop_fast()` or `create_firsthop()`
397/// to negotiate the cryptographic handshake with the first hop.
398pub struct PendingClientTunnel {
399    /// A oneshot receiver on which we'll receive a CREATED* cell,
400    /// or a DESTROY cell.
401    recvcreated: oneshot::Receiver<CreateResponse>,
402    /// The ClientCirc object that we can expose on success.
403    circ: ClientCirc,
404}
405
406impl ClientCirc {
407    /// Convert this `ClientCirc` into a single circuit [`ClientTunnel`].
408    pub fn into_tunnel(self) -> Result<ClientTunnel> {
409        self.try_into()
410    }
411
412    /// Return a description of the first hop of this circuit.
413    ///
414    /// # Panics
415    ///
416    /// Panics if there is no first hop.  (This should be impossible outside of
417    /// the tor-proto crate, but within the crate it's possible to have a
418    /// circuit with no hops.)
419    pub fn first_hop(&self) -> Result<OwnedChanTarget> {
420        Ok(self
421            .mutable
422            .first_hop(self.unique_id)
423            .map_err(|_| Error::CircuitClosed)?
424            .expect("called first_hop on an un-constructed circuit"))
425    }
426
427    /// Return a description of the last hop of the tunnel.
428    ///
429    /// Return None if the last hop is virtual.
430    ///
431    /// # Panics
432    ///
433    /// Panics if there is no last hop.  (This should be impossible outside of
434    /// the tor-proto crate, but within the crate it's possible to have a
435    /// circuit with no hops.)
436    pub fn last_hop_info(&self) -> Result<Option<OwnedChanTarget>> {
437        let all_paths = self.all_paths();
438        let path = all_paths.first().ok_or_else(|| {
439            tor_error::bad_api_usage!("Called last_hop_info on an un-constructed tunnel")
440        })?;
441        Ok(path
442            .hops()
443            .last()
444            .expect("Called last_hop on an un-constructed circuit")
445            .as_chan_target()
446            .map(OwnedChanTarget::from_chan_target))
447    }
448
449    /// Return the [`HopNum`] of the last hop of this circuit.
450    ///
451    /// Returns an error if there is no last hop.  (This should be impossible outside of the
452    /// tor-proto crate, but within the crate it's possible to have a circuit with no hops.)
453    ///
454    /// NOTE: This function will return the [`HopNum`] of the hop
455    /// that is _currently_ the last. If there is an extend operation in progress,
456    /// the currently pending hop may or may not be counted, depending on whether
457    /// the extend operation finishes before this call is done.
458    pub fn last_hop_num(&self) -> Result<HopNum> {
459        Ok(self
460            .mutable
461            .last_hop_num(self.unique_id)?
462            .ok_or_else(|| internal!("no last hop index"))?)
463    }
464
465    /// Return a [`TargetHop`] representing precisely the last hop of the circuit as in set as a
466    /// HopLocation with its id and hop number.
467    ///
468    /// Return an error if there is no last hop.
469    pub fn last_hop(&self) -> Result<TargetHop> {
470        let hop_num = self
471            .mutable
472            .last_hop_num(self.unique_id)?
473            .ok_or_else(|| bad_api_usage!("no last hop"))?;
474        Ok((self.unique_id, hop_num).into())
475    }
476
477    /// Return a list of [`Path`] objects describing all the circuits in this tunnel.
478    ///
479    /// Note that these `Path`s are not automatically updated if the underlying
480    /// circuits are extended.
481    pub fn all_paths(&self) -> Vec<Arc<Path>> {
482        self.mutable.all_paths()
483    }
484
485    /// Return a list of [`Path`] objects describing the only circuit in this tunnel.
486    ///
487    /// Returns an error if the tunnel has more than one tunnel.
488    pub fn single_path(&self) -> Result<Arc<Path>> {
489        self.mutable.single_path()
490    }
491
492    /// Return the time at which this circuit last had any open streams.
493    ///
494    /// Returns `None` if this circuit has never had any open streams,
495    /// or if it currently has open streams.
496    ///
497    /// NOTE that the Instant returned by this method is not affected by
498    /// any runtime mocking; it is the output of an ordinary call to
499    /// `Instant::get()`.
500    pub async fn disused_since(&self) -> Result<Option<Instant>> {
501        let (tx, rx) = oneshot::channel();
502        self.command
503            .unbounded_send(CtrlCmd::GetTunnelActivity { sender: tx })
504            .map_err(|_| Error::CircuitClosed)?;
505
506        Ok(rx.await.map_err(|_| Error::CircuitClosed)?.disused_since())
507    }
508
509    /// Get the clock skew claimed by the first hop of the circuit.
510    ///
511    /// See [`Channel::clock_skew()`].
512    pub async fn first_hop_clock_skew(&self) -> Result<ClockSkew> {
513        let (tx, rx) = oneshot::channel();
514
515        self.control
516            .unbounded_send(CtrlMsg::FirstHopClockSkew { answer: tx })
517            .map_err(|_| Error::CircuitClosed)?;
518
519        Ok(rx.await.map_err(|_| Error::CircuitClosed)??)
520    }
521
522    /// Return a reference to this circuit's memory quota account
523    pub fn mq_account(&self) -> &CircuitAccount {
524        &self.memquota
525    }
526
527    /// Return the cryptographic material used to prove knowledge of a shared
528    /// secret with with `hop`.
529    ///
530    /// See [`CircuitBinding`] for more information on how this is used.
531    ///
532    /// Return None if we have no circuit binding information for the hop, or if
533    /// the hop does not exist.
534    #[cfg(feature = "hs-service")]
535    pub async fn binding_key(&self, hop: TargetHop) -> Result<Option<CircuitBinding>> {
536        let (sender, receiver) = oneshot::channel();
537        let msg = CtrlCmd::GetBindingKey { hop, done: sender };
538        self.command
539            .unbounded_send(msg)
540            .map_err(|_| Error::CircuitClosed)?;
541
542        receiver.await.map_err(|_| Error::CircuitClosed)?
543    }
544
545    /// Extend the circuit, via the most appropriate circuit extension handshake,
546    /// to the chosen `target` hop.
547    pub async fn extend<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
548    where
549        Tg: CircTarget,
550    {
551        #![allow(deprecated)]
552
553        // For now we use the simplest decision-making mechanism:
554        // we use ntor_v3 whenever it is present; and otherwise we use ntor.
555        //
556        // This behavior is slightly different from C tor, which uses ntor v3
557        // only whenever it want to send any extension in the circuit message.
558        // But thanks to congestion control (named::FLOWCTRL_CC), we'll _always_
559        // want to use an extension if we can, and so it doesn't make too much
560        // sense to detect the case where we have no extensions.
561        //
562        // (As of April 2025, RELAY_NTORV3 is not yet listed as Required for relays
563        // on the tor network, and so we cannot simply assume that everybody has it.)
564        if target
565            .protovers()
566            .supports_named_subver(named::RELAY_NTORV3)
567        {
568            self.extend_ntor_v3(target, params).await
569        } else {
570            self.extend_ntor(target, params).await
571        }
572    }
573
574    /// Extend the circuit via the ntor handshake to a new target last
575    /// hop.
576    #[deprecated(since = "1.6.1", note = "Use extend instead.")]
577    pub async fn extend_ntor<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
578    where
579        Tg: CircTarget,
580    {
581        let key = NtorPublicKey {
582            id: *target
583                .rsa_identity()
584                .ok_or(Error::MissingId(RelayIdType::Rsa))?,
585            pk: *target.ntor_onion_key(),
586        };
587        let mut linkspecs = target
588            .linkspecs()
589            .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
590        if !params.extend_by_ed25519_id {
591            linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
592        }
593
594        let (tx, rx) = oneshot::channel();
595
596        let peer_id = OwnedChanTarget::from_chan_target(target);
597        let settings = HopSettings::from_params_and_caps(
598            HopNegotiationType::None,
599            &params,
600            target.protovers(),
601        )?;
602        self.control
603            .unbounded_send(CtrlMsg::ExtendNtor {
604                peer_id,
605                public_key: key,
606                linkspecs,
607                settings,
608                done: tx,
609            })
610            .map_err(|_| Error::CircuitClosed)?;
611
612        rx.await.map_err(|_| Error::CircuitClosed)??;
613
614        Ok(())
615    }
616
617    /// Extend the circuit via the ntor handshake to a new target last
618    /// hop.
619    #[deprecated(since = "1.6.1", note = "Use extend instead.")]
620    pub async fn extend_ntor_v3<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
621    where
622        Tg: CircTarget,
623    {
624        let key = NtorV3PublicKey {
625            id: *target
626                .ed_identity()
627                .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
628            pk: *target.ntor_onion_key(),
629        };
630        let mut linkspecs = target
631            .linkspecs()
632            .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
633        if !params.extend_by_ed25519_id {
634            linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
635        }
636
637        let (tx, rx) = oneshot::channel();
638
639        let peer_id = OwnedChanTarget::from_chan_target(target);
640        let settings = HopSettings::from_params_and_caps(
641            HopNegotiationType::Full,
642            &params,
643            target.protovers(),
644        )?;
645        self.control
646            .unbounded_send(CtrlMsg::ExtendNtorV3 {
647                peer_id,
648                public_key: key,
649                linkspecs,
650                settings,
651                done: tx,
652            })
653            .map_err(|_| Error::CircuitClosed)?;
654
655        rx.await.map_err(|_| Error::CircuitClosed)??;
656
657        Ok(())
658    }
659
660    /// Extend this circuit by a single, "virtual" hop.
661    ///
662    /// A virtual hop is one for which we do not add an actual network connection
663    /// between separate hosts (such as Relays).  We only add a layer of
664    /// cryptography.
665    ///
666    /// This is used to implement onion services: the client and the service
667    /// both build a circuit to a single rendezvous point, and tell the
668    /// rendezvous point to relay traffic between their two circuits.  Having
669    /// completed a [`handshake`] out of band[^1], the parties each extend their
670    /// circuits by a single "virtual" encryption hop that represents their
671    /// shared cryptographic context.
672    ///
673    /// Once a circuit has been extended in this way, it is an error to try to
674    /// extend it in any other way.
675    ///
676    /// [^1]: Technically, the handshake is only _mostly_ out of band: the
677    ///     client sends their half of the handshake in an ` message, and the
678    ///     service's response is inline in its `RENDEZVOUS2` message.
679    //
680    // TODO hs: let's try to enforce the "you can't extend a circuit again once
681    // it has been extended this way" property.  We could do that with internal
682    // state, or some kind of a type state pattern.
683    #[cfg(feature = "hs-common")]
684    pub async fn extend_virtual(
685        &self,
686        protocol: handshake::RelayProtocol,
687        role: handshake::HandshakeRole,
688        seed: impl handshake::KeyGenerator,
689        params: &CircParameters,
690        capabilities: &tor_protover::Protocols,
691    ) -> Result<()> {
692        use self::handshake::BoxedClientLayer;
693
694        // TODO CGO: Possibly refactor this match into a separate method when we revisit this.
695        let negotiation_type = match protocol {
696            handshake::RelayProtocol::HsV3 => HopNegotiationType::HsV3,
697        };
698        let protocol = handshake::RelayCryptLayerProtocol::from(protocol);
699
700        let BoxedClientLayer { fwd, back, binding } =
701            protocol.construct_client_layers(role, seed)?;
702
703        let settings = HopSettings::from_params_and_caps(negotiation_type, params, capabilities)?;
704        let (tx, rx) = oneshot::channel();
705        let message = CtrlCmd::ExtendVirtual {
706            cell_crypto: (fwd, back, binding),
707            settings,
708            done: tx,
709        };
710
711        self.command
712            .unbounded_send(message)
713            .map_err(|_| Error::CircuitClosed)?;
714
715        rx.await.map_err(|_| Error::CircuitClosed)?
716    }
717
718    /// Install a [`CircuitPadder`] at the listed `hop`.
719    ///
720    /// Replaces any previous padder installed at that hop.
721    #[cfg(feature = "circ-padding-manual")]
722    pub async fn start_padding_at_hop(&self, hop: HopNum, padder: CircuitPadder) -> Result<()> {
723        self.set_padder_impl(crate::HopLocation::Hop((self.unique_id, hop)), Some(padder))
724            .await
725    }
726
727    /// Remove any [`CircuitPadder`] at the listed `hop`.
728    ///
729    /// Does nothing if there was not a padder installed there.
730    #[cfg(feature = "circ-padding-manual")]
731    pub async fn stop_padding_at_hop(&self, hop: HopNum) -> Result<()> {
732        self.set_padder_impl(crate::HopLocation::Hop((self.unique_id, hop)), None)
733            .await
734    }
735
736    /// Helper: replace the padder at `hop` with the provided `padder`, or with `None`.
737    #[cfg(feature = "circ-padding-manual")]
738    pub(super) async fn set_padder_impl(
739        &self,
740        hop: crate::HopLocation,
741        padder: Option<CircuitPadder>,
742    ) -> Result<()> {
743        let (tx, rx) = oneshot::channel();
744        let msg = CtrlCmd::SetPadder {
745            hop,
746            padder,
747            sender: tx,
748        };
749        self.command
750            .unbounded_send(msg)
751            .map_err(|_| Error::CircuitClosed)?;
752        rx.await.map_err(|_| Error::CircuitClosed)?
753    }
754
755    /// Return true if this circuit is closed and therefore unusable.
756    pub fn is_closing(&self) -> bool {
757        self.control.is_closed()
758    }
759
760    /// Return a process-unique identifier for this circuit.
761    pub fn unique_id(&self) -> UniqId {
762        self.unique_id
763    }
764
765    /// Return the number of hops in this circuit.
766    ///
767    /// NOTE: This function will currently return only the number of hops
768    /// _currently_ in the circuit. If there is an extend operation in progress,
769    /// the currently pending hop may or may not be counted, depending on whether
770    /// the extend operation finishes before this call is done.
771    pub fn n_hops(&self) -> Result<usize> {
772        self.mutable
773            .n_hops(self.unique_id)
774            .map_err(|_| Error::CircuitClosed)
775    }
776
777    /// Return a future that will resolve once this circuit has closed.
778    ///
779    /// Note that this method does not itself cause the circuit to shut down.
780    ///
781    /// TODO: Perhaps this should return some kind of status indication instead
782    /// of just ()
783    pub fn wait_for_close(
784        &self,
785    ) -> impl futures::Future<Output = ()> + Send + Sync + 'static + use<> {
786        self.reactor_closed_rx.clone().map(|_| ())
787    }
788}
789
790impl PendingClientTunnel {
791    /// Instantiate a new circuit object: used from Channel::new_tunnel().
792    ///
793    /// Does not send a CREATE* cell on its own.
794    #[allow(clippy::too_many_arguments)]
795    pub(crate) fn new(
796        id: CircId,
797        channel: Arc<Channel>,
798        createdreceiver: oneshot::Receiver<CreateResponse>,
799        input: CircuitRxReceiver,
800        unique_id: UniqId,
801        runtime: DynTimeProvider,
802        memquota: CircuitAccount,
803        padding_ctrl: PaddingController,
804        padding_stream: PaddingEventStream,
805        timeouts: Arc<dyn TimeoutEstimator>,
806    ) -> (PendingClientTunnel, crate::client::reactor::Reactor) {
807        let time_provider = channel.time_provider().clone();
808        let (reactor, control_tx, command_tx, reactor_closed_rx, mutable) = Reactor::new(
809            channel,
810            id,
811            unique_id,
812            input,
813            runtime,
814            memquota.clone(),
815            padding_ctrl,
816            padding_stream,
817            timeouts,
818        );
819
820        let circuit = ClientCirc {
821            mutable,
822            unique_id,
823            control: control_tx,
824            command: command_tx,
825            reactor_closed_rx: reactor_closed_rx.shared(),
826            #[cfg(test)]
827            circid: id,
828            memquota,
829            time_provider,
830            is_multi_path: false,
831        };
832
833        let pending = PendingClientTunnel {
834            recvcreated: createdreceiver,
835            circ: circuit,
836        };
837        (pending, reactor)
838    }
839
840    /// Extract the process-unique identifier for this pending circuit.
841    pub fn peek_unique_id(&self) -> UniqId {
842        self.circ.unique_id
843    }
844
845    /// Use the (questionable!) CREATE_FAST handshake to connect to the
846    /// first hop of this circuit.
847    ///
848    /// There's no authentication in CRATE_FAST,
849    /// so we don't need to know whom we're connecting to: we're just
850    /// connecting to whichever relay the channel is for.
851    pub async fn create_firsthop_fast(self, params: CircParameters) -> Result<ClientTunnel> {
852        // We know nothing about this relay, so we assume it supports no protocol capabilities at all.
853        //
854        // TODO: If we had a consensus, we could assume it supported all required-relay-protocols.
855        // TODO prop364: When we implement CreateOneHop, we will want a Protocols argument here.
856        let protocols = tor_protover::Protocols::new();
857        let settings =
858            HopSettings::from_params_and_caps(HopNegotiationType::None, &params, &protocols)?;
859        let (tx, rx) = oneshot::channel();
860        self.circ
861            .control
862            .unbounded_send(CtrlMsg::Create {
863                recv_created: self.recvcreated,
864                handshake: CircuitHandshake::CreateFast,
865                settings,
866                done: tx,
867            })
868            .map_err(|_| Error::CircuitClosed)?;
869
870        rx.await.map_err(|_| Error::CircuitClosed)??;
871
872        self.circ.into_tunnel()
873    }
874
875    /// Use the most appropriate handshake to connect to the first hop of this circuit.
876    ///
877    /// Note that the provided 'target' must match the channel's target,
878    /// or the handshake will fail.
879    pub async fn create_firsthop<Tg>(
880        self,
881        target: &Tg,
882        params: CircParameters,
883    ) -> Result<ClientTunnel>
884    where
885        Tg: tor_linkspec::CircTarget,
886    {
887        #![allow(deprecated)]
888        // (See note in ClientCirc::extend.)
889        if target
890            .protovers()
891            .supports_named_subver(named::RELAY_NTORV3)
892        {
893            self.create_firsthop_ntor_v3(target, params).await
894        } else {
895            self.create_firsthop_ntor(target, params).await
896        }
897    }
898
899    /// Use the ntor handshake to connect to the first hop of this circuit.
900    ///
901    /// Note that the provided 'target' must match the channel's target,
902    /// or the handshake will fail.
903    #[deprecated(since = "1.6.1", note = "Use create_firsthop instead.")]
904    pub async fn create_firsthop_ntor<Tg>(
905        self,
906        target: &Tg,
907        params: CircParameters,
908    ) -> Result<ClientTunnel>
909    where
910        Tg: tor_linkspec::CircTarget,
911    {
912        let (tx, rx) = oneshot::channel();
913        let settings = HopSettings::from_params_and_caps(
914            HopNegotiationType::None,
915            &params,
916            target.protovers(),
917        )?;
918
919        self.circ
920            .control
921            .unbounded_send(CtrlMsg::Create {
922                recv_created: self.recvcreated,
923                handshake: CircuitHandshake::Ntor {
924                    public_key: NtorPublicKey {
925                        id: *target
926                            .rsa_identity()
927                            .ok_or(Error::MissingId(RelayIdType::Rsa))?,
928                        pk: *target.ntor_onion_key(),
929                    },
930                    ed_identity: *target
931                        .ed_identity()
932                        .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
933                },
934                settings,
935                done: tx,
936            })
937            .map_err(|_| Error::CircuitClosed)?;
938
939        rx.await.map_err(|_| Error::CircuitClosed)??;
940
941        self.circ.into_tunnel()
942    }
943
944    /// Use the ntor_v3 handshake to connect to the first hop of this circuit.
945    ///
946    /// Assumes that the target supports ntor_v3. The caller should verify
947    /// this before calling this function, e.g. by validating that the target
948    /// has advertised ["Relay=4"](https://spec.torproject.org/tor-spec/subprotocol-versioning.html#relay).
949    ///
950    /// Note that the provided 'target' must match the channel's target,
951    /// or the handshake will fail.
952    #[deprecated(since = "1.6.1", note = "Use create_firsthop instead.")]
953    pub async fn create_firsthop_ntor_v3<Tg>(
954        self,
955        target: &Tg,
956        params: CircParameters,
957    ) -> Result<ClientTunnel>
958    where
959        Tg: tor_linkspec::CircTarget,
960    {
961        let settings = HopSettings::from_params_and_caps(
962            HopNegotiationType::Full,
963            &params,
964            target.protovers(),
965        )?;
966        let (tx, rx) = oneshot::channel();
967
968        self.circ
969            .control
970            .unbounded_send(CtrlMsg::Create {
971                recv_created: self.recvcreated,
972                handshake: CircuitHandshake::NtorV3 {
973                    public_key: NtorV3PublicKey {
974                        id: *target
975                            .ed_identity()
976                            .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
977                        pk: *target.ntor_onion_key(),
978                    },
979                },
980                settings,
981                done: tx,
982            })
983            .map_err(|_| Error::CircuitClosed)?;
984
985        rx.await.map_err(|_| Error::CircuitClosed)??;
986
987        self.circ.into_tunnel()
988    }
989}
990
991#[cfg(test)]
992pub(crate) mod test {
993    // @@ begin test lint list maintained by maint/add_warning @@
994    #![allow(clippy::bool_assert_comparison)]
995    #![allow(clippy::clone_on_copy)]
996    #![allow(clippy::dbg_macro)]
997    #![allow(clippy::mixed_attributes_style)]
998    #![allow(clippy::print_stderr)]
999    #![allow(clippy::print_stdout)]
1000    #![allow(clippy::single_char_pattern)]
1001    #![allow(clippy::unwrap_used)]
1002    #![allow(clippy::unchecked_time_subtraction)]
1003    #![allow(clippy::useless_vec)]
1004    #![allow(clippy::needless_pass_by_value)]
1005    #![allow(clippy::string_slice)] // See arti#2571
1006    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1007
1008    use super::*;
1009    use crate::channel::test::{CodecResult, new_reactor};
1010    use crate::circuit::CircuitRxSender;
1011    use crate::circuit::reactor::test::rmsg_to_ccmsg;
1012    use crate::circuit::test::fake_mpsc;
1013    use crate::client::circuit::padding::new_padding;
1014    use crate::client::stream::DataStream;
1015    use crate::congestion::params::CongestionControlParams;
1016    use crate::congestion::test_utils::params::build_cc_vegas_params;
1017    use crate::crypto::cell::RelayCellBody;
1018    use crate::crypto::handshake::ntor_v3::NtorV3Server;
1019    use crate::memquota::SpecificAccount as _;
1020    use crate::stream::flow_ctrl::params::FlowCtrlParameters;
1021    use crate::util::DummyTimeoutEstimator;
1022    use assert_matches::assert_matches;
1023    use chanmsg::{AnyChanMsg, Created2, CreatedFast};
1024    use futures::channel::mpsc::{Receiver, Sender};
1025    use futures::io::{AsyncReadExt, AsyncWriteExt};
1026    use futures::sink::SinkExt;
1027    use futures::stream::StreamExt;
1028    use hex_literal::hex;
1029    use std::collections::{HashMap, VecDeque};
1030    use std::fmt::Debug;
1031    use std::time::Duration;
1032    use tor_basic_utils::test_rng::testing_rng;
1033    use tor_cell::chancell::{AnyChanCell, BoxedCellBody, ChanCell, ChanCmd, msg as chanmsg};
1034    use tor_cell::relaycell::extend::{self as extend_ext, CircRequestExt, CircResponseExt};
1035    use tor_cell::relaycell::msg::SendmeTag;
1036    use tor_cell::relaycell::{
1037        AnyRelayMsgOuter, RelayCellFormat, RelayCmd, StreamId, msg as relaymsg, msg::AnyRelayMsg,
1038    };
1039    use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
1040    use tor_linkspec::OwnedCircTarget;
1041    use tor_rtcompat::Runtime;
1042    use tor_rtcompat::SpawnExt;
1043    use tracing::trace;
1044    use tracing_test::traced_test;
1045
1046    #[cfg(feature = "conflux")]
1047    use {
1048        crate::client::reactor::ConfluxHandshakeResult,
1049        crate::util::err::ConfluxHandshakeError,
1050        futures::future::FusedFuture,
1051        futures::lock::Mutex as AsyncMutex,
1052        std::pin::Pin,
1053        std::result::Result as StdResult,
1054        tor_cell::relaycell::conflux::{V1DesiredUx, V1LinkPayload, V1Nonce},
1055        tor_cell::relaycell::msg::ConfluxLink,
1056        tor_rtmock::MockRuntime,
1057    };
1058
1059    #[cfg(feature = "hs-service")]
1060    use crate::circuit::reactor::test::AllowAllStreamsFilter;
1061
1062    impl PendingClientTunnel {
1063        /// Testing only: Extract the circuit ID for this pending circuit.
1064        pub(crate) fn peek_circid(&self) -> CircId {
1065            self.circ.circid
1066        }
1067    }
1068
1069    impl ClientCirc {
1070        /// Testing only: Extract the circuit ID of this circuit.
1071        pub(crate) fn peek_circid(&self) -> CircId {
1072            self.circid
1073        }
1074    }
1075
1076    impl ClientTunnel {
1077        pub(crate) async fn resolve_last_hop(&self) -> TargetHop {
1078            let (sender, receiver) = oneshot::channel();
1079            let _ =
1080                self.as_single_circ()
1081                    .unwrap()
1082                    .command
1083                    .unbounded_send(CtrlCmd::ResolveTargetHop {
1084                        hop: TargetHop::LastHop,
1085                        done: sender,
1086                    });
1087            TargetHop::Hop(receiver.await.unwrap().unwrap())
1088        }
1089    }
1090
1091    // Example relay IDs and keys
1092    const EXAMPLE_SK: [u8; 32] =
1093        hex!("7789d92a89711a7e2874c61ea495452cfd48627b3ca2ea9546aafa5bf7b55803");
1094    const EXAMPLE_PK: [u8; 32] =
1095        hex!("395cb26b83b3cd4b91dba9913e562ae87d21ecdd56843da7ca939a6a69001253");
1096    const EXAMPLE_ED_ID: [u8; 32] = [6; 32];
1097    const EXAMPLE_RSA_ID: [u8; 20] = [10; 20];
1098
1099    /// return an example OwnedCircTarget that can get used for an ntor handshake.
1100    fn example_target() -> OwnedCircTarget {
1101        let mut builder = OwnedCircTarget::builder();
1102        builder
1103            .chan_target()
1104            .ed_identity(EXAMPLE_ED_ID.into())
1105            .rsa_identity(EXAMPLE_RSA_ID.into());
1106        builder
1107            .ntor_onion_key(EXAMPLE_PK.into())
1108            .protocols("FlowCtrl=1-2".parse().unwrap())
1109            .build()
1110            .unwrap()
1111    }
1112    fn example_ntor_key() -> crate::crypto::handshake::ntor::NtorSecretKey {
1113        crate::crypto::handshake::ntor::NtorSecretKey::new(
1114            EXAMPLE_SK.into(),
1115            EXAMPLE_PK.into(),
1116            EXAMPLE_RSA_ID.into(),
1117        )
1118    }
1119    fn example_ntor_v3_key() -> crate::crypto::handshake::ntor_v3::NtorV3SecretKey {
1120        crate::crypto::handshake::ntor_v3::NtorV3SecretKey::new(
1121            EXAMPLE_SK.into(),
1122            EXAMPLE_PK.into(),
1123            EXAMPLE_ED_ID.into(),
1124        )
1125    }
1126
1127    fn working_fake_channel<R: Runtime>(
1128        rt: &R,
1129    ) -> (Arc<Channel>, Receiver<AnyChanCell>, Sender<CodecResult>) {
1130        let (channel, chan_reactor, rx, tx) = new_reactor(rt.clone());
1131        rt.spawn(async {
1132            let _ignore = chan_reactor.run().await;
1133        })
1134        .unwrap();
1135        (channel, rx, tx)
1136    }
1137
1138    /// Which handshake type to use.
1139    #[derive(Copy, Clone)]
1140    enum HandshakeType {
1141        Fast,
1142        Ntor,
1143        NtorV3,
1144    }
1145
1146    #[allow(deprecated)]
1147    async fn test_create<R: Runtime>(rt: &R, handshake_type: HandshakeType, with_cc: bool) {
1148        // We want to try progressing from a pending circuit to a circuit
1149        // via a crate_fast handshake.
1150
1151        use crate::crypto::handshake::{ServerHandshake, fast::CreateFastServer, ntor::NtorServer};
1152
1153        let (chan, mut rx, _sink) = working_fake_channel(rt);
1154        let circid = CircId::new(128).unwrap();
1155        let (created_send, created_recv) = oneshot::channel();
1156        let (_circmsg_send, circmsg_recv) = fake_mpsc(64);
1157        let unique_id = UniqId::new(23, 17);
1158        let (padding_ctrl, padding_stream) = new_padding(DynTimeProvider::new(rt.clone()));
1159
1160        let (pending, reactor) = PendingClientTunnel::new(
1161            circid,
1162            chan,
1163            created_recv,
1164            circmsg_recv,
1165            unique_id,
1166            DynTimeProvider::new(rt.clone()),
1167            CircuitAccount::new_noop(),
1168            padding_ctrl,
1169            padding_stream,
1170            Arc::new(DummyTimeoutEstimator),
1171        );
1172
1173        rt.spawn(async {
1174            let _ignore = reactor.run().await;
1175        })
1176        .unwrap();
1177
1178        // Future to pretend to be a relay on the other end of the circuit.
1179        let simulate_relay_fut = async move {
1180            let mut rng = testing_rng();
1181            let create_cell = rx.next().await.unwrap();
1182            assert_eq!(create_cell.circid(), Some(circid));
1183            let reply = match handshake_type {
1184                HandshakeType::Fast => {
1185                    let cf = match create_cell.msg() {
1186                        AnyChanMsg::CreateFast(cf) => cf,
1187                        other => panic!("{:?}", other),
1188                    };
1189                    let (_, rep) = CreateFastServer::server(
1190                        &mut rng,
1191                        &mut |_: &()| Some(()),
1192                        &[()],
1193                        cf.handshake(),
1194                    )
1195                    .unwrap();
1196                    CreateResponse::CreatedFast(CreatedFast::new(rep))
1197                }
1198                HandshakeType::Ntor => {
1199                    let c2 = match create_cell.msg() {
1200                        AnyChanMsg::Create2(c2) => c2,
1201                        other => panic!("{:?}", other),
1202                    };
1203                    let (_, rep) = NtorServer::server(
1204                        &mut rng,
1205                        &mut |_: &()| Some(()),
1206                        &[example_ntor_key()],
1207                        c2.body(),
1208                    )
1209                    .unwrap();
1210                    CreateResponse::Created2(Created2::new(rep))
1211                }
1212                HandshakeType::NtorV3 => {
1213                    let c2 = match create_cell.msg() {
1214                        AnyChanMsg::Create2(c2) => c2,
1215                        other => panic!("{:?}", other),
1216                    };
1217                    let mut reply_fn = if with_cc {
1218                        |client_exts: &[CircRequestExt]| {
1219                            let _ = client_exts
1220                                .iter()
1221                                .find(|e| matches!(e, CircRequestExt::CcRequest(_)))
1222                                .expect("Client failed to request CC");
1223                            // This needs to be aligned to test_utils params
1224                            // value due to validation that needs it in range.
1225                            Some(vec![CircResponseExt::CcResponse(
1226                                extend_ext::CcResponse::new(31),
1227                            )])
1228                        }
1229                    } else {
1230                        |_: &_| Some(vec![])
1231                    };
1232                    let (_, rep) = NtorV3Server::server(
1233                        &mut rng,
1234                        &mut reply_fn,
1235                        &[example_ntor_v3_key()],
1236                        c2.body(),
1237                    )
1238                    .unwrap();
1239                    CreateResponse::Created2(Created2::new(rep))
1240                }
1241            };
1242            created_send.send(reply).unwrap();
1243        };
1244        // Future to pretend to be a client.
1245        let client_fut = async move {
1246            let target = example_target();
1247            let params = CircParameters::default();
1248            let ret = match handshake_type {
1249                HandshakeType::Fast => {
1250                    trace!("doing fast create");
1251                    pending.create_firsthop_fast(params).await
1252                }
1253                HandshakeType::Ntor => {
1254                    trace!("doing ntor create");
1255                    pending.create_firsthop_ntor(&target, params).await
1256                }
1257                HandshakeType::NtorV3 => {
1258                    let params = if with_cc {
1259                        // Setup CC vegas parameters.
1260                        CircParameters::new(
1261                            true,
1262                            build_cc_vegas_params(),
1263                            FlowCtrlParameters::defaults_for_tests(),
1264                        )
1265                    } else {
1266                        params
1267                    };
1268                    trace!("doing ntor_v3 create");
1269                    pending.create_firsthop_ntor_v3(&target, params).await
1270                }
1271            };
1272            trace!("create done: result {:?}", ret);
1273            ret
1274        };
1275
1276        let (circ, _) = futures::join!(client_fut, simulate_relay_fut);
1277
1278        let _circ = circ.unwrap();
1279
1280        // pfew!  We've build a circuit!  Let's make sure it has one hop.
1281        assert_eq!(_circ.n_hops().unwrap(), 1);
1282    }
1283
1284    #[traced_test]
1285    #[test]
1286    fn test_create_fast() {
1287        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1288            test_create(&rt, HandshakeType::Fast, false).await;
1289        });
1290    }
1291    #[traced_test]
1292    #[test]
1293    fn test_create_ntor() {
1294        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1295            test_create(&rt, HandshakeType::Ntor, false).await;
1296        });
1297    }
1298    #[traced_test]
1299    #[test]
1300    fn test_create_ntor_v3() {
1301        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1302            test_create(&rt, HandshakeType::NtorV3, false).await;
1303        });
1304    }
1305    #[traced_test]
1306    #[test]
1307    #[cfg(feature = "flowctl-cc")]
1308    fn test_create_ntor_v3_with_cc() {
1309        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1310            test_create(&rt, HandshakeType::NtorV3, true).await;
1311        });
1312    }
1313
1314    // An encryption layer that doesn't do any crypto.   Can be used
1315    // as inbound or outbound, but not both at once.
1316    pub(crate) struct DummyCrypto {
1317        counter_tag: [u8; 20],
1318        counter: u32,
1319        lasthop: bool,
1320    }
1321    impl DummyCrypto {
1322        fn next_tag(&mut self) -> SendmeTag {
1323            #![allow(clippy::identity_op)]
1324            self.counter_tag[0] = ((self.counter >> 0) & 255) as u8;
1325            self.counter_tag[1] = ((self.counter >> 8) & 255) as u8;
1326            self.counter_tag[2] = ((self.counter >> 16) & 255) as u8;
1327            self.counter_tag[3] = ((self.counter >> 24) & 255) as u8;
1328            self.counter += 1;
1329            self.counter_tag.into()
1330        }
1331    }
1332
1333    impl crate::crypto::cell::OutboundClientLayer for DummyCrypto {
1334        fn originate_for(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) -> SendmeTag {
1335            self.next_tag()
1336        }
1337        fn encrypt_outbound(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) {}
1338    }
1339    impl crate::crypto::cell::InboundClientLayer for DummyCrypto {
1340        fn decrypt_inbound(
1341            &mut self,
1342            _cmd: ChanCmd,
1343            _cell: &mut RelayCellBody,
1344        ) -> Option<SendmeTag> {
1345            if self.lasthop {
1346                Some(self.next_tag())
1347            } else {
1348                None
1349            }
1350        }
1351    }
1352    impl DummyCrypto {
1353        pub(crate) fn new(lasthop: bool) -> Self {
1354            DummyCrypto {
1355                counter_tag: [0; 20],
1356                counter: 0,
1357                lasthop,
1358            }
1359        }
1360    }
1361
1362    // Helper: set up a 3-hop circuit with no encryption, where the
1363    // next inbound message seems to come from hop next_msg_from
1364    async fn newtunnel_ext<R: Runtime>(
1365        rt: &R,
1366        unique_id: UniqId,
1367        chan: Arc<Channel>,
1368        hops: Vec<path::HopDetail>,
1369        next_msg_from: HopNum,
1370        params: CircParameters,
1371    ) -> (ClientTunnel, CircuitRxSender) {
1372        let circid = CircId::new(128).unwrap();
1373        let (_created_send, created_recv) = oneshot::channel();
1374        let (circmsg_send, circmsg_recv) = fake_mpsc(64);
1375        let (padding_ctrl, padding_stream) = new_padding(DynTimeProvider::new(rt.clone()));
1376
1377        let (pending, reactor) = PendingClientTunnel::new(
1378            circid,
1379            chan,
1380            created_recv,
1381            circmsg_recv,
1382            unique_id,
1383            DynTimeProvider::new(rt.clone()),
1384            CircuitAccount::new_noop(),
1385            padding_ctrl,
1386            padding_stream,
1387            Arc::new(DummyTimeoutEstimator),
1388        );
1389
1390        rt.spawn(async {
1391            let _ignore = reactor.run().await;
1392        })
1393        .unwrap();
1394        let PendingClientTunnel {
1395            circ,
1396            recvcreated: _,
1397        } = pending;
1398
1399        // TODO #1067: Support other formats
1400        let relay_cell_format = RelayCellFormat::V0;
1401
1402        let last_hop_num = u8::try_from(hops.len() - 1).unwrap();
1403        for (idx, peer_id) in hops.into_iter().enumerate() {
1404            let (tx, rx) = oneshot::channel();
1405            let idx = idx as u8;
1406
1407            circ.command
1408                .unbounded_send(CtrlCmd::AddFakeHop {
1409                    relay_cell_format,
1410                    fwd_lasthop: idx == last_hop_num,
1411                    rev_lasthop: idx == u8::from(next_msg_from),
1412                    peer_id,
1413                    params: params.clone(),
1414                    done: tx,
1415                })
1416                .unwrap();
1417            rx.await.unwrap().unwrap();
1418        }
1419        (circ.into_tunnel().unwrap(), circmsg_send)
1420    }
1421
1422    // Helper: set up a 3-hop circuit with no encryption, where the
1423    // next inbound message seems to come from hop next_msg_from
1424    async fn newtunnel<R: Runtime>(
1425        rt: &R,
1426        chan: Arc<Channel>,
1427    ) -> (Arc<ClientTunnel>, CircuitRxSender) {
1428        let hops = std::iter::repeat_with(|| {
1429            let peer_id = tor_linkspec::OwnedChanTarget::builder()
1430                .ed_identity([4; 32].into())
1431                .rsa_identity([5; 20].into())
1432                .build()
1433                .expect("Could not construct fake hop");
1434
1435            path::HopDetail::Relay(peer_id)
1436        })
1437        .take(3)
1438        .collect();
1439
1440        let unique_id = UniqId::new(23, 17);
1441        let (tunnel, circmsg_send) = newtunnel_ext(
1442            rt,
1443            unique_id,
1444            chan,
1445            hops,
1446            2.into(),
1447            CircParameters::default(),
1448        )
1449        .await;
1450
1451        (Arc::new(tunnel), circmsg_send)
1452    }
1453
1454    /// Create `n` distinct [`path::HopDetail`]s,
1455    /// with the specified `start_idx` for the dummy identities.
1456    fn hop_details(n: u8, start_idx: u8) -> Vec<path::HopDetail> {
1457        (0..n)
1458            .map(|idx| {
1459                let peer_id = tor_linkspec::OwnedChanTarget::builder()
1460                    .ed_identity([idx + start_idx; 32].into())
1461                    .rsa_identity([idx + start_idx + 1; 20].into())
1462                    .build()
1463                    .expect("Could not construct fake hop");
1464
1465                path::HopDetail::Relay(peer_id)
1466            })
1467            .collect()
1468    }
1469
1470    #[allow(deprecated)]
1471    async fn test_extend<R: Runtime>(rt: &R, handshake_type: HandshakeType) {
1472        use crate::crypto::handshake::{ServerHandshake, ntor::NtorServer};
1473
1474        let (chan, mut rx, _sink) = working_fake_channel(rt);
1475        let (tunnel, mut sink) = newtunnel(rt, chan).await;
1476        let circ = Arc::new(tunnel.as_single_circ().unwrap());
1477        let circid = circ.peek_circid();
1478        let params = CircParameters::default();
1479
1480        let extend_fut = async move {
1481            let target = example_target();
1482            match handshake_type {
1483                HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
1484                HandshakeType::Ntor => circ.extend_ntor(&target, params).await.unwrap(),
1485                HandshakeType::NtorV3 => circ.extend_ntor_v3(&target, params).await.unwrap(),
1486            };
1487            circ // gotta keep the circ alive, or the reactor would exit.
1488        };
1489        let reply_fut = async move {
1490            // We've disabled encryption on this circuit, so we can just
1491            // read the extend2 cell.
1492            let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1493            assert_eq!(id, Some(circid));
1494            let rmsg = match chmsg {
1495                AnyChanMsg::RelayEarly(r) => {
1496                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1497                        .unwrap()
1498                }
1499                other => panic!("{:?}", other),
1500            };
1501            let e2 = match rmsg.msg() {
1502                AnyRelayMsg::Extend2(e2) => e2,
1503                other => panic!("{:?}", other),
1504            };
1505            let mut rng = testing_rng();
1506            let reply = match handshake_type {
1507                HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
1508                HandshakeType::Ntor => {
1509                    let (_keygen, reply) = NtorServer::server(
1510                        &mut rng,
1511                        &mut |_: &()| Some(()),
1512                        &[example_ntor_key()],
1513                        e2.handshake(),
1514                    )
1515                    .unwrap();
1516                    reply
1517                }
1518                HandshakeType::NtorV3 => {
1519                    let (_keygen, reply) = NtorV3Server::server(
1520                        &mut rng,
1521                        &mut |_: &[CircRequestExt]| Some(vec![]),
1522                        &[example_ntor_v3_key()],
1523                        e2.handshake(),
1524                    )
1525                    .unwrap();
1526                    reply
1527                }
1528            };
1529
1530            let extended2 = relaymsg::Extended2::new(reply).into();
1531            sink.send(rmsg_to_ccmsg(None, extended2, false))
1532                .await
1533                .unwrap();
1534            (sink, rx) // gotta keep the sink and receiver alive, or the reactor will exit.
1535        };
1536
1537        let (circ, (_sink, _rx)) = futures::join!(extend_fut, reply_fut);
1538
1539        // Did we really add another hop?
1540        assert_eq!(circ.n_hops().unwrap(), 4);
1541
1542        // Do the path accessors report a reasonable outcome?
1543        {
1544            let path = circ.single_path().unwrap();
1545            let path = path
1546                .all_hops()
1547                .filter_map(|hop| match hop {
1548                    path::HopDetail::Relay(r) => Some(r),
1549                    #[cfg(feature = "hs-common")]
1550                    path::HopDetail::Virtual => None,
1551                })
1552                .collect::<Vec<_>>();
1553
1554            assert_eq!(path.len(), 4);
1555            use tor_linkspec::HasRelayIds;
1556            assert_eq!(path[3].ed_identity(), example_target().ed_identity());
1557            assert_ne!(path[0].ed_identity(), example_target().ed_identity());
1558        }
1559        {
1560            let path = circ.single_path().unwrap();
1561            assert_eq!(path.n_hops(), 4);
1562            use tor_linkspec::HasRelayIds;
1563            assert_eq!(
1564                path.hops()[3].as_chan_target().unwrap().ed_identity(),
1565                example_target().ed_identity()
1566            );
1567            assert_ne!(
1568                path.hops()[0].as_chan_target().unwrap().ed_identity(),
1569                example_target().ed_identity()
1570            );
1571        }
1572    }
1573
1574    #[traced_test]
1575    #[test]
1576    fn test_extend_ntor() {
1577        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1578            test_extend(&rt, HandshakeType::Ntor).await;
1579        });
1580    }
1581
1582    #[traced_test]
1583    #[test]
1584    fn test_extend_ntor_v3() {
1585        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1586            test_extend(&rt, HandshakeType::NtorV3).await;
1587        });
1588    }
1589
1590    #[allow(deprecated)]
1591    async fn bad_extend_test_impl<R: Runtime>(
1592        rt: &R,
1593        reply_hop: HopNum,
1594        bad_reply: AnyChanMsg,
1595    ) -> Error {
1596        let (chan, mut rx, _sink) = working_fake_channel(rt);
1597        let hops = std::iter::repeat_with(|| {
1598            let peer_id = tor_linkspec::OwnedChanTarget::builder()
1599                .ed_identity([4; 32].into())
1600                .rsa_identity([5; 20].into())
1601                .build()
1602                .expect("Could not construct fake hop");
1603
1604            path::HopDetail::Relay(peer_id)
1605        })
1606        .take(3)
1607        .collect();
1608
1609        let unique_id = UniqId::new(23, 17);
1610        let (tunnel, mut sink) = newtunnel_ext(
1611            rt,
1612            unique_id,
1613            chan,
1614            hops,
1615            reply_hop,
1616            CircParameters::default(),
1617        )
1618        .await;
1619        let params = CircParameters::default();
1620
1621        let target = example_target();
1622        let reply_task_handle = rt
1623            .spawn_with_handle(async move {
1624                // Wait for a cell, and make sure it's EXTEND2.
1625                let (_circid, chanmsg) = rx.next().await.unwrap().into_circid_and_msg();
1626                let AnyChanMsg::RelayEarly(relay_early) = chanmsg else {
1627                    panic!("unexpected message {chanmsg:?}");
1628                };
1629                let relaymsg = UnparsedRelayMsg::from_singleton_body(
1630                    RelayCellFormat::V0,
1631                    relay_early.into_relay_body(),
1632                )
1633                .unwrap();
1634                assert_eq!(relaymsg.cmd(), RelayCmd::EXTEND2);
1635
1636                // Send back the "bad_reply."
1637                sink.send(bad_reply).await.unwrap();
1638                sink
1639            })
1640            .unwrap();
1641        let outcome = tunnel
1642            .as_single_circ()
1643            .unwrap()
1644            .extend_ntor(&target, params)
1645            .await;
1646        let _sink = reply_task_handle.await;
1647
1648        assert_eq!(tunnel.n_hops().unwrap(), 3);
1649        assert!(outcome.is_err());
1650        outcome.unwrap_err()
1651    }
1652
1653    #[traced_test]
1654    #[test]
1655    fn bad_extend_wronghop() {
1656        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1657            let extended2 = relaymsg::Extended2::new(vec![]).into();
1658            let cc = rmsg_to_ccmsg(None, extended2, false);
1659
1660            let error = bad_extend_test_impl(&rt, 1.into(), cc).await;
1661            // This case shows up as a CircDestroy, since a message sent
1662            // from the wrong hop won't even be delivered to the extend
1663            // code's meta-handler.  Instead the unexpected message will cause
1664            // the circuit to get torn down.
1665            match error {
1666                Error::CircuitClosed => {}
1667                x => panic!("got other error: {}", x),
1668            }
1669        });
1670    }
1671
1672    #[traced_test]
1673    #[test]
1674    fn bad_extend_wrongtype() {
1675        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1676            let extended = relaymsg::Extended::new(vec![7; 200]).into();
1677            let cc = rmsg_to_ccmsg(None, extended, false);
1678
1679            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1680            match error {
1681                Error::BytesErr {
1682                    err: tor_bytes::Error::InvalidMessage(_),
1683                    object: "extended2 message",
1684                } => {}
1685                other => panic!("{:?}", other),
1686            }
1687        });
1688    }
1689
1690    #[traced_test]
1691    #[test]
1692    fn bad_extend_destroy() {
1693        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1694            let cc = AnyChanMsg::Destroy(chanmsg::Destroy::new(4.into()));
1695            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1696            match error {
1697                Error::CircuitClosed => {}
1698                other => panic!("{:?}", other),
1699            }
1700        });
1701    }
1702
1703    #[traced_test]
1704    #[test]
1705    fn bad_extend_crypto() {
1706        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1707            let extended2 = relaymsg::Extended2::new(vec![99; 256]).into();
1708            let cc = rmsg_to_ccmsg(None, extended2, false);
1709            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1710            assert_matches!(error, Error::BadCircHandshakeAuth);
1711        });
1712    }
1713
1714    #[traced_test]
1715    #[test]
1716    fn begindir() {
1717        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1718            let (chan, mut rx, _sink) = working_fake_channel(&rt);
1719            let (tunnel, mut sink) = newtunnel(&rt, chan).await;
1720            let circ = tunnel.as_single_circ().unwrap();
1721            let circid = circ.peek_circid();
1722
1723            let begin_and_send_fut = async move {
1724                // Here we'll say we've got a circuit, and we want to
1725                // make a simple BEGINDIR request with it.
1726                let mut stream = tunnel.begin_dir_stream().await.unwrap();
1727                stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap();
1728                stream.flush().await.unwrap();
1729                let mut buf = [0_u8; 1024];
1730                let n = stream.read(&mut buf).await.unwrap();
1731                assert_eq!(&buf[..n], b"HTTP/1.0 404 Not found\r\n");
1732                let n = stream.read(&mut buf).await.unwrap();
1733                assert_eq!(n, 0);
1734                stream
1735            };
1736            let reply_fut = async move {
1737                // We've disabled encryption on this circuit, so we can just
1738                // read the begindir cell.
1739                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1740                assert_eq!(id, Some(circid));
1741                let rmsg = match chmsg {
1742                    AnyChanMsg::Relay(r) => {
1743                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1744                            .unwrap()
1745                    }
1746                    other => panic!("{:?}", other),
1747                };
1748                let (streamid, rmsg) = rmsg.into_streamid_and_msg();
1749                assert_matches!(rmsg, AnyRelayMsg::BeginDir(_));
1750
1751                // Reply with a Connected cell to indicate success.
1752                let connected = relaymsg::Connected::new_empty().into();
1753                sink.send(rmsg_to_ccmsg(streamid, connected, false))
1754                    .await
1755                    .unwrap();
1756
1757                // Now read a DATA cell...
1758                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1759                assert_eq!(id, Some(circid));
1760                let rmsg = match chmsg {
1761                    AnyChanMsg::Relay(r) => {
1762                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1763                            .unwrap()
1764                    }
1765                    other => panic!("{:?}", other),
1766                };
1767                let (streamid_2, rmsg) = rmsg.into_streamid_and_msg();
1768                assert_eq!(streamid_2, streamid);
1769                if let AnyRelayMsg::Data(d) = rmsg {
1770                    assert_eq!(d.as_ref(), &b"HTTP/1.0 GET /\r\n"[..]);
1771                } else {
1772                    panic!();
1773                }
1774
1775                // Write another data cell in reply!
1776                let data = relaymsg::Data::new(b"HTTP/1.0 404 Not found\r\n")
1777                    .unwrap()
1778                    .into();
1779                sink.send(rmsg_to_ccmsg(streamid, data, false))
1780                    .await
1781                    .unwrap();
1782
1783                // Send an END cell to say that the conversation is over.
1784                let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
1785                sink.send(rmsg_to_ccmsg(streamid, end, false))
1786                    .await
1787                    .unwrap();
1788
1789                (rx, sink) // gotta keep these alive, or the reactor will exit.
1790            };
1791
1792            let (_stream, (_rx, _sink)) = futures::join!(begin_and_send_fut, reply_fut);
1793        });
1794    }
1795
1796    // Test: close a stream, either by dropping it or by calling AsyncWriteExt::close.
1797    fn close_stream_helper(by_drop: bool) {
1798        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1799            let (chan, mut rx, _sink) = working_fake_channel(&rt);
1800            let (tunnel, mut sink) = newtunnel(&rt, chan).await;
1801
1802            let stream_fut = async move {
1803                let stream = tunnel
1804                    .begin_stream("www.example.com", 80, None)
1805                    .await
1806                    .unwrap();
1807
1808                let (r, mut w) = stream.split();
1809                if by_drop {
1810                    // Drop the writer and the reader, which should close the stream.
1811                    drop(r);
1812                    drop(w);
1813                    (None, tunnel) // make sure to keep the circuit alive
1814                } else {
1815                    // Call close on the writer, while keeping the reader alive.
1816                    w.close().await.unwrap();
1817                    (Some(r), tunnel)
1818                }
1819            };
1820            let handler_fut = async {
1821                // Read the BEGIN message.
1822                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
1823                let rmsg = match msg {
1824                    AnyChanMsg::Relay(r) => {
1825                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1826                            .unwrap()
1827                    }
1828                    other => panic!("{:?}", other),
1829                };
1830                let (streamid, rmsg) = rmsg.into_streamid_and_msg();
1831                assert_eq!(rmsg.cmd(), RelayCmd::BEGIN);
1832
1833                // Reply with a CONNECTED.
1834                let connected =
1835                    relaymsg::Connected::new_with_addr("10.0.0.1".parse().unwrap(), 1234).into();
1836                sink.send(rmsg_to_ccmsg(streamid, connected, false))
1837                    .await
1838                    .unwrap();
1839
1840                // Expect an END.
1841                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
1842                let rmsg = match msg {
1843                    AnyChanMsg::Relay(r) => {
1844                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1845                            .unwrap()
1846                    }
1847                    other => panic!("{:?}", other),
1848                };
1849                let (_, rmsg) = rmsg.into_streamid_and_msg();
1850                assert_eq!(rmsg.cmd(), RelayCmd::END);
1851
1852                (rx, sink) // keep these alive or the reactor will exit.
1853            };
1854
1855            let ((_opt_reader, _circ), (_rx, _sink)) = futures::join!(stream_fut, handler_fut);
1856        });
1857    }
1858
1859    #[traced_test]
1860    #[test]
1861    fn drop_stream() {
1862        close_stream_helper(true);
1863    }
1864
1865    #[traced_test]
1866    #[test]
1867    fn close_stream() {
1868        close_stream_helper(false);
1869    }
1870
1871    #[traced_test]
1872    #[test]
1873    fn expire_halfstreams() {
1874        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
1875            let (chan, mut rx, _sink) = working_fake_channel(&rt);
1876            let (tunnel, mut sink) = newtunnel(&rt, chan).await;
1877
1878            let client_fut = async move {
1879                let stream = tunnel
1880                    .begin_stream("www.example.com", 80, None)
1881                    .await
1882                    .unwrap();
1883
1884                let (r, mut w) = stream.split();
1885                // Close the stream
1886                w.close().await.unwrap();
1887                (Some(r), tunnel)
1888            };
1889            let exit_fut = async {
1890                // Read the BEGIN message.
1891                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
1892                let rmsg = match msg {
1893                    AnyChanMsg::Relay(r) => {
1894                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1895                            .unwrap()
1896                    }
1897                    other => panic!("{:?}", other),
1898                };
1899                let (streamid, rmsg) = rmsg.into_streamid_and_msg();
1900                assert_eq!(rmsg.cmd(), RelayCmd::BEGIN);
1901
1902                // Reply with a CONNECTED.
1903                let connected =
1904                    relaymsg::Connected::new_with_addr("10.0.0.1".parse().unwrap(), 1234).into();
1905                sink.send(rmsg_to_ccmsg(streamid, connected, false))
1906                    .await
1907                    .unwrap();
1908
1909                (rx, streamid, sink) // keep these alive or the reactor will exit.
1910            };
1911
1912            let ((_opt_reader, tunnel), (_rx, streamid, mut sink)) =
1913                futures::join!(client_fut, exit_fut);
1914
1915            // Progress all futures to ensure the reactor has a chance to notice
1916            // we closed the stream.
1917            rt.progress_until_stalled().await;
1918
1919            // The tunnel should remain open
1920            assert!(!tunnel.is_closed());
1921
1922            // Write some more data on the half-stream.
1923            // The half-stream hasn't expired yet, so it will simply be ignored.
1924            let data = relaymsg::Data::new(b"hello").unwrap();
1925            sink.send(rmsg_to_ccmsg(streamid, AnyRelayMsg::Data(data), false))
1926                .await
1927                .unwrap();
1928            rt.progress_until_stalled().await;
1929
1930            // This was not a protocol violation, so the tunnel is still alive.
1931            assert!(!tunnel.is_closed());
1932
1933            // Advance the time to cause the half-streams to get garbage collected.
1934            //
1935            // Advancing it by 2 * CBT ought to be enough, because the RTT estimator
1936            // won't yet have an estimate for the max_rtt.
1937            let stream_timeout = DummyTimeoutEstimator.circuit_build_timeout(3);
1938            rt.advance_by(2 * stream_timeout).await;
1939
1940            // Sending this cell is a protocol violation now
1941            // that the half-stream expired.
1942            let data = relaymsg::Data::new(b"hello").unwrap();
1943            sink.send(rmsg_to_ccmsg(streamid, AnyRelayMsg::Data(data), false))
1944                .await
1945                .unwrap();
1946            rt.progress_until_stalled().await;
1947
1948            // The tunnel shut down because of the proto violation.
1949            assert!(tunnel.is_closed());
1950        });
1951    }
1952
1953    // Set up a circuit and stream that expects some incoming SENDMEs.
1954    async fn setup_incoming_sendme_case<R: Runtime>(
1955        rt: &R,
1956        n_to_send: usize,
1957    ) -> (
1958        Arc<ClientTunnel>,
1959        DataStream,
1960        CircuitRxSender,
1961        Option<StreamId>,
1962        usize,
1963        Receiver<AnyChanCell>,
1964        Sender<CodecResult>,
1965    ) {
1966        let (chan, mut rx, sink2) = working_fake_channel(rt);
1967        let (tunnel, mut sink) = newtunnel(rt, chan).await;
1968        let circid = tunnel.as_single_circ().unwrap().peek_circid();
1969
1970        let begin_and_send_fut = {
1971            let tunnel = tunnel.clone();
1972            async move {
1973                // Take our circuit and make a stream on it.
1974                let mut stream = tunnel
1975                    .begin_stream("www.example.com", 443, None)
1976                    .await
1977                    .unwrap();
1978                let junk = [0_u8; 1024];
1979                let mut remaining = n_to_send;
1980                while remaining > 0 {
1981                    let n = std::cmp::min(remaining, junk.len());
1982                    stream.write_all(&junk[..n]).await.unwrap();
1983                    remaining -= n;
1984                }
1985                stream.flush().await.unwrap();
1986                stream
1987            }
1988        };
1989
1990        let receive_fut = async move {
1991            // Read the begin cell.
1992            let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1993            let rmsg = match chmsg {
1994                AnyChanMsg::Relay(r) => {
1995                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1996                        .unwrap()
1997                }
1998                other => panic!("{:?}", other),
1999            };
2000            let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2001            assert_matches!(rmsg, AnyRelayMsg::Begin(_));
2002            // Reply with a connected cell...
2003            let connected = relaymsg::Connected::new_empty().into();
2004            sink.send(rmsg_to_ccmsg(streamid, connected, false))
2005                .await
2006                .unwrap();
2007            // Now read bytes from the stream until we have them all.
2008            let mut bytes_received = 0_usize;
2009            let mut cells_received = 0_usize;
2010            while bytes_received < n_to_send {
2011                // Read a data cell, and remember how much we got.
2012                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2013                assert_eq!(id, Some(circid));
2014
2015                let rmsg = match chmsg {
2016                    AnyChanMsg::Relay(r) => {
2017                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2018                            .unwrap()
2019                    }
2020                    other => panic!("{:?}", other),
2021                };
2022                let (streamid2, rmsg) = rmsg.into_streamid_and_msg();
2023                assert_eq!(streamid2, streamid);
2024                if let AnyRelayMsg::Data(dat) = rmsg {
2025                    cells_received += 1;
2026                    bytes_received += dat.as_ref().len();
2027                } else {
2028                    panic!();
2029                }
2030            }
2031
2032            (sink, streamid, cells_received, rx)
2033        };
2034
2035        let (stream, (sink, streamid, cells_received, rx)) =
2036            futures::join!(begin_and_send_fut, receive_fut);
2037
2038        (tunnel, stream, sink, streamid, cells_received, rx, sink2)
2039    }
2040
2041    #[traced_test]
2042    #[test]
2043    fn accept_valid_sendme() {
2044        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2045            let (tunnel, _stream, mut sink, streamid, cells_received, _rx, _sink2) =
2046                setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
2047            let circ = tunnel.as_single_circ().unwrap();
2048
2049            assert_eq!(cells_received, 301);
2050
2051            // Make sure that the circuit is indeed expecting the right sendmes
2052            {
2053                let (tx, rx) = oneshot::channel();
2054                circ.command
2055                    .unbounded_send(CtrlCmd::QuerySendWindow {
2056                        hop: 2.into(),
2057                        leg: tunnel.unique_id(),
2058                        done: tx,
2059                    })
2060                    .unwrap();
2061                let (window, tags) = rx.await.unwrap().unwrap();
2062                assert_eq!(window, 1000 - 301);
2063                assert_eq!(tags.len(), 3);
2064                // 100
2065                assert_eq!(
2066                    tags[0],
2067                    SendmeTag::from(hex!("6400000000000000000000000000000000000000"))
2068                );
2069                // 200
2070                assert_eq!(
2071                    tags[1],
2072                    SendmeTag::from(hex!("c800000000000000000000000000000000000000"))
2073                );
2074                // 300
2075                assert_eq!(
2076                    tags[2],
2077                    SendmeTag::from(hex!("2c01000000000000000000000000000000000000"))
2078                );
2079            }
2080
2081            let reply_with_sendme_fut = async move {
2082                // make and send a circuit-level sendme.
2083                let c_sendme =
2084                    relaymsg::Sendme::new_tag(hex!("6400000000000000000000000000000000000000"))
2085                        .into();
2086                sink.send(rmsg_to_ccmsg(None, c_sendme, false))
2087                    .await
2088                    .unwrap();
2089
2090                // Make and send a stream-level sendme.
2091                let s_sendme = relaymsg::Sendme::new_empty().into();
2092                sink.send(rmsg_to_ccmsg(streamid, s_sendme, false))
2093                    .await
2094                    .unwrap();
2095
2096                sink
2097            };
2098
2099            let _sink = reply_with_sendme_fut.await;
2100
2101            rt.advance_until_stalled().await;
2102
2103            // Now make sure that the circuit is still happy, and its
2104            // window is updated.
2105            {
2106                let (tx, rx) = oneshot::channel();
2107                circ.command
2108                    .unbounded_send(CtrlCmd::QuerySendWindow {
2109                        hop: 2.into(),
2110                        leg: tunnel.unique_id(),
2111                        done: tx,
2112                    })
2113                    .unwrap();
2114                let (window, _tags) = rx.await.unwrap().unwrap();
2115                assert_eq!(window, 1000 - 201);
2116            }
2117        });
2118    }
2119
2120    #[traced_test]
2121    #[test]
2122    fn invalid_circ_sendme() {
2123        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2124            // Same setup as accept_valid_sendme() test above but try giving
2125            // a sendme with the wrong tag.
2126
2127            let (tunnel, _stream, mut sink, _streamid, _cells_received, _rx, _sink2) =
2128                setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
2129
2130            let reply_with_sendme_fut = async move {
2131                // make and send a circuit-level sendme with a bad tag.
2132                let c_sendme =
2133                    relaymsg::Sendme::new_tag(hex!("FFFF0000000000000000000000000000000000FF"))
2134                        .into();
2135                sink.send(rmsg_to_ccmsg(None, c_sendme, false))
2136                    .await
2137                    .unwrap();
2138                sink
2139            };
2140
2141            let _sink = reply_with_sendme_fut.await;
2142
2143            // Check whether the reactor dies as a result of receiving invalid data.
2144            rt.advance_until_stalled().await;
2145            assert!(tunnel.is_closed());
2146        });
2147    }
2148
2149    #[traced_test]
2150    #[test]
2151    fn test_busy_stream_fairness() {
2152        // Number of streams to use.
2153        const N_STREAMS: usize = 3;
2154        // Number of cells (roughly) for each stream to send.
2155        const N_CELLS: usize = 20;
2156        // Number of bytes that *each* stream will send, and that we'll read
2157        // from the channel.
2158        const N_BYTES: usize = relaymsg::Data::MAXLEN_V0 * N_CELLS;
2159        // Ignoring cell granularity, with perfect fairness we'd expect
2160        // `N_BYTES/N_STREAMS` bytes from each stream.
2161        //
2162        // We currently allow for up to a full cell less than that.  This is
2163        // somewhat arbitrary and can be changed as needed, since we don't
2164        // provide any specific fairness guarantees.
2165        const MIN_EXPECTED_BYTES_PER_STREAM: usize =
2166            N_BYTES / N_STREAMS - relaymsg::Data::MAXLEN_V0;
2167
2168        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2169            let (chan, mut rx, _sink) = working_fake_channel(&rt);
2170            let (tunnel, mut sink) = newtunnel(&rt, chan).await;
2171
2172            // Run clients in a single task, doing our own round-robin
2173            // scheduling of writes to the reactor. Conversely, if we were to
2174            // put each client in its own task, we would be at the mercy of
2175            // how fairly the runtime schedules the client tasks, which is outside
2176            // the scope of this test.
2177            rt.spawn({
2178                // Clone the circuit to keep it alive after writers have
2179                // finished with it.
2180                let tunnel = tunnel.clone();
2181                async move {
2182                    let mut clients = VecDeque::new();
2183                    struct Client {
2184                        stream: DataStream,
2185                        to_write: &'static [u8],
2186                    }
2187                    for _ in 0..N_STREAMS {
2188                        clients.push_back(Client {
2189                            stream: tunnel
2190                                .begin_stream("www.example.com", 80, None)
2191                                .await
2192                                .unwrap(),
2193                            to_write: &[0_u8; N_BYTES][..],
2194                        });
2195                    }
2196                    while let Some(mut client) = clients.pop_front() {
2197                        if client.to_write.is_empty() {
2198                            // Client is done. Don't put back in queue.
2199                            continue;
2200                        }
2201                        let written = client.stream.write(client.to_write).await.unwrap();
2202                        client.to_write = &client.to_write[written..];
2203                        clients.push_back(client);
2204                    }
2205                }
2206            })
2207            .unwrap();
2208
2209            let channel_handler_fut = async {
2210                let mut stream_bytes_received = HashMap::<StreamId, usize>::new();
2211                let mut total_bytes_received = 0;
2212
2213                loop {
2214                    let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2215                    let rmsg = match msg {
2216                        AnyChanMsg::Relay(r) => AnyRelayMsgOuter::decode_singleton(
2217                            RelayCellFormat::V0,
2218                            r.into_relay_body(),
2219                        )
2220                        .unwrap(),
2221                        other => panic!("Unexpected chanmsg: {other:?}"),
2222                    };
2223                    let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2224                    match rmsg.cmd() {
2225                        RelayCmd::BEGIN => {
2226                            // Add an entry for this stream.
2227                            let prev = stream_bytes_received.insert(streamid.unwrap(), 0);
2228                            assert_eq!(prev, None);
2229                            // Reply with a CONNECTED.
2230                            let connected = relaymsg::Connected::new_with_addr(
2231                                "10.0.0.1".parse().unwrap(),
2232                                1234,
2233                            )
2234                            .into();
2235                            sink.send(rmsg_to_ccmsg(streamid, connected, false))
2236                                .await
2237                                .unwrap();
2238                        }
2239                        RelayCmd::DATA => {
2240                            let data_msg = relaymsg::Data::try_from(rmsg).unwrap();
2241                            let nbytes = data_msg.as_ref().len();
2242                            total_bytes_received += nbytes;
2243                            let streamid = streamid.unwrap();
2244                            let stream_bytes = stream_bytes_received.get_mut(&streamid).unwrap();
2245                            *stream_bytes += nbytes;
2246                            if total_bytes_received >= N_BYTES {
2247                                break;
2248                            }
2249                        }
2250                        RelayCmd::END => {
2251                            // Stream is done. If fair scheduling is working as
2252                            // expected we *probably* shouldn't get here, but we
2253                            // can ignore it and save the failure until we
2254                            // actually have the final stats.
2255                            continue;
2256                        }
2257                        other => {
2258                            panic!("Unexpected command {other:?}");
2259                        }
2260                    }
2261                }
2262
2263                // Return our stats, along with the `rx` and `sink` to keep the
2264                // reactor alive (since clients could still be writing).
2265                (total_bytes_received, stream_bytes_received, rx, sink)
2266            };
2267
2268            let (total_bytes_received, stream_bytes_received, _rx, _sink) =
2269                channel_handler_fut.await;
2270            assert_eq!(stream_bytes_received.len(), N_STREAMS);
2271            for (sid, stream_bytes) in stream_bytes_received {
2272                assert!(
2273                    stream_bytes >= MIN_EXPECTED_BYTES_PER_STREAM,
2274                    "Only {stream_bytes} of {total_bytes_received} bytes received from {N_STREAMS} came from {sid:?}; expected at least {MIN_EXPECTED_BYTES_PER_STREAM}"
2275                );
2276            }
2277        });
2278    }
2279
2280    #[test]
2281    fn basic_params() {
2282        use super::CircParameters;
2283        let mut p = CircParameters::default();
2284        assert!(p.extend_by_ed25519_id);
2285
2286        p.extend_by_ed25519_id = false;
2287        assert!(!p.extend_by_ed25519_id);
2288    }
2289
2290    #[traced_test]
2291    #[test]
2292    #[cfg(feature = "hs-service")]
2293    fn allow_stream_requests_twice() {
2294        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2295            let (chan, _rx, _sink) = working_fake_channel(&rt);
2296            let (tunnel, _send) = newtunnel(&rt, chan).await;
2297
2298            let _incoming = tunnel
2299                .allow_stream_requests(
2300                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2301                    tunnel.resolve_last_hop().await,
2302                    AllowAllStreamsFilter,
2303                )
2304                .await
2305                .unwrap();
2306
2307            let incoming = tunnel
2308                .allow_stream_requests(
2309                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2310                    tunnel.resolve_last_hop().await,
2311                    AllowAllStreamsFilter,
2312                )
2313                .await;
2314
2315            // There can only be one IncomingStream at a time on any given circuit.
2316            assert!(incoming.is_err());
2317        });
2318    }
2319
2320    #[traced_test]
2321    #[test]
2322    #[cfg(feature = "hs-service")]
2323    fn allow_stream_requests() {
2324        use tor_cell::relaycell::msg::BeginFlags;
2325
2326        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2327            const TEST_DATA: &[u8] = b"ping";
2328
2329            let (chan, _rx, _sink) = working_fake_channel(&rt);
2330            let (tunnel, mut send) = newtunnel(&rt, chan).await;
2331
2332            let rfmt = RelayCellFormat::V0;
2333
2334            // A helper channel for coordinating the "client"/"service" interaction
2335            let (tx, rx) = oneshot::channel();
2336            let mut incoming = tunnel
2337                .allow_stream_requests(
2338                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2339                    tunnel.resolve_last_hop().await,
2340                    AllowAllStreamsFilter,
2341                )
2342                .await
2343                .unwrap();
2344
2345            let simulate_service = async move {
2346                let stream = incoming.next().await.unwrap();
2347                let mut data_stream = stream
2348                    .accept_data(relaymsg::Connected::new_empty())
2349                    .await
2350                    .unwrap();
2351                // Notify the client task we're ready to accept DATA cells
2352                tx.send(()).unwrap();
2353
2354                // Read the data the client sent us
2355                let mut buf = [0_u8; TEST_DATA.len()];
2356                data_stream.read_exact(&mut buf).await.unwrap();
2357                assert_eq!(&buf, TEST_DATA);
2358
2359                tunnel
2360            };
2361
2362            let simulate_client = async move {
2363                let begin = relaymsg::Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2364                let body: BoxedCellBody =
2365                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2366                        .encode(rfmt, &mut testing_rng())
2367                        .unwrap();
2368                let begin_msg = chanmsg::Relay::from(body);
2369
2370                // Pretend to be a client at the other end of the circuit sending a begin cell
2371                send.send(AnyChanMsg::Relay(begin_msg)).await.unwrap();
2372
2373                // Wait until the service is ready to accept data
2374                // TODO: we shouldn't need to wait! This is needed because the service will reject
2375                // any DATA cells that aren't associated with a known stream. We need to wait until
2376                // the service receives our BEGIN cell (and the reactor updates hop.map with the
2377                // new stream).
2378                rx.await.unwrap();
2379                // Now send some data along the newly established circuit..
2380                let data = relaymsg::Data::new(TEST_DATA).unwrap();
2381                let body: BoxedCellBody =
2382                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
2383                        .encode(rfmt, &mut testing_rng())
2384                        .unwrap();
2385                let data_msg = chanmsg::Relay::from(body);
2386
2387                send.send(AnyChanMsg::Relay(data_msg)).await.unwrap();
2388                send
2389            };
2390
2391            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2392        });
2393    }
2394
2395    #[traced_test]
2396    #[test]
2397    #[cfg(feature = "hs-service")]
2398    fn accept_stream_after_reject() {
2399        use tor_cell::relaycell::msg::AnyRelayMsg;
2400        use tor_cell::relaycell::msg::BeginFlags;
2401        use tor_cell::relaycell::msg::EndReason;
2402
2403        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2404            const TEST_DATA: &[u8] = b"ping";
2405            const STREAM_COUNT: usize = 2;
2406            let rfmt = RelayCellFormat::V0;
2407
2408            let (chan, _rx, _sink) = working_fake_channel(&rt);
2409            let (tunnel, mut send) = newtunnel(&rt, chan).await;
2410
2411            // A helper channel for coordinating the "client"/"service" interaction
2412            let (mut tx, mut rx) = mpsc::channel(STREAM_COUNT);
2413
2414            let mut incoming = tunnel
2415                .allow_stream_requests(
2416                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2417                    tunnel.resolve_last_hop().await,
2418                    AllowAllStreamsFilter,
2419                )
2420                .await
2421                .unwrap();
2422
2423            let simulate_service = async move {
2424                // Process 2 incoming streams
2425                for i in 0..STREAM_COUNT {
2426                    let stream = incoming.next().await.unwrap();
2427
2428                    // Reject the first one
2429                    if i == 0 {
2430                        stream
2431                            .reject(relaymsg::End::new_with_reason(EndReason::INTERNAL))
2432                            .await
2433                            .unwrap();
2434                        // Notify the client
2435                        tx.send(()).await.unwrap();
2436                        continue;
2437                    }
2438
2439                    let mut data_stream = stream
2440                        .accept_data(relaymsg::Connected::new_empty())
2441                        .await
2442                        .unwrap();
2443                    // Notify the client task we're ready to accept DATA cells
2444                    tx.send(()).await.unwrap();
2445
2446                    // Read the data the client sent us
2447                    let mut buf = [0_u8; TEST_DATA.len()];
2448                    data_stream.read_exact(&mut buf).await.unwrap();
2449                    assert_eq!(&buf, TEST_DATA);
2450                }
2451
2452                tunnel
2453            };
2454
2455            let simulate_client = async move {
2456                let begin = relaymsg::Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2457                let body: BoxedCellBody =
2458                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2459                        .encode(rfmt, &mut testing_rng())
2460                        .unwrap();
2461                let begin_msg = chanmsg::Relay::from(body);
2462
2463                // Pretend to be a client at the other end of the circuit sending 2 identical begin
2464                // cells (the first one will be rejected by the test service).
2465                for _ in 0..STREAM_COUNT {
2466                    send.send(AnyChanMsg::Relay(begin_msg.clone()))
2467                        .await
2468                        .unwrap();
2469
2470                    // Wait until the service rejects our request
2471                    rx.next().await.unwrap();
2472                }
2473
2474                // Now send some data along the newly established circuit..
2475                let data = relaymsg::Data::new(TEST_DATA).unwrap();
2476                let body: BoxedCellBody =
2477                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
2478                        .encode(rfmt, &mut testing_rng())
2479                        .unwrap();
2480                let data_msg = chanmsg::Relay::from(body);
2481
2482                send.send(AnyChanMsg::Relay(data_msg)).await.unwrap();
2483                send
2484            };
2485
2486            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2487        });
2488    }
2489
2490    #[traced_test]
2491    #[test]
2492    #[cfg(feature = "hs-service")]
2493    fn incoming_stream_bad_hop() {
2494        use tor_cell::relaycell::msg::BeginFlags;
2495
2496        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2497            /// Expect the originator of the BEGIN cell to be hop 1.
2498            const EXPECTED_HOP: u8 = 1;
2499            let rfmt = RelayCellFormat::V0;
2500
2501            let (chan, _rx, _sink) = working_fake_channel(&rt);
2502            let (tunnel, mut send) = newtunnel(&rt, chan).await;
2503
2504            // Expect to receive incoming streams from hop EXPECTED_HOP
2505            let mut incoming = tunnel
2506                .allow_stream_requests(
2507                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2508                    // Build the precise HopLocation with the underlying circuit.
2509                    (
2510                        tunnel.as_single_circ().unwrap().unique_id(),
2511                        EXPECTED_HOP.into(),
2512                    )
2513                        .into(),
2514                    AllowAllStreamsFilter,
2515                )
2516                .await
2517                .unwrap();
2518
2519            let simulate_service = async move {
2520                // The originator of the cell is actually the last hop on the circuit, not hop 1,
2521                // so we expect the reactor to shut down.
2522                assert!(incoming.next().await.is_none());
2523                tunnel
2524            };
2525
2526            let simulate_client = async move {
2527                let begin = relaymsg::Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2528                let body: BoxedCellBody =
2529                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2530                        .encode(rfmt, &mut testing_rng())
2531                        .unwrap();
2532                let begin_msg = chanmsg::Relay::from(body);
2533
2534                // Pretend to be a client at the other end of the circuit sending a begin cell
2535                send.send(AnyChanMsg::Relay(begin_msg)).await.unwrap();
2536
2537                send
2538            };
2539
2540            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2541        });
2542    }
2543
2544    #[traced_test]
2545    #[test]
2546    #[cfg(feature = "conflux")]
2547    fn multipath_circ_validation() {
2548        use std::error::Error as _;
2549
2550        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2551            let params = CircParameters::default();
2552            let invalid_tunnels = [
2553                setup_bad_conflux_tunnel(&rt).await,
2554                setup_conflux_tunnel(&rt, true, params).await,
2555            ];
2556
2557            for tunnel in invalid_tunnels {
2558                let TestTunnelCtx {
2559                    tunnel: _tunnel,
2560                    circs: _circs,
2561                    conflux_link_rx,
2562                } = tunnel;
2563
2564                let conflux_hs_err = conflux_link_rx.await.unwrap().unwrap_err();
2565                let err_src = conflux_hs_err.source().unwrap();
2566
2567                // The two circuits don't end in the same hop (no join point),
2568                // so the reactor will refuse to link them
2569                assert!(
2570                    err_src
2571                        .to_string()
2572                        .contains("one more conflux circuits are invalid")
2573                );
2574            }
2575        });
2576    }
2577
2578    // TODO: this structure could be reused for the other tests,
2579    // to address nickm's comment:
2580    // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3005#note_3202362
2581    #[derive(Debug)]
2582    #[allow(unused)]
2583    #[cfg(feature = "conflux")]
2584    struct TestCircuitCtx {
2585        chan_rx: Receiver<AnyChanCell>,
2586        chan_tx: Sender<std::result::Result<AnyChanCell, Error>>,
2587        circ_tx: CircuitRxSender,
2588        unique_id: UniqId,
2589    }
2590
2591    #[derive(Debug)]
2592    #[cfg(feature = "conflux")]
2593    struct TestTunnelCtx {
2594        tunnel: Arc<ClientTunnel>,
2595        circs: Vec<TestCircuitCtx>,
2596        conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
2597    }
2598
2599    /// Wait for a LINK cell to arrive on the specified channel and return its payload.
2600    #[cfg(feature = "conflux")]
2601    async fn await_link_payload(rx: &mut Receiver<AnyChanCell>) -> ConfluxLink {
2602        // Wait for the LINK cell...
2603        let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2604        let rmsg = match chmsg {
2605            AnyChanMsg::Relay(r) => {
2606                AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2607                    .unwrap()
2608            }
2609            other => panic!("{:?}", other),
2610        };
2611        let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2612
2613        let link = match rmsg {
2614            AnyRelayMsg::ConfluxLink(link) => link,
2615            _ => panic!("unexpected relay message {rmsg:?}"),
2616        };
2617
2618        assert!(streamid.is_none());
2619
2620        link
2621    }
2622
2623    #[cfg(feature = "conflux")]
2624    async fn setup_conflux_tunnel(
2625        rt: &MockRuntime,
2626        same_hops: bool,
2627        params: CircParameters,
2628    ) -> TestTunnelCtx {
2629        let hops1 = hop_details(3, 0);
2630        let hops2 = if same_hops {
2631            hops1.clone()
2632        } else {
2633            hop_details(3, 10)
2634        };
2635
2636        let (chan1, rx1, chan_sink1) = working_fake_channel(rt);
2637        let (mut tunnel1, sink1) = newtunnel_ext(
2638            rt,
2639            UniqId::new(1, 3),
2640            chan1,
2641            hops1,
2642            2.into(),
2643            params.clone(),
2644        )
2645        .await;
2646
2647        let (chan2, rx2, chan_sink2) = working_fake_channel(rt);
2648
2649        let (tunnel2, sink2) =
2650            newtunnel_ext(rt, UniqId::new(2, 4), chan2, hops2, 2.into(), params).await;
2651
2652        let (answer_tx, answer_rx) = oneshot::channel();
2653        tunnel2
2654            .as_single_circ()
2655            .unwrap()
2656            .command
2657            .unbounded_send(CtrlCmd::ShutdownAndReturnCircuit { answer: answer_tx })
2658            .unwrap();
2659
2660        let circuit = answer_rx.await.unwrap().unwrap();
2661        // The circuit should be shutting down its reactor
2662        rt.advance_until_stalled().await;
2663        assert!(tunnel2.is_closed());
2664
2665        let (conflux_link_tx, conflux_link_rx) = oneshot::channel();
2666        // Tell the first circuit to link with the second and form a multipath tunnel
2667        tunnel1
2668            .as_single_circ()
2669            .unwrap()
2670            .control
2671            .unbounded_send(CtrlMsg::LinkCircuits {
2672                circuits: vec![circuit],
2673                answer: conflux_link_tx,
2674            })
2675            .unwrap();
2676
2677        let circ_ctx1 = TestCircuitCtx {
2678            chan_rx: rx1,
2679            chan_tx: chan_sink1,
2680            circ_tx: sink1,
2681            unique_id: tunnel1.unique_id(),
2682        };
2683
2684        let circ_ctx2 = TestCircuitCtx {
2685            chan_rx: rx2,
2686            chan_tx: chan_sink2,
2687            circ_tx: sink2,
2688            unique_id: tunnel2.unique_id(),
2689        };
2690
2691        // TODO(conflux): nothing currently sets this,
2692        // so we need to manually set it.
2693        //
2694        // Instead of doing this, we should have a ClientCirc
2695        // API that sends CtrlMsg::Link circuits and sets this to true
2696        tunnel1.circ.is_multi_path = true;
2697        TestTunnelCtx {
2698            tunnel: Arc::new(tunnel1),
2699            circs: vec![circ_ctx1, circ_ctx2],
2700            conflux_link_rx,
2701        }
2702    }
2703
2704    #[cfg(feature = "conflux")]
2705    async fn setup_good_conflux_tunnel(
2706        rt: &MockRuntime,
2707        cc_params: CongestionControlParams,
2708    ) -> TestTunnelCtx {
2709        // Our 2 test circuits are identical, so they both have the same guards,
2710        // which technically violates the conflux set rule mentioned in prop354.
2711        // For testing purposes this is fine, but in production we'll need to ensure
2712        // the calling code prevents guard reuse (except in the case where
2713        // one of the guards happens to be Guard + Exit)
2714        let same_hops = true;
2715        let flow_ctrl_params = FlowCtrlParameters::defaults_for_tests();
2716        let params = CircParameters::new(true, cc_params, flow_ctrl_params);
2717        setup_conflux_tunnel(rt, same_hops, params).await
2718    }
2719
2720    #[cfg(feature = "conflux")]
2721    async fn setup_bad_conflux_tunnel(rt: &MockRuntime) -> TestTunnelCtx {
2722        // The two circuits don't share any hops,
2723        // so they won't end in the same hop (no join point),
2724        // causing the reactor to refuse to link them.
2725        let same_hops = false;
2726        let flow_ctrl_params = FlowCtrlParameters::defaults_for_tests();
2727        let params = CircParameters::new(true, build_cc_vegas_params(), flow_ctrl_params);
2728        setup_conflux_tunnel(rt, same_hops, params).await
2729    }
2730
2731    #[traced_test]
2732    #[test]
2733    #[cfg(feature = "conflux")]
2734    fn reject_conflux_linked_before_hs() {
2735        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2736            let (chan, mut _rx, _sink) = working_fake_channel(&rt);
2737            let (tunnel, mut sink) = newtunnel(&rt, chan).await;
2738
2739            let nonce = V1Nonce::new(&mut testing_rng());
2740            let payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
2741            // Send a LINKED cell
2742            let linked = relaymsg::ConfluxLinked::new(payload).into();
2743            sink.send(rmsg_to_ccmsg(None, linked, false)).await.unwrap();
2744
2745            rt.advance_until_stalled().await;
2746            assert!(tunnel.is_closed());
2747        });
2748    }
2749
2750    #[traced_test]
2751    #[test]
2752    #[cfg(feature = "conflux")]
2753    fn conflux_hs_timeout() {
2754        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2755            let TestTunnelCtx {
2756                tunnel: _tunnel,
2757                circs,
2758                conflux_link_rx,
2759            } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
2760
2761            let [mut circ1, _circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
2762
2763            // Wait for the LINK cell
2764            let link = await_link_payload(&mut circ1.chan_rx).await;
2765
2766            // Send a LINK cell on the first leg...
2767            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
2768            circ1
2769                .circ_tx
2770                .send(rmsg_to_ccmsg(None, linked, false))
2771                .await
2772                .unwrap();
2773
2774            // Do nothing, and wait for the handshake to timeout on the second leg
2775            rt.advance_by(Duration::from_secs(60)).await;
2776
2777            let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
2778
2779            // Get the handshake results of each circuit
2780            let [res1, res2]: [StdResult<(), ConfluxHandshakeError>; 2] =
2781                conflux_hs_res.try_into().unwrap();
2782
2783            assert!(res1.is_ok());
2784
2785            let err = res2.unwrap_err();
2786            assert_matches!(err, ConfluxHandshakeError::Timeout);
2787        });
2788    }
2789
2790    #[traced_test]
2791    #[test]
2792    #[cfg(feature = "conflux")]
2793    fn conflux_bad_hs() {
2794        use crate::util::err::ConfluxHandshakeError;
2795
2796        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2797            let nonce = V1Nonce::new(&mut testing_rng());
2798            let bad_link_payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
2799            //let extended2 = relaymsg::Extended2::new(vec![]).into();
2800            let bad_hs_responses = [
2801                (
2802                    rmsg_to_ccmsg(
2803                        None,
2804                        relaymsg::ConfluxLinked::new(bad_link_payload.clone()).into(),
2805                        false,
2806                    ),
2807                    "Received CONFLUX_LINKED cell with mismatched nonce",
2808                ),
2809                (
2810                    rmsg_to_ccmsg(
2811                        None,
2812                        relaymsg::ConfluxLink::new(bad_link_payload).into(),
2813                        false,
2814                    ),
2815                    "Unexpected CONFLUX_LINK cell from hop #3 on client circuit",
2816                ),
2817                (
2818                    rmsg_to_ccmsg(None, relaymsg::ConfluxSwitch::new(0).into(), false),
2819                    "Received CONFLUX_SWITCH on unlinked circuit?!",
2820                ),
2821                // TODO: this currently causes the reactor to shut down immediately,
2822                // without sending a response on the handshake channel
2823                /*
2824                (
2825                    rmsg_to_ccmsg(None, extended2, false),
2826                    "Received CONFLUX_LINKED cell with mismatched nonce",
2827                ),
2828                */
2829            ];
2830
2831            for (bad_cell, expected_err) in bad_hs_responses {
2832                let TestTunnelCtx {
2833                    tunnel,
2834                    circs,
2835                    conflux_link_rx,
2836                } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
2837
2838                let [mut _circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
2839
2840                // Respond with a bogus cell on one of the legs
2841                circ2.circ_tx.send(bad_cell).await.unwrap();
2842
2843                let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
2844                // Get the handshake results (the handshake results are reported early,
2845                // without waiting for the second circuit leg's handshake to timeout,
2846                // because this is a protocol violation causing the entire tunnel to shut down)
2847                let [res2]: [StdResult<(), ConfluxHandshakeError>; 1] =
2848                    conflux_hs_res.try_into().unwrap();
2849
2850                match res2.unwrap_err() {
2851                    ConfluxHandshakeError::Link(Error::CircProto(e)) => {
2852                        assert_eq!(e, expected_err);
2853                    }
2854                    e => panic!("unexpected error: {e:?}"),
2855                }
2856
2857                assert!(tunnel.is_closed());
2858            }
2859        });
2860    }
2861
2862    #[traced_test]
2863    #[test]
2864    #[cfg(feature = "conflux")]
2865    fn unexpected_conflux_cell() {
2866        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2867            let nonce = V1Nonce::new(&mut testing_rng());
2868            let link_payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
2869            let bad_cells = [
2870                rmsg_to_ccmsg(
2871                    None,
2872                    relaymsg::ConfluxLinked::new(link_payload.clone()).into(),
2873                    false,
2874                ),
2875                rmsg_to_ccmsg(
2876                    None,
2877                    relaymsg::ConfluxLink::new(link_payload.clone()).into(),
2878                    false,
2879                ),
2880                rmsg_to_ccmsg(None, relaymsg::ConfluxSwitch::new(0).into(), false),
2881            ];
2882
2883            for bad_cell in bad_cells {
2884                let (chan, mut _rx, _sink) = working_fake_channel(&rt);
2885                let (tunnel, mut sink) = newtunnel(&rt, chan).await;
2886
2887                sink.send(bad_cell).await.unwrap();
2888                rt.advance_until_stalled().await;
2889
2890                // Note: unfortunately we can't assert the circuit is
2891                // closing for the reason, because the reactor just logs
2892                // the error and then exits.
2893                assert!(tunnel.is_closed());
2894            }
2895        });
2896    }
2897
2898    #[traced_test]
2899    #[test]
2900    #[cfg(feature = "conflux")]
2901    fn conflux_bad_linked() {
2902        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2903            let TestTunnelCtx {
2904                tunnel,
2905                circs,
2906                conflux_link_rx: _,
2907            } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
2908
2909            let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
2910
2911            let link = await_link_payload(&mut circ1.chan_rx).await;
2912
2913            // Send a LINK cell on the first leg...
2914            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
2915            circ1
2916                .circ_tx
2917                .send(rmsg_to_ccmsg(None, linked, false))
2918                .await
2919                .unwrap();
2920
2921            // ...and two LINKED cells on the second
2922            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
2923            circ2
2924                .circ_tx
2925                .send(rmsg_to_ccmsg(None, linked, false))
2926                .await
2927                .unwrap();
2928            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
2929            circ2
2930                .circ_tx
2931                .send(rmsg_to_ccmsg(None, linked, false))
2932                .await
2933                .unwrap();
2934
2935            rt.advance_until_stalled().await;
2936
2937            // Receiving a LINKED cell on an already linked leg causes
2938            // the tunnel to be torn down
2939            assert!(tunnel.is_closed());
2940        });
2941    }
2942
2943    #[traced_test]
2944    #[test]
2945    #[cfg(feature = "conflux")]
2946    fn conflux_bad_switch() {
2947        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2948            let cc_vegas_params = build_cc_vegas_params();
2949            let cwnd_init = cc_vegas_params.cwnd_params().cwnd_init();
2950            let bad_switch = [
2951                // SWITCH cells with seqno = 0 are not allowed
2952                relaymsg::ConfluxSwitch::new(0),
2953                // SWITCH cells with seqno > cc_init_cwnd are not allowed
2954                // on tunnels that have not received any data
2955                relaymsg::ConfluxSwitch::new(cwnd_init + 1),
2956            ];
2957
2958            for bad_cell in bad_switch {
2959                let TestTunnelCtx {
2960                    tunnel,
2961                    circs,
2962                    conflux_link_rx,
2963                } = setup_good_conflux_tunnel(&rt, cc_vegas_params.clone()).await;
2964
2965                let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
2966
2967                let link = await_link_payload(&mut circ1.chan_rx).await;
2968
2969                // Send a LINKED cell on both legs
2970                for circ in [&mut circ1, &mut circ2] {
2971                    let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
2972                    circ.circ_tx
2973                        .send(rmsg_to_ccmsg(None, linked, false))
2974                        .await
2975                        .unwrap();
2976                }
2977
2978                let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
2979                assert!(conflux_hs_res.iter().all(|res| res.is_ok()));
2980
2981                // Now send a bad SWITCH cell on the first leg.
2982                // This will cause the tunnel reactor to shut down.
2983                let msg = rmsg_to_ccmsg(None, bad_cell.clone().into(), false);
2984                circ1.circ_tx.send(msg).await.unwrap();
2985
2986                // The tunnel should be shutting down
2987                rt.advance_until_stalled().await;
2988                assert!(tunnel.is_closed());
2989            }
2990        });
2991    }
2992
2993    #[traced_test]
2994    #[test]
2995    #[cfg(feature = "conflux")]
2996    fn conflux_consecutive_switch() {
2997        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2998            let TestTunnelCtx {
2999                tunnel,
3000                circs,
3001                conflux_link_rx,
3002            } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
3003
3004            let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3005
3006            let link = await_link_payload(&mut circ1.chan_rx).await;
3007
3008            // Send a LINKED cell on both legs
3009            for circ in [&mut circ1, &mut circ2] {
3010                let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3011                circ.circ_tx
3012                    .send(rmsg_to_ccmsg(None, linked, false))
3013                    .await
3014                    .unwrap();
3015            }
3016
3017            let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
3018            assert!(conflux_hs_res.iter().all(|res| res.is_ok()));
3019
3020            // Send a valid SWITCH cell on the first leg.
3021            let switch1 = relaymsg::ConfluxSwitch::new(10);
3022            let msg = rmsg_to_ccmsg(None, switch1.into(), false);
3023            circ1.circ_tx.send(msg).await.unwrap();
3024
3025            // The tunnel should not be shutting down
3026            rt.advance_until_stalled().await;
3027            assert!(!tunnel.is_closed());
3028
3029            // Send another valid SWITCH cell on the same leg.
3030            let switch2 = relaymsg::ConfluxSwitch::new(12);
3031            let msg = rmsg_to_ccmsg(None, switch2.into(), false);
3032            circ1.circ_tx.send(msg).await.unwrap();
3033
3034            // The tunnel should now be shutting down
3035            // (consecutive switches are not allowed)
3036            rt.advance_until_stalled().await;
3037            assert!(tunnel.is_closed());
3038        });
3039    }
3040
3041    // This test ensures CtrlMsg::ShutdownAndReturnCircuit returns an
3042    // error when called on a multi-path tunnel
3043    #[traced_test]
3044    #[test]
3045    #[cfg(feature = "conflux")]
3046    fn shutdown_and_return_circ_multipath() {
3047        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3048            let TestTunnelCtx {
3049                tunnel,
3050                circs,
3051                conflux_link_rx: _,
3052            } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
3053
3054            rt.progress_until_stalled().await;
3055
3056            let (answer_tx, answer_rx) = oneshot::channel();
3057            tunnel
3058                .circ
3059                .command
3060                .unbounded_send(CtrlCmd::ShutdownAndReturnCircuit { answer: answer_tx })
3061                .unwrap();
3062
3063            // map explicitly returns () for clarity
3064            #[allow(clippy::unused_unit, clippy::semicolon_if_nothing_returned)]
3065            let err = answer_rx
3066                .await
3067                .unwrap()
3068                .map(|_| {
3069                    // Map to () so we can call unwrap
3070                    // (Circuit doesn't impl debug)
3071                    ()
3072                })
3073                .unwrap_err();
3074
3075            const MSG: &str = "not a single leg conflux set (got at least 2 elements when exactly one was expected)";
3076            assert!(err.to_string().contains(MSG), "{err}");
3077
3078            // The tunnel reactor should be shutting down,
3079            // regardless of the error
3080            rt.progress_until_stalled().await;
3081            assert!(tunnel.is_closed());
3082
3083            // Keep circs alive, to prevent the reactor
3084            // from shutting down prematurely
3085            drop(circs);
3086        });
3087    }
3088
3089    /// Run a conflux test endpoint.
3090    #[cfg(feature = "conflux")]
3091    #[derive(Debug)]
3092    enum ConfluxTestEndpoint<I: Iterator<Item = Option<Duration>>> {
3093        /// Pretend to be an exit relay.
3094        Relay(ConfluxExitState<I>),
3095        /// Client task.
3096        Client {
3097            /// Channel for receiving the outcome of the conflux handshakes.
3098            conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
3099            /// The tunnel reactor handle
3100            tunnel: Arc<ClientTunnel>,
3101            /// Data to send on a stream.
3102            send_data: Vec<u8>,
3103            /// Data we expect to receive on a stream.
3104            recv_data: Vec<u8>,
3105        },
3106    }
3107
3108    /// Structure for returning the sinks, channels, etc. that must stay
3109    /// alive until the test is complete.
3110    #[allow(unused, clippy::large_enum_variant)]
3111    #[derive(Debug)]
3112    #[cfg(feature = "conflux")]
3113    enum ConfluxEndpointResult {
3114        Circuit {
3115            tunnel: Arc<ClientTunnel>,
3116            stream: DataStream,
3117        },
3118        Relay {
3119            circ: TestCircuitCtx,
3120        },
3121    }
3122
3123    /// Stream data, shared by all the mock exit endpoints.
3124    #[derive(Debug)]
3125    #[cfg(feature = "conflux")]
3126    struct ConfluxStreamState {
3127        /// The data received so far on this stream (at the exit).
3128        data_recvd: Vec<u8>,
3129        /// The total amount of data we expect to receive on this stream.
3130        expected_data_len: usize,
3131        /// Whether we have seen a BEGIN cell yet.
3132        begin_recvd: bool,
3133        /// Whether we have seen an END cell yet.
3134        end_recvd: bool,
3135        /// Whether we have sent an END cell yet.
3136        end_sent: bool,
3137    }
3138
3139    #[cfg(feature = "conflux")]
3140    impl ConfluxStreamState {
3141        fn new(expected_data_len: usize) -> Self {
3142            Self {
3143                data_recvd: vec![],
3144                expected_data_len,
3145                begin_recvd: false,
3146                end_recvd: false,
3147                end_sent: false,
3148            }
3149        }
3150    }
3151
3152    /// An object describing a SWITCH cell that we expect to receive
3153    /// in the mock exit
3154    #[derive(Debug)]
3155    #[cfg(feature = "conflux")]
3156    struct ExpectedSwitch {
3157        /// The number of cells we've seen on this leg so far,
3158        /// up to and including the SWITCH.
3159        cells_so_far: usize,
3160        /// The expected seqno in SWITCH cell,
3161        seqno: u32,
3162    }
3163
3164    /// Object dispatching cells for delivery on the appropriate
3165    /// leg in a multipath tunnel.
3166    ///
3167    /// Used to send out-of-order cells from the mock exit
3168    /// to the client under test.
3169    #[cfg(feature = "conflux")]
3170    struct CellDispatcher {
3171        /// Channels on which to send the [`CellToSend`] commands on.
3172        leg_tx: HashMap<UniqId, mpsc::Sender<CellToSend>>,
3173        /// The list of cells to send,
3174        cells_to_send: Vec<(UniqId, AnyRelayMsg)>,
3175    }
3176
3177    #[cfg(feature = "conflux")]
3178    impl CellDispatcher {
3179        async fn run(mut self) {
3180            while !self.cells_to_send.is_empty() {
3181                let (circ_id, cell) = self.cells_to_send.remove(0);
3182                let cell_tx = self.leg_tx.get_mut(&circ_id).unwrap();
3183                let (done_tx, done_rx) = oneshot::channel();
3184                cell_tx.send(CellToSend { done_tx, cell }).await.unwrap();
3185                // Wait for the cell to be sent before sending the next one.
3186                let () = done_rx.await.unwrap();
3187            }
3188        }
3189    }
3190
3191    /// A cell for the mock exit to send on one of its legs.
3192    #[cfg(feature = "conflux")]
3193    #[derive(Debug)]
3194    struct CellToSend {
3195        /// Channel for notifying the control task that the cell was sent.
3196        done_tx: oneshot::Sender<()>,
3197        /// The cell to send.
3198        cell: AnyRelayMsg,
3199    }
3200
3201    /// The state of a mock exit.
3202    #[derive(Debug)]
3203    #[cfg(feature = "conflux")]
3204    struct ConfluxExitState<I: Iterator<Item = Option<Duration>>> {
3205        /// The runtime, shared by the test client and mock exit tasks.
3206        ///
3207        /// The mutex prevents the client and mock exit tasks from calling
3208        /// functions like [`MockRuntime::advance_until_stalled`]
3209        /// or [`MockRuntime::progress_until_stalled]` concurrently,
3210        /// as this is not supported by the mock runtime.
3211        runtime: Arc<AsyncMutex<MockRuntime>>,
3212        /// The client view of the tunnel.
3213        tunnel: Arc<ClientTunnel>,
3214        /// The circuit test context.
3215        circ: TestCircuitCtx,
3216        /// The RTT delay to introduce just before each SENDME.
3217        ///
3218        /// Used to trigger the client to send a SWITCH.
3219        rtt_delays: I,
3220        /// State of the (only) expected stream on this tunnel,
3221        /// shared by all the mock exit endpoints.
3222        stream_state: Arc<Mutex<ConfluxStreamState>>,
3223        /// The number of cells after which to expect a SWITCH
3224        /// cell from the client.
3225        expect_switch: Vec<ExpectedSwitch>,
3226        /// Channel for receiving notifications from the other leg.
3227        event_rx: mpsc::Receiver<MockExitEvent>,
3228        /// Channel for sending notifications to the other leg.
3229        event_tx: mpsc::Sender<MockExitEvent>,
3230        /// Whether this circuit leg should act as the primary (sending) leg.
3231        is_sending_leg: bool,
3232        /// A channel for receiving cells to send on this stream.
3233        cells_rx: mpsc::Receiver<CellToSend>,
3234    }
3235
3236    #[cfg(feature = "conflux")]
3237    async fn good_exit_handshake(
3238        runtime: &Arc<AsyncMutex<MockRuntime>>,
3239        init_rtt_delay: Option<Duration>,
3240        rx: &mut Receiver<ChanCell<AnyChanMsg>>,
3241        sink: &mut CircuitRxSender,
3242    ) {
3243        // Wait for the LINK cell
3244        let link = await_link_payload(rx).await;
3245
3246        // Introduce an artificial delay, to make one circ have a better initial RTT
3247        // than the other
3248        if let Some(init_rtt_delay) = init_rtt_delay {
3249            runtime.lock().await.advance_by(init_rtt_delay).await;
3250        }
3251
3252        // Reply with a LINKED cell...
3253        let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3254        sink.send(rmsg_to_ccmsg(None, linked, false)).await.unwrap();
3255
3256        // Wait for the client to respond with LINKED_ACK...
3257        let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
3258        let rmsg = match chmsg {
3259            AnyChanMsg::Relay(r) => {
3260                AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
3261                    .unwrap()
3262            }
3263            other => panic!("{other:?}"),
3264        };
3265        let (_streamid, rmsg) = rmsg.into_streamid_and_msg();
3266
3267        assert_matches!(rmsg, AnyRelayMsg::ConfluxLinkedAck(_));
3268    }
3269
3270    /// An event sent by one mock conflux leg to another.
3271    #[derive(Copy, Clone, Debug)]
3272    enum MockExitEvent {
3273        /// Inform the other leg we are done.
3274        Done,
3275        /// Inform the other leg a stream was opened.
3276        BeginRecvd(StreamId),
3277    }
3278
3279    #[cfg(feature = "conflux")]
3280    async fn run_mock_conflux_exit<I: Iterator<Item = Option<Duration>>>(
3281        state: ConfluxExitState<I>,
3282    ) -> ConfluxEndpointResult {
3283        let ConfluxExitState {
3284            runtime,
3285            tunnel,
3286            mut circ,
3287            rtt_delays,
3288            stream_state,
3289            mut expect_switch,
3290            mut event_tx,
3291            mut event_rx,
3292            is_sending_leg,
3293            mut cells_rx,
3294        } = state;
3295
3296        let mut rtt_delays = rtt_delays.into_iter();
3297
3298        // Expect the client to open a stream, and de-multiplex the received stream data
3299        let stream_len = stream_state.lock().unwrap().expected_data_len;
3300        let mut data_cells_received = 0_usize;
3301        let mut cell_count = 0_usize;
3302        let mut tags = vec![];
3303        let mut streamid = None;
3304        let mut done_writing = false;
3305
3306        loop {
3307            let should_exit = {
3308                let stream_state = stream_state.lock().unwrap();
3309                let done_reading = stream_state.data_recvd.len() >= stream_len;
3310
3311                (stream_state.begin_recvd || stream_state.end_recvd) && done_reading && done_writing
3312            };
3313
3314            if should_exit {
3315                break;
3316            }
3317
3318            use futures::select;
3319
3320            // Only start reading from the dispatcher channel after the stream is open
3321            // and we're ready to start sending cells.
3322            let mut next_cell = if streamid.is_some() && !done_writing {
3323                Box::pin(cells_rx.next().fuse())
3324                    as Pin<Box<dyn FusedFuture<Output = Option<CellToSend>> + Send>>
3325            } else {
3326                Box::pin(std::future::pending().fuse())
3327            };
3328
3329            // Wait for the BEGIN cell to arrive, or for the transfer to complete
3330            // (we need to bail if the other leg already completed);
3331            let res = select! {
3332                res = circ.chan_rx.next() => {
3333                    res.unwrap()
3334                },
3335                res = event_rx.next() => {
3336                    let Some(event) = res else {
3337                        break;
3338                    };
3339
3340                    match event {
3341                        MockExitEvent::Done => {
3342                            break;
3343                        },
3344                        MockExitEvent::BeginRecvd(id) => {
3345                            // The stream is now open (the other leg received the BEGIN),
3346                            // so we're reading to start reading cells from the cell dispatcher.
3347                            streamid = Some(id);
3348                            continue;
3349                        },
3350                    }
3351                }
3352                res = next_cell => {
3353                    if let Some(cell_to_send) = res {
3354                        let CellToSend { cell, done_tx } = cell_to_send;
3355
3356                        // SWITCH cells don't have a stream ID
3357                        let streamid = if matches!(cell, AnyRelayMsg::ConfluxSwitch(_)) {
3358                            None
3359                        } else {
3360                            streamid
3361                        };
3362
3363                        circ.circ_tx
3364                            .send(rmsg_to_ccmsg(streamid, cell, false))
3365                            .await
3366                            .unwrap();
3367
3368                        runtime.lock().await.advance_until_stalled().await;
3369                        done_tx.send(()).unwrap();
3370                    } else {
3371                        done_writing = true;
3372                    }
3373
3374                    continue;
3375                }
3376            };
3377
3378            let (_id, chmsg) = res.into_circid_and_msg();
3379            cell_count += 1;
3380            let rmsg = match chmsg {
3381                AnyChanMsg::Relay(r) => {
3382                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
3383                        .unwrap()
3384                }
3385                other => panic!("{:?}", other),
3386            };
3387            let (new_streamid, rmsg) = rmsg.into_streamid_and_msg();
3388            if streamid.is_none() {
3389                streamid = new_streamid;
3390            }
3391
3392            let begin_recvd = stream_state.lock().unwrap().begin_recvd;
3393            let end_recvd = stream_state.lock().unwrap().end_recvd;
3394            match rmsg {
3395                AnyRelayMsg::Begin(_) if begin_recvd => {
3396                    panic!("client tried to open two streams?!");
3397                }
3398                AnyRelayMsg::Begin(_) if !begin_recvd => {
3399                    stream_state.lock().unwrap().begin_recvd = true;
3400                    // Reply with a connected cell...
3401                    let connected = relaymsg::Connected::new_empty().into();
3402                    circ.circ_tx
3403                        .send(rmsg_to_ccmsg(streamid, connected, false))
3404                        .await
3405                        .unwrap();
3406                    // Tell the other leg we received a BEGIN cell
3407                    event_tx
3408                        .send(MockExitEvent::BeginRecvd(streamid.unwrap()))
3409                        .await
3410                        .unwrap();
3411                }
3412                AnyRelayMsg::End(_) if !end_recvd => {
3413                    stream_state.lock().unwrap().end_recvd = true;
3414                    break;
3415                }
3416                AnyRelayMsg::End(_) if end_recvd => {
3417                    panic!("received two END cells for the same stream?!");
3418                }
3419                AnyRelayMsg::ConfluxSwitch(cell) => {
3420                    // Ensure we got the SWITCH after the expected number of cells
3421                    let expected = expect_switch.remove(0);
3422
3423                    assert_eq!(expected.cells_so_far, cell_count);
3424                    assert_eq!(expected.seqno, cell.seqno());
3425
3426                    // To keep the tests simple, we don't handle out of order cells,
3427                    // and simply sort the received data at the end.
3428                    // This ensures all the data was actually received,
3429                    // but it doesn't actually test that the SWITCH cells
3430                    // contain the appropriate seqnos.
3431                    continue;
3432                }
3433                AnyRelayMsg::Data(dat) => {
3434                    data_cells_received += 1;
3435                    stream_state
3436                        .lock()
3437                        .unwrap()
3438                        .data_recvd
3439                        .extend_from_slice(dat.as_ref());
3440
3441                    let is_next_cell_sendme = data_cells_received.is_multiple_of(31);
3442                    if is_next_cell_sendme {
3443                        if tags.is_empty() {
3444                            // Important: we need to make sure all the SENDMEs
3445                            // we sent so far have been processed by the reactor
3446                            // (otherwise the next QuerySendWindow call
3447                            // might return an outdated list of tags!)
3448                            runtime.lock().await.advance_until_stalled().await;
3449                            let (tx, rx) = oneshot::channel();
3450                            tunnel
3451                                .circ
3452                                .command
3453                                .unbounded_send(CtrlCmd::QuerySendWindow {
3454                                    hop: 2.into(),
3455                                    leg: circ.unique_id,
3456                                    done: tx,
3457                                })
3458                                .unwrap();
3459
3460                            // Get a fresh batch of tags.
3461                            let (_window, new_tags) = rx.await.unwrap().unwrap();
3462                            tags = new_tags;
3463                        }
3464
3465                        let tag = tags.remove(0);
3466
3467                        // Introduce an artificial delay, to make one circ have worse RTT
3468                        // than the other, and thus trigger a SWITCH
3469                        if let Some(rtt_delay) = rtt_delays.next().flatten() {
3470                            runtime.lock().await.advance_by(rtt_delay).await;
3471                        }
3472                        // Make and send a circuit-level SENDME
3473                        let sendme = relaymsg::Sendme::from(tag).into();
3474
3475                        circ.circ_tx
3476                            .send(rmsg_to_ccmsg(None, sendme, false))
3477                            .await
3478                            .unwrap();
3479                    }
3480                }
3481                _ => panic!("unexpected message {rmsg:?} on leg {}", circ.unique_id),
3482            }
3483        }
3484
3485        let end_recvd = stream_state.lock().unwrap().end_recvd;
3486
3487        // Close the stream if the other endpoint hasn't already done so
3488        if is_sending_leg && !end_recvd {
3489            let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
3490            circ.circ_tx
3491                .send(rmsg_to_ccmsg(streamid, end, false))
3492                .await
3493                .unwrap();
3494            stream_state.lock().unwrap().end_sent = true;
3495        }
3496
3497        // This is allowed to fail, because the other leg might have exited first.
3498        let _ = event_tx.send(MockExitEvent::Done).await;
3499
3500        // Ensure we received all the switch cells we were expecting
3501        assert!(
3502            expect_switch.is_empty(),
3503            "expect_switch = {expect_switch:?}"
3504        );
3505
3506        ConfluxEndpointResult::Relay { circ }
3507    }
3508
3509    #[cfg(feature = "conflux")]
3510    async fn run_conflux_client(
3511        tunnel: Arc<ClientTunnel>,
3512        conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
3513        send_data: Vec<u8>,
3514        recv_data: Vec<u8>,
3515    ) -> ConfluxEndpointResult {
3516        let res = conflux_link_rx.await;
3517
3518        let res = res.unwrap().unwrap();
3519        assert_eq!(res.len(), 2);
3520
3521        // All circuit legs have completed the conflux handshake,
3522        // so we now have a multipath tunnel
3523
3524        // Now we're ready to open a stream
3525        let mut stream = tunnel
3526            .begin_stream("www.example.com", 443, None)
3527            .await
3528            .unwrap();
3529
3530        stream.write_all(&send_data).await.unwrap();
3531        stream.flush().await.unwrap();
3532
3533        let mut recv: Vec<u8> = Vec::new();
3534        let recv_len = stream.read_to_end(&mut recv).await.unwrap();
3535        assert_eq!(recv_len, recv_data.len());
3536        assert_eq!(recv_data, recv);
3537
3538        ConfluxEndpointResult::Circuit { tunnel, stream }
3539    }
3540
3541    #[cfg(feature = "conflux")]
3542    async fn run_conflux_endpoint<I: Iterator<Item = Option<Duration>>>(
3543        endpoint: ConfluxTestEndpoint<I>,
3544    ) -> ConfluxEndpointResult {
3545        match endpoint {
3546            ConfluxTestEndpoint::Relay(state) => run_mock_conflux_exit(state).await,
3547            ConfluxTestEndpoint::Client {
3548                tunnel,
3549                conflux_link_rx,
3550                send_data,
3551                recv_data,
3552            } => run_conflux_client(tunnel, conflux_link_rx, send_data, recv_data).await,
3553        }
3554    }
3555
3556    // In this test, a `ConfluxTestEndpoint::Client` task creates a multipath tunnel
3557    // with 2 legs, opens a stream and sends 300 DATA cells on it.
3558    //
3559    // The test spawns two `ConfluxTestEndpoint::Relay` tasks (one for each leg),
3560    // which mock the behavior of an exit. The two relay tasks introduce
3561    // artificial delays before each SENDME sent to the client,
3562    // in order to trigger it to switch its sending leg predictably.
3563    //
3564    // The mock exit does not send any data on the stream.
3565    //
3566    // This test checks that the client sends SWITCH cells at the right time,
3567    // and that all the data it sent over the stream arrived at the exit.
3568    //
3569    // Note, however, that it doesn't check that the client sends the data in
3570    // the right order. For simplicity, the test concatenates the data received
3571    // on both legs, sorts it, and then compares it against the of the data sent
3572    // by the client (TODO: improve this)
3573    #[traced_test]
3574    #[test]
3575    #[cfg(feature = "conflux")]
3576    fn multipath_client_to_exit() {
3577        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3578            /// The number of data cells to send.
3579            const NUM_CELLS: usize = 300;
3580            /// 498 bytes per DATA cell.
3581            const CELL_SIZE: usize = 498;
3582
3583            let TestTunnelCtx {
3584                tunnel,
3585                circs,
3586                conflux_link_rx,
3587            } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
3588            let [circ1, circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3589
3590            // The stream data we're going to send over the conflux tunnel
3591            let mut send_data = (0..255_u8)
3592                .cycle()
3593                .take(NUM_CELLS * CELL_SIZE)
3594                .collect::<Vec<_>>();
3595            let stream_state = Arc::new(Mutex::new(ConfluxStreamState::new(send_data.len())));
3596
3597            let mut tasks = vec![];
3598
3599            // Channels used by the mock relays to notify each other
3600            // of various events.
3601            let (tx1, rx1) = mpsc::channel(1);
3602            let (tx2, rx2) = mpsc::channel(1);
3603
3604            // The 9 RTT delays to insert before each of the 9 SENDMEs
3605            // the exit will end up sending.
3606            //
3607            // Note: the first delay is the init_rtt delay (measured during the conflux HS).
3608            let circ1_rtt_delays = [
3609                // Initially, circ1 has better RTT, so we will start on this leg.
3610                Some(Duration::from_millis(100)),
3611                // But then its RTT takes a turn for the worse,
3612                // triggering a switch after the first SENDME is processed
3613                // (this happens after sending 123 DATA cells).
3614                Some(Duration::from_millis(500)),
3615                Some(Duration::from_millis(700)),
3616                Some(Duration::from_millis(900)),
3617                Some(Duration::from_millis(1100)),
3618                Some(Duration::from_millis(1300)),
3619                Some(Duration::from_millis(1500)),
3620                Some(Duration::from_millis(1700)),
3621                Some(Duration::from_millis(1900)),
3622                Some(Duration::from_millis(2100)),
3623            ]
3624            .into_iter();
3625
3626            let circ2_rtt_delays = [
3627                Some(Duration::from_millis(200)),
3628                Some(Duration::from_millis(400)),
3629                Some(Duration::from_millis(600)),
3630                Some(Duration::from_millis(800)),
3631                Some(Duration::from_millis(1000)),
3632                Some(Duration::from_millis(1200)),
3633                Some(Duration::from_millis(1400)),
3634                Some(Duration::from_millis(1600)),
3635                Some(Duration::from_millis(1800)),
3636                Some(Duration::from_millis(2000)),
3637            ]
3638            .into_iter();
3639
3640            let expected_switches1 = vec![ExpectedSwitch {
3641                // We start on this leg, and receive a BEGIN cell,
3642                // followed by (4 * 31 - 1) = 123 DATA cells.
3643                // Then it becomes blocked on CC, then finally the reactor
3644                // realizes it has some SENDMEs to process, and
3645                // then as a result of the new RTT measurement, we switch to circ1,
3646                // and then finally we switch back here, and get another SWITCH
3647                // as the 126th cell.
3648                cells_so_far: 126,
3649                // Leg 2 switches back to this leg after the 249th cell
3650                // (just before sending the 250th one):
3651                // seqno = 125 carried over from leg 1 (see the seqno of the
3652                // SWITCH expected on leg 2 below), plus 1 SWITCH, plus
3653                // 4 * 31 = 124 DATA cells after which the RTT of the first leg
3654                // is deemed favorable again.
3655                //
3656                // 249 - 125 (last_seq_sent of leg 1) = 124
3657                seqno: 124,
3658            }];
3659
3660            let expected_switches2 = vec![ExpectedSwitch {
3661                // The SWITCH is the first cell we received after the conflux HS
3662                // on this leg.
3663                cells_so_far: 1,
3664                // See explanation on the ExpectedSwitch from circ1 above.
3665                seqno: 125,
3666            }];
3667
3668            let relay_runtime = Arc::new(AsyncMutex::new(rt.clone()));
3669
3670            // Drop the senders and close the channels,
3671            // we have nothing to send in this test.
3672            let (_, cells_rx1) = mpsc::channel(1);
3673            let (_, cells_rx2) = mpsc::channel(1);
3674
3675            let relay1 = ConfluxExitState {
3676                runtime: Arc::clone(&relay_runtime),
3677                tunnel: Arc::clone(&tunnel),
3678                circ: circ1,
3679                rtt_delays: circ1_rtt_delays,
3680                stream_state: Arc::clone(&stream_state),
3681                expect_switch: expected_switches1,
3682                event_tx: tx1,
3683                event_rx: rx2,
3684                is_sending_leg: true,
3685                cells_rx: cells_rx1,
3686            };
3687
3688            let relay2 = ConfluxExitState {
3689                runtime: Arc::clone(&relay_runtime),
3690                tunnel: Arc::clone(&tunnel),
3691                circ: circ2,
3692                rtt_delays: circ2_rtt_delays,
3693                stream_state: Arc::clone(&stream_state),
3694                expect_switch: expected_switches2,
3695                event_tx: tx2,
3696                event_rx: rx1,
3697                is_sending_leg: false,
3698                cells_rx: cells_rx2,
3699            };
3700
3701            for mut mock_relay in [relay1, relay2] {
3702                let leg = mock_relay.circ.unique_id;
3703
3704                // Do the conflux handshake
3705                //
3706                // We do this outside of run_conflux_endpoint,
3707                // toa void running both handshakes at concurrently
3708                // (this gives more predictable RTT delays:
3709                // if both handshake tasks run at once, they race
3710                // to advance the mock runtime's clock)
3711                good_exit_handshake(
3712                    &relay_runtime,
3713                    mock_relay.rtt_delays.next().flatten(),
3714                    &mut mock_relay.circ.chan_rx,
3715                    &mut mock_relay.circ.circ_tx,
3716                )
3717                .await;
3718
3719                let relay = ConfluxTestEndpoint::Relay(mock_relay);
3720
3721                tasks.push(rt.spawn_join(format!("relay task {leg}"), run_conflux_endpoint(relay)));
3722            }
3723
3724            tasks.push(rt.spawn_join(
3725                "client task".to_string(),
3726                run_conflux_endpoint(ConfluxTestEndpoint::Client {
3727                    tunnel,
3728                    conflux_link_rx,
3729                    send_data: send_data.clone(),
3730                    recv_data: vec![],
3731                }),
3732            ));
3733            let _sinks = futures::future::join_all(tasks).await;
3734            let mut stream_state = stream_state.lock().unwrap();
3735            assert!(stream_state.begin_recvd);
3736
3737            stream_state.data_recvd.sort();
3738            send_data.sort();
3739            assert_eq!(stream_state.data_recvd, send_data);
3740        });
3741    }
3742
3743    // In this test, a `ConfluxTestEndpoint::Client` task creates a multipath tunnel
3744    // with 2 legs, opens a stream and reads from the stream until the stream is closed.
3745    //
3746    // The test spawns two `ConfluxTestEndpoint::Relay` tasks (one for each leg),
3747    // which mock the behavior of an exit. The two tasks send DATA and SWITCH
3748    // cells on the two circuit "legs" such that some cells arrive out of order.
3749    // This forces the client to buffer some cells, and then reorder them when
3750    // the missing cells finally arrive.
3751    //
3752    // The client does not send any data on the stream.
3753    #[cfg(feature = "conflux")]
3754    async fn run_multipath_exit_to_client_test(
3755        rt: MockRuntime,
3756        tunnel: TestTunnelCtx,
3757        cells_to_send: Vec<(UniqId, AnyRelayMsg)>,
3758        send_data: Vec<u8>,
3759        recv_data: Vec<u8>,
3760    ) -> Arc<Mutex<ConfluxStreamState>> {
3761        let TestTunnelCtx {
3762            tunnel,
3763            circs,
3764            conflux_link_rx,
3765        } = tunnel;
3766        let [circ1, circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3767
3768        let stream_state = Arc::new(Mutex::new(ConfluxStreamState::new(send_data.len())));
3769
3770        let mut tasks = vec![];
3771        let relay_runtime = Arc::new(AsyncMutex::new(rt.clone()));
3772        let (cells_tx1, cells_rx1) = mpsc::channel(1);
3773        let (cells_tx2, cells_rx2) = mpsc::channel(1);
3774
3775        let dispatcher = CellDispatcher {
3776            leg_tx: [(circ1.unique_id, cells_tx1), (circ2.unique_id, cells_tx2)]
3777                .into_iter()
3778                .collect(),
3779            cells_to_send,
3780        };
3781
3782        // Channels used by the mock relays to notify each other
3783        // of various events.
3784        let (tx1, rx1) = mpsc::channel(1);
3785        let (tx2, rx2) = mpsc::channel(1);
3786
3787        let relay1 = ConfluxExitState {
3788            runtime: Arc::clone(&relay_runtime),
3789            tunnel: Arc::clone(&tunnel),
3790            circ: circ1,
3791            rtt_delays: [].into_iter(),
3792            stream_state: Arc::clone(&stream_state),
3793            // Expect no SWITCH cells from the client
3794            expect_switch: vec![],
3795            event_tx: tx1,
3796            event_rx: rx2,
3797            is_sending_leg: false,
3798            cells_rx: cells_rx1,
3799        };
3800
3801        let relay2 = ConfluxExitState {
3802            runtime: Arc::clone(&relay_runtime),
3803            tunnel: Arc::clone(&tunnel),
3804            circ: circ2,
3805            rtt_delays: [].into_iter(),
3806            stream_state: Arc::clone(&stream_state),
3807            // Expect no SWITCH cells from the client
3808            expect_switch: vec![],
3809            event_tx: tx2,
3810            event_rx: rx1,
3811            is_sending_leg: true,
3812            cells_rx: cells_rx2,
3813        };
3814
3815        // Run the cell dispatcher, which tells each exit leg task
3816        // what cells to write.
3817        //
3818        // This enables us to write out-of-order cells deterministically.
3819        rt.spawn(dispatcher.run()).unwrap();
3820
3821        for mut mock_relay in [relay1, relay2] {
3822            let leg = mock_relay.circ.unique_id;
3823
3824            good_exit_handshake(
3825                &relay_runtime,
3826                mock_relay.rtt_delays.next().flatten(),
3827                &mut mock_relay.circ.chan_rx,
3828                &mut mock_relay.circ.circ_tx,
3829            )
3830            .await;
3831
3832            let relay = ConfluxTestEndpoint::Relay(mock_relay);
3833
3834            tasks.push(rt.spawn_join(format!("relay task {leg}"), run_conflux_endpoint(relay)));
3835        }
3836
3837        tasks.push(rt.spawn_join(
3838            "client task".to_string(),
3839            run_conflux_endpoint(ConfluxTestEndpoint::Client {
3840                tunnel,
3841                conflux_link_rx,
3842                send_data: send_data.clone(),
3843                recv_data,
3844            }),
3845        ));
3846
3847        // Wait for all the tasks to complete
3848        let _sinks = futures::future::join_all(tasks).await;
3849
3850        stream_state
3851    }
3852
3853    #[traced_test]
3854    #[test]
3855    #[cfg(feature = "conflux")]
3856    fn multipath_exit_to_client() {
3857        // The data we expect the client to read from the stream
3858        const TO_SEND: &[u8] =
3859            b"But something about Buster Friendly irritated John Isidore, one specific thing";
3860
3861        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3862            // The indices of the tunnel legs.
3863            const CIRC1: usize = 0;
3864            const CIRC2: usize = 1;
3865
3866            // The client receives the following cells, in the order indicated
3867            // by the t0-t8 "timestamps" (where C = CONNECTED, D = DATA, E = END,
3868            // S = SWITCH):
3869            //
3870            //  Leg 1 (CIRC1):   -----------D--------------------- D -- D -- C
3871            //                              |                      |    |    | \
3872            //                              |                      |    |    |  v
3873            //                              |                      |    |    | client
3874            //                              |                      |    |    |  ^
3875            //                              |                      |    |    |/
3876            //  Leg 2 (CIRC2): E - D -- D --\--- D* -- S (seqno=4)-/----/----/
3877            //                 |   |    |   |    |       |         |    |    |
3878            //                 |   |    |   |    |       |         |    |    |
3879            //                 |   |    |   |    |       |         |    |    |
3880            //  Time:          t8  t7   t6  t5   t4      t3        t2   t1  t0
3881            //
3882            //
3883            //  The cells marked with * are out of order.
3884            //
3885            // Note: t0 is the time when the client receives the first cell,
3886            // and t8 is the time when it receives the last one.
3887            // In other words, this test simulates a mock exit that "sent" the cells
3888            // in the order t0, t1, t2, t5, t4, t6, t7, t8
3889            let simple_switch = vec![
3890                (CIRC1, relaymsg::Data::new(&TO_SEND[0..5]).unwrap().into()),
3891                (CIRC1, relaymsg::Data::new(&TO_SEND[5..10]).unwrap().into()),
3892                // Switch to sending on the second leg
3893                (CIRC2, relaymsg::ConfluxSwitch::new(4).into()),
3894                // An out of order cell!
3895                (CIRC2, relaymsg::Data::new(&TO_SEND[20..30]).unwrap().into()),
3896                // The missing cell (as indicated by seqno = 4 from the switch cell above)
3897                // is finally arriving on leg1
3898                (CIRC1, relaymsg::Data::new(&TO_SEND[10..20]).unwrap().into()),
3899                (CIRC2, relaymsg::Data::new(&TO_SEND[30..40]).unwrap().into()),
3900                (CIRC2, relaymsg::Data::new(&TO_SEND[40..]).unwrap().into()),
3901            ];
3902
3903            //  Leg 1 (CIRC1): ---------------- D  ------D* --- S(seqno = 3) -- D - D ---------------------------- C
3904            //                                  |        |          |           |   |                              | \
3905            //                                  |        |          |           |   |                              |  v
3906            //                                  |        |          |           |   |                              |  client
3907            //                                  |        |          |           |   |                              |  ^
3908            //                                  |        |          |           |   |                              | /
3909            //  Leg 2 (CIRC2): E - S(seqno = 2) \ -- D --\----------\---------- \ --\--- D* -- D* - S(seqno = 3) --/
3910            //                 |        |       |    |   |          |           |   |    |     |         |         |
3911            //                 |        |       |    |   |          |           |   |    |     |         |         |
3912            //                 |        |       |    |   |          |           |   |    |     |         |         |
3913            //  Time:          t11      t10     t9   t8  t7         t6          t5  t4   t3    t2        t1        t0
3914            //  =====================================================================================================
3915            //  Leg 1 LSR:      8        8      8 7  7   7          6           3   2    1      1        1         1
3916            //  Leg 2 LSR:      9        8      6 6  6   5          5           5   5    5      4        3         0
3917            //  LSD:            9        8      8 7  6   5          5       5   3   2    1      1        1         1
3918            //                                    ^ OOO cell is delivered   ^ the OOO cells are delivered to the stream
3919            //
3920            //
3921            //  (LSR = last seq received, LSD = last seq delivered, both from the client's POV)
3922            //
3923            //
3924            // The client keeps track of the `last_seqno_received` (LSR) on each leg.
3925            // This is incremented for each cell that counts towards the seqnos (BEGIN, DATA, etc.)
3926            // that is received on the leg. The client also tracks the `last_seqno_delivered` (LSD),
3927            // which is the seqno of the last cell delivered to a stream
3928            // (this is global for the whole tunnel, whereas the LSR is different for each leg).
3929            //
3930            // When switching to leg `N`, the seqno in the switch is, from the POV of the sender,
3931            // the delta between the absolute seqno (i.e. the total number of cells[^1] sent)
3932            // and the value of this absolute seqno when leg `N` was last used.
3933            //
3934            // At the time of the first SWITCH from `t1`, the exit "sent" 3 cells:
3935            // a `CONNECTED` cell, which was received by the client at `t0`, and 2 `DATA` cells that
3936            // haven't been received yet. At this point, the exit decides to switch to leg 2,
3937            // on which it hasn't sent any cells yet, so the seqno is set to `3 - 0 = 3`.
3938            //
3939            // At `t6` when the exit sends the second switch (leg 2 -> leg 1), has "sent" 6 cells
3940            // (`C` plus the data cells that are received at `t1 - 5` and `t8`.
3941            // The seqno is `6 - 3 = 3`, because when it last sent on leg 1,
3942            // the absolute seqno was `3`.
3943            //
3944            // At `t10`, the absolute seqno is 8 (8 qualifying cells have been sent so far).
3945            // When the exit last sent on leg 2 (which we are switching to),
3946            // the absolute seqno was `6`, so the `SWITCH` cell will have `8 - 6 = 2` as the seqno.
3947            //
3948            // [^1]: only counting the cells that count towards sequence numbers
3949            let multiple_switches = vec![
3950                // Immediately switch to sending on the second leg
3951                // (indicating that we've already sent 3 cells (including the CONNECTED)
3952                (CIRC2, relaymsg::ConfluxSwitch::new(3).into()),
3953                // Two out of order cells!
3954                (CIRC2, relaymsg::Data::new(&TO_SEND[15..20]).unwrap().into()),
3955                (CIRC2, relaymsg::Data::new(&TO_SEND[20..30]).unwrap().into()),
3956                // The missing cells finally arrive on the first leg
3957                (CIRC1, relaymsg::Data::new(&TO_SEND[0..10]).unwrap().into()),
3958                (CIRC1, relaymsg::Data::new(&TO_SEND[10..15]).unwrap().into()),
3959                // Switch back to the first leg
3960                (CIRC1, relaymsg::ConfluxSwitch::new(3).into()),
3961                // OOO cell
3962                (CIRC1, relaymsg::Data::new(&TO_SEND[31..40]).unwrap().into()),
3963                // Missing cell is received
3964                (CIRC2, relaymsg::Data::new(&TO_SEND[30..31]).unwrap().into()),
3965                // The remaining cells are in-order
3966                (CIRC1, relaymsg::Data::new(&TO_SEND[40..]).unwrap().into()),
3967                // Switch right after we've sent all the data we had to send
3968                (CIRC2, relaymsg::ConfluxSwitch::new(2).into()),
3969            ];
3970
3971            // TODO: give these tests the ability to control when END cells are sent
3972            // (currently we have ensure the is_sending_leg is set to true
3973            // on the leg that ends up sending the last data cell).
3974            //
3975            // TODO: test the edge cases
3976            let tests = [simple_switch, multiple_switches];
3977
3978            for cells_to_send in tests {
3979                let tunnel = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
3980                assert_eq!(tunnel.circs.len(), 2);
3981                let circ_ids = [tunnel.circs[0].unique_id, tunnel.circs[1].unique_id];
3982                let cells_to_send = cells_to_send
3983                    .into_iter()
3984                    .map(|(i, cell)| (circ_ids[i], cell))
3985                    .collect();
3986
3987                // The client won't be sending any DATA cells on this stream
3988                let send_data = vec![];
3989                let stream_state = run_multipath_exit_to_client_test(
3990                    rt.clone(),
3991                    tunnel,
3992                    cells_to_send,
3993                    send_data.clone(),
3994                    TO_SEND.into(),
3995                )
3996                .await;
3997                let stream_state = stream_state.lock().unwrap();
3998                assert!(stream_state.begin_recvd);
3999                // We don't expect the client to have sent anything
4000                assert!(stream_state.data_recvd.is_empty());
4001            }
4002        });
4003    }
4004
4005    #[traced_test]
4006    #[test]
4007    #[cfg(all(feature = "conflux", feature = "hs-service"))]
4008    fn conflux_incoming_stream() {
4009        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
4010            use std::error::Error as _;
4011
4012            const EXPECTED_HOP: u8 = 1;
4013
4014            let TestTunnelCtx {
4015                tunnel,
4016                circs,
4017                conflux_link_rx,
4018            } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
4019
4020            let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
4021
4022            let link = await_link_payload(&mut circ1.chan_rx).await;
4023            for circ in [&mut circ1, &mut circ2] {
4024                let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
4025                circ.circ_tx
4026                    .send(rmsg_to_ccmsg(None, linked, false))
4027                    .await
4028                    .unwrap();
4029            }
4030
4031            let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
4032            assert!(conflux_hs_res.iter().all(|res| res.is_ok()));
4033
4034            // TODO(#2002): we don't currently support conflux for onion services
4035            let err = tunnel
4036                .allow_stream_requests(
4037                    &[tor_cell::relaycell::RelayCmd::BEGIN],
4038                    (tunnel.circ.unique_id(), EXPECTED_HOP.into()).into(),
4039                    AllowAllStreamsFilter,
4040                )
4041                .await
4042                // IncomingStream doesn't impl Debug, so we need to map to a different type
4043                .map(|_| ())
4044                .unwrap_err();
4045
4046            let err_src = err.source().unwrap().to_string();
4047            assert!(
4048                err_src.contains("Cannot allow stream requests on a multi-path tunnel"),
4049                "{err_src}"
4050            );
4051        });
4052    }
4053
4054    #[test]
4055    fn client_circ_chan_msg() {
4056        use tor_cell::chancell::msg::{self, AnyChanMsg};
4057        fn good(m: AnyChanMsg) {
4058            assert!(ClientCircChanMsg::try_from(m).is_ok());
4059        }
4060        fn bad(m: AnyChanMsg) {
4061            assert!(ClientCircChanMsg::try_from(m).is_err());
4062        }
4063
4064        good(msg::Destroy::new(2.into()).into());
4065        bad(msg::CreatedFast::new(&b"guaranteed in this world"[..]).into());
4066        bad(msg::Created2::new(&b"and the next"[..]).into());
4067        good(msg::Relay::new(&b"guaranteed guaranteed"[..]).into());
4068        bad(msg::AnyChanMsg::RelayEarly(
4069            msg::Relay::new(&b"for the world and its mother"[..]).into(),
4070        ));
4071        bad(msg::Versions::new([1, 2, 3]).unwrap().into());
4072    }
4073}