tor_circmgr/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc = include_str!("../README.md")]
3// @@ begin lint list maintained by maint/add_warning @@
4#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6#![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_time_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39#![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43#![allow(clippy::needless_lifetimes)] // See arti#1765
44#![allow(mismatched_lifetime_syntaxes)] // temporary workaround for arti#2060
45//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
46
47// TODO #1645 (either remove this, or decide to have it everywhere)
48#![cfg_attr(not(all(feature = "full", feature = "experimental")), allow(unused))]
49
50use build::TunnelBuilder;
51use mgr::{AbstractTunnel, AbstractTunnelBuilder};
52use tor_basic_utils::retry::RetryDelay;
53use tor_chanmgr::ChanMgr;
54use tor_dircommon::fallback::FallbackList;
55use tor_error::{error_report, warn_report};
56use tor_guardmgr::RetireCircuits;
57use tor_linkspec::ChanTarget;
58use tor_netdir::{DirEvent, NetDir, NetDirProvider, Timeliness};
59use tor_proto::circuit::UniqId;
60use tor_proto::client::circuit::CircParameters;
61use tor_rtcompat::Runtime;
62
63#[cfg(any(feature = "specific-relay", feature = "hs-common"))]
64use tor_linkspec::IntoOwnedChanTarget;
65
66use futures::StreamExt;
67use std::sync::{Arc, Mutex, Weak};
68use std::time::{Duration, Instant};
69use tor_rtcompat::SpawnExt;
70use tracing::{debug, info, instrument, trace, warn};
71
72#[cfg(feature = "testing")]
73pub use config::test_config::TestConfig;
74
75pub mod build;
76mod config;
77mod err;
78#[cfg(feature = "hs-common")]
79pub mod hspool;
80mod impls;
81pub mod isolation;
82mod mgr;
83#[cfg(test)]
84mod mocks;
85mod preemptive;
86pub mod timeouts;
87mod tunnel;
88mod usage;
89
90// Can't apply `visibility` to modules.
91cfg_if::cfg_if! {
92    if #[cfg(feature = "experimental-api")] {
93        pub mod path;
94    } else {
95        pub(crate) mod path;
96    }
97}
98
99pub use err::Error;
100pub use isolation::IsolationToken;
101pub use tor_guardmgr::{ClockSkewEvents, GuardMgrConfig, SkewEstimate};
102pub use tunnel::{
103    ClientDataTunnel, ClientDirTunnel, ClientOnionServiceDataTunnel, ClientOnionServiceDirTunnel,
104    ClientOnionServiceIntroTunnel, ServiceOnionServiceDataTunnel, ServiceOnionServiceDirTunnel,
105    ServiceOnionServiceIntroTunnel,
106};
107#[cfg(feature = "conflux")]
108pub use tunnel::{
109    ClientMultiPathDataTunnel, ClientMultiPathOnionServiceDataTunnel,
110    ServiceMultiPathOnionServiceDataTunnel,
111};
112pub use usage::{TargetPort, TargetPorts};
113
114pub use config::{
115    CircMgrConfig, CircuitTiming, CircuitTimingBuilder, PathConfig, PathConfigBuilder,
116    PreemptiveCircuitConfig, PreemptiveCircuitConfigBuilder,
117};
118
119use crate::isolation::StreamIsolation;
120use crate::mgr::TunnelProvenance;
121use crate::preemptive::PreemptiveCircuitPredictor;
122use usage::TargetTunnelUsage;
123
124use safelog::sensitive as sv;
125#[cfg(feature = "geoip")]
126use tor_geoip::CountryCode;
127pub use tor_guardmgr::{ExternalActivity, FirstHopId};
128use tor_persist::StateMgr;
129use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
130
131#[cfg(feature = "hs-common")]
132use crate::hspool::{HsCircKind, HsCircStemKind};
133#[cfg(all(feature = "vanguards", feature = "hs-common"))]
134use tor_guardmgr::vanguards::VanguardMgr;
135
136/// A Result type as returned from this crate.
137pub type Result<T> = std::result::Result<T, Error>;
138
139/// Type alias for dynamic StorageHandle that can handle our timeout state.
140type TimeoutStateHandle = tor_persist::DynStorageHandle<timeouts::pareto::ParetoTimeoutState>;
141
142/// Key used to load timeout state information.
143const PARETO_TIMEOUT_DATA_KEY: &str = "circuit_timeouts";
144
145/// Represents what we know about the Tor network.
146///
147/// This can either be a complete directory, or a list of fallbacks.
148///
149/// Not every DirInfo can be used to build every kind of circuit:
150/// if you try to build a path with an inadequate DirInfo, you'll get a
151/// NeedConsensus error.
152#[derive(Debug, Copy, Clone)]
153#[non_exhaustive]
154pub enum DirInfo<'a> {
155    /// A list of fallbacks, for use when we don't know a network directory.
156    Fallbacks(&'a FallbackList),
157    /// A complete network directory
158    Directory(&'a NetDir),
159    /// No information: we can only build one-hop paths: and that, only if the
160    /// guard manager knows some guards or fallbacks.
161    Nothing,
162}
163
164impl<'a> From<&'a FallbackList> for DirInfo<'a> {
165    fn from(v: &'a FallbackList) -> DirInfo<'a> {
166        DirInfo::Fallbacks(v)
167    }
168}
169impl<'a> From<&'a NetDir> for DirInfo<'a> {
170    fn from(v: &'a NetDir) -> DirInfo<'a> {
171        DirInfo::Directory(v)
172    }
173}
174impl<'a> DirInfo<'a> {
175    /// Return a set of circuit parameters for this DirInfo.
176    fn circ_params(&self, usage: &TargetTunnelUsage) -> Result<CircParameters> {
177        use tor_netdir::params::NetParameters;
178        // We use a common function for both cases here to be sure that
179        // we look at the defaults from NetParameters code.
180        let defaults = NetParameters::default();
181        let net_params = match self {
182            DirInfo::Directory(d) => d.params(),
183            _ => &defaults,
184        };
185        match usage {
186            #[cfg(feature = "hs-common")]
187            TargetTunnelUsage::HsCircBase { .. } => {
188                build::onion_circparams_from_netparams(net_params)
189            }
190            _ => build::exit_circparams_from_netparams(net_params),
191        }
192    }
193}
194
195/// A Circuit Manager (CircMgr) manages a set of circuits, returning them
196/// when they're suitable, and launching them if they don't already exist.
197///
198/// Right now, its notion of "suitable" is quite rudimentary: it just
199/// believes in two kinds of circuits: Exit circuits, and directory
200/// circuits.  Exit circuits are ones that were created to connect to
201/// a set of ports; directory circuits were made to talk to directory caches.
202///
203/// This is a "handle"; clones of it share state.
204pub struct CircMgr<R: Runtime>(Arc<CircMgrInner<build::TunnelBuilder<R>, R>>);
205
206impl<R: Runtime> CircMgr<R> {
207    /// Construct a new circuit manager.
208    ///
209    /// # Usage note
210    ///
211    /// For the manager to work properly, you will need to call `CircMgr::launch_background_tasks`.
212    pub fn new<SM, CFG: CircMgrConfig>(
213        config: &CFG,
214        storage: SM,
215        runtime: &R,
216        chanmgr: Arc<ChanMgr<R>>,
217        guardmgr: &tor_guardmgr::GuardMgr<R>,
218    ) -> Result<Self>
219    where
220        SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
221    {
222        Ok(Self(Arc::new(CircMgrInner::new(
223            config, storage, runtime, chanmgr, guardmgr,
224        )?)))
225    }
226
227    /// Return a circuit suitable for sending one-hop BEGINDIR streams,
228    /// launching it if necessary.
229    #[instrument(level = "trace", skip_all)]
230    pub async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<ClientDirTunnel> {
231        let tunnel = self.0.get_or_launch_dir(netdir).await?;
232        Ok(tunnel.into())
233    }
234
235    /// Return a circuit suitable for exiting to all of the provided
236    /// `ports`, launching it if necessary.
237    ///
238    /// If the list of ports is empty, then the chosen circuit will
239    /// still end at _some_ exit.
240    #[instrument(level = "trace", skip_all)]
241    pub async fn get_or_launch_exit(
242        &self,
243        netdir: DirInfo<'_>, // TODO: This has to be a NetDir.
244        ports: &[TargetPort],
245        isolation: StreamIsolation,
246        // TODO GEOIP: this cannot be stabilised like this, since Cargo features need to be
247        //             additive. The function should be refactored to be builder-like.
248        #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
249    ) -> Result<ClientDataTunnel> {
250        let tunnel = self
251            .0
252            .get_or_launch_exit(
253                netdir,
254                ports,
255                isolation,
256                #[cfg(feature = "geoip")]
257                country_code,
258            )
259            .await?;
260        Ok(tunnel.into())
261    }
262
263    /// Return a circuit to a specific relay, suitable for using for direct
264    /// (one-hop) directory downloads.
265    ///
266    /// This could be used, for example, to download a descriptor for a bridge.
267    #[cfg(feature = "specific-relay")]
268    #[instrument(level = "trace", skip_all)]
269    pub async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
270        &self,
271        target: T,
272    ) -> Result<ClientDirTunnel> {
273        let tunnel = self.0.get_or_launch_dir_specific(target).await?;
274        Ok(tunnel.into())
275    }
276
277    /// Launch the periodic daemon tasks required by the manager to function properly.
278    ///
279    /// Returns a set of [`TaskHandle`]s that can be used to manage the daemon tasks.
280    //
281    // NOTE(eta): The ?Sized on D is so we can pass a trait object in.
282    #[instrument(level = "trace", skip_all)]
283    pub fn launch_background_tasks<D, S>(
284        self: &Arc<Self>,
285        runtime: &R,
286        dir_provider: &Arc<D>,
287        state_mgr: S,
288    ) -> Result<Vec<TaskHandle>>
289    where
290        D: NetDirProvider + 'static + ?Sized,
291        S: StateMgr + std::marker::Send + 'static,
292    {
293        CircMgrInner::launch_background_tasks(&self.0.clone(), runtime, dir_provider, state_mgr)
294    }
295
296    /// Return true if `netdir` has enough information to be used for this
297    /// circuit manager.
298    ///
299    /// (This will check whether the netdir is missing any primary guard
300    /// microdescriptors)
301    pub fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
302        self.0.netdir_is_sufficient(netdir)
303    }
304
305    /// If `circ_id` is the unique identifier for a circuit that we're
306    /// keeping track of, don't give it out for any future requests.
307    pub fn retire_circ(&self, circ_id: &UniqId) {
308        self.0.retire_circ(circ_id);
309    }
310
311    /// Record that a failure occurred on a circuit with a given guard, in a way
312    /// that makes us unwilling to use that guard for future circuits.
313    ///
314    pub fn note_external_failure(
315        &self,
316        target: &impl ChanTarget,
317        external_failure: ExternalActivity,
318    ) {
319        self.0.note_external_failure(target, external_failure);
320    }
321
322    /// Record that a success occurred on a circuit with a given guard, in a way
323    /// that makes us possibly willing to use that guard for future circuits.
324    pub fn note_external_success(
325        &self,
326        target: &impl ChanTarget,
327        external_activity: ExternalActivity,
328    ) {
329        self.0.note_external_success(target, external_activity);
330    }
331
332    /// Return a stream of events about our estimated clock skew; these events
333    /// are `None` when we don't have enough information to make an estimate,
334    /// and `Some(`[`SkewEstimate`]`)` otherwise.
335    ///
336    /// Note that this stream can be lossy: if the estimate changes more than
337    /// one before you read from the stream, you might only get the most recent
338    /// update.
339    pub fn skew_events(&self) -> ClockSkewEvents {
340        self.0.skew_events()
341    }
342
343    /// Try to change our configuration settings to `new_config`.
344    ///
345    /// The actual behavior here will depend on the value of `how`.
346    ///
347    /// Returns whether any of the circuit pools should be cleared.
348    #[instrument(level = "trace", skip_all)]
349    pub fn reconfigure<CFG: CircMgrConfig>(
350        &self,
351        new_config: &CFG,
352        how: tor_config::Reconfigure,
353    ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
354        self.0.reconfigure(new_config, how)
355    }
356
357    /// Return an estimate-based delay for how long a given
358    /// [`Action`](timeouts::Action) should be allowed to complete.
359    ///
360    /// Note that **you do not need to use this function** in order to get
361    /// reasonable timeouts for the circuit-building operations provided by the
362    /// `tor-circmgr` crate: those, unless specifically noted, always use these
363    /// timeouts to cancel circuit operations that have taken too long.
364    ///
365    /// Instead, you should only use this function when you need to estimate how
366    /// long some _other_ operation should take to complete.  For example, if
367    /// you are sending a request over a 3-hop circuit and waiting for a reply,
368    /// you might choose to wait for `estimate_timeout(Action::RoundTrip {
369    /// length: 3 })`.
370    ///
371    /// Note also that this function returns a _timeout_ that the operation
372    /// should be permitted to complete, not an estimated Duration that the
373    /// operation _will_ take to complete. Timeouts are chosen to ensure that
374    /// most operations will complete, but very slow ones will not.  So even if
375    /// we expect that a circuit will complete in (say) 3 seconds, we might
376    /// still allow a timeout of 4.5 seconds, to ensure that most circuits can
377    /// complete.
378    ///
379    /// Estimate-based timeouts may change over time, given observations on the
380    /// actual amount of time needed for circuits to complete building.  If not
381    /// enough information has been gathered, a reasonable default will be used.
382    pub fn estimate_timeout(&self, timeout_action: &timeouts::Action) -> std::time::Duration {
383        self.0.estimate_timeout(timeout_action)
384    }
385
386    /// Return a reference to the associated CircuitBuilder that this CircMgr
387    /// will use to create its circuits.
388    #[cfg(feature = "experimental-api")]
389    pub fn builder(&self) -> &TunnelBuilder<R> {
390        CircMgrInner::builder(&self.0)
391    }
392}
393
394/// Internal object used to implement CircMgr, which allows for mocking.
395#[derive(Clone)]
396pub(crate) struct CircMgrInner<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> {
397    /// The underlying circuit manager object that implements our behavior.
398    mgr: Arc<mgr::AbstractTunnelMgr<B, R>>,
399    /// A preemptive circuit predictor, for, uh, building circuits preemptively.
400    predictor: Arc<Mutex<PreemptiveCircuitPredictor>>,
401}
402
403impl<R: Runtime> CircMgrInner<TunnelBuilder<R>, R> {
404    /// Construct a new circuit manager.
405    ///
406    /// # Usage note
407    ///
408    /// For the manager to work properly, you will need to call `CircMgr::launch_background_tasks`.
409    #[allow(clippy::unnecessary_wraps)]
410    pub(crate) fn new<SM, CFG: CircMgrConfig>(
411        config: &CFG,
412        storage: SM,
413        runtime: &R,
414        chanmgr: Arc<ChanMgr<R>>,
415        guardmgr: &tor_guardmgr::GuardMgr<R>,
416    ) -> Result<Self>
417    where
418        SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
419    {
420        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
421        let vanguardmgr = {
422            // TODO(#1382): we need a way of checking if this arti instance
423            // is running an onion service or not.
424            //
425            // Perhaps this information should be provided by CircMgrConfig.
426            let has_onion_svc = false;
427            VanguardMgr::new(
428                config.vanguard_config(),
429                runtime.clone(),
430                storage.clone(),
431                has_onion_svc,
432            )?
433        };
434
435        let storage_handle = storage.create_handle(PARETO_TIMEOUT_DATA_KEY);
436
437        let builder = build::TunnelBuilder::new(
438            runtime.clone(),
439            chanmgr,
440            config.path_rules().clone(),
441            storage_handle,
442            guardmgr.clone(),
443            #[cfg(all(feature = "vanguards", feature = "hs-common"))]
444            vanguardmgr,
445        );
446
447        Ok(Self::new_generic(config, runtime, guardmgr, builder))
448    }
449}
450
451impl<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> CircMgrInner<B, R> {
452    /// Generic implementation for [`CircMgrInner::new`]
453    pub(crate) fn new_generic<CFG: CircMgrConfig>(
454        config: &CFG,
455        runtime: &R,
456        guardmgr: &tor_guardmgr::GuardMgr<R>,
457        builder: B,
458    ) -> Self {
459        let preemptive = Arc::new(Mutex::new(PreemptiveCircuitPredictor::new(
460            config.preemptive_circuits().clone(),
461        )));
462
463        guardmgr.set_filter(config.path_rules().build_guard_filter());
464
465        let mgr =
466            mgr::AbstractTunnelMgr::new(builder, runtime.clone(), config.circuit_timing().clone());
467
468        CircMgrInner {
469            mgr: Arc::new(mgr),
470            predictor: preemptive,
471        }
472    }
473
474    /// Launch the periodic daemon tasks required by the manager to function properly.
475    ///
476    /// Returns a set of [`TaskHandle`]s that can be used to manage the daemon tasks.
477    //
478    // NOTE(eta): The ?Sized on D is so we can pass a trait object in.
479    #[instrument(level = "trace", skip_all)]
480    pub(crate) fn launch_background_tasks<D, S>(
481        self: &Arc<Self>,
482        runtime: &R,
483        dir_provider: &Arc<D>,
484        state_mgr: S,
485    ) -> Result<Vec<TaskHandle>>
486    where
487        D: NetDirProvider + 'static + ?Sized,
488        S: StateMgr + std::marker::Send + 'static,
489    {
490        let mut ret = vec![];
491
492        runtime
493            .spawn(Self::keep_circmgr_params_updated(
494                dir_provider.events(),
495                Arc::downgrade(self),
496                Arc::downgrade(dir_provider),
497            ))
498            .map_err(|e| Error::from_spawn("circmgr parameter updater", e))?;
499
500        let (sched, handle) = TaskSchedule::new(runtime.clone());
501        ret.push(handle);
502
503        runtime
504            .spawn(Self::update_persistent_state(
505                sched,
506                Arc::downgrade(self),
507                state_mgr,
508            ))
509            .map_err(|e| Error::from_spawn("persistent state updater", e))?;
510
511        let (sched, handle) = TaskSchedule::new(runtime.clone());
512        ret.push(handle);
513
514        runtime
515            .spawn(Self::continually_launch_timeout_testing_circuits(
516                sched,
517                Arc::downgrade(self),
518                Arc::downgrade(dir_provider),
519            ))
520            .map_err(|e| Error::from_spawn("timeout-probe circuit launcher", e))?;
521
522        let (sched, handle) = TaskSchedule::new(runtime.clone());
523        ret.push(handle);
524
525        runtime
526            .spawn(Self::continually_preemptively_build_circuits(
527                sched,
528                Arc::downgrade(self),
529                Arc::downgrade(dir_provider),
530            ))
531            .map_err(|e| Error::from_spawn("preemptive circuit launcher", e))?;
532
533        self.mgr
534            .peek_builder()
535            .guardmgr()
536            .install_netdir_provider(&dir_provider.clone().upcast_arc())?;
537
538        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
539        {
540            let () = self
541                .mgr
542                .peek_builder()
543                .vanguardmgr()
544                .launch_background_tasks(&dir_provider.clone().upcast_arc())?;
545        }
546
547        Ok(ret)
548    }
549
550    /// Return a circuit suitable for sending one-hop BEGINDIR streams,
551    /// launching it if necessary.
552    #[instrument(level = "trace", skip_all)]
553    pub(crate) async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<Arc<B::Tunnel>> {
554        self.expire_circuits().await;
555        let usage = TargetTunnelUsage::Dir;
556        self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
557    }
558
559    /// Return a circuit suitable for exiting to all of the provided
560    /// `ports`, launching it if necessary.
561    ///
562    /// If the list of ports is empty, then the chosen circuit will
563    /// still end at _some_ exit.
564    #[instrument(level = "trace", skip_all)]
565    pub(crate) async fn get_or_launch_exit(
566        &self,
567        netdir: DirInfo<'_>, // TODO: This has to be a NetDir.
568        ports: &[TargetPort],
569        isolation: StreamIsolation,
570        // TODO GEOIP: this cannot be stabilised like this, since Cargo features need to be
571        //             additive. The function should be refactored to be builder-like.
572        #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
573    ) -> Result<Arc<B::Tunnel>> {
574        self.expire_circuits().await;
575        let time = Instant::now();
576        {
577            let mut predictive = self.predictor.lock().expect("preemptive lock poisoned");
578            if ports.is_empty() {
579                predictive.note_usage(None, time);
580            } else {
581                for port in ports.iter() {
582                    predictive.note_usage(Some(*port), time);
583                }
584            }
585        }
586        let require_stability = ports.iter().any(|p| {
587            self.mgr
588                .peek_builder()
589                .path_config()
590                .long_lived_ports
591                .contains(&p.port)
592        });
593        let ports = ports.iter().map(Clone::clone).collect();
594        #[cfg(not(feature = "geoip"))]
595        let country_code = None;
596        let usage = TargetTunnelUsage::Exit {
597            ports,
598            isolation,
599            country_code,
600            require_stability,
601        };
602        self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
603    }
604
605    /// Return a circuit to a specific relay, suitable for using for direct
606    /// (one-hop) directory downloads.
607    ///
608    /// This could be used, for example, to download a descriptor for a bridge.
609    #[cfg(feature = "specific-relay")]
610    #[instrument(level = "trace", skip_all)]
611    pub(crate) async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
612        &self,
613        target: T,
614    ) -> Result<Arc<B::Tunnel>> {
615        self.expire_circuits().await;
616        let usage = TargetTunnelUsage::DirSpecificTarget(target.to_owned());
617        self.mgr
618            .get_or_launch(&usage, DirInfo::Nothing)
619            .await
620            .map(|(c, _)| c)
621    }
622
623    /// Try to change our configuration settings to `new_config`.
624    ///
625    /// The actual behavior here will depend on the value of `how`.
626    ///
627    /// Returns whether any of the circuit pools should be cleared.
628    #[instrument(level = "trace", skip_all)]
629    pub(crate) fn reconfigure<CFG: CircMgrConfig>(
630        &self,
631        new_config: &CFG,
632        how: tor_config::Reconfigure,
633    ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
634        let old_path_rules = self.mgr.peek_builder().path_config();
635        let predictor = self.predictor.lock().expect("poisoned lock");
636        let preemptive_circuits = predictor.config();
637        if preemptive_circuits.initial_predicted_ports
638            != new_config.preemptive_circuits().initial_predicted_ports
639        {
640            // This change has no effect, since the list of ports was _initial_.
641            how.cannot_change("preemptive_circuits.initial_predicted_ports")?;
642        }
643
644        if how == tor_config::Reconfigure::CheckAllOrNothing {
645            return Ok(RetireCircuits::None);
646        }
647
648        let retire_because_of_guardmgr =
649            self.mgr.peek_builder().guardmgr().reconfigure(new_config)?;
650
651        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
652        let retire_because_of_vanguardmgr = self
653            .mgr
654            .peek_builder()
655            .vanguardmgr()
656            .reconfigure(new_config.vanguard_config())?;
657
658        let new_reachable = &new_config.path_rules().reachable_addrs;
659        if new_reachable != &old_path_rules.reachable_addrs {
660            let filter = new_config.path_rules().build_guard_filter();
661            self.mgr.peek_builder().guardmgr().set_filter(filter);
662        }
663
664        let discard_all_circuits = !new_config
665            .path_rules()
666            .at_least_as_permissive_as(&old_path_rules)
667            || retire_because_of_guardmgr != tor_guardmgr::RetireCircuits::None;
668
669        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
670        let discard_all_circuits = discard_all_circuits
671            || retire_because_of_vanguardmgr != tor_guardmgr::RetireCircuits::None;
672
673        self.mgr
674            .peek_builder()
675            .set_path_config(new_config.path_rules().clone());
676        self.mgr
677            .set_circuit_timing(new_config.circuit_timing().clone());
678        predictor.set_config(new_config.preemptive_circuits().clone());
679
680        if discard_all_circuits {
681            // TODO(nickm): Someday, we might want to take a more lenient approach, and only
682            // retire those circuits that do not conform to the new path rules,
683            // or do not conform to the new guard configuration.
684            info!("Path configuration has become more restrictive: retiring existing circuits.");
685            self.retire_all_circuits();
686            return Ok(RetireCircuits::All);
687        }
688        Ok(RetireCircuits::None)
689    }
690
691    /// Whenever a [`DirEvent::NewConsensus`] arrives on `events`, update
692    /// `circmgr` with the consensus parameters from `dirmgr`.
693    ///
694    /// Exit when `events` is closed, or one of `circmgr` or `dirmgr` becomes
695    /// dangling.
696    ///
697    /// This is a daemon task: it runs indefinitely in the background.
698    #[instrument(level = "trace", skip_all)]
699    async fn keep_circmgr_params_updated<D>(
700        mut events: impl futures::Stream<Item = DirEvent> + Unpin,
701        circmgr: Weak<Self>,
702        dirmgr: Weak<D>,
703    ) where
704        D: NetDirProvider + 'static + ?Sized,
705    {
706        use DirEvent::*;
707        while let Some(event) = events.next().await {
708            if matches!(event, NewConsensus) {
709                if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
710                    if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
711                        cm.update_network_parameters(netdir.params());
712                    }
713                } else {
714                    debug!("Circmgr or dirmgr has disappeared; task exiting.");
715                    break;
716                }
717            }
718        }
719    }
720
721    /// Reconfigure this circuit manager using the latest set of
722    /// network parameters.
723    fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
724        self.mgr.update_network_parameters(p);
725        self.mgr.peek_builder().update_network_parameters(p);
726    }
727
728    /// Run indefinitely, launching circuits as needed to get a good
729    /// estimate for our circuit build timeouts.
730    ///
731    /// Exit when we notice that `circmgr` or `dirmgr` has been dropped.
732    ///
733    /// This is a daemon task: it runs indefinitely in the background.
734    #[instrument(level = "trace", skip_all)]
735    async fn continually_launch_timeout_testing_circuits<D>(
736        mut sched: TaskSchedule<R>,
737        circmgr: Weak<Self>,
738        dirmgr: Weak<D>,
739    ) where
740        D: NetDirProvider + 'static + ?Sized,
741    {
742        while sched.next().await.is_some() {
743            if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
744                if let Ok(netdir) = dm.netdir(Timeliness::Unchecked) {
745                    if let Err(e) = cm
746                        .launch_timeout_testing_circuit_if_appropriate(&netdir)
747                        .await
748                    {
749                        warn_report!(e, "Problem launching a timeout testing circuit");
750                    }
751                    let delay = netdir
752                        .params()
753                        .cbt_testing_delay
754                        .try_into()
755                        .expect("Out-of-bounds value from BoundedInt32");
756
757                    drop((cm, dm));
758                    sched.fire_in(delay);
759                } else {
760                    // wait for the provider to announce some event, which will probably be
761                    // NewConsensus; this is therefore a decent yardstick for rechecking
762                    let _ = dm.events().next().await;
763                    sched.fire();
764                }
765            } else {
766                return;
767            }
768        }
769    }
770
771    /// If we need to launch a testing circuit to judge our circuit
772    /// build timeouts timeouts, do so.
773    ///
774    /// # Note
775    ///
776    /// This function is invoked periodically from
777    /// `continually_launch_timeout_testing_circuits`.
778    async fn launch_timeout_testing_circuit_if_appropriate(&self, netdir: &NetDir) -> Result<()> {
779        if !self.mgr.peek_builder().learning_timeouts() {
780            return Ok(());
781        }
782        // We expire any too-old circuits here, so they don't get
783        // counted towards max_circs.
784        self.expire_circuits().await;
785        let max_circs: u64 = netdir
786            .params()
787            .cbt_max_open_circuits_for_testing
788            .try_into()
789            .expect("Out-of-bounds result from BoundedInt32");
790        if (self.mgr.n_tunnels() as u64) < max_circs {
791            // Actually launch the circuit!
792            let usage = TargetTunnelUsage::TimeoutTesting;
793            let dirinfo = netdir.into();
794            let mgr = Arc::clone(&self.mgr);
795            debug!("Launching a circuit to test build times.");
796            let receiver = mgr.launch_by_usage(&usage, dirinfo)?;
797            // We don't actually care when this circuit is done,
798            // so it's okay to drop the Receiver without awaiting it.
799            drop(receiver);
800        }
801
802        Ok(())
803    }
804
805    /// Run forever, periodically telling `circmgr` to update its persistent
806    /// state.
807    ///
808    /// Exit when we notice that `circmgr` has been dropped.
809    ///
810    /// This is a daemon task: it runs indefinitely in the background.
811    #[allow(clippy::cognitive_complexity)] // because of tracing
812    #[instrument(level = "trace", skip_all)]
813    async fn update_persistent_state<S>(
814        mut sched: TaskSchedule<R>,
815        circmgr: Weak<Self>,
816        statemgr: S,
817    ) where
818        S: StateMgr + std::marker::Send,
819    {
820        while sched.next().await.is_some() {
821            if let Some(circmgr) = Weak::upgrade(&circmgr) {
822                use tor_persist::LockStatus::*;
823
824                match statemgr.try_lock() {
825                    Err(e) => {
826                        error_report!(e, "Problem with state lock file");
827                        break;
828                    }
829                    Ok(NewlyAcquired) => {
830                        info!("We now own the lock on our state files.");
831                        if let Err(e) = circmgr.upgrade_to_owned_persistent_state() {
832                            error_report!(e, "Unable to upgrade to owned state files");
833                            break;
834                        }
835                    }
836                    Ok(AlreadyHeld) => {
837                        if let Err(e) = circmgr.store_persistent_state() {
838                            error_report!(e, "Unable to flush circmgr state");
839                            break;
840                        }
841                    }
842                    Ok(NoLock) => {
843                        if let Err(e) = circmgr.reload_persistent_state() {
844                            error_report!(e, "Unable to reload circmgr state");
845                            break;
846                        }
847                    }
848                }
849            } else {
850                debug!("Circmgr has disappeared; task exiting.");
851                return;
852            }
853            // TODO(nickm): This delay is probably too small.
854            //
855            // Also, we probably don't even want a fixed delay here.  Instead,
856            // we should be updating more frequently when the data is volatile
857            // or has important info to save, and not at all when there are no
858            // changes.
859            sched.fire_in(Duration::from_secs(60));
860        }
861
862        debug!("State update task exiting (potentially due to handle drop).");
863    }
864
865    /// Switch from having an unowned persistent state to having an owned one.
866    ///
867    /// Requires that we hold the lock on the state files.
868    pub(crate) fn upgrade_to_owned_persistent_state(&self) -> Result<()> {
869        self.mgr.peek_builder().upgrade_to_owned_state()?;
870        Ok(())
871    }
872
873    /// Reload state from the state manager.
874    ///
875    /// We only call this method if we _don't_ have the lock on the state
876    /// files.  If we have the lock, we only want to save.
877    pub(crate) fn reload_persistent_state(&self) -> Result<()> {
878        self.mgr.peek_builder().reload_state()?;
879        Ok(())
880    }
881
882    /// Run indefinitely, launching circuits where the preemptive circuit
883    /// predictor thinks it'd be a good idea to have them.
884    ///
885    /// Exit when we notice that `circmgr` or `dirmgr` has been dropped.
886    ///
887    /// This is a daemon task: it runs indefinitely in the background.
888    ///
889    /// # Note
890    ///
891    /// This would be better handled entirely within `tor-circmgr`, like
892    /// other daemon tasks.
893    #[instrument(level = "trace", skip_all)]
894    async fn continually_preemptively_build_circuits<D>(
895        mut sched: TaskSchedule<R>,
896        circmgr: Weak<Self>,
897        dirmgr: Weak<D>,
898    ) where
899        D: NetDirProvider + 'static + ?Sized,
900    {
901        let base_delay = Duration::from_secs(10);
902        let mut retry = RetryDelay::from_duration(base_delay);
903
904        while sched.next().await.is_some() {
905            if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
906                if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
907                    let result = cm
908                        .launch_circuits_preemptively(DirInfo::Directory(&netdir))
909                        .await;
910
911                    let delay = match result {
912                        Ok(()) => {
913                            retry.reset();
914                            base_delay
915                        }
916                        Err(_) => retry.next_delay(&mut rand::rng()),
917                    };
918
919                    sched.fire_in(delay);
920                } else {
921                    // wait for the provider to announce some event, which will probably be
922                    // NewConsensus; this is therefore a decent yardstick for rechecking
923                    let _ = dm.events().next().await;
924                    sched.fire();
925                }
926            } else {
927                return;
928            }
929        }
930    }
931
932    /// Launch circuits preemptively, using the preemptive circuit predictor's
933    /// predictions.
934    ///
935    /// # Note
936    ///
937    /// This function is invoked periodically from
938    /// `continually_preemptively_build_circuits()`.
939    #[allow(clippy::cognitive_complexity)]
940    #[instrument(level = "trace", skip_all)]
941    async fn launch_circuits_preemptively(
942        &self,
943        netdir: DirInfo<'_>,
944    ) -> std::result::Result<(), err::PreemptiveCircError> {
945        trace!("Checking preemptive circuit predictions.");
946        let (circs, threshold) = {
947            let path_config = self.mgr.peek_builder().path_config();
948            let preemptive = self.predictor.lock().expect("preemptive lock poisoned");
949            let threshold = preemptive.config().disable_at_threshold;
950            (preemptive.predict(&path_config), threshold)
951        };
952
953        if self.mgr.n_tunnels() >= threshold {
954            return Ok(());
955        }
956        let mut n_created = 0_usize;
957        let mut n_errors = 0_usize;
958
959        let futures = circs
960            .iter()
961            .map(|usage| self.mgr.get_or_launch(usage, netdir));
962        let results = futures::future::join_all(futures).await;
963        for (i, result) in results.into_iter().enumerate() {
964            match result {
965                Ok((_, TunnelProvenance::NewlyCreated)) => {
966                    debug!("Preeemptive circuit was created for {:?}", circs[i]);
967                    n_created += 1;
968                }
969                Ok((_, TunnelProvenance::Preexisting)) => {
970                    trace!("Circuit already existed created for {:?}", circs[i]);
971                }
972                Err(e) => {
973                    warn_report!(e, "Failed to build preemptive circuit {:?}", sv(&circs[i]));
974                    n_errors += 1;
975                }
976            }
977        }
978
979        if n_created > 0 || n_errors == 0 {
980            // Either we successfully made a circuit, or we didn't have any
981            // failures while looking for preexisting circuits.  Progress was
982            // made, so there's no need to back off.
983            Ok(())
984        } else {
985            // We didn't build any circuits and we hit at least one error:
986            // We'll call this unsuccessful.
987            Err(err::PreemptiveCircError)
988        }
989    }
990
991    /// Create and return a new (typically anonymous) onion circuit stem
992    /// of type `stem_kind`.
993    ///
994    /// If `circ_kind` is provided, we apply additional rules to make sure
995    /// that this will be usable as a stem for the given kind of onion service circuit.
996    /// Otherwise, we pick a stem that will probably be useful in general.
997    ///
998    /// This circuit is guaranteed not to have been used for any traffic
999    /// previously, and it will not be given out for any other requests in the
1000    /// future unless explicitly re-registered with a circuit manager.
1001    ///
1002    /// If `planned_target` is provided, then the circuit will be built so that
1003    /// it does not share any family members with the provided target.  (The
1004    /// circuit _will not be_ extended to that target itself!)
1005    ///
1006    /// Used to implement onion service clients and services.
1007    #[cfg(feature = "hs-common")]
1008    #[instrument(level = "trace", skip_all)]
1009    pub(crate) async fn launch_hs_unmanaged<T>(
1010        &self,
1011        planned_target: Option<T>,
1012        dir: &NetDir,
1013        stem_kind: HsCircStemKind,
1014        circ_kind: Option<HsCircKind>,
1015    ) -> Result<B::Tunnel>
1016    where
1017        T: IntoOwnedChanTarget,
1018    {
1019        let usage = TargetTunnelUsage::HsCircBase {
1020            compatible_with_target: planned_target.map(IntoOwnedChanTarget::to_owned),
1021            stem_kind,
1022            circ_kind,
1023        };
1024        let (_, client_circ) = self.mgr.launch_unmanaged(&usage, dir.into()).await?;
1025        Ok(client_circ)
1026    }
1027
1028    /// Return true if `netdir` has enough information to be used for this
1029    /// circuit manager.
1030    ///
1031    /// (This will check whether the netdir is missing any primary guard
1032    /// microdescriptors)
1033    pub(crate) fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
1034        self.mgr
1035            .peek_builder()
1036            .guardmgr()
1037            .netdir_is_sufficient(netdir)
1038    }
1039
1040    /// Internal implementation for [`CircMgr::estimate_timeout`].
1041    pub(crate) fn estimate_timeout(
1042        &self,
1043        timeout_action: &timeouts::Action,
1044    ) -> std::time::Duration {
1045        let (timeout, _abandon) = self.mgr.peek_builder().estimator().timeouts(timeout_action);
1046        timeout
1047    }
1048
    /// Internal implementation for [`CircMgr::builder`].
    ///
    /// Returns a reference to the builder held by the underlying tunnel
    /// manager, as reported by its `peek_builder()` method.
    pub(crate) fn builder(&self) -> &B {
        self.mgr.peek_builder()
    }
1053
1054    /// Flush state to the state manager, if there is any unsaved state and
1055    /// we have the lock.
1056    ///
1057    /// Return true if we saved something; false if we didn't have the lock.
1058    pub(crate) fn store_persistent_state(&self) -> Result<bool> {
1059        self.mgr.peek_builder().save_state()
1060    }
1061
1062    /// Expire every circuit that has been dirty for too long.
1063    ///
1064    /// Expired circuits are not closed while they still have users,
1065    /// but they are no longer given out for new requests.
1066    async fn expire_circuits(&self) {
1067        // TODO: I would prefer not to call this at every request, but
1068        // it should be fine for now.  (At some point we may no longer
1069        // need this, or might not need to call it so often, now that
1070        // our circuit expiration runs on scheduled timers via
1071        // spawn_expiration_task.)
1072        let now = self.mgr.peek_runtime().now();
1073
1074        // TODO: Use return value here to implement the above TODO.
1075        let _next_expiration = self.mgr.expire_tunnels(now).await;
1076    }
1077
    /// Mark every circuit that we have launched so far as unsuitable for
    /// any future requests.  This won't close existing circuits that have
    /// streams attached to them, but it will prevent any future streams from
    /// being attached.
    ///
    /// This is a thin wrapper: the work is done by the underlying tunnel
    /// manager's `retire_all_tunnels()`.
    ///
    /// TODO: we may want to expose this eventually.  If we do, we should
    /// be very clear that you don't want to use it haphazardly.
    pub(crate) fn retire_all_circuits(&self) {
        self.mgr.retire_all_tunnels();
    }
1088
1089    /// If `circ_id` is the unique identifier for a circuit that we're
1090    /// keeping track of, don't give it out for any future requests.
1091    pub(crate) fn retire_circ(&self, circ_id: &<B::Tunnel as AbstractTunnel>::Id) {
1092        let _ = self.mgr.take_tunnel(circ_id);
1093    }
1094
1095    /// Return a stream of events about our estimated clock skew; these events
1096    /// are `None` when we don't have enough information to make an estimate,
1097    /// and `Some(`[`SkewEstimate`]`)` otherwise.
1098    ///
1099    /// Note that this stream can be lossy: if the estimate changes more than
1100    /// one before you read from the stream, you might only get the most recent
1101    /// update.
1102    pub(crate) fn skew_events(&self) -> ClockSkewEvents {
1103        self.mgr.peek_builder().guardmgr().skew_events()
1104    }
1105
1106    /// Record that a failure occurred on a circuit with a given guard, in a way
1107    /// that makes us unwilling to use that guard for future circuits.
1108    ///
1109    pub(crate) fn note_external_failure(
1110        &self,
1111        target: &impl ChanTarget,
1112        external_failure: ExternalActivity,
1113    ) {
1114        self.mgr
1115            .peek_builder()
1116            .guardmgr()
1117            .note_external_failure(target, external_failure);
1118    }
1119
1120    /// Record that a success occurred on a circuit with a given guard, in a way
1121    /// that makes us possibly willing to use that guard for future circuits.
1122    pub(crate) fn note_external_success(
1123        &self,
1124        target: &impl ChanTarget,
1125        external_activity: ExternalActivity,
1126    ) {
1127        self.mgr
1128            .peek_builder()
1129            .guardmgr()
1130            .note_external_success(target, external_activity);
1131    }
1132}
1133
1134impl<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> Drop for CircMgrInner<B, R> {
1135    fn drop(&mut self) {
1136        match self.store_persistent_state() {
1137            Ok(true) => info!("Flushed persistent state at exit."),
1138            Ok(false) => debug!("Lock not held; no state to flush."),
1139            Err(e) => error_report!(e, "Unable to flush state on circuit manager drop"),
1140        }
1141    }
1142}
1143
1144#[cfg(test)]
1145mod test {
1146    // @@ begin test lint list maintained by maint/add_warning @@
1147    #![allow(clippy::bool_assert_comparison)]
1148    #![allow(clippy::clone_on_copy)]
1149    #![allow(clippy::dbg_macro)]
1150    #![allow(clippy::mixed_attributes_style)]
1151    #![allow(clippy::print_stderr)]
1152    #![allow(clippy::print_stdout)]
1153    #![allow(clippy::single_char_pattern)]
1154    #![allow(clippy::unwrap_used)]
1155    #![allow(clippy::unchecked_time_subtraction)]
1156    #![allow(clippy::useless_vec)]
1157    #![allow(clippy::needless_pass_by_value)]
1158    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1159    use mocks::FakeBuilder;
1160    use tor_guardmgr::GuardMgr;
1161    use tor_linkspec::OwnedChanTarget;
1162    use tor_netdir::testprovider::TestNetDirProvider;
1163    use tor_persist::TestingStateMgr;
1164
1165    use super::*;
1166
1167    #[test]
1168    fn get_params() {
1169        use tor_netdir::{MdReceiver, PartialNetDir};
1170        use tor_netdoc::doc::netstatus::NetParams;
1171        // If it's just fallbackdir, we get the default parameters.
1172        let fb = FallbackList::from([]);
1173        let di: DirInfo<'_> = (&fb).into();
1174
1175        let p1 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
1176        assert!(!p1.extend_by_ed25519_id);
1177
1178        // Now try with a directory and configured parameters.
1179        let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
1180        let mut params = NetParams::default();
1181        params.set("circwindow".into(), 100);
1182        params.set("ExtendByEd25519ID".into(), 1);
1183        let mut dir = PartialNetDir::new(consensus, Some(&params));
1184        for m in microdescs {
1185            dir.add_microdesc(m);
1186        }
1187        let netdir = dir.unwrap_if_sufficient().unwrap();
1188        let di: DirInfo<'_> = (&netdir).into();
1189        let p2 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
1190        assert!(p2.extend_by_ed25519_id);
1191
1192        // Now try with a bogus circwindow value.
1193        let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
1194        let mut params = NetParams::default();
1195        params.set("circwindow".into(), 100_000);
1196        params.set("ExtendByEd25519ID".into(), 1);
1197        let mut dir = PartialNetDir::new(consensus, Some(&params));
1198        for m in microdescs {
1199            dir.add_microdesc(m);
1200        }
1201        let netdir = dir.unwrap_if_sufficient().unwrap();
1202        let di: DirInfo<'_> = (&netdir).into();
1203        let p2 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
1204        assert!(p2.extend_by_ed25519_id);
1205    }
1206
1207    fn make_circmgr<R: Runtime>(runtime: R) -> Arc<CircMgrInner<FakeBuilder<R>, R>> {
1208        let config = crate::config::test_config::TestConfig::default();
1209        let statemgr = TestingStateMgr::new();
1210        let guardmgr =
1211            GuardMgr::new(runtime.clone(), statemgr.clone(), &config).expect("Create GuardMgr");
1212        let builder = FakeBuilder::new(
1213            &runtime,
1214            statemgr.clone(),
1215            &tor_guardmgr::TestConfig::default(),
1216        );
1217        let circmgr = Arc::new(CircMgrInner::new_generic(
1218            &config, &runtime, &guardmgr, builder,
1219        ));
1220        let netdir = Arc::new(TestNetDirProvider::new());
1221        CircMgrInner::launch_background_tasks(&circmgr, &runtime, &netdir, statemgr)
1222            .expect("launch CircMgrInner background tasks");
1223        circmgr
1224    }
1225
1226    #[test]
1227    #[cfg(feature = "hs-common")]
1228    fn test_launch_hs_unmanaged() {
1229        tor_rtmock::MockRuntime::test_with_various(|runtime| async move {
1230            let circmgr = make_circmgr(runtime.clone());
1231            let netdir = tor_netdir::testnet::construct_netdir()
1232                .unwrap_if_sufficient()
1233                .unwrap();
1234
1235            let (ret_tx, ret_rx) = tor_async_utils::oneshot::channel();
1236            runtime.spawn_identified("launch_hs_unamanged", async move {
1237                ret_tx
1238                    .send(
1239                        circmgr
1240                            .launch_hs_unmanaged::<OwnedChanTarget>(
1241                                None,
1242                                &netdir,
1243                                HsCircStemKind::Naive,
1244                                None,
1245                            )
1246                            .await,
1247                    )
1248                    .unwrap();
1249            });
1250            runtime.advance_by(Duration::from_millis(60)).await;
1251            ret_rx.await.unwrap().unwrap();
1252        });
1253    }
1254}