1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc = include_str!("../README.md")]
3#![allow(renamed_and_removed_lints)] #![allow(unknown_lints)] #![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_time_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] #![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] #![allow(clippy::result_large_err)] #![allow(clippy::needless_raw_string_hashes)] #![allow(clippy::needless_lifetimes)] #![allow(mismatched_lifetime_syntaxes)] #![allow(clippy::collapsible_if)] #![deny(clippy::unused_async)]
47#![deny(clippy::string_slice)] #![cfg_attr(not(all(feature = "full", feature = "experimental")), allow(unused))]
52
53use build::TunnelBuilder;
54use mgr::{AbstractTunnel, AbstractTunnelBuilder};
55use tor_basic_utils::retry::RetryDelay;
56use tor_chanmgr::ChanMgr;
57use tor_dircommon::fallback::FallbackList;
58use tor_error::{error_report, warn_report};
59use tor_guardmgr::RetireCircuits;
60use tor_linkspec::ChanTarget;
61use tor_netdir::{DirEvent, NetDir, NetDirProvider, Timeliness};
62use tor_proto::circuit::UniqId;
63use tor_proto::client::circuit::CircParameters;
64use tor_rtcompat::Runtime;
65
66#[cfg(any(feature = "specific-relay", feature = "hs-common"))]
67use tor_linkspec::IntoOwnedChanTarget;
68
69use futures::StreamExt;
70use std::sync::{Arc, Mutex, Weak};
71use std::time::Duration;
72use tor_rtcompat::SpawnExt;
73use tracing::{debug, info, instrument, trace, warn};
74
75#[cfg(feature = "testing")]
76pub use config::test_config::TestConfig;
77
78pub mod build;
79mod config;
80mod err;
81#[cfg(feature = "hs-common")]
82pub mod hspool;
83mod impls;
84pub mod isolation;
85mod mgr;
86#[cfg(test)]
87mod mocks;
88mod preemptive;
89pub mod timeouts;
90mod tunnel;
91mod usage;
92
93cfg_if::cfg_if! {
95 if #[cfg(feature = "experimental-api")] {
96 pub mod path;
97 } else {
98 pub(crate) mod path;
99 }
100}
101
102pub use err::Error;
103pub use isolation::IsolationToken;
104pub use tor_guardmgr::{ClockSkewEvents, GuardMgrConfig, SkewEstimate};
105pub use tunnel::{
106 ClientDataTunnel, ClientDirTunnel, ClientOnionServiceDataTunnel, ClientOnionServiceDirTunnel,
107 ClientOnionServiceIntroTunnel, ServiceOnionServiceDataTunnel, ServiceOnionServiceDirTunnel,
108 ServiceOnionServiceIntroTunnel,
109};
110#[cfg(feature = "conflux")]
111pub use tunnel::{
112 ClientMultiPathDataTunnel, ClientMultiPathOnionServiceDataTunnel,
113 ServiceMultiPathOnionServiceDataTunnel,
114};
115pub use usage::{TargetPort, TargetPorts};
116
117pub use config::{
118 CircMgrConfig, CircuitTiming, CircuitTimingBuilder, PathConfig, PathConfigBuilder,
119 PreemptiveCircuitConfig, PreemptiveCircuitConfigBuilder,
120};
121
122use crate::isolation::StreamIsolation;
123use crate::mgr::TunnelProvenance;
124use crate::preemptive::PreemptiveCircuitPredictor;
125use usage::TargetTunnelUsage;
126
127use safelog::sensitive as sv;
128#[cfg(feature = "geoip")]
129use tor_geoip::CountryCode;
130pub use tor_guardmgr::{ExternalActivity, FirstHopId};
131use tor_persist::StateMgr;
132use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
133
134#[cfg(feature = "hs-common")]
135use crate::hspool::{HsCircKind, HsCircStemKind};
136#[cfg(all(feature = "vanguards", feature = "hs-common"))]
137use tor_guardmgr::vanguards::VanguardMgr;
138
139pub type Result<T> = std::result::Result<T, Error>;
141
142type TimeoutStateHandle = tor_persist::DynStorageHandle<timeouts::pareto::ParetoTimeoutState>;
144
145const PARETO_TIMEOUT_DATA_KEY: &str = "circuit_timeouts";
147
148#[derive(Debug, Copy, Clone)]
156#[non_exhaustive]
157pub enum DirInfo<'a> {
158 Fallbacks(&'a FallbackList),
160 Directory(&'a NetDir),
162 Nothing,
165}
166
167impl<'a> From<&'a FallbackList> for DirInfo<'a> {
168 fn from(v: &'a FallbackList) -> DirInfo<'a> {
169 DirInfo::Fallbacks(v)
170 }
171}
172impl<'a> From<&'a NetDir> for DirInfo<'a> {
173 fn from(v: &'a NetDir) -> DirInfo<'a> {
174 DirInfo::Directory(v)
175 }
176}
177impl<'a> DirInfo<'a> {
178 fn circ_params(&self, usage: &TargetTunnelUsage) -> Result<CircParameters> {
180 use tor_netdir::params::NetParameters;
181 let defaults = NetParameters::default();
184 let net_params = match self {
185 DirInfo::Directory(d) => d.params(),
186 _ => &defaults,
187 };
188 match usage {
189 #[cfg(feature = "hs-common")]
190 TargetTunnelUsage::HsCircBase { .. } => {
191 build::onion_circparams_from_netparams(net_params)
192 }
193 _ => build::exit_circparams_from_netparams(net_params),
194 }
195 }
196}
197
198pub struct CircMgr<R: Runtime>(Arc<CircMgrInner<build::TunnelBuilder<R>, R>>);
208
209impl<R: Runtime> CircMgr<R> {
210 pub fn new<SM, CFG: CircMgrConfig>(
216 config: &CFG,
217 storage: SM,
218 runtime: &R,
219 chanmgr: Arc<ChanMgr<R>>,
220 guardmgr: &tor_guardmgr::GuardMgr<R>,
221 ) -> Result<Self>
222 where
223 SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
224 {
225 Ok(Self(Arc::new(CircMgrInner::new(
226 config, storage, runtime, chanmgr, guardmgr,
227 )?)))
228 }
229
230 #[instrument(level = "trace", skip_all)]
233 pub async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<ClientDirTunnel> {
234 let tunnel = self.0.get_or_launch_dir(netdir).await?;
235 Ok(tunnel.into())
236 }
237
238 #[instrument(level = "trace", skip_all)]
244 pub async fn get_or_launch_exit(
245 &self,
246 netdir: DirInfo<'_>, ports: &[TargetPort],
248 isolation: StreamIsolation,
249 #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
252 ) -> Result<ClientDataTunnel> {
253 let tunnel = self
254 .0
255 .get_or_launch_exit(
256 netdir,
257 ports,
258 isolation,
259 #[cfg(feature = "geoip")]
260 country_code,
261 )
262 .await?;
263 Ok(tunnel.into())
264 }
265
266 #[cfg(feature = "specific-relay")]
271 #[instrument(level = "trace", skip_all)]
272 pub async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
273 &self,
274 target: T,
275 ) -> Result<ClientDirTunnel> {
276 let tunnel = self.0.get_or_launch_dir_specific(target).await?;
277 Ok(tunnel.into())
278 }
279
280 #[instrument(level = "trace", skip_all)]
286 pub fn launch_background_tasks<D, S>(
287 self: &Arc<Self>,
288 runtime: &R,
289 dir_provider: &Arc<D>,
290 state_mgr: S,
291 ) -> Result<Vec<TaskHandle>>
292 where
293 D: NetDirProvider + 'static + ?Sized,
294 S: StateMgr + std::marker::Send + 'static,
295 {
296 CircMgrInner::launch_background_tasks(&self.0.clone(), runtime, dir_provider, state_mgr)
297 }
298
299 pub fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
305 self.0.netdir_is_sufficient(netdir)
306 }
307
308 pub fn retire_circ(&self, circ_id: &UniqId) {
311 self.0.retire_circ(circ_id);
312 }
313
314 pub fn note_external_failure(
318 &self,
319 target: &impl ChanTarget,
320 external_failure: ExternalActivity,
321 ) {
322 self.0.note_external_failure(target, external_failure);
323 }
324
325 pub fn note_external_success(
328 &self,
329 target: &impl ChanTarget,
330 external_activity: ExternalActivity,
331 ) {
332 self.0.note_external_success(target, external_activity);
333 }
334
335 pub fn skew_events(&self) -> ClockSkewEvents {
343 self.0.skew_events()
344 }
345
346 #[instrument(level = "trace", skip_all)]
352 pub fn reconfigure<CFG: CircMgrConfig>(
353 &self,
354 new_config: &CFG,
355 how: tor_config::Reconfigure,
356 ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
357 self.0.reconfigure(new_config, how)
358 }
359
360 pub fn estimate_timeout(&self, timeout_action: &timeouts::Action) -> std::time::Duration {
386 self.0.estimate_timeout(timeout_action)
387 }
388
389 #[cfg(feature = "experimental-api")]
392 pub fn builder(&self) -> &TunnelBuilder<R> {
393 CircMgrInner::builder(&self.0)
394 }
395}
396
397#[derive(Clone)]
399pub(crate) struct CircMgrInner<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> {
400 mgr: Arc<mgr::AbstractTunnelMgr<B, R>>,
402 predictor: Arc<Mutex<PreemptiveCircuitPredictor>>,
404}
405
406impl<R: Runtime> CircMgrInner<TunnelBuilder<R>, R> {
407 #[allow(clippy::unnecessary_wraps)]
413 pub(crate) fn new<SM, CFG: CircMgrConfig>(
414 config: &CFG,
415 storage: SM,
416 runtime: &R,
417 chanmgr: Arc<ChanMgr<R>>,
418 guardmgr: &tor_guardmgr::GuardMgr<R>,
419 ) -> Result<Self>
420 where
421 SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
422 {
423 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
424 let vanguardmgr = {
425 let has_onion_svc = false;
430 VanguardMgr::new(
431 config.vanguard_config(),
432 runtime.clone(),
433 storage.clone(),
434 has_onion_svc,
435 )?
436 };
437
438 let storage_handle = storage.create_handle(PARETO_TIMEOUT_DATA_KEY);
439
440 let builder = build::TunnelBuilder::new(
441 runtime.clone(),
442 chanmgr,
443 config.path_rules().clone(),
444 storage_handle,
445 guardmgr.clone(),
446 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
447 vanguardmgr,
448 );
449
450 Ok(Self::new_generic(config, runtime, guardmgr, builder))
451 }
452}
453
454impl<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> CircMgrInner<B, R> {
455 pub(crate) fn new_generic<CFG: CircMgrConfig>(
457 config: &CFG,
458 runtime: &R,
459 guardmgr: &tor_guardmgr::GuardMgr<R>,
460 builder: B,
461 ) -> Self {
462 let preemptive = Arc::new(Mutex::new(PreemptiveCircuitPredictor::new(
463 config.preemptive_circuits().clone(),
464 )));
465
466 guardmgr.set_filter(config.path_rules().build_guard_filter());
467
468 let mgr =
469 mgr::AbstractTunnelMgr::new(builder, runtime.clone(), config.circuit_timing().clone());
470
471 CircMgrInner {
472 mgr: Arc::new(mgr),
473 predictor: preemptive,
474 }
475 }
476
477 #[instrument(level = "trace", skip_all)]
483 pub(crate) fn launch_background_tasks<D, S>(
484 self: &Arc<Self>,
485 runtime: &R,
486 dir_provider: &Arc<D>,
487 state_mgr: S,
488 ) -> Result<Vec<TaskHandle>>
489 where
490 D: NetDirProvider + 'static + ?Sized,
491 S: StateMgr + std::marker::Send + 'static,
492 {
493 let mut ret = vec![];
494
495 runtime
496 .spawn(Self::keep_circmgr_params_updated(
497 dir_provider.events(),
498 Arc::downgrade(self),
499 Arc::downgrade(dir_provider),
500 ))
501 .map_err(|e| Error::from_spawn("circmgr parameter updater", e))?;
502
503 let (sched, handle) = TaskSchedule::new(runtime.clone());
504 ret.push(handle);
505
506 runtime
507 .spawn(Self::update_persistent_state(
508 sched,
509 Arc::downgrade(self),
510 state_mgr,
511 ))
512 .map_err(|e| Error::from_spawn("persistent state updater", e))?;
513
514 let (sched, handle) = TaskSchedule::new(runtime.clone());
515 ret.push(handle);
516
517 runtime
518 .spawn(Self::continually_launch_timeout_testing_circuits(
519 sched,
520 Arc::downgrade(self),
521 Arc::downgrade(dir_provider),
522 ))
523 .map_err(|e| Error::from_spawn("timeout-probe circuit launcher", e))?;
524
525 let (sched, handle) = TaskSchedule::new(runtime.clone());
526 ret.push(handle);
527
528 runtime
529 .spawn(Self::continually_preemptively_build_circuits(
530 sched,
531 Arc::downgrade(self),
532 Arc::downgrade(dir_provider),
533 ))
534 .map_err(|e| Error::from_spawn("preemptive circuit launcher", e))?;
535
536 self.mgr
537 .peek_builder()
538 .guardmgr()
539 .install_netdir_provider(&dir_provider.clone().upcast_arc())?;
540
541 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
542 {
543 let () = self
544 .mgr
545 .peek_builder()
546 .vanguardmgr()
547 .launch_background_tasks(&dir_provider.clone().upcast_arc())?;
548 }
549
550 Ok(ret)
551 }
552
553 #[instrument(level = "trace", skip_all)]
556 pub(crate) async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<Arc<B::Tunnel>> {
557 self.expire_circuits().await;
558 let usage = TargetTunnelUsage::Dir;
559 self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
560 }
561
562 #[instrument(level = "trace", skip_all)]
568 pub(crate) async fn get_or_launch_exit(
569 &self,
570 netdir: DirInfo<'_>, ports: &[TargetPort],
572 isolation: StreamIsolation,
573 #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
576 ) -> Result<Arc<B::Tunnel>> {
577 self.expire_circuits().await;
578 let time = self.mgr.peek_runtime().now();
579 {
580 let mut predictive = self.predictor.lock().expect("preemptive lock poisoned");
581 if ports.is_empty() {
582 predictive.note_usage(None, time);
583 } else {
584 for port in ports.iter() {
585 predictive.note_usage(Some(*port), time);
586 }
587 }
588 }
589 let require_stability = ports.iter().any(|p| {
590 self.mgr
591 .peek_builder()
592 .path_config()
593 .long_lived_ports
594 .contains(&p.port)
595 });
596 let ports = ports.iter().map(Clone::clone).collect();
597 #[cfg(not(feature = "geoip"))]
598 let country_code = None;
599 let usage = TargetTunnelUsage::Exit {
600 ports,
601 isolation,
602 country_code,
603 require_stability,
604 };
605 self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
606 }
607
608 #[cfg(feature = "specific-relay")]
613 #[instrument(level = "trace", skip_all)]
614 pub(crate) async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
615 &self,
616 target: T,
617 ) -> Result<Arc<B::Tunnel>> {
618 self.expire_circuits().await;
619 let usage = TargetTunnelUsage::DirSpecificTarget(target.to_owned());
620 self.mgr
621 .get_or_launch(&usage, DirInfo::Nothing)
622 .await
623 .map(|(c, _)| c)
624 }
625
626 #[instrument(level = "trace", skip_all)]
632 pub(crate) fn reconfigure<CFG: CircMgrConfig>(
633 &self,
634 new_config: &CFG,
635 how: tor_config::Reconfigure,
636 ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
637 let old_path_rules = self.mgr.peek_builder().path_config();
638 let predictor = self.predictor.lock().expect("poisoned lock");
639 let preemptive_circuits = predictor.config();
640 if preemptive_circuits.initial_predicted_ports
641 != new_config.preemptive_circuits().initial_predicted_ports
642 {
643 how.cannot_change("preemptive_circuits.initial_predicted_ports")?;
645 }
646
647 if how == tor_config::Reconfigure::CheckAllOrNothing {
648 return Ok(RetireCircuits::None);
649 }
650
651 let retire_because_of_guardmgr =
652 self.mgr.peek_builder().guardmgr().reconfigure(new_config)?;
653
654 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
655 let retire_because_of_vanguardmgr = self
656 .mgr
657 .peek_builder()
658 .vanguardmgr()
659 .reconfigure(new_config.vanguard_config())?;
660
661 let new_reachable = &new_config.path_rules().reachable_addrs;
662 if new_reachable != &old_path_rules.reachable_addrs {
663 let filter = new_config.path_rules().build_guard_filter();
664 self.mgr.peek_builder().guardmgr().set_filter(filter);
665 }
666
667 let discard_all_circuits = !new_config
668 .path_rules()
669 .at_least_as_permissive_as(&old_path_rules)
670 || retire_because_of_guardmgr != tor_guardmgr::RetireCircuits::None;
671
672 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
673 let discard_all_circuits = discard_all_circuits
674 || retire_because_of_vanguardmgr != tor_guardmgr::RetireCircuits::None;
675
676 self.mgr
677 .peek_builder()
678 .set_path_config(new_config.path_rules().clone());
679 self.mgr
680 .set_circuit_timing(new_config.circuit_timing().clone());
681 predictor.set_config(new_config.preemptive_circuits().clone());
682
683 if discard_all_circuits {
684 info!("Path configuration has become more restrictive: retiring existing circuits.");
688 self.retire_all_circuits();
689 return Ok(RetireCircuits::All);
690 }
691 Ok(RetireCircuits::None)
692 }
693
694 #[instrument(level = "trace", skip_all)]
702 async fn keep_circmgr_params_updated<D>(
703 mut events: impl futures::Stream<Item = DirEvent> + Unpin,
704 circmgr: Weak<Self>,
705 dirmgr: Weak<D>,
706 ) where
707 D: NetDirProvider + 'static + ?Sized,
708 {
709 use DirEvent::*;
710 while let Some(event) = events.next().await {
711 if matches!(event, NewConsensus) {
712 if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
713 if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
714 cm.update_network_parameters(netdir.params());
715 }
716 } else {
717 debug!("Circmgr or dirmgr has disappeared; task exiting.");
718 break;
719 }
720 }
721 }
722 }
723
724 fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
727 self.mgr.update_network_parameters(p);
728 self.mgr.peek_builder().update_network_parameters(p);
729 }
730
731 #[instrument(level = "trace", skip_all)]
738 async fn continually_launch_timeout_testing_circuits<D>(
739 mut sched: TaskSchedule<R>,
740 circmgr: Weak<Self>,
741 dirmgr: Weak<D>,
742 ) where
743 D: NetDirProvider + 'static + ?Sized,
744 {
745 while sched.next().await.is_some() {
746 if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
747 if let Ok(netdir) = dm.netdir(Timeliness::Unchecked) {
748 if let Err(e) = cm
749 .launch_timeout_testing_circuit_if_appropriate(&netdir)
750 .await
751 {
752 warn_report!(e, "Problem launching a timeout testing circuit");
753 }
754 let delay = netdir
755 .params()
756 .cbt_testing_delay
757 .try_into()
758 .expect("Out-of-bounds value from BoundedInt32");
759
760 drop((cm, dm));
761 sched.fire_in(delay);
762 } else {
763 let _ = dm.events().next().await;
766 sched.fire();
767 }
768 } else {
769 return;
770 }
771 }
772 }
773
774 async fn launch_timeout_testing_circuit_if_appropriate(&self, netdir: &NetDir) -> Result<()> {
782 if !self.mgr.peek_builder().learning_timeouts() {
783 return Ok(());
784 }
785 self.expire_circuits().await;
788 let max_circs: u64 = netdir
789 .params()
790 .cbt_max_open_circuits_for_testing
791 .try_into()
792 .expect("Out-of-bounds result from BoundedInt32");
793 if (self.mgr.n_tunnels() as u64) < max_circs {
794 let usage = TargetTunnelUsage::TimeoutTesting;
796 let dirinfo = netdir.into();
797 let mgr = Arc::clone(&self.mgr);
798 debug!("Launching a circuit to test build times.");
799 let receiver = mgr.launch_by_usage(&usage, dirinfo)?;
800 drop(receiver);
803 }
804
805 Ok(())
806 }
807
808 #[allow(clippy::cognitive_complexity)] #[instrument(level = "trace", skip_all)]
816 async fn update_persistent_state<S>(
817 mut sched: TaskSchedule<R>,
818 circmgr: Weak<Self>,
819 statemgr: S,
820 ) where
821 S: StateMgr + std::marker::Send,
822 {
823 while sched.next().await.is_some() {
824 if let Some(circmgr) = Weak::upgrade(&circmgr) {
825 use tor_persist::LockStatus::*;
826
827 match statemgr.try_lock() {
828 Err(e) => {
829 error_report!(e, "Problem with state lock file");
830 break;
831 }
832 Ok(NewlyAcquired) => {
833 info!("We now own the lock on our state files.");
834 if let Err(e) = circmgr.upgrade_to_owned_persistent_state() {
835 error_report!(e, "Unable to upgrade to owned state files");
836 break;
837 }
838 }
839 Ok(AlreadyHeld) => {
840 if let Err(e) = circmgr.store_persistent_state() {
841 error_report!(e, "Unable to flush circmgr state");
842 break;
843 }
844 }
845 Ok(NoLock) => {
846 if let Err(e) = circmgr.reload_persistent_state() {
847 error_report!(e, "Unable to reload circmgr state");
848 break;
849 }
850 }
851 }
852 } else {
853 debug!("Circmgr has disappeared; task exiting.");
854 return;
855 }
856 sched.fire_in(Duration::from_secs(60));
863 }
864
865 debug!("State update task exiting (potentially due to handle drop).");
866 }
867
868 pub(crate) fn upgrade_to_owned_persistent_state(&self) -> Result<()> {
872 self.mgr.peek_builder().upgrade_to_owned_state()?;
873 Ok(())
874 }
875
876 pub(crate) fn reload_persistent_state(&self) -> Result<()> {
881 self.mgr.peek_builder().reload_state()?;
882 Ok(())
883 }
884
885 #[instrument(level = "trace", skip_all)]
897 async fn continually_preemptively_build_circuits<D>(
898 mut sched: TaskSchedule<R>,
899 circmgr: Weak<Self>,
900 dirmgr: Weak<D>,
901 ) where
902 D: NetDirProvider + 'static + ?Sized,
903 {
904 let base_delay = Duration::from_secs(10);
905 let mut retry = RetryDelay::from_duration(base_delay);
906
907 while sched.next().await.is_some() {
908 if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
909 if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
910 let result = cm
911 .launch_circuits_preemptively(DirInfo::Directory(&netdir))
912 .await;
913
914 let delay = match result {
915 Ok(()) => {
916 retry.reset();
917 base_delay
918 }
919 Err(_) => retry.next_delay(&mut rand::rng()),
920 };
921
922 sched.fire_in(delay);
923 } else {
924 let _ = dm.events().next().await;
927 sched.fire();
928 }
929 } else {
930 return;
931 }
932 }
933 }
934
935 #[allow(clippy::cognitive_complexity)]
943 #[instrument(level = "trace", skip_all)]
944 async fn launch_circuits_preemptively(
945 &self,
946 netdir: DirInfo<'_>,
947 ) -> std::result::Result<(), err::PreemptiveCircError> {
948 trace!("Checking preemptive circuit predictions.");
949 let (circs, threshold) = {
950 let path_config = self.mgr.peek_builder().path_config();
951 let preemptive = self.predictor.lock().expect("preemptive lock poisoned");
952 let threshold = preemptive.config().disable_at_threshold;
953 (preemptive.predict(&path_config), threshold)
954 };
955
956 if self.mgr.n_tunnels() >= threshold {
957 return Ok(());
958 }
959 let mut n_created = 0_usize;
960 let mut n_errors = 0_usize;
961
962 let futures = circs
963 .iter()
964 .map(|usage| self.mgr.get_or_launch(usage, netdir));
965 let results = futures::future::join_all(futures).await;
966 for (i, result) in results.into_iter().enumerate() {
967 match result {
968 Ok((t, TunnelProvenance::NewlyCreated)) => {
969 debug!(
970 tunnel_id=%t.unique_id(),
971 "Preemptive circuit was created for {:?}", circs[i]);
972 n_created += 1;
973 }
974 Ok((t, TunnelProvenance::Preexisting)) => {
975 trace!(
976 tunnel_id=%t.unique_id(),
977 "Circuit already existed created for {:?}", circs[i]);
978 }
979 Err(e) => {
980 warn_report!(e, "Failed to build preemptive circuit {:?}", sv(&circs[i]));
981 n_errors += 1;
982 }
983 }
984 }
985
986 if n_created > 0 || n_errors == 0 {
987 Ok(())
991 } else {
992 Err(err::PreemptiveCircError)
995 }
996 }
997
998 #[cfg(feature = "hs-common")]
1015 #[instrument(level = "trace", skip_all)]
1016 pub(crate) async fn launch_hs_unmanaged<T>(
1017 &self,
1018 planned_target: Option<T>,
1019 dir: &NetDir,
1020 stem_kind: HsCircStemKind,
1021 circ_kind: Option<HsCircKind>,
1022 ) -> Result<B::Tunnel>
1023 where
1024 T: IntoOwnedChanTarget,
1025 {
1026 let usage = TargetTunnelUsage::HsCircBase {
1027 compatible_with_target: planned_target.map(IntoOwnedChanTarget::to_owned),
1028 stem_kind,
1029 circ_kind,
1030 };
1031 let (_, client_circ) = self.mgr.launch_unmanaged(&usage, dir.into()).await?;
1032 Ok(client_circ)
1033 }
1034
1035 pub(crate) fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
1041 self.mgr
1042 .peek_builder()
1043 .guardmgr()
1044 .netdir_is_sufficient(netdir)
1045 }
1046
1047 pub(crate) fn estimate_timeout(
1049 &self,
1050 timeout_action: &timeouts::Action,
1051 ) -> std::time::Duration {
1052 let (timeout, _abandon) = self.mgr.peek_builder().estimator().timeouts(timeout_action);
1053 timeout
1054 }
1055
1056 pub(crate) fn builder(&self) -> &B {
1058 self.mgr.peek_builder()
1059 }
1060
1061 pub(crate) fn store_persistent_state(&self) -> Result<bool> {
1066 self.mgr.peek_builder().save_state()
1067 }
1068
1069 async fn expire_circuits(&self) {
1074 let now = self.mgr.peek_runtime().now();
1080
1081 let _next_expiration = self.mgr.expire_tunnels(now).await;
1083 }
1084
1085 pub(crate) fn retire_all_circuits(&self) {
1093 self.mgr.retire_all_tunnels();
1094 }
1095
1096 pub(crate) fn retire_circ(&self, circ_id: &<B::Tunnel as AbstractTunnel>::Id) {
1099 let _ = self.mgr.take_tunnel(circ_id);
1100 }
1101
1102 pub(crate) fn skew_events(&self) -> ClockSkewEvents {
1110 self.mgr.peek_builder().guardmgr().skew_events()
1111 }
1112
1113 pub(crate) fn note_external_failure(
1117 &self,
1118 target: &impl ChanTarget,
1119 external_failure: ExternalActivity,
1120 ) {
1121 self.mgr
1122 .peek_builder()
1123 .guardmgr()
1124 .note_external_failure(target, external_failure);
1125 }
1126
1127 pub(crate) fn note_external_success(
1130 &self,
1131 target: &impl ChanTarget,
1132 external_activity: ExternalActivity,
1133 ) {
1134 self.mgr
1135 .peek_builder()
1136 .guardmgr()
1137 .note_external_success(target, external_activity);
1138 }
1139}
1140
1141impl<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> Drop for CircMgrInner<B, R> {
1142 fn drop(&mut self) {
1143 match self.store_persistent_state() {
1144 Ok(true) => info!("Flushed persistent state at exit."),
1145 Ok(false) => debug!("Lock not held; no state to flush."),
1146 Err(e) => error_report!(e, "Unable to flush state on circuit manager drop"),
1147 }
1148 }
1149}
1150
1151#[cfg(test)]
1152mod test {
1153 #![allow(clippy::bool_assert_comparison)]
1155 #![allow(clippy::clone_on_copy)]
1156 #![allow(clippy::dbg_macro)]
1157 #![allow(clippy::mixed_attributes_style)]
1158 #![allow(clippy::print_stderr)]
1159 #![allow(clippy::print_stdout)]
1160 #![allow(clippy::single_char_pattern)]
1161 #![allow(clippy::unwrap_used)]
1162 #![allow(clippy::unchecked_time_subtraction)]
1163 #![allow(clippy::useless_vec)]
1164 #![allow(clippy::needless_pass_by_value)]
1165 #![allow(clippy::string_slice)] use mocks::FakeBuilder;
1168 use tor_guardmgr::GuardMgr;
1169 use tor_linkspec::OwnedChanTarget;
1170 use tor_netdir::testprovider::TestNetDirProvider;
1171 use tor_persist::TestingStateMgr;
1172
1173 use super::*;
1174
1175 #[test]
1176 fn get_params() {
1177 use tor_netdir::{MdReceiver, PartialNetDir};
1178 use tor_netdoc::doc::netstatus::NetParams;
1179 let fb = FallbackList::from([]);
1181 let di: DirInfo<'_> = (&fb).into();
1182
1183 let p1 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
1184 assert!(!p1.extend_by_ed25519_id);
1185
1186 let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
1188 let mut params = NetParams::default();
1189 params.set("circwindow".into(), 100);
1190 params.set("ExtendByEd25519ID".into(), 1);
1191 let mut dir = PartialNetDir::new(consensus, Some(¶ms));
1192 for m in microdescs {
1193 dir.add_microdesc(m);
1194 }
1195 let netdir = dir.unwrap_if_sufficient().unwrap();
1196 let di: DirInfo<'_> = (&netdir).into();
1197 let p2 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
1198 assert!(p2.extend_by_ed25519_id);
1199
1200 let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
1202 let mut params = NetParams::default();
1203 params.set("circwindow".into(), 100_000);
1204 params.set("ExtendByEd25519ID".into(), 1);
1205 let mut dir = PartialNetDir::new(consensus, Some(¶ms));
1206 for m in microdescs {
1207 dir.add_microdesc(m);
1208 }
1209 let netdir = dir.unwrap_if_sufficient().unwrap();
1210 let di: DirInfo<'_> = (&netdir).into();
1211 let p2 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
1212 assert!(p2.extend_by_ed25519_id);
1213 }
1214
1215 fn make_circmgr<R: Runtime>(runtime: R) -> Arc<CircMgrInner<FakeBuilder<R>, R>> {
1216 let config = crate::config::test_config::TestConfig::default();
1217 let statemgr = TestingStateMgr::new();
1218 let guardmgr =
1219 GuardMgr::new(runtime.clone(), statemgr.clone(), &config).expect("Create GuardMgr");
1220 let builder = FakeBuilder::new(
1221 &runtime,
1222 statemgr.clone(),
1223 &tor_guardmgr::TestConfig::default(),
1224 );
1225 let circmgr = Arc::new(CircMgrInner::new_generic(
1226 &config, &runtime, &guardmgr, builder,
1227 ));
1228 let netdir = Arc::new(TestNetDirProvider::new());
1229 CircMgrInner::launch_background_tasks(&circmgr, &runtime, &netdir, statemgr)
1230 .expect("launch CircMgrInner background tasks");
1231 circmgr
1232 }
1233
1234 #[test]
1235 #[cfg(feature = "hs-common")]
1236 fn test_launch_hs_unmanaged() {
1237 tor_rtmock::MockRuntime::test_with_various(|runtime| async move {
1238 let circmgr = make_circmgr(runtime.clone());
1239 let netdir = tor_netdir::testnet::construct_netdir()
1240 .unwrap_if_sufficient()
1241 .unwrap();
1242
1243 let (ret_tx, ret_rx) = tor_async_utils::oneshot::channel();
1244 runtime.spawn_identified("launch_hs_unamanged", async move {
1245 ret_tx
1246 .send(
1247 circmgr
1248 .launch_hs_unmanaged::<OwnedChanTarget>(
1249 None,
1250 &netdir,
1251 HsCircStemKind::Naive,
1252 None,
1253 )
1254 .await,
1255 )
1256 .unwrap();
1257 });
1258 runtime.advance_by(Duration::from_millis(60)).await;
1259 ret_rx.await.unwrap().unwrap();
1260 });
1261 }
1262}