Skip to main content

tor_circmgr/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc = include_str!("../README.md")]
3// @@ begin lint list maintained by maint/add_warning @@
4#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6#![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_time_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39#![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43#![allow(clippy::needless_lifetimes)] // See arti#1765
44#![allow(mismatched_lifetime_syntaxes)] // temporary workaround for arti#2060
45#![allow(clippy::collapsible_if)] // See arti#2342
46#![deny(clippy::unused_async)]
47#![deny(clippy::string_slice)] // See arti#2571
48//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
49
50// TODO #1645 (either remove this, or decide to have it everywhere)
51#![cfg_attr(not(all(feature = "full", feature = "experimental")), allow(unused))]
52
53use build::TunnelBuilder;
54use mgr::{AbstractTunnel, AbstractTunnelBuilder};
55use tor_basic_utils::retry::RetryDelay;
56use tor_chanmgr::ChanMgr;
57use tor_dircommon::fallback::FallbackList;
58use tor_error::{error_report, warn_report};
59use tor_guardmgr::RetireCircuits;
60use tor_linkspec::ChanTarget;
61use tor_netdir::{DirEvent, NetDir, NetDirProvider, Timeliness};
62use tor_proto::circuit::UniqId;
63use tor_proto::client::circuit::CircParameters;
64use tor_rtcompat::Runtime;
65
66#[cfg(any(feature = "specific-relay", feature = "hs-common"))]
67use tor_linkspec::IntoOwnedChanTarget;
68
69use futures::StreamExt;
70use std::sync::{Arc, Mutex, Weak};
71use std::time::Duration;
72use tor_rtcompat::SpawnExt;
73use tracing::{debug, info, instrument, trace, warn};
74
75#[cfg(feature = "testing")]
76pub use config::test_config::TestConfig;
77
78pub mod build;
79mod config;
80mod err;
81#[cfg(feature = "hs-common")]
82pub mod hspool;
83mod impls;
84pub mod isolation;
85mod mgr;
86#[cfg(test)]
87mod mocks;
88mod preemptive;
89pub mod timeouts;
90mod tunnel;
91mod usage;
92
93// Can't apply `visibility` to modules.
94cfg_if::cfg_if! {
95    if #[cfg(feature = "experimental-api")] {
96        pub mod path;
97    } else {
98        pub(crate) mod path;
99    }
100}
101
102pub use err::Error;
103pub use isolation::IsolationToken;
104pub use tor_guardmgr::{ClockSkewEvents, GuardMgrConfig, SkewEstimate};
105pub use tunnel::{
106    ClientDataTunnel, ClientDirTunnel, ClientOnionServiceDataTunnel, ClientOnionServiceDirTunnel,
107    ClientOnionServiceIntroTunnel, ServiceOnionServiceDataTunnel, ServiceOnionServiceDirTunnel,
108    ServiceOnionServiceIntroTunnel,
109};
110#[cfg(feature = "conflux")]
111pub use tunnel::{
112    ClientMultiPathDataTunnel, ClientMultiPathOnionServiceDataTunnel,
113    ServiceMultiPathOnionServiceDataTunnel,
114};
115pub use usage::{TargetPort, TargetPorts};
116
117pub use config::{
118    CircMgrConfig, CircuitTiming, CircuitTimingBuilder, PathConfig, PathConfigBuilder,
119    PreemptiveCircuitConfig, PreemptiveCircuitConfigBuilder,
120};
121
122use crate::isolation::StreamIsolation;
123use crate::mgr::TunnelProvenance;
124use crate::preemptive::PreemptiveCircuitPredictor;
125use usage::TargetTunnelUsage;
126
127use safelog::sensitive as sv;
128#[cfg(feature = "geoip")]
129use tor_geoip::CountryCode;
130pub use tor_guardmgr::{ExternalActivity, FirstHopId};
131use tor_persist::StateMgr;
132use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
133
134#[cfg(feature = "hs-common")]
135use crate::hspool::{HsCircKind, HsCircStemKind};
136#[cfg(all(feature = "vanguards", feature = "hs-common"))]
137use tor_guardmgr::vanguards::VanguardMgr;
138
139/// A Result type as returned from this crate.
140pub type Result<T> = std::result::Result<T, Error>;
141
142/// Type alias for dynamic StorageHandle that can handle our timeout state.
143type TimeoutStateHandle = tor_persist::DynStorageHandle<timeouts::pareto::ParetoTimeoutState>;
144
145/// Key used to load timeout state information.
146const PARETO_TIMEOUT_DATA_KEY: &str = "circuit_timeouts";
147
148/// Represents what we know about the Tor network.
149///
150/// This can either be a complete directory, or a list of fallbacks.
151///
152/// Not every DirInfo can be used to build every kind of circuit:
153/// if you try to build a path with an inadequate DirInfo, you'll get a
154/// NeedConsensus error.
155#[derive(Debug, Copy, Clone)]
156#[non_exhaustive]
157pub enum DirInfo<'a> {
158    /// A list of fallbacks, for use when we don't know a network directory.
159    Fallbacks(&'a FallbackList),
160    /// A complete network directory
161    Directory(&'a NetDir),
162    /// No information: we can only build one-hop paths: and that, only if the
163    /// guard manager knows some guards or fallbacks.
164    Nothing,
165}
166
167impl<'a> From<&'a FallbackList> for DirInfo<'a> {
168    fn from(v: &'a FallbackList) -> DirInfo<'a> {
169        DirInfo::Fallbacks(v)
170    }
171}
172impl<'a> From<&'a NetDir> for DirInfo<'a> {
173    fn from(v: &'a NetDir) -> DirInfo<'a> {
174        DirInfo::Directory(v)
175    }
176}
177impl<'a> DirInfo<'a> {
178    /// Return a set of circuit parameters for this DirInfo.
179    fn circ_params(&self, usage: &TargetTunnelUsage) -> Result<CircParameters> {
180        use tor_netdir::params::NetParameters;
181        // We use a common function for both cases here to be sure that
182        // we look at the defaults from NetParameters code.
183        let defaults = NetParameters::default();
184        let net_params = match self {
185            DirInfo::Directory(d) => d.params(),
186            _ => &defaults,
187        };
188        match usage {
189            #[cfg(feature = "hs-common")]
190            TargetTunnelUsage::HsCircBase { .. } => {
191                build::onion_circparams_from_netparams(net_params)
192            }
193            _ => build::exit_circparams_from_netparams(net_params),
194        }
195    }
196}
197
198/// A Circuit Manager (CircMgr) manages a set of circuits, returning them
199/// when they're suitable, and launching them if they don't already exist.
200///
201/// Right now, its notion of "suitable" is quite rudimentary: it just
202/// believes in two kinds of circuits: Exit circuits, and directory
203/// circuits.  Exit circuits are ones that were created to connect to
204/// a set of ports; directory circuits were made to talk to directory caches.
205///
206/// This is a "handle"; clones of it share state.
207pub struct CircMgr<R: Runtime>(Arc<CircMgrInner<build::TunnelBuilder<R>, R>>);
208
209impl<R: Runtime> CircMgr<R> {
210    /// Construct a new circuit manager.
211    ///
212    /// # Usage note
213    ///
214    /// For the manager to work properly, you will need to call `CircMgr::launch_background_tasks`.
215    pub fn new<SM, CFG: CircMgrConfig>(
216        config: &CFG,
217        storage: SM,
218        runtime: &R,
219        chanmgr: Arc<ChanMgr<R>>,
220        guardmgr: &tor_guardmgr::GuardMgr<R>,
221    ) -> Result<Self>
222    where
223        SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
224    {
225        Ok(Self(Arc::new(CircMgrInner::new(
226            config, storage, runtime, chanmgr, guardmgr,
227        )?)))
228    }
229
230    /// Return a circuit suitable for sending one-hop BEGINDIR streams,
231    /// launching it if necessary.
232    #[instrument(level = "trace", skip_all)]
233    pub async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<ClientDirTunnel> {
234        let tunnel = self.0.get_or_launch_dir(netdir).await?;
235        Ok(tunnel.into())
236    }
237
238    /// Return a circuit suitable for exiting to all of the provided
239    /// `ports`, launching it if necessary.
240    ///
241    /// If the list of ports is empty, then the chosen circuit will
242    /// still end at _some_ exit.
243    #[instrument(level = "trace", skip_all)]
244    pub async fn get_or_launch_exit(
245        &self,
246        netdir: DirInfo<'_>, // TODO: This has to be a NetDir.
247        ports: &[TargetPort],
248        isolation: StreamIsolation,
249        // TODO GEOIP: this cannot be stabilised like this, since Cargo features need to be
250        //             additive. The function should be refactored to be builder-like.
251        #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
252    ) -> Result<ClientDataTunnel> {
253        let tunnel = self
254            .0
255            .get_or_launch_exit(
256                netdir,
257                ports,
258                isolation,
259                #[cfg(feature = "geoip")]
260                country_code,
261            )
262            .await?;
263        Ok(tunnel.into())
264    }
265
266    /// Return a circuit to a specific relay, suitable for using for direct
267    /// (one-hop) directory downloads.
268    ///
269    /// This could be used, for example, to download a descriptor for a bridge.
270    #[cfg(feature = "specific-relay")]
271    #[instrument(level = "trace", skip_all)]
272    pub async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
273        &self,
274        target: T,
275    ) -> Result<ClientDirTunnel> {
276        let tunnel = self.0.get_or_launch_dir_specific(target).await?;
277        Ok(tunnel.into())
278    }
279
280    /// Launch the periodic daemon tasks required by the manager to function properly.
281    ///
282    /// Returns a set of [`TaskHandle`]s that can be used to manage the daemon tasks.
283    //
284    // NOTE(eta): The ?Sized on D is so we can pass a trait object in.
285    #[instrument(level = "trace", skip_all)]
286    pub fn launch_background_tasks<D, S>(
287        self: &Arc<Self>,
288        runtime: &R,
289        dir_provider: &Arc<D>,
290        state_mgr: S,
291    ) -> Result<Vec<TaskHandle>>
292    where
293        D: NetDirProvider + 'static + ?Sized,
294        S: StateMgr + std::marker::Send + 'static,
295    {
296        CircMgrInner::launch_background_tasks(&self.0.clone(), runtime, dir_provider, state_mgr)
297    }
298
299    /// Return true if `netdir` has enough information to be used for this
300    /// circuit manager.
301    ///
302    /// (This will check whether the netdir is missing any primary guard
303    /// microdescriptors)
304    pub fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
305        self.0.netdir_is_sufficient(netdir)
306    }
307
308    /// If `circ_id` is the unique identifier for a circuit that we're
309    /// keeping track of, don't give it out for any future requests.
310    pub fn retire_circ(&self, circ_id: &UniqId) {
311        self.0.retire_circ(circ_id);
312    }
313
314    /// Record that a failure occurred on a circuit with a given guard, in a way
315    /// that makes us unwilling to use that guard for future circuits.
316    ///
317    pub fn note_external_failure(
318        &self,
319        target: &impl ChanTarget,
320        external_failure: ExternalActivity,
321    ) {
322        self.0.note_external_failure(target, external_failure);
323    }
324
325    /// Record that a success occurred on a circuit with a given guard, in a way
326    /// that makes us possibly willing to use that guard for future circuits.
327    pub fn note_external_success(
328        &self,
329        target: &impl ChanTarget,
330        external_activity: ExternalActivity,
331    ) {
332        self.0.note_external_success(target, external_activity);
333    }
334
335    /// Return a stream of events about our estimated clock skew; these events
336    /// are `None` when we don't have enough information to make an estimate,
337    /// and `Some(`[`SkewEstimate`]`)` otherwise.
338    ///
339    /// Note that this stream can be lossy: if the estimate changes more than
340    /// one before you read from the stream, you might only get the most recent
341    /// update.
342    pub fn skew_events(&self) -> ClockSkewEvents {
343        self.0.skew_events()
344    }
345
346    /// Try to change our configuration settings to `new_config`.
347    ///
348    /// The actual behavior here will depend on the value of `how`.
349    ///
350    /// Returns whether any of the circuit pools should be cleared.
351    #[instrument(level = "trace", skip_all)]
352    pub fn reconfigure<CFG: CircMgrConfig>(
353        &self,
354        new_config: &CFG,
355        how: tor_config::Reconfigure,
356    ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
357        self.0.reconfigure(new_config, how)
358    }
359
360    /// Return an estimate-based delay for how long a given
361    /// [`Action`](timeouts::Action) should be allowed to complete.
362    ///
363    /// Note that **you do not need to use this function** in order to get
364    /// reasonable timeouts for the circuit-building operations provided by the
365    /// `tor-circmgr` crate: those, unless specifically noted, always use these
366    /// timeouts to cancel circuit operations that have taken too long.
367    ///
368    /// Instead, you should only use this function when you need to estimate how
369    /// long some _other_ operation should take to complete.  For example, if
370    /// you are sending a request over a 3-hop circuit and waiting for a reply,
371    /// you might choose to wait for `estimate_timeout(Action::RoundTrip {
372    /// length: 3 })`.
373    ///
374    /// Note also that this function returns a _timeout_ that the operation
375    /// should be permitted to complete, not an estimated Duration that the
376    /// operation _will_ take to complete. Timeouts are chosen to ensure that
377    /// most operations will complete, but very slow ones will not.  So even if
378    /// we expect that a circuit will complete in (say) 3 seconds, we might
379    /// still allow a timeout of 4.5 seconds, to ensure that most circuits can
380    /// complete.
381    ///
382    /// Estimate-based timeouts may change over time, given observations on the
383    /// actual amount of time needed for circuits to complete building.  If not
384    /// enough information has been gathered, a reasonable default will be used.
385    pub fn estimate_timeout(&self, timeout_action: &timeouts::Action) -> std::time::Duration {
386        self.0.estimate_timeout(timeout_action)
387    }
388
389    /// Return a reference to the associated CircuitBuilder that this CircMgr
390    /// will use to create its circuits.
391    #[cfg(feature = "experimental-api")]
392    pub fn builder(&self) -> &TunnelBuilder<R> {
393        CircMgrInner::builder(&self.0)
394    }
395}
396
397/// Internal object used to implement CircMgr, which allows for mocking.
398#[derive(Clone)]
399pub(crate) struct CircMgrInner<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> {
400    /// The underlying circuit manager object that implements our behavior.
401    mgr: Arc<mgr::AbstractTunnelMgr<B, R>>,
402    /// A preemptive circuit predictor, for, uh, building circuits preemptively.
403    predictor: Arc<Mutex<PreemptiveCircuitPredictor>>,
404}
405
406impl<R: Runtime> CircMgrInner<TunnelBuilder<R>, R> {
407    /// Construct a new circuit manager.
408    ///
409    /// # Usage note
410    ///
411    /// For the manager to work properly, you will need to call `CircMgr::launch_background_tasks`.
412    #[allow(clippy::unnecessary_wraps)]
413    pub(crate) fn new<SM, CFG: CircMgrConfig>(
414        config: &CFG,
415        storage: SM,
416        runtime: &R,
417        chanmgr: Arc<ChanMgr<R>>,
418        guardmgr: &tor_guardmgr::GuardMgr<R>,
419    ) -> Result<Self>
420    where
421        SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
422    {
423        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
424        let vanguardmgr = {
425            // TODO(#1382): we need a way of checking if this arti instance
426            // is running an onion service or not.
427            //
428            // Perhaps this information should be provided by CircMgrConfig.
429            let has_onion_svc = false;
430            VanguardMgr::new(
431                config.vanguard_config(),
432                runtime.clone(),
433                storage.clone(),
434                has_onion_svc,
435            )?
436        };
437
438        let storage_handle = storage.create_handle(PARETO_TIMEOUT_DATA_KEY);
439
440        let builder = build::TunnelBuilder::new(
441            runtime.clone(),
442            chanmgr,
443            config.path_rules().clone(),
444            storage_handle,
445            guardmgr.clone(),
446            #[cfg(all(feature = "vanguards", feature = "hs-common"))]
447            vanguardmgr,
448        );
449
450        Ok(Self::new_generic(config, runtime, guardmgr, builder))
451    }
452}
453
454impl<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> CircMgrInner<B, R> {
455    /// Generic implementation for [`CircMgrInner::new`]
456    pub(crate) fn new_generic<CFG: CircMgrConfig>(
457        config: &CFG,
458        runtime: &R,
459        guardmgr: &tor_guardmgr::GuardMgr<R>,
460        builder: B,
461    ) -> Self {
462        let preemptive = Arc::new(Mutex::new(PreemptiveCircuitPredictor::new(
463            config.preemptive_circuits().clone(),
464        )));
465
466        guardmgr.set_filter(config.path_rules().build_guard_filter());
467
468        let mgr =
469            mgr::AbstractTunnelMgr::new(builder, runtime.clone(), config.circuit_timing().clone());
470
471        CircMgrInner {
472            mgr: Arc::new(mgr),
473            predictor: preemptive,
474        }
475    }
476
477    /// Launch the periodic daemon tasks required by the manager to function properly.
478    ///
479    /// Returns a set of [`TaskHandle`]s that can be used to manage the daemon tasks.
480    //
481    // NOTE(eta): The ?Sized on D is so we can pass a trait object in.
482    #[instrument(level = "trace", skip_all)]
483    pub(crate) fn launch_background_tasks<D, S>(
484        self: &Arc<Self>,
485        runtime: &R,
486        dir_provider: &Arc<D>,
487        state_mgr: S,
488    ) -> Result<Vec<TaskHandle>>
489    where
490        D: NetDirProvider + 'static + ?Sized,
491        S: StateMgr + std::marker::Send + 'static,
492    {
493        let mut ret = vec![];
494
495        runtime
496            .spawn(Self::keep_circmgr_params_updated(
497                dir_provider.events(),
498                Arc::downgrade(self),
499                Arc::downgrade(dir_provider),
500            ))
501            .map_err(|e| Error::from_spawn("circmgr parameter updater", e))?;
502
503        let (sched, handle) = TaskSchedule::new(runtime.clone());
504        ret.push(handle);
505
506        runtime
507            .spawn(Self::update_persistent_state(
508                sched,
509                Arc::downgrade(self),
510                state_mgr,
511            ))
512            .map_err(|e| Error::from_spawn("persistent state updater", e))?;
513
514        let (sched, handle) = TaskSchedule::new(runtime.clone());
515        ret.push(handle);
516
517        runtime
518            .spawn(Self::continually_launch_timeout_testing_circuits(
519                sched,
520                Arc::downgrade(self),
521                Arc::downgrade(dir_provider),
522            ))
523            .map_err(|e| Error::from_spawn("timeout-probe circuit launcher", e))?;
524
525        let (sched, handle) = TaskSchedule::new(runtime.clone());
526        ret.push(handle);
527
528        runtime
529            .spawn(Self::continually_preemptively_build_circuits(
530                sched,
531                Arc::downgrade(self),
532                Arc::downgrade(dir_provider),
533            ))
534            .map_err(|e| Error::from_spawn("preemptive circuit launcher", e))?;
535
536        self.mgr
537            .peek_builder()
538            .guardmgr()
539            .install_netdir_provider(&dir_provider.clone().upcast_arc())?;
540
541        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
542        {
543            let () = self
544                .mgr
545                .peek_builder()
546                .vanguardmgr()
547                .launch_background_tasks(&dir_provider.clone().upcast_arc())?;
548        }
549
550        Ok(ret)
551    }
552
553    /// Return a circuit suitable for sending one-hop BEGINDIR streams,
554    /// launching it if necessary.
555    #[instrument(level = "trace", skip_all)]
556    pub(crate) async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<Arc<B::Tunnel>> {
557        self.expire_circuits().await;
558        let usage = TargetTunnelUsage::Dir;
559        self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
560    }
561
562    /// Return a circuit suitable for exiting to all of the provided
563    /// `ports`, launching it if necessary.
564    ///
565    /// If the list of ports is empty, then the chosen circuit will
566    /// still end at _some_ exit.
567    #[instrument(level = "trace", skip_all)]
568    pub(crate) async fn get_or_launch_exit(
569        &self,
570        netdir: DirInfo<'_>, // TODO: This has to be a NetDir.
571        ports: &[TargetPort],
572        isolation: StreamIsolation,
573        // TODO GEOIP: this cannot be stabilised like this, since Cargo features need to be
574        //             additive. The function should be refactored to be builder-like.
575        #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
576    ) -> Result<Arc<B::Tunnel>> {
577        self.expire_circuits().await;
578        let time = self.mgr.peek_runtime().now();
579        {
580            let mut predictive = self.predictor.lock().expect("preemptive lock poisoned");
581            if ports.is_empty() {
582                predictive.note_usage(None, time);
583            } else {
584                for port in ports.iter() {
585                    predictive.note_usage(Some(*port), time);
586                }
587            }
588        }
589        let require_stability = ports.iter().any(|p| {
590            self.mgr
591                .peek_builder()
592                .path_config()
593                .long_lived_ports
594                .contains(&p.port)
595        });
596        let ports = ports.iter().map(Clone::clone).collect();
597        #[cfg(not(feature = "geoip"))]
598        let country_code = None;
599        let usage = TargetTunnelUsage::Exit {
600            ports,
601            isolation,
602            country_code,
603            require_stability,
604        };
605        self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
606    }
607
608    /// Return a circuit to a specific relay, suitable for using for direct
609    /// (one-hop) directory downloads.
610    ///
611    /// This could be used, for example, to download a descriptor for a bridge.
612    #[cfg(feature = "specific-relay")]
613    #[instrument(level = "trace", skip_all)]
614    pub(crate) async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
615        &self,
616        target: T,
617    ) -> Result<Arc<B::Tunnel>> {
618        self.expire_circuits().await;
619        let usage = TargetTunnelUsage::DirSpecificTarget(target.to_owned());
620        self.mgr
621            .get_or_launch(&usage, DirInfo::Nothing)
622            .await
623            .map(|(c, _)| c)
624    }
625
626    /// Try to change our configuration settings to `new_config`.
627    ///
628    /// The actual behavior here will depend on the value of `how`.
629    ///
630    /// Returns whether any of the circuit pools should be cleared.
631    #[instrument(level = "trace", skip_all)]
632    pub(crate) fn reconfigure<CFG: CircMgrConfig>(
633        &self,
634        new_config: &CFG,
635        how: tor_config::Reconfigure,
636    ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
637        let old_path_rules = self.mgr.peek_builder().path_config();
638        let predictor = self.predictor.lock().expect("poisoned lock");
639        let preemptive_circuits = predictor.config();
640        if preemptive_circuits.initial_predicted_ports
641            != new_config.preemptive_circuits().initial_predicted_ports
642        {
643            // This change has no effect, since the list of ports was _initial_.
644            how.cannot_change("preemptive_circuits.initial_predicted_ports")?;
645        }
646
647        if how == tor_config::Reconfigure::CheckAllOrNothing {
648            return Ok(RetireCircuits::None);
649        }
650
651        let retire_because_of_guardmgr =
652            self.mgr.peek_builder().guardmgr().reconfigure(new_config)?;
653
654        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
655        let retire_because_of_vanguardmgr = self
656            .mgr
657            .peek_builder()
658            .vanguardmgr()
659            .reconfigure(new_config.vanguard_config())?;
660
661        let new_reachable = &new_config.path_rules().reachable_addrs;
662        if new_reachable != &old_path_rules.reachable_addrs {
663            let filter = new_config.path_rules().build_guard_filter();
664            self.mgr.peek_builder().guardmgr().set_filter(filter);
665        }
666
667        let discard_all_circuits = !new_config
668            .path_rules()
669            .at_least_as_permissive_as(&old_path_rules)
670            || retire_because_of_guardmgr != tor_guardmgr::RetireCircuits::None;
671
672        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
673        let discard_all_circuits = discard_all_circuits
674            || retire_because_of_vanguardmgr != tor_guardmgr::RetireCircuits::None;
675
676        self.mgr
677            .peek_builder()
678            .set_path_config(new_config.path_rules().clone());
679        self.mgr
680            .set_circuit_timing(new_config.circuit_timing().clone());
681        predictor.set_config(new_config.preemptive_circuits().clone());
682
683        if discard_all_circuits {
684            // TODO(nickm): Someday, we might want to take a more lenient approach, and only
685            // retire those circuits that do not conform to the new path rules,
686            // or do not conform to the new guard configuration.
687            info!("Path configuration has become more restrictive: retiring existing circuits.");
688            self.retire_all_circuits();
689            return Ok(RetireCircuits::All);
690        }
691        Ok(RetireCircuits::None)
692    }
693
694    /// Whenever a [`DirEvent::NewConsensus`] arrives on `events`, update
695    /// `circmgr` with the consensus parameters from `dirmgr`.
696    ///
697    /// Exit when `events` is closed, or one of `circmgr` or `dirmgr` becomes
698    /// dangling.
699    ///
700    /// This is a daemon task: it runs indefinitely in the background.
701    #[instrument(level = "trace", skip_all)]
702    async fn keep_circmgr_params_updated<D>(
703        mut events: impl futures::Stream<Item = DirEvent> + Unpin,
704        circmgr: Weak<Self>,
705        dirmgr: Weak<D>,
706    ) where
707        D: NetDirProvider + 'static + ?Sized,
708    {
709        use DirEvent::*;
710        while let Some(event) = events.next().await {
711            if matches!(event, NewConsensus) {
712                if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
713                    if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
714                        cm.update_network_parameters(netdir.params());
715                    }
716                } else {
717                    debug!("Circmgr or dirmgr has disappeared; task exiting.");
718                    break;
719                }
720            }
721        }
722    }
723
724    /// Reconfigure this circuit manager using the latest set of
725    /// network parameters.
726    fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
727        self.mgr.update_network_parameters(p);
728        self.mgr.peek_builder().update_network_parameters(p);
729    }
730
731    /// Run indefinitely, launching circuits as needed to get a good
732    /// estimate for our circuit build timeouts.
733    ///
734    /// Exit when we notice that `circmgr` or `dirmgr` has been dropped.
735    ///
736    /// This is a daemon task: it runs indefinitely in the background.
737    #[instrument(level = "trace", skip_all)]
738    async fn continually_launch_timeout_testing_circuits<D>(
739        mut sched: TaskSchedule<R>,
740        circmgr: Weak<Self>,
741        dirmgr: Weak<D>,
742    ) where
743        D: NetDirProvider + 'static + ?Sized,
744    {
745        while sched.next().await.is_some() {
746            if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
747                if let Ok(netdir) = dm.netdir(Timeliness::Unchecked) {
748                    if let Err(e) = cm
749                        .launch_timeout_testing_circuit_if_appropriate(&netdir)
750                        .await
751                    {
752                        warn_report!(e, "Problem launching a timeout testing circuit");
753                    }
754                    let delay = netdir
755                        .params()
756                        .cbt_testing_delay
757                        .try_into()
758                        .expect("Out-of-bounds value from BoundedInt32");
759
760                    drop((cm, dm));
761                    sched.fire_in(delay);
762                } else {
763                    // wait for the provider to announce some event, which will probably be
764                    // NewConsensus; this is therefore a decent yardstick for rechecking
765                    let _ = dm.events().next().await;
766                    sched.fire();
767                }
768            } else {
769                return;
770            }
771        }
772    }
773
774    /// If we need to launch a testing circuit to judge our circuit
775    /// build timeouts timeouts, do so.
776    ///
777    /// # Note
778    ///
779    /// This function is invoked periodically from
780    /// `continually_launch_timeout_testing_circuits`.
781    async fn launch_timeout_testing_circuit_if_appropriate(&self, netdir: &NetDir) -> Result<()> {
782        if !self.mgr.peek_builder().learning_timeouts() {
783            return Ok(());
784        }
785        // We expire any too-old circuits here, so they don't get
786        // counted towards max_circs.
787        self.expire_circuits().await;
788        let max_circs: u64 = netdir
789            .params()
790            .cbt_max_open_circuits_for_testing
791            .try_into()
792            .expect("Out-of-bounds result from BoundedInt32");
793        if (self.mgr.n_tunnels() as u64) < max_circs {
794            // Actually launch the circuit!
795            let usage = TargetTunnelUsage::TimeoutTesting;
796            let dirinfo = netdir.into();
797            let mgr = Arc::clone(&self.mgr);
798            debug!("Launching a circuit to test build times.");
799            let receiver = mgr.launch_by_usage(&usage, dirinfo)?;
800            // We don't actually care when this circuit is done,
801            // so it's okay to drop the Receiver without awaiting it.
802            drop(receiver);
803        }
804
805        Ok(())
806    }
807
808    /// Run forever, periodically telling `circmgr` to update its persistent
809    /// state.
810    ///
811    /// Exit when we notice that `circmgr` has been dropped.
812    ///
813    /// This is a daemon task: it runs indefinitely in the background.
814    #[allow(clippy::cognitive_complexity)] // because of tracing
815    #[instrument(level = "trace", skip_all)]
816    async fn update_persistent_state<S>(
817        mut sched: TaskSchedule<R>,
818        circmgr: Weak<Self>,
819        statemgr: S,
820    ) where
821        S: StateMgr + std::marker::Send,
822    {
823        while sched.next().await.is_some() {
824            if let Some(circmgr) = Weak::upgrade(&circmgr) {
825                use tor_persist::LockStatus::*;
826
827                match statemgr.try_lock() {
828                    Err(e) => {
829                        error_report!(e, "Problem with state lock file");
830                        break;
831                    }
832                    Ok(NewlyAcquired) => {
833                        info!("We now own the lock on our state files.");
834                        if let Err(e) = circmgr.upgrade_to_owned_persistent_state() {
835                            error_report!(e, "Unable to upgrade to owned state files");
836                            break;
837                        }
838                    }
839                    Ok(AlreadyHeld) => {
840                        if let Err(e) = circmgr.store_persistent_state() {
841                            error_report!(e, "Unable to flush circmgr state");
842                            break;
843                        }
844                    }
845                    Ok(NoLock) => {
846                        if let Err(e) = circmgr.reload_persistent_state() {
847                            error_report!(e, "Unable to reload circmgr state");
848                            break;
849                        }
850                    }
851                }
852            } else {
853                debug!("Circmgr has disappeared; task exiting.");
854                return;
855            }
856            // TODO(nickm): This delay is probably too small.
857            //
858            // Also, we probably don't even want a fixed delay here.  Instead,
859            // we should be updating more frequently when the data is volatile
860            // or has important info to save, and not at all when there are no
861            // changes.
862            sched.fire_in(Duration::from_secs(60));
863        }
864
865        debug!("State update task exiting (potentially due to handle drop).");
866    }
867
868    /// Switch from having an unowned persistent state to having an owned one.
869    ///
870    /// Requires that we hold the lock on the state files.
871    pub(crate) fn upgrade_to_owned_persistent_state(&self) -> Result<()> {
872        self.mgr.peek_builder().upgrade_to_owned_state()?;
873        Ok(())
874    }
875
876    /// Reload state from the state manager.
877    ///
878    /// We only call this method if we _don't_ have the lock on the state
879    /// files.  If we have the lock, we only want to save.
880    pub(crate) fn reload_persistent_state(&self) -> Result<()> {
881        self.mgr.peek_builder().reload_state()?;
882        Ok(())
883    }
884
885    /// Run indefinitely, launching circuits where the preemptive circuit
886    /// predictor thinks it'd be a good idea to have them.
887    ///
888    /// Exit when we notice that `circmgr` or `dirmgr` has been dropped.
889    ///
890    /// This is a daemon task: it runs indefinitely in the background.
891    ///
892    /// # Note
893    ///
894    /// This would be better handled entirely within `tor-circmgr`, like
895    /// other daemon tasks.
896    #[instrument(level = "trace", skip_all)]
897    async fn continually_preemptively_build_circuits<D>(
898        mut sched: TaskSchedule<R>,
899        circmgr: Weak<Self>,
900        dirmgr: Weak<D>,
901    ) where
902        D: NetDirProvider + 'static + ?Sized,
903    {
904        let base_delay = Duration::from_secs(10);
905        let mut retry = RetryDelay::from_duration(base_delay);
906
907        while sched.next().await.is_some() {
908            if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
909                if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
910                    let result = cm
911                        .launch_circuits_preemptively(DirInfo::Directory(&netdir))
912                        .await;
913
914                    let delay = match result {
915                        Ok(()) => {
916                            retry.reset();
917                            base_delay
918                        }
919                        Err(_) => retry.next_delay(&mut rand::rng()),
920                    };
921
922                    sched.fire_in(delay);
923                } else {
924                    // wait for the provider to announce some event, which will probably be
925                    // NewConsensus; this is therefore a decent yardstick for rechecking
926                    let _ = dm.events().next().await;
927                    sched.fire();
928                }
929            } else {
930                return;
931            }
932        }
933    }
934
935    /// Launch circuits preemptively, using the preemptive circuit predictor's
936    /// predictions.
937    ///
938    /// # Note
939    ///
940    /// This function is invoked periodically from
941    /// `continually_preemptively_build_circuits()`.
942    #[allow(clippy::cognitive_complexity)]
943    #[instrument(level = "trace", skip_all)]
944    async fn launch_circuits_preemptively(
945        &self,
946        netdir: DirInfo<'_>,
947    ) -> std::result::Result<(), err::PreemptiveCircError> {
948        trace!("Checking preemptive circuit predictions.");
949        let (circs, threshold) = {
950            let path_config = self.mgr.peek_builder().path_config();
951            let preemptive = self.predictor.lock().expect("preemptive lock poisoned");
952            let threshold = preemptive.config().disable_at_threshold;
953            (preemptive.predict(&path_config), threshold)
954        };
955
956        if self.mgr.n_tunnels() >= threshold {
957            return Ok(());
958        }
959        let mut n_created = 0_usize;
960        let mut n_errors = 0_usize;
961
962        let futures = circs
963            .iter()
964            .map(|usage| self.mgr.get_or_launch(usage, netdir));
965        let results = futures::future::join_all(futures).await;
966        for (i, result) in results.into_iter().enumerate() {
967            match result {
968                Ok((t, TunnelProvenance::NewlyCreated)) => {
969                    debug!(
970                        tunnel_id=%t.unique_id(),
971                        "Preemptive circuit was created for {:?}", circs[i]);
972                    n_created += 1;
973                }
974                Ok((t, TunnelProvenance::Preexisting)) => {
975                    trace!(
976                        tunnel_id=%t.unique_id(),
977                        "Circuit already existed created for {:?}", circs[i]);
978                }
979                Err(e) => {
980                    warn_report!(e, "Failed to build preemptive circuit {:?}", sv(&circs[i]));
981                    n_errors += 1;
982                }
983            }
984        }
985
986        if n_created > 0 || n_errors == 0 {
987            // Either we successfully made a circuit, or we didn't have any
988            // failures while looking for preexisting circuits.  Progress was
989            // made, so there's no need to back off.
990            Ok(())
991        } else {
992            // We didn't build any circuits and we hit at least one error:
993            // We'll call this unsuccessful.
994            Err(err::PreemptiveCircError)
995        }
996    }
997
998    /// Create and return a new (typically anonymous) onion circuit stem
999    /// of type `stem_kind`.
1000    ///
1001    /// If `circ_kind` is provided, we apply additional rules to make sure
1002    /// that this will be usable as a stem for the given kind of onion service circuit.
1003    /// Otherwise, we pick a stem that will probably be useful in general.
1004    ///
1005    /// This circuit is guaranteed not to have been used for any traffic
1006    /// previously, and it will not be given out for any other requests in the
1007    /// future unless explicitly re-registered with a circuit manager.
1008    ///
1009    /// If `planned_target` is provided, then the circuit will be built so that
1010    /// it does not share any family members with the provided target.  (The
1011    /// circuit _will not be_ extended to that target itself!)
1012    ///
1013    /// Used to implement onion service clients and services.
1014    #[cfg(feature = "hs-common")]
1015    #[instrument(level = "trace", skip_all)]
1016    pub(crate) async fn launch_hs_unmanaged<T>(
1017        &self,
1018        planned_target: Option<T>,
1019        dir: &NetDir,
1020        stem_kind: HsCircStemKind,
1021        circ_kind: Option<HsCircKind>,
1022    ) -> Result<B::Tunnel>
1023    where
1024        T: IntoOwnedChanTarget,
1025    {
1026        let usage = TargetTunnelUsage::HsCircBase {
1027            compatible_with_target: planned_target.map(IntoOwnedChanTarget::to_owned),
1028            stem_kind,
1029            circ_kind,
1030        };
1031        let (_, client_circ) = self.mgr.launch_unmanaged(&usage, dir.into()).await?;
1032        Ok(client_circ)
1033    }
1034
1035    /// Return true if `netdir` has enough information to be used for this
1036    /// circuit manager.
1037    ///
1038    /// (This will check whether the netdir is missing any primary guard
1039    /// microdescriptors)
1040    pub(crate) fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
1041        self.mgr
1042            .peek_builder()
1043            .guardmgr()
1044            .netdir_is_sufficient(netdir)
1045    }
1046
1047    /// Internal implementation for [`CircMgr::estimate_timeout`].
1048    pub(crate) fn estimate_timeout(
1049        &self,
1050        timeout_action: &timeouts::Action,
1051    ) -> std::time::Duration {
1052        let (timeout, _abandon) = self.mgr.peek_builder().estimator().timeouts(timeout_action);
1053        timeout
1054    }
1055
1056    /// Internal implementation for [`CircMgr::builder`].
1057    pub(crate) fn builder(&self) -> &B {
1058        self.mgr.peek_builder()
1059    }
1060
1061    /// Flush state to the state manager, if there is any unsaved state and
1062    /// we have the lock.
1063    ///
1064    /// Return true if we saved something; false if we didn't have the lock.
1065    pub(crate) fn store_persistent_state(&self) -> Result<bool> {
1066        self.mgr.peek_builder().save_state()
1067    }
1068
1069    /// Expire every circuit that has been dirty for too long.
1070    ///
1071    /// Expired circuits are not closed while they still have users,
1072    /// but they are no longer given out for new requests.
1073    async fn expire_circuits(&self) {
1074        // TODO: I would prefer not to call this at every request, but
1075        // it should be fine for now.  (At some point we may no longer
1076        // need this, or might not need to call it so often, now that
1077        // our circuit expiration runs on scheduled timers via
1078        // spawn_expiration_task.)
1079        let now = self.mgr.peek_runtime().now();
1080
1081        // TODO: Use return value here to implement the above TODO.
1082        let _next_expiration = self.mgr.expire_tunnels(now).await;
1083    }
1084
1085    /// Mark every circuit that we have launched so far as unsuitable for
1086    /// any future requests.  This won't close existing circuits that have
1087    /// streams attached to them, but it will prevent any future streams from
1088    /// being attached.
1089    ///
1090    /// TODO: we may want to expose this eventually.  If we do, we should
1091    /// be very clear that you don't want to use it haphazardly.
1092    pub(crate) fn retire_all_circuits(&self) {
1093        self.mgr.retire_all_tunnels();
1094    }
1095
1096    /// If `circ_id` is the unique identifier for a circuit that we're
1097    /// keeping track of, don't give it out for any future requests.
1098    pub(crate) fn retire_circ(&self, circ_id: &<B::Tunnel as AbstractTunnel>::Id) {
1099        let _ = self.mgr.take_tunnel(circ_id);
1100    }
1101
1102    /// Return a stream of events about our estimated clock skew; these events
1103    /// are `None` when we don't have enough information to make an estimate,
1104    /// and `Some(`[`SkewEstimate`]`)` otherwise.
1105    ///
1106    /// Note that this stream can be lossy: if the estimate changes more than
1107    /// one before you read from the stream, you might only get the most recent
1108    /// update.
1109    pub(crate) fn skew_events(&self) -> ClockSkewEvents {
1110        self.mgr.peek_builder().guardmgr().skew_events()
1111    }
1112
1113    /// Record that a failure occurred on a circuit with a given guard, in a way
1114    /// that makes us unwilling to use that guard for future circuits.
1115    ///
1116    pub(crate) fn note_external_failure(
1117        &self,
1118        target: &impl ChanTarget,
1119        external_failure: ExternalActivity,
1120    ) {
1121        self.mgr
1122            .peek_builder()
1123            .guardmgr()
1124            .note_external_failure(target, external_failure);
1125    }
1126
1127    /// Record that a success occurred on a circuit with a given guard, in a way
1128    /// that makes us possibly willing to use that guard for future circuits.
1129    pub(crate) fn note_external_success(
1130        &self,
1131        target: &impl ChanTarget,
1132        external_activity: ExternalActivity,
1133    ) {
1134        self.mgr
1135            .peek_builder()
1136            .guardmgr()
1137            .note_external_success(target, external_activity);
1138    }
1139}
1140
1141impl<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> Drop for CircMgrInner<B, R> {
1142    fn drop(&mut self) {
1143        match self.store_persistent_state() {
1144            Ok(true) => info!("Flushed persistent state at exit."),
1145            Ok(false) => debug!("Lock not held; no state to flush."),
1146            Err(e) => error_report!(e, "Unable to flush state on circuit manager drop"),
1147        }
1148    }
1149}
1150
1151#[cfg(test)]
1152mod test {
1153    // @@ begin test lint list maintained by maint/add_warning @@
1154    #![allow(clippy::bool_assert_comparison)]
1155    #![allow(clippy::clone_on_copy)]
1156    #![allow(clippy::dbg_macro)]
1157    #![allow(clippy::mixed_attributes_style)]
1158    #![allow(clippy::print_stderr)]
1159    #![allow(clippy::print_stdout)]
1160    #![allow(clippy::single_char_pattern)]
1161    #![allow(clippy::unwrap_used)]
1162    #![allow(clippy::unchecked_time_subtraction)]
1163    #![allow(clippy::useless_vec)]
1164    #![allow(clippy::needless_pass_by_value)]
1165    #![allow(clippy::string_slice)] // See arti#2571
1166    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1167    use mocks::FakeBuilder;
1168    use tor_guardmgr::GuardMgr;
1169    use tor_linkspec::OwnedChanTarget;
1170    use tor_netdir::testprovider::TestNetDirProvider;
1171    use tor_persist::TestingStateMgr;
1172
1173    use super::*;
1174
1175    #[test]
1176    fn get_params() {
1177        use tor_netdir::{MdReceiver, PartialNetDir};
1178        use tor_netdoc::doc::netstatus::NetParams;
1179        // If it's just fallbackdir, we get the default parameters.
1180        let fb = FallbackList::from([]);
1181        let di: DirInfo<'_> = (&fb).into();
1182
1183        let p1 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
1184        assert!(!p1.extend_by_ed25519_id);
1185
1186        // Now try with a directory and configured parameters.
1187        let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
1188        let mut params = NetParams::default();
1189        params.set("circwindow".into(), 100);
1190        params.set("ExtendByEd25519ID".into(), 1);
1191        let mut dir = PartialNetDir::new(consensus, Some(&params));
1192        for m in microdescs {
1193            dir.add_microdesc(m);
1194        }
1195        let netdir = dir.unwrap_if_sufficient().unwrap();
1196        let di: DirInfo<'_> = (&netdir).into();
1197        let p2 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
1198        assert!(p2.extend_by_ed25519_id);
1199
1200        // Now try with a bogus circwindow value.
1201        let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
1202        let mut params = NetParams::default();
1203        params.set("circwindow".into(), 100_000);
1204        params.set("ExtendByEd25519ID".into(), 1);
1205        let mut dir = PartialNetDir::new(consensus, Some(&params));
1206        for m in microdescs {
1207            dir.add_microdesc(m);
1208        }
1209        let netdir = dir.unwrap_if_sufficient().unwrap();
1210        let di: DirInfo<'_> = (&netdir).into();
1211        let p2 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
1212        assert!(p2.extend_by_ed25519_id);
1213    }
1214
1215    fn make_circmgr<R: Runtime>(runtime: R) -> Arc<CircMgrInner<FakeBuilder<R>, R>> {
1216        let config = crate::config::test_config::TestConfig::default();
1217        let statemgr = TestingStateMgr::new();
1218        let guardmgr =
1219            GuardMgr::new(runtime.clone(), statemgr.clone(), &config).expect("Create GuardMgr");
1220        let builder = FakeBuilder::new(
1221            &runtime,
1222            statemgr.clone(),
1223            &tor_guardmgr::TestConfig::default(),
1224        );
1225        let circmgr = Arc::new(CircMgrInner::new_generic(
1226            &config, &runtime, &guardmgr, builder,
1227        ));
1228        let netdir = Arc::new(TestNetDirProvider::new());
1229        CircMgrInner::launch_background_tasks(&circmgr, &runtime, &netdir, statemgr)
1230            .expect("launch CircMgrInner background tasks");
1231        circmgr
1232    }
1233
1234    #[test]
1235    #[cfg(feature = "hs-common")]
1236    fn test_launch_hs_unmanaged() {
1237        tor_rtmock::MockRuntime::test_with_various(|runtime| async move {
1238            let circmgr = make_circmgr(runtime.clone());
1239            let netdir = tor_netdir::testnet::construct_netdir()
1240                .unwrap_if_sufficient()
1241                .unwrap();
1242
1243            let (ret_tx, ret_rx) = tor_async_utils::oneshot::channel();
1244            runtime.spawn_identified("launch_hs_unamanged", async move {
1245                ret_tx
1246                    .send(
1247                        circmgr
1248                            .launch_hs_unmanaged::<OwnedChanTarget>(
1249                                None,
1250                                &netdir,
1251                                HsCircStemKind::Naive,
1252                                None,
1253                            )
1254                            .await,
1255                    )
1256                    .unwrap();
1257            });
1258            runtime.advance_by(Duration::from_millis(60)).await;
1259            ret_rx.await.unwrap().unwrap();
1260        });
1261    }
1262}