Skip to main content

tor_proto/client/
circuit.rs

1//! Multi-hop paths over the Tor network.
2//!
3//! Right now, we only implement "client circuits" -- also sometimes
4//! called "origin circuits".  A client circuit is one that is
5//! constructed by this Tor instance, and used in its own behalf to
6//! send data over the Tor network.
7//!
8//! Each circuit has multiple hops over the Tor network: each hop
9//! knows only the hop before and the hop after.  The client shares a
10//! separate set of keys with each hop.
11//!
12//! To build a circuit, first create a [crate::channel::Channel], then
13//! call its [crate::channel::Channel::new_tunnel] method.  This yields
14//! a [PendingClientTunnel] object that won't become live until you call
15//! one of the methods
16//! (typically [`PendingClientTunnel::create_firsthop`])
17//! that extends it to its first hop.  After you've
18//! done that, you can call [`ClientCirc::extend`] on the tunnel to
19//! build it into a multi-hop tunnel.  Finally, you can use
20//! [ClientTunnel::begin_stream] to get a Stream object that can be used
21//! for anonymized data.
22//!
23//! # Implementation
24//!
25//! Each open circuit has a corresponding Reactor object that runs in
26//! an asynchronous task, and manages incoming cells from the
27//! circuit's upstream channel.  These cells are either RELAY cells or
28//! DESTROY cells.  DESTROY cells are handled immediately.
29//! RELAY cells are either for a particular stream, in which case they
30//! get forwarded to a RawCellStream object, or for no particular stream,
31//! in which case they are considered "meta" cells (like EXTENDED2)
32//! that should only get accepted if something is waiting for them.
33//!
34//! # Limitations
35//!
36//! This is client-only.
37
38pub(crate) mod halfcirc;
39
40#[cfg(feature = "hs-common")]
41pub mod handshake;
42#[cfg(not(feature = "hs-common"))]
43pub(crate) mod handshake;
44
45pub(crate) mod padding;
46
47pub(super) mod path;
48
49use crate::channel::Channel;
50use crate::circuit::circhop::{HopNegotiationType, HopSettings};
51use crate::circuit::{CircuitRxReceiver, celltypes::*};
52#[cfg(feature = "circ-padding-manual")]
53use crate::client::CircuitPadder;
54use crate::client::circuit::padding::{PaddingController, PaddingEventStream};
55use crate::client::reactor::{CircuitHandshake, CtrlCmd, CtrlMsg, Reactor};
56use crate::crypto::cell::HopNum;
57use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
58use crate::memquota::CircuitAccount;
59use crate::util::skew::ClockSkew;
60use crate::{Error, Result};
61use derive_deftly::Deftly;
62use educe::Educe;
63use path::HopDetail;
64use tor_cell::chancell::{
65    CircId,
66    msg::{self as chanmsg},
67};
68use tor_error::{bad_api_usage, internal, into_internal};
69use tor_linkspec::{CircTarget, LinkSpecType, OwnedChanTarget, RelayIdType};
70use tor_protover::named;
71use tor_rtcompat::DynTimeProvider;
72
73use crate::circuit::UniqId;
74
75use super::{ClientTunnel, TargetHop};
76
77use futures::channel::mpsc;
78use oneshot_fused_workaround as oneshot;
79
80use futures::FutureExt as _;
81use std::collections::HashMap;
82use std::sync::{Arc, Mutex};
83use tor_memquota::derive_deftly_template_HasMemoryCost;
84
85use crate::crypto::handshake::ntor::NtorPublicKey;
86
87#[cfg(test)]
88use crate::stream::{StreamMpscReceiver, StreamMpscSender};
89
90pub use crate::crypto::binding::CircuitBinding;
91pub use path::{Path, PathEntry};
92
93/// The size of the buffer for communication between `ClientCirc` and its reactor.
94pub const CIRCUIT_BUFFER_SIZE: usize = 128;
95
96// TODO: export this from the top-level instead (it's not client-specific).
97pub use crate::circuit::CircParameters;
98
99// TODO(relay): reexport this from somewhere else (it's not client-specific)
100pub use crate::util::timeout::TimeoutEstimator;
101
102/// A subclass of ChanMsg that can correctly arrive on a live client
103/// circuit (one where a CREATED* has been received).
104#[derive(Debug, Deftly)]
105#[allow(unreachable_pub)] // Only `pub` with feature `testing`; otherwise, visible in crate
106#[derive_deftly(HasMemoryCost)]
107#[derive_deftly(RestrictedChanMsgSet)]
108#[deftly(usage = "on an open client circuit")]
109pub(super) enum ClientCircChanMsg {
110    /// A relay cell telling us some kind of remote command from some
111    /// party on the circuit.
112    Relay(chanmsg::Relay),
113    /// A cell telling us to destroy the circuit.
114    Destroy(chanmsg::Destroy),
115    // Note: RelayEarly is not valid for clients!
116}
117
118#[derive(Debug)]
119/// A circuit that we have constructed over the Tor network.
120///
121/// # Circuit life cycle
122///
123/// `ClientCirc`s are created in an initially unusable state using [`Channel::new_tunnel`],
124/// which returns a [`PendingClientTunnel`].  To get a real (one-hop) tunnel from
125/// one of these, you invoke one of its `create_firsthop` methods (typically
126/// [`create_firsthop_fast()`](PendingClientTunnel::create_firsthop_fast) or
127/// [`create_firsthop()`](PendingClientTunnel::create_firsthop)).
128/// Then, to add more hops to the circuit, you can call
129/// [`extend()`](ClientCirc::extend) on it.
130///
131/// For higher-level APIs, see the `tor-circmgr` crate: the ones here in
132/// `tor-proto` are probably not what you need.
133///
134/// After a circuit is created, it will persist until it is closed in one of
135/// five ways:
136///    1. A remote error occurs.
137///    2. Some hop on the circuit sends a `DESTROY` message to tear down the
138///       circuit.
139///    3. The circuit's channel is closed.
140///    4. Someone calls [`ClientTunnel::terminate`] on the tunnel owning the circuit.
141///    5. The last reference to the `ClientCirc` is dropped. (Note that every stream
142///       on a `ClientCirc` keeps a reference to it, which will in turn keep the
143///       circuit from closing until all those streams have gone away.)
144///
145/// Note that in cases 1-4 the [`ClientCirc`] object itself will still exist: it
146/// will just be unusable for most purposes.  Most operations on it will fail
147/// with an error.
148//
149// Effectively, this struct contains two Arcs: one for `path` and one for
150// `control` (which surely has something Arc-like in it).  We cannot unify
151// these by putting a single Arc around the whole struct, and passing
152// an Arc strong reference to the `Reactor`, because then `control` would
153// not be dropped when the last user of the circuit goes away.  We could
154// make the reactor have a weak reference but weak references are more
155// expensive to dereference.
156//
157// Because of the above, cloning this struct is always going to involve
158// two atomic refcount changes/checks.  Wrapping it in another Arc would
159// be overkill.
160//
161pub struct ClientCirc {
162    /// Mutable state shared with the `Reactor`.
163    pub(super) mutable: Arc<TunnelMutableState>,
164    /// A unique identifier for this circuit.
165    unique_id: UniqId,
166    /// Channel to send control messages to the reactor.
167    pub(super) control: mpsc::UnboundedSender<CtrlMsg>,
168    /// Channel to send commands to the reactor.
169    pub(super) command: mpsc::UnboundedSender<CtrlCmd>,
170    /// A future that resolves to Cancelled once the reactor is shut down,
171    /// meaning that the circuit is closed.
172    #[cfg_attr(not(feature = "experimental-api"), allow(dead_code))]
173    reactor_closed_rx: futures::future::Shared<oneshot::Receiver<void::Void>>,
174    /// For testing purposes: the CircId, for use in peek_circid().
175    #[cfg(test)]
176    circid: CircId,
177    /// Memory quota account
178    pub(super) memquota: CircuitAccount,
179    /// Time provider
180    pub(super) time_provider: DynTimeProvider,
181    /// Indicate if this reactor is a multi path or not. This is flagged at the very first
182    /// LinkCircuit seen and never changed after.
183    ///
184    /// We can't just look at the number of legs because a multi path tunnel could have 1 leg only
185    /// because the other(s) have collapsed.
186    ///
187    /// This is very important because it allows to make a quick efficient safety check by the
188    /// circmgr higher level tunnel type without locking the mutable state or using the command
189    /// channel.
190    pub(super) is_multi_path: bool,
191}
192
193/// The mutable state of a tunnel, shared between [`ClientCirc`] and [`Reactor`].
194///
195/// NOTE(gabi): this mutex-inside-a-mutex might look suspicious,
196/// but it is currently the best option we have for sharing
197/// the circuit state with `ClientCirc` (and soon, with `ClientTunnel`).
198/// In practice, these mutexes won't be accessed very often
199/// (they're accessed for writing when a circuit is extended,
200/// and for reading by the various `ClientCirc` APIs),
201/// so they shouldn't really impact performance.
202///
203/// Alternatively, the circuit state information could be shared
204/// outside the reactor through a channel (passed to the reactor via a `CtrlCmd`),
205/// but in #1840 @opara notes that involves making the `ClientCirc` accessors
206/// (`ClientCirc::path`, `ClientCirc::binding_key`, etc.)
207/// asynchronous, which will significantly complicate their callsites,
208/// which would in turn need to be made async too.
209///
210/// We should revisit this decision at some point, and decide whether an async API
211/// would be preferable.
212#[derive(Debug, Default)]
213pub(super) struct TunnelMutableState(Mutex<HashMap<UniqId, Arc<MutableState>>>);
214
215impl TunnelMutableState {
216    /// Add the [`MutableState`] of a circuit.
217    pub(super) fn insert(&self, unique_id: UniqId, mutable: Arc<MutableState>) {
218        #[allow(unused)] // unused in non-debug builds
219        let state = self
220            .0
221            .lock()
222            .expect("lock poisoned")
223            .insert(unique_id, mutable);
224
225        debug_assert!(state.is_none());
226    }
227
228    /// Remove the [`MutableState`] of a circuit.
229    pub(super) fn remove(&self, unique_id: UniqId) {
230        #[allow(unused)] // unused in non-debug builds
231        let state = self.0.lock().expect("lock poisoned").remove(&unique_id);
232
233        debug_assert!(state.is_some());
234    }
235
236    /// Return a [`Path`] object describing all the circuits in this tunnel.
237    fn all_paths(&self) -> Vec<Arc<Path>> {
238        let lock = self.0.lock().expect("lock poisoned");
239        lock.values().map(|mutable| mutable.path()).collect()
240    }
241
242    /// Return a list of [`Path`] objects describing the only circuit in this tunnel.
243    ///
244    /// Returns an error if the tunnel has more than one tunnel.
245    //
246    // TODO: replace Itertools::exactly_one() with a stdlib equivalent when there is one.
247    //
248    // See issue #48919 <https://github.com/rust-lang/rust/issues/48919>
249    #[allow(unstable_name_collisions)]
250    fn single_path(&self) -> Result<Arc<Path>> {
251        use itertools::Itertools as _;
252
253        self.all_paths().into_iter().exactly_one().map_err(|_| {
254            bad_api_usage!("requested the single path of a multi-path tunnel?!").into()
255        })
256    }
257
258    /// Return a description of the first hop of this circuit.
259    ///
260    /// Returns an error if a circuit with the specified [`UniqId`] doesn't exist.
261    /// Returns `Ok(None)` if the specified circuit doesn't have any hops.
262    fn first_hop(&self, unique_id: UniqId) -> Result<Option<OwnedChanTarget>> {
263        let lock = self.0.lock().expect("lock poisoned");
264        let mutable = lock
265            .get(&unique_id)
266            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
267
268        let first_hop = mutable.first_hop().map(|first_hop| match first_hop {
269            path::HopDetail::Relay(r) => r,
270            #[cfg(feature = "hs-common")]
271            path::HopDetail::Virtual => {
272                panic!("somehow made a circuit with a virtual first hop.")
273            }
274        });
275
276        Ok(first_hop)
277    }
278
279    /// Return the [`HopNum`] of the last hop of the specified circuit.
280    ///
281    /// Returns an error if a circuit with the specified [`UniqId`] doesn't exist.
282    ///
283    /// See [`MutableState::last_hop_num`].
284    pub(super) fn last_hop_num(&self, unique_id: UniqId) -> Result<Option<HopNum>> {
285        let lock = self.0.lock().expect("lock poisoned");
286        let mutable = lock
287            .get(&unique_id)
288            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
289
290        Ok(mutable.last_hop_num())
291    }
292
293    /// Return the number of hops in the specified circuit.
294    ///
295    /// See [`MutableState::n_hops`].
296    fn n_hops(&self, unique_id: UniqId) -> Result<usize> {
297        let lock = self.0.lock().expect("lock poisoned");
298        let mutable = lock
299            .get(&unique_id)
300            .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
301
302        Ok(mutable.n_hops())
303    }
304}
305
306/// The mutable state of a circuit.
307#[derive(Educe, Default)]
308#[educe(Debug)]
309pub(super) struct MutableState(Mutex<CircuitState>);
310
311impl MutableState {
312    /// Add a hop to the path of this circuit.
313    pub(super) fn add_hop(&self, peer_id: HopDetail, binding: Option<CircuitBinding>) {
314        let mut mutable = self.0.lock().expect("poisoned lock");
315        Arc::make_mut(&mut mutable.path).push_hop(peer_id);
316        mutable.binding.push(binding);
317    }
318
319    /// Get a copy of the circuit's current [`path::Path`].
320    pub(super) fn path(&self) -> Arc<path::Path> {
321        let mutable = self.0.lock().expect("poisoned lock");
322        Arc::clone(&mutable.path)
323    }
324
325    /// Return the cryptographic material used to prove knowledge of a shared
326    /// secret with with `hop`.
327    pub(super) fn binding_key(&self, hop: HopNum) -> Option<CircuitBinding> {
328        let mutable = self.0.lock().expect("poisoned lock");
329
330        mutable.binding.get::<usize>(hop.into()).cloned().flatten()
331        // NOTE: I'm not thrilled to have to copy this information, but we use
332        // it very rarely, so it's not _that_ bad IMO.
333    }
334
335    /// Return a description of the first hop of this circuit.
336    fn first_hop(&self) -> Option<HopDetail> {
337        let mutable = self.0.lock().expect("poisoned lock");
338        mutable.path.first_hop()
339    }
340
341    /// Return the [`HopNum`] of the last hop of this circuit.
342    ///
343    /// NOTE: This function will return the [`HopNum`] of the hop
344    /// that is _currently_ the last. If there is an extend operation in progress,
345    /// the currently pending hop may or may not be counted, depending on whether
346    /// the extend operation finishes before this call is done.
347    fn last_hop_num(&self) -> Option<HopNum> {
348        let mutable = self.0.lock().expect("poisoned lock");
349        mutable.path.last_hop_num()
350    }
351
352    /// Return the number of hops in this circuit.
353    ///
354    /// NOTE: This function will currently return only the number of hops
355    /// _currently_ in the circuit. If there is an extend operation in progress,
356    /// the currently pending hop may or may not be counted, depending on whether
357    /// the extend operation finishes before this call is done.
358    fn n_hops(&self) -> usize {
359        let mutable = self.0.lock().expect("poisoned lock");
360        mutable.path.n_hops()
361    }
362}
363
364/// The shared state of a circuit.
365#[derive(Educe, Default)]
366#[educe(Debug)]
367pub(super) struct CircuitState {
368    /// Information about this circuit's path.
369    ///
370    /// This is stored in an Arc so that we can cheaply give a copy of it to
371    /// client code; when we need to add a hop (which is less frequent) we use
372    /// [`Arc::make_mut()`].
373    path: Arc<path::Path>,
374
375    /// Circuit binding keys [q.v.][`CircuitBinding`] information for each hop
376    /// in the circuit's path.
377    ///
378    /// NOTE: Right now, there is a `CircuitBinding` for every hop.  There's a
379    /// fair chance that this will change in the future, and I don't want other
380    /// code to assume that a `CircuitBinding` _must_ exist, so I'm making this
381    /// an `Option`.
382    #[educe(Debug(ignore))]
383    binding: Vec<Option<CircuitBinding>>,
384}
385
386/// A ClientCirc that needs to send a create cell and receive a created* cell.
387///
388/// To use one of these, call `create_firsthop_fast()` or `create_firsthop()`
389/// to negotiate the cryptographic handshake with the first hop.
390pub struct PendingClientTunnel {
391    /// A oneshot receiver on which we'll receive a CREATED* cell,
392    /// or a DESTROY cell.
393    recvcreated: oneshot::Receiver<CreateResponse>,
394    /// The ClientCirc object that we can expose on success.
395    circ: ClientCirc,
396}
397
398impl ClientCirc {
399    /// Convert this `ClientCirc` into a single circuit [`ClientTunnel`].
400    pub fn into_tunnel(self) -> Result<ClientTunnel> {
401        self.try_into()
402    }
403
404    /// Return a description of the first hop of this circuit.
405    ///
406    /// # Panics
407    ///
408    /// Panics if there is no first hop.  (This should be impossible outside of
409    /// the tor-proto crate, but within the crate it's possible to have a
410    /// circuit with no hops.)
411    pub fn first_hop(&self) -> Result<OwnedChanTarget> {
412        Ok(self
413            .mutable
414            .first_hop(self.unique_id)
415            .map_err(|_| Error::CircuitClosed)?
416            .expect("called first_hop on an un-constructed circuit"))
417    }
418
419    /// Return a description of the last hop of the tunnel.
420    ///
421    /// Return None if the last hop is virtual.
422    ///
423    /// # Panics
424    ///
425    /// Panics if there is no last hop.  (This should be impossible outside of
426    /// the tor-proto crate, but within the crate it's possible to have a
427    /// circuit with no hops.)
428    pub fn last_hop_info(&self) -> Result<Option<OwnedChanTarget>> {
429        let all_paths = self.all_paths();
430        let path = all_paths.first().ok_or_else(|| {
431            tor_error::bad_api_usage!("Called last_hop_info an an un-constructed tunnel")
432        })?;
433        Ok(path
434            .hops()
435            .last()
436            .expect("Called last_hop an an un-constructed circuit")
437            .as_chan_target()
438            .map(OwnedChanTarget::from_chan_target))
439    }
440
441    /// Return the [`HopNum`] of the last hop of this circuit.
442    ///
443    /// Returns an error if there is no last hop.  (This should be impossible outside of the
444    /// tor-proto crate, but within the crate it's possible to have a circuit with no hops.)
445    ///
446    /// NOTE: This function will return the [`HopNum`] of the hop
447    /// that is _currently_ the last. If there is an extend operation in progress,
448    /// the currently pending hop may or may not be counted, depending on whether
449    /// the extend operation finishes before this call is done.
450    pub fn last_hop_num(&self) -> Result<HopNum> {
451        Ok(self
452            .mutable
453            .last_hop_num(self.unique_id)?
454            .ok_or_else(|| internal!("no last hop index"))?)
455    }
456
457    /// Return a [`TargetHop`] representing precisely the last hop of the circuit as in set as a
458    /// HopLocation with its id and hop number.
459    ///
460    /// Return an error if there is no last hop.
461    pub fn last_hop(&self) -> Result<TargetHop> {
462        let hop_num = self
463            .mutable
464            .last_hop_num(self.unique_id)?
465            .ok_or_else(|| bad_api_usage!("no last hop"))?;
466        Ok((self.unique_id, hop_num).into())
467    }
468
469    /// Return a list of [`Path`] objects describing all the circuits in this tunnel.
470    ///
471    /// Note that these `Path`s are not automatically updated if the underlying
472    /// circuits are extended.
473    pub fn all_paths(&self) -> Vec<Arc<Path>> {
474        self.mutable.all_paths()
475    }
476
477    /// Return a list of [`Path`] objects describing the only circuit in this tunnel.
478    ///
479    /// Returns an error if the tunnel has more than one tunnel.
480    pub fn single_path(&self) -> Result<Arc<Path>> {
481        self.mutable.single_path()
482    }
483
484    /// Return the time at which this circuit last had any open streams.
485    ///
486    /// Returns `None` if this circuit has never had any open streams,
487    /// or if it currently has open streams.
488    ///
489    /// NOTE that the Instant returned by this method is not affected by
490    /// any runtime mocking; it is the output of an ordinary call to
491    /// `Instant::now()`.
492    pub async fn disused_since(&self) -> Result<Option<std::time::Instant>> {
493        let (tx, rx) = oneshot::channel();
494        self.command
495            .unbounded_send(CtrlCmd::GetTunnelActivity { sender: tx })
496            .map_err(|_| Error::CircuitClosed)?;
497
498        Ok(rx.await.map_err(|_| Error::CircuitClosed)?.disused_since())
499    }
500
501    /// Get the clock skew claimed by the first hop of the circuit.
502    ///
503    /// See [`Channel::clock_skew()`].
504    pub async fn first_hop_clock_skew(&self) -> Result<ClockSkew> {
505        let (tx, rx) = oneshot::channel();
506
507        self.control
508            .unbounded_send(CtrlMsg::FirstHopClockSkew { answer: tx })
509            .map_err(|_| Error::CircuitClosed)?;
510
511        Ok(rx.await.map_err(|_| Error::CircuitClosed)??)
512    }
513
514    /// Return a reference to this circuit's memory quota account
515    pub fn mq_account(&self) -> &CircuitAccount {
516        &self.memquota
517    }
518
519    /// Return the cryptographic material used to prove knowledge of a shared
520    /// secret with with `hop`.
521    ///
522    /// See [`CircuitBinding`] for more information on how this is used.
523    ///
524    /// Return None if we have no circuit binding information for the hop, or if
525    /// the hop does not exist.
526    #[cfg(feature = "hs-service")]
527    pub async fn binding_key(&self, hop: TargetHop) -> Result<Option<CircuitBinding>> {
528        let (sender, receiver) = oneshot::channel();
529        let msg = CtrlCmd::GetBindingKey { hop, done: sender };
530        self.command
531            .unbounded_send(msg)
532            .map_err(|_| Error::CircuitClosed)?;
533
534        receiver.await.map_err(|_| Error::CircuitClosed)?
535    }
536
537    /// Extend the circuit, via the most appropriate circuit extension handshake,
538    /// to the chosen `target` hop.
539    pub async fn extend<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
540    where
541        Tg: CircTarget,
542    {
543        #![allow(deprecated)]
544
545        // For now we use the simplest decision-making mechanism:
546        // we use ntor_v3 whenever it is present; and otherwise we use ntor.
547        //
548        // This behavior is slightly different from C tor, which uses ntor v3
549        // only whenever it want to send any extension in the circuit message.
550        // But thanks to congestion control (named::FLOWCTRL_CC), we'll _always_
551        // want to use an extension if we can, and so it doesn't make too much
552        // sense to detect the case where we have no extensions.
553        //
554        // (As of April 2025, RELAY_NTORV3 is not yet listed as Required for relays
555        // on the tor network, and so we cannot simply assume that everybody has it.)
556        if target
557            .protovers()
558            .supports_named_subver(named::RELAY_NTORV3)
559        {
560            self.extend_ntor_v3(target, params).await
561        } else {
562            self.extend_ntor(target, params).await
563        }
564    }
565
566    /// Extend the circuit via the ntor handshake to a new target last
567    /// hop.
568    #[deprecated(since = "1.6.1", note = "Use extend instead.")]
569    pub async fn extend_ntor<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
570    where
571        Tg: CircTarget,
572    {
573        let key = NtorPublicKey {
574            id: *target
575                .rsa_identity()
576                .ok_or(Error::MissingId(RelayIdType::Rsa))?,
577            pk: *target.ntor_onion_key(),
578        };
579        let mut linkspecs = target
580            .linkspecs()
581            .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
582        if !params.extend_by_ed25519_id {
583            linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
584        }
585
586        let (tx, rx) = oneshot::channel();
587
588        let peer_id = OwnedChanTarget::from_chan_target(target);
589        let settings = HopSettings::from_params_and_caps(
590            HopNegotiationType::None,
591            &params,
592            target.protovers(),
593        )?;
594        self.control
595            .unbounded_send(CtrlMsg::ExtendNtor {
596                peer_id,
597                public_key: key,
598                linkspecs,
599                settings,
600                done: tx,
601            })
602            .map_err(|_| Error::CircuitClosed)?;
603
604        rx.await.map_err(|_| Error::CircuitClosed)??;
605
606        Ok(())
607    }
608
609    /// Extend the circuit via the ntor handshake to a new target last
610    /// hop.
611    #[deprecated(since = "1.6.1", note = "Use extend instead.")]
612    pub async fn extend_ntor_v3<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
613    where
614        Tg: CircTarget,
615    {
616        let key = NtorV3PublicKey {
617            id: *target
618                .ed_identity()
619                .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
620            pk: *target.ntor_onion_key(),
621        };
622        let mut linkspecs = target
623            .linkspecs()
624            .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
625        if !params.extend_by_ed25519_id {
626            linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
627        }
628
629        let (tx, rx) = oneshot::channel();
630
631        let peer_id = OwnedChanTarget::from_chan_target(target);
632        let settings = HopSettings::from_params_and_caps(
633            HopNegotiationType::Full,
634            &params,
635            target.protovers(),
636        )?;
637        self.control
638            .unbounded_send(CtrlMsg::ExtendNtorV3 {
639                peer_id,
640                public_key: key,
641                linkspecs,
642                settings,
643                done: tx,
644            })
645            .map_err(|_| Error::CircuitClosed)?;
646
647        rx.await.map_err(|_| Error::CircuitClosed)??;
648
649        Ok(())
650    }
651
652    /// Extend this circuit by a single, "virtual" hop.
653    ///
654    /// A virtual hop is one for which we do not add an actual network connection
655    /// between separate hosts (such as Relays).  We only add a layer of
656    /// cryptography.
657    ///
658    /// This is used to implement onion services: the client and the service
659    /// both build a circuit to a single rendezvous point, and tell the
660    /// rendezvous point to relay traffic between their two circuits.  Having
661    /// completed a [`handshake`] out of band[^1], the parties each extend their
662    /// circuits by a single "virtual" encryption hop that represents their
663    /// shared cryptographic context.
664    ///
665    /// Once a circuit has been extended in this way, it is an error to try to
666    /// extend it in any other way.
667    ///
668    /// [^1]: Technically, the handshake is only _mostly_ out of band: the
669    ///     client sends their half of the handshake in an ` message, and the
670    ///     service's response is inline in its `RENDEZVOUS2` message.
671    //
672    // TODO hs: let's try to enforce the "you can't extend a circuit again once
673    // it has been extended this way" property.  We could do that with internal
674    // state, or some kind of a type state pattern.
675    #[cfg(feature = "hs-common")]
676    pub async fn extend_virtual(
677        &self,
678        protocol: handshake::RelayProtocol,
679        role: handshake::HandshakeRole,
680        seed: impl handshake::KeyGenerator,
681        params: &CircParameters,
682        capabilities: &tor_protover::Protocols,
683    ) -> Result<()> {
684        use self::handshake::BoxedClientLayer;
685
686        // TODO CGO: Possibly refactor this match into a separate method when we revisit this.
687        let negotiation_type = match protocol {
688            handshake::RelayProtocol::HsV3 => HopNegotiationType::HsV3,
689        };
690        let protocol = handshake::RelayCryptLayerProtocol::from(protocol);
691
692        let BoxedClientLayer { fwd, back, binding } =
693            protocol.construct_client_layers(role, seed)?;
694
695        let settings = HopSettings::from_params_and_caps(negotiation_type, params, capabilities)?;
696        let (tx, rx) = oneshot::channel();
697        let message = CtrlCmd::ExtendVirtual {
698            cell_crypto: (fwd, back, binding),
699            settings,
700            done: tx,
701        };
702
703        self.command
704            .unbounded_send(message)
705            .map_err(|_| Error::CircuitClosed)?;
706
707        rx.await.map_err(|_| Error::CircuitClosed)?
708    }
709
710    /// Install a [`CircuitPadder`] at the listed `hop`.
711    ///
712    /// Replaces any previous padder installed at that hop.
713    #[cfg(feature = "circ-padding-manual")]
714    pub async fn start_padding_at_hop(&self, hop: HopNum, padder: CircuitPadder) -> Result<()> {
715        self.set_padder_impl(crate::HopLocation::Hop((self.unique_id, hop)), Some(padder))
716            .await
717    }
718
719    /// Remove any [`CircuitPadder`] at the listed `hop`.
720    ///
721    /// Does nothing if there was not a padder installed there.
722    #[cfg(feature = "circ-padding-manual")]
723    pub async fn stop_padding_at_hop(&self, hop: HopNum) -> Result<()> {
724        self.set_padder_impl(crate::HopLocation::Hop((self.unique_id, hop)), None)
725            .await
726    }
727
728    /// Helper: replace the padder at `hop` with the provided `padder`, or with `None`.
729    #[cfg(feature = "circ-padding-manual")]
730    pub(super) async fn set_padder_impl(
731        &self,
732        hop: crate::HopLocation,
733        padder: Option<CircuitPadder>,
734    ) -> Result<()> {
735        let (tx, rx) = oneshot::channel();
736        let msg = CtrlCmd::SetPadder {
737            hop,
738            padder,
739            sender: tx,
740        };
741        self.command
742            .unbounded_send(msg)
743            .map_err(|_| Error::CircuitClosed)?;
744        rx.await.map_err(|_| Error::CircuitClosed)?
745    }
746
747    /// Return true if this circuit is closed and therefore unusable.
748    pub fn is_closing(&self) -> bool {
749        self.control.is_closed()
750    }
751
752    /// Return a process-unique identifier for this circuit.
753    pub fn unique_id(&self) -> UniqId {
754        self.unique_id
755    }
756
757    /// Return the number of hops in this circuit.
758    ///
759    /// NOTE: This function will currently return only the number of hops
760    /// _currently_ in the circuit. If there is an extend operation in progress,
761    /// the currently pending hop may or may not be counted, depending on whether
762    /// the extend operation finishes before this call is done.
763    pub fn n_hops(&self) -> Result<usize> {
764        self.mutable
765            .n_hops(self.unique_id)
766            .map_err(|_| Error::CircuitClosed)
767    }
768
769    /// Return a future that will resolve once this circuit has closed.
770    ///
771    /// Note that this method does not itself cause the circuit to shut down.
772    ///
773    /// TODO: Perhaps this should return some kind of status indication instead
774    /// of just ()
775    pub fn wait_for_close(
776        &self,
777    ) -> impl futures::Future<Output = ()> + Send + Sync + 'static + use<> {
778        self.reactor_closed_rx.clone().map(|_| ())
779    }
780}
781
782impl PendingClientTunnel {
783    /// Instantiate a new circuit object: used from Channel::new_tunnel().
784    ///
785    /// Does not send a CREATE* cell on its own.
786    #[allow(clippy::too_many_arguments)]
787    pub(crate) fn new(
788        id: CircId,
789        channel: Arc<Channel>,
790        createdreceiver: oneshot::Receiver<CreateResponse>,
791        input: CircuitRxReceiver,
792        unique_id: UniqId,
793        runtime: DynTimeProvider,
794        memquota: CircuitAccount,
795        padding_ctrl: PaddingController,
796        padding_stream: PaddingEventStream,
797        timeouts: Arc<dyn TimeoutEstimator>,
798    ) -> (PendingClientTunnel, crate::client::reactor::Reactor) {
799        let time_provider = channel.time_provider().clone();
800        let (reactor, control_tx, command_tx, reactor_closed_rx, mutable) = Reactor::new(
801            channel,
802            id,
803            unique_id,
804            input,
805            runtime,
806            memquota.clone(),
807            padding_ctrl,
808            padding_stream,
809            timeouts,
810        );
811
812        let circuit = ClientCirc {
813            mutable,
814            unique_id,
815            control: control_tx,
816            command: command_tx,
817            reactor_closed_rx: reactor_closed_rx.shared(),
818            #[cfg(test)]
819            circid: id,
820            memquota,
821            time_provider,
822            is_multi_path: false,
823        };
824
825        let pending = PendingClientTunnel {
826            recvcreated: createdreceiver,
827            circ: circuit,
828        };
829        (pending, reactor)
830    }
831
832    /// Extract the process-unique identifier for this pending circuit.
833    pub fn peek_unique_id(&self) -> UniqId {
834        self.circ.unique_id
835    }
836
837    /// Use the (questionable!) CREATE_FAST handshake to connect to the
838    /// first hop of this circuit.
839    ///
840    /// There's no authentication in CRATE_FAST,
841    /// so we don't need to know whom we're connecting to: we're just
842    /// connecting to whichever relay the channel is for.
843    pub async fn create_firsthop_fast(self, params: CircParameters) -> Result<ClientTunnel> {
844        // We know nothing about this relay, so we assume it supports no protocol capabilities at all.
845        //
846        // TODO: If we had a consensus, we could assume it supported all required-relay-protocols.
847        // TODO prop364: When we implement CreateOneHop, we will want a Protocols argument here.
848        let protocols = tor_protover::Protocols::new();
849        let settings =
850            HopSettings::from_params_and_caps(HopNegotiationType::None, &params, &protocols)?;
851        let (tx, rx) = oneshot::channel();
852        self.circ
853            .control
854            .unbounded_send(CtrlMsg::Create {
855                recv_created: self.recvcreated,
856                handshake: CircuitHandshake::CreateFast,
857                settings,
858                done: tx,
859            })
860            .map_err(|_| Error::CircuitClosed)?;
861
862        rx.await.map_err(|_| Error::CircuitClosed)??;
863
864        self.circ.into_tunnel()
865    }
866
867    /// Use the most appropriate handshake to connect to the first hop of this circuit.
868    ///
869    /// Note that the provided 'target' must match the channel's target,
870    /// or the handshake will fail.
871    pub async fn create_firsthop<Tg>(
872        self,
873        target: &Tg,
874        params: CircParameters,
875    ) -> Result<ClientTunnel>
876    where
877        Tg: tor_linkspec::CircTarget,
878    {
879        #![allow(deprecated)]
880        // (See note in ClientCirc::extend.)
881        if target
882            .protovers()
883            .supports_named_subver(named::RELAY_NTORV3)
884        {
885            self.create_firsthop_ntor_v3(target, params).await
886        } else {
887            self.create_firsthop_ntor(target, params).await
888        }
889    }
890
891    /// Use the ntor handshake to connect to the first hop of this circuit.
892    ///
893    /// Note that the provided 'target' must match the channel's target,
894    /// or the handshake will fail.
895    #[deprecated(since = "1.6.1", note = "Use create_firsthop instead.")]
896    pub async fn create_firsthop_ntor<Tg>(
897        self,
898        target: &Tg,
899        params: CircParameters,
900    ) -> Result<ClientTunnel>
901    where
902        Tg: tor_linkspec::CircTarget,
903    {
904        let (tx, rx) = oneshot::channel();
905        let settings = HopSettings::from_params_and_caps(
906            HopNegotiationType::None,
907            &params,
908            target.protovers(),
909        )?;
910
911        self.circ
912            .control
913            .unbounded_send(CtrlMsg::Create {
914                recv_created: self.recvcreated,
915                handshake: CircuitHandshake::Ntor {
916                    public_key: NtorPublicKey {
917                        id: *target
918                            .rsa_identity()
919                            .ok_or(Error::MissingId(RelayIdType::Rsa))?,
920                        pk: *target.ntor_onion_key(),
921                    },
922                    ed_identity: *target
923                        .ed_identity()
924                        .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
925                },
926                settings,
927                done: tx,
928            })
929            .map_err(|_| Error::CircuitClosed)?;
930
931        rx.await.map_err(|_| Error::CircuitClosed)??;
932
933        self.circ.into_tunnel()
934    }
935
936    /// Use the ntor_v3 handshake to connect to the first hop of this circuit.
937    ///
938    /// Assumes that the target supports ntor_v3. The caller should verify
939    /// this before calling this function, e.g. by validating that the target
940    /// has advertised ["Relay=4"](https://spec.torproject.org/tor-spec/subprotocol-versioning.html#relay).
941    ///
942    /// Note that the provided 'target' must match the channel's target,
943    /// or the handshake will fail.
944    #[deprecated(since = "1.6.1", note = "Use create_firsthop instead.")]
945    pub async fn create_firsthop_ntor_v3<Tg>(
946        self,
947        target: &Tg,
948        params: CircParameters,
949    ) -> Result<ClientTunnel>
950    where
951        Tg: tor_linkspec::CircTarget,
952    {
953        let settings = HopSettings::from_params_and_caps(
954            HopNegotiationType::Full,
955            &params,
956            target.protovers(),
957        )?;
958        let (tx, rx) = oneshot::channel();
959
960        self.circ
961            .control
962            .unbounded_send(CtrlMsg::Create {
963                recv_created: self.recvcreated,
964                handshake: CircuitHandshake::NtorV3 {
965                    public_key: NtorV3PublicKey {
966                        id: *target
967                            .ed_identity()
968                            .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
969                        pk: *target.ntor_onion_key(),
970                    },
971                },
972                settings,
973                done: tx,
974            })
975            .map_err(|_| Error::CircuitClosed)?;
976
977        rx.await.map_err(|_| Error::CircuitClosed)??;
978
979        self.circ.into_tunnel()
980    }
981}
982
983#[cfg(test)]
984pub(crate) mod test {
985    // @@ begin test lint list maintained by maint/add_warning @@
986    #![allow(clippy::bool_assert_comparison)]
987    #![allow(clippy::clone_on_copy)]
988    #![allow(clippy::dbg_macro)]
989    #![allow(clippy::mixed_attributes_style)]
990    #![allow(clippy::print_stderr)]
991    #![allow(clippy::print_stdout)]
992    #![allow(clippy::single_char_pattern)]
993    #![allow(clippy::unwrap_used)]
994    #![allow(clippy::unchecked_time_subtraction)]
995    #![allow(clippy::useless_vec)]
996    #![allow(clippy::needless_pass_by_value)]
997    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
998
999    use super::*;
1000    use crate::channel::test::{CodecResult, new_reactor};
1001    use crate::circuit::CircuitRxSender;
1002    use crate::client::circuit::padding::new_padding;
1003    use crate::client::stream::DataStream;
1004    #[cfg(feature = "hs-service")]
1005    use crate::client::stream::IncomingStreamRequestFilter;
1006    use crate::congestion::params::CongestionControlParams;
1007    use crate::congestion::test_utils::params::build_cc_vegas_params;
1008    use crate::crypto::cell::RelayCellBody;
1009    use crate::crypto::handshake::ntor_v3::NtorV3Server;
1010    use crate::memquota::SpecificAccount as _;
1011    use crate::stream::flow_ctrl::params::FlowCtrlParameters;
1012    use crate::util::DummyTimeoutEstimator;
1013    use assert_matches::assert_matches;
1014    use chanmsg::{AnyChanMsg, Created2, CreatedFast};
1015    use futures::channel::mpsc::{Receiver, Sender};
1016    use futures::io::{AsyncReadExt, AsyncWriteExt};
1017    use futures::sink::SinkExt;
1018    use futures::stream::StreamExt;
1019    use hex_literal::hex;
1020    use std::collections::{HashMap, VecDeque};
1021    use std::fmt::Debug;
1022    use std::time::Duration;
1023    use tor_basic_utils::test_rng::testing_rng;
1024    use tor_cell::chancell::{AnyChanCell, BoxedCellBody, ChanCell, ChanCmd, msg as chanmsg};
1025    use tor_cell::relaycell::extend::{self as extend_ext, CircRequestExt, CircResponseExt};
1026    use tor_cell::relaycell::msg::SendmeTag;
1027    use tor_cell::relaycell::{
1028        AnyRelayMsgOuter, RelayCellFormat, RelayCmd, StreamId, msg as relaymsg, msg::AnyRelayMsg,
1029    };
1030    use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
1031    use tor_linkspec::OwnedCircTarget;
1032    use tor_memquota::HasMemoryCost;
1033    use tor_rtcompat::Runtime;
1034    use tor_rtcompat::SpawnExt;
1035    use tracing::trace;
1036    use tracing_test::traced_test;
1037
1038    #[cfg(feature = "conflux")]
1039    use {
1040        crate::client::reactor::ConfluxHandshakeResult,
1041        crate::util::err::ConfluxHandshakeError,
1042        futures::future::FusedFuture,
1043        futures::lock::Mutex as AsyncMutex,
1044        std::pin::Pin,
1045        std::result::Result as StdResult,
1046        tor_cell::relaycell::conflux::{V1DesiredUx, V1LinkPayload, V1Nonce},
1047        tor_cell::relaycell::msg::ConfluxLink,
1048        tor_rtmock::MockRuntime,
1049    };
1050
1051    impl PendingClientTunnel {
1052        /// Testing only: Extract the circuit ID for this pending circuit.
1053        pub(crate) fn peek_circid(&self) -> CircId {
1054            self.circ.circid
1055        }
1056    }
1057
1058    impl ClientCirc {
1059        /// Testing only: Extract the circuit ID of this circuit.
1060        pub(crate) fn peek_circid(&self) -> CircId {
1061            self.circid
1062        }
1063    }
1064
1065    impl ClientTunnel {
1066        pub(crate) async fn resolve_last_hop(&self) -> TargetHop {
1067            let (sender, receiver) = oneshot::channel();
1068            let _ =
1069                self.as_single_circ()
1070                    .unwrap()
1071                    .command
1072                    .unbounded_send(CtrlCmd::ResolveTargetHop {
1073                        hop: TargetHop::LastHop,
1074                        done: sender,
1075                    });
1076            TargetHop::Hop(receiver.await.unwrap().unwrap())
1077        }
1078    }
1079
1080    fn rmsg_to_ccmsg(id: Option<StreamId>, msg: relaymsg::AnyRelayMsg) -> AnyChanMsg {
1081        // TODO #1947: test other formats.
1082        let rfmt = RelayCellFormat::V0;
1083        let body: BoxedCellBody = AnyRelayMsgOuter::new(id, msg)
1084            .encode(rfmt, &mut testing_rng())
1085            .unwrap();
1086        let chanmsg = chanmsg::Relay::from(body);
1087        AnyChanMsg::Relay(chanmsg)
1088    }
1089
1090    // Example relay IDs and keys
1091    const EXAMPLE_SK: [u8; 32] =
1092        hex!("7789d92a89711a7e2874c61ea495452cfd48627b3ca2ea9546aafa5bf7b55803");
1093    const EXAMPLE_PK: [u8; 32] =
1094        hex!("395cb26b83b3cd4b91dba9913e562ae87d21ecdd56843da7ca939a6a69001253");
1095    const EXAMPLE_ED_ID: [u8; 32] = [6; 32];
1096    const EXAMPLE_RSA_ID: [u8; 20] = [10; 20];
1097
1098    /// Make an MPSC queue, of the type we use in Channels, but a fake one for testing
1099    #[cfg(test)]
1100    pub(crate) fn fake_mpsc<T: HasMemoryCost + Debug + Send>(
1101        buffer: usize,
1102    ) -> (StreamMpscSender<T>, StreamMpscReceiver<T>) {
1103        crate::fake_mpsc(buffer)
1104    }
1105
1106    /// return an example OwnedCircTarget that can get used for an ntor handshake.
1107    fn example_target() -> OwnedCircTarget {
1108        let mut builder = OwnedCircTarget::builder();
1109        builder
1110            .chan_target()
1111            .ed_identity(EXAMPLE_ED_ID.into())
1112            .rsa_identity(EXAMPLE_RSA_ID.into());
1113        builder
1114            .ntor_onion_key(EXAMPLE_PK.into())
1115            .protocols("FlowCtrl=1-2".parse().unwrap())
1116            .build()
1117            .unwrap()
1118    }
1119    fn example_ntor_key() -> crate::crypto::handshake::ntor::NtorSecretKey {
1120        crate::crypto::handshake::ntor::NtorSecretKey::new(
1121            EXAMPLE_SK.into(),
1122            EXAMPLE_PK.into(),
1123            EXAMPLE_RSA_ID.into(),
1124        )
1125    }
1126    fn example_ntor_v3_key() -> crate::crypto::handshake::ntor_v3::NtorV3SecretKey {
1127        crate::crypto::handshake::ntor_v3::NtorV3SecretKey::new(
1128            EXAMPLE_SK.into(),
1129            EXAMPLE_PK.into(),
1130            EXAMPLE_ED_ID.into(),
1131        )
1132    }
1133
1134    fn working_fake_channel<R: Runtime>(
1135        rt: &R,
1136    ) -> (Arc<Channel>, Receiver<AnyChanCell>, Sender<CodecResult>) {
1137        let (channel, chan_reactor, rx, tx) = new_reactor(rt.clone());
1138        rt.spawn(async {
1139            let _ignore = chan_reactor.run().await;
1140        })
1141        .unwrap();
1142        (channel, rx, tx)
1143    }
1144
1145    /// Which handshake type to use.
1146    #[derive(Copy, Clone)]
1147    enum HandshakeType {
1148        Fast,
1149        Ntor,
1150        NtorV3,
1151    }
1152
1153    #[allow(deprecated)]
1154    async fn test_create<R: Runtime>(rt: &R, handshake_type: HandshakeType, with_cc: bool) {
1155        // We want to try progressing from a pending circuit to a circuit
1156        // via a crate_fast handshake.
1157
1158        use crate::crypto::handshake::{ServerHandshake, fast::CreateFastServer, ntor::NtorServer};
1159
1160        let (chan, mut rx, _sink) = working_fake_channel(rt);
1161        let circid = CircId::new(128).unwrap();
1162        let (created_send, created_recv) = oneshot::channel();
1163        let (_circmsg_send, circmsg_recv) = fake_mpsc(64);
1164        let unique_id = UniqId::new(23, 17);
1165        let (padding_ctrl, padding_stream) = new_padding(DynTimeProvider::new(rt.clone()));
1166
1167        let (pending, reactor) = PendingClientTunnel::new(
1168            circid,
1169            chan,
1170            created_recv,
1171            circmsg_recv,
1172            unique_id,
1173            DynTimeProvider::new(rt.clone()),
1174            CircuitAccount::new_noop(),
1175            padding_ctrl,
1176            padding_stream,
1177            Arc::new(DummyTimeoutEstimator),
1178        );
1179
1180        rt.spawn(async {
1181            let _ignore = reactor.run().await;
1182        })
1183        .unwrap();
1184
1185        // Future to pretend to be a relay on the other end of the circuit.
1186        let simulate_relay_fut = async move {
1187            let mut rng = testing_rng();
1188            let create_cell = rx.next().await.unwrap();
1189            assert_eq!(create_cell.circid(), Some(circid));
1190            let reply = match handshake_type {
1191                HandshakeType::Fast => {
1192                    let cf = match create_cell.msg() {
1193                        AnyChanMsg::CreateFast(cf) => cf,
1194                        other => panic!("{:?}", other),
1195                    };
1196                    let (_, rep) = CreateFastServer::server(
1197                        &mut rng,
1198                        &mut |_: &()| Some(()),
1199                        &[()],
1200                        cf.handshake(),
1201                    )
1202                    .unwrap();
1203                    CreateResponse::CreatedFast(CreatedFast::new(rep))
1204                }
1205                HandshakeType::Ntor => {
1206                    let c2 = match create_cell.msg() {
1207                        AnyChanMsg::Create2(c2) => c2,
1208                        other => panic!("{:?}", other),
1209                    };
1210                    let (_, rep) = NtorServer::server(
1211                        &mut rng,
1212                        &mut |_: &()| Some(()),
1213                        &[example_ntor_key()],
1214                        c2.body(),
1215                    )
1216                    .unwrap();
1217                    CreateResponse::Created2(Created2::new(rep))
1218                }
1219                HandshakeType::NtorV3 => {
1220                    let c2 = match create_cell.msg() {
1221                        AnyChanMsg::Create2(c2) => c2,
1222                        other => panic!("{:?}", other),
1223                    };
1224                    let mut reply_fn = if with_cc {
1225                        |client_exts: &[CircRequestExt]| {
1226                            let _ = client_exts
1227                                .iter()
1228                                .find(|e| matches!(e, CircRequestExt::CcRequest(_)))
1229                                .expect("Client failed to request CC");
1230                            // This needs to be aligned to test_utils params
1231                            // value due to validation that needs it in range.
1232                            Some(vec![CircResponseExt::CcResponse(
1233                                extend_ext::CcResponse::new(31),
1234                            )])
1235                        }
1236                    } else {
1237                        |_: &_| Some(vec![])
1238                    };
1239                    let (_, rep) = NtorV3Server::server(
1240                        &mut rng,
1241                        &mut reply_fn,
1242                        &[example_ntor_v3_key()],
1243                        c2.body(),
1244                    )
1245                    .unwrap();
1246                    CreateResponse::Created2(Created2::new(rep))
1247                }
1248            };
1249            created_send.send(reply).unwrap();
1250        };
1251        // Future to pretend to be a client.
1252        let client_fut = async move {
1253            let target = example_target();
1254            let params = CircParameters::default();
1255            let ret = match handshake_type {
1256                HandshakeType::Fast => {
1257                    trace!("doing fast create");
1258                    pending.create_firsthop_fast(params).await
1259                }
1260                HandshakeType::Ntor => {
1261                    trace!("doing ntor create");
1262                    pending.create_firsthop_ntor(&target, params).await
1263                }
1264                HandshakeType::NtorV3 => {
1265                    let params = if with_cc {
1266                        // Setup CC vegas parameters.
1267                        CircParameters::new(
1268                            true,
1269                            build_cc_vegas_params(),
1270                            FlowCtrlParameters::defaults_for_tests(),
1271                        )
1272                    } else {
1273                        params
1274                    };
1275                    trace!("doing ntor_v3 create");
1276                    pending.create_firsthop_ntor_v3(&target, params).await
1277                }
1278            };
1279            trace!("create done: result {:?}", ret);
1280            ret
1281        };
1282
1283        let (circ, _) = futures::join!(client_fut, simulate_relay_fut);
1284
1285        let _circ = circ.unwrap();
1286
1287        // pfew!  We've build a circuit!  Let's make sure it has one hop.
1288        assert_eq!(_circ.n_hops().unwrap(), 1);
1289    }
1290
1291    #[traced_test]
1292    #[test]
1293    fn test_create_fast() {
1294        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1295            test_create(&rt, HandshakeType::Fast, false).await;
1296        });
1297    }
1298    #[traced_test]
1299    #[test]
1300    fn test_create_ntor() {
1301        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1302            test_create(&rt, HandshakeType::Ntor, false).await;
1303        });
1304    }
1305    #[traced_test]
1306    #[test]
1307    fn test_create_ntor_v3() {
1308        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1309            test_create(&rt, HandshakeType::NtorV3, false).await;
1310        });
1311    }
1312    #[traced_test]
1313    #[test]
1314    #[cfg(feature = "flowctl-cc")]
1315    fn test_create_ntor_v3_with_cc() {
1316        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1317            test_create(&rt, HandshakeType::NtorV3, true).await;
1318        });
1319    }
1320
1321    // An encryption layer that doesn't do any crypto.   Can be used
1322    // as inbound or outbound, but not both at once.
1323    pub(crate) struct DummyCrypto {
1324        counter_tag: [u8; 20],
1325        counter: u32,
1326        lasthop: bool,
1327    }
1328    impl DummyCrypto {
1329        fn next_tag(&mut self) -> SendmeTag {
1330            #![allow(clippy::identity_op)]
1331            self.counter_tag[0] = ((self.counter >> 0) & 255) as u8;
1332            self.counter_tag[1] = ((self.counter >> 8) & 255) as u8;
1333            self.counter_tag[2] = ((self.counter >> 16) & 255) as u8;
1334            self.counter_tag[3] = ((self.counter >> 24) & 255) as u8;
1335            self.counter += 1;
1336            self.counter_tag.into()
1337        }
1338    }
1339
1340    impl crate::crypto::cell::OutboundClientLayer for DummyCrypto {
1341        fn originate_for(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) -> SendmeTag {
1342            self.next_tag()
1343        }
1344        fn encrypt_outbound(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) {}
1345    }
1346    impl crate::crypto::cell::InboundClientLayer for DummyCrypto {
1347        fn decrypt_inbound(
1348            &mut self,
1349            _cmd: ChanCmd,
1350            _cell: &mut RelayCellBody,
1351        ) -> Option<SendmeTag> {
1352            if self.lasthop {
1353                Some(self.next_tag())
1354            } else {
1355                None
1356            }
1357        }
1358    }
1359    impl DummyCrypto {
1360        pub(crate) fn new(lasthop: bool) -> Self {
1361            DummyCrypto {
1362                counter_tag: [0; 20],
1363                counter: 0,
1364                lasthop,
1365            }
1366        }
1367    }
1368
1369    // Helper: set up a 3-hop circuit with no encryption, where the
1370    // next inbound message seems to come from hop next_msg_from
1371    async fn newtunnel_ext<R: Runtime>(
1372        rt: &R,
1373        unique_id: UniqId,
1374        chan: Arc<Channel>,
1375        hops: Vec<path::HopDetail>,
1376        next_msg_from: HopNum,
1377        params: CircParameters,
1378    ) -> (ClientTunnel, CircuitRxSender) {
1379        let circid = CircId::new(128).unwrap();
1380        let (_created_send, created_recv) = oneshot::channel();
1381        let (circmsg_send, circmsg_recv) = fake_mpsc(64);
1382        let (padding_ctrl, padding_stream) = new_padding(DynTimeProvider::new(rt.clone()));
1383
1384        let (pending, reactor) = PendingClientTunnel::new(
1385            circid,
1386            chan,
1387            created_recv,
1388            circmsg_recv,
1389            unique_id,
1390            DynTimeProvider::new(rt.clone()),
1391            CircuitAccount::new_noop(),
1392            padding_ctrl,
1393            padding_stream,
1394            Arc::new(DummyTimeoutEstimator),
1395        );
1396
1397        rt.spawn(async {
1398            let _ignore = reactor.run().await;
1399        })
1400        .unwrap();
1401        let PendingClientTunnel {
1402            circ,
1403            recvcreated: _,
1404        } = pending;
1405
1406        // TODO #1067: Support other formats
1407        let relay_cell_format = RelayCellFormat::V0;
1408
1409        let last_hop_num = u8::try_from(hops.len() - 1).unwrap();
1410        for (idx, peer_id) in hops.into_iter().enumerate() {
1411            let (tx, rx) = oneshot::channel();
1412            let idx = idx as u8;
1413
1414            circ.command
1415                .unbounded_send(CtrlCmd::AddFakeHop {
1416                    relay_cell_format,
1417                    fwd_lasthop: idx == last_hop_num,
1418                    rev_lasthop: idx == u8::from(next_msg_from),
1419                    peer_id,
1420                    params: params.clone(),
1421                    done: tx,
1422                })
1423                .unwrap();
1424            rx.await.unwrap().unwrap();
1425        }
1426        (circ.into_tunnel().unwrap(), circmsg_send)
1427    }
1428
1429    // Helper: set up a 3-hop circuit with no encryption, where the
1430    // next inbound message seems to come from hop next_msg_from
1431    async fn newtunnel<R: Runtime>(
1432        rt: &R,
1433        chan: Arc<Channel>,
1434    ) -> (Arc<ClientTunnel>, CircuitRxSender) {
1435        let hops = std::iter::repeat_with(|| {
1436            let peer_id = tor_linkspec::OwnedChanTarget::builder()
1437                .ed_identity([4; 32].into())
1438                .rsa_identity([5; 20].into())
1439                .build()
1440                .expect("Could not construct fake hop");
1441
1442            path::HopDetail::Relay(peer_id)
1443        })
1444        .take(3)
1445        .collect();
1446
1447        let unique_id = UniqId::new(23, 17);
1448        let (tunnel, circmsg_send) = newtunnel_ext(
1449            rt,
1450            unique_id,
1451            chan,
1452            hops,
1453            2.into(),
1454            CircParameters::default(),
1455        )
1456        .await;
1457
1458        (Arc::new(tunnel), circmsg_send)
1459    }
1460
1461    /// Create `n` distinct [`path::HopDetail`]s,
1462    /// with the specified `start_idx` for the dummy identities.
1463    fn hop_details(n: u8, start_idx: u8) -> Vec<path::HopDetail> {
1464        (0..n)
1465            .map(|idx| {
1466                let peer_id = tor_linkspec::OwnedChanTarget::builder()
1467                    .ed_identity([idx + start_idx; 32].into())
1468                    .rsa_identity([idx + start_idx + 1; 20].into())
1469                    .build()
1470                    .expect("Could not construct fake hop");
1471
1472                path::HopDetail::Relay(peer_id)
1473            })
1474            .collect()
1475    }
1476
1477    #[allow(deprecated)]
1478    async fn test_extend<R: Runtime>(rt: &R, handshake_type: HandshakeType) {
1479        use crate::crypto::handshake::{ServerHandshake, ntor::NtorServer};
1480
1481        let (chan, mut rx, _sink) = working_fake_channel(rt);
1482        let (tunnel, mut sink) = newtunnel(rt, chan).await;
1483        let circ = Arc::new(tunnel.as_single_circ().unwrap());
1484        let circid = circ.peek_circid();
1485        let params = CircParameters::default();
1486
1487        let extend_fut = async move {
1488            let target = example_target();
1489            match handshake_type {
1490                HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
1491                HandshakeType::Ntor => circ.extend_ntor(&target, params).await.unwrap(),
1492                HandshakeType::NtorV3 => circ.extend_ntor_v3(&target, params).await.unwrap(),
1493            };
1494            circ // gotta keep the circ alive, or the reactor would exit.
1495        };
1496        let reply_fut = async move {
1497            // We've disabled encryption on this circuit, so we can just
1498            // read the extend2 cell.
1499            let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1500            assert_eq!(id, Some(circid));
1501            let rmsg = match chmsg {
1502                AnyChanMsg::RelayEarly(r) => {
1503                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1504                        .unwrap()
1505                }
1506                other => panic!("{:?}", other),
1507            };
1508            let e2 = match rmsg.msg() {
1509                AnyRelayMsg::Extend2(e2) => e2,
1510                other => panic!("{:?}", other),
1511            };
1512            let mut rng = testing_rng();
1513            let reply = match handshake_type {
1514                HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
1515                HandshakeType::Ntor => {
1516                    let (_keygen, reply) = NtorServer::server(
1517                        &mut rng,
1518                        &mut |_: &()| Some(()),
1519                        &[example_ntor_key()],
1520                        e2.handshake(),
1521                    )
1522                    .unwrap();
1523                    reply
1524                }
1525                HandshakeType::NtorV3 => {
1526                    let (_keygen, reply) = NtorV3Server::server(
1527                        &mut rng,
1528                        &mut |_: &[CircRequestExt]| Some(vec![]),
1529                        &[example_ntor_v3_key()],
1530                        e2.handshake(),
1531                    )
1532                    .unwrap();
1533                    reply
1534                }
1535            };
1536
1537            let extended2 = relaymsg::Extended2::new(reply).into();
1538            sink.send(rmsg_to_ccmsg(None, extended2)).await.unwrap();
1539            (sink, rx) // gotta keep the sink and receiver alive, or the reactor will exit.
1540        };
1541
1542        let (circ, (_sink, _rx)) = futures::join!(extend_fut, reply_fut);
1543
1544        // Did we really add another hop?
1545        assert_eq!(circ.n_hops().unwrap(), 4);
1546
1547        // Do the path accessors report a reasonable outcome?
1548        {
1549            let path = circ.single_path().unwrap();
1550            let path = path
1551                .all_hops()
1552                .filter_map(|hop| match hop {
1553                    path::HopDetail::Relay(r) => Some(r),
1554                    #[cfg(feature = "hs-common")]
1555                    path::HopDetail::Virtual => None,
1556                })
1557                .collect::<Vec<_>>();
1558
1559            assert_eq!(path.len(), 4);
1560            use tor_linkspec::HasRelayIds;
1561            assert_eq!(path[3].ed_identity(), example_target().ed_identity());
1562            assert_ne!(path[0].ed_identity(), example_target().ed_identity());
1563        }
1564        {
1565            let path = circ.single_path().unwrap();
1566            assert_eq!(path.n_hops(), 4);
1567            use tor_linkspec::HasRelayIds;
1568            assert_eq!(
1569                path.hops()[3].as_chan_target().unwrap().ed_identity(),
1570                example_target().ed_identity()
1571            );
1572            assert_ne!(
1573                path.hops()[0].as_chan_target().unwrap().ed_identity(),
1574                example_target().ed_identity()
1575            );
1576        }
1577    }
1578
1579    #[traced_test]
1580    #[test]
1581    fn test_extend_ntor() {
1582        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1583            test_extend(&rt, HandshakeType::Ntor).await;
1584        });
1585    }
1586
1587    #[traced_test]
1588    #[test]
1589    fn test_extend_ntor_v3() {
1590        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1591            test_extend(&rt, HandshakeType::NtorV3).await;
1592        });
1593    }
1594
1595    #[allow(deprecated)]
1596    async fn bad_extend_test_impl<R: Runtime>(
1597        rt: &R,
1598        reply_hop: HopNum,
1599        bad_reply: AnyChanMsg,
1600    ) -> Error {
1601        let (chan, mut rx, _sink) = working_fake_channel(rt);
1602        let hops = std::iter::repeat_with(|| {
1603            let peer_id = tor_linkspec::OwnedChanTarget::builder()
1604                .ed_identity([4; 32].into())
1605                .rsa_identity([5; 20].into())
1606                .build()
1607                .expect("Could not construct fake hop");
1608
1609            path::HopDetail::Relay(peer_id)
1610        })
1611        .take(3)
1612        .collect();
1613
1614        let unique_id = UniqId::new(23, 17);
1615        let (tunnel, mut sink) = newtunnel_ext(
1616            rt,
1617            unique_id,
1618            chan,
1619            hops,
1620            reply_hop,
1621            CircParameters::default(),
1622        )
1623        .await;
1624        let params = CircParameters::default();
1625
1626        let target = example_target();
1627        let reply_task_handle = rt
1628            .spawn_with_handle(async move {
1629                // Wait for a cell, and make sure it's EXTEND2.
1630                let (_circid, chanmsg) = rx.next().await.unwrap().into_circid_and_msg();
1631                let AnyChanMsg::RelayEarly(relay_early) = chanmsg else {
1632                    panic!("unexpected message {chanmsg:?}");
1633                };
1634                let relaymsg = UnparsedRelayMsg::from_singleton_body(
1635                    RelayCellFormat::V0,
1636                    relay_early.into_relay_body(),
1637                )
1638                .unwrap();
1639                assert_eq!(relaymsg.cmd(), RelayCmd::EXTEND2);
1640
1641                // Send back the "bad_reply."
1642                sink.send(bad_reply).await.unwrap();
1643                sink
1644            })
1645            .unwrap();
1646        let outcome = tunnel
1647            .as_single_circ()
1648            .unwrap()
1649            .extend_ntor(&target, params)
1650            .await;
1651        let _sink = reply_task_handle.await;
1652
1653        assert_eq!(tunnel.n_hops().unwrap(), 3);
1654        assert!(outcome.is_err());
1655        outcome.unwrap_err()
1656    }
1657
1658    #[traced_test]
1659    #[test]
1660    fn bad_extend_wronghop() {
1661        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1662            let extended2 = relaymsg::Extended2::new(vec![]).into();
1663            let cc = rmsg_to_ccmsg(None, extended2);
1664
1665            let error = bad_extend_test_impl(&rt, 1.into(), cc).await;
1666            // This case shows up as a CircDestroy, since a message sent
1667            // from the wrong hop won't even be delivered to the extend
1668            // code's meta-handler.  Instead the unexpected message will cause
1669            // the circuit to get torn down.
1670            match error {
1671                Error::CircuitClosed => {}
1672                x => panic!("got other error: {}", x),
1673            }
1674        });
1675    }
1676
1677    #[traced_test]
1678    #[test]
1679    fn bad_extend_wrongtype() {
1680        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1681            let extended = relaymsg::Extended::new(vec![7; 200]).into();
1682            let cc = rmsg_to_ccmsg(None, extended);
1683
1684            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1685            match error {
1686                Error::BytesErr {
1687                    err: tor_bytes::Error::InvalidMessage(_),
1688                    object: "extended2 message",
1689                } => {}
1690                other => panic!("{:?}", other),
1691            }
1692        });
1693    }
1694
1695    #[traced_test]
1696    #[test]
1697    fn bad_extend_destroy() {
1698        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1699            let cc = AnyChanMsg::Destroy(chanmsg::Destroy::new(4.into()));
1700            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1701            match error {
1702                Error::CircuitClosed => {}
1703                other => panic!("{:?}", other),
1704            }
1705        });
1706    }
1707
1708    #[traced_test]
1709    #[test]
1710    fn bad_extend_crypto() {
1711        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1712            let extended2 = relaymsg::Extended2::new(vec![99; 256]).into();
1713            let cc = rmsg_to_ccmsg(None, extended2);
1714            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1715            assert_matches!(error, Error::BadCircHandshakeAuth);
1716        });
1717    }
1718
1719    #[traced_test]
1720    #[test]
1721    fn begindir() {
1722        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1723            let (chan, mut rx, _sink) = working_fake_channel(&rt);
1724            let (tunnel, mut sink) = newtunnel(&rt, chan).await;
1725            let circ = tunnel.as_single_circ().unwrap();
1726            let circid = circ.peek_circid();
1727
1728            let begin_and_send_fut = async move {
1729                // Here we'll say we've got a circuit, and we want to
1730                // make a simple BEGINDIR request with it.
1731                let mut stream = tunnel.begin_dir_stream().await.unwrap();
1732                stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap();
1733                stream.flush().await.unwrap();
1734                let mut buf = [0_u8; 1024];
1735                let n = stream.read(&mut buf).await.unwrap();
1736                assert_eq!(&buf[..n], b"HTTP/1.0 404 Not found\r\n");
1737                let n = stream.read(&mut buf).await.unwrap();
1738                assert_eq!(n, 0);
1739                stream
1740            };
1741            let reply_fut = async move {
1742                // We've disabled encryption on this circuit, so we can just
1743                // read the begindir cell.
1744                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1745                assert_eq!(id, Some(circid));
1746                let rmsg = match chmsg {
1747                    AnyChanMsg::Relay(r) => {
1748                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1749                            .unwrap()
1750                    }
1751                    other => panic!("{:?}", other),
1752                };
1753                let (streamid, rmsg) = rmsg.into_streamid_and_msg();
1754                assert_matches!(rmsg, AnyRelayMsg::BeginDir(_));
1755
1756                // Reply with a Connected cell to indicate success.
1757                let connected = relaymsg::Connected::new_empty().into();
1758                sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
1759
1760                // Now read a DATA cell...
1761                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1762                assert_eq!(id, Some(circid));
1763                let rmsg = match chmsg {
1764                    AnyChanMsg::Relay(r) => {
1765                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1766                            .unwrap()
1767                    }
1768                    other => panic!("{:?}", other),
1769                };
1770                let (streamid_2, rmsg) = rmsg.into_streamid_and_msg();
1771                assert_eq!(streamid_2, streamid);
1772                if let AnyRelayMsg::Data(d) = rmsg {
1773                    assert_eq!(d.as_ref(), &b"HTTP/1.0 GET /\r\n"[..]);
1774                } else {
1775                    panic!();
1776                }
1777
1778                // Write another data cell in reply!
1779                let data = relaymsg::Data::new(b"HTTP/1.0 404 Not found\r\n")
1780                    .unwrap()
1781                    .into();
1782                sink.send(rmsg_to_ccmsg(streamid, data)).await.unwrap();
1783
1784                // Send an END cell to say that the conversation is over.
1785                let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
1786                sink.send(rmsg_to_ccmsg(streamid, end)).await.unwrap();
1787
1788                (rx, sink) // gotta keep these alive, or the reactor will exit.
1789            };
1790
1791            let (_stream, (_rx, _sink)) = futures::join!(begin_and_send_fut, reply_fut);
1792        });
1793    }
1794
1795    // Test: close a stream, either by dropping it or by calling AsyncWriteExt::close.
1796    fn close_stream_helper(by_drop: bool) {
1797        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1798            let (chan, mut rx, _sink) = working_fake_channel(&rt);
1799            let (tunnel, mut sink) = newtunnel(&rt, chan).await;
1800
1801            let stream_fut = async move {
1802                let stream = tunnel
1803                    .begin_stream("www.example.com", 80, None)
1804                    .await
1805                    .unwrap();
1806
1807                let (r, mut w) = stream.split();
1808                if by_drop {
1809                    // Drop the writer and the reader, which should close the stream.
1810                    drop(r);
1811                    drop(w);
1812                    (None, tunnel) // make sure to keep the circuit alive
1813                } else {
1814                    // Call close on the writer, while keeping the reader alive.
1815                    w.close().await.unwrap();
1816                    (Some(r), tunnel)
1817                }
1818            };
1819            let handler_fut = async {
1820                // Read the BEGIN message.
1821                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
1822                let rmsg = match msg {
1823                    AnyChanMsg::Relay(r) => {
1824                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1825                            .unwrap()
1826                    }
1827                    other => panic!("{:?}", other),
1828                };
1829                let (streamid, rmsg) = rmsg.into_streamid_and_msg();
1830                assert_eq!(rmsg.cmd(), RelayCmd::BEGIN);
1831
1832                // Reply with a CONNECTED.
1833                let connected =
1834                    relaymsg::Connected::new_with_addr("10.0.0.1".parse().unwrap(), 1234).into();
1835                sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
1836
1837                // Expect an END.
1838                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
1839                let rmsg = match msg {
1840                    AnyChanMsg::Relay(r) => {
1841                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1842                            .unwrap()
1843                    }
1844                    other => panic!("{:?}", other),
1845                };
1846                let (_, rmsg) = rmsg.into_streamid_and_msg();
1847                assert_eq!(rmsg.cmd(), RelayCmd::END);
1848
1849                (rx, sink) // keep these alive or the reactor will exit.
1850            };
1851
1852            let ((_opt_reader, _circ), (_rx, _sink)) = futures::join!(stream_fut, handler_fut);
1853        });
1854    }
1855
1856    #[traced_test]
1857    #[test]
1858    fn drop_stream() {
1859        close_stream_helper(true);
1860    }
1861
1862    #[traced_test]
1863    #[test]
1864    fn close_stream() {
1865        close_stream_helper(false);
1866    }
1867
1868    #[traced_test]
1869    #[test]
1870    fn expire_halfstreams() {
1871        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
1872            let (chan, mut rx, _sink) = working_fake_channel(&rt);
1873            let (tunnel, mut sink) = newtunnel(&rt, chan).await;
1874
1875            let client_fut = async move {
1876                let stream = tunnel
1877                    .begin_stream("www.example.com", 80, None)
1878                    .await
1879                    .unwrap();
1880
1881                let (r, mut w) = stream.split();
1882                // Close the stream
1883                w.close().await.unwrap();
1884                (Some(r), tunnel)
1885            };
1886            let exit_fut = async {
1887                // Read the BEGIN message.
1888                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
1889                let rmsg = match msg {
1890                    AnyChanMsg::Relay(r) => {
1891                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1892                            .unwrap()
1893                    }
1894                    other => panic!("{:?}", other),
1895                };
1896                let (streamid, rmsg) = rmsg.into_streamid_and_msg();
1897                assert_eq!(rmsg.cmd(), RelayCmd::BEGIN);
1898
1899                // Reply with a CONNECTED.
1900                let connected =
1901                    relaymsg::Connected::new_with_addr("10.0.0.1".parse().unwrap(), 1234).into();
1902                sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
1903
1904                (rx, streamid, sink) // keep these alive or the reactor will exit.
1905            };
1906
1907            let ((_opt_reader, tunnel), (_rx, streamid, mut sink)) =
1908                futures::join!(client_fut, exit_fut);
1909
1910            // Progress all futures to ensure the reactor has a chance to notice
1911            // we closed the stream.
1912            rt.progress_until_stalled().await;
1913
1914            // The tunnel should remain open
1915            assert!(!tunnel.is_closed());
1916
1917            // Write some more data on the half-stream.
1918            // The half-stream hasn't expired yet, so it will simply be ignored.
1919            let data = relaymsg::Data::new(b"hello").unwrap();
1920            sink.send(rmsg_to_ccmsg(streamid, AnyRelayMsg::Data(data)))
1921                .await
1922                .unwrap();
1923            rt.progress_until_stalled().await;
1924
1925            // This was not a protocol violation, so the tunnel is still alive.
1926            assert!(!tunnel.is_closed());
1927
1928            // Advance the time to cause the half-streams to get garbage collected.
1929            //
1930            // Advancing it by 2 * CBT ought to be enough, because the RTT estimator
1931            // won't yet have an estimate for the max_rtt.
1932            let stream_timeout = DummyTimeoutEstimator.circuit_build_timeout(3);
1933            rt.advance_by(2 * stream_timeout).await;
1934
1935            // Sending this cell is a protocol violation now
1936            // that the half-stream expired.
1937            let data = relaymsg::Data::new(b"hello").unwrap();
1938            sink.send(rmsg_to_ccmsg(streamid, AnyRelayMsg::Data(data)))
1939                .await
1940                .unwrap();
1941            rt.progress_until_stalled().await;
1942
1943            // The tunnel shut down because of the proto violation.
1944            assert!(tunnel.is_closed());
1945        });
1946    }
1947
1948    // Set up a circuit and stream that expects some incoming SENDMEs.
1949    async fn setup_incoming_sendme_case<R: Runtime>(
1950        rt: &R,
1951        n_to_send: usize,
1952    ) -> (
1953        Arc<ClientTunnel>,
1954        DataStream,
1955        CircuitRxSender,
1956        Option<StreamId>,
1957        usize,
1958        Receiver<AnyChanCell>,
1959        Sender<CodecResult>,
1960    ) {
1961        let (chan, mut rx, sink2) = working_fake_channel(rt);
1962        let (tunnel, mut sink) = newtunnel(rt, chan).await;
1963        let circid = tunnel.as_single_circ().unwrap().peek_circid();
1964
1965        let begin_and_send_fut = {
1966            let tunnel = tunnel.clone();
1967            async move {
1968                // Take our circuit and make a stream on it.
1969                let mut stream = tunnel
1970                    .begin_stream("www.example.com", 443, None)
1971                    .await
1972                    .unwrap();
1973                let junk = [0_u8; 1024];
1974                let mut remaining = n_to_send;
1975                while remaining > 0 {
1976                    let n = std::cmp::min(remaining, junk.len());
1977                    stream.write_all(&junk[..n]).await.unwrap();
1978                    remaining -= n;
1979                }
1980                stream.flush().await.unwrap();
1981                stream
1982            }
1983        };
1984
1985        let receive_fut = async move {
1986            // Read the begin cell.
1987            let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1988            let rmsg = match chmsg {
1989                AnyChanMsg::Relay(r) => {
1990                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1991                        .unwrap()
1992                }
1993                other => panic!("{:?}", other),
1994            };
1995            let (streamid, rmsg) = rmsg.into_streamid_and_msg();
1996            assert_matches!(rmsg, AnyRelayMsg::Begin(_));
1997            // Reply with a connected cell...
1998            let connected = relaymsg::Connected::new_empty().into();
1999            sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2000            // Now read bytes from the stream until we have them all.
2001            let mut bytes_received = 0_usize;
2002            let mut cells_received = 0_usize;
2003            while bytes_received < n_to_send {
2004                // Read a data cell, and remember how much we got.
2005                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2006                assert_eq!(id, Some(circid));
2007
2008                let rmsg = match chmsg {
2009                    AnyChanMsg::Relay(r) => {
2010                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2011                            .unwrap()
2012                    }
2013                    other => panic!("{:?}", other),
2014                };
2015                let (streamid2, rmsg) = rmsg.into_streamid_and_msg();
2016                assert_eq!(streamid2, streamid);
2017                if let AnyRelayMsg::Data(dat) = rmsg {
2018                    cells_received += 1;
2019                    bytes_received += dat.as_ref().len();
2020                } else {
2021                    panic!();
2022                }
2023            }
2024
2025            (sink, streamid, cells_received, rx)
2026        };
2027
2028        let (stream, (sink, streamid, cells_received, rx)) =
2029            futures::join!(begin_and_send_fut, receive_fut);
2030
2031        (tunnel, stream, sink, streamid, cells_received, rx, sink2)
2032    }
2033
2034    #[traced_test]
2035    #[test]
2036    fn accept_valid_sendme() {
2037        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2038            let (tunnel, _stream, mut sink, streamid, cells_received, _rx, _sink2) =
2039                setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
2040            let circ = tunnel.as_single_circ().unwrap();
2041
2042            assert_eq!(cells_received, 301);
2043
2044            // Make sure that the circuit is indeed expecting the right sendmes
2045            {
2046                let (tx, rx) = oneshot::channel();
2047                circ.command
2048                    .unbounded_send(CtrlCmd::QuerySendWindow {
2049                        hop: 2.into(),
2050                        leg: tunnel.unique_id(),
2051                        done: tx,
2052                    })
2053                    .unwrap();
2054                let (window, tags) = rx.await.unwrap().unwrap();
2055                assert_eq!(window, 1000 - 301);
2056                assert_eq!(tags.len(), 3);
2057                // 100
2058                assert_eq!(
2059                    tags[0],
2060                    SendmeTag::from(hex!("6400000000000000000000000000000000000000"))
2061                );
2062                // 200
2063                assert_eq!(
2064                    tags[1],
2065                    SendmeTag::from(hex!("c800000000000000000000000000000000000000"))
2066                );
2067                // 300
2068                assert_eq!(
2069                    tags[2],
2070                    SendmeTag::from(hex!("2c01000000000000000000000000000000000000"))
2071                );
2072            }
2073
2074            let reply_with_sendme_fut = async move {
2075                // make and send a circuit-level sendme.
2076                let c_sendme =
2077                    relaymsg::Sendme::new_tag(hex!("6400000000000000000000000000000000000000"))
2078                        .into();
2079                sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();
2080
2081                // Make and send a stream-level sendme.
2082                let s_sendme = relaymsg::Sendme::new_empty().into();
2083                sink.send(rmsg_to_ccmsg(streamid, s_sendme)).await.unwrap();
2084
2085                sink
2086            };
2087
2088            let _sink = reply_with_sendme_fut.await;
2089
2090            rt.advance_until_stalled().await;
2091
2092            // Now make sure that the circuit is still happy, and its
2093            // window is updated.
2094            {
2095                let (tx, rx) = oneshot::channel();
2096                circ.command
2097                    .unbounded_send(CtrlCmd::QuerySendWindow {
2098                        hop: 2.into(),
2099                        leg: tunnel.unique_id(),
2100                        done: tx,
2101                    })
2102                    .unwrap();
2103                let (window, _tags) = rx.await.unwrap().unwrap();
2104                assert_eq!(window, 1000 - 201);
2105            }
2106        });
2107    }
2108
2109    #[traced_test]
2110    #[test]
2111    fn invalid_circ_sendme() {
2112        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2113            // Same setup as accept_valid_sendme() test above but try giving
2114            // a sendme with the wrong tag.
2115
2116            let (tunnel, _stream, mut sink, _streamid, _cells_received, _rx, _sink2) =
2117                setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
2118
2119            let reply_with_sendme_fut = async move {
2120                // make and send a circuit-level sendme with a bad tag.
2121                let c_sendme =
2122                    relaymsg::Sendme::new_tag(hex!("FFFF0000000000000000000000000000000000FF"))
2123                        .into();
2124                sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();
2125                sink
2126            };
2127
2128            let _sink = reply_with_sendme_fut.await;
2129
2130            // Check whether the reactor dies as a result of receiving invalid data.
2131            rt.advance_until_stalled().await;
2132            assert!(tunnel.is_closed());
2133        });
2134    }
2135
2136    #[traced_test]
2137    #[test]
2138    fn test_busy_stream_fairness() {
2139        // Number of streams to use.
2140        const N_STREAMS: usize = 3;
2141        // Number of cells (roughly) for each stream to send.
2142        const N_CELLS: usize = 20;
2143        // Number of bytes that *each* stream will send, and that we'll read
2144        // from the channel.
2145        const N_BYTES: usize = relaymsg::Data::MAXLEN_V0 * N_CELLS;
2146        // Ignoring cell granularity, with perfect fairness we'd expect
2147        // `N_BYTES/N_STREAMS` bytes from each stream.
2148        //
2149        // We currently allow for up to a full cell less than that.  This is
2150        // somewhat arbitrary and can be changed as needed, since we don't
2151        // provide any specific fairness guarantees.
2152        const MIN_EXPECTED_BYTES_PER_STREAM: usize =
2153            N_BYTES / N_STREAMS - relaymsg::Data::MAXLEN_V0;
2154
2155        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2156            let (chan, mut rx, _sink) = working_fake_channel(&rt);
2157            let (tunnel, mut sink) = newtunnel(&rt, chan).await;
2158
2159            // Run clients in a single task, doing our own round-robin
2160            // scheduling of writes to the reactor. Conversely, if we were to
2161            // put each client in its own task, we would be at the the mercy of
2162            // how fairly the runtime schedules the client tasks, which is outside
2163            // the scope of this test.
2164            rt.spawn({
2165                // Clone the circuit to keep it alive after writers have
2166                // finished with it.
2167                let tunnel = tunnel.clone();
2168                async move {
2169                    let mut clients = VecDeque::new();
2170                    struct Client {
2171                        stream: DataStream,
2172                        to_write: &'static [u8],
2173                    }
2174                    for _ in 0..N_STREAMS {
2175                        clients.push_back(Client {
2176                            stream: tunnel
2177                                .begin_stream("www.example.com", 80, None)
2178                                .await
2179                                .unwrap(),
2180                            to_write: &[0_u8; N_BYTES][..],
2181                        });
2182                    }
2183                    while let Some(mut client) = clients.pop_front() {
2184                        if client.to_write.is_empty() {
2185                            // Client is done. Don't put back in queue.
2186                            continue;
2187                        }
2188                        let written = client.stream.write(client.to_write).await.unwrap();
2189                        client.to_write = &client.to_write[written..];
2190                        clients.push_back(client);
2191                    }
2192                }
2193            })
2194            .unwrap();
2195
2196            let channel_handler_fut = async {
2197                let mut stream_bytes_received = HashMap::<StreamId, usize>::new();
2198                let mut total_bytes_received = 0;
2199
2200                loop {
2201                    let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2202                    let rmsg = match msg {
2203                        AnyChanMsg::Relay(r) => AnyRelayMsgOuter::decode_singleton(
2204                            RelayCellFormat::V0,
2205                            r.into_relay_body(),
2206                        )
2207                        .unwrap(),
2208                        other => panic!("Unexpected chanmsg: {other:?}"),
2209                    };
2210                    let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2211                    match rmsg.cmd() {
2212                        RelayCmd::BEGIN => {
2213                            // Add an entry for this stream.
2214                            let prev = stream_bytes_received.insert(streamid.unwrap(), 0);
2215                            assert_eq!(prev, None);
2216                            // Reply with a CONNECTED.
2217                            let connected = relaymsg::Connected::new_with_addr(
2218                                "10.0.0.1".parse().unwrap(),
2219                                1234,
2220                            )
2221                            .into();
2222                            sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2223                        }
2224                        RelayCmd::DATA => {
2225                            let data_msg = relaymsg::Data::try_from(rmsg).unwrap();
2226                            let nbytes = data_msg.as_ref().len();
2227                            total_bytes_received += nbytes;
2228                            let streamid = streamid.unwrap();
2229                            let stream_bytes = stream_bytes_received.get_mut(&streamid).unwrap();
2230                            *stream_bytes += nbytes;
2231                            if total_bytes_received >= N_BYTES {
2232                                break;
2233                            }
2234                        }
2235                        RelayCmd::END => {
2236                            // Stream is done. If fair scheduling is working as
2237                            // expected we *probably* shouldn't get here, but we
2238                            // can ignore it and save the failure until we
2239                            // actually have the final stats.
2240                            continue;
2241                        }
2242                        other => {
2243                            panic!("Unexpected command {other:?}");
2244                        }
2245                    }
2246                }
2247
2248                // Return our stats, along with the `rx` and `sink` to keep the
2249                // reactor alive (since clients could still be writing).
2250                (total_bytes_received, stream_bytes_received, rx, sink)
2251            };
2252
2253            let (total_bytes_received, stream_bytes_received, _rx, _sink) =
2254                channel_handler_fut.await;
2255            assert_eq!(stream_bytes_received.len(), N_STREAMS);
2256            for (sid, stream_bytes) in stream_bytes_received {
2257                assert!(
2258                    stream_bytes >= MIN_EXPECTED_BYTES_PER_STREAM,
2259                    "Only {stream_bytes} of {total_bytes_received} bytes received from {N_STREAMS} came from {sid:?}; expected at least {MIN_EXPECTED_BYTES_PER_STREAM}"
2260                );
2261            }
2262        });
2263    }
2264
2265    #[test]
2266    fn basic_params() {
2267        use super::CircParameters;
2268        let mut p = CircParameters::default();
2269        assert!(p.extend_by_ed25519_id);
2270
2271        p.extend_by_ed25519_id = false;
2272        assert!(!p.extend_by_ed25519_id);
2273    }
2274
2275    #[cfg(feature = "hs-service")]
2276    struct AllowAllStreamsFilter;
2277    #[cfg(feature = "hs-service")]
2278    impl IncomingStreamRequestFilter for AllowAllStreamsFilter {
2279        fn disposition(
2280            &mut self,
2281            _ctx: &crate::client::stream::IncomingStreamRequestContext<'_>,
2282            _circ: &crate::circuit::CircHopSyncView<'_>,
2283        ) -> Result<crate::client::stream::IncomingStreamRequestDisposition> {
2284            Ok(crate::client::stream::IncomingStreamRequestDisposition::Accept)
2285        }
2286    }
2287
2288    #[traced_test]
2289    #[test]
2290    #[cfg(feature = "hs-service")]
2291    fn allow_stream_requests_twice() {
2292        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2293            let (chan, _rx, _sink) = working_fake_channel(&rt);
2294            let (tunnel, _send) = newtunnel(&rt, chan).await;
2295
2296            let _incoming = tunnel
2297                .allow_stream_requests(
2298                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2299                    tunnel.resolve_last_hop().await,
2300                    AllowAllStreamsFilter,
2301                )
2302                .await
2303                .unwrap();
2304
2305            let incoming = tunnel
2306                .allow_stream_requests(
2307                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2308                    tunnel.resolve_last_hop().await,
2309                    AllowAllStreamsFilter,
2310                )
2311                .await;
2312
2313            // There can only be one IncomingStream at a time on any given circuit.
2314            assert!(incoming.is_err());
2315        });
2316    }
2317
2318    #[traced_test]
2319    #[test]
2320    #[cfg(feature = "hs-service")]
2321    fn allow_stream_requests() {
2322        use tor_cell::relaycell::msg::BeginFlags;
2323
2324        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2325            const TEST_DATA: &[u8] = b"ping";
2326
2327            let (chan, _rx, _sink) = working_fake_channel(&rt);
2328            let (tunnel, mut send) = newtunnel(&rt, chan).await;
2329
2330            let rfmt = RelayCellFormat::V0;
2331
2332            // A helper channel for coordinating the "client"/"service" interaction
2333            let (tx, rx) = oneshot::channel();
2334            let mut incoming = tunnel
2335                .allow_stream_requests(
2336                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2337                    tunnel.resolve_last_hop().await,
2338                    AllowAllStreamsFilter,
2339                )
2340                .await
2341                .unwrap();
2342
2343            let simulate_service = async move {
2344                let stream = incoming.next().await.unwrap();
2345                let mut data_stream = stream
2346                    .accept_data(relaymsg::Connected::new_empty())
2347                    .await
2348                    .unwrap();
2349                // Notify the client task we're ready to accept DATA cells
2350                tx.send(()).unwrap();
2351
2352                // Read the data the client sent us
2353                let mut buf = [0_u8; TEST_DATA.len()];
2354                data_stream.read_exact(&mut buf).await.unwrap();
2355                assert_eq!(&buf, TEST_DATA);
2356
2357                tunnel
2358            };
2359
2360            let simulate_client = async move {
2361                let begin = relaymsg::Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2362                let body: BoxedCellBody =
2363                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2364                        .encode(rfmt, &mut testing_rng())
2365                        .unwrap();
2366                let begin_msg = chanmsg::Relay::from(body);
2367
2368                // Pretend to be a client at the other end of the circuit sending a begin cell
2369                send.send(AnyChanMsg::Relay(begin_msg)).await.unwrap();
2370
2371                // Wait until the service is ready to accept data
2372                // TODO: we shouldn't need to wait! This is needed because the service will reject
2373                // any DATA cells that aren't associated with a known stream. We need to wait until
2374                // the service receives our BEGIN cell (and the reactor updates hop.map with the
2375                // new stream).
2376                rx.await.unwrap();
2377                // Now send some data along the newly established circuit..
2378                let data = relaymsg::Data::new(TEST_DATA).unwrap();
2379                let body: BoxedCellBody =
2380                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
2381                        .encode(rfmt, &mut testing_rng())
2382                        .unwrap();
2383                let data_msg = chanmsg::Relay::from(body);
2384
2385                send.send(AnyChanMsg::Relay(data_msg)).await.unwrap();
2386                send
2387            };
2388
2389            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2390        });
2391    }
2392
2393    #[traced_test]
2394    #[test]
2395    #[cfg(feature = "hs-service")]
2396    fn accept_stream_after_reject() {
2397        use tor_cell::relaycell::msg::AnyRelayMsg;
2398        use tor_cell::relaycell::msg::BeginFlags;
2399        use tor_cell::relaycell::msg::EndReason;
2400
2401        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2402            const TEST_DATA: &[u8] = b"ping";
2403            const STREAM_COUNT: usize = 2;
2404            let rfmt = RelayCellFormat::V0;
2405
2406            let (chan, _rx, _sink) = working_fake_channel(&rt);
2407            let (tunnel, mut send) = newtunnel(&rt, chan).await;
2408
2409            // A helper channel for coordinating the "client"/"service" interaction
2410            let (mut tx, mut rx) = mpsc::channel(STREAM_COUNT);
2411
2412            let mut incoming = tunnel
2413                .allow_stream_requests(
2414                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2415                    tunnel.resolve_last_hop().await,
2416                    AllowAllStreamsFilter,
2417                )
2418                .await
2419                .unwrap();
2420
2421            let simulate_service = async move {
2422                // Process 2 incoming streams
2423                for i in 0..STREAM_COUNT {
2424                    let stream = incoming.next().await.unwrap();
2425
2426                    // Reject the first one
2427                    if i == 0 {
2428                        stream
2429                            .reject(relaymsg::End::new_with_reason(EndReason::INTERNAL))
2430                            .await
2431                            .unwrap();
2432                        // Notify the client
2433                        tx.send(()).await.unwrap();
2434                        continue;
2435                    }
2436
2437                    let mut data_stream = stream
2438                        .accept_data(relaymsg::Connected::new_empty())
2439                        .await
2440                        .unwrap();
2441                    // Notify the client task we're ready to accept DATA cells
2442                    tx.send(()).await.unwrap();
2443
2444                    // Read the data the client sent us
2445                    let mut buf = [0_u8; TEST_DATA.len()];
2446                    data_stream.read_exact(&mut buf).await.unwrap();
2447                    assert_eq!(&buf, TEST_DATA);
2448                }
2449
2450                tunnel
2451            };
2452
2453            let simulate_client = async move {
2454                let begin = relaymsg::Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2455                let body: BoxedCellBody =
2456                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2457                        .encode(rfmt, &mut testing_rng())
2458                        .unwrap();
2459                let begin_msg = chanmsg::Relay::from(body);
2460
2461                // Pretend to be a client at the other end of the circuit sending 2 identical begin
2462                // cells (the first one will be rejected by the test service).
2463                for _ in 0..STREAM_COUNT {
2464                    send.send(AnyChanMsg::Relay(begin_msg.clone()))
2465                        .await
2466                        .unwrap();
2467
2468                    // Wait until the service rejects our request
2469                    rx.next().await.unwrap();
2470                }
2471
2472                // Now send some data along the newly established circuit..
2473                let data = relaymsg::Data::new(TEST_DATA).unwrap();
2474                let body: BoxedCellBody =
2475                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
2476                        .encode(rfmt, &mut testing_rng())
2477                        .unwrap();
2478                let data_msg = chanmsg::Relay::from(body);
2479
2480                send.send(AnyChanMsg::Relay(data_msg)).await.unwrap();
2481                send
2482            };
2483
2484            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2485        });
2486    }
2487
2488    #[traced_test]
2489    #[test]
2490    #[cfg(feature = "hs-service")]
2491    fn incoming_stream_bad_hop() {
2492        use tor_cell::relaycell::msg::BeginFlags;
2493
2494        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2495            /// Expect the originator of the BEGIN cell to be hop 1.
2496            const EXPECTED_HOP: u8 = 1;
2497            let rfmt = RelayCellFormat::V0;
2498
2499            let (chan, _rx, _sink) = working_fake_channel(&rt);
2500            let (tunnel, mut send) = newtunnel(&rt, chan).await;
2501
2502            // Expect to receive incoming streams from hop EXPECTED_HOP
2503            let mut incoming = tunnel
2504                .allow_stream_requests(
2505                    &[tor_cell::relaycell::RelayCmd::BEGIN],
2506                    // Build the precise HopLocation with the underlying circuit.
2507                    (
2508                        tunnel.as_single_circ().unwrap().unique_id(),
2509                        EXPECTED_HOP.into(),
2510                    )
2511                        .into(),
2512                    AllowAllStreamsFilter,
2513                )
2514                .await
2515                .unwrap();
2516
2517            let simulate_service = async move {
2518                // The originator of the cell is actually the last hop on the circuit, not hop 1,
2519                // so we expect the reactor to shut down.
2520                assert!(incoming.next().await.is_none());
2521                tunnel
2522            };
2523
2524            let simulate_client = async move {
2525                let begin = relaymsg::Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2526                let body: BoxedCellBody =
2527                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2528                        .encode(rfmt, &mut testing_rng())
2529                        .unwrap();
2530                let begin_msg = chanmsg::Relay::from(body);
2531
2532                // Pretend to be a client at the other end of the circuit sending a begin cell
2533                send.send(AnyChanMsg::Relay(begin_msg)).await.unwrap();
2534
2535                send
2536            };
2537
2538            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2539        });
2540    }
2541
2542    #[traced_test]
2543    #[test]
2544    #[cfg(feature = "conflux")]
2545    fn multipath_circ_validation() {
2546        use std::error::Error as _;
2547
2548        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2549            let params = CircParameters::default();
2550            let invalid_tunnels = [
2551                setup_bad_conflux_tunnel(&rt).await,
2552                setup_conflux_tunnel(&rt, true, params).await,
2553            ];
2554
2555            for tunnel in invalid_tunnels {
2556                let TestTunnelCtx {
2557                    tunnel: _tunnel,
2558                    circs: _circs,
2559                    conflux_link_rx,
2560                } = tunnel;
2561
2562                let conflux_hs_err = conflux_link_rx.await.unwrap().unwrap_err();
2563                let err_src = conflux_hs_err.source().unwrap();
2564
2565                // The two circuits don't end in the same hop (no join point),
2566                // so the reactor will refuse to link them
2567                assert!(
2568                    err_src
2569                        .to_string()
2570                        .contains("one more more conflux circuits are invalid")
2571                );
2572            }
2573        });
2574    }
2575
2576    // TODO: this structure could be reused for the other tests,
2577    // to address nickm's comment:
2578    // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3005#note_3202362
2579    #[derive(Debug)]
2580    #[allow(unused)]
2581    #[cfg(feature = "conflux")]
2582    struct TestCircuitCtx {
2583        chan_rx: Receiver<AnyChanCell>,
2584        chan_tx: Sender<std::result::Result<AnyChanCell, Error>>,
2585        circ_tx: CircuitRxSender,
2586        unique_id: UniqId,
2587    }
2588
2589    #[derive(Debug)]
2590    #[cfg(feature = "conflux")]
2591    struct TestTunnelCtx {
2592        tunnel: Arc<ClientTunnel>,
2593        circs: Vec<TestCircuitCtx>,
2594        conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
2595    }
2596
2597    /// Wait for a LINK cell to arrive on the specified channel and return its payload.
2598    #[cfg(feature = "conflux")]
2599    async fn await_link_payload(rx: &mut Receiver<AnyChanCell>) -> ConfluxLink {
2600        // Wait for the LINK cell...
2601        let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2602        let rmsg = match chmsg {
2603            AnyChanMsg::Relay(r) => {
2604                AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2605                    .unwrap()
2606            }
2607            other => panic!("{:?}", other),
2608        };
2609        let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2610
2611        let link = match rmsg {
2612            AnyRelayMsg::ConfluxLink(link) => link,
2613            _ => panic!("unexpected relay message {rmsg:?}"),
2614        };
2615
2616        assert!(streamid.is_none());
2617
2618        link
2619    }
2620
2621    #[cfg(feature = "conflux")]
2622    async fn setup_conflux_tunnel(
2623        rt: &MockRuntime,
2624        same_hops: bool,
2625        params: CircParameters,
2626    ) -> TestTunnelCtx {
2627        let hops1 = hop_details(3, 0);
2628        let hops2 = if same_hops {
2629            hops1.clone()
2630        } else {
2631            hop_details(3, 10)
2632        };
2633
2634        let (chan1, rx1, chan_sink1) = working_fake_channel(rt);
2635        let (mut tunnel1, sink1) = newtunnel_ext(
2636            rt,
2637            UniqId::new(1, 3),
2638            chan1,
2639            hops1,
2640            2.into(),
2641            params.clone(),
2642        )
2643        .await;
2644
2645        let (chan2, rx2, chan_sink2) = working_fake_channel(rt);
2646
2647        let (tunnel2, sink2) =
2648            newtunnel_ext(rt, UniqId::new(2, 4), chan2, hops2, 2.into(), params).await;
2649
2650        let (answer_tx, answer_rx) = oneshot::channel();
2651        tunnel2
2652            .as_single_circ()
2653            .unwrap()
2654            .command
2655            .unbounded_send(CtrlCmd::ShutdownAndReturnCircuit { answer: answer_tx })
2656            .unwrap();
2657
2658        let circuit = answer_rx.await.unwrap().unwrap();
2659        // The circuit should be shutting down its reactor
2660        rt.advance_until_stalled().await;
2661        assert!(tunnel2.is_closed());
2662
2663        let (conflux_link_tx, conflux_link_rx) = oneshot::channel();
2664        // Tell the first circuit to link with the second and form a multipath tunnel
2665        tunnel1
2666            .as_single_circ()
2667            .unwrap()
2668            .control
2669            .unbounded_send(CtrlMsg::LinkCircuits {
2670                circuits: vec![circuit],
2671                answer: conflux_link_tx,
2672            })
2673            .unwrap();
2674
2675        let circ_ctx1 = TestCircuitCtx {
2676            chan_rx: rx1,
2677            chan_tx: chan_sink1,
2678            circ_tx: sink1,
2679            unique_id: tunnel1.unique_id(),
2680        };
2681
2682        let circ_ctx2 = TestCircuitCtx {
2683            chan_rx: rx2,
2684            chan_tx: chan_sink2,
2685            circ_tx: sink2,
2686            unique_id: tunnel2.unique_id(),
2687        };
2688
2689        // TODO(conflux): nothing currently sets this,
2690        // so we need to manually set it.
2691        //
2692        // Instead of doing this, we should have a ClientCirc
2693        // API that sends CtrlMsg::Link circuits and sets this to true
2694        tunnel1.circ.is_multi_path = true;
2695        TestTunnelCtx {
2696            tunnel: Arc::new(tunnel1),
2697            circs: vec![circ_ctx1, circ_ctx2],
2698            conflux_link_rx,
2699        }
2700    }
2701
2702    #[cfg(feature = "conflux")]
2703    async fn setup_good_conflux_tunnel(
2704        rt: &MockRuntime,
2705        cc_params: CongestionControlParams,
2706    ) -> TestTunnelCtx {
2707        // Our 2 test circuits are identical, so they both have the same guards,
2708        // which technically violates the conflux set rule mentioned in prop354.
2709        // For testing purposes this is fine, but in production we'll need to ensure
2710        // the calling code prevents guard reuse (except in the case where
2711        // one of the guards happens to be Guard + Exit)
2712        let same_hops = true;
2713        let flow_ctrl_params = FlowCtrlParameters::defaults_for_tests();
2714        let params = CircParameters::new(true, cc_params, flow_ctrl_params);
2715        setup_conflux_tunnel(rt, same_hops, params).await
2716    }
2717
2718    #[cfg(feature = "conflux")]
2719    async fn setup_bad_conflux_tunnel(rt: &MockRuntime) -> TestTunnelCtx {
2720        // The two circuits don't share any hops,
2721        // so they won't end in the same hop (no join point),
2722        // causing the reactor to refuse to link them.
2723        let same_hops = false;
2724        let flow_ctrl_params = FlowCtrlParameters::defaults_for_tests();
2725        let params = CircParameters::new(true, build_cc_vegas_params(), flow_ctrl_params);
2726        setup_conflux_tunnel(rt, same_hops, params).await
2727    }
2728
2729    #[traced_test]
2730    #[test]
2731    #[cfg(feature = "conflux")]
2732    fn reject_conflux_linked_before_hs() {
2733        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2734            let (chan, mut _rx, _sink) = working_fake_channel(&rt);
2735            let (tunnel, mut sink) = newtunnel(&rt, chan).await;
2736
2737            let nonce = V1Nonce::new(&mut testing_rng());
2738            let payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
2739            // Send a LINKED cell
2740            let linked = relaymsg::ConfluxLinked::new(payload).into();
2741            sink.send(rmsg_to_ccmsg(None, linked)).await.unwrap();
2742
2743            rt.advance_until_stalled().await;
2744            assert!(tunnel.is_closed());
2745        });
2746    }
2747
2748    #[traced_test]
2749    #[test]
2750    #[cfg(feature = "conflux")]
2751    fn conflux_hs_timeout() {
2752        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2753            let TestTunnelCtx {
2754                tunnel: _tunnel,
2755                circs,
2756                conflux_link_rx,
2757            } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
2758
2759            let [mut circ1, _circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
2760
2761            // Wait for the LINK cell
2762            let link = await_link_payload(&mut circ1.chan_rx).await;
2763
2764            // Send a LINK cell on the first leg...
2765            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
2766            circ1
2767                .circ_tx
2768                .send(rmsg_to_ccmsg(None, linked))
2769                .await
2770                .unwrap();
2771
2772            // Do nothing, and wait for the handshake to timeout on the second leg
2773            rt.advance_by(Duration::from_secs(60)).await;
2774
2775            let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
2776
2777            // Get the handshake results of each circuit
2778            let [res1, res2]: [StdResult<(), ConfluxHandshakeError>; 2] =
2779                conflux_hs_res.try_into().unwrap();
2780
2781            assert!(res1.is_ok());
2782
2783            let err = res2.unwrap_err();
2784            assert_matches!(err, ConfluxHandshakeError::Timeout);
2785        });
2786    }
2787
2788    #[traced_test]
2789    #[test]
2790    #[cfg(feature = "conflux")]
2791    fn conflux_bad_hs() {
2792        use crate::util::err::ConfluxHandshakeError;
2793
2794        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2795            let nonce = V1Nonce::new(&mut testing_rng());
2796            let bad_link_payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
2797            //let extended2 = relaymsg::Extended2::new(vec![]).into();
2798            let bad_hs_responses = [
2799                (
2800                    rmsg_to_ccmsg(
2801                        None,
2802                        relaymsg::ConfluxLinked::new(bad_link_payload.clone()).into(),
2803                    ),
2804                    "Received CONFLUX_LINKED cell with mismatched nonce",
2805                ),
2806                (
2807                    rmsg_to_ccmsg(None, relaymsg::ConfluxLink::new(bad_link_payload).into()),
2808                    "Unexpected CONFLUX_LINK cell from hop #3 on client circuit",
2809                ),
2810                (
2811                    rmsg_to_ccmsg(None, relaymsg::ConfluxSwitch::new(0).into()),
2812                    "Received CONFLUX_SWITCH on unlinked circuit?!",
2813                ),
2814                // TODO: this currently causes the reactor to shut down immediately,
2815                // without sending a response on the handshake channel
2816                /*
2817                (
2818                    rmsg_to_ccmsg(None, extended2),
2819                    "Received CONFLUX_LINKED cell with mismatched nonce",
2820                ),
2821                */
2822            ];
2823
2824            for (bad_cell, expected_err) in bad_hs_responses {
2825                let TestTunnelCtx {
2826                    tunnel,
2827                    circs,
2828                    conflux_link_rx,
2829                } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
2830
2831                let [mut _circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
2832
2833                // Respond with a bogus cell on one of the legs
2834                circ2.circ_tx.send(bad_cell).await.unwrap();
2835
2836                let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
2837                // Get the handshake results (the handshake results are reported early,
2838                // without waiting for the second circuit leg's handshake to timeout,
2839                // because this is a protocol violation causing the entire tunnel to shut down)
2840                let [res2]: [StdResult<(), ConfluxHandshakeError>; 1] =
2841                    conflux_hs_res.try_into().unwrap();
2842
2843                match res2.unwrap_err() {
2844                    ConfluxHandshakeError::Link(Error::CircProto(e)) => {
2845                        assert_eq!(e, expected_err);
2846                    }
2847                    e => panic!("unexpected error: {e:?}"),
2848                }
2849
2850                assert!(tunnel.is_closed());
2851            }
2852        });
2853    }
2854
2855    #[traced_test]
2856    #[test]
2857    #[cfg(feature = "conflux")]
2858    fn unexpected_conflux_cell() {
2859        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2860            let nonce = V1Nonce::new(&mut testing_rng());
2861            let link_payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
2862            let bad_cells = [
2863                rmsg_to_ccmsg(
2864                    None,
2865                    relaymsg::ConfluxLinked::new(link_payload.clone()).into(),
2866                ),
2867                rmsg_to_ccmsg(
2868                    None,
2869                    relaymsg::ConfluxLink::new(link_payload.clone()).into(),
2870                ),
2871                rmsg_to_ccmsg(None, relaymsg::ConfluxSwitch::new(0).into()),
2872            ];
2873
2874            for bad_cell in bad_cells {
2875                let (chan, mut _rx, _sink) = working_fake_channel(&rt);
2876                let (tunnel, mut sink) = newtunnel(&rt, chan).await;
2877
2878                sink.send(bad_cell).await.unwrap();
2879                rt.advance_until_stalled().await;
2880
2881                // Note: unfortunately we can't assert the circuit is
2882                // closing for the reason, because the reactor just logs
2883                // the error and then exits.
2884                assert!(tunnel.is_closed());
2885            }
2886        });
2887    }
2888
2889    #[traced_test]
2890    #[test]
2891    #[cfg(feature = "conflux")]
2892    fn conflux_bad_linked() {
2893        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2894            let TestTunnelCtx {
2895                tunnel,
2896                circs,
2897                conflux_link_rx: _,
2898            } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
2899
2900            let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
2901
2902            let link = await_link_payload(&mut circ1.chan_rx).await;
2903
2904            // Send a LINK cell on the first leg...
2905            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
2906            circ1
2907                .circ_tx
2908                .send(rmsg_to_ccmsg(None, linked))
2909                .await
2910                .unwrap();
2911
2912            // ...and two LINKED cells on the second
2913            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
2914            circ2
2915                .circ_tx
2916                .send(rmsg_to_ccmsg(None, linked))
2917                .await
2918                .unwrap();
2919            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
2920            circ2
2921                .circ_tx
2922                .send(rmsg_to_ccmsg(None, linked))
2923                .await
2924                .unwrap();
2925
2926            rt.advance_until_stalled().await;
2927
2928            // Receiving a LINKED cell on an already linked leg causes
2929            // the tunnel to be torn down
2930            assert!(tunnel.is_closed());
2931        });
2932    }
2933
2934    #[traced_test]
2935    #[test]
2936    #[cfg(feature = "conflux")]
2937    fn conflux_bad_switch() {
2938        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2939            let cc_vegas_params = build_cc_vegas_params();
2940            let cwnd_init = cc_vegas_params.cwnd_params().cwnd_init();
2941            let bad_switch = [
2942                // SWITCH cells with seqno = 0 are not allowed
2943                relaymsg::ConfluxSwitch::new(0),
2944                // SWITCH cells with seqno > cc_init_cwnd are not allowed
2945                // on tunnels that have not received any data
2946                relaymsg::ConfluxSwitch::new(cwnd_init + 1),
2947            ];
2948
2949            for bad_cell in bad_switch {
2950                let TestTunnelCtx {
2951                    tunnel,
2952                    circs,
2953                    conflux_link_rx,
2954                } = setup_good_conflux_tunnel(&rt, cc_vegas_params.clone()).await;
2955
2956                let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
2957
2958                let link = await_link_payload(&mut circ1.chan_rx).await;
2959
2960                // Send a LINKED cell on both legs
2961                for circ in [&mut circ1, &mut circ2] {
2962                    let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
2963                    circ.circ_tx
2964                        .send(rmsg_to_ccmsg(None, linked))
2965                        .await
2966                        .unwrap();
2967                }
2968
2969                let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
2970                assert!(conflux_hs_res.iter().all(|res| res.is_ok()));
2971
2972                // Now send a bad SWITCH cell on the first leg.
2973                // This will cause the tunnel reactor to shut down.
2974                let msg = rmsg_to_ccmsg(None, bad_cell.clone().into());
2975                circ1.circ_tx.send(msg).await.unwrap();
2976
2977                // The tunnel should be shutting down
2978                rt.advance_until_stalled().await;
2979                assert!(tunnel.is_closed());
2980            }
2981        });
2982    }
2983
2984    #[traced_test]
2985    #[test]
2986    #[cfg(feature = "conflux")]
2987    fn conflux_consecutive_switch() {
2988        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2989            let TestTunnelCtx {
2990                tunnel,
2991                circs,
2992                conflux_link_rx,
2993            } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
2994
2995            let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
2996
2997            let link = await_link_payload(&mut circ1.chan_rx).await;
2998
2999            // Send a LINKED cell on both legs
3000            for circ in [&mut circ1, &mut circ2] {
3001                let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3002                circ.circ_tx
3003                    .send(rmsg_to_ccmsg(None, linked))
3004                    .await
3005                    .unwrap();
3006            }
3007
3008            let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
3009            assert!(conflux_hs_res.iter().all(|res| res.is_ok()));
3010
3011            // Send a valid SWITCH cell on the first leg.
3012            let switch1 = relaymsg::ConfluxSwitch::new(10);
3013            let msg = rmsg_to_ccmsg(None, switch1.into());
3014            circ1.circ_tx.send(msg).await.unwrap();
3015
3016            // The tunnel should not be shutting down
3017            rt.advance_until_stalled().await;
3018            assert!(!tunnel.is_closed());
3019
3020            // Send another valid SWITCH cell on the same leg.
3021            let switch2 = relaymsg::ConfluxSwitch::new(12);
3022            let msg = rmsg_to_ccmsg(None, switch2.into());
3023            circ1.circ_tx.send(msg).await.unwrap();
3024
3025            // The tunnel should now be shutting down
3026            // (consecutive switches are not allowed)
3027            rt.advance_until_stalled().await;
3028            assert!(tunnel.is_closed());
3029        });
3030    }
3031
3032    // This test ensures CtrlMsg::ShutdownAndReturnCircuit returns an
3033    // error when called on a multi-path tunnel
3034    #[traced_test]
3035    #[test]
3036    #[cfg(feature = "conflux")]
3037    fn shutdown_and_return_circ_multipath() {
3038        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3039            let TestTunnelCtx {
3040                tunnel,
3041                circs,
3042                conflux_link_rx: _,
3043            } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
3044
3045            rt.progress_until_stalled().await;
3046
3047            let (answer_tx, answer_rx) = oneshot::channel();
3048            tunnel
3049                .circ
3050                .command
3051                .unbounded_send(CtrlCmd::ShutdownAndReturnCircuit { answer: answer_tx })
3052                .unwrap();
3053
3054            // map explicitly returns () for clarity
3055            #[allow(clippy::unused_unit, clippy::semicolon_if_nothing_returned)]
3056            let err = answer_rx
3057                .await
3058                .unwrap()
3059                .map(|_| {
3060                    // Map to () so we can call unwrap
3061                    // (Circuit doesn't impl debug)
3062                    ()
3063                })
3064                .unwrap_err();
3065
3066            const MSG: &str = "not a single leg conflux set (got at least 2 elements when exactly one was expected)";
3067            assert!(err.to_string().contains(MSG), "{err}");
3068
3069            // The tunnel reactor should be shutting down,
3070            // regardless of the error
3071            rt.progress_until_stalled().await;
3072            assert!(tunnel.is_closed());
3073
3074            // Keep circs alive, to prevent the reactor
3075            // from shutting down prematurely
3076            drop(circs);
3077        });
3078    }
3079
3080    /// Run a conflux test endpoint.
3081    #[cfg(feature = "conflux")]
3082    #[derive(Debug)]
3083    enum ConfluxTestEndpoint<I: Iterator<Item = Option<Duration>>> {
3084        /// Pretend to be an exit relay.
3085        Relay(ConfluxExitState<I>),
3086        /// Client task.
3087        Client {
3088            /// Channel for receiving the outcome of the conflux handshakes.
3089            conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
3090            /// The tunnel reactor handle
3091            tunnel: Arc<ClientTunnel>,
3092            /// Data to send on a stream.
3093            send_data: Vec<u8>,
3094            /// Data we expect to receive on a stream.
3095            recv_data: Vec<u8>,
3096        },
3097    }
3098
3099    /// Structure for returning the sinks, channels, etc. that must stay
3100    /// alive until the test is complete.
3101    #[allow(unused, clippy::large_enum_variant)]
3102    #[derive(Debug)]
3103    #[cfg(feature = "conflux")]
3104    enum ConfluxEndpointResult {
3105        Circuit {
3106            tunnel: Arc<ClientTunnel>,
3107            stream: DataStream,
3108        },
3109        Relay {
3110            circ: TestCircuitCtx,
3111        },
3112    }
3113
3114    /// Stream data, shared by all the mock exit endpoints.
3115    #[derive(Debug)]
3116    #[cfg(feature = "conflux")]
3117    struct ConfluxStreamState {
3118        /// The data received so far on this stream (at the exit).
3119        data_recvd: Vec<u8>,
3120        /// The total amount of data we expect to receive on this stream.
3121        expected_data_len: usize,
3122        /// Whether we have seen a BEGIN cell yet.
3123        begin_recvd: bool,
3124        /// Whether we have seen an END cell yet.
3125        end_recvd: bool,
3126        /// Whether we have sent an END cell yet.
3127        end_sent: bool,
3128    }
3129
3130    #[cfg(feature = "conflux")]
3131    impl ConfluxStreamState {
3132        fn new(expected_data_len: usize) -> Self {
3133            Self {
3134                data_recvd: vec![],
3135                expected_data_len,
3136                begin_recvd: false,
3137                end_recvd: false,
3138                end_sent: false,
3139            }
3140        }
3141    }
3142
3143    /// An object describing a SWITCH cell that we expect to receive
3144    /// in the mock exit
3145    #[derive(Debug)]
3146    #[cfg(feature = "conflux")]
3147    struct ExpectedSwitch {
3148        /// The number of cells we've seen on this leg so far,
3149        /// up to and including the SWITCH.
3150        cells_so_far: usize,
3151        /// The expected seqno in SWITCH cell,
3152        seqno: u32,
3153    }
3154
3155    /// Object dispatching cells for delivery on the appropriate
3156    /// leg in a multipath tunnel.
3157    ///
3158    /// Used to send out-of-order cells from the mock exit
3159    /// to the client under test.
3160    #[cfg(feature = "conflux")]
3161    struct CellDispatcher {
3162        /// Channels on which to send the [`CellToSend`] commands on.
3163        leg_tx: HashMap<UniqId, mpsc::Sender<CellToSend>>,
3164        /// The list of cells to send,
3165        cells_to_send: Vec<(UniqId, AnyRelayMsg)>,
3166    }
3167
3168    #[cfg(feature = "conflux")]
3169    impl CellDispatcher {
3170        async fn run(mut self) {
3171            while !self.cells_to_send.is_empty() {
3172                let (circ_id, cell) = self.cells_to_send.remove(0);
3173                let cell_tx = self.leg_tx.get_mut(&circ_id).unwrap();
3174                let (done_tx, done_rx) = oneshot::channel();
3175                cell_tx.send(CellToSend { done_tx, cell }).await.unwrap();
3176                // Wait for the cell to be sent before sending the next one.
3177                let () = done_rx.await.unwrap();
3178            }
3179        }
3180    }
3181
3182    /// A cell for the mock exit to send on one of its legs.
3183    #[cfg(feature = "conflux")]
3184    #[derive(Debug)]
3185    struct CellToSend {
3186        /// Channel for notifying the control task that the cell was sent.
3187        done_tx: oneshot::Sender<()>,
3188        /// The cell to send.
3189        cell: AnyRelayMsg,
3190    }
3191
3192    /// The state of a mock exit.
3193    #[derive(Debug)]
3194    #[cfg(feature = "conflux")]
3195    struct ConfluxExitState<I: Iterator<Item = Option<Duration>>> {
3196        /// The runtime, shared by the test client and mock exit tasks.
3197        ///
3198        /// The mutex prevents the client and mock exit tasks from calling
3199        /// functions like [`MockRuntime::advance_until_stalled`]
3200        /// or [`MockRuntime::progress_until_stalled]` concurrently,
3201        /// as this is not supported by the mock runtime.
3202        runtime: Arc<AsyncMutex<MockRuntime>>,
3203        /// The client view of the tunnel.
3204        tunnel: Arc<ClientTunnel>,
3205        /// The circuit test context.
3206        circ: TestCircuitCtx,
3207        /// The RTT delay to introduce just before each SENDME.
3208        ///
3209        /// Used to trigger the client to send a SWITCH.
3210        rtt_delays: I,
3211        /// State of the (only) expected stream on this tunnel,
3212        /// shared by all the mock exit endpoints.
3213        stream_state: Arc<Mutex<ConfluxStreamState>>,
3214        /// The number of cells after which to expect a SWITCH
3215        /// cell from the client.
3216        expect_switch: Vec<ExpectedSwitch>,
3217        /// Channel for receiving notifications from the other leg.
3218        event_rx: mpsc::Receiver<MockExitEvent>,
3219        /// Channel for sending notifications to the other leg.
3220        event_tx: mpsc::Sender<MockExitEvent>,
3221        /// Whether this circuit leg should act as the primary (sending) leg.
3222        is_sending_leg: bool,
3223        /// A channel for receiving cells to send on this stream.
3224        cells_rx: mpsc::Receiver<CellToSend>,
3225    }
3226
3227    #[cfg(feature = "conflux")]
3228    async fn good_exit_handshake(
3229        runtime: &Arc<AsyncMutex<MockRuntime>>,
3230        init_rtt_delay: Option<Duration>,
3231        rx: &mut Receiver<ChanCell<AnyChanMsg>>,
3232        sink: &mut CircuitRxSender,
3233    ) {
3234        // Wait for the LINK cell
3235        let link = await_link_payload(rx).await;
3236
3237        // Introduce an artificial delay, to make one circ have a better initial RTT
3238        // than the other
3239        if let Some(init_rtt_delay) = init_rtt_delay {
3240            runtime.lock().await.advance_by(init_rtt_delay).await;
3241        }
3242
3243        // Reply with a LINKED cell...
3244        let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3245        sink.send(rmsg_to_ccmsg(None, linked)).await.unwrap();
3246
3247        // Wait for the client to respond with LINKED_ACK...
3248        let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
3249        let rmsg = match chmsg {
3250            AnyChanMsg::Relay(r) => {
3251                AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
3252                    .unwrap()
3253            }
3254            other => panic!("{other:?}"),
3255        };
3256        let (_streamid, rmsg) = rmsg.into_streamid_and_msg();
3257
3258        assert_matches!(rmsg, AnyRelayMsg::ConfluxLinkedAck(_));
3259    }
3260
3261    /// An event sent by one mock conflux leg to another.
3262    #[derive(Copy, Clone, Debug)]
3263    enum MockExitEvent {
3264        /// Inform the other leg we are done.
3265        Done,
3266        /// Inform the other leg a stream was opened.
3267        BeginRecvd(StreamId),
3268    }
3269
3270    #[cfg(feature = "conflux")]
3271    async fn run_mock_conflux_exit<I: Iterator<Item = Option<Duration>>>(
3272        state: ConfluxExitState<I>,
3273    ) -> ConfluxEndpointResult {
3274        let ConfluxExitState {
3275            runtime,
3276            tunnel,
3277            mut circ,
3278            rtt_delays,
3279            stream_state,
3280            mut expect_switch,
3281            mut event_tx,
3282            mut event_rx,
3283            is_sending_leg,
3284            mut cells_rx,
3285        } = state;
3286
3287        let mut rtt_delays = rtt_delays.into_iter();
3288
3289        // Expect the client to open a stream, and de-multiplex the received stream data
3290        let stream_len = stream_state.lock().unwrap().expected_data_len;
3291        let mut data_cells_received = 0_usize;
3292        let mut cell_count = 0_usize;
3293        let mut tags = vec![];
3294        let mut streamid = None;
3295        let mut done_writing = false;
3296
3297        loop {
3298            let should_exit = {
3299                let stream_state = stream_state.lock().unwrap();
3300                let done_reading = stream_state.data_recvd.len() >= stream_len;
3301
3302                (stream_state.begin_recvd || stream_state.end_recvd) && done_reading && done_writing
3303            };
3304
3305            if should_exit {
3306                break;
3307            }
3308
3309            use futures::select;
3310
3311            // Only start reading from the dispatcher channel after the stream is open
3312            // and we're ready to start sending cells.
3313            let mut next_cell = if streamid.is_some() && !done_writing {
3314                Box::pin(cells_rx.next().fuse())
3315                    as Pin<Box<dyn FusedFuture<Output = Option<CellToSend>> + Send>>
3316            } else {
3317                Box::pin(std::future::pending().fuse())
3318            };
3319
3320            // Wait for the BEGIN cell to arrive, or for the transfer to complete
3321            // (we need to bail if the other leg already completed);
3322            let res = select! {
3323                res = circ.chan_rx.next() => {
3324                    res.unwrap()
3325                },
3326                res = event_rx.next() => {
3327                    let Some(event) = res else {
3328                        break;
3329                    };
3330
3331                    match event {
3332                        MockExitEvent::Done => {
3333                            break;
3334                        },
3335                        MockExitEvent::BeginRecvd(id) => {
3336                            // The stream is now open (the other leg received the BEGIN),
3337                            // so we're reading to start reading cells from the cell dispatcher.
3338                            streamid = Some(id);
3339                            continue;
3340                        },
3341                    }
3342                }
3343                res = next_cell => {
3344                    if let Some(cell_to_send) = res {
3345                        let CellToSend { cell, done_tx } = cell_to_send;
3346
3347                        // SWITCH cells don't have a stream ID
3348                        let streamid = if matches!(cell, AnyRelayMsg::ConfluxSwitch(_)) {
3349                            None
3350                        } else {
3351                            streamid
3352                        };
3353
3354                        circ.circ_tx
3355                            .send(rmsg_to_ccmsg(streamid, cell))
3356                            .await
3357                            .unwrap();
3358
3359                        runtime.lock().await.advance_until_stalled().await;
3360                        done_tx.send(()).unwrap();
3361                    } else {
3362                        done_writing = true;
3363                    }
3364
3365                    continue;
3366                }
3367            };
3368
3369            let (_id, chmsg) = res.into_circid_and_msg();
3370            cell_count += 1;
3371            let rmsg = match chmsg {
3372                AnyChanMsg::Relay(r) => {
3373                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
3374                        .unwrap()
3375                }
3376                other => panic!("{:?}", other),
3377            };
3378            let (new_streamid, rmsg) = rmsg.into_streamid_and_msg();
3379            if streamid.is_none() {
3380                streamid = new_streamid;
3381            }
3382
3383            let begin_recvd = stream_state.lock().unwrap().begin_recvd;
3384            let end_recvd = stream_state.lock().unwrap().end_recvd;
3385            match rmsg {
3386                AnyRelayMsg::Begin(_) if begin_recvd => {
3387                    panic!("client tried to open two streams?!");
3388                }
3389                AnyRelayMsg::Begin(_) if !begin_recvd => {
3390                    stream_state.lock().unwrap().begin_recvd = true;
3391                    // Reply with a connected cell...
3392                    let connected = relaymsg::Connected::new_empty().into();
3393                    circ.circ_tx
3394                        .send(rmsg_to_ccmsg(streamid, connected))
3395                        .await
3396                        .unwrap();
3397                    // Tell the other leg we received a BEGIN cell
3398                    event_tx
3399                        .send(MockExitEvent::BeginRecvd(streamid.unwrap()))
3400                        .await
3401                        .unwrap();
3402                }
3403                AnyRelayMsg::End(_) if !end_recvd => {
3404                    stream_state.lock().unwrap().end_recvd = true;
3405                    break;
3406                }
3407                AnyRelayMsg::End(_) if end_recvd => {
3408                    panic!("received two END cells for the same stream?!");
3409                }
3410                AnyRelayMsg::ConfluxSwitch(cell) => {
3411                    // Ensure we got the SWITCH after the expected number of cells
3412                    let expected = expect_switch.remove(0);
3413
3414                    assert_eq!(expected.cells_so_far, cell_count);
3415                    assert_eq!(expected.seqno, cell.seqno());
3416
3417                    // To keep the tests simple, we don't handle out of order cells,
3418                    // and simply sort the received data at the end.
3419                    // This ensures all the data was actually received,
3420                    // but it doesn't actually test that the SWITCH cells
3421                    // contain the appropriate seqnos.
3422                    continue;
3423                }
3424                AnyRelayMsg::Data(dat) => {
3425                    data_cells_received += 1;
3426                    stream_state
3427                        .lock()
3428                        .unwrap()
3429                        .data_recvd
3430                        .extend_from_slice(dat.as_ref());
3431
3432                    let is_next_cell_sendme = data_cells_received.is_multiple_of(31);
3433                    if is_next_cell_sendme {
3434                        if tags.is_empty() {
3435                            // Important: we need to make sure all the SENDMEs
3436                            // we sent so far have been processed by the reactor
3437                            // (otherwise the next QuerySendWindow call
3438                            // might return an outdated list of tags!)
3439                            runtime.lock().await.advance_until_stalled().await;
3440                            let (tx, rx) = oneshot::channel();
3441                            tunnel
3442                                .circ
3443                                .command
3444                                .unbounded_send(CtrlCmd::QuerySendWindow {
3445                                    hop: 2.into(),
3446                                    leg: circ.unique_id,
3447                                    done: tx,
3448                                })
3449                                .unwrap();
3450
3451                            // Get a fresh batch of tags.
3452                            let (_window, new_tags) = rx.await.unwrap().unwrap();
3453                            tags = new_tags;
3454                        }
3455
3456                        let tag = tags.remove(0);
3457
3458                        // Introduce an artificial delay, to make one circ have worse RTT
3459                        // than the other, and thus trigger a SWITCH
3460                        if let Some(rtt_delay) = rtt_delays.next().flatten() {
3461                            runtime.lock().await.advance_by(rtt_delay).await;
3462                        }
3463                        // Make and send a circuit-level SENDME
3464                        let sendme = relaymsg::Sendme::from(tag).into();
3465
3466                        circ.circ_tx
3467                            .send(rmsg_to_ccmsg(None, sendme))
3468                            .await
3469                            .unwrap();
3470                    }
3471                }
3472                _ => panic!("unexpected message {rmsg:?} on leg {}", circ.unique_id),
3473            }
3474        }
3475
3476        let end_recvd = stream_state.lock().unwrap().end_recvd;
3477
3478        // Close the stream if the other endpoint hasn't already done so
3479        if is_sending_leg && !end_recvd {
3480            let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
3481            circ.circ_tx
3482                .send(rmsg_to_ccmsg(streamid, end))
3483                .await
3484                .unwrap();
3485            stream_state.lock().unwrap().end_sent = true;
3486        }
3487
3488        // This is allowed to fail, because the other leg might have exited first.
3489        let _ = event_tx.send(MockExitEvent::Done).await;
3490
3491        // Ensure we received all the switch cells we were expecting
3492        assert!(
3493            expect_switch.is_empty(),
3494            "expect_switch = {expect_switch:?}"
3495        );
3496
3497        ConfluxEndpointResult::Relay { circ }
3498    }
3499
3500    #[cfg(feature = "conflux")]
3501    async fn run_conflux_client(
3502        tunnel: Arc<ClientTunnel>,
3503        conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
3504        send_data: Vec<u8>,
3505        recv_data: Vec<u8>,
3506    ) -> ConfluxEndpointResult {
3507        let res = conflux_link_rx.await;
3508
3509        let res = res.unwrap().unwrap();
3510        assert_eq!(res.len(), 2);
3511
3512        // All circuit legs have completed the conflux handshake,
3513        // so we now have a multipath tunnel
3514
3515        // Now we're ready to open a stream
3516        let mut stream = tunnel
3517            .begin_stream("www.example.com", 443, None)
3518            .await
3519            .unwrap();
3520
3521        stream.write_all(&send_data).await.unwrap();
3522        stream.flush().await.unwrap();
3523
3524        let mut recv: Vec<u8> = Vec::new();
3525        let recv_len = stream.read_to_end(&mut recv).await.unwrap();
3526        assert_eq!(recv_len, recv_data.len());
3527        assert_eq!(recv_data, recv);
3528
3529        ConfluxEndpointResult::Circuit { tunnel, stream }
3530    }
3531
3532    #[cfg(feature = "conflux")]
3533    async fn run_conflux_endpoint<I: Iterator<Item = Option<Duration>>>(
3534        endpoint: ConfluxTestEndpoint<I>,
3535    ) -> ConfluxEndpointResult {
3536        match endpoint {
3537            ConfluxTestEndpoint::Relay(state) => run_mock_conflux_exit(state).await,
3538            ConfluxTestEndpoint::Client {
3539                tunnel,
3540                conflux_link_rx,
3541                send_data,
3542                recv_data,
3543            } => run_conflux_client(tunnel, conflux_link_rx, send_data, recv_data).await,
3544        }
3545    }
3546
3547    // In this test, a `ConfluxTestEndpoint::Client` task creates a multipath tunnel
3548    // with 2 legs, opens a stream and sends 300 DATA cells on it.
3549    //
3550    // The test spawns two `ConfluxTestEndpoint::Relay` tasks (one for each leg),
3551    // which mock the behavior of an exit. The two relay tasks introduce
3552    // artificial delays before each SENDME sent to the client,
3553    // in order to trigger it to switch its sending leg predictably.
3554    //
3555    // The mock exit does not send any data on the stream.
3556    //
3557    // This test checks that the client sends SWITCH cells at the right time,
3558    // and that all the data it sent over the stream arrived at the exit.
3559    //
3560    // Note, however, that it doesn't check that the client sends the data in
3561    // the right order. For simplicity, the test concatenates the data received
3562    // on both legs, sorts it, and then compares it against the of the data sent
3563    // by the client (TODO: improve this)
3564    #[traced_test]
3565    #[test]
3566    #[cfg(feature = "conflux")]
3567    fn multipath_client_to_exit() {
3568        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3569            /// The number of data cells to send.
3570            const NUM_CELLS: usize = 300;
3571            /// 498 bytes per DATA cell.
3572            const CELL_SIZE: usize = 498;
3573
3574            let TestTunnelCtx {
3575                tunnel,
3576                circs,
3577                conflux_link_rx,
3578            } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
3579            let [circ1, circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3580
3581            // The stream data we're going to send over the conflux tunnel
3582            let mut send_data = (0..255_u8)
3583                .cycle()
3584                .take(NUM_CELLS * CELL_SIZE)
3585                .collect::<Vec<_>>();
3586            let stream_state = Arc::new(Mutex::new(ConfluxStreamState::new(send_data.len())));
3587
3588            let mut tasks = vec![];
3589
3590            // Channels used by the mock relays to notify each other
3591            // of various events.
3592            let (tx1, rx1) = mpsc::channel(1);
3593            let (tx2, rx2) = mpsc::channel(1);
3594
3595            // The 9 RTT delays to insert before each of the 9 SENDMEs
3596            // the exit will end up sending.
3597            //
3598            // Note: the first delay is the init_rtt delay (measured during the conflux HS).
3599            let circ1_rtt_delays = [
3600                // Initially, circ1 has better RTT, so we will start on this leg.
3601                Some(Duration::from_millis(100)),
3602                // But then its RTT takes a turn for the worse,
3603                // triggering a switch after the first SENDME is processed
3604                // (this happens after sending 123 DATA cells).
3605                Some(Duration::from_millis(500)),
3606                Some(Duration::from_millis(700)),
3607                Some(Duration::from_millis(900)),
3608                Some(Duration::from_millis(1100)),
3609                Some(Duration::from_millis(1300)),
3610                Some(Duration::from_millis(1500)),
3611                Some(Duration::from_millis(1700)),
3612                Some(Duration::from_millis(1900)),
3613                Some(Duration::from_millis(2100)),
3614            ]
3615            .into_iter();
3616
3617            let circ2_rtt_delays = [
3618                Some(Duration::from_millis(200)),
3619                Some(Duration::from_millis(400)),
3620                Some(Duration::from_millis(600)),
3621                Some(Duration::from_millis(800)),
3622                Some(Duration::from_millis(1000)),
3623                Some(Duration::from_millis(1200)),
3624                Some(Duration::from_millis(1400)),
3625                Some(Duration::from_millis(1600)),
3626                Some(Duration::from_millis(1800)),
3627                Some(Duration::from_millis(2000)),
3628            ]
3629            .into_iter();
3630
3631            let expected_switches1 = vec![ExpectedSwitch {
3632                // We start on this leg, and receive a BEGIN cell,
3633                // followed by (4 * 31 - 1) = 123 DATA cells.
3634                // Then it becomes blocked on CC, then finally the reactor
3635                // realizes it has some SENDMEs to process, and
3636                // then as a result of the new RTT measurement, we switch to circ1,
3637                // and then finally we switch back here, and get another SWITCH
3638                // as the 126th cell.
3639                cells_so_far: 126,
3640                // Leg 2 switches back to this leg after the 249th cell
3641                // (just before sending the 250th one):
3642                // seqno = 125 carried over from leg 1 (see the seqno of the
3643                // SWITCH expected on leg 2 below), plus 1 SWITCH, plus
3644                // 4 * 31 = 124 DATA cells after which the RTT of the first leg
3645                // is deemed favorable again.
3646                //
3647                // 249 - 125 (last_seq_sent of leg 1) = 124
3648                seqno: 124,
3649            }];
3650
3651            let expected_switches2 = vec![ExpectedSwitch {
3652                // The SWITCH is the first cell we received after the conflux HS
3653                // on this leg.
3654                cells_so_far: 1,
3655                // See explanation on the ExpectedSwitch from circ1 above.
3656                seqno: 125,
3657            }];
3658
3659            let relay_runtime = Arc::new(AsyncMutex::new(rt.clone()));
3660
3661            // Drop the senders and close the channels,
3662            // we have nothing to send in this test.
3663            let (_, cells_rx1) = mpsc::channel(1);
3664            let (_, cells_rx2) = mpsc::channel(1);
3665
3666            let relay1 = ConfluxExitState {
3667                runtime: Arc::clone(&relay_runtime),
3668                tunnel: Arc::clone(&tunnel),
3669                circ: circ1,
3670                rtt_delays: circ1_rtt_delays,
3671                stream_state: Arc::clone(&stream_state),
3672                expect_switch: expected_switches1,
3673                event_tx: tx1,
3674                event_rx: rx2,
3675                is_sending_leg: true,
3676                cells_rx: cells_rx1,
3677            };
3678
3679            let relay2 = ConfluxExitState {
3680                runtime: Arc::clone(&relay_runtime),
3681                tunnel: Arc::clone(&tunnel),
3682                circ: circ2,
3683                rtt_delays: circ2_rtt_delays,
3684                stream_state: Arc::clone(&stream_state),
3685                expect_switch: expected_switches2,
3686                event_tx: tx2,
3687                event_rx: rx1,
3688                is_sending_leg: false,
3689                cells_rx: cells_rx2,
3690            };
3691
3692            for mut mock_relay in [relay1, relay2] {
3693                let leg = mock_relay.circ.unique_id;
3694
3695                // Do the conflux handshake
3696                //
3697                // We do this outside of run_conflux_endpoint,
3698                // toa void running both handshakes at concurrently
3699                // (this gives more predictable RTT delays:
3700                // if both handshake tasks run at once, they race
3701                // to advance the mock runtime's clock)
3702                good_exit_handshake(
3703                    &relay_runtime,
3704                    mock_relay.rtt_delays.next().flatten(),
3705                    &mut mock_relay.circ.chan_rx,
3706                    &mut mock_relay.circ.circ_tx,
3707                )
3708                .await;
3709
3710                let relay = ConfluxTestEndpoint::Relay(mock_relay);
3711
3712                tasks.push(rt.spawn_join(format!("relay task {leg}"), run_conflux_endpoint(relay)));
3713            }
3714
3715            tasks.push(rt.spawn_join(
3716                "client task".to_string(),
3717                run_conflux_endpoint(ConfluxTestEndpoint::Client {
3718                    tunnel,
3719                    conflux_link_rx,
3720                    send_data: send_data.clone(),
3721                    recv_data: vec![],
3722                }),
3723            ));
3724            let _sinks = futures::future::join_all(tasks).await;
3725            let mut stream_state = stream_state.lock().unwrap();
3726            assert!(stream_state.begin_recvd);
3727
3728            stream_state.data_recvd.sort();
3729            send_data.sort();
3730            assert_eq!(stream_state.data_recvd, send_data);
3731        });
3732    }
3733
3734    // In this test, a `ConfluxTestEndpoint::Client` task creates a multipath tunnel
3735    // with 2 legs, opens a stream and reads from the stream until the stream is closed.
3736    //
3737    // The test spawns two `ConfluxTestEndpoint::Relay` tasks (one for each leg),
3738    // which mock the behavior of an exit. The two tasks send DATA and SWITCH
3739    // cells on the two circuit "legs" such that some cells arrive out of order.
3740    // This forces the client to buffer some cells, and then reorder them when
3741    // the missing cells finally arrive.
3742    //
3743    // The client does not send any data on the stream.
3744    #[cfg(feature = "conflux")]
3745    async fn run_multipath_exit_to_client_test(
3746        rt: MockRuntime,
3747        tunnel: TestTunnelCtx,
3748        cells_to_send: Vec<(UniqId, AnyRelayMsg)>,
3749        send_data: Vec<u8>,
3750        recv_data: Vec<u8>,
3751    ) -> Arc<Mutex<ConfluxStreamState>> {
3752        let TestTunnelCtx {
3753            tunnel,
3754            circs,
3755            conflux_link_rx,
3756        } = tunnel;
3757        let [circ1, circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3758
3759        let stream_state = Arc::new(Mutex::new(ConfluxStreamState::new(send_data.len())));
3760
3761        let mut tasks = vec![];
3762        let relay_runtime = Arc::new(AsyncMutex::new(rt.clone()));
3763        let (cells_tx1, cells_rx1) = mpsc::channel(1);
3764        let (cells_tx2, cells_rx2) = mpsc::channel(1);
3765
3766        let dispatcher = CellDispatcher {
3767            leg_tx: [(circ1.unique_id, cells_tx1), (circ2.unique_id, cells_tx2)]
3768                .into_iter()
3769                .collect(),
3770            cells_to_send,
3771        };
3772
3773        // Channels used by the mock relays to notify each other
3774        // of various events.
3775        let (tx1, rx1) = mpsc::channel(1);
3776        let (tx2, rx2) = mpsc::channel(1);
3777
3778        let relay1 = ConfluxExitState {
3779            runtime: Arc::clone(&relay_runtime),
3780            tunnel: Arc::clone(&tunnel),
3781            circ: circ1,
3782            rtt_delays: [].into_iter(),
3783            stream_state: Arc::clone(&stream_state),
3784            // Expect no SWITCH cells from the client
3785            expect_switch: vec![],
3786            event_tx: tx1,
3787            event_rx: rx2,
3788            is_sending_leg: false,
3789            cells_rx: cells_rx1,
3790        };
3791
3792        let relay2 = ConfluxExitState {
3793            runtime: Arc::clone(&relay_runtime),
3794            tunnel: Arc::clone(&tunnel),
3795            circ: circ2,
3796            rtt_delays: [].into_iter(),
3797            stream_state: Arc::clone(&stream_state),
3798            // Expect no SWITCH cells from the client
3799            expect_switch: vec![],
3800            event_tx: tx2,
3801            event_rx: rx1,
3802            is_sending_leg: true,
3803            cells_rx: cells_rx2,
3804        };
3805
3806        // Run the cell dispatcher, which tells each exit leg task
3807        // what cells to write.
3808        //
3809        // This enables us to write out-of-order cells deterministically.
3810        rt.spawn(dispatcher.run()).unwrap();
3811
3812        for mut mock_relay in [relay1, relay2] {
3813            let leg = mock_relay.circ.unique_id;
3814
3815            good_exit_handshake(
3816                &relay_runtime,
3817                mock_relay.rtt_delays.next().flatten(),
3818                &mut mock_relay.circ.chan_rx,
3819                &mut mock_relay.circ.circ_tx,
3820            )
3821            .await;
3822
3823            let relay = ConfluxTestEndpoint::Relay(mock_relay);
3824
3825            tasks.push(rt.spawn_join(format!("relay task {leg}"), run_conflux_endpoint(relay)));
3826        }
3827
3828        tasks.push(rt.spawn_join(
3829            "client task".to_string(),
3830            run_conflux_endpoint(ConfluxTestEndpoint::Client {
3831                tunnel,
3832                conflux_link_rx,
3833                send_data: send_data.clone(),
3834                recv_data,
3835            }),
3836        ));
3837
3838        // Wait for all the tasks to complete
3839        let _sinks = futures::future::join_all(tasks).await;
3840
3841        stream_state
3842    }
3843
3844    #[traced_test]
3845    #[test]
3846    #[cfg(feature = "conflux")]
3847    fn multipath_exit_to_client() {
3848        // The data we expect the client to read from the stream
3849        const TO_SEND: &[u8] =
3850            b"But something about Buster Friendly irritated John Isidore, one specific thing";
3851
3852        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3853            // The indices of the tunnel legs.
3854            const CIRC1: usize = 0;
3855            const CIRC2: usize = 1;
3856
3857            // The client receives the following cells, in the order indicated
3858            // by the t0-t8 "timestamps" (where C = CONNECTED, D = DATA, E = END,
3859            // S = SWITCH):
3860            //
3861            //  Leg 1 (CIRC1):   -----------D--------------------- D -- D -- C
3862            //                              |                      |    |    | \
3863            //                              |                      |    |    |  v
3864            //                              |                      |    |    | client
3865            //                              |                      |    |    |  ^
3866            //                              |                      |    |    |/
3867            //  Leg 2 (CIRC2): E - D -- D --\--- D* -- S (seqno=4)-/----/----/
3868            //                 |   |    |   |    |       |         |    |    |
3869            //                 |   |    |   |    |       |         |    |    |
3870            //                 |   |    |   |    |       |         |    |    |
3871            //  Time:          t8  t7   t6  t5   t4      t3        t2   t1  t0
3872            //
3873            //
3874            //  The cells marked with * are out of order.
3875            //
3876            // Note: t0 is the time when the client receives the first cell,
3877            // and t8 is the time when it receives the last one.
3878            // In other words, this test simulates a mock exit that "sent" the cells
3879            // in the order t0, t1, t2, t5, t4, t6, t7, t8
3880            let simple_switch = vec![
3881                (CIRC1, relaymsg::Data::new(&TO_SEND[0..5]).unwrap().into()),
3882                (CIRC1, relaymsg::Data::new(&TO_SEND[5..10]).unwrap().into()),
3883                // Switch to sending on the second leg
3884                (CIRC2, relaymsg::ConfluxSwitch::new(4).into()),
3885                // An out of order cell!
3886                (CIRC2, relaymsg::Data::new(&TO_SEND[20..30]).unwrap().into()),
3887                // The missing cell (as indicated by seqno = 4 from the switch cell above)
3888                // is finally arriving on leg1
3889                (CIRC1, relaymsg::Data::new(&TO_SEND[10..20]).unwrap().into()),
3890                (CIRC2, relaymsg::Data::new(&TO_SEND[30..40]).unwrap().into()),
3891                (CIRC2, relaymsg::Data::new(&TO_SEND[40..]).unwrap().into()),
3892            ];
3893
3894            //  Leg 1 (CIRC1): ---------------- D  ------D* --- S(seqno = 3) -- D - D ---------------------------- C
3895            //                                  |        |          |           |   |                              | \
3896            //                                  |        |          |           |   |                              |  v
3897            //                                  |        |          |           |   |                              |  client
3898            //                                  |        |          |           |   |                              |  ^
3899            //                                  |        |          |           |   |                              | /
3900            //  Leg 2 (CIRC2): E - S(seqno = 2) \ -- D --\----------\---------- \ --\--- D* -- D* - S(seqno = 3) --/
3901            //                 |        |       |    |   |          |           |   |    |     |         |         |
3902            //                 |        |       |    |   |          |           |   |    |     |         |         |
3903            //                 |        |       |    |   |          |           |   |    |     |         |         |
3904            //  Time:          t11      t10     t9   t8  t7         t6          t5  t4   t3    t2        t1        t0
3905            //  =====================================================================================================
3906            //  Leg 1 LSR:      8        8      8 7  7   7          6           3   2    1      1        1         1
3907            //  Leg 2 LSR:      9        8      6 6  6   5          5           5   5    5      4        3         0
3908            //  LSD:            9        8      8 7  6   5          5       5   3   2    1      1        1         1
3909            //                                    ^ OOO cell is delivered   ^ the OOO cells are delivered to the stream
3910            //
3911            //
3912            //  (LSR = last seq received, LSD = last seq delivered, both from the client's POV)
3913            //
3914            //
3915            // The client keeps track of the `last_seqno_received` (LSR) on each leg.
3916            // This is incremented for each cell that counts towards the seqnos (BEGIN, DATA, etc.)
3917            // that is received on the leg. The client also tracks the `last_seqno_delivered` (LSD),
3918            // which is the seqno of the last cell delivered to a stream
3919            // (this is global for the whole tunnel, whereas the LSR is different for each leg).
3920            //
3921            // When switching to leg `N`, the seqno in the switch is, from the POV of the sender,
3922            // the delta between the absolute seqno (i.e. the total number of cells[^1] sent)
3923            // and the value of this absolute seqno when leg `N` was last used.
3924            //
3925            // At the time of the first SWITCH from `t1`, the exit "sent" 3 cells:
3926            // a `CONNECTED` cell, which was received by the client at `t0`, and 2 `DATA` cells that
3927            // haven't been received yet. At this point, the exit decides to switch to leg 2,
3928            // on which it hasn't sent any cells yet, so the seqno is set to `3 - 0 = 3`.
3929            //
3930            // At `t6` when the exit sends the second switch (leg 2 -> leg 1), has "sent" 6 cells
3931            // (`C` plus the data cells that are received at `t1 - 5` and `t8`.
3932            // The seqno is `6 - 3 = 3`, because when it last sent on leg 1,
3933            // the absolute seqno was `3`.
3934            //
3935            // At `t10`, the absolute seqno is 8 (8 qualifying cells have been sent so far).
3936            // When the exit last sent on leg 2 (which we are switching to),
3937            // the absolute seqno was `6`, so the `SWITCH` cell will have `8 - 6 = 2` as the seqno.
3938            //
3939            // [^1]: only counting the cells that count towards sequence numbers
3940            let multiple_switches = vec![
3941                // Immediately switch to sending on the second leg
3942                // (indicating that we've already sent 3 cells (including the CONNECTED)
3943                (CIRC2, relaymsg::ConfluxSwitch::new(3).into()),
3944                // Two out of order cells!
3945                (CIRC2, relaymsg::Data::new(&TO_SEND[15..20]).unwrap().into()),
3946                (CIRC2, relaymsg::Data::new(&TO_SEND[20..30]).unwrap().into()),
3947                // The missing cells finally arrive on the first leg
3948                (CIRC1, relaymsg::Data::new(&TO_SEND[0..10]).unwrap().into()),
3949                (CIRC1, relaymsg::Data::new(&TO_SEND[10..15]).unwrap().into()),
3950                // Switch back to the first leg
3951                (CIRC1, relaymsg::ConfluxSwitch::new(3).into()),
3952                // OOO cell
3953                (CIRC1, relaymsg::Data::new(&TO_SEND[31..40]).unwrap().into()),
3954                // Missing cell is received
3955                (CIRC2, relaymsg::Data::new(&TO_SEND[30..31]).unwrap().into()),
3956                // The remaining cells are in-order
3957                (CIRC1, relaymsg::Data::new(&TO_SEND[40..]).unwrap().into()),
3958                // Switch right after we've sent all the data we had to send
3959                (CIRC2, relaymsg::ConfluxSwitch::new(2).into()),
3960            ];
3961
3962            // TODO: give these tests the ability to control when END cells are sent
3963            // (currently we have ensure the is_sending_leg is set to true
3964            // on the leg that ends up sending the last data cell).
3965            //
3966            // TODO: test the edge cases
3967            let tests = [simple_switch, multiple_switches];
3968
3969            for cells_to_send in tests {
3970                let tunnel = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
3971                assert_eq!(tunnel.circs.len(), 2);
3972                let circ_ids = [tunnel.circs[0].unique_id, tunnel.circs[1].unique_id];
3973                let cells_to_send = cells_to_send
3974                    .into_iter()
3975                    .map(|(i, cell)| (circ_ids[i], cell))
3976                    .collect();
3977
3978                // The client won't be sending any DATA cells on this stream
3979                let send_data = vec![];
3980                let stream_state = run_multipath_exit_to_client_test(
3981                    rt.clone(),
3982                    tunnel,
3983                    cells_to_send,
3984                    send_data.clone(),
3985                    TO_SEND.into(),
3986                )
3987                .await;
3988                let stream_state = stream_state.lock().unwrap();
3989                assert!(stream_state.begin_recvd);
3990                // We don't expect the client to have sent anything
3991                assert!(stream_state.data_recvd.is_empty());
3992            }
3993        });
3994    }
3995
3996    #[traced_test]
3997    #[test]
3998    #[cfg(all(feature = "conflux", feature = "hs-service"))]
3999    fn conflux_incoming_stream() {
4000        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
4001            use std::error::Error as _;
4002
4003            const EXPECTED_HOP: u8 = 1;
4004
4005            let TestTunnelCtx {
4006                tunnel,
4007                circs,
4008                conflux_link_rx,
4009            } = setup_good_conflux_tunnel(&rt, build_cc_vegas_params()).await;
4010
4011            let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
4012
4013            let link = await_link_payload(&mut circ1.chan_rx).await;
4014            for circ in [&mut circ1, &mut circ2] {
4015                let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
4016                circ.circ_tx
4017                    .send(rmsg_to_ccmsg(None, linked))
4018                    .await
4019                    .unwrap();
4020            }
4021
4022            let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
4023            assert!(conflux_hs_res.iter().all(|res| res.is_ok()));
4024
4025            // TODO(#2002): we don't currently support conflux for onion services
4026            let err = tunnel
4027                .allow_stream_requests(
4028                    &[tor_cell::relaycell::RelayCmd::BEGIN],
4029                    (tunnel.circ.unique_id(), EXPECTED_HOP.into()).into(),
4030                    AllowAllStreamsFilter,
4031                )
4032                .await
4033                // IncomingStream doesn't impl Debug, so we need to map to a different type
4034                .map(|_| ())
4035                .unwrap_err();
4036
4037            let err_src = err.source().unwrap().to_string();
4038            assert!(
4039                err_src.contains("Cannot allow stream requests on a multi-path tunnel"),
4040                "{err_src}"
4041            );
4042        });
4043    }
4044
4045    #[test]
4046    fn client_circ_chan_msg() {
4047        use tor_cell::chancell::msg::{self, AnyChanMsg};
4048        fn good(m: AnyChanMsg) {
4049            assert!(ClientCircChanMsg::try_from(m).is_ok());
4050        }
4051        fn bad(m: AnyChanMsg) {
4052            assert!(ClientCircChanMsg::try_from(m).is_err());
4053        }
4054
4055        good(msg::Destroy::new(2.into()).into());
4056        bad(msg::CreatedFast::new(&b"guaranteed in this world"[..]).into());
4057        bad(msg::Created2::new(&b"and the next"[..]).into());
4058        good(msg::Relay::new(&b"guaranteed guaranteed"[..]).into());
4059        bad(msg::AnyChanMsg::RelayEarly(
4060            msg::Relay::new(&b"for the world and its mother"[..]).into(),
4061        ));
4062        bad(msg::Versions::new([1, 2, 3]).unwrap().into());
4063    }
4064}