// Crate-level attributes: documentation setup first, then the lint policy.

// When compiled with `--cfg docsrs`, turn on the `doc_cfg` feature.
#![cfg_attr(docsrs, feature(doc_cfg))]
// The crate-level documentation is the contents of the README.
#![doc = include_str!("../README.md")]
// Do not complain about lint names that this toolchain has renamed, removed,
// or does not recognise.
#![allow(renamed_and_removed_lints)]
#![allow(unknown_lints)]
// Lints that are raised to `warn` or `deny` for the whole crate.
#![warn(missing_docs)]
#![warn(noop_method_call)]
#![warn(unreachable_pub)]
#![warn(clippy::all)]
#![deny(clippy::await_holding_lock)]
#![deny(clippy::cargo_common_metadata)]
#![deny(clippy::cast_lossless)]
#![deny(clippy::checked_conversions)]
#![warn(clippy::cognitive_complexity)]
#![deny(clippy::debug_assert_with_mut_call)]
#![deny(clippy::exhaustive_enums)]
#![deny(clippy::exhaustive_structs)]
#![deny(clippy::expl_impl_clone_on_copy)]
#![deny(clippy::fallible_impl_from)]
#![deny(clippy::implicit_clone)]
#![deny(clippy::large_stack_arrays)]
#![warn(clippy::manual_ok_or)]
#![deny(clippy::missing_docs_in_private_items)]
#![warn(clippy::needless_borrow)]
#![warn(clippy::needless_pass_by_value)]
#![warn(clippy::option_option)]
#![deny(clippy::print_stderr)]
#![deny(clippy::print_stdout)]
#![warn(clippy::rc_buffer)]
#![deny(clippy::ref_option_ref)]
#![warn(clippy::semicolon_if_nothing_returned)]
#![warn(clippy::trait_duplication_in_bounds)]
#![deny(clippy::unchecked_time_subtraction)]
#![deny(clippy::unnecessary_wraps)]
#![warn(clippy::unseparated_literal_suffix)]
#![deny(clippy::unwrap_used)]
#![deny(clippy::mod_module_files)]
// Lints that are explicitly allowed for the whole crate.
#![allow(clippy::let_unit_value)]
#![allow(clippy::uninlined_format_args)]
#![allow(clippy::significant_drop_in_scrutinee)]
#![allow(clippy::result_large_err)]
#![allow(clippy::needless_raw_string_hashes)]
#![allow(clippy::needless_lifetimes)]
#![allow(mismatched_lifetime_syntaxes)]
#![allow(clippy::collapsible_if)]
#![deny(clippy::unused_async)]
// Unless both the `full` and `experimental` features are enabled, silence
// `unused` warnings.
#![cfg_attr(not(all(feature = "full", feature = "experimental")), allow(unused))]
51
52use build::TunnelBuilder;
53use mgr::{AbstractTunnel, AbstractTunnelBuilder};
54use tor_basic_utils::retry::RetryDelay;
55use tor_chanmgr::ChanMgr;
56use tor_dircommon::fallback::FallbackList;
57use tor_error::{error_report, warn_report};
58use tor_guardmgr::RetireCircuits;
59use tor_linkspec::ChanTarget;
60use tor_netdir::{DirEvent, NetDir, NetDirProvider, Timeliness};
61use tor_proto::circuit::UniqId;
62use tor_proto::client::circuit::CircParameters;
63use tor_rtcompat::Runtime;
64
65#[cfg(any(feature = "specific-relay", feature = "hs-common"))]
66use tor_linkspec::IntoOwnedChanTarget;
67
68use futures::StreamExt;
69use std::sync::{Arc, Mutex, Weak};
70use std::time::{Duration, Instant};
71use tor_rtcompat::SpawnExt;
72use tracing::{debug, info, instrument, trace, warn};
73
74#[cfg(feature = "testing")]
75pub use config::test_config::TestConfig;
76
77pub mod build;
78mod config;
79mod err;
80#[cfg(feature = "hs-common")]
81pub mod hspool;
82mod impls;
83pub mod isolation;
84mod mgr;
85#[cfg(test)]
86mod mocks;
87mod preemptive;
88pub mod timeouts;
89mod tunnel;
90mod usage;
91
// The `path` module is public only when the `experimental-api` feature is
// enabled; otherwise it is visible only inside this crate.
cfg_if::cfg_if! {
    if #[cfg(feature = "experimental-api")] {
        pub mod path;
    } else {
        pub(crate) mod path;
    }
}
100
101pub use err::Error;
102pub use isolation::IsolationToken;
103pub use tor_guardmgr::{ClockSkewEvents, GuardMgrConfig, SkewEstimate};
104pub use tunnel::{
105 ClientDataTunnel, ClientDirTunnel, ClientOnionServiceDataTunnel, ClientOnionServiceDirTunnel,
106 ClientOnionServiceIntroTunnel, ServiceOnionServiceDataTunnel, ServiceOnionServiceDirTunnel,
107 ServiceOnionServiceIntroTunnel,
108};
109#[cfg(feature = "conflux")]
110pub use tunnel::{
111 ClientMultiPathDataTunnel, ClientMultiPathOnionServiceDataTunnel,
112 ServiceMultiPathOnionServiceDataTunnel,
113};
114pub use usage::{TargetPort, TargetPorts};
115
116pub use config::{
117 CircMgrConfig, CircuitTiming, CircuitTimingBuilder, PathConfig, PathConfigBuilder,
118 PreemptiveCircuitConfig, PreemptiveCircuitConfigBuilder,
119};
120
121use crate::isolation::StreamIsolation;
122use crate::mgr::TunnelProvenance;
123use crate::preemptive::PreemptiveCircuitPredictor;
124use usage::TargetTunnelUsage;
125
126use safelog::sensitive as sv;
127#[cfg(feature = "geoip")]
128use tor_geoip::CountryCode;
129pub use tor_guardmgr::{ExternalActivity, FirstHopId};
130use tor_persist::StateMgr;
131use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
132
133#[cfg(feature = "hs-common")]
134use crate::hspool::{HsCircKind, HsCircStemKind};
135#[cfg(all(feature = "vanguards", feature = "hs-common"))]
136use tor_guardmgr::vanguards::VanguardMgr;
137
/// A `Result` whose error type is this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
140
/// A dynamic storage handle for a `timeouts::pareto::ParetoTimeoutState`.
type TimeoutStateHandle = tor_persist::DynStorageHandle<timeouts::pareto::ParetoTimeoutState>;
143
/// Key passed to the state manager's `create_handle` to obtain the storage
/// handle that is given to the tunnel builder.
const PARETO_TIMEOUT_DATA_KEY: &str = "circuit_timeouts";
146
/// The directory information supplied by a caller when requesting a tunnel.
///
/// Every variant only borrows its data, so a `DirInfo` is `Copy`.
#[derive(Debug, Copy, Clone)]
#[non_exhaustive]
pub enum DirInfo<'a> {
    /// A borrowed list of fallback directories.
    Fallbacks(&'a FallbackList),
    /// A borrowed network directory.
    Directory(&'a NetDir),
    /// No directory information at all.
    Nothing,
}
165
166impl<'a> From<&'a FallbackList> for DirInfo<'a> {
167 fn from(v: &'a FallbackList) -> DirInfo<'a> {
168 DirInfo::Fallbacks(v)
169 }
170}
171impl<'a> From<&'a NetDir> for DirInfo<'a> {
172 fn from(v: &'a NetDir) -> DirInfo<'a> {
173 DirInfo::Directory(v)
174 }
175}
176impl<'a> DirInfo<'a> {
177 fn circ_params(&self, usage: &TargetTunnelUsage) -> Result<CircParameters> {
179 use tor_netdir::params::NetParameters;
180 let defaults = NetParameters::default();
183 let net_params = match self {
184 DirInfo::Directory(d) => d.params(),
185 _ => &defaults,
186 };
187 match usage {
188 #[cfg(feature = "hs-common")]
189 TargetTunnelUsage::HsCircBase { .. } => {
190 build::onion_circparams_from_netparams(net_params)
191 }
192 _ => build::exit_circparams_from_netparams(net_params),
193 }
194 }
195}
196
/// A circuit manager: a handle around a shared `CircMgrInner` that is
/// specialised to the real [`build::TunnelBuilder`].
///
/// Its methods forward to the inner object; where the inner method returns a
/// tunnel, the result is converted with `into()` to the public tunnel type.
pub struct CircMgr<R: Runtime>(Arc<CircMgrInner<build::TunnelBuilder<R>, R>>);
207
208impl<R: Runtime> CircMgr<R> {
209 pub fn new<SM, CFG: CircMgrConfig>(
215 config: &CFG,
216 storage: SM,
217 runtime: &R,
218 chanmgr: Arc<ChanMgr<R>>,
219 guardmgr: &tor_guardmgr::GuardMgr<R>,
220 ) -> Result<Self>
221 where
222 SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
223 {
224 Ok(Self(Arc::new(CircMgrInner::new(
225 config, storage, runtime, chanmgr, guardmgr,
226 )?)))
227 }
228
229 #[instrument(level = "trace", skip_all)]
232 pub async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<ClientDirTunnel> {
233 let tunnel = self.0.get_or_launch_dir(netdir).await?;
234 Ok(tunnel.into())
235 }
236
237 #[instrument(level = "trace", skip_all)]
243 pub async fn get_or_launch_exit(
244 &self,
245 netdir: DirInfo<'_>, ports: &[TargetPort],
247 isolation: StreamIsolation,
248 #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
251 ) -> Result<ClientDataTunnel> {
252 let tunnel = self
253 .0
254 .get_or_launch_exit(
255 netdir,
256 ports,
257 isolation,
258 #[cfg(feature = "geoip")]
259 country_code,
260 )
261 .await?;
262 Ok(tunnel.into())
263 }
264
265 #[cfg(feature = "specific-relay")]
270 #[instrument(level = "trace", skip_all)]
271 pub async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
272 &self,
273 target: T,
274 ) -> Result<ClientDirTunnel> {
275 let tunnel = self.0.get_or_launch_dir_specific(target).await?;
276 Ok(tunnel.into())
277 }
278
279 #[instrument(level = "trace", skip_all)]
285 pub fn launch_background_tasks<D, S>(
286 self: &Arc<Self>,
287 runtime: &R,
288 dir_provider: &Arc<D>,
289 state_mgr: S,
290 ) -> Result<Vec<TaskHandle>>
291 where
292 D: NetDirProvider + 'static + ?Sized,
293 S: StateMgr + std::marker::Send + 'static,
294 {
295 CircMgrInner::launch_background_tasks(&self.0.clone(), runtime, dir_provider, state_mgr)
296 }
297
298 pub fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
304 self.0.netdir_is_sufficient(netdir)
305 }
306
307 pub fn retire_circ(&self, circ_id: &UniqId) {
310 self.0.retire_circ(circ_id);
311 }
312
313 pub fn note_external_failure(
317 &self,
318 target: &impl ChanTarget,
319 external_failure: ExternalActivity,
320 ) {
321 self.0.note_external_failure(target, external_failure);
322 }
323
324 pub fn note_external_success(
327 &self,
328 target: &impl ChanTarget,
329 external_activity: ExternalActivity,
330 ) {
331 self.0.note_external_success(target, external_activity);
332 }
333
334 pub fn skew_events(&self) -> ClockSkewEvents {
342 self.0.skew_events()
343 }
344
345 #[instrument(level = "trace", skip_all)]
351 pub fn reconfigure<CFG: CircMgrConfig>(
352 &self,
353 new_config: &CFG,
354 how: tor_config::Reconfigure,
355 ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
356 self.0.reconfigure(new_config, how)
357 }
358
359 pub fn estimate_timeout(&self, timeout_action: &timeouts::Action) -> std::time::Duration {
385 self.0.estimate_timeout(timeout_action)
386 }
387
388 #[cfg(feature = "experimental-api")]
391 pub fn builder(&self) -> &TunnelBuilder<R> {
392 CircMgrInner::builder(&self.0)
393 }
394}
395
/// The state behind a [`CircMgr`], generic over the tunnel builder `B`.
///
/// Both fields are `Arc`s, so cloning this struct yields another handle to
/// the same tunnel manager and the same predictor.
#[derive(Clone)]
pub(crate) struct CircMgrInner<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> {
    /// The underlying tunnel manager, which also holds the builder.
    mgr: Arc<mgr::AbstractTunnelMgr<B, R>>,
    /// Records port usage and predicts which circuits to build preemptively.
    predictor: Arc<Mutex<PreemptiveCircuitPredictor>>,
}
404
405impl<R: Runtime> CircMgrInner<TunnelBuilder<R>, R> {
406 #[allow(clippy::unnecessary_wraps)]
412 pub(crate) fn new<SM, CFG: CircMgrConfig>(
413 config: &CFG,
414 storage: SM,
415 runtime: &R,
416 chanmgr: Arc<ChanMgr<R>>,
417 guardmgr: &tor_guardmgr::GuardMgr<R>,
418 ) -> Result<Self>
419 where
420 SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
421 {
422 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
423 let vanguardmgr = {
424 let has_onion_svc = false;
429 VanguardMgr::new(
430 config.vanguard_config(),
431 runtime.clone(),
432 storage.clone(),
433 has_onion_svc,
434 )?
435 };
436
437 let storage_handle = storage.create_handle(PARETO_TIMEOUT_DATA_KEY);
438
439 let builder = build::TunnelBuilder::new(
440 runtime.clone(),
441 chanmgr,
442 config.path_rules().clone(),
443 storage_handle,
444 guardmgr.clone(),
445 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
446 vanguardmgr,
447 );
448
449 Ok(Self::new_generic(config, runtime, guardmgr, builder))
450 }
451}
452
453impl<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> CircMgrInner<B, R> {
454 pub(crate) fn new_generic<CFG: CircMgrConfig>(
456 config: &CFG,
457 runtime: &R,
458 guardmgr: &tor_guardmgr::GuardMgr<R>,
459 builder: B,
460 ) -> Self {
461 let preemptive = Arc::new(Mutex::new(PreemptiveCircuitPredictor::new(
462 config.preemptive_circuits().clone(),
463 )));
464
465 guardmgr.set_filter(config.path_rules().build_guard_filter());
466
467 let mgr =
468 mgr::AbstractTunnelMgr::new(builder, runtime.clone(), config.circuit_timing().clone());
469
470 CircMgrInner {
471 mgr: Arc::new(mgr),
472 predictor: preemptive,
473 }
474 }
475
476 #[instrument(level = "trace", skip_all)]
482 pub(crate) fn launch_background_tasks<D, S>(
483 self: &Arc<Self>,
484 runtime: &R,
485 dir_provider: &Arc<D>,
486 state_mgr: S,
487 ) -> Result<Vec<TaskHandle>>
488 where
489 D: NetDirProvider + 'static + ?Sized,
490 S: StateMgr + std::marker::Send + 'static,
491 {
492 let mut ret = vec![];
493
494 runtime
495 .spawn(Self::keep_circmgr_params_updated(
496 dir_provider.events(),
497 Arc::downgrade(self),
498 Arc::downgrade(dir_provider),
499 ))
500 .map_err(|e| Error::from_spawn("circmgr parameter updater", e))?;
501
502 let (sched, handle) = TaskSchedule::new(runtime.clone());
503 ret.push(handle);
504
505 runtime
506 .spawn(Self::update_persistent_state(
507 sched,
508 Arc::downgrade(self),
509 state_mgr,
510 ))
511 .map_err(|e| Error::from_spawn("persistent state updater", e))?;
512
513 let (sched, handle) = TaskSchedule::new(runtime.clone());
514 ret.push(handle);
515
516 runtime
517 .spawn(Self::continually_launch_timeout_testing_circuits(
518 sched,
519 Arc::downgrade(self),
520 Arc::downgrade(dir_provider),
521 ))
522 .map_err(|e| Error::from_spawn("timeout-probe circuit launcher", e))?;
523
524 let (sched, handle) = TaskSchedule::new(runtime.clone());
525 ret.push(handle);
526
527 runtime
528 .spawn(Self::continually_preemptively_build_circuits(
529 sched,
530 Arc::downgrade(self),
531 Arc::downgrade(dir_provider),
532 ))
533 .map_err(|e| Error::from_spawn("preemptive circuit launcher", e))?;
534
535 self.mgr
536 .peek_builder()
537 .guardmgr()
538 .install_netdir_provider(&dir_provider.clone().upcast_arc())?;
539
540 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
541 {
542 let () = self
543 .mgr
544 .peek_builder()
545 .vanguardmgr()
546 .launch_background_tasks(&dir_provider.clone().upcast_arc())?;
547 }
548
549 Ok(ret)
550 }
551
552 #[instrument(level = "trace", skip_all)]
555 pub(crate) async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<Arc<B::Tunnel>> {
556 self.expire_circuits().await;
557 let usage = TargetTunnelUsage::Dir;
558 self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
559 }
560
561 #[instrument(level = "trace", skip_all)]
567 pub(crate) async fn get_or_launch_exit(
568 &self,
569 netdir: DirInfo<'_>, ports: &[TargetPort],
571 isolation: StreamIsolation,
572 #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
575 ) -> Result<Arc<B::Tunnel>> {
576 self.expire_circuits().await;
577 let time = Instant::now();
578 {
579 let mut predictive = self.predictor.lock().expect("preemptive lock poisoned");
580 if ports.is_empty() {
581 predictive.note_usage(None, time);
582 } else {
583 for port in ports.iter() {
584 predictive.note_usage(Some(*port), time);
585 }
586 }
587 }
588 let require_stability = ports.iter().any(|p| {
589 self.mgr
590 .peek_builder()
591 .path_config()
592 .long_lived_ports
593 .contains(&p.port)
594 });
595 let ports = ports.iter().map(Clone::clone).collect();
596 #[cfg(not(feature = "geoip"))]
597 let country_code = None;
598 let usage = TargetTunnelUsage::Exit {
599 ports,
600 isolation,
601 country_code,
602 require_stability,
603 };
604 self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
605 }
606
607 #[cfg(feature = "specific-relay")]
612 #[instrument(level = "trace", skip_all)]
613 pub(crate) async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
614 &self,
615 target: T,
616 ) -> Result<Arc<B::Tunnel>> {
617 self.expire_circuits().await;
618 let usage = TargetTunnelUsage::DirSpecificTarget(target.to_owned());
619 self.mgr
620 .get_or_launch(&usage, DirInfo::Nothing)
621 .await
622 .map(|(c, _)| c)
623 }
624
625 #[instrument(level = "trace", skip_all)]
631 pub(crate) fn reconfigure<CFG: CircMgrConfig>(
632 &self,
633 new_config: &CFG,
634 how: tor_config::Reconfigure,
635 ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
636 let old_path_rules = self.mgr.peek_builder().path_config();
637 let predictor = self.predictor.lock().expect("poisoned lock");
638 let preemptive_circuits = predictor.config();
639 if preemptive_circuits.initial_predicted_ports
640 != new_config.preemptive_circuits().initial_predicted_ports
641 {
642 how.cannot_change("preemptive_circuits.initial_predicted_ports")?;
644 }
645
646 if how == tor_config::Reconfigure::CheckAllOrNothing {
647 return Ok(RetireCircuits::None);
648 }
649
650 let retire_because_of_guardmgr =
651 self.mgr.peek_builder().guardmgr().reconfigure(new_config)?;
652
653 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
654 let retire_because_of_vanguardmgr = self
655 .mgr
656 .peek_builder()
657 .vanguardmgr()
658 .reconfigure(new_config.vanguard_config())?;
659
660 let new_reachable = &new_config.path_rules().reachable_addrs;
661 if new_reachable != &old_path_rules.reachable_addrs {
662 let filter = new_config.path_rules().build_guard_filter();
663 self.mgr.peek_builder().guardmgr().set_filter(filter);
664 }
665
666 let discard_all_circuits = !new_config
667 .path_rules()
668 .at_least_as_permissive_as(&old_path_rules)
669 || retire_because_of_guardmgr != tor_guardmgr::RetireCircuits::None;
670
671 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
672 let discard_all_circuits = discard_all_circuits
673 || retire_because_of_vanguardmgr != tor_guardmgr::RetireCircuits::None;
674
675 self.mgr
676 .peek_builder()
677 .set_path_config(new_config.path_rules().clone());
678 self.mgr
679 .set_circuit_timing(new_config.circuit_timing().clone());
680 predictor.set_config(new_config.preemptive_circuits().clone());
681
682 if discard_all_circuits {
683 info!("Path configuration has become more restrictive: retiring existing circuits.");
687 self.retire_all_circuits();
688 return Ok(RetireCircuits::All);
689 }
690 Ok(RetireCircuits::None)
691 }
692
693 #[instrument(level = "trace", skip_all)]
701 async fn keep_circmgr_params_updated<D>(
702 mut events: impl futures::Stream<Item = DirEvent> + Unpin,
703 circmgr: Weak<Self>,
704 dirmgr: Weak<D>,
705 ) where
706 D: NetDirProvider + 'static + ?Sized,
707 {
708 use DirEvent::*;
709 while let Some(event) = events.next().await {
710 if matches!(event, NewConsensus) {
711 if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
712 if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
713 cm.update_network_parameters(netdir.params());
714 }
715 } else {
716 debug!("Circmgr or dirmgr has disappeared; task exiting.");
717 break;
718 }
719 }
720 }
721 }
722
723 fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
726 self.mgr.update_network_parameters(p);
727 self.mgr.peek_builder().update_network_parameters(p);
728 }
729
730 #[instrument(level = "trace", skip_all)]
737 async fn continually_launch_timeout_testing_circuits<D>(
738 mut sched: TaskSchedule<R>,
739 circmgr: Weak<Self>,
740 dirmgr: Weak<D>,
741 ) where
742 D: NetDirProvider + 'static + ?Sized,
743 {
744 while sched.next().await.is_some() {
745 if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
746 if let Ok(netdir) = dm.netdir(Timeliness::Unchecked) {
747 if let Err(e) = cm
748 .launch_timeout_testing_circuit_if_appropriate(&netdir)
749 .await
750 {
751 warn_report!(e, "Problem launching a timeout testing circuit");
752 }
753 let delay = netdir
754 .params()
755 .cbt_testing_delay
756 .try_into()
757 .expect("Out-of-bounds value from BoundedInt32");
758
759 drop((cm, dm));
760 sched.fire_in(delay);
761 } else {
762 let _ = dm.events().next().await;
765 sched.fire();
766 }
767 } else {
768 return;
769 }
770 }
771 }
772
773 async fn launch_timeout_testing_circuit_if_appropriate(&self, netdir: &NetDir) -> Result<()> {
781 if !self.mgr.peek_builder().learning_timeouts() {
782 return Ok(());
783 }
784 self.expire_circuits().await;
787 let max_circs: u64 = netdir
788 .params()
789 .cbt_max_open_circuits_for_testing
790 .try_into()
791 .expect("Out-of-bounds result from BoundedInt32");
792 if (self.mgr.n_tunnels() as u64) < max_circs {
793 let usage = TargetTunnelUsage::TimeoutTesting;
795 let dirinfo = netdir.into();
796 let mgr = Arc::clone(&self.mgr);
797 debug!("Launching a circuit to test build times.");
798 let receiver = mgr.launch_by_usage(&usage, dirinfo)?;
799 drop(receiver);
802 }
803
804 Ok(())
805 }
806
807 #[allow(clippy::cognitive_complexity)] #[instrument(level = "trace", skip_all)]
815 async fn update_persistent_state<S>(
816 mut sched: TaskSchedule<R>,
817 circmgr: Weak<Self>,
818 statemgr: S,
819 ) where
820 S: StateMgr + std::marker::Send,
821 {
822 while sched.next().await.is_some() {
823 if let Some(circmgr) = Weak::upgrade(&circmgr) {
824 use tor_persist::LockStatus::*;
825
826 match statemgr.try_lock() {
827 Err(e) => {
828 error_report!(e, "Problem with state lock file");
829 break;
830 }
831 Ok(NewlyAcquired) => {
832 info!("We now own the lock on our state files.");
833 if let Err(e) = circmgr.upgrade_to_owned_persistent_state() {
834 error_report!(e, "Unable to upgrade to owned state files");
835 break;
836 }
837 }
838 Ok(AlreadyHeld) => {
839 if let Err(e) = circmgr.store_persistent_state() {
840 error_report!(e, "Unable to flush circmgr state");
841 break;
842 }
843 }
844 Ok(NoLock) => {
845 if let Err(e) = circmgr.reload_persistent_state() {
846 error_report!(e, "Unable to reload circmgr state");
847 break;
848 }
849 }
850 }
851 } else {
852 debug!("Circmgr has disappeared; task exiting.");
853 return;
854 }
855 sched.fire_in(Duration::from_secs(60));
862 }
863
864 debug!("State update task exiting (potentially due to handle drop).");
865 }
866
867 pub(crate) fn upgrade_to_owned_persistent_state(&self) -> Result<()> {
871 self.mgr.peek_builder().upgrade_to_owned_state()?;
872 Ok(())
873 }
874
875 pub(crate) fn reload_persistent_state(&self) -> Result<()> {
880 self.mgr.peek_builder().reload_state()?;
881 Ok(())
882 }
883
884 #[instrument(level = "trace", skip_all)]
896 async fn continually_preemptively_build_circuits<D>(
897 mut sched: TaskSchedule<R>,
898 circmgr: Weak<Self>,
899 dirmgr: Weak<D>,
900 ) where
901 D: NetDirProvider + 'static + ?Sized,
902 {
903 let base_delay = Duration::from_secs(10);
904 let mut retry = RetryDelay::from_duration(base_delay);
905
906 while sched.next().await.is_some() {
907 if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
908 if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
909 let result = cm
910 .launch_circuits_preemptively(DirInfo::Directory(&netdir))
911 .await;
912
913 let delay = match result {
914 Ok(()) => {
915 retry.reset();
916 base_delay
917 }
918 Err(_) => retry.next_delay(&mut rand::rng()),
919 };
920
921 sched.fire_in(delay);
922 } else {
923 let _ = dm.events().next().await;
926 sched.fire();
927 }
928 } else {
929 return;
930 }
931 }
932 }
933
934 #[allow(clippy::cognitive_complexity)]
942 #[instrument(level = "trace", skip_all)]
943 async fn launch_circuits_preemptively(
944 &self,
945 netdir: DirInfo<'_>,
946 ) -> std::result::Result<(), err::PreemptiveCircError> {
947 trace!("Checking preemptive circuit predictions.");
948 let (circs, threshold) = {
949 let path_config = self.mgr.peek_builder().path_config();
950 let preemptive = self.predictor.lock().expect("preemptive lock poisoned");
951 let threshold = preemptive.config().disable_at_threshold;
952 (preemptive.predict(&path_config), threshold)
953 };
954
955 if self.mgr.n_tunnels() >= threshold {
956 return Ok(());
957 }
958 let mut n_created = 0_usize;
959 let mut n_errors = 0_usize;
960
961 let futures = circs
962 .iter()
963 .map(|usage| self.mgr.get_or_launch(usage, netdir));
964 let results = futures::future::join_all(futures).await;
965 for (i, result) in results.into_iter().enumerate() {
966 match result {
967 Ok((_, TunnelProvenance::NewlyCreated)) => {
968 debug!("Preeemptive circuit was created for {:?}", circs[i]);
969 n_created += 1;
970 }
971 Ok((_, TunnelProvenance::Preexisting)) => {
972 trace!("Circuit already existed created for {:?}", circs[i]);
973 }
974 Err(e) => {
975 warn_report!(e, "Failed to build preemptive circuit {:?}", sv(&circs[i]));
976 n_errors += 1;
977 }
978 }
979 }
980
981 if n_created > 0 || n_errors == 0 {
982 Ok(())
986 } else {
987 Err(err::PreemptiveCircError)
990 }
991 }
992
993 #[cfg(feature = "hs-common")]
1010 #[instrument(level = "trace", skip_all)]
1011 pub(crate) async fn launch_hs_unmanaged<T>(
1012 &self,
1013 planned_target: Option<T>,
1014 dir: &NetDir,
1015 stem_kind: HsCircStemKind,
1016 circ_kind: Option<HsCircKind>,
1017 ) -> Result<B::Tunnel>
1018 where
1019 T: IntoOwnedChanTarget,
1020 {
1021 let usage = TargetTunnelUsage::HsCircBase {
1022 compatible_with_target: planned_target.map(IntoOwnedChanTarget::to_owned),
1023 stem_kind,
1024 circ_kind,
1025 };
1026 let (_, client_circ) = self.mgr.launch_unmanaged(&usage, dir.into()).await?;
1027 Ok(client_circ)
1028 }
1029
1030 pub(crate) fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
1036 self.mgr
1037 .peek_builder()
1038 .guardmgr()
1039 .netdir_is_sufficient(netdir)
1040 }
1041
1042 pub(crate) fn estimate_timeout(
1044 &self,
1045 timeout_action: &timeouts::Action,
1046 ) -> std::time::Duration {
1047 let (timeout, _abandon) = self.mgr.peek_builder().estimator().timeouts(timeout_action);
1048 timeout
1049 }
1050
1051 pub(crate) fn builder(&self) -> &B {
1053 self.mgr.peek_builder()
1054 }
1055
1056 pub(crate) fn store_persistent_state(&self) -> Result<bool> {
1061 self.mgr.peek_builder().save_state()
1062 }
1063
1064 async fn expire_circuits(&self) {
1069 let now = self.mgr.peek_runtime().now();
1075
1076 let _next_expiration = self.mgr.expire_tunnels(now).await;
1078 }
1079
1080 pub(crate) fn retire_all_circuits(&self) {
1088 self.mgr.retire_all_tunnels();
1089 }
1090
1091 pub(crate) fn retire_circ(&self, circ_id: &<B::Tunnel as AbstractTunnel>::Id) {
1094 let _ = self.mgr.take_tunnel(circ_id);
1095 }
1096
1097 pub(crate) fn skew_events(&self) -> ClockSkewEvents {
1105 self.mgr.peek_builder().guardmgr().skew_events()
1106 }
1107
1108 pub(crate) fn note_external_failure(
1112 &self,
1113 target: &impl ChanTarget,
1114 external_failure: ExternalActivity,
1115 ) {
1116 self.mgr
1117 .peek_builder()
1118 .guardmgr()
1119 .note_external_failure(target, external_failure);
1120 }
1121
1122 pub(crate) fn note_external_success(
1125 &self,
1126 target: &impl ChanTarget,
1127 external_activity: ExternalActivity,
1128 ) {
1129 self.mgr
1130 .peek_builder()
1131 .guardmgr()
1132 .note_external_success(target, external_activity);
1133 }
1134}
1135
1136impl<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> Drop for CircMgrInner<B, R> {
1137 fn drop(&mut self) {
1138 match self.store_persistent_state() {
1139 Ok(true) => info!("Flushed persistent state at exit."),
1140 Ok(false) => debug!("Lock not held; no state to flush."),
1141 Err(e) => error_report!(e, "Unable to flush state on circuit manager drop"),
1142 }
1143 }
1144}
1145
/// Unit tests for the circuit manager.
#[cfg(test)]
mod test {
    // Lints that are relaxed inside test code.
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    use mocks::FakeBuilder;
    use tor_guardmgr::GuardMgr;
    use tor_linkspec::OwnedChanTarget;
    use tor_netdir::testprovider::TestNetDirProvider;
    use tor_persist::TestingStateMgr;

    use super::*;

    /// `DirInfo::circ_params` uses default parameters for a fallback list and
    /// the directory's own parameters for a network directory.
    #[test]
    fn get_params() {
        use tor_netdir::{MdReceiver, PartialNetDir};
        use tor_netdoc::doc::netstatus::NetParams;
        let fb = FallbackList::from([]);
        let di: DirInfo<'_> = (&fb).into();

        // With only fallbacks, the defaults apply.
        let p1 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
        assert!(!p1.extend_by_ed25519_id);

        // With a directory, its parameters apply.
        let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
        let mut params = NetParams::default();
        params.set("circwindow".into(), 100);
        params.set("ExtendByEd25519ID".into(), 1);
        let mut dir = PartialNetDir::new(consensus, Some(&params));
        for m in microdescs {
            dir.add_microdesc(m);
        }
        let netdir = dir.unwrap_if_sufficient().unwrap();
        let di: DirInfo<'_> = (&netdir).into();
        let p2 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
        assert!(p2.extend_by_ed25519_id);

        // The same again with a much larger "circwindow" value:
        // `circ_params` must still succeed.
        let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
        let mut params = NetParams::default();
        params.set("circwindow".into(), 100_000);
        params.set("ExtendByEd25519ID".into(), 1);
        let mut dir = PartialNetDir::new(consensus, Some(&params));
        for m in microdescs {
            dir.add_microdesc(m);
        }
        let netdir = dir.unwrap_if_sufficient().unwrap();
        let di: DirInfo<'_> = (&netdir).into();
        let p2 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
        assert!(p2.extend_by_ed25519_id);
    }

    /// Build a `CircMgrInner` around a `FakeBuilder`, with a testing state
    /// manager and a `TestNetDirProvider`, and launch its background tasks.
    fn make_circmgr<R: Runtime>(runtime: R) -> Arc<CircMgrInner<FakeBuilder<R>, R>> {
        let config = crate::config::test_config::TestConfig::default();
        let statemgr = TestingStateMgr::new();
        let guardmgr =
            GuardMgr::new(runtime.clone(), statemgr.clone(), &config).expect("Create GuardMgr");
        let builder = FakeBuilder::new(
            &runtime,
            statemgr.clone(),
            &tor_guardmgr::TestConfig::default(),
        );
        let circmgr = Arc::new(CircMgrInner::new_generic(
            &config, &runtime, &guardmgr, builder,
        ));
        let netdir = Arc::new(TestNetDirProvider::new());
        CircMgrInner::launch_background_tasks(&circmgr, &runtime, &netdir, statemgr)
            .expect("launch CircMgrInner background tasks");
        circmgr
    }

    /// `launch_hs_unmanaged` succeeds against the fake builder.
    #[test]
    #[cfg(feature = "hs-common")]
    fn test_launch_hs_unmanaged() {
        tor_rtmock::MockRuntime::test_with_various(|runtime| async move {
            let circmgr = make_circmgr(runtime.clone());
            let netdir = tor_netdir::testnet::construct_netdir()
                .unwrap_if_sufficient()
                .unwrap();

            let (ret_tx, ret_rx) = tor_async_utils::oneshot::channel();
            runtime.spawn_identified("launch_hs_unamanged", async move {
                ret_tx
                    .send(
                        circmgr
                            .launch_hs_unmanaged::<OwnedChanTarget>(
                                None,
                                &netdir,
                                HsCircStemKind::Naive,
                                None,
                            )
                            .await,
                    )
                    .unwrap();
            });
            // Advance the mock clock so that the spawned launch can finish.
            runtime.advance_by(Duration::from_millis(60)).await;
            ret_rx.await.unwrap().unwrap();
        });
    }
}