Skip to main content

tor_chanmgr/
mgr.rs

1//! Abstract implementation of a channel manager
2
3use crate::factory::BootstrapReporter;
4use crate::mgr::state::{ChannelForTarget, PendingChannelHandle};
5use crate::{ChanProvenance, ChannelConfig, ChannelUsage, Dormancy, Error, Result};
6
7use async_trait::async_trait;
8use futures::future::Shared;
9use oneshot_fused_workaround as oneshot;
10use std::result::Result as StdResult;
11use std::sync::Arc;
12use std::time::Duration;
13use tor_error::{error_report, internal};
14use tor_linkspec::{HasChanMethod, HasRelayIds};
15use tor_netdir::params::NetParameters;
16use tor_proto::channel::kist::KistParams;
17use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
18use tor_proto::memquota::{ChannelAccount, SpecificAccount as _, ToplevelAccount};
19use tracing::{instrument, trace};
20
21#[cfg(feature = "relay")]
22use {safelog::Sensitive, std::net::SocketAddr, tor_proto::RelayChannelAuthMaterial};
23
24mod select;
25mod state;
26
/// Trait to describe as much of a
/// [`Channel`](tor_proto::channel::Channel) as `AbstractChanMgr`
/// needs to use.
///
/// Every implementor must also implement [`HasRelayIds`].
pub(crate) trait AbstractChannel: HasRelayIds {
    /// Return true iff this channel is considered canonical by us.
    fn is_canonical(&self) -> bool;
    /// Return true if we think the peer considers this channel as canonical.
    fn is_canonical_to_peer(&self) -> bool;
    /// Return true if this channel is usable.
    ///
    /// A channel might be unusable because it is closed, because it has
    /// hit a bug, or for some other reason.  We don't return unusable
    /// channels back to the user.
    fn is_usable(&self) -> bool;
    /// Return the amount of time a channel has not been in use.
    ///
    /// Return `None` if the channel is currently in use.
    fn duration_unused(&self) -> Option<Duration>;

    /// Reparameterize this channel according to the provided `ChannelPaddingInstructionsUpdates`
    ///
    /// The changed parameters may not be implemented "immediately",
    /// but this will be done "reasonably soon".
    fn reparameterize(
        &self,
        updates: Arc<ChannelPaddingInstructionsUpdates>,
    ) -> tor_proto::Result<()>;

    /// Update the KIST parameters.
    ///
    /// The changed parameters may not be implemented "immediately",
    /// but this will be done "reasonably soon".
    fn reparameterize_kist(&self, kist_params: KistParams) -> tor_proto::Result<()>;

    /// Specify that this channel should do activities related to channel padding
    ///
    /// See [`Channel::engage_padding_activities`]
    ///
    /// [`Channel::engage_padding_activities`]: tor_proto::channel::Channel::engage_padding_activities
    fn engage_padding_activities(&self);
}
67
/// Trait to describe how channel-like objects are created.
///
/// This differs from [`ChannelFactory`](crate::factory::ChannelFactory) in that
/// it's a purely crate-internal type that we use to decouple the
/// AbstractChanMgr code from actual "what is a channel" concerns.
#[async_trait]
pub(crate) trait AbstractChannelFactory {
    /// The type of channel that this factory can build.
    type Channel: AbstractChannel;
    /// Type that explains how to build an outgoing channel.
    type BuildSpec: HasRelayIds + HasChanMethod;
    /// The type of byte stream that's required to build channels for incoming connections.
    type Stream;

    /// Construct a new channel to the destination described at `target`.
    ///
    /// `reporter` and `memquota` are handed to the implementation for its
    /// own use while building the channel.
    ///
    /// This function must take care of all timeouts, error detection,
    /// and so on.
    ///
    /// It should not retry; that is handled at a higher level.
    async fn build_channel(
        &self,
        target: &Self::BuildSpec,
        reporter: BootstrapReporter,
        memquota: ChannelAccount,
    ) -> Result<Arc<Self::Channel>>;

    /// Construct a new channel for an incoming connection.
    ///
    /// `peer` is the address of the remote side and `stream` is the byte
    /// stream of the already-accepted connection.
    #[cfg(feature = "relay")]
    async fn build_channel_using_incoming(
        &self,
        peer: Sensitive<std::net::SocketAddr>,
        stream: Self::Stream,
        memquota: ChannelAccount,
    ) -> Result<Arc<Self::Channel>>;
}
104
105/// This is the configuration for a [`ChanMgr`](crate::ChanMgr) given to the constructor.
106#[derive(Default)]
107pub struct ChanMgrConfig {
108    /// Channel configuration which usually comes from a configuration file.
109    pub(crate) cfg: ChannelConfig,
110    /// Relay authentication key material for relay channels.
111    #[cfg(feature = "relay")]
112    pub(crate) auth_material: Option<Arc<RelayChannelAuthMaterial>>,
113    /// Our address(es). When building outgoing channel, we need our addresses in order to send
114    /// them in the NETINFO cell. It will also be used to validate initiator channel target.
115    #[cfg(feature = "relay")]
116    pub(crate) my_addrs: Vec<SocketAddr>,
117    // TODO: Would be good to add more things such as NetParameters and Dormancy maybe?
118}
119
120impl ChanMgrConfig {
121    /// Constructor.
122    pub fn new(cfg: ChannelConfig) -> Self {
123        Self {
124            cfg,
125            #[cfg(feature = "relay")]
126            auth_material: None,
127            #[cfg(feature = "relay")]
128            my_addrs: Vec::new(),
129        }
130    }
131
132    /// Set the relay channel authentication key material and return itself.
133    #[cfg(feature = "relay")]
134    pub fn with_auth_material(mut self, auth_material: Arc<RelayChannelAuthMaterial>) -> Self {
135        self.auth_material = Some(auth_material);
136        self
137    }
138
139    /// Set our addresses that we advertise to the world.
140    #[cfg(feature = "relay")]
141    pub fn with_my_addrs(mut self, my_addrs: Vec<SocketAddr>) -> Self {
142        self.my_addrs = my_addrs;
143        self
144    }
145}
146
/// A type- and network-agnostic implementation for [`ChanMgr`](crate::ChanMgr).
///
/// This type does the work of keeping track of open channels and pending
/// channel requests, launching requests as needed, waiting for pending
/// requests, and so forth.
///
/// The actual job of launching connections is deferred to an
/// `AbstractChannelFactory` type.
pub(crate) struct AbstractChanMgr<CF: AbstractChannelFactory> {
    /// All internal state held by this channel manager.
    ///
    /// The most important part is the map from relay identity to channel, or
    /// to pending channel status.
    pub(crate) channels: state::MgrState<CF>,

    /// A bootstrap reporter to give out when building channels.
    ///
    /// A clone of it is passed to the factory for every outgoing build.
    pub(crate) reporter: BootstrapReporter,

    /// The memory quota account that every channel will be a child of
    pub(crate) memquota: ToplevelAccount,
}
168
/// Type alias for a future that we wait on to see when a pending
/// channel is done or failed.
///
/// It is a `Shared` future, so several waiters can await the same launch.
type Pending = Shared<oneshot::Receiver<Result<()>>>;

/// Type alias for the sender we notify when we complete a channel (or fail to
/// complete it).
///
/// This is the sending half that feeds a [`Pending`] future.
type Sending = oneshot::Sender<Result<()>>;
176
177/// Keeps a pending launch entry and its waiters in sync.
178///
179/// Every exit path from a launch attempt must either remove the pending entry
180/// or upgrade it to an open channel, and must notify all waiters with the
181/// outcome. This guard makes cancellation and early returns follow the same
182/// cleanup path as ordinary failures.
183struct PendingLaunchGuard<'a, CF: AbstractChannelFactory> {
184    /// Channel state used to remove or upgrade the pending entry.
185    channels: &'a state::MgrState<CF>,
186    /// Handle to the pending entry, if it has not yet been removed.
187    handle: Option<PendingChannelHandle>,
188    /// Sender used to notify tasks waiting on this launch.
189    send: Option<Sending>,
190    /// Result to report to the waiters if the launch ends here.
191    result: Result<()>,
192}
193
194impl<'a, CF: AbstractChannelFactory> PendingLaunchGuard<'a, CF> {
195    /// Create a new guard for a pending launch.
196    fn new(channels: &'a state::MgrState<CF>, handle: PendingChannelHandle, send: Sending) -> Self {
197        Self {
198            channels,
199            handle: Some(handle),
200            send: Some(send),
201            result: Err(Error::RequestCancelled),
202        }
203    }
204
205    /// Record the result that should be reported to any waiters.
206    fn note_result(&mut self, result: Result<()>) {
207        self.result = result;
208    }
209
210    /// Replace the pending channel with an open one.
211    fn upgrade_pending_channel_to_open(&mut self, channel: Arc<CF::Channel>) -> Result<()> {
212        let handle = self
213            .handle
214            .take()
215            .expect("pending launch guard lost its handle before upgrade");
216        self.channels
217            .upgrade_pending_channel_to_open(handle, channel)
218    }
219}
220
221impl<'a, CF: AbstractChannelFactory> Drop for PendingLaunchGuard<'a, CF> {
222    fn drop(&mut self) {
223        if let Some(handle) = self.handle.take() {
224            if let Err(e) = self.channels.remove_pending_channel(handle) {
225                // Just log an error if we're unable to remove it, since there's
226                // nothing else we can do here, and returning the error would
227                // hide the actual error that we care about (the channel build
228                // failure).
229                #[allow(clippy::missing_docs_in_private_items)]
230                const MSG: &str = "Unable to remove the pending channel";
231                error_report!(internal!("{e}"), "{}", MSG);
232            }
233        }
234
235        if let Some(send) = self.send.take() {
236            // It's okay if all the receivers went away:
237            // that means that nobody was waiting for this channel.
238            let _ignore_err = send.send(self.result.clone());
239        }
240    }
241}
242
243impl<CF: AbstractChannelFactory + Clone> AbstractChanMgr<CF> {
244    /// Make a new empty channel manager.
245    pub(crate) fn new(
246        connector: CF,
247        config: ChannelConfig,
248        dormancy: Dormancy,
249        netparams: &NetParameters,
250        reporter: BootstrapReporter,
251        memquota: ToplevelAccount,
252    ) -> Self {
253        AbstractChanMgr {
254            channels: state::MgrState::new(connector, config, dormancy, netparams),
255            reporter,
256            memquota,
257        }
258    }
259
260    /// Run a function to modify the channel builder in this object.
261    #[allow(unused)]
262    pub(crate) fn with_mut_builder<F>(&self, func: F)
263    where
264        F: FnOnce(&mut CF),
265    {
266        self.channels.with_mut_builder(func);
267    }
268
    /// Remove every unusable entry from this channel manager.
    ///
    /// Test-only; the work is delegated to the channel state's
    /// `remove_unusable`, whose result is returned unchanged.
    #[cfg(test)]
    pub(crate) fn remove_unusable_entries(&self) -> Result<()> {
        self.channels.remove_unusable()
    }
274
275    /// Build a channel for an incoming stream. See
276    /// [`ChanMgr::handle_incoming`](crate::ChanMgr::handle_incoming).
277    #[cfg(feature = "relay")]
278    pub(crate) async fn handle_incoming(
279        &self,
280        src: Sensitive<std::net::SocketAddr>,
281        stream: CF::Stream,
282    ) -> Result<Arc<CF::Channel>> {
283        let chan_builder = self.channels.builder();
284        let memquota = ChannelAccount::new(&self.memquota)?;
285        let channel = chan_builder
286            .build_channel_using_incoming(src, stream, memquota)
287            .await?;
288        // Add it to our list.
289        self.channels.add_open(channel.clone())?;
290        Ok(channel)
291    }
292
293    /// Get a channel corresponding to the identities of `target`.
294    ///
295    /// If a usable channel exists with that identity, return it.
296    ///
297    /// If no such channel exists already, and none is in progress,
298    /// launch a new request using `target`.
299    ///
300    /// If no such channel exists already, but we have one that's in
301    /// progress, wait for it to succeed or fail.
302    #[instrument(skip_all, level = "trace")]
303    pub(crate) async fn get_or_launch(
304        &self,
305        target: CF::BuildSpec,
306        usage: ChannelUsage,
307    ) -> Result<(Arc<CF::Channel>, ChanProvenance)> {
308        use ChannelUsage as CU;
309
310        let chan = self.get_or_launch_internal(target).await?;
311
312        match usage {
313            CU::Dir | CU::UselessCircuit => {}
314            CU::UserTraffic => chan.0.engage_padding_activities(),
315        }
316
317        Ok(chan)
318    }
319
320    /// Get a channel whose identity is `ident` - internal implementation
321    #[allow(clippy::cognitive_complexity)]
322    #[instrument(skip_all, level = "trace")]
323    async fn get_or_launch_internal(
324        &self,
325        target: CF::BuildSpec,
326    ) -> Result<(Arc<CF::Channel>, ChanProvenance)> {
327        /// How many times do we try?
328        const N_ATTEMPTS: usize = 2;
329        let mut attempts_so_far = 0;
330        let mut final_attempt = false;
331        let mut provenance = ChanProvenance::Preexisting;
332
333        // TODO(nickm): It would be neat to use tor_retry instead.
334        let mut last_err = None;
335
336        while attempts_so_far < N_ATTEMPTS || final_attempt {
337            attempts_so_far += 1;
338
339            // For each attempt, we _first_ look at the state of the channel map
340            // to decide on an `Action`, and _then_ we execute that action.
341
342            // First, see what state we're in, and what we should do about it.
343            let action = self.choose_action(&target, final_attempt)?;
344
345            // We are done deciding on our Action! It's time act based on the
346            // Action that we chose.
347            match action {
348                // If this happens, we were trying to make one final check of our state, but
349                // we would have had to make additional attempts.
350                None => {
351                    if !final_attempt {
352                        return Err(Error::Internal(internal!(
353                            "No action returned while not on final attempt"
354                        )));
355                    }
356                    break;
357                }
358                // Easy case: we have an error or a channel to return.
359                Some(Action::Return(v)) => {
360                    trace!("Returning existing channel");
361                    return v.map(|chan| (chan, provenance));
362                }
363                // There's an in-progress channel.  Wait for it.
364                Some(Action::Wait(pend)) => {
365                    trace!("Waiting for in-progress channel");
366                    match pend.await {
367                        Ok(Ok(())) => {
368                            // We were waiting for a channel, and it succeeded, or it
369                            // got cancelled.  But it might have gotten more
370                            // identities while negotiating than it had when it was
371                            // launched, or it might have failed to get all the
372                            // identities we want. Check for this.
373                            final_attempt = true;
374                            provenance = ChanProvenance::NewlyCreated;
375                            last_err.get_or_insert(Error::RequestCancelled);
376                        }
377                        Ok(Err(e)) => {
378                            last_err = Some(e);
379                        }
380                        Err(_) => {
381                            last_err =
382                                Some(Error::Internal(internal!("channel build task disappeared")));
383                        }
384                    }
385                }
386                // We need to launch a channel.
387                Some(Action::Launch((handle, send))) => {
388                    trace!("Launching channel");
389                    let connector = self.channels.builder();
390                    let mut launch = PendingLaunchGuard::new(&self.channels, handle, send);
391                    let memquota = match ChannelAccount::new(&self.memquota) {
392                        Ok(memquota) => memquota,
393                        Err(e) => {
394                            let e: Error = e.into();
395                            launch.note_result(Err(e.clone()));
396                            return Err(e);
397                        }
398                    };
399
400                    let outcome = connector
401                        .build_channel(&target, self.reporter.clone(), memquota)
402                        .await;
403
404                    match outcome {
405                        Ok(ref chan) => {
406                            // Replace the pending channel with the newly built channel.
407                            match launch.upgrade_pending_channel_to_open(Arc::clone(chan)) {
408                                Ok(()) => launch.note_result(Ok(())),
409                                Err(e) => {
410                                    launch.note_result(Err(e.clone()));
411                                    return Err(e);
412                                }
413                            }
414                        }
415                        Err(_) => {
416                            launch.note_result(outcome.clone().map(|_| ()));
417                        }
418                    }
419
420                    match outcome {
421                        Ok(chan) => {
422                            return Ok((chan, ChanProvenance::NewlyCreated));
423                        }
424                        Err(e) => last_err = Some(e),
425                    }
426                }
427            }
428
429            // End of this attempt. We will try again...
430        }
431
432        Err(last_err.unwrap_or_else(|| Error::Internal(internal!("no error was set!?"))))
433    }
434
435    /// Helper: based on our internal state, decide which action to take when
436    /// asked for a channel, and update our internal state accordingly.
437    ///
438    /// If `final_attempt` is true, then we will not pick any action that does
439    /// not result in an immediate result. If we would pick such an action, we
440    /// instead return `Ok(None)`.  (We could instead have the caller detect
441    /// such actions, but it's less efficient to construct them, insert them,
442    /// and immediately revert them.)
443    #[instrument(skip_all, level = "trace")]
444    fn choose_action(
445        &self,
446        target: &CF::BuildSpec,
447        final_attempt: bool,
448    ) -> Result<Option<Action<CF::Channel>>> {
449        // don't create new channels on the final attempt
450        let response = self.channels.request_channel(
451            target,
452            /* add_new_entry_if_not_found= */ !final_attempt,
453        );
454
455        match response {
456            Ok(Some(ChannelForTarget::Open(channel))) => Ok(Some(Action::Return(Ok(channel)))),
457            Ok(Some(ChannelForTarget::Pending(pending))) => {
458                if !final_attempt {
459                    Ok(Some(Action::Wait(pending)))
460                } else {
461                    // don't return a pending channel on the final attempt
462                    Ok(None)
463                }
464            }
465            Ok(Some(ChannelForTarget::NewEntry((handle, send)))) => {
466                // do not drop the handle if refactoring; see `PendingChannelHandle` for details
467                Ok(Some(Action::Launch((handle, send))))
468            }
469            Ok(None) => Ok(None),
470            Err(e @ Error::IdentityConflict) => Ok(Some(Action::Return(Err(e)))),
471            Err(e) => Err(e),
472        }
473    }
474
475    /// Update the netdir
476    pub(crate) fn update_netparams(
477        &self,
478        netparams: Arc<dyn AsRef<NetParameters>>,
479    ) -> StdResult<(), tor_error::Bug> {
480        self.channels.reconfigure_general(None, None, netparams)
481    }
482
483    /// Notifies the chanmgr to be dormant like dormancy
484    pub(crate) fn set_dormancy(
485        &self,
486        dormancy: Dormancy,
487        netparams: Arc<dyn AsRef<NetParameters>>,
488    ) -> StdResult<(), tor_error::Bug> {
489        self.channels
490            .reconfigure_general(None, Some(dormancy), netparams)
491    }
492
493    /// Reconfigure all channels
494    pub(crate) fn reconfigure(
495        &self,
496        config: &ChannelConfig,
497        netparams: Arc<dyn AsRef<NetParameters>>,
498    ) -> StdResult<(), tor_error::Bug> {
499        self.channels
500            .reconfigure_general(Some(config), None, netparams)
501    }
502
    /// Expire any channels that have been unused longer than
    /// their maximum unused duration assigned during creation.
    ///
    /// Return a duration from now until next channel expires.
    ///
    /// If all channels are in use or there are no open channels,
    /// return 180 seconds which is the minimum value of
    /// max_unused_duration.
    ///
    /// All of this is implemented by the channel state's `expire_channels`,
    /// to which this method delegates.
    pub(crate) fn expire_channels(&self) -> Duration {
        self.channels.expire_channels()
    }
514
515    /// Test only: return the open usable channels with a given `ident`.
516    #[cfg(test)]
517    pub(crate) fn get_nowait<'a, T>(&self, ident: T) -> Vec<Arc<CF::Channel>>
518    where
519        T: Into<tor_linkspec::RelayIdRef<'a>>,
520    {
521        use state::ChannelState::*;
522        self.channels
523            .with_channels(|channel_map| {
524                channel_map
525                    .by_id(ident)
526                    .filter_map(|entry| match entry {
527                        Open(ent) if ent.channel.is_usable() => Some(Arc::clone(&ent.channel)),
528                        _ => None,
529                    })
530                    .collect()
531            })
532            .expect("Poisoned lock")
533    }
534}
535
/// Possible actions that we'll decide to take when asked for a channel.
///
/// Values of this type are produced by `choose_action` and carried out by
/// `get_or_launch_internal`.
#[allow(clippy::large_enum_variant)]
enum Action<C: AbstractChannel> {
    /// We found no channel.  We're going to launch a new one,
    /// then tell everybody about it.
    ///
    /// Holds the handle for the pending entry and the sender used to notify
    /// waiters.
    Launch((PendingChannelHandle, Sending)),
    /// We found an in-progress attempt at making a channel.
    /// We're going to wait for it to finish.
    Wait(Pending),
    /// We found a usable channel.  We're going to return it.
    ///
    /// This can also carry an error that should be returned as-is.
    Return(Result<Arc<C>>),
}
548
549#[cfg(test)]
550mod test {
551    // @@ begin test lint list maintained by maint/add_warning @@
552    #![allow(clippy::bool_assert_comparison)]
553    #![allow(clippy::clone_on_copy)]
554    #![allow(clippy::dbg_macro)]
555    #![allow(clippy::mixed_attributes_style)]
556    #![allow(clippy::print_stderr)]
557    #![allow(clippy::print_stdout)]
558    #![allow(clippy::single_char_pattern)]
559    #![allow(clippy::unwrap_used)]
560    #![allow(clippy::unchecked_time_subtraction)]
561    #![allow(clippy::useless_vec)]
562    #![allow(clippy::needless_pass_by_value)]
563    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
564    use super::*;
565    use crate::Error;
566
567    use futures::{join, poll};
568    use std::error::Error as StdError;
569    use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
570    use std::sync::Arc;
571    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
572    use std::time::Duration;
573    use tor_error::bad_api_usage;
574    use tor_linkspec::ChannelMethod;
575    use tor_llcrypto::pk::ed25519::Ed25519Identity;
576    use tor_memquota::ArcMemoryQuotaTrackerExt as _;
577
578    use crate::ChannelUsage as CU;
579    use tor_rtcompat::{Runtime, task::yield_now, test_with_one_runtime};
580
    // Two distinct addresses we can use in tests.
    // They differ only in their IPv4 address (1.1.1.1 vs 2.2.2.2); both use port 443.
    const ADDR_A: SocketAddr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(1, 1, 1, 1), 443));
    const ADDR_B: SocketAddr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(2, 2, 2, 2), 443));
584
    /// Channel factory used by the tests: it builds `FakeChannel`s.
    #[derive(Clone)]
    struct FakeChannelFactory<RT> {
        /// Runtime used to sleep when a build is asked to be slow.
        runtime: RT,
        /// Counter incremented once per call to `build_channel`.
        build_attempts: Arc<AtomicUsize>,
    }
590
591    #[derive(Clone, Debug)]
592    struct FakeChannel {
593        ed_ident: Ed25519Identity,
594        mood: char,
595        closing: Arc<AtomicBool>,
596        detect_reuse: Arc<char>,
597        // last_params: Option<ChannelPaddingInstructionsUpdates>,
598    }
599
600    impl PartialEq for FakeChannel {
601        fn eq(&self, other: &Self) -> bool {
602            Arc::ptr_eq(&self.detect_reuse, &other.detect_reuse)
603        }
604    }
605
606    impl AbstractChannel for FakeChannel {
607        fn is_canonical(&self) -> bool {
608            unimplemented!()
609        }
610        fn is_canonical_to_peer(&self) -> bool {
611            unimplemented!()
612        }
613        fn is_usable(&self) -> bool {
614            !self.closing.load(Ordering::SeqCst)
615        }
616        fn duration_unused(&self) -> Option<Duration> {
617            None
618        }
619        fn reparameterize(
620            &self,
621            _updates: Arc<ChannelPaddingInstructionsUpdates>,
622        ) -> tor_proto::Result<()> {
623            // *self.last_params.lock().unwrap() = Some((*updates).clone());
624            match self.mood {
625                // Build succeeds, but installing the channel into the manager fails.
626                'r' => Err(tor_proto::Error::ChanProto(
627                    "synthetic reparameterize failure".into(),
628                )),
629                _ => Ok(()),
630            }
631        }
632        fn reparameterize_kist(&self, _kist_params: KistParams) -> tor_proto::Result<()> {
633            Ok(())
634        }
635        fn engage_padding_activities(&self) {}
636    }
637
638    impl HasRelayIds for FakeChannel {
639        fn identity(
640            &self,
641            key_type: tor_linkspec::RelayIdType,
642        ) -> Option<tor_linkspec::RelayIdRef<'_>> {
643            match key_type {
644                tor_linkspec::RelayIdType::Ed25519 => Some((&self.ed_ident).into()),
645                _ => None,
646            }
647        }
648    }
649
650    impl FakeChannel {
651        fn start_closing(&self) {
652            self.closing.store(true, Ordering::SeqCst);
653        }
654    }
655
656    impl<RT: Runtime> FakeChannelFactory<RT> {
657        fn new(runtime: RT, build_attempts: Arc<AtomicUsize>) -> Self {
658            FakeChannelFactory {
659                runtime,
660                build_attempts,
661            }
662        }
663    }
664
665    fn new_test_abstract_chanmgr<R: Runtime>(runtime: R) -> AbstractChanMgr<FakeChannelFactory<R>> {
666        new_test_abstract_chanmgr_and_build_attempts(runtime).0
667    }
668
669    fn new_test_abstract_chanmgr_and_build_attempts<R: Runtime>(
670        runtime: R,
671    ) -> (AbstractChanMgr<FakeChannelFactory<R>>, Arc<AtomicUsize>) {
672        let build_attempts = Arc::new(AtomicUsize::new(0));
673        let cf = FakeChannelFactory::new(runtime, Arc::clone(&build_attempts));
674        let mgr = AbstractChanMgr::new(
675            cf,
676            Default::default(),
677            Default::default(),
678            &Default::default(),
679            BootstrapReporter::fake(),
680            ToplevelAccount::new_noop(),
681        );
682        (mgr, build_attempts)
683    }
684
685    #[derive(Clone, Debug)]
686    struct FakeBuildSpec(u32, char, Ed25519Identity, SocketAddr);
687
688    impl HasRelayIds for FakeBuildSpec {
689        fn identity(
690            &self,
691            key_type: tor_linkspec::RelayIdType,
692        ) -> Option<tor_linkspec::RelayIdRef<'_>> {
693            match key_type {
694                tor_linkspec::RelayIdType::Ed25519 => Some((&self.2).into()),
695                _ => None,
696            }
697        }
698    }
699
700    impl HasChanMethod for FakeBuildSpec {
701        fn chan_method(&self) -> ChannelMethod {
702            ChannelMethod::Direct(vec![self.3.clone()])
703        }
704    }
705
706    /// Helper to make a fake Ed identity from a u32.
707    fn u32_to_ed(n: u32) -> Ed25519Identity {
708        let mut bytes = [0; 32];
709        bytes[0..4].copy_from_slice(&n.to_be_bytes());
710        bytes.into()
711    }
712
713    /// Return true if `needle` appears anywhere in `err`'s error chain.
714    fn error_contains(err: &Error, needle: &str) -> bool {
715        let mut source: Option<&(dyn StdError + 'static)> = Some(err);
716        while let Some(err) = source {
717            if err.to_string().contains(needle) || format!("{err:?}").contains(needle) {
718                return true;
719            }
720            source = err.source();
721        }
722        false
723    }
724
725    #[async_trait]
726    impl<RT: Runtime> AbstractChannelFactory for FakeChannelFactory<RT> {
727        type Channel = FakeChannel;
728        type BuildSpec = FakeBuildSpec;
729        type Stream = ();
730
731        async fn build_channel(
732            &self,
733            target: &Self::BuildSpec,
734            _reporter: BootstrapReporter,
735            _memquota: ChannelAccount,
736        ) -> Result<Arc<FakeChannel>> {
737            self.build_attempts.fetch_add(1, Ordering::SeqCst);
738            yield_now().await;
739            let FakeBuildSpec(ident, mood, id, _addr) = *target;
740            let ed_ident = u32_to_ed(ident);
741            assert_eq!(ed_ident, id);
742            match mood {
743                // "X" means never connect.
744                '❌' | '🔥' => return Err(Error::UnusableTarget(bad_api_usage!("emoji"))),
745                // "zzz" means wait for 15 seconds then succeed.
746                '💤' => {
747                    self.runtime.sleep(Duration::new(15, 0)).await;
748                }
749                _ => {}
750            }
751            Ok(Arc::new(FakeChannel {
752                ed_ident,
753                mood,
754                closing: Arc::new(AtomicBool::new(false)),
755                detect_reuse: Default::default(),
756                // last_params: None,
757            }))
758        }
759
760        #[cfg(feature = "relay")]
761        async fn build_channel_using_incoming(
762            &self,
763            _peer: Sensitive<std::net::SocketAddr>,
764            _stream: Self::Stream,
765            _memquota: ChannelAccount,
766        ) -> Result<Arc<Self::Channel>> {
767            unimplemented!()
768        }
769    }
770
771    #[test]
772    fn connect_one_ok() {
773        test_with_one_runtime!(|runtime| async {
774            let mgr = new_test_abstract_chanmgr(runtime);
775            let target = FakeBuildSpec(413, '!', u32_to_ed(413), ADDR_A);
776            let chan1 = mgr
777                .get_or_launch(target.clone(), CU::UserTraffic)
778                .await
779                .unwrap()
780                .0;
781            let chan2 = mgr.get_or_launch(target, CU::UserTraffic).await.unwrap().0;
782
783            assert_eq!(chan1, chan2);
784            assert_eq!(mgr.get_nowait(&u32_to_ed(413)), vec![chan1]);
785        });
786    }
787
788    #[test]
789    fn connect_one_fail() {
790        test_with_one_runtime!(|runtime| async {
791            let mgr = new_test_abstract_chanmgr(runtime);
792
793            // This is set up to always fail.
794            let target = FakeBuildSpec(999, '❌', u32_to_ed(999), ADDR_A);
795            let res1 = mgr.get_or_launch(target, CU::UserTraffic).await;
796            assert!(matches!(res1, Err(Error::UnusableTarget(_))));
797
798            assert!(mgr.get_nowait(&u32_to_ed(999)).is_empty());
799        });
800    }
801
802    #[test]
803    fn connect_different_address() {
804        test_with_one_runtime!(|runtime| async {
805            let mgr = new_test_abstract_chanmgr(runtime);
806
807            // Two targets that have different addresses.
808            let target1 = FakeBuildSpec(413, '!', u32_to_ed(413), ADDR_A);
809            let mut target2 = target1.clone();
810            target2.3 = ADDR_B;
811
812            let chan1 = mgr.get_or_launch(target1, CU::UserTraffic).await.unwrap().0;
813            let chan2 = mgr.get_or_launch(target2, CU::UserTraffic).await.unwrap().0;
814
815            // Even with different addresses, the original channel is returned.
816            assert_eq!(chan1, chan2);
817            assert_eq!(mgr.get_nowait(&u32_to_ed(413)), vec![chan1]);
818        });
819    }
820
821    #[test]
822    fn test_concurrent() {
823        test_with_one_runtime!(|runtime| async {
824            let mgr = new_test_abstract_chanmgr(runtime);
825
826            let usage = CU::UserTraffic;
827
828            // TODO(nickm): figure out how to make these actually run
829            // concurrently. Right now it seems that they don't actually
830            // interact.
831            let (ch3a, ch3b, ch44a, ch44b, ch50a, ch50b, ch86a, ch86b) = join!(
832                mgr.get_or_launch(FakeBuildSpec(3, 'a', u32_to_ed(3), ADDR_A), usage),
833                mgr.get_or_launch(FakeBuildSpec(3, 'b', u32_to_ed(3), ADDR_A), usage),
834                mgr.get_or_launch(FakeBuildSpec(44, 'a', u32_to_ed(44), ADDR_A), usage),
835                mgr.get_or_launch(FakeBuildSpec(44, 'b', u32_to_ed(44), ADDR_A), usage),
836                mgr.get_or_launch(FakeBuildSpec(50, 'a', u32_to_ed(50), ADDR_A), usage),
837                mgr.get_or_launch(FakeBuildSpec(50, 'b', u32_to_ed(50), ADDR_B), usage),
838                mgr.get_or_launch(FakeBuildSpec(86, '❌', u32_to_ed(86), ADDR_A), usage),
839                mgr.get_or_launch(FakeBuildSpec(86, '🔥', u32_to_ed(86), ADDR_A), usage),
840            );
841            let ch3a = ch3a.unwrap();
842            let ch3b = ch3b.unwrap();
843            let ch44a = ch44a.unwrap();
844            let ch44b = ch44b.unwrap();
845            let ch50a = ch50a.unwrap();
846            let ch50b = ch50b.unwrap();
847            let err_a = ch86a.unwrap_err();
848            let err_b = ch86b.unwrap_err();
849
850            assert_eq!(ch3a, ch3b);
851            assert_eq!(ch44a, ch44b);
852            assert_eq!(ch50a, ch50b);
853            assert_ne!(ch44a, ch3a);
854
855            assert!(matches!(err_a, Error::UnusableTarget(_)));
856            assert!(matches!(err_b, Error::UnusableTarget(_)));
857        });
858    }
859
860    #[test]
861    fn dropped_launch_reports_request_cancelled_to_waiters() {
862        test_with_one_runtime!(|runtime| async {
863            let mgr = new_test_abstract_chanmgr(runtime);
864            let target = FakeBuildSpec(777, '💤', u32_to_ed(777), ADDR_A);
865            let usage = CU::UserTraffic;
866
867            let mut owner1 = Box::pin(mgr.get_or_launch(target.clone(), usage));
868            assert!(poll!(&mut owner1).is_pending());
869
870            let mut waiter = Box::pin(mgr.get_or_launch(target.clone(), usage));
871            assert!(poll!(&mut waiter).is_pending());
872
873            drop(owner1);
874
875            let mut owner2 = Box::pin(mgr.get_or_launch(target, usage));
876            assert!(poll!(&mut owner2).is_pending());
877
878            assert!(poll!(&mut waiter).is_pending());
879
880            drop(owner2);
881
882            let waiter = waiter.await;
883            assert!(
884                matches!(&waiter, Err(Error::RequestCancelled)),
885                "{waiter:?}"
886            );
887            if let Err(ref err) = waiter {
888                assert!(!error_contains(err, "channel build task disappeared"));
889            }
890        });
891    }
892
893    #[test]
894    fn failed_upgrade_reports_original_error_without_owner_retry() {
895        test_with_one_runtime!(|runtime| async {
896            let (mgr, build_attempts) = new_test_abstract_chanmgr_and_build_attempts(runtime);
897            let target = FakeBuildSpec(778, 'r', u32_to_ed(778), ADDR_A);
898            let usage = CU::UserTraffic;
899
900            let mut owner = Box::pin(mgr.get_or_launch(target.clone(), usage));
901            assert!(poll!(&mut owner).is_pending());
902
903            let mut waiter = Box::pin(mgr.get_or_launch(target.clone(), usage));
904            assert!(poll!(&mut waiter).is_pending());
905
906            let owner = owner.await;
907            assert!(matches!(&owner, Err(Error::Internal(_))), "{owner:?}");
908            if let Err(ref err) = owner {
909                assert!(error_contains(err, "failure on new channel"));
910                assert!(!error_contains(err, "channel build task disappeared"));
911            }
912
913            assert_eq!(build_attempts.load(Ordering::SeqCst), 1);
914            assert!(mgr.get_nowait(&u32_to_ed(778)).is_empty());
915
916            let waiter = waiter.await;
917            assert!(matches!(&waiter, Err(Error::Internal(_))), "{waiter:?}");
918            if let Err(ref err) = waiter {
919                assert!(error_contains(err, "failure on new channel"));
920                assert!(!error_contains(err, "channel build task disappeared"));
921            }
922        });
923    }
924
925    #[test]
926    fn unusable_entries() {
927        test_with_one_runtime!(|runtime| async {
928            let mgr = new_test_abstract_chanmgr(runtime);
929
930            let (ch3, ch4, ch5) = join!(
931                mgr.get_or_launch(FakeBuildSpec(3, 'a', u32_to_ed(3), ADDR_A), CU::UserTraffic),
932                mgr.get_or_launch(FakeBuildSpec(4, 'a', u32_to_ed(4), ADDR_A), CU::UserTraffic),
933                mgr.get_or_launch(FakeBuildSpec(5, 'a', u32_to_ed(5), ADDR_A), CU::UserTraffic),
934            );
935
936            let ch3 = ch3.unwrap().0;
937            let _ch4 = ch4.unwrap();
938            let ch5 = ch5.unwrap().0;
939
940            ch3.start_closing();
941            ch5.start_closing();
942
943            let ch3_new = mgr
944                .get_or_launch(FakeBuildSpec(3, 'b', u32_to_ed(3), ADDR_A), CU::UserTraffic)
945                .await
946                .unwrap()
947                .0;
948            assert_ne!(ch3, ch3_new);
949            assert_eq!(ch3_new.mood, 'b');
950
951            mgr.remove_unusable_entries().unwrap();
952
953            assert!(!mgr.get_nowait(&u32_to_ed(3)).is_empty());
954            assert!(!mgr.get_nowait(&u32_to_ed(4)).is_empty());
955            assert!(mgr.get_nowait(&u32_to_ed(5)).is_empty());
956        });
957    }
958}