1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc = include_str!("../README.md")]
3#![allow(renamed_and_removed_lints)] #![allow(unknown_lints)] #![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_time_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] #![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] #![allow(clippy::result_large_err)] #![allow(clippy::needless_raw_string_hashes)] #![allow(clippy::needless_lifetimes)] #![allow(mismatched_lifetime_syntaxes)] #![cfg_attr(not(all(feature = "full", feature = "experimental")), allow(unused))]
49
50use build::TunnelBuilder;
51use mgr::{AbstractTunnel, AbstractTunnelBuilder};
52use tor_basic_utils::retry::RetryDelay;
53use tor_chanmgr::ChanMgr;
54use tor_dircommon::fallback::FallbackList;
55use tor_error::{error_report, warn_report};
56use tor_guardmgr::RetireCircuits;
57use tor_linkspec::ChanTarget;
58use tor_netdir::{DirEvent, NetDir, NetDirProvider, Timeliness};
59use tor_proto::circuit::UniqId;
60use tor_proto::client::circuit::CircParameters;
61use tor_rtcompat::Runtime;
62
63#[cfg(any(feature = "specific-relay", feature = "hs-common"))]
64use tor_linkspec::IntoOwnedChanTarget;
65
66use futures::StreamExt;
67use std::sync::{Arc, Mutex, Weak};
68use std::time::{Duration, Instant};
69use tor_rtcompat::SpawnExt;
70use tracing::{debug, info, instrument, trace, warn};
71
72#[cfg(feature = "testing")]
73pub use config::test_config::TestConfig;
74
75pub mod build;
76mod config;
77mod err;
78#[cfg(feature = "hs-common")]
79pub mod hspool;
80mod impls;
81pub mod isolation;
82mod mgr;
83#[cfg(test)]
84mod mocks;
85mod preemptive;
86pub mod timeouts;
87mod tunnel;
88mod usage;
89
90cfg_if::cfg_if! {
92 if #[cfg(feature = "experimental-api")] {
93 pub mod path;
94 } else {
95 pub(crate) mod path;
96 }
97}
98
99pub use err::Error;
100pub use isolation::IsolationToken;
101pub use tor_guardmgr::{ClockSkewEvents, GuardMgrConfig, SkewEstimate};
102pub use tunnel::{
103 ClientDataTunnel, ClientDirTunnel, ClientOnionServiceDataTunnel, ClientOnionServiceDirTunnel,
104 ClientOnionServiceIntroTunnel, ServiceOnionServiceDataTunnel, ServiceOnionServiceDirTunnel,
105 ServiceOnionServiceIntroTunnel,
106};
107#[cfg(feature = "conflux")]
108pub use tunnel::{
109 ClientMultiPathDataTunnel, ClientMultiPathOnionServiceDataTunnel,
110 ServiceMultiPathOnionServiceDataTunnel,
111};
112pub use usage::{TargetPort, TargetPorts};
113
114pub use config::{
115 CircMgrConfig, CircuitTiming, CircuitTimingBuilder, PathConfig, PathConfigBuilder,
116 PreemptiveCircuitConfig, PreemptiveCircuitConfigBuilder,
117};
118
119use crate::isolation::StreamIsolation;
120use crate::mgr::TunnelProvenance;
121use crate::preemptive::PreemptiveCircuitPredictor;
122use usage::TargetTunnelUsage;
123
124use safelog::sensitive as sv;
125#[cfg(feature = "geoip")]
126use tor_geoip::CountryCode;
127pub use tor_guardmgr::{ExternalActivity, FirstHopId};
128use tor_persist::StateMgr;
129use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
130
131#[cfg(feature = "hs-common")]
132use crate::hspool::{HsCircKind, HsCircStemKind};
133#[cfg(all(feature = "vanguards", feature = "hs-common"))]
134use tor_guardmgr::vanguards::VanguardMgr;
135
136pub type Result<T> = std::result::Result<T, Error>;
138
139type TimeoutStateHandle = tor_persist::DynStorageHandle<timeouts::pareto::ParetoTimeoutState>;
141
142const PARETO_TIMEOUT_DATA_KEY: &str = "circuit_timeouts";
144
145#[derive(Debug, Copy, Clone)]
153#[non_exhaustive]
154pub enum DirInfo<'a> {
155 Fallbacks(&'a FallbackList),
157 Directory(&'a NetDir),
159 Nothing,
162}
163
164impl<'a> From<&'a FallbackList> for DirInfo<'a> {
165 fn from(v: &'a FallbackList) -> DirInfo<'a> {
166 DirInfo::Fallbacks(v)
167 }
168}
169impl<'a> From<&'a NetDir> for DirInfo<'a> {
170 fn from(v: &'a NetDir) -> DirInfo<'a> {
171 DirInfo::Directory(v)
172 }
173}
174impl<'a> DirInfo<'a> {
175 fn circ_params(&self, usage: &TargetTunnelUsage) -> Result<CircParameters> {
177 use tor_netdir::params::NetParameters;
178 let defaults = NetParameters::default();
181 let net_params = match self {
182 DirInfo::Directory(d) => d.params(),
183 _ => &defaults,
184 };
185 match usage {
186 #[cfg(feature = "hs-common")]
187 TargetTunnelUsage::HsCircBase { .. } => {
188 build::onion_circparams_from_netparams(net_params)
189 }
190 _ => build::exit_circparams_from_netparams(net_params),
191 }
192 }
193}
194
195pub struct CircMgr<R: Runtime>(Arc<CircMgrInner<build::TunnelBuilder<R>, R>>);
205
206impl<R: Runtime> CircMgr<R> {
207 pub fn new<SM, CFG: CircMgrConfig>(
213 config: &CFG,
214 storage: SM,
215 runtime: &R,
216 chanmgr: Arc<ChanMgr<R>>,
217 guardmgr: &tor_guardmgr::GuardMgr<R>,
218 ) -> Result<Self>
219 where
220 SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
221 {
222 Ok(Self(Arc::new(CircMgrInner::new(
223 config, storage, runtime, chanmgr, guardmgr,
224 )?)))
225 }
226
227 #[instrument(level = "trace", skip_all)]
230 pub async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<ClientDirTunnel> {
231 let tunnel = self.0.get_or_launch_dir(netdir).await?;
232 Ok(tunnel.into())
233 }
234
235 #[instrument(level = "trace", skip_all)]
241 pub async fn get_or_launch_exit(
242 &self,
243 netdir: DirInfo<'_>, ports: &[TargetPort],
245 isolation: StreamIsolation,
246 #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
249 ) -> Result<ClientDataTunnel> {
250 let tunnel = self
251 .0
252 .get_or_launch_exit(
253 netdir,
254 ports,
255 isolation,
256 #[cfg(feature = "geoip")]
257 country_code,
258 )
259 .await?;
260 Ok(tunnel.into())
261 }
262
263 #[cfg(feature = "specific-relay")]
268 #[instrument(level = "trace", skip_all)]
269 pub async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
270 &self,
271 target: T,
272 ) -> Result<ClientDirTunnel> {
273 let tunnel = self.0.get_or_launch_dir_specific(target).await?;
274 Ok(tunnel.into())
275 }
276
277 #[instrument(level = "trace", skip_all)]
283 pub fn launch_background_tasks<D, S>(
284 self: &Arc<Self>,
285 runtime: &R,
286 dir_provider: &Arc<D>,
287 state_mgr: S,
288 ) -> Result<Vec<TaskHandle>>
289 where
290 D: NetDirProvider + 'static + ?Sized,
291 S: StateMgr + std::marker::Send + 'static,
292 {
293 CircMgrInner::launch_background_tasks(&self.0.clone(), runtime, dir_provider, state_mgr)
294 }
295
296 pub fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
302 self.0.netdir_is_sufficient(netdir)
303 }
304
305 pub fn retire_circ(&self, circ_id: &UniqId) {
308 self.0.retire_circ(circ_id);
309 }
310
311 pub fn note_external_failure(
315 &self,
316 target: &impl ChanTarget,
317 external_failure: ExternalActivity,
318 ) {
319 self.0.note_external_failure(target, external_failure);
320 }
321
322 pub fn note_external_success(
325 &self,
326 target: &impl ChanTarget,
327 external_activity: ExternalActivity,
328 ) {
329 self.0.note_external_success(target, external_activity);
330 }
331
332 pub fn skew_events(&self) -> ClockSkewEvents {
340 self.0.skew_events()
341 }
342
343 #[instrument(level = "trace", skip_all)]
349 pub fn reconfigure<CFG: CircMgrConfig>(
350 &self,
351 new_config: &CFG,
352 how: tor_config::Reconfigure,
353 ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
354 self.0.reconfigure(new_config, how)
355 }
356
357 pub fn estimate_timeout(&self, timeout_action: &timeouts::Action) -> std::time::Duration {
383 self.0.estimate_timeout(timeout_action)
384 }
385
386 #[cfg(feature = "experimental-api")]
389 pub fn builder(&self) -> &TunnelBuilder<R> {
390 CircMgrInner::builder(&self.0)
391 }
392}
393
394#[derive(Clone)]
396pub(crate) struct CircMgrInner<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> {
397 mgr: Arc<mgr::AbstractTunnelMgr<B, R>>,
399 predictor: Arc<Mutex<PreemptiveCircuitPredictor>>,
401}
402
403impl<R: Runtime> CircMgrInner<TunnelBuilder<R>, R> {
404 #[allow(clippy::unnecessary_wraps)]
410 pub(crate) fn new<SM, CFG: CircMgrConfig>(
411 config: &CFG,
412 storage: SM,
413 runtime: &R,
414 chanmgr: Arc<ChanMgr<R>>,
415 guardmgr: &tor_guardmgr::GuardMgr<R>,
416 ) -> Result<Self>
417 where
418 SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
419 {
420 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
421 let vanguardmgr = {
422 let has_onion_svc = false;
427 VanguardMgr::new(
428 config.vanguard_config(),
429 runtime.clone(),
430 storage.clone(),
431 has_onion_svc,
432 )?
433 };
434
435 let storage_handle = storage.create_handle(PARETO_TIMEOUT_DATA_KEY);
436
437 let builder = build::TunnelBuilder::new(
438 runtime.clone(),
439 chanmgr,
440 config.path_rules().clone(),
441 storage_handle,
442 guardmgr.clone(),
443 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
444 vanguardmgr,
445 );
446
447 Ok(Self::new_generic(config, runtime, guardmgr, builder))
448 }
449}
450
451impl<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> CircMgrInner<B, R> {
452 pub(crate) fn new_generic<CFG: CircMgrConfig>(
454 config: &CFG,
455 runtime: &R,
456 guardmgr: &tor_guardmgr::GuardMgr<R>,
457 builder: B,
458 ) -> Self {
459 let preemptive = Arc::new(Mutex::new(PreemptiveCircuitPredictor::new(
460 config.preemptive_circuits().clone(),
461 )));
462
463 guardmgr.set_filter(config.path_rules().build_guard_filter());
464
465 let mgr =
466 mgr::AbstractTunnelMgr::new(builder, runtime.clone(), config.circuit_timing().clone());
467
468 CircMgrInner {
469 mgr: Arc::new(mgr),
470 predictor: preemptive,
471 }
472 }
473
474 #[instrument(level = "trace", skip_all)]
480 pub(crate) fn launch_background_tasks<D, S>(
481 self: &Arc<Self>,
482 runtime: &R,
483 dir_provider: &Arc<D>,
484 state_mgr: S,
485 ) -> Result<Vec<TaskHandle>>
486 where
487 D: NetDirProvider + 'static + ?Sized,
488 S: StateMgr + std::marker::Send + 'static,
489 {
490 let mut ret = vec![];
491
492 runtime
493 .spawn(Self::keep_circmgr_params_updated(
494 dir_provider.events(),
495 Arc::downgrade(self),
496 Arc::downgrade(dir_provider),
497 ))
498 .map_err(|e| Error::from_spawn("circmgr parameter updater", e))?;
499
500 let (sched, handle) = TaskSchedule::new(runtime.clone());
501 ret.push(handle);
502
503 runtime
504 .spawn(Self::update_persistent_state(
505 sched,
506 Arc::downgrade(self),
507 state_mgr,
508 ))
509 .map_err(|e| Error::from_spawn("persistent state updater", e))?;
510
511 let (sched, handle) = TaskSchedule::new(runtime.clone());
512 ret.push(handle);
513
514 runtime
515 .spawn(Self::continually_launch_timeout_testing_circuits(
516 sched,
517 Arc::downgrade(self),
518 Arc::downgrade(dir_provider),
519 ))
520 .map_err(|e| Error::from_spawn("timeout-probe circuit launcher", e))?;
521
522 let (sched, handle) = TaskSchedule::new(runtime.clone());
523 ret.push(handle);
524
525 runtime
526 .spawn(Self::continually_preemptively_build_circuits(
527 sched,
528 Arc::downgrade(self),
529 Arc::downgrade(dir_provider),
530 ))
531 .map_err(|e| Error::from_spawn("preemptive circuit launcher", e))?;
532
533 self.mgr
534 .peek_builder()
535 .guardmgr()
536 .install_netdir_provider(&dir_provider.clone().upcast_arc())?;
537
538 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
539 {
540 let () = self
541 .mgr
542 .peek_builder()
543 .vanguardmgr()
544 .launch_background_tasks(&dir_provider.clone().upcast_arc())?;
545 }
546
547 Ok(ret)
548 }
549
550 #[instrument(level = "trace", skip_all)]
553 pub(crate) async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<Arc<B::Tunnel>> {
554 self.expire_circuits().await;
555 let usage = TargetTunnelUsage::Dir;
556 self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
557 }
558
559 #[instrument(level = "trace", skip_all)]
565 pub(crate) async fn get_or_launch_exit(
566 &self,
567 netdir: DirInfo<'_>, ports: &[TargetPort],
569 isolation: StreamIsolation,
570 #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
573 ) -> Result<Arc<B::Tunnel>> {
574 self.expire_circuits().await;
575 let time = Instant::now();
576 {
577 let mut predictive = self.predictor.lock().expect("preemptive lock poisoned");
578 if ports.is_empty() {
579 predictive.note_usage(None, time);
580 } else {
581 for port in ports.iter() {
582 predictive.note_usage(Some(*port), time);
583 }
584 }
585 }
586 let require_stability = ports.iter().any(|p| {
587 self.mgr
588 .peek_builder()
589 .path_config()
590 .long_lived_ports
591 .contains(&p.port)
592 });
593 let ports = ports.iter().map(Clone::clone).collect();
594 #[cfg(not(feature = "geoip"))]
595 let country_code = None;
596 let usage = TargetTunnelUsage::Exit {
597 ports,
598 isolation,
599 country_code,
600 require_stability,
601 };
602 self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
603 }
604
605 #[cfg(feature = "specific-relay")]
610 #[instrument(level = "trace", skip_all)]
611 pub(crate) async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
612 &self,
613 target: T,
614 ) -> Result<Arc<B::Tunnel>> {
615 self.expire_circuits().await;
616 let usage = TargetTunnelUsage::DirSpecificTarget(target.to_owned());
617 self.mgr
618 .get_or_launch(&usage, DirInfo::Nothing)
619 .await
620 .map(|(c, _)| c)
621 }
622
623 #[instrument(level = "trace", skip_all)]
629 pub(crate) fn reconfigure<CFG: CircMgrConfig>(
630 &self,
631 new_config: &CFG,
632 how: tor_config::Reconfigure,
633 ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
634 let old_path_rules = self.mgr.peek_builder().path_config();
635 let predictor = self.predictor.lock().expect("poisoned lock");
636 let preemptive_circuits = predictor.config();
637 if preemptive_circuits.initial_predicted_ports
638 != new_config.preemptive_circuits().initial_predicted_ports
639 {
640 how.cannot_change("preemptive_circuits.initial_predicted_ports")?;
642 }
643
644 if how == tor_config::Reconfigure::CheckAllOrNothing {
645 return Ok(RetireCircuits::None);
646 }
647
648 let retire_because_of_guardmgr =
649 self.mgr.peek_builder().guardmgr().reconfigure(new_config)?;
650
651 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
652 let retire_because_of_vanguardmgr = self
653 .mgr
654 .peek_builder()
655 .vanguardmgr()
656 .reconfigure(new_config.vanguard_config())?;
657
658 let new_reachable = &new_config.path_rules().reachable_addrs;
659 if new_reachable != &old_path_rules.reachable_addrs {
660 let filter = new_config.path_rules().build_guard_filter();
661 self.mgr.peek_builder().guardmgr().set_filter(filter);
662 }
663
664 let discard_all_circuits = !new_config
665 .path_rules()
666 .at_least_as_permissive_as(&old_path_rules)
667 || retire_because_of_guardmgr != tor_guardmgr::RetireCircuits::None;
668
669 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
670 let discard_all_circuits = discard_all_circuits
671 || retire_because_of_vanguardmgr != tor_guardmgr::RetireCircuits::None;
672
673 self.mgr
674 .peek_builder()
675 .set_path_config(new_config.path_rules().clone());
676 self.mgr
677 .set_circuit_timing(new_config.circuit_timing().clone());
678 predictor.set_config(new_config.preemptive_circuits().clone());
679
680 if discard_all_circuits {
681 info!("Path configuration has become more restrictive: retiring existing circuits.");
685 self.retire_all_circuits();
686 return Ok(RetireCircuits::All);
687 }
688 Ok(RetireCircuits::None)
689 }
690
691 #[instrument(level = "trace", skip_all)]
699 async fn keep_circmgr_params_updated<D>(
700 mut events: impl futures::Stream<Item = DirEvent> + Unpin,
701 circmgr: Weak<Self>,
702 dirmgr: Weak<D>,
703 ) where
704 D: NetDirProvider + 'static + ?Sized,
705 {
706 use DirEvent::*;
707 while let Some(event) = events.next().await {
708 if matches!(event, NewConsensus) {
709 if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
710 if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
711 cm.update_network_parameters(netdir.params());
712 }
713 } else {
714 debug!("Circmgr or dirmgr has disappeared; task exiting.");
715 break;
716 }
717 }
718 }
719 }
720
721 fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
724 self.mgr.update_network_parameters(p);
725 self.mgr.peek_builder().update_network_parameters(p);
726 }
727
728 #[instrument(level = "trace", skip_all)]
735 async fn continually_launch_timeout_testing_circuits<D>(
736 mut sched: TaskSchedule<R>,
737 circmgr: Weak<Self>,
738 dirmgr: Weak<D>,
739 ) where
740 D: NetDirProvider + 'static + ?Sized,
741 {
742 while sched.next().await.is_some() {
743 if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
744 if let Ok(netdir) = dm.netdir(Timeliness::Unchecked) {
745 if let Err(e) = cm
746 .launch_timeout_testing_circuit_if_appropriate(&netdir)
747 .await
748 {
749 warn_report!(e, "Problem launching a timeout testing circuit");
750 }
751 let delay = netdir
752 .params()
753 .cbt_testing_delay
754 .try_into()
755 .expect("Out-of-bounds value from BoundedInt32");
756
757 drop((cm, dm));
758 sched.fire_in(delay);
759 } else {
760 let _ = dm.events().next().await;
763 sched.fire();
764 }
765 } else {
766 return;
767 }
768 }
769 }
770
771 async fn launch_timeout_testing_circuit_if_appropriate(&self, netdir: &NetDir) -> Result<()> {
779 if !self.mgr.peek_builder().learning_timeouts() {
780 return Ok(());
781 }
782 self.expire_circuits().await;
785 let max_circs: u64 = netdir
786 .params()
787 .cbt_max_open_circuits_for_testing
788 .try_into()
789 .expect("Out-of-bounds result from BoundedInt32");
790 if (self.mgr.n_tunnels() as u64) < max_circs {
791 let usage = TargetTunnelUsage::TimeoutTesting;
793 let dirinfo = netdir.into();
794 let mgr = Arc::clone(&self.mgr);
795 debug!("Launching a circuit to test build times.");
796 let receiver = mgr.launch_by_usage(&usage, dirinfo)?;
797 drop(receiver);
800 }
801
802 Ok(())
803 }
804
805 #[allow(clippy::cognitive_complexity)] #[instrument(level = "trace", skip_all)]
813 async fn update_persistent_state<S>(
814 mut sched: TaskSchedule<R>,
815 circmgr: Weak<Self>,
816 statemgr: S,
817 ) where
818 S: StateMgr + std::marker::Send,
819 {
820 while sched.next().await.is_some() {
821 if let Some(circmgr) = Weak::upgrade(&circmgr) {
822 use tor_persist::LockStatus::*;
823
824 match statemgr.try_lock() {
825 Err(e) => {
826 error_report!(e, "Problem with state lock file");
827 break;
828 }
829 Ok(NewlyAcquired) => {
830 info!("We now own the lock on our state files.");
831 if let Err(e) = circmgr.upgrade_to_owned_persistent_state() {
832 error_report!(e, "Unable to upgrade to owned state files");
833 break;
834 }
835 }
836 Ok(AlreadyHeld) => {
837 if let Err(e) = circmgr.store_persistent_state() {
838 error_report!(e, "Unable to flush circmgr state");
839 break;
840 }
841 }
842 Ok(NoLock) => {
843 if let Err(e) = circmgr.reload_persistent_state() {
844 error_report!(e, "Unable to reload circmgr state");
845 break;
846 }
847 }
848 }
849 } else {
850 debug!("Circmgr has disappeared; task exiting.");
851 return;
852 }
853 sched.fire_in(Duration::from_secs(60));
860 }
861
862 debug!("State update task exiting (potentially due to handle drop).");
863 }
864
865 pub(crate) fn upgrade_to_owned_persistent_state(&self) -> Result<()> {
869 self.mgr.peek_builder().upgrade_to_owned_state()?;
870 Ok(())
871 }
872
873 pub(crate) fn reload_persistent_state(&self) -> Result<()> {
878 self.mgr.peek_builder().reload_state()?;
879 Ok(())
880 }
881
882 #[instrument(level = "trace", skip_all)]
894 async fn continually_preemptively_build_circuits<D>(
895 mut sched: TaskSchedule<R>,
896 circmgr: Weak<Self>,
897 dirmgr: Weak<D>,
898 ) where
899 D: NetDirProvider + 'static + ?Sized,
900 {
901 let base_delay = Duration::from_secs(10);
902 let mut retry = RetryDelay::from_duration(base_delay);
903
904 while sched.next().await.is_some() {
905 if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
906 if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
907 let result = cm
908 .launch_circuits_preemptively(DirInfo::Directory(&netdir))
909 .await;
910
911 let delay = match result {
912 Ok(()) => {
913 retry.reset();
914 base_delay
915 }
916 Err(_) => retry.next_delay(&mut rand::rng()),
917 };
918
919 sched.fire_in(delay);
920 } else {
921 let _ = dm.events().next().await;
924 sched.fire();
925 }
926 } else {
927 return;
928 }
929 }
930 }
931
932 #[allow(clippy::cognitive_complexity)]
940 #[instrument(level = "trace", skip_all)]
941 async fn launch_circuits_preemptively(
942 &self,
943 netdir: DirInfo<'_>,
944 ) -> std::result::Result<(), err::PreemptiveCircError> {
945 trace!("Checking preemptive circuit predictions.");
946 let (circs, threshold) = {
947 let path_config = self.mgr.peek_builder().path_config();
948 let preemptive = self.predictor.lock().expect("preemptive lock poisoned");
949 let threshold = preemptive.config().disable_at_threshold;
950 (preemptive.predict(&path_config), threshold)
951 };
952
953 if self.mgr.n_tunnels() >= threshold {
954 return Ok(());
955 }
956 let mut n_created = 0_usize;
957 let mut n_errors = 0_usize;
958
959 let futures = circs
960 .iter()
961 .map(|usage| self.mgr.get_or_launch(usage, netdir));
962 let results = futures::future::join_all(futures).await;
963 for (i, result) in results.into_iter().enumerate() {
964 match result {
965 Ok((_, TunnelProvenance::NewlyCreated)) => {
966 debug!("Preeemptive circuit was created for {:?}", circs[i]);
967 n_created += 1;
968 }
969 Ok((_, TunnelProvenance::Preexisting)) => {
970 trace!("Circuit already existed created for {:?}", circs[i]);
971 }
972 Err(e) => {
973 warn_report!(e, "Failed to build preemptive circuit {:?}", sv(&circs[i]));
974 n_errors += 1;
975 }
976 }
977 }
978
979 if n_created > 0 || n_errors == 0 {
980 Ok(())
984 } else {
985 Err(err::PreemptiveCircError)
988 }
989 }
990
991 #[cfg(feature = "hs-common")]
1008 #[instrument(level = "trace", skip_all)]
1009 pub(crate) async fn launch_hs_unmanaged<T>(
1010 &self,
1011 planned_target: Option<T>,
1012 dir: &NetDir,
1013 stem_kind: HsCircStemKind,
1014 circ_kind: Option<HsCircKind>,
1015 ) -> Result<B::Tunnel>
1016 where
1017 T: IntoOwnedChanTarget,
1018 {
1019 let usage = TargetTunnelUsage::HsCircBase {
1020 compatible_with_target: planned_target.map(IntoOwnedChanTarget::to_owned),
1021 stem_kind,
1022 circ_kind,
1023 };
1024 let (_, client_circ) = self.mgr.launch_unmanaged(&usage, dir.into()).await?;
1025 Ok(client_circ)
1026 }
1027
1028 pub(crate) fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
1034 self.mgr
1035 .peek_builder()
1036 .guardmgr()
1037 .netdir_is_sufficient(netdir)
1038 }
1039
1040 pub(crate) fn estimate_timeout(
1042 &self,
1043 timeout_action: &timeouts::Action,
1044 ) -> std::time::Duration {
1045 let (timeout, _abandon) = self.mgr.peek_builder().estimator().timeouts(timeout_action);
1046 timeout
1047 }
1048
1049 pub(crate) fn builder(&self) -> &B {
1051 self.mgr.peek_builder()
1052 }
1053
1054 pub(crate) fn store_persistent_state(&self) -> Result<bool> {
1059 self.mgr.peek_builder().save_state()
1060 }
1061
1062 async fn expire_circuits(&self) {
1067 let now = self.mgr.peek_runtime().now();
1073
1074 let _next_expiration = self.mgr.expire_tunnels(now).await;
1076 }
1077
1078 pub(crate) fn retire_all_circuits(&self) {
1086 self.mgr.retire_all_tunnels();
1087 }
1088
1089 pub(crate) fn retire_circ(&self, circ_id: &<B::Tunnel as AbstractTunnel>::Id) {
1092 let _ = self.mgr.take_tunnel(circ_id);
1093 }
1094
1095 pub(crate) fn skew_events(&self) -> ClockSkewEvents {
1103 self.mgr.peek_builder().guardmgr().skew_events()
1104 }
1105
1106 pub(crate) fn note_external_failure(
1110 &self,
1111 target: &impl ChanTarget,
1112 external_failure: ExternalActivity,
1113 ) {
1114 self.mgr
1115 .peek_builder()
1116 .guardmgr()
1117 .note_external_failure(target, external_failure);
1118 }
1119
1120 pub(crate) fn note_external_success(
1123 &self,
1124 target: &impl ChanTarget,
1125 external_activity: ExternalActivity,
1126 ) {
1127 self.mgr
1128 .peek_builder()
1129 .guardmgr()
1130 .note_external_success(target, external_activity);
1131 }
1132}
1133
1134impl<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> Drop for CircMgrInner<B, R> {
1135 fn drop(&mut self) {
1136 match self.store_persistent_state() {
1137 Ok(true) => info!("Flushed persistent state at exit."),
1138 Ok(false) => debug!("Lock not held; no state to flush."),
1139 Err(e) => error_report!(e, "Unable to flush state on circuit manager drop"),
1140 }
1141 }
1142}
1143
1144#[cfg(test)]
1145mod test {
1146 #![allow(clippy::bool_assert_comparison)]
1148 #![allow(clippy::clone_on_copy)]
1149 #![allow(clippy::dbg_macro)]
1150 #![allow(clippy::mixed_attributes_style)]
1151 #![allow(clippy::print_stderr)]
1152 #![allow(clippy::print_stdout)]
1153 #![allow(clippy::single_char_pattern)]
1154 #![allow(clippy::unwrap_used)]
1155 #![allow(clippy::unchecked_time_subtraction)]
1156 #![allow(clippy::useless_vec)]
1157 #![allow(clippy::needless_pass_by_value)]
1158 use mocks::FakeBuilder;
1160 use tor_guardmgr::GuardMgr;
1161 use tor_linkspec::OwnedChanTarget;
1162 use tor_netdir::testprovider::TestNetDirProvider;
1163 use tor_persist::TestingStateMgr;
1164
1165 use super::*;
1166
1167 #[test]
1168 fn get_params() {
1169 use tor_netdir::{MdReceiver, PartialNetDir};
1170 use tor_netdoc::doc::netstatus::NetParams;
1171 let fb = FallbackList::from([]);
1173 let di: DirInfo<'_> = (&fb).into();
1174
1175 let p1 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
1176 assert!(!p1.extend_by_ed25519_id);
1177
1178 let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
1180 let mut params = NetParams::default();
1181 params.set("circwindow".into(), 100);
1182 params.set("ExtendByEd25519ID".into(), 1);
1183 let mut dir = PartialNetDir::new(consensus, Some(¶ms));
1184 for m in microdescs {
1185 dir.add_microdesc(m);
1186 }
1187 let netdir = dir.unwrap_if_sufficient().unwrap();
1188 let di: DirInfo<'_> = (&netdir).into();
1189 let p2 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
1190 assert!(p2.extend_by_ed25519_id);
1191
1192 let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
1194 let mut params = NetParams::default();
1195 params.set("circwindow".into(), 100_000);
1196 params.set("ExtendByEd25519ID".into(), 1);
1197 let mut dir = PartialNetDir::new(consensus, Some(¶ms));
1198 for m in microdescs {
1199 dir.add_microdesc(m);
1200 }
1201 let netdir = dir.unwrap_if_sufficient().unwrap();
1202 let di: DirInfo<'_> = (&netdir).into();
1203 let p2 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
1204 assert!(p2.extend_by_ed25519_id);
1205 }
1206
1207 fn make_circmgr<R: Runtime>(runtime: R) -> Arc<CircMgrInner<FakeBuilder<R>, R>> {
1208 let config = crate::config::test_config::TestConfig::default();
1209 let statemgr = TestingStateMgr::new();
1210 let guardmgr =
1211 GuardMgr::new(runtime.clone(), statemgr.clone(), &config).expect("Create GuardMgr");
1212 let builder = FakeBuilder::new(
1213 &runtime,
1214 statemgr.clone(),
1215 &tor_guardmgr::TestConfig::default(),
1216 );
1217 let circmgr = Arc::new(CircMgrInner::new_generic(
1218 &config, &runtime, &guardmgr, builder,
1219 ));
1220 let netdir = Arc::new(TestNetDirProvider::new());
1221 CircMgrInner::launch_background_tasks(&circmgr, &runtime, &netdir, statemgr)
1222 .expect("launch CircMgrInner background tasks");
1223 circmgr
1224 }
1225
1226 #[test]
1227 #[cfg(feature = "hs-common")]
1228 fn test_launch_hs_unmanaged() {
1229 tor_rtmock::MockRuntime::test_with_various(|runtime| async move {
1230 let circmgr = make_circmgr(runtime.clone());
1231 let netdir = tor_netdir::testnet::construct_netdir()
1232 .unwrap_if_sufficient()
1233 .unwrap();
1234
1235 let (ret_tx, ret_rx) = tor_async_utils::oneshot::channel();
1236 runtime.spawn_identified("launch_hs_unamanged", async move {
1237 ret_tx
1238 .send(
1239 circmgr
1240 .launch_hs_unmanaged::<OwnedChanTarget>(
1241 None,
1242 &netdir,
1243 HsCircStemKind::Naive,
1244 None,
1245 )
1246 .await,
1247 )
1248 .unwrap();
1249 });
1250 runtime.advance_by(Duration::from_millis(60)).await;
1251 ret_rx.await.unwrap().unwrap();
1252 });
1253 }
1254}