1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc = include_str!("../README.md")]
3#![allow(renamed_and_removed_lints)] #![allow(unknown_lints)] #![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_time_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] #![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] #![allow(clippy::result_large_err)] #![allow(clippy::needless_raw_string_hashes)] #![allow(clippy::needless_lifetimes)] #![allow(mismatched_lifetime_syntaxes)] #![allow(clippy::collapsible_if)] #![deny(clippy::unused_async)]
47#![allow(clippy::single_component_path_imports)]
53
54mod bootstrap;
55pub mod config;
56mod docid;
57mod docmeta;
58mod err;
59mod event;
60mod shared_ref;
61mod state;
62mod storage;
63
64#[cfg(feature = "bridge-client")]
65pub mod bridgedesc;
66#[cfg(feature = "dirfilter")]
67pub mod filter;
68
69use crate::docid::{CacheUsage, ClientRequest, DocQuery};
70use crate::err::BootstrapAction;
71#[cfg(not(feature = "experimental-api"))]
72use crate::shared_ref::SharedMutArc;
73#[cfg(feature = "experimental-api")]
74pub use crate::shared_ref::SharedMutArc;
75use crate::storage::{DynStore, Store};
76use bootstrap::AttemptId;
77use event::DirProgress;
78use postage::watch;
79use scopeguard::ScopeGuard;
80use tor_circmgr::CircMgr;
81use tor_dirclient::SourceInfo;
82use tor_dircommon::config::DirTolerance;
83use tor_error::{info_report, into_internal, warn_report};
84use tor_netdir::params::NetParameters;
85use tor_netdir::{DirEvent, MdReceiver, NetDir, NetDirProvider};
86
87use async_trait::async_trait;
88use futures::stream::BoxStream;
89use oneshot_fused_workaround as oneshot;
90use tor_netdoc::doc::netstatus::ProtoStatuses;
91use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
92use tor_rtcompat::{Runtime, SpawnExt};
93use tracing::{debug, info, instrument, trace, warn};
94use web_time_compat::SystemTimeExt;
95
96use std::marker::PhantomData;
97use std::sync::atomic::{AtomicBool, Ordering};
98use std::sync::{Arc, Mutex};
99use std::time::Duration;
100use std::{collections::HashMap, sync::Weak};
101use std::{fmt::Debug, time::SystemTime};
102
103use crate::state::{DirState, NetDirChange};
104pub use config::DirMgrConfig;
105pub use docid::DocId;
106pub use err::Error;
107pub use event::{DirBlockage, DirBootstrapEvents, DirBootstrapStatus};
108pub use storage::DocumentText;
109pub use tor_dircommon::fallback::{FallbackDir, FallbackDirBuilder};
110pub use tor_netdir::Timeliness;
111
112use strum;
114
/// A Result as returned by this crate.
pub type Result<T> = std::result::Result<T, Error>;
117
/// Storage manager to use with a directory manager.
///
/// Wraps the on-disk document store behind a mutex, so that it can be
/// opened ahead of time and then shared with a [`DirMgr`] built later
/// (see [`DirMgr::from_config`]).
#[derive(Clone)]
pub struct DirMgrStore<R: Runtime> {
    /// The underlying store, shared with any `DirMgr` built from this object.
    pub(crate) store: Arc<Mutex<crate::DynStore>>,

    /// Marker pinning the runtime type parameter; no runtime is actually stored.
    pub(crate) runtime: PhantomData<R>,
}
132
133impl<R: Runtime> DirMgrStore<R> {
134 pub fn new(config: &DirMgrConfig, runtime: R, offline: bool) -> Result<Self> {
136 let store = Arc::new(Mutex::new(config.open_store(offline)?));
137 drop(runtime);
138 let runtime = PhantomData;
139 Ok(DirMgrStore { store, runtime })
140 }
141}
142
/// A directory provider: the operations that the rest of the application
/// uses to configure and bootstrap a directory manager, in addition to the
/// read access provided by [`NetDirProvider`].
#[async_trait]
pub trait DirProvider: NetDirProvider {
    /// Try to change our configuration to `new_config`.
    ///
    /// `how` controls whether the change is actually applied or merely
    /// checked for feasibility.
    fn reconfigure(
        &self,
        new_config: &DirMgrConfig,
        how: tor_config::Reconfigure,
    ) -> std::result::Result<(), tor_config::ReconfigureError>;

    /// Bootstrap the directory, returning once there is enough directory
    /// information to build circuits.
    async fn bootstrap(&self) -> Result<()>;

    /// Return a stream of [`DirBootstrapStatus`] events reporting bootstrap
    /// progress.
    fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus>;

    /// Return a [`TaskHandle`] that can be used to manage the download
    /// process, if this provider has one.
    fn download_task_handle(&self) -> Option<TaskHandle> {
        None
    }
}
170
impl<R: Runtime> NetDirProvider for DirMgr<R> {
    /// Return the latest network directory, checked against `timeliness`.
    fn netdir(&self, timeliness: Timeliness) -> tor_netdir::Result<Arc<NetDir>> {
        use tor_netdir::Error as NetDirError;
        let netdir = self.netdir.get().ok_or(NetDirError::NoInfo)?;
        // Pick the validity interval against which to judge the directory:
        // the consensus's own lifetime, or one extended by our configured
        // tolerance.  `Unchecked` skips validity checking entirely.
        let lifetime = match timeliness {
            Timeliness::Strict => netdir.lifetime().clone(),
            Timeliness::Timely => self
                .config
                .get()
                .tolerance
                .extend_lifetime(netdir.lifetime()),
            Timeliness::Unchecked => return Ok(netdir),
        };
        let now = SystemTime::get();
        if lifetime.valid_after() > now {
            Err(NetDirError::DirNotYetValid)
        } else if lifetime.valid_until() < now {
            Err(NetDirError::DirExpired)
        } else {
            Ok(netdir)
        }
    }

    /// Return a stream of directory events.
    fn events(&self) -> BoxStream<'static, DirEvent> {
        Box::pin(self.events.subscribe())
    }

    /// Return the current network parameters: those of the latest directory
    /// if we have one, or our defaults (configuration overrides) otherwise.
    fn params(&self) -> Arc<dyn AsRef<tor_netdir::params::NetParameters>> {
        if let Some(netdir) = self.netdir.get() {
            netdir
        } else {
            self.default_parameters
                .lock()
                .expect("Poisoned lock")
                .clone()
        }
    }

    /// Return the latest cached protocol recommendations, with their timestamp.
    fn protocol_statuses(&self) -> Option<(SystemTime, Arc<ProtoStatuses>)> {
        self.protocols.lock().expect("Poisoned lock").clone()
    }
}
229
// `DirProvider` for `Arc<DirMgr>` just delegates to the inherent methods
// on `DirMgr` itself.
#[async_trait]
impl<R: Runtime> DirProvider for Arc<DirMgr<R>> {
    fn reconfigure(
        &self,
        new_config: &DirMgrConfig,
        how: tor_config::Reconfigure,
    ) -> std::result::Result<(), tor_config::ReconfigureError> {
        DirMgr::reconfigure(self, new_config, how)
    }

    #[instrument(level = "trace", skip_all)]
    async fn bootstrap(&self) -> Result<()> {
        DirMgr::bootstrap(self).await
    }

    fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus> {
        Box::pin(DirMgr::bootstrap_events(self))
    }

    fn download_task_handle(&self) -> Option<TaskHandle> {
        Some(self.task_handle.clone())
    }
}
253
/// A directory manager: maintains an up-to-date view of the Tor network
/// directory, loading it from cache and (when online) downloading it.
pub struct DirMgr<R: Runtime> {
    /// Our current configuration (replaceable at runtime via `reconfigure`).
    config: tor_config::MutCfg<DirMgrConfig>,
    /// Handle to our on-disk document cache.
    store: Arc<Mutex<DynStore>>,
    /// The most recent complete network directory, if any.
    netdir: Arc<SharedMutArc<NetDir>>,

    /// Latest protocol recommendations (with the time we received them),
    /// if we have any.
    protocols: Mutex<Option<(SystemTime, Arc<ProtoStatuses>)>>,

    /// Network parameters to report when we have no directory
    /// (built from the configured overrides).
    default_parameters: Mutex<Arc<NetParameters>>,

    /// Publisher for [`DirEvent`]s (new consensus, new descriptors, …).
    events: event::FlagPublisher<DirEvent>,

    /// Sending half of the bootstrap-status watch channel.
    send_status: Mutex<watch::Sender<event::DirBootstrapStatus>>,

    /// Receiving half of the bootstrap-status channel, cloned on demand
    /// for `bootstrap_events`.
    receive_status: DirBootstrapEvents,

    /// Circuit manager used to download directory documents; `None` if
    /// this manager cannot download (see `circmgr()`).
    circmgr: Option<Arc<CircMgr<R>>>,

    /// Runtime used for spawning tasks and scheduling.
    runtime: R,

    /// If true, `bootstrap` refuses to run (`Error::OfflineMode`).
    offline: bool,

    /// Set once `bootstrap` has started; cleared again if the attempt
    /// exits without completing.
    bootstrap_started: AtomicBool,

    /// Optional filter applied to incoming directory documents.
    #[cfg(feature = "dirfilter")]
    filter: crate::filter::FilterConfig,

    /// Schedule for the download task; taken (set to `None`) by `bootstrap`
    /// while the task runs, and restored when it exits.
    task_schedule: Mutex<Option<TaskSchedule<R>>>,

    /// Handle with which callers can pause/resume the download task.
    task_handle: TaskHandle,
}
332
/// Where a directory document came from.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum DocSource {
    /// The document was loaded from our local cache.
    LocalCache,
    /// The document was downloaded from a directory server.
    DirServer {
        /// Information about the server we downloaded from, if known.
        source: Option<SourceInfo>,
    },
}
348
349impl std::fmt::Display for DocSource {
350 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
351 match self {
352 DocSource::LocalCache => write!(f, "local cache"),
353 DocSource::DirServer { source: None } => write!(f, "directory server"),
354 DocSource::DirServer { source: Some(info) } => write!(f, "directory server {}", info),
355 }
356 }
357}
358
359impl<R: Runtime> DirMgr<R> {
    /// Try to load a usable directory from the cache, without downloading
    /// anything, and return it.
    ///
    /// Returns `Error::DirectoryNotPresent` if the cache has no timely
    /// directory.
    pub fn load_once(runtime: R, config: DirMgrConfig) -> Result<Arc<NetDir>> {
        // Open the store read-only (`offline = true`): we will never write.
        let store = DirMgrStore::new(&config, runtime.clone(), true)?;
        let dirmgr = Arc::new(Self::from_config(config, runtime, store, None, true)?);

        let attempt = AttemptId::next();
        trace!(%attempt, "Trying to load a full directory from cache");
        let outcome = dirmgr.load_directory(attempt);
        trace!(%attempt, "Load result: {outcome:?}");
        let _success = outcome?;

        dirmgr
            .netdir(Timeliness::Timely)
            .map_err(|_| Error::DirectoryNotPresent)
    }
384
    /// Bootstrap a directory (loading from cache and downloading as needed),
    /// and return its current [`NetDir`].
    pub async fn load_or_bootstrap_once(
        config: DirMgrConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Arc<CircMgr<R>>,
    ) -> Result<Arc<NetDir>> {
        let dirmgr = DirMgr::bootstrap_from_config(config, runtime, store, circmgr).await?;
        dirmgr
            .timely_netdir()
            .map_err(|_| Error::DirectoryNotPresent)
    }
405
406 pub fn create_unbootstrapped(
410 config: DirMgrConfig,
411 runtime: R,
412 store: DirMgrStore<R>,
413 circmgr: Arc<CircMgr<R>>,
414 ) -> Result<Arc<Self>> {
415 Ok(Arc::new(DirMgr::from_config(
416 config,
417 runtime,
418 store,
419 Some(circmgr),
420 false,
421 )?))
422 }
423
    /// Start bootstrapping this directory manager, returning once there is
    /// enough directory information to build circuits.
    ///
    /// Only one bootstrap may run at a time; a second call while one is in
    /// progress returns `Ok(())` immediately.  Returns
    /// `Error::OfflineMode` if this manager was created offline.
    #[allow(clippy::cognitive_complexity)]
    #[instrument(level = "trace", skip_all)]
    pub async fn bootstrap(self: &Arc<Self>) -> Result<()> {
        if self.offline {
            return Err(Error::OfflineMode);
        }

        // Atomically claim the "bootstrap in progress" flag; if it was
        // already set, somebody else is bootstrapping.
        if self
            .bootstrap_started
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_err()
        {
            debug!("Attempted to bootstrap twice; ignoring.");
            return Ok(());
        }

        // Unless we defuse this guard (on success, below), the flag is
        // cleared again when this function exits, so a later call can retry.
        let reset_bootstrap_started = scopeguard::guard(&self.bootstrap_started, |v| {
            v.store(false, Ordering::SeqCst);
        });

        // Take the task schedule; `None` here also means a bootstrap is
        // (or was) already running.
        let schedule = {
            let sched = self.task_schedule.lock().expect("poisoned lock").take();
            match sched {
                Some(sched) => sched,
                None => {
                    debug!("Attempted to bootstrap twice; ignoring.");
                    return Ok(());
                }
            }
        };

        let attempt_id = AttemptId::next();
        trace!(attempt=%attempt_id, "Starting to bootstrap directory");
        // First, try whatever the cache already has.
        let have_directory = self.load_directory(attempt_id)?;

        // If the cache didn't suffice, set up a oneshot so the background
        // task can tell us when the directory becomes usable.
        let (mut sender, receiver) = if have_directory {
            info!("Loaded a good directory from cache.");
            (None, None)
        } else {
            info!("Didn't get usable directory from cache.");
            let (sender, receiver) = oneshot::channel();
            (Some(sender), Some(receiver))
        };

        // Spawn the long-lived updater task; it holds only a weak reference
        // so that dropping the DirMgr shuts it down.
        let dirmgr_weak = Arc::downgrade(self);
        self.runtime
            .spawn(async move {
                // Put the schedule back when the task exits, so a future
                // bootstrap can run again.
                let mut schedule = scopeguard::guard(schedule, |schedule| {
                    if let Some(dm) = Weak::upgrade(&dirmgr_weak) {
                        *dm.task_schedule.lock().expect("poisoned lock") = Some(schedule);
                    }
                });

                // Wait until we own the cache lock, then download forever.
                if let Err(e) =
                    Self::reload_until_owner(&dirmgr_weak, &mut schedule, attempt_id, &mut sender)
                        .await
                {
                    match e {
                        Error::ManagerDropped => {}
                        _ => warn_report!(e, "Unrecovered error while waiting for bootstrap",),
                    }
                } else if let Err(e) =
                    Self::download_forever(dirmgr_weak.clone(), &mut schedule, attempt_id, sender)
                        .await
                {
                    match e {
                        Error::ManagerDropped => {}
                        _ => warn_report!(e, "Unrecovered error while downloading"),
                    }
                }
            })
            .map_err(|e| Error::from_spawn("directory updater task", e))?;

        if let Some(receiver) = receiver {
            match receiver.await {
                Ok(()) => {
                    info!("We have enough information to build circuits.");
                    // Success: defuse the guard so the flag stays set.
                    let _ = ScopeGuard::into_inner(reset_bootstrap_started);
                }
                Err(_) => {
                    warn!("Bootstrapping task exited before finishing.");
                    return Err(Error::CantAdvanceState);
                }
            }
        }
        Ok(())
    }
550
    /// Return true if a bootstrap attempt has started (the flag is cleared
    /// again if an attempt exits without completing; see `bootstrap`).
    pub fn bootstrap_started(&self) -> bool {
        self.bootstrap_started.load(Ordering::SeqCst)
    }
555
    /// Create a new `DirMgr` from `config` and immediately bootstrap it.
    ///
    /// Equivalent to `create_unbootstrapped` followed by `bootstrap`.
    #[instrument(level = "trace", skip_all)]
    pub async fn bootstrap_from_config(
        config: DirMgrConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Arc<CircMgr<R>>,
    ) -> Result<Arc<Self>> {
        let dirmgr = Self::create_unbootstrapped(config, runtime, store, circmgr)?;

        dirmgr.bootstrap().await?;

        Ok(dirmgr)
    }
571
    /// Keep reloading the directory from cache until we can take ownership
    /// of the cache lock (i.e. until no other process is managing it).
    ///
    /// While waiting, periodically reload from cache; if a load yields a
    /// usable directory, fire `on_complete`.  Returns `Ok(())` once we hold
    /// the read-write lock, or `Error::ManagerDropped` if the `DirMgr` goes
    /// away.
    #[allow(clippy::cognitive_complexity)]
    async fn reload_until_owner(
        weak: &Weak<Self>,
        schedule: &mut TaskSchedule<R>,
        attempt_id: AttemptId,
        on_complete: &mut Option<oneshot::Sender<()>>,
    ) -> Result<()> {
        let mut logged = false;
        let mut bootstrapped;
        {
            let dirmgr = upgrade_weak_ref(weak)?;
            bootstrapped = dirmgr.netdir.get().is_some();
        }

        loop {
            {
                let dirmgr = upgrade_weak_ref(weak)?;
                trace!("Trying to take ownership of the directory cache lock");
                if dirmgr.try_upgrade_to_readwrite()? {
                    // We got the lock; we are now the managing process.
                    if logged {
                        info!(
                            "The previous owning process has given up the lock. We are now in charge of managing the directory."
                        );
                    }
                    return Ok(());
                }
            }

            // Log the "another process owns it" message only once.
            if !logged {
                logged = true;
                if bootstrapped {
                    info!("Another process is managing the directory. We'll use its cache.");
                } else {
                    info!(
                        "Another process is bootstrapping the directory. Waiting till it finishes or exits."
                    );
                }
            }

            // Poll slowly (2 min) once we have a directory; quickly (5 s)
            // while we're still waiting for one.
            let pause = if bootstrapped {
                std::time::Duration::new(120, 0)
            } else {
                std::time::Duration::new(5, 0)
            };
            schedule.sleep(pause).await?;
            {
                let dirmgr = upgrade_weak_ref(weak)?;
                trace!("Trying to load from the directory cache");
                if dirmgr.load_directory(attempt_id)? {
                    // Notify the waiting bootstrap() call, if any.
                    if let Some(send_done) = on_complete.take() {
                        let _ = send_done.send(());
                    }
                    if !bootstrapped {
                        info!("The directory is now bootstrapped.");
                    }
                    bootstrapped = true;
                }
            }
        }
    }
648
    /// Run forever, downloading directory documents and keeping the
    /// directory up to date.
    ///
    /// Each outer iteration is one bootstrap attempt: it retries downloads
    /// per the configured retry schedule, fires `on_complete` once the
    /// directory is usable, then sleeps until the state says it is time to
    /// reset and start a fresh attempt.  Exits with an error if an attempt
    /// exhausts its retries, or with `Error::ManagerDropped` when the
    /// `DirMgr` goes away.
    #[allow(clippy::cognitive_complexity)]
    #[instrument(level = "trace", skip_all)]
    async fn download_forever(
        weak: Weak<Self>,
        schedule: &mut TaskSchedule<R>,
        mut attempt_id: AttemptId,
        mut on_complete: Option<oneshot::Sender<()>>,
    ) -> Result<()> {
        // Initial state: fetch a consensus, using the cache if it helps.
        let mut state: Box<dyn DirState> = {
            let dirmgr = upgrade_weak_ref(&weak)?;
            Box::new(state::GetConsensusState::new(
                dirmgr.runtime.clone(),
                dirmgr.config.get(),
                CacheUsage::CacheOkay,
                Some(dirmgr.netdir.clone()),
                #[cfg(feature = "dirfilter")]
                dirmgr
                    .filter
                    .clone()
                    .unwrap_or_else(|| Arc::new(crate::filter::NilFilter)),
            ))
        };

        trace!("Entering download loop.");

        loop {
            let mut usable = false;

            // Re-read the retry configuration each attempt, in case of
            // reconfiguration.
            let retry_config = {
                let dirmgr = upgrade_weak_ref(&weak)?;
                dirmgr.config.get().schedule.retry_bootstrap()
            };
            let mut retry_delay = retry_config.schedule();

            'retry_attempt: for try_num in retry_config.attempts() {
                trace!(attempt=%attempt_id, ?try_num, "Trying to download a directory.");
                let outcome = bootstrap::download(
                    Weak::clone(&weak),
                    &mut state,
                    schedule,
                    attempt_id,
                    &mut on_complete,
                )
                .await;
                trace!(attempt=%attempt_id, ?try_num, ?outcome, "Download is over.");

                if let Err(err) = outcome {
                    // Even a failed download may have left us with a usable
                    // (if incomplete) directory.
                    if state.is_ready(Readiness::Usable) {
                        usable = true;
                        info_report!(
                            err,
                            "Unable to completely download a directory. (Nevertheless, the directory is usable, so we'll pause for now)"
                        );
                        break 'retry_attempt;
                    }

                    match err.bootstrap_action() {
                        BootstrapAction::Nonfatal => {
                            // Nonfatal errors are handled inside download();
                            // seeing one here is a bug.
                            return Err(into_internal!(
                                "Nonfatal error should not have propagated here"
                            )(err)
                            .into());
                        }
                        BootstrapAction::Reset => {}
                        BootstrapAction::Fatal => return Err(err),
                    }

                    // Back off (with jitter) before resetting and retrying.
                    let delay = retry_delay.next_delay(&mut rand::rng());
                    warn_report!(
                        err,
                        "Unable to download a usable directory. (We will restart in {})",
                        humantime::format_duration(delay),
                    );
                    {
                        let dirmgr = upgrade_weak_ref(&weak)?;
                        dirmgr.note_reset(attempt_id);
                    }
                    schedule.sleep(delay).await?;
                    state = state.reset();
                } else {
                    info!(attempt=%attempt_id, "Directory is complete.");
                    usable = true;
                    break 'retry_attempt;
                }
            }

            if !usable {
                // All retries exhausted without even a usable directory.
                warn!(
                    "We failed {} times to bootstrap a directory. We're going to give up.",
                    retry_config.n_attempts()
                );
                return Err(Error::CantAdvanceState);
            } else {
                // Tell the waiting bootstrap() call (if any) that we're usable.
                if let Some(send_done) = on_complete.take() {
                    let _ = send_done.send(());
                }
            }

            // Sleep until the state wants to be reset (e.g. for a newer
            // consensus); if it never does, we're done.
            let reset_at = state.reset_time();
            match reset_at {
                Some(t) => {
                    trace!("Sleeping until {}", time::OffsetDateTime::from(t));
                    schedule.sleep_until_wallclock(t).await?;
                }
                None => return Ok(()),
            }
            attempt_id = bootstrap::AttemptId::next();
            trace!(attempt=%attempt_id, "Beginning new attempt to bootstrap directory");
            state = state.reset();
        }
    }
769
    /// Return our circuit manager, or `Error::NoDownloadSupport` if this
    /// manager was built without one.
    fn circmgr(&self) -> Result<Arc<CircMgr<R>>> {
        self.circmgr.clone().ok_or(Error::NoDownloadSupport)
    }
774
    /// Try to change our configuration to `new_config`.
    ///
    /// Some settings (cache location, cache permissions, authorities)
    /// cannot change at runtime; `how` decides whether such a mismatch is
    /// an error.  All checks run before anything is applied.
    pub fn reconfigure(
        &self,
        new_config: &DirMgrConfig,
        how: tor_config::Reconfigure,
    ) -> std::result::Result<(), tor_config::ReconfigureError> {
        let config = self.config.get();
        // These fields are fixed for the lifetime of the manager.
        if new_config.cache_dir != config.cache_dir {
            how.cannot_change("storage.cache_dir")?;
        }
        if new_config.cache_trust != config.cache_trust {
            how.cannot_change("storage.permissions")?;
        }
        if new_config.authorities() != config.authorities() {
            how.cannot_change("network.authorities")?;
        }

        // Checking only: stop before applying anything.
        if how == tor_config::Reconfigure::CheckAllOrNothing {
            return Ok(());
        }

        let params_changed = new_config.override_net_params != config.override_net_params;

        self.config
            .map_and_replace(|cfg| cfg.update_from_config(new_config));

        if params_changed {
            // Apply the new parameter overrides to the live directory
            // (ignoring failure if we have no directory yet) and to the
            // defaults used when no directory is present.
            let _ignore_err = self.netdir.mutate(|netdir| {
                netdir.replace_overridden_parameters(&new_config.override_net_params);
                Ok(())
            });
            {
                let mut params = self.default_parameters.lock().expect("lock failed");
                *params = Arc::new(NetParameters::from_map(&new_config.override_net_params));
            }

            self.events.publish(DirEvent::NewConsensus);
        }

        Ok(())
    }
824
    /// Return a stream of [`DirBootstrapStatus`](event::DirBootstrapStatus)
    /// events reporting our bootstrap progress.
    pub fn bootstrap_events(&self) -> event::DirBootstrapEvents {
        self.receive_status.clone()
    }
833
    /// Record the latest `progress` for the bootstrap attempt `attempt_id`,
    /// publishing it on the bootstrap-status channel.
    fn update_progress(&self, attempt_id: AttemptId, progress: DirProgress) {
        let mut sender = self.send_status.lock().expect("poisoned lock");
        let mut status = sender.borrow_mut();

        status.update_progress(attempt_id, progress);
    }
843
844 fn note_errors(&self, attempt_id: AttemptId, n_errors: usize) {
847 if n_errors == 0 {
848 return;
849 }
850 let mut sender = self.send_status.lock().expect("poisoned lock");
851 let mut status = sender.borrow_mut();
852
853 status.note_errors(attempt_id, n_errors);
854 }
855
    /// Record that bootstrap attempt `attempt_id` was reset, publishing the
    /// updated status.
    fn note_reset(&self, attempt_id: AttemptId) {
        let mut sender = self.send_status.lock().expect("poisoned lock");
        let mut status = sender.borrow_mut();

        status.note_reset(attempt_id);
    }
863
    /// Try to make our cache store read-write; return true on success
    /// (i.e. if we now own the cache lock).
    fn try_upgrade_to_readwrite(&self) -> Result<bool> {
        self.store
            .lock()
            .expect("Directory storage lock poisoned")
            .upgrade_to_readwrite()
    }
876
877 #[cfg(test)]
879 fn store_if_rw(&self) -> Option<&Mutex<DynStore>> {
880 let rw = !self
881 .store
882 .lock()
883 .expect("Directory storage lock poisoned")
884 .is_readonly();
885 if rw { Some(&self.store) } else { None }
887 }
888
    /// Construct a `DirMgr` from its parts, without bootstrapping.
    ///
    /// `circmgr` is `None` for managers that will never download;
    /// `offline` makes `bootstrap` refuse to run at all.
    #[allow(clippy::unnecessary_wraps)]
    fn from_config(
        config: DirMgrConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Option<Arc<CircMgr<R>>>,
        offline: bool,
    ) -> Result<Self> {
        let netdir = Arc::new(SharedMutArc::new());
        let events = event::FlagPublisher::new();
        // Default parameters come from the configured overrides alone.
        let default_parameters = NetParameters::from_map(&config.override_net_params);
        let default_parameters = Mutex::new(Arc::new(default_parameters));

        // Watch channel for bootstrap-status updates.
        let (send_status, receive_status) = postage::watch::channel();
        let send_status = Mutex::new(send_status);
        let receive_status = DirBootstrapEvents {
            inner: receive_status,
        };
        #[cfg(feature = "dirfilter")]
        let filter = config.extensions.filter.clone();

        let (task_schedule, task_handle) = TaskSchedule::new(runtime.clone());
        let task_schedule = Mutex::new(Some(task_schedule));

        // Preload any protocol recommendations already in the cache.
        let protocols = {
            let store = store.store.lock().expect("lock poisoned");
            store
                .cached_protocol_recommendations()?
                .map(|(t, p)| (t, Arc::new(p)))
        };

        Ok(DirMgr {
            config: config.into(),
            store: store.store,
            netdir,
            protocols: Mutex::new(protocols),
            default_parameters,
            events,
            send_status,
            receive_status,
            circmgr,
            runtime,
            offline,
            bootstrap_started: AtomicBool::new(false),
            #[cfg(feature = "dirfilter")]
            filter,
            task_schedule,
            task_handle,
        })
    }
946
    /// Try to load a complete directory from the cache only (no downloads),
    /// installing it as our current `netdir` on success.
    ///
    /// Returns true if we now have a directory.
    fn load_directory(self: &Arc<Self>, attempt_id: AttemptId) -> Result<bool> {
        let state = state::GetConsensusState::new(
            self.runtime.clone(),
            self.config.get(),
            CacheUsage::CacheOnly,
            None,
            #[cfg(feature = "dirfilter")]
            self.filter
                .clone()
                .unwrap_or_else(|| Arc::new(crate::filter::NilFilter)),
        );
        let _ = bootstrap::load(self, Box::new(state), attempt_id)?;

        Ok(self.netdir.get().is_some())
    }
966
    /// Return a stream of [`DirEvent`]s describing changes to our directory.
    pub fn events(&self) -> impl futures::Stream<Item = DirEvent> + use<R> {
        self.events.subscribe()
    }
976
    /// Load the document identified by `doc` from the cache, if present.
    ///
    /// Returns a `CacheCorruption` error if the cache yields more than one
    /// entry for the id, or an entry with the wrong id.
    pub fn text(&self, doc: &DocId) -> Result<Option<DocumentText>> {
        use itertools::Itertools;
        let mut result = HashMap::new();
        let query: DocQuery = (*doc).into();
        let store = self.store.lock().expect("store lock poisoned");
        query.load_from_store_into(&mut result, &**store)?;
        // A single-document query must yield at most one entry.
        let item = result.into_iter().at_most_one().map_err(|_| {
            Error::CacheCorruption("Found more than one entry in storage for given docid")
        })?;
        if let Some((docid, doctext)) = item {
            if &docid != doc {
                return Err(Error::CacheCorruption(
                    "Item from storage had incorrect docid.",
                ));
            }
            Ok(Some(doctext))
        } else {
            Ok(None)
        }
    }
999
1000 pub fn texts<T>(&self, docs: T) -> Result<HashMap<DocId, DocumentText>>
1005 where
1006 T: IntoIterator<Item = DocId>,
1007 {
1008 let partitioned = docid::partition_by_type(docs);
1009 let mut result = HashMap::new();
1010 let store = self.store.lock().expect("store lock poisoned");
1011 for (_, query) in partitioned.into_iter() {
1012 query.load_from_store_into(&mut result, &**store)?;
1013 }
1014 Ok(result)
1015 }
1016
    /// Given the raw `text` of a response to `req`, expand it into the
    /// document we actually wanted.
    ///
    /// In particular, if we asked for a consensus and received a consensus
    /// *diff*, apply the diff to the cached consensus it refers to and
    /// return the resulting full consensus.  All other text passes through
    /// unchanged.
    fn expand_response_text(&self, req: &ClientRequest, text: String) -> Result<String> {
        if let ClientRequest::Consensus(req) = req {
            if tor_consdiff::looks_like_diff(&text) {
                // We only ask for diffs relative to consensuses we hold, so
                // the first digest we advertised identifies the base.
                if let Some(old_d) = req.old_consensus_digests().next() {
                    let db_val = {
                        let s = self.store.lock().expect("Directory storage lock poisoned");
                        s.consensus_by_sha3_digest_of_signed_part(old_d)?
                    };
                    if let Some((old_consensus, meta)) = db_val {
                        info!("Applying a consensus diff");
                        let new_consensus = tor_consdiff::apply_diff(
                            old_consensus.as_str()?,
                            &text,
                            Some(*meta.sha3_256_of_signed()),
                        )?;
                        // Verify the result matches the digest the diff claims.
                        new_consensus.check_digest()?;
                        return Ok(new_consensus.to_string());
                    }
                }
                // A diff when we advertised no (matching) base consensus.
                return Err(Error::Unwanted(
                    "Received a consensus diff we did not ask for",
                ));
            }
        }
        Ok(text)
    }
1050
1051 #[allow(clippy::cognitive_complexity)]
1053 fn apply_netdir_changes(
1054 self: &Arc<Self>,
1055 state: &mut Box<dyn DirState>,
1056 store: &mut dyn Store,
1057 ) -> Result<()> {
1058 if let Some(change) = state.get_netdir_change() {
1059 match change {
1060 NetDirChange::AttemptReplace {
1061 netdir,
1062 consensus_meta,
1063 } => {
1064 if let Some(ref cm) = self.circmgr {
1067 if !cm
1068 .netdir_is_sufficient(netdir.as_ref().expect("AttemptReplace had None"))
1069 {
1070 debug!("Got a new NetDir, but it doesn't have enough guards yet.");
1071 return Ok(());
1072 }
1073 }
1074 let is_stale = {
1075 self.netdir
1077 .get()
1078 .map(|x| {
1079 x.lifetime().valid_after()
1080 > netdir
1081 .as_ref()
1082 .expect("AttemptReplace had None")
1083 .lifetime()
1084 .valid_after()
1085 })
1086 .unwrap_or(false)
1087 };
1088 if is_stale {
1089 warn!("Got a new NetDir, but it's older than the one we currently have!");
1090 return Err(Error::NetDirOlder);
1091 }
1092 let cfg = self.config.get();
1093 let mut netdir = netdir.take().expect("AttemptReplace had None");
1094 netdir.replace_overridden_parameters(&cfg.override_net_params);
1095 self.netdir.replace(netdir);
1096 self.events.publish(DirEvent::NewConsensus);
1097 self.events.publish(DirEvent::NewDescriptors);
1098
1099 info!("Marked consensus usable.");
1100 if !store.is_readonly() {
1101 store.mark_consensus_usable(consensus_meta)?;
1102 store.expire_all(&crate::storage::EXPIRATION_DEFAULTS)?;
1105 }
1106 Ok(())
1107 }
1108 NetDirChange::AddMicrodescs(mds) => {
1109 self.netdir.mutate(|netdir| {
1110 for md in mds.drain(..) {
1111 netdir.add_microdesc(md);
1112 }
1113 Ok(())
1114 })?;
1115 self.events.publish(DirEvent::NewDescriptors);
1116 Ok(())
1117 }
1118 NetDirChange::SetRequiredProtocol { timestamp, protos } => {
1119 if !store.is_readonly() {
1120 store.update_protocol_recommendations(timestamp, protos.as_ref())?;
1121 }
1122 let mut pr = self.protocols.lock().expect("Poisoned lock");
1123 *pr = Some((timestamp, protos));
1124 self.events.publish(DirEvent::NewProtocolRecommendation);
1125 Ok(())
1126 }
1127 }
1128 } else {
1129 Ok(())
1130 }
1131 }
1132}
1133
/// How "ready" do we want a directory to be before we report success?
#[derive(Debug, Copy, Clone)]
enum Readiness {
    /// The directory is fully downloaded.
    Complete,
    /// The directory is usable (enough to operate with), though possibly
    /// not fully downloaded.
    Usable,
}
1142
1143fn upgrade_weak_ref<T>(weak: &Weak<T>) -> Result<Arc<T>> {
1146 Weak::upgrade(weak).ok_or(Error::ManagerDropped)
1147}
1148
/// Compute the default "last consensus date" cutoff to use when asking a
/// directory server for a consensus: the oldest consensus publication time
/// we are willing to accept.
///
/// We go back from `now` by at least three hours (more if `tolerance`
/// allows older directories), then round up to the next hour boundary so
/// that clients' requests are more uniform.
pub(crate) fn default_consensus_cutoff(
    now: SystemTime,
    tolerance: &DirTolerance,
) -> Result<SystemTime> {
    /// Never accept a consensus published less than 3 hours of skew ago.
    const MIN_AGE_TO_ALLOW: Duration = Duration::from_secs(3 * 3600);
    let allow_skew = std::cmp::max(MIN_AGE_TO_ALLOW, tolerance.post_valid_tolerance());
    let cutoff = time::OffsetDateTime::from(now - allow_skew);
    // Truncate to the start of the hour, then add an hour: i.e. round up
    // to the next hour boundary.
    let (h, _m, _s) = cutoff.to_hms();
    let cutoff = cutoff.replace_time(
        time::Time::from_hms(h, 0, 0)
            .map_err(tor_error::into_internal!("Failed clock calculation"))?,
    );
    let cutoff = cutoff + Duration::from_secs(3600);

    Ok(cutoff.into())
}
1175
1176pub fn supported_client_protocols() -> tor_protover::Protocols {
1179 use tor_protover::named::*;
1180 [
1183 DIRCACHE_CONSDIFF,
1185 ]
1186 .into_iter()
1187 .collect()
1188}
1189
1190#[cfg(test)]
1191mod test {
1192 #![allow(clippy::bool_assert_comparison)]
1194 #![allow(clippy::clone_on_copy)]
1195 #![allow(clippy::dbg_macro)]
1196 #![allow(clippy::mixed_attributes_style)]
1197 #![allow(clippy::print_stderr)]
1198 #![allow(clippy::print_stdout)]
1199 #![allow(clippy::single_char_pattern)]
1200 #![allow(clippy::unwrap_used)]
1201 #![allow(clippy::unchecked_time_subtraction)]
1202 #![allow(clippy::useless_vec)]
1203 #![allow(clippy::needless_pass_by_value)]
1204 use super::*;
1206 use crate::docmeta::{AuthCertMeta, ConsensusMeta};
1207 use std::time::Duration;
1208 use tempfile::TempDir;
1209 use tor_basic_utils::test_rng::testing_rng;
1210 use tor_netdoc::doc::netstatus::ConsensusFlavor;
1211 use tor_netdoc::doc::{authcert::AuthCertKeyIds, netstatus::Lifetime};
1212 use tor_rtcompat::SleepProvider;
1213
    /// Check that our advertised client subprotocols are as expected.
    #[test]
    fn protocols() {
        let pr = supported_client_protocols();
        let expected = "DirCache=2".parse().unwrap();
        assert_eq!(pr, expected);
    }
1220
    /// Test helper: build a `DirMgr` backed by a fresh temporary cache
    /// directory.  The `TempDir` must be kept alive by the caller.
    pub(crate) fn new_mgr<R: Runtime>(runtime: R) -> (TempDir, DirMgr<R>) {
        let dir = TempDir::new().unwrap();
        let config = DirMgrConfig {
            cache_dir: dir.path().into(),
            ..Default::default()
        };
        let store = DirMgrStore::new(&config, runtime.clone(), false).unwrap();
        // No circmgr, not offline: a minimal manager for storage tests.
        let dirmgr = DirMgr::from_config(config, runtime, store, None, false).unwrap();

        (dir, dirmgr)
    }
1232
    /// Accessors should fail cleanly on a manager with no circmgr and no
    /// directory.
    #[test]
    fn failing_accessors() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let (_tempdir, mgr) = new_mgr(rt);

            assert!(mgr.circmgr().is_err());
            assert!(mgr.netdir(Timeliness::Unchecked).is_err());
        });
    }
1242
    /// Round-trip various document types through the store, then read them
    /// back via `DirMgr::text` and `DirMgr::texts`.
    #[test]
    fn load_and_store_internals() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let tomorrow = now + Duration::from_secs(86400);
            let later = tomorrow + Duration::from_secs(86400);

            let (_tempdir, mgr) = new_mgr(rt);

            // Fake digests and certificate key ids for our test documents.
            let d1 = [5_u8; 32];
            let d2 = [7; 32];
            let d3 = [42; 32];
            let d4 = [99; 20];
            let d5 = [12; 20];
            let certid1 = AuthCertKeyIds {
                id_fingerprint: d4.into(),
                sk_fingerprint: d5.into(),
            };
            let certid2 = AuthCertKeyIds {
                id_fingerprint: d5.into(),
                sk_fingerprint: d4.into(),
            };

            // Store microdescs, (optionally) routerdescs, authcerts, and a
            // consensus directly through the store.
            {
                let mut store = mgr.store.lock().unwrap();

                store
                    .store_microdescs(
                        &[
                            ("Fake micro 1", &d1),
                            ("Fake micro 2", &d2),
                            ("Fake micro 3", &d3),
                        ],
                        now,
                    )
                    .unwrap();

                #[cfg(feature = "routerdesc")]
                store
                    .store_routerdescs(&[("Fake rd1", now, &d4), ("Fake rd2", now, &d5)])
                    .unwrap();

                store
                    .store_authcerts(&[
                        (
                            AuthCertMeta::new(certid1, now, tomorrow),
                            "Fake certificate one",
                        ),
                        (
                            AuthCertMeta::new(certid2, now, tomorrow),
                            "Fake certificate two",
                        ),
                    ])
                    .unwrap();

                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, tomorrow, later).unwrap(),
                    [102; 32],
                    [103; 32],
                );
                store
                    .store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
                    .unwrap();
            }

            // Single-document lookups.
            let t1 = mgr.text(&DocId::Microdesc(d1)).unwrap().unwrap();
            assert_eq!(t1.as_str(), Ok("Fake micro 1"));

            let t2 = mgr
                .text(&DocId::LatestConsensus {
                    flavor: ConsensusFlavor::Microdesc,
                    cache_usage: CacheUsage::CacheOkay,
                })
                .unwrap()
                .unwrap();
            assert_eq!(t2.as_str(), Ok("Fake consensus!"));

            // A missing document yields Ok(None), not an error.
            let t3 = mgr.text(&DocId::Microdesc([255; 32])).unwrap();
            assert!(t3.is_none());

            // Batch lookup: present documents appear, absent ones are
            // silently omitted.
            let d_bogus = DocId::Microdesc([255; 32]);
            let res = mgr
                .texts(vec![
                    DocId::Microdesc(d2),
                    DocId::Microdesc(d3),
                    d_bogus,
                    DocId::AuthCert(certid2),
                    #[cfg(feature = "routerdesc")]
                    DocId::RouterDesc(d5),
                ])
                .unwrap();
            assert_eq!(
                res.get(&DocId::Microdesc(d2)).unwrap().as_str(),
                Ok("Fake micro 2")
            );
            assert_eq!(
                res.get(&DocId::Microdesc(d3)).unwrap().as_str(),
                Ok("Fake micro 3")
            );
            assert!(!res.contains_key(&d_bogus));
            assert_eq!(
                res.get(&DocId::AuthCert(certid2)).unwrap().as_str(),
                Ok("Fake certificate two")
            );
            #[cfg(feature = "routerdesc")]
            assert_eq!(
                res.get(&DocId::RouterDesc(d5)).unwrap().as_str(),
                Ok("Fake rd2")
            );
        });
    }
1357
    /// Consensus requests should advertise no previous digests when the
    /// cache is empty, and the stored consensus's digest/date once one is
    /// cached.
    #[test]
    fn make_consensus_request() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let tomorrow = now + Duration::from_secs(86400);
            let later = tomorrow + Duration::from_secs(86400);

            let (_tempdir, mgr) = new_mgr(rt);
            let config = DirMgrConfig::default();

            // With an empty cache: no old digests, and a default cutoff date.
            let req = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_consensus_request(
                    now,
                    ConsensusFlavor::Microdesc,
                    &**store,
                    &config,
                )
                .unwrap()
            };
            let tolerance = DirTolerance::default().post_valid_tolerance();
            match req {
                ClientRequest::Consensus(r) => {
                    assert_eq!(r.old_consensus_digests().count(), 0);
                    let date = r.last_consensus_date().unwrap();
                    // The cutoff is `now - tolerance`, rounded up within an hour.
                    assert!(date >= now - tolerance);
                    assert!(date <= now - tolerance + Duration::from_secs(3600));
                }
                _ => panic!("Wrong request type"),
            }

            // Store a consensus, then expect its digest and date in the request.
            let d_prev = [42; 32];
            {
                let mut store = mgr.store.lock().unwrap();

                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, tomorrow, later).unwrap(),
                    d_prev,
                    [103; 32],
                );
                store
                    .store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
                    .unwrap();
            }

            let req = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_consensus_request(
                    now,
                    ConsensusFlavor::Microdesc,
                    &**store,
                    &config,
                )
                .unwrap()
            };
            match req {
                ClientRequest::Consensus(r) => {
                    let ds: Vec<_> = r.old_consensus_digests().collect();
                    assert_eq!(ds.len(), 1);
                    assert_eq!(ds[0], &d_prev);
                    assert_eq!(r.last_consensus_date(), Some(now));
                }
                _ => panic!("Wrong request type"),
            }
        });
    }
1427
1428 #[test]
1429 fn make_other_requests() {
1430 tor_rtcompat::test_with_one_runtime!(|rt| async {
1431 use rand::Rng;
1432 let (_tempdir, mgr) = new_mgr(rt);
1433
1434 let certid1 = AuthCertKeyIds {
1435 id_fingerprint: [99; 20].into(),
1436 sk_fingerprint: [100; 20].into(),
1437 };
1438 let mut rng = testing_rng();
1439 #[cfg(feature = "routerdesc")]
1440 let rd_ids: Vec<DocId> = (0..1000).map(|_| DocId::RouterDesc(rng.random())).collect();
1441 let md_ids: Vec<DocId> = (0..1000).map(|_| DocId::Microdesc(rng.random())).collect();
1442 let config = DirMgrConfig::default();
1443
1444 let query = DocId::AuthCert(certid1);
1446 let store = mgr.store.lock().unwrap();
1447 let reqs =
1448 bootstrap::make_requests_for_documents(&mgr.runtime, &[query], &**store, &config)
1449 .unwrap();
1450 assert_eq!(reqs.len(), 1);
1451 let req = &reqs[0];
1452 if let ClientRequest::AuthCert(r) = req {
1453 assert_eq!(r.keys().next(), Some(&certid1));
1454 } else {
1455 panic!();
1456 }
1457
1458 let reqs =
1460 bootstrap::make_requests_for_documents(&mgr.runtime, &md_ids, &**store, &config)
1461 .unwrap();
1462 assert_eq!(reqs.len(), 2);
1463 assert!(matches!(reqs[0], ClientRequest::Microdescs(_)));
1464
1465 #[cfg(feature = "routerdesc")]
1467 {
1468 let reqs = bootstrap::make_requests_for_documents(
1469 &mgr.runtime,
1470 &rd_ids,
1471 &**store,
1472 &config,
1473 )
1474 .unwrap();
1475 assert_eq!(reqs.len(), 2);
1476 assert!(matches!(reqs[0], ClientRequest::RouterDescs(_)));
1477 }
1478 });
1479 }
1480
    #[test]
    fn expand_response() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let day = Duration::from_secs(86400);
            let config = DirMgrConfig::default();

            let (_tempdir, mgr) = new_mgr(rt);

            // A response to a microdescriptor request is passed through
            // unchanged: no diff expansion applies.
            let q = DocId::Microdesc([99; 32]);
            let r = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_requests_for_documents(&mgr.runtime, &[q], &**store, &config)
                    .unwrap()
            };
            let expanded = mgr.expand_response_text(&r[0], "ABC".to_string());
            assert_eq!(&expanded.unwrap(), "ABC");

            // A consensus response is also passed through unchanged while the
            // store holds no earlier consensus to diff against.
            let latest_id = DocId::LatestConsensus {
                flavor: ConsensusFlavor::Microdesc,
                cache_usage: CacheUsage::CacheOkay,
            };
            let r = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_requests_for_documents(
                    &mgr.runtime,
                    &[latest_id],
                    &**store,
                    &config,
                )
                .unwrap()
            };
            let expanded = mgr.expand_response_text(&r[0], "DEF".to_string());
            assert_eq!(&expanded.unwrap(), "DEF");

            // Store a fake consensus so diff expansion has a base text.  Both
            // digest parameters are d_in = [0x99; 32], i.e. 64 hex nines —
            // the same value used in the "hash" lines of the diffs below.
            {
                let mut store = mgr.store.lock().unwrap();
                let d_in = [0x99; 32]; let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, now + day, now + 2 * day).unwrap(),
                    d_in,
                    d_in,
                );
                store
                    .store_consensus(
                        &cmeta,
                        ConsensusFlavor::Microdesc,
                        false,
                        "line 1\nline2\nline 3\n",
                    )
                    .unwrap();
            }

            // Even with a consensus stored, a response that is not a diff
            // document stays as-is.
            let r = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_requests_for_documents(
                    &mgr.runtime,
                    &[latest_id],
                    &**store,
                    &config,
                )
                .unwrap()
            };
            let expanded = mgr.expand_response_text(&r[0], "hello".to_string());
            assert_eq!(&expanded.unwrap(), "hello");

            // A well-formed diff ("2c" replaces line 2) gets applied to the
            // stored text.  The first hash matches the stored digest (all
            // 0x99); the second is presumably the expected digest of the
            // expanded result — TODO confirm against the diff-apply code.
            let diff = "network-status-diff-version 1
hash 9999999999999999999999999999999999999999999999999999999999999999 8382374ca766873eb0d2530643191c6eaa2c5e04afa554cbac349b5d0592d300
2c
replacement line
.
".to_string();
            let expanded = mgr.expand_response_text(&r[0], diff);

            assert_eq!(expanded.unwrap(), "line 1\nreplacement line\nline 3\n");

            // The same diff with a bogus second hash (all nines, which cannot
            // match the expanded output) must be rejected as an error.
            let diff = "network-status-diff-version 1
hash 9999999999999999999999999999999999999999999999999999999999999999 9999999999999999999999999999999999999999999999999999999999999999
2c
replacement line
.
".to_string();
            let expanded = mgr.expand_response_text(&r[0], diff);
            assert!(expanded.is_err());
        });
    }
1576}