Skip to main content

tor_circmgr/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc = include_str!("../README.md")]
3// @@ begin lint list maintained by maint/add_warning @@
4#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6#![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_time_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39#![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43#![allow(clippy::needless_lifetimes)] // See arti#1765
44#![allow(mismatched_lifetime_syntaxes)] // temporary workaround for arti#2060
45#![allow(clippy::collapsible_if)] // See arti#2342
46#![deny(clippy::unused_async)]
47//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
48
49// TODO #1645 (either remove this, or decide to have it everywhere)
50#![cfg_attr(not(all(feature = "full", feature = "experimental")), allow(unused))]
51
52use build::TunnelBuilder;
53use mgr::{AbstractTunnel, AbstractTunnelBuilder};
54use tor_basic_utils::retry::RetryDelay;
55use tor_chanmgr::ChanMgr;
56use tor_dircommon::fallback::FallbackList;
57use tor_error::{error_report, warn_report};
58use tor_guardmgr::RetireCircuits;
59use tor_linkspec::ChanTarget;
60use tor_netdir::{DirEvent, NetDir, NetDirProvider, Timeliness};
61use tor_proto::circuit::UniqId;
62use tor_proto::client::circuit::CircParameters;
63use tor_rtcompat::Runtime;
64
65#[cfg(any(feature = "specific-relay", feature = "hs-common"))]
66use tor_linkspec::IntoOwnedChanTarget;
67
68use futures::StreamExt;
69use std::sync::{Arc, Mutex, Weak};
70use std::time::{Duration, Instant};
71use tor_rtcompat::SpawnExt;
72use tracing::{debug, info, instrument, trace, warn};
73
74#[cfg(feature = "testing")]
75pub use config::test_config::TestConfig;
76
77pub mod build;
78mod config;
79mod err;
80#[cfg(feature = "hs-common")]
81pub mod hspool;
82mod impls;
83pub mod isolation;
84mod mgr;
85#[cfg(test)]
86mod mocks;
87mod preemptive;
88pub mod timeouts;
89mod tunnel;
90mod usage;
91
92// Can't apply `visibility` to modules.
93cfg_if::cfg_if! {
94    if #[cfg(feature = "experimental-api")] {
95        pub mod path;
96    } else {
97        pub(crate) mod path;
98    }
99}
100
101pub use err::Error;
102pub use isolation::IsolationToken;
103pub use tor_guardmgr::{ClockSkewEvents, GuardMgrConfig, SkewEstimate};
104pub use tunnel::{
105    ClientDataTunnel, ClientDirTunnel, ClientOnionServiceDataTunnel, ClientOnionServiceDirTunnel,
106    ClientOnionServiceIntroTunnel, ServiceOnionServiceDataTunnel, ServiceOnionServiceDirTunnel,
107    ServiceOnionServiceIntroTunnel,
108};
109#[cfg(feature = "conflux")]
110pub use tunnel::{
111    ClientMultiPathDataTunnel, ClientMultiPathOnionServiceDataTunnel,
112    ServiceMultiPathOnionServiceDataTunnel,
113};
114pub use usage::{TargetPort, TargetPorts};
115
116pub use config::{
117    CircMgrConfig, CircuitTiming, CircuitTimingBuilder, PathConfig, PathConfigBuilder,
118    PreemptiveCircuitConfig, PreemptiveCircuitConfigBuilder,
119};
120
121use crate::isolation::StreamIsolation;
122use crate::mgr::TunnelProvenance;
123use crate::preemptive::PreemptiveCircuitPredictor;
124use usage::TargetTunnelUsage;
125
126use safelog::sensitive as sv;
127#[cfg(feature = "geoip")]
128use tor_geoip::CountryCode;
129pub use tor_guardmgr::{ExternalActivity, FirstHopId};
130use tor_persist::StateMgr;
131use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
132
133#[cfg(feature = "hs-common")]
134use crate::hspool::{HsCircKind, HsCircStemKind};
135#[cfg(all(feature = "vanguards", feature = "hs-common"))]
136use tor_guardmgr::vanguards::VanguardMgr;
137
138/// A Result type as returned from this crate.
139pub type Result<T> = std::result::Result<T, Error>;
140
141/// Type alias for dynamic StorageHandle that can handle our timeout state.
142type TimeoutStateHandle = tor_persist::DynStorageHandle<timeouts::pareto::ParetoTimeoutState>;
143
144/// Key used to load timeout state information.
145const PARETO_TIMEOUT_DATA_KEY: &str = "circuit_timeouts";
146
147/// Represents what we know about the Tor network.
148///
149/// This can either be a complete directory, or a list of fallbacks.
150///
151/// Not every DirInfo can be used to build every kind of circuit:
152/// if you try to build a path with an inadequate DirInfo, you'll get a
153/// NeedConsensus error.
154#[derive(Debug, Copy, Clone)]
155#[non_exhaustive]
156pub enum DirInfo<'a> {
157    /// A list of fallbacks, for use when we don't know a network directory.
158    Fallbacks(&'a FallbackList),
159    /// A complete network directory
160    Directory(&'a NetDir),
161    /// No information: we can only build one-hop paths: and that, only if the
162    /// guard manager knows some guards or fallbacks.
163    Nothing,
164}
165
166impl<'a> From<&'a FallbackList> for DirInfo<'a> {
167    fn from(v: &'a FallbackList) -> DirInfo<'a> {
168        DirInfo::Fallbacks(v)
169    }
170}
171impl<'a> From<&'a NetDir> for DirInfo<'a> {
172    fn from(v: &'a NetDir) -> DirInfo<'a> {
173        DirInfo::Directory(v)
174    }
175}
176impl<'a> DirInfo<'a> {
177    /// Return a set of circuit parameters for this DirInfo.
178    fn circ_params(&self, usage: &TargetTunnelUsage) -> Result<CircParameters> {
179        use tor_netdir::params::NetParameters;
180        // We use a common function for both cases here to be sure that
181        // we look at the defaults from NetParameters code.
182        let defaults = NetParameters::default();
183        let net_params = match self {
184            DirInfo::Directory(d) => d.params(),
185            _ => &defaults,
186        };
187        match usage {
188            #[cfg(feature = "hs-common")]
189            TargetTunnelUsage::HsCircBase { .. } => {
190                build::onion_circparams_from_netparams(net_params)
191            }
192            _ => build::exit_circparams_from_netparams(net_params),
193        }
194    }
195}
196
197/// A Circuit Manager (CircMgr) manages a set of circuits, returning them
198/// when they're suitable, and launching them if they don't already exist.
199///
200/// Right now, its notion of "suitable" is quite rudimentary: it just
201/// believes in two kinds of circuits: Exit circuits, and directory
202/// circuits.  Exit circuits are ones that were created to connect to
203/// a set of ports; directory circuits were made to talk to directory caches.
204///
205/// This is a "handle"; clones of it share state.
206pub struct CircMgr<R: Runtime>(Arc<CircMgrInner<build::TunnelBuilder<R>, R>>);
207
208impl<R: Runtime> CircMgr<R> {
209    /// Construct a new circuit manager.
210    ///
211    /// # Usage note
212    ///
213    /// For the manager to work properly, you will need to call `CircMgr::launch_background_tasks`.
214    pub fn new<SM, CFG: CircMgrConfig>(
215        config: &CFG,
216        storage: SM,
217        runtime: &R,
218        chanmgr: Arc<ChanMgr<R>>,
219        guardmgr: &tor_guardmgr::GuardMgr<R>,
220    ) -> Result<Self>
221    where
222        SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
223    {
224        Ok(Self(Arc::new(CircMgrInner::new(
225            config, storage, runtime, chanmgr, guardmgr,
226        )?)))
227    }
228
229    /// Return a circuit suitable for sending one-hop BEGINDIR streams,
230    /// launching it if necessary.
231    #[instrument(level = "trace", skip_all)]
232    pub async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<ClientDirTunnel> {
233        let tunnel = self.0.get_or_launch_dir(netdir).await?;
234        Ok(tunnel.into())
235    }
236
237    /// Return a circuit suitable for exiting to all of the provided
238    /// `ports`, launching it if necessary.
239    ///
240    /// If the list of ports is empty, then the chosen circuit will
241    /// still end at _some_ exit.
242    #[instrument(level = "trace", skip_all)]
243    pub async fn get_or_launch_exit(
244        &self,
245        netdir: DirInfo<'_>, // TODO: This has to be a NetDir.
246        ports: &[TargetPort],
247        isolation: StreamIsolation,
248        // TODO GEOIP: this cannot be stabilised like this, since Cargo features need to be
249        //             additive. The function should be refactored to be builder-like.
250        #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
251    ) -> Result<ClientDataTunnel> {
252        let tunnel = self
253            .0
254            .get_or_launch_exit(
255                netdir,
256                ports,
257                isolation,
258                #[cfg(feature = "geoip")]
259                country_code,
260            )
261            .await?;
262        Ok(tunnel.into())
263    }
264
265    /// Return a circuit to a specific relay, suitable for using for direct
266    /// (one-hop) directory downloads.
267    ///
268    /// This could be used, for example, to download a descriptor for a bridge.
269    #[cfg(feature = "specific-relay")]
270    #[instrument(level = "trace", skip_all)]
271    pub async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
272        &self,
273        target: T,
274    ) -> Result<ClientDirTunnel> {
275        let tunnel = self.0.get_or_launch_dir_specific(target).await?;
276        Ok(tunnel.into())
277    }
278
279    /// Launch the periodic daemon tasks required by the manager to function properly.
280    ///
281    /// Returns a set of [`TaskHandle`]s that can be used to manage the daemon tasks.
282    //
283    // NOTE(eta): The ?Sized on D is so we can pass a trait object in.
284    #[instrument(level = "trace", skip_all)]
285    pub fn launch_background_tasks<D, S>(
286        self: &Arc<Self>,
287        runtime: &R,
288        dir_provider: &Arc<D>,
289        state_mgr: S,
290    ) -> Result<Vec<TaskHandle>>
291    where
292        D: NetDirProvider + 'static + ?Sized,
293        S: StateMgr + std::marker::Send + 'static,
294    {
295        CircMgrInner::launch_background_tasks(&self.0.clone(), runtime, dir_provider, state_mgr)
296    }
297
298    /// Return true if `netdir` has enough information to be used for this
299    /// circuit manager.
300    ///
301    /// (This will check whether the netdir is missing any primary guard
302    /// microdescriptors)
303    pub fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
304        self.0.netdir_is_sufficient(netdir)
305    }
306
307    /// If `circ_id` is the unique identifier for a circuit that we're
308    /// keeping track of, don't give it out for any future requests.
309    pub fn retire_circ(&self, circ_id: &UniqId) {
310        self.0.retire_circ(circ_id);
311    }
312
313    /// Record that a failure occurred on a circuit with a given guard, in a way
314    /// that makes us unwilling to use that guard for future circuits.
315    ///
316    pub fn note_external_failure(
317        &self,
318        target: &impl ChanTarget,
319        external_failure: ExternalActivity,
320    ) {
321        self.0.note_external_failure(target, external_failure);
322    }
323
324    /// Record that a success occurred on a circuit with a given guard, in a way
325    /// that makes us possibly willing to use that guard for future circuits.
326    pub fn note_external_success(
327        &self,
328        target: &impl ChanTarget,
329        external_activity: ExternalActivity,
330    ) {
331        self.0.note_external_success(target, external_activity);
332    }
333
334    /// Return a stream of events about our estimated clock skew; these events
335    /// are `None` when we don't have enough information to make an estimate,
336    /// and `Some(`[`SkewEstimate`]`)` otherwise.
337    ///
338    /// Note that this stream can be lossy: if the estimate changes more than
339    /// one before you read from the stream, you might only get the most recent
340    /// update.
341    pub fn skew_events(&self) -> ClockSkewEvents {
342        self.0.skew_events()
343    }
344
345    /// Try to change our configuration settings to `new_config`.
346    ///
347    /// The actual behavior here will depend on the value of `how`.
348    ///
349    /// Returns whether any of the circuit pools should be cleared.
350    #[instrument(level = "trace", skip_all)]
351    pub fn reconfigure<CFG: CircMgrConfig>(
352        &self,
353        new_config: &CFG,
354        how: tor_config::Reconfigure,
355    ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
356        self.0.reconfigure(new_config, how)
357    }
358
359    /// Return an estimate-based delay for how long a given
360    /// [`Action`](timeouts::Action) should be allowed to complete.
361    ///
362    /// Note that **you do not need to use this function** in order to get
363    /// reasonable timeouts for the circuit-building operations provided by the
364    /// `tor-circmgr` crate: those, unless specifically noted, always use these
365    /// timeouts to cancel circuit operations that have taken too long.
366    ///
367    /// Instead, you should only use this function when you need to estimate how
368    /// long some _other_ operation should take to complete.  For example, if
369    /// you are sending a request over a 3-hop circuit and waiting for a reply,
370    /// you might choose to wait for `estimate_timeout(Action::RoundTrip {
371    /// length: 3 })`.
372    ///
373    /// Note also that this function returns a _timeout_ that the operation
374    /// should be permitted to complete, not an estimated Duration that the
375    /// operation _will_ take to complete. Timeouts are chosen to ensure that
376    /// most operations will complete, but very slow ones will not.  So even if
377    /// we expect that a circuit will complete in (say) 3 seconds, we might
378    /// still allow a timeout of 4.5 seconds, to ensure that most circuits can
379    /// complete.
380    ///
381    /// Estimate-based timeouts may change over time, given observations on the
382    /// actual amount of time needed for circuits to complete building.  If not
383    /// enough information has been gathered, a reasonable default will be used.
384    pub fn estimate_timeout(&self, timeout_action: &timeouts::Action) -> std::time::Duration {
385        self.0.estimate_timeout(timeout_action)
386    }
387
388    /// Return a reference to the associated CircuitBuilder that this CircMgr
389    /// will use to create its circuits.
390    #[cfg(feature = "experimental-api")]
391    pub fn builder(&self) -> &TunnelBuilder<R> {
392        CircMgrInner::builder(&self.0)
393    }
394}
395
396/// Internal object used to implement CircMgr, which allows for mocking.
397#[derive(Clone)]
398pub(crate) struct CircMgrInner<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> {
399    /// The underlying circuit manager object that implements our behavior.
400    mgr: Arc<mgr::AbstractTunnelMgr<B, R>>,
401    /// A preemptive circuit predictor, for, uh, building circuits preemptively.
402    predictor: Arc<Mutex<PreemptiveCircuitPredictor>>,
403}
404
405impl<R: Runtime> CircMgrInner<TunnelBuilder<R>, R> {
406    /// Construct a new circuit manager.
407    ///
408    /// # Usage note
409    ///
410    /// For the manager to work properly, you will need to call `CircMgr::launch_background_tasks`.
411    #[allow(clippy::unnecessary_wraps)]
412    pub(crate) fn new<SM, CFG: CircMgrConfig>(
413        config: &CFG,
414        storage: SM,
415        runtime: &R,
416        chanmgr: Arc<ChanMgr<R>>,
417        guardmgr: &tor_guardmgr::GuardMgr<R>,
418    ) -> Result<Self>
419    where
420        SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
421    {
422        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
423        let vanguardmgr = {
424            // TODO(#1382): we need a way of checking if this arti instance
425            // is running an onion service or not.
426            //
427            // Perhaps this information should be provided by CircMgrConfig.
428            let has_onion_svc = false;
429            VanguardMgr::new(
430                config.vanguard_config(),
431                runtime.clone(),
432                storage.clone(),
433                has_onion_svc,
434            )?
435        };
436
437        let storage_handle = storage.create_handle(PARETO_TIMEOUT_DATA_KEY);
438
439        let builder = build::TunnelBuilder::new(
440            runtime.clone(),
441            chanmgr,
442            config.path_rules().clone(),
443            storage_handle,
444            guardmgr.clone(),
445            #[cfg(all(feature = "vanguards", feature = "hs-common"))]
446            vanguardmgr,
447        );
448
449        Ok(Self::new_generic(config, runtime, guardmgr, builder))
450    }
451}
452
453impl<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> CircMgrInner<B, R> {
454    /// Generic implementation for [`CircMgrInner::new`]
455    pub(crate) fn new_generic<CFG: CircMgrConfig>(
456        config: &CFG,
457        runtime: &R,
458        guardmgr: &tor_guardmgr::GuardMgr<R>,
459        builder: B,
460    ) -> Self {
461        let preemptive = Arc::new(Mutex::new(PreemptiveCircuitPredictor::new(
462            config.preemptive_circuits().clone(),
463        )));
464
465        guardmgr.set_filter(config.path_rules().build_guard_filter());
466
467        let mgr =
468            mgr::AbstractTunnelMgr::new(builder, runtime.clone(), config.circuit_timing().clone());
469
470        CircMgrInner {
471            mgr: Arc::new(mgr),
472            predictor: preemptive,
473        }
474    }
475
476    /// Launch the periodic daemon tasks required by the manager to function properly.
477    ///
478    /// Returns a set of [`TaskHandle`]s that can be used to manage the daemon tasks.
479    //
480    // NOTE(eta): The ?Sized on D is so we can pass a trait object in.
481    #[instrument(level = "trace", skip_all)]
482    pub(crate) fn launch_background_tasks<D, S>(
483        self: &Arc<Self>,
484        runtime: &R,
485        dir_provider: &Arc<D>,
486        state_mgr: S,
487    ) -> Result<Vec<TaskHandle>>
488    where
489        D: NetDirProvider + 'static + ?Sized,
490        S: StateMgr + std::marker::Send + 'static,
491    {
492        let mut ret = vec![];
493
494        runtime
495            .spawn(Self::keep_circmgr_params_updated(
496                dir_provider.events(),
497                Arc::downgrade(self),
498                Arc::downgrade(dir_provider),
499            ))
500            .map_err(|e| Error::from_spawn("circmgr parameter updater", e))?;
501
502        let (sched, handle) = TaskSchedule::new(runtime.clone());
503        ret.push(handle);
504
505        runtime
506            .spawn(Self::update_persistent_state(
507                sched,
508                Arc::downgrade(self),
509                state_mgr,
510            ))
511            .map_err(|e| Error::from_spawn("persistent state updater", e))?;
512
513        let (sched, handle) = TaskSchedule::new(runtime.clone());
514        ret.push(handle);
515
516        runtime
517            .spawn(Self::continually_launch_timeout_testing_circuits(
518                sched,
519                Arc::downgrade(self),
520                Arc::downgrade(dir_provider),
521            ))
522            .map_err(|e| Error::from_spawn("timeout-probe circuit launcher", e))?;
523
524        let (sched, handle) = TaskSchedule::new(runtime.clone());
525        ret.push(handle);
526
527        runtime
528            .spawn(Self::continually_preemptively_build_circuits(
529                sched,
530                Arc::downgrade(self),
531                Arc::downgrade(dir_provider),
532            ))
533            .map_err(|e| Error::from_spawn("preemptive circuit launcher", e))?;
534
535        self.mgr
536            .peek_builder()
537            .guardmgr()
538            .install_netdir_provider(&dir_provider.clone().upcast_arc())?;
539
540        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
541        {
542            let () = self
543                .mgr
544                .peek_builder()
545                .vanguardmgr()
546                .launch_background_tasks(&dir_provider.clone().upcast_arc())?;
547        }
548
549        Ok(ret)
550    }
551
552    /// Return a circuit suitable for sending one-hop BEGINDIR streams,
553    /// launching it if necessary.
554    #[instrument(level = "trace", skip_all)]
555    pub(crate) async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<Arc<B::Tunnel>> {
556        self.expire_circuits().await;
557        let usage = TargetTunnelUsage::Dir;
558        self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
559    }
560
561    /// Return a circuit suitable for exiting to all of the provided
562    /// `ports`, launching it if necessary.
563    ///
564    /// If the list of ports is empty, then the chosen circuit will
565    /// still end at _some_ exit.
566    #[instrument(level = "trace", skip_all)]
567    pub(crate) async fn get_or_launch_exit(
568        &self,
569        netdir: DirInfo<'_>, // TODO: This has to be a NetDir.
570        ports: &[TargetPort],
571        isolation: StreamIsolation,
572        // TODO GEOIP: this cannot be stabilised like this, since Cargo features need to be
573        //             additive. The function should be refactored to be builder-like.
574        #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
575    ) -> Result<Arc<B::Tunnel>> {
576        self.expire_circuits().await;
577        let time = Instant::now();
578        {
579            let mut predictive = self.predictor.lock().expect("preemptive lock poisoned");
580            if ports.is_empty() {
581                predictive.note_usage(None, time);
582            } else {
583                for port in ports.iter() {
584                    predictive.note_usage(Some(*port), time);
585                }
586            }
587        }
588        let require_stability = ports.iter().any(|p| {
589            self.mgr
590                .peek_builder()
591                .path_config()
592                .long_lived_ports
593                .contains(&p.port)
594        });
595        let ports = ports.iter().map(Clone::clone).collect();
596        #[cfg(not(feature = "geoip"))]
597        let country_code = None;
598        let usage = TargetTunnelUsage::Exit {
599            ports,
600            isolation,
601            country_code,
602            require_stability,
603        };
604        self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
605    }
606
607    /// Return a circuit to a specific relay, suitable for using for direct
608    /// (one-hop) directory downloads.
609    ///
610    /// This could be used, for example, to download a descriptor for a bridge.
611    #[cfg(feature = "specific-relay")]
612    #[instrument(level = "trace", skip_all)]
613    pub(crate) async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
614        &self,
615        target: T,
616    ) -> Result<Arc<B::Tunnel>> {
617        self.expire_circuits().await;
618        let usage = TargetTunnelUsage::DirSpecificTarget(target.to_owned());
619        self.mgr
620            .get_or_launch(&usage, DirInfo::Nothing)
621            .await
622            .map(|(c, _)| c)
623    }
624
625    /// Try to change our configuration settings to `new_config`.
626    ///
627    /// The actual behavior here will depend on the value of `how`.
628    ///
629    /// Returns whether any of the circuit pools should be cleared.
630    #[instrument(level = "trace", skip_all)]
631    pub(crate) fn reconfigure<CFG: CircMgrConfig>(
632        &self,
633        new_config: &CFG,
634        how: tor_config::Reconfigure,
635    ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
636        let old_path_rules = self.mgr.peek_builder().path_config();
637        let predictor = self.predictor.lock().expect("poisoned lock");
638        let preemptive_circuits = predictor.config();
639        if preemptive_circuits.initial_predicted_ports
640            != new_config.preemptive_circuits().initial_predicted_ports
641        {
642            // This change has no effect, since the list of ports was _initial_.
643            how.cannot_change("preemptive_circuits.initial_predicted_ports")?;
644        }
645
646        if how == tor_config::Reconfigure::CheckAllOrNothing {
647            return Ok(RetireCircuits::None);
648        }
649
650        let retire_because_of_guardmgr =
651            self.mgr.peek_builder().guardmgr().reconfigure(new_config)?;
652
653        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
654        let retire_because_of_vanguardmgr = self
655            .mgr
656            .peek_builder()
657            .vanguardmgr()
658            .reconfigure(new_config.vanguard_config())?;
659
660        let new_reachable = &new_config.path_rules().reachable_addrs;
661        if new_reachable != &old_path_rules.reachable_addrs {
662            let filter = new_config.path_rules().build_guard_filter();
663            self.mgr.peek_builder().guardmgr().set_filter(filter);
664        }
665
666        let discard_all_circuits = !new_config
667            .path_rules()
668            .at_least_as_permissive_as(&old_path_rules)
669            || retire_because_of_guardmgr != tor_guardmgr::RetireCircuits::None;
670
671        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
672        let discard_all_circuits = discard_all_circuits
673            || retire_because_of_vanguardmgr != tor_guardmgr::RetireCircuits::None;
674
675        self.mgr
676            .peek_builder()
677            .set_path_config(new_config.path_rules().clone());
678        self.mgr
679            .set_circuit_timing(new_config.circuit_timing().clone());
680        predictor.set_config(new_config.preemptive_circuits().clone());
681
682        if discard_all_circuits {
683            // TODO(nickm): Someday, we might want to take a more lenient approach, and only
684            // retire those circuits that do not conform to the new path rules,
685            // or do not conform to the new guard configuration.
686            info!("Path configuration has become more restrictive: retiring existing circuits.");
687            self.retire_all_circuits();
688            return Ok(RetireCircuits::All);
689        }
690        Ok(RetireCircuits::None)
691    }
692
693    /// Whenever a [`DirEvent::NewConsensus`] arrives on `events`, update
694    /// `circmgr` with the consensus parameters from `dirmgr`.
695    ///
696    /// Exit when `events` is closed, or one of `circmgr` or `dirmgr` becomes
697    /// dangling.
698    ///
699    /// This is a daemon task: it runs indefinitely in the background.
700    #[instrument(level = "trace", skip_all)]
701    async fn keep_circmgr_params_updated<D>(
702        mut events: impl futures::Stream<Item = DirEvent> + Unpin,
703        circmgr: Weak<Self>,
704        dirmgr: Weak<D>,
705    ) where
706        D: NetDirProvider + 'static + ?Sized,
707    {
708        use DirEvent::*;
709        while let Some(event) = events.next().await {
710            if matches!(event, NewConsensus) {
711                if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
712                    if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
713                        cm.update_network_parameters(netdir.params());
714                    }
715                } else {
716                    debug!("Circmgr or dirmgr has disappeared; task exiting.");
717                    break;
718                }
719            }
720        }
721    }
722
723    /// Reconfigure this circuit manager using the latest set of
724    /// network parameters.
725    fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
726        self.mgr.update_network_parameters(p);
727        self.mgr.peek_builder().update_network_parameters(p);
728    }
729
730    /// Run indefinitely, launching circuits as needed to get a good
731    /// estimate for our circuit build timeouts.
732    ///
733    /// Exit when we notice that `circmgr` or `dirmgr` has been dropped.
734    ///
735    /// This is a daemon task: it runs indefinitely in the background.
736    #[instrument(level = "trace", skip_all)]
737    async fn continually_launch_timeout_testing_circuits<D>(
738        mut sched: TaskSchedule<R>,
739        circmgr: Weak<Self>,
740        dirmgr: Weak<D>,
741    ) where
742        D: NetDirProvider + 'static + ?Sized,
743    {
744        while sched.next().await.is_some() {
745            if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
746                if let Ok(netdir) = dm.netdir(Timeliness::Unchecked) {
747                    if let Err(e) = cm
748                        .launch_timeout_testing_circuit_if_appropriate(&netdir)
749                        .await
750                    {
751                        warn_report!(e, "Problem launching a timeout testing circuit");
752                    }
753                    let delay = netdir
754                        .params()
755                        .cbt_testing_delay
756                        .try_into()
757                        .expect("Out-of-bounds value from BoundedInt32");
758
759                    drop((cm, dm));
760                    sched.fire_in(delay);
761                } else {
762                    // wait for the provider to announce some event, which will probably be
763                    // NewConsensus; this is therefore a decent yardstick for rechecking
764                    let _ = dm.events().next().await;
765                    sched.fire();
766                }
767            } else {
768                return;
769            }
770        }
771    }
772
773    /// If we need to launch a testing circuit to judge our circuit
774    /// build timeouts timeouts, do so.
775    ///
776    /// # Note
777    ///
778    /// This function is invoked periodically from
779    /// `continually_launch_timeout_testing_circuits`.
780    async fn launch_timeout_testing_circuit_if_appropriate(&self, netdir: &NetDir) -> Result<()> {
781        if !self.mgr.peek_builder().learning_timeouts() {
782            return Ok(());
783        }
784        // We expire any too-old circuits here, so they don't get
785        // counted towards max_circs.
786        self.expire_circuits().await;
787        let max_circs: u64 = netdir
788            .params()
789            .cbt_max_open_circuits_for_testing
790            .try_into()
791            .expect("Out-of-bounds result from BoundedInt32");
792        if (self.mgr.n_tunnels() as u64) < max_circs {
793            // Actually launch the circuit!
794            let usage = TargetTunnelUsage::TimeoutTesting;
795            let dirinfo = netdir.into();
796            let mgr = Arc::clone(&self.mgr);
797            debug!("Launching a circuit to test build times.");
798            let receiver = mgr.launch_by_usage(&usage, dirinfo)?;
799            // We don't actually care when this circuit is done,
800            // so it's okay to drop the Receiver without awaiting it.
801            drop(receiver);
802        }
803
804        Ok(())
805    }
806
807    /// Run forever, periodically telling `circmgr` to update its persistent
808    /// state.
809    ///
810    /// Exit when we notice that `circmgr` has been dropped.
811    ///
812    /// This is a daemon task: it runs indefinitely in the background.
813    #[allow(clippy::cognitive_complexity)] // because of tracing
814    #[instrument(level = "trace", skip_all)]
815    async fn update_persistent_state<S>(
816        mut sched: TaskSchedule<R>,
817        circmgr: Weak<Self>,
818        statemgr: S,
819    ) where
820        S: StateMgr + std::marker::Send,
821    {
822        while sched.next().await.is_some() {
823            if let Some(circmgr) = Weak::upgrade(&circmgr) {
824                use tor_persist::LockStatus::*;
825
826                match statemgr.try_lock() {
827                    Err(e) => {
828                        error_report!(e, "Problem with state lock file");
829                        break;
830                    }
831                    Ok(NewlyAcquired) => {
832                        info!("We now own the lock on our state files.");
833                        if let Err(e) = circmgr.upgrade_to_owned_persistent_state() {
834                            error_report!(e, "Unable to upgrade to owned state files");
835                            break;
836                        }
837                    }
838                    Ok(AlreadyHeld) => {
839                        if let Err(e) = circmgr.store_persistent_state() {
840                            error_report!(e, "Unable to flush circmgr state");
841                            break;
842                        }
843                    }
844                    Ok(NoLock) => {
845                        if let Err(e) = circmgr.reload_persistent_state() {
846                            error_report!(e, "Unable to reload circmgr state");
847                            break;
848                        }
849                    }
850                }
851            } else {
852                debug!("Circmgr has disappeared; task exiting.");
853                return;
854            }
855            // TODO(nickm): This delay is probably too small.
856            //
857            // Also, we probably don't even want a fixed delay here.  Instead,
858            // we should be updating more frequently when the data is volatile
859            // or has important info to save, and not at all when there are no
860            // changes.
861            sched.fire_in(Duration::from_secs(60));
862        }
863
864        debug!("State update task exiting (potentially due to handle drop).");
865    }
866
867    /// Switch from having an unowned persistent state to having an owned one.
868    ///
869    /// Requires that we hold the lock on the state files.
870    pub(crate) fn upgrade_to_owned_persistent_state(&self) -> Result<()> {
871        self.mgr.peek_builder().upgrade_to_owned_state()?;
872        Ok(())
873    }
874
875    /// Reload state from the state manager.
876    ///
877    /// We only call this method if we _don't_ have the lock on the state
878    /// files.  If we have the lock, we only want to save.
879    pub(crate) fn reload_persistent_state(&self) -> Result<()> {
880        self.mgr.peek_builder().reload_state()?;
881        Ok(())
882    }
883
884    /// Run indefinitely, launching circuits where the preemptive circuit
885    /// predictor thinks it'd be a good idea to have them.
886    ///
887    /// Exit when we notice that `circmgr` or `dirmgr` has been dropped.
888    ///
889    /// This is a daemon task: it runs indefinitely in the background.
890    ///
891    /// # Note
892    ///
893    /// This would be better handled entirely within `tor-circmgr`, like
894    /// other daemon tasks.
895    #[instrument(level = "trace", skip_all)]
896    async fn continually_preemptively_build_circuits<D>(
897        mut sched: TaskSchedule<R>,
898        circmgr: Weak<Self>,
899        dirmgr: Weak<D>,
900    ) where
901        D: NetDirProvider + 'static + ?Sized,
902    {
903        let base_delay = Duration::from_secs(10);
904        let mut retry = RetryDelay::from_duration(base_delay);
905
906        while sched.next().await.is_some() {
907            if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
908                if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
909                    let result = cm
910                        .launch_circuits_preemptively(DirInfo::Directory(&netdir))
911                        .await;
912
913                    let delay = match result {
914                        Ok(()) => {
915                            retry.reset();
916                            base_delay
917                        }
918                        Err(_) => retry.next_delay(&mut rand::rng()),
919                    };
920
921                    sched.fire_in(delay);
922                } else {
923                    // wait for the provider to announce some event, which will probably be
924                    // NewConsensus; this is therefore a decent yardstick for rechecking
925                    let _ = dm.events().next().await;
926                    sched.fire();
927                }
928            } else {
929                return;
930            }
931        }
932    }
933
934    /// Launch circuits preemptively, using the preemptive circuit predictor's
935    /// predictions.
936    ///
937    /// # Note
938    ///
939    /// This function is invoked periodically from
940    /// `continually_preemptively_build_circuits()`.
941    #[allow(clippy::cognitive_complexity)]
942    #[instrument(level = "trace", skip_all)]
943    async fn launch_circuits_preemptively(
944        &self,
945        netdir: DirInfo<'_>,
946    ) -> std::result::Result<(), err::PreemptiveCircError> {
947        trace!("Checking preemptive circuit predictions.");
948        let (circs, threshold) = {
949            let path_config = self.mgr.peek_builder().path_config();
950            let preemptive = self.predictor.lock().expect("preemptive lock poisoned");
951            let threshold = preemptive.config().disable_at_threshold;
952            (preemptive.predict(&path_config), threshold)
953        };
954
955        if self.mgr.n_tunnels() >= threshold {
956            return Ok(());
957        }
958        let mut n_created = 0_usize;
959        let mut n_errors = 0_usize;
960
961        let futures = circs
962            .iter()
963            .map(|usage| self.mgr.get_or_launch(usage, netdir));
964        let results = futures::future::join_all(futures).await;
965        for (i, result) in results.into_iter().enumerate() {
966            match result {
967                Ok((_, TunnelProvenance::NewlyCreated)) => {
968                    debug!("Preeemptive circuit was created for {:?}", circs[i]);
969                    n_created += 1;
970                }
971                Ok((_, TunnelProvenance::Preexisting)) => {
972                    trace!("Circuit already existed created for {:?}", circs[i]);
973                }
974                Err(e) => {
975                    warn_report!(e, "Failed to build preemptive circuit {:?}", sv(&circs[i]));
976                    n_errors += 1;
977                }
978            }
979        }
980
981        if n_created > 0 || n_errors == 0 {
982            // Either we successfully made a circuit, or we didn't have any
983            // failures while looking for preexisting circuits.  Progress was
984            // made, so there's no need to back off.
985            Ok(())
986        } else {
987            // We didn't build any circuits and we hit at least one error:
988            // We'll call this unsuccessful.
989            Err(err::PreemptiveCircError)
990        }
991    }
992
993    /// Create and return a new (typically anonymous) onion circuit stem
994    /// of type `stem_kind`.
995    ///
996    /// If `circ_kind` is provided, we apply additional rules to make sure
997    /// that this will be usable as a stem for the given kind of onion service circuit.
998    /// Otherwise, we pick a stem that will probably be useful in general.
999    ///
1000    /// This circuit is guaranteed not to have been used for any traffic
1001    /// previously, and it will not be given out for any other requests in the
1002    /// future unless explicitly re-registered with a circuit manager.
1003    ///
1004    /// If `planned_target` is provided, then the circuit will be built so that
1005    /// it does not share any family members with the provided target.  (The
1006    /// circuit _will not be_ extended to that target itself!)
1007    ///
1008    /// Used to implement onion service clients and services.
1009    #[cfg(feature = "hs-common")]
1010    #[instrument(level = "trace", skip_all)]
1011    pub(crate) async fn launch_hs_unmanaged<T>(
1012        &self,
1013        planned_target: Option<T>,
1014        dir: &NetDir,
1015        stem_kind: HsCircStemKind,
1016        circ_kind: Option<HsCircKind>,
1017    ) -> Result<B::Tunnel>
1018    where
1019        T: IntoOwnedChanTarget,
1020    {
1021        let usage = TargetTunnelUsage::HsCircBase {
1022            compatible_with_target: planned_target.map(IntoOwnedChanTarget::to_owned),
1023            stem_kind,
1024            circ_kind,
1025        };
1026        let (_, client_circ) = self.mgr.launch_unmanaged(&usage, dir.into()).await?;
1027        Ok(client_circ)
1028    }
1029
1030    /// Return true if `netdir` has enough information to be used for this
1031    /// circuit manager.
1032    ///
1033    /// (This will check whether the netdir is missing any primary guard
1034    /// microdescriptors)
1035    pub(crate) fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
1036        self.mgr
1037            .peek_builder()
1038            .guardmgr()
1039            .netdir_is_sufficient(netdir)
1040    }
1041
1042    /// Internal implementation for [`CircMgr::estimate_timeout`].
1043    pub(crate) fn estimate_timeout(
1044        &self,
1045        timeout_action: &timeouts::Action,
1046    ) -> std::time::Duration {
1047        let (timeout, _abandon) = self.mgr.peek_builder().estimator().timeouts(timeout_action);
1048        timeout
1049    }
1050
1051    /// Internal implementation for [`CircMgr::builder`].
1052    pub(crate) fn builder(&self) -> &B {
1053        self.mgr.peek_builder()
1054    }
1055
1056    /// Flush state to the state manager, if there is any unsaved state and
1057    /// we have the lock.
1058    ///
1059    /// Return true if we saved something; false if we didn't have the lock.
1060    pub(crate) fn store_persistent_state(&self) -> Result<bool> {
1061        self.mgr.peek_builder().save_state()
1062    }
1063
1064    /// Expire every circuit that has been dirty for too long.
1065    ///
1066    /// Expired circuits are not closed while they still have users,
1067    /// but they are no longer given out for new requests.
1068    async fn expire_circuits(&self) {
1069        // TODO: I would prefer not to call this at every request, but
1070        // it should be fine for now.  (At some point we may no longer
1071        // need this, or might not need to call it so often, now that
1072        // our circuit expiration runs on scheduled timers via
1073        // spawn_expiration_task.)
1074        let now = self.mgr.peek_runtime().now();
1075
1076        // TODO: Use return value here to implement the above TODO.
1077        let _next_expiration = self.mgr.expire_tunnels(now).await;
1078    }
1079
1080    /// Mark every circuit that we have launched so far as unsuitable for
1081    /// any future requests.  This won't close existing circuits that have
1082    /// streams attached to them, but it will prevent any future streams from
1083    /// being attached.
1084    ///
1085    /// TODO: we may want to expose this eventually.  If we do, we should
1086    /// be very clear that you don't want to use it haphazardly.
1087    pub(crate) fn retire_all_circuits(&self) {
1088        self.mgr.retire_all_tunnels();
1089    }
1090
1091    /// If `circ_id` is the unique identifier for a circuit that we're
1092    /// keeping track of, don't give it out for any future requests.
1093    pub(crate) fn retire_circ(&self, circ_id: &<B::Tunnel as AbstractTunnel>::Id) {
1094        let _ = self.mgr.take_tunnel(circ_id);
1095    }
1096
1097    /// Return a stream of events about our estimated clock skew; these events
1098    /// are `None` when we don't have enough information to make an estimate,
1099    /// and `Some(`[`SkewEstimate`]`)` otherwise.
1100    ///
1101    /// Note that this stream can be lossy: if the estimate changes more than
1102    /// one before you read from the stream, you might only get the most recent
1103    /// update.
1104    pub(crate) fn skew_events(&self) -> ClockSkewEvents {
1105        self.mgr.peek_builder().guardmgr().skew_events()
1106    }
1107
1108    /// Record that a failure occurred on a circuit with a given guard, in a way
1109    /// that makes us unwilling to use that guard for future circuits.
1110    ///
1111    pub(crate) fn note_external_failure(
1112        &self,
1113        target: &impl ChanTarget,
1114        external_failure: ExternalActivity,
1115    ) {
1116        self.mgr
1117            .peek_builder()
1118            .guardmgr()
1119            .note_external_failure(target, external_failure);
1120    }
1121
1122    /// Record that a success occurred on a circuit with a given guard, in a way
1123    /// that makes us possibly willing to use that guard for future circuits.
1124    pub(crate) fn note_external_success(
1125        &self,
1126        target: &impl ChanTarget,
1127        external_activity: ExternalActivity,
1128    ) {
1129        self.mgr
1130            .peek_builder()
1131            .guardmgr()
1132            .note_external_success(target, external_activity);
1133    }
1134}
1135
1136impl<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> Drop for CircMgrInner<B, R> {
1137    fn drop(&mut self) {
1138        match self.store_persistent_state() {
1139            Ok(true) => info!("Flushed persistent state at exit."),
1140            Ok(false) => debug!("Lock not held; no state to flush."),
1141            Err(e) => error_report!(e, "Unable to flush state on circuit manager drop"),
1142        }
1143    }
1144}
1145
1146#[cfg(test)]
1147mod test {
1148    // @@ begin test lint list maintained by maint/add_warning @@
1149    #![allow(clippy::bool_assert_comparison)]
1150    #![allow(clippy::clone_on_copy)]
1151    #![allow(clippy::dbg_macro)]
1152    #![allow(clippy::mixed_attributes_style)]
1153    #![allow(clippy::print_stderr)]
1154    #![allow(clippy::print_stdout)]
1155    #![allow(clippy::single_char_pattern)]
1156    #![allow(clippy::unwrap_used)]
1157    #![allow(clippy::unchecked_time_subtraction)]
1158    #![allow(clippy::useless_vec)]
1159    #![allow(clippy::needless_pass_by_value)]
1160    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1161    use mocks::FakeBuilder;
1162    use tor_guardmgr::GuardMgr;
1163    use tor_linkspec::OwnedChanTarget;
1164    use tor_netdir::testprovider::TestNetDirProvider;
1165    use tor_persist::TestingStateMgr;
1166
1167    use super::*;
1168
1169    #[test]
1170    fn get_params() {
1171        use tor_netdir::{MdReceiver, PartialNetDir};
1172        use tor_netdoc::doc::netstatus::NetParams;
1173        // If it's just fallbackdir, we get the default parameters.
1174        let fb = FallbackList::from([]);
1175        let di: DirInfo<'_> = (&fb).into();
1176
1177        let p1 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
1178        assert!(!p1.extend_by_ed25519_id);
1179
1180        // Now try with a directory and configured parameters.
1181        let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
1182        let mut params = NetParams::default();
1183        params.set("circwindow".into(), 100);
1184        params.set("ExtendByEd25519ID".into(), 1);
1185        let mut dir = PartialNetDir::new(consensus, Some(&params));
1186        for m in microdescs {
1187            dir.add_microdesc(m);
1188        }
1189        let netdir = dir.unwrap_if_sufficient().unwrap();
1190        let di: DirInfo<'_> = (&netdir).into();
1191        let p2 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
1192        assert!(p2.extend_by_ed25519_id);
1193
1194        // Now try with a bogus circwindow value.
1195        let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
1196        let mut params = NetParams::default();
1197        params.set("circwindow".into(), 100_000);
1198        params.set("ExtendByEd25519ID".into(), 1);
1199        let mut dir = PartialNetDir::new(consensus, Some(&params));
1200        for m in microdescs {
1201            dir.add_microdesc(m);
1202        }
1203        let netdir = dir.unwrap_if_sufficient().unwrap();
1204        let di: DirInfo<'_> = (&netdir).into();
1205        let p2 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
1206        assert!(p2.extend_by_ed25519_id);
1207    }
1208
1209    fn make_circmgr<R: Runtime>(runtime: R) -> Arc<CircMgrInner<FakeBuilder<R>, R>> {
1210        let config = crate::config::test_config::TestConfig::default();
1211        let statemgr = TestingStateMgr::new();
1212        let guardmgr =
1213            GuardMgr::new(runtime.clone(), statemgr.clone(), &config).expect("Create GuardMgr");
1214        let builder = FakeBuilder::new(
1215            &runtime,
1216            statemgr.clone(),
1217            &tor_guardmgr::TestConfig::default(),
1218        );
1219        let circmgr = Arc::new(CircMgrInner::new_generic(
1220            &config, &runtime, &guardmgr, builder,
1221        ));
1222        let netdir = Arc::new(TestNetDirProvider::new());
1223        CircMgrInner::launch_background_tasks(&circmgr, &runtime, &netdir, statemgr)
1224            .expect("launch CircMgrInner background tasks");
1225        circmgr
1226    }
1227
1228    #[test]
1229    #[cfg(feature = "hs-common")]
1230    fn test_launch_hs_unmanaged() {
1231        tor_rtmock::MockRuntime::test_with_various(|runtime| async move {
1232            let circmgr = make_circmgr(runtime.clone());
1233            let netdir = tor_netdir::testnet::construct_netdir()
1234                .unwrap_if_sufficient()
1235                .unwrap();
1236
1237            let (ret_tx, ret_rx) = tor_async_utils::oneshot::channel();
1238            runtime.spawn_identified("launch_hs_unamanged", async move {
1239                ret_tx
1240                    .send(
1241                        circmgr
1242                            .launch_hs_unmanaged::<OwnedChanTarget>(
1243                                None,
1244                                &netdir,
1245                                HsCircStemKind::Naive,
1246                                None,
1247                            )
1248                            .await,
1249                    )
1250                    .unwrap();
1251            });
1252            runtime.advance_by(Duration::from_millis(60)).await;
1253            ret_rx.await.unwrap().unwrap();
1254        });
1255    }
1256}