1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc = include_str!("../README.md")]
3#![allow(renamed_and_removed_lints)] #![allow(unknown_lints)] #![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_time_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] #![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] #![allow(clippy::result_large_err)] #![allow(clippy::needless_raw_string_hashes)] #![allow(clippy::needless_lifetimes)] #![allow(mismatched_lifetime_syntaxes)] #![allow(clippy::collapsible_if)] #![deny(clippy::unused_async)]
47#![deny(clippy::string_slice)] #![allow(clippy::single_component_path_imports)]
54
55mod bootstrap;
56pub mod config;
57mod docid;
58mod docmeta;
59mod err;
60mod event;
61mod shared_ref;
62mod state;
63mod storage;
64
65#[cfg(feature = "bridge-client")]
66pub mod bridgedesc;
67#[cfg(feature = "dirfilter")]
68pub mod filter;
69
70use crate::docid::{CacheUsage, ClientRequest, DocQuery};
71use crate::err::BootstrapAction;
72#[cfg(not(feature = "experimental-api"))]
73use crate::shared_ref::SharedMutArc;
74#[cfg(feature = "experimental-api")]
75pub use crate::shared_ref::SharedMutArc;
76use crate::storage::{DynStore, Store};
77use bootstrap::AttemptId;
78use event::DirProgress;
79use postage::watch;
80use scopeguard::ScopeGuard;
81use tor_circmgr::CircMgr;
82use tor_dirclient::SourceInfo;
83use tor_dircommon::config::DirTolerance;
84use tor_error::{info_report, into_internal, warn_report};
85use tor_netdir::params::NetParameters;
86use tor_netdir::{DirEvent, MdReceiver, NetDir, NetDirProvider};
87
88use async_trait::async_trait;
89use futures::stream::BoxStream;
90use oneshot_fused_workaround as oneshot;
91use tor_netdoc::doc::netstatus::ProtoStatuses;
92use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
93use tor_rtcompat::{Runtime, SpawnExt};
94use tracing::{debug, info, instrument, trace, warn};
95use web_time_compat::SystemTimeExt;
96
97use std::marker::PhantomData;
98use std::sync::atomic::{AtomicBool, Ordering};
99use std::sync::{Arc, Mutex};
100use std::time::Duration;
101use std::{collections::HashMap, sync::Weak};
102use std::{fmt::Debug, time::SystemTime};
103
104use crate::state::{DirState, NetDirChange};
105pub use config::DirMgrConfig;
106pub use docid::DocId;
107pub use err::Error;
108pub use event::{DirBlockage, DirBootstrapEvents, DirBootstrapStatus};
109pub use storage::DocumentText;
110pub use tor_dircommon::fallback::{FallbackDir, FallbackDirBuilder};
111pub use tor_netdir::Timeliness;
112
113use strum;
115
/// A `Result` whose error type is this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
118
/// A shareable handle to the directory storage used by a [`DirMgr`].
///
/// Clones share the same underlying store, since the store is held in an `Arc`.
#[derive(Clone)]
pub struct DirMgrStore<R: Runtime> {
    /// The storage backend, shared behind a mutex.
    pub(crate) store: Arc<Mutex<crate::DynStore>>,

    /// Marker for the runtime type `R`; no runtime value is stored here.
    pub(crate) runtime: PhantomData<R>,
}
133
134impl<R: Runtime> DirMgrStore<R> {
135 pub fn new(config: &DirMgrConfig, runtime: R, offline: bool) -> Result<Self> {
137 let store = Arc::new(Mutex::new(config.open_store(offline)?));
138 drop(runtime);
139 let runtime = PhantomData;
140 Ok(DirMgrStore { store, runtime })
141 }
142}
143
/// A [`NetDirProvider`] that can additionally be reconfigured and bootstrapped.
#[async_trait]
pub trait DirProvider: NetDirProvider {
    /// Try to change this provider's configuration to `new_config`.
    ///
    /// `how` selects the reconfiguration mode (see [`tor_config::Reconfigure`]).
    fn reconfigure(
        &self,
        new_config: &DirMgrConfig,
        how: tor_config::Reconfigure,
    ) -> std::result::Result<(), tor_config::ReconfigureError>;

    /// Bootstrap this provider.
    async fn bootstrap(&self) -> Result<()>;

    /// Return a stream of [`DirBootstrapStatus`] values.
    fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus>;

    /// Return a [`TaskHandle`] for this provider's download task, if there is one.
    ///
    /// The default implementation returns `None`.
    fn download_task_handle(&self) -> Option<TaskHandle> {
        None
    }
}
171
172impl<R: Runtime> NetDirProvider for DirMgr<R> {
175 fn netdir(&self, timeliness: Timeliness) -> tor_netdir::Result<Arc<NetDir>> {
176 use tor_netdir::Error as NetDirError;
177 let netdir = self.netdir.get().ok_or(NetDirError::NoInfo)?;
178 let lifetime = match timeliness {
179 Timeliness::Strict => netdir.lifetime().clone(),
180 Timeliness::Timely => self
181 .config
182 .get()
183 .tolerance
184 .extend_lifetime(netdir.lifetime()),
185 Timeliness::Unchecked => return Ok(netdir),
186 };
187 let now = SystemTime::get();
189 if lifetime.valid_after() > now {
190 Err(NetDirError::DirNotYetValid)
191 } else if lifetime.valid_until() < now {
192 Err(NetDirError::DirExpired)
193 } else {
194 Ok(netdir)
195 }
196 }
197
198 fn events(&self) -> BoxStream<'static, DirEvent> {
199 Box::pin(self.events.subscribe())
200 }
201
202 fn params(&self) -> Arc<dyn AsRef<tor_netdir::params::NetParameters>> {
203 if let Some(netdir) = self.netdir.get() {
204 netdir
210 } else {
211 self.default_parameters
215 .lock()
216 .expect("Poisoned lock")
217 .clone()
218 }
219 }
225
226 fn protocol_statuses(&self) -> Option<(SystemTime, Arc<ProtoStatuses>)> {
227 self.protocols.lock().expect("Poisoned lock").clone()
228 }
229}
230
// Each method below forwards to the inherent `DirMgr` item of the same name,
// except `download_task_handle`, which exposes the stored task handle.
#[async_trait]
impl<R: Runtime> DirProvider for Arc<DirMgr<R>> {
    /// Forward to [`DirMgr::reconfigure`].
    fn reconfigure(
        &self,
        new_config: &DirMgrConfig,
        how: tor_config::Reconfigure,
    ) -> std::result::Result<(), tor_config::ReconfigureError> {
        DirMgr::reconfigure(self, new_config, how)
    }

    /// Forward to [`DirMgr::bootstrap`].
    #[instrument(level = "trace", skip_all)]
    async fn bootstrap(&self) -> Result<()> {
        DirMgr::bootstrap(self).await
    }

    /// Forward to [`DirMgr::bootstrap_events`], boxing the resulting stream.
    fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus> {
        Box::pin(DirMgr::bootstrap_events(self))
    }

    /// Return a clone of this manager's `task_handle`; always `Some`.
    fn download_task_handle(&self) -> Option<TaskHandle> {
        Some(self.task_handle.clone())
    }
}
254
/// A directory manager: loads a network directory from a local store, can
/// download one when a circuit manager is available, and exposes the result
/// as a [`NetDirProvider`].
pub struct DirMgr<R: Runtime> {
    /// Current configuration; replaceable at runtime via `reconfigure`.
    config: tor_config::MutCfg<DirMgrConfig>,
    /// The directory store, taken from the [`DirMgrStore`] given at construction.
    store: Arc<Mutex<DynStore>>,
    /// The current network directory, if we have one.
    netdir: Arc<SharedMutArc<NetDir>>,

    /// The latest protocol recommendations we know of, with their timestamp.
    ///
    /// Initialized from the store's cached recommendations.
    protocols: Mutex<Option<(SystemTime, Arc<ProtoStatuses>)>>,

    /// Parameters returned by `params()` while there is no network directory.
    ///
    /// Built from the configured `override_net_params`.
    default_parameters: Mutex<Arc<NetParameters>>,

    /// Publisher used to announce [`DirEvent`]s to subscribers.
    events: event::FlagPublisher<DirEvent>,

    /// Sending half of the bootstrap-status watch channel.
    send_status: Mutex<watch::Sender<event::DirBootstrapStatus>>,

    /// Receiving half of the bootstrap-status channel; cloned by `bootstrap_events()`.
    receive_status: DirBootstrapEvents,

    /// Circuit manager, if any.  When `None`, `circmgr()` reports
    /// `Error::NoDownloadSupport`.
    circmgr: Option<Arc<CircMgr<R>>>,

    /// The runtime used to spawn the background task and create state objects.
    runtime: R,

    /// If true, `bootstrap()` refuses to run and returns `Error::OfflineMode`.
    offline: bool,

    /// Set when `bootstrap()` begins; used to reject concurrent bootstrap calls.
    bootstrap_started: AtomicBool,

    /// Directory filter taken from the configuration's extensions.
    #[cfg(feature = "dirfilter")]
    filter: crate::filter::FilterConfig,

    /// Schedule for the background task.  `bootstrap()` takes it out, and the
    /// background task puts it back when it exits.
    task_schedule: Mutex<Option<TaskSchedule<R>>>,

    /// Handle paired with `task_schedule`; returned by `download_task_handle()`.
    task_handle: TaskHandle,
}
333
/// Where a directory document came from.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum DocSource {
    /// The document was found in the local cache.
    LocalCache,
    /// The document came from a directory server.
    DirServer {
        /// Information about the server, when available.
        source: Option<SourceInfo>,
    },
}
349
350impl std::fmt::Display for DocSource {
351 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
352 match self {
353 DocSource::LocalCache => write!(f, "local cache"),
354 DocSource::DirServer { source: None } => write!(f, "directory server"),
355 DocSource::DirServer { source: Some(info) } => write!(f, "directory server {}", info),
356 }
357 }
358}
359
360impl<R: Runtime> DirMgr<R> {
361 pub fn load_once(runtime: R, config: DirMgrConfig) -> Result<Arc<NetDir>> {
371 let store = DirMgrStore::new(&config, runtime.clone(), true)?;
372 let dirmgr = Arc::new(Self::from_config(config, runtime, store, None, true)?);
373
374 let attempt = AttemptId::next();
376 trace!(%attempt, "Trying to load a full directory from cache");
377 let outcome = dirmgr.load_directory(attempt);
378 trace!(%attempt, "Load result: {outcome:?}");
379 let _success = outcome?;
380
381 dirmgr
382 .netdir(Timeliness::Timely)
383 .map_err(|_| Error::DirectoryNotPresent)
384 }
385
386 pub async fn load_or_bootstrap_once(
396 config: DirMgrConfig,
397 runtime: R,
398 store: DirMgrStore<R>,
399 circmgr: Arc<CircMgr<R>>,
400 ) -> Result<Arc<NetDir>> {
401 let dirmgr = DirMgr::bootstrap_from_config(config, runtime, store, circmgr).await?;
402 dirmgr
403 .timely_netdir()
404 .map_err(|_| Error::DirectoryNotPresent)
405 }
406
407 pub fn create_unbootstrapped(
411 config: DirMgrConfig,
412 runtime: R,
413 store: DirMgrStore<R>,
414 circmgr: Arc<CircMgr<R>>,
415 ) -> Result<Arc<Self>> {
416 Ok(Arc::new(DirMgr::from_config(
417 config,
418 runtime,
419 store,
420 Some(circmgr),
421 false,
422 )?))
423 }
424
425 #[allow(clippy::cognitive_complexity)] #[instrument(level = "trace", skip_all)]
447 pub async fn bootstrap(self: &Arc<Self>) -> Result<()> {
448 if self.offline {
449 return Err(Error::OfflineMode);
450 }
451
452 if self
459 .bootstrap_started
460 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
461 .is_err()
462 {
463 debug!("Attempted to bootstrap twice; ignoring.");
464 return Ok(());
465 }
466
467 let reset_bootstrap_started = scopeguard::guard(&self.bootstrap_started, |v| {
470 v.store(false, Ordering::SeqCst);
471 });
472
473 let schedule = {
474 let sched = self.task_schedule.lock().expect("poisoned lock").take();
475 match sched {
476 Some(sched) => sched,
477 None => {
478 debug!("Attempted to bootstrap twice; ignoring.");
479 return Ok(());
480 }
481 }
482 };
483
484 let attempt_id = AttemptId::next();
486 trace!(attempt=%attempt_id, "Starting to bootstrap directory");
487 let have_directory = self.load_directory(attempt_id)?;
488
489 let (mut sender, receiver) = if have_directory {
490 info!("Loaded a good directory from cache.");
491 (None, None)
492 } else {
493 info!("Didn't get usable directory from cache.");
494 let (sender, receiver) = oneshot::channel();
495 (Some(sender), Some(receiver))
496 };
497
498 let dirmgr_weak = Arc::downgrade(self);
500 self.runtime
501 .spawn(async move {
502 let mut schedule = scopeguard::guard(schedule, |schedule| {
509 if let Some(dm) = Weak::upgrade(&dirmgr_weak) {
510 *dm.task_schedule.lock().expect("poisoned lock") = Some(schedule);
511 }
512 });
513
514 if let Err(e) =
517 Self::reload_until_owner(&dirmgr_weak, &mut schedule, attempt_id, &mut sender)
518 .await
519 {
520 match e {
521 Error::ManagerDropped => {}
522 _ => warn_report!(e, "Unrecovered error while waiting for bootstrap",),
523 }
524 } else if let Err(e) =
525 Self::download_forever(dirmgr_weak.clone(), &mut schedule, attempt_id, sender)
526 .await
527 {
528 match e {
529 Error::ManagerDropped => {}
530 _ => warn_report!(e, "Unrecovered error while downloading"),
531 }
532 }
533 })
534 .map_err(|e| Error::from_spawn("directory updater task", e))?;
535
536 if let Some(receiver) = receiver {
537 match receiver.await {
538 Ok(()) => {
539 info!("We have enough information to build circuits.");
540 let _ = ScopeGuard::into_inner(reset_bootstrap_started);
542 }
543 Err(_) => {
544 warn!("Bootstrapping task exited before finishing.");
545 return Err(Error::CantAdvanceState);
546 }
547 }
548 }
549 Ok(())
550 }
551
    /// Return the current value of the `bootstrap_started` flag.
    ///
    /// The flag is set when [`bootstrap`](Self::bootstrap) begins, and is
    /// cleared again if that call's reset guard fires.
    pub fn bootstrap_started(&self) -> bool {
        self.bootstrap_started.load(Ordering::SeqCst)
    }
556
    /// Create a manager with [`create_unbootstrapped`](Self::create_unbootstrapped)
    /// and then bootstrap it.
    ///
    /// Returns the manager once `bootstrap()` has completed successfully;
    /// any error from either step is propagated.
    #[instrument(level = "trace", skip_all)]
    pub async fn bootstrap_from_config(
        config: DirMgrConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Arc<CircMgr<R>>,
    ) -> Result<Arc<Self>> {
        let dirmgr = Self::create_unbootstrapped(config, runtime, store, circmgr)?;

        dirmgr.bootstrap().await?;

        Ok(dirmgr)
    }
572
    /// Keep reloading the directory from cache until we obtain read-write
    /// access to the store.
    ///
    /// Each iteration tries to upgrade the store to read-write and returns
    /// `Ok(())` on success.  Otherwise it sleeps on `schedule` and then
    /// reloads the directory from the cache.
    ///
    /// The first time a reload yields a directory, `on_complete` (if still
    /// present) is taken and signalled.
    ///
    /// # Errors
    ///
    /// Returns `Error::ManagerDropped` (via `upgrade_weak_ref`) once the
    /// manager no longer exists, or any error from the store, the schedule,
    /// or loading.
    #[allow(clippy::cognitive_complexity)]
    async fn reload_until_owner(
        weak: &Weak<Self>,
        schedule: &mut TaskSchedule<R>,
        attempt_id: AttemptId,
        on_complete: &mut Option<oneshot::Sender<()>>,
    ) -> Result<()> {
        // Whether we have already logged that another process holds the lock.
        let mut logged = false;
        // Whether we currently have a directory at all.
        let mut bootstrapped;
        {
            // Scoped so the strong reference is dropped before we sleep.
            let dirmgr = upgrade_weak_ref(weak)?;
            bootstrapped = dirmgr.netdir.get().is_some();
        }

        loop {
            {
                let dirmgr = upgrade_weak_ref(weak)?;
                trace!("Trying to take ownership of the directory cache lock");
                if dirmgr.try_upgrade_to_readwrite()? {
                    // Only mention the hand-over if we previously reported
                    // that someone else was in charge.
                    if logged {
                        info!(
                            "The previous owning process has given up the lock. We are now in charge of managing the directory."
                        );
                    }
                    return Ok(());
                }
            }

            if !logged {
                logged = true;
                if bootstrapped {
                    info!("Another process is managing the directory. We'll use its cache.");
                } else {
                    info!(
                        "Another process is bootstrapping the directory. Waiting till it finishes or exits."
                    );
                }
            }

            // Poll every 5 seconds while we have no directory, and every
            // 120 seconds once we do.
            let pause = if bootstrapped {
                std::time::Duration::new(120, 0)
            } else {
                std::time::Duration::new(5, 0)
            };
            schedule.sleep(pause).await?;
            {
                let dirmgr = upgrade_weak_ref(weak)?;
                trace!("Trying to load from the directory cache");
                if dirmgr.load_directory(attempt_id)? {
                    // Tell whoever is waiting in `bootstrap()` that a
                    // directory is available; a closed receiver is ignored.
                    if let Some(send_done) = on_complete.take() {
                        let _ = send_done.send(());
                    }
                    if !bootstrapped {
                        info!("The directory is now bootstrapped.");
                    }
                    bootstrapped = true;
                }
            }
        }
    }
649
    /// Download directories repeatedly until the manager goes away or an
    /// unrecoverable error occurs.
    ///
    /// Each pass of the outer loop makes up to `retry_bootstrap().attempts()`
    /// download attempts.  Once the directory is usable, `on_complete` (if
    /// still present) is signalled and the task sleeps until the state's
    /// reset time before starting a fresh attempt.
    ///
    /// Returns `Ok(())` if the state reports no reset time.
    ///
    /// # Errors
    ///
    /// * `Error::ManagerDropped` once the manager no longer exists.
    /// * `Error::CantAdvanceState` if every attempt in a pass fails.
    /// * A fatal download error, or an internal error if a nonfatal error
    ///   reaches this function.
    #[allow(clippy::cognitive_complexity)]
    #[instrument(level = "trace", skip_all)]
    async fn download_forever(
        weak: Weak<Self>,
        schedule: &mut TaskSchedule<R>,
        mut attempt_id: AttemptId,
        mut on_complete: Option<oneshot::Sender<()>>,
    ) -> Result<()> {
        let mut state: Box<dyn DirState> = {
            // Scoped so the strong reference is released before downloading.
            let dirmgr = upgrade_weak_ref(&weak)?;
            Box::new(state::GetConsensusState::new(
                dirmgr.runtime.clone(),
                dirmgr.config.get(),
                CacheUsage::CacheOkay,
                Some(dirmgr.netdir.clone()),
                #[cfg(feature = "dirfilter")]
                dirmgr
                    .filter
                    .clone()
                    .unwrap_or_else(|| Arc::new(crate::filter::NilFilter)),
            ))
        };

        trace!("Entering download loop.");

        loop {
            // Set once this pass has produced a usable directory.
            let mut usable = false;

            // Re-read the retry configuration on every pass.
            let retry_config = {
                let dirmgr = upgrade_weak_ref(&weak)?;
                dirmgr.config.get().schedule.retry_bootstrap()
            };
            let mut retry_delay = retry_config.schedule();

            'retry_attempt: for try_num in retry_config.attempts() {
                trace!(attempt=%attempt_id, ?try_num, "Trying to download a directory.");
                let outcome = bootstrap::download(
                    Weak::clone(&weak),
                    &mut state,
                    schedule,
                    attempt_id,
                    &mut on_complete,
                )
                .await;
                trace!(attempt=%attempt_id, ?try_num, ?outcome, "Download is over.");

                if let Err(err) = outcome {
                    // A failed download that still left us with a usable
                    // directory ends this pass.
                    if state.is_ready(Readiness::Usable) {
                        usable = true;
                        info_report!(
                            err,
                            "Unable to completely download a directory. (Nevertheless, the directory is usable, so we'll pause for now)"
                        );
                        break 'retry_attempt;
                    }

                    match err.bootstrap_action() {
                        BootstrapAction::Nonfatal => {
                            return Err(into_internal!(
                                "Nonfatal error should not have propagated here"
                            )(err)
                            .into());
                        }
                        // Fall through: wait, reset the state, and retry.
                        BootstrapAction::Reset => {}
                        BootstrapAction::Fatal => return Err(err),
                    }

                    let delay = retry_delay.next_delay(&mut rand::rng());
                    warn_report!(
                        err,
                        "Unable to download a usable directory. (We will restart in {})",
                        humantime::format_duration(delay),
                    );
                    {
                        let dirmgr = upgrade_weak_ref(&weak)?;
                        dirmgr.note_reset(attempt_id);
                    }
                    schedule.sleep(delay).await?;
                    state = state.reset();
                } else {
                    info!(attempt=%attempt_id, "Directory is complete.");
                    usable = true;
                    break 'retry_attempt;
                }
            }

            if !usable {
                warn!(
                    "We failed {} times to bootstrap a directory. We're going to give up.",
                    retry_config.n_attempts()
                );
                return Err(Error::CantAdvanceState);
            } else {
                // Report success to a waiting `bootstrap()` call, if any; a
                // closed receiver is ignored.
                if let Some(send_done) = on_complete.take() {
                    let _ = send_done.send(());
                }
            }

            let reset_at = state.reset_time();
            match reset_at {
                Some(t) => {
                    trace!("Sleeping until {}", time::OffsetDateTime::from(t));
                    schedule.sleep_until_wallclock(t).await?;
                }
                None => return Ok(()),
            }
            // Start over with a new attempt id and a reset state.
            attempt_id = bootstrap::AttemptId::next();
            trace!(attempt=%attempt_id, "Beginning new attempt to bootstrap directory");
            state = state.reset();
        }
    }
770
771 fn circmgr(&self) -> Result<Arc<CircMgr<R>>> {
773 self.circmgr.clone().ok_or(Error::NoDownloadSupport)
774 }
775
776 pub fn reconfigure(
780 &self,
781 new_config: &DirMgrConfig,
782 how: tor_config::Reconfigure,
783 ) -> std::result::Result<(), tor_config::ReconfigureError> {
784 let config = self.config.get();
785 if new_config.cache_dir != config.cache_dir {
790 how.cannot_change("storage.cache_dir")?;
791 }
792 if new_config.cache_trust != config.cache_trust {
793 how.cannot_change("storage.permissions")?;
794 }
795 if new_config.authorities() != config.authorities() {
796 how.cannot_change("network.authorities")?;
797 }
798
799 if how == tor_config::Reconfigure::CheckAllOrNothing {
800 return Ok(());
801 }
802
803 let params_changed = new_config.override_net_params != config.override_net_params;
804
805 self.config
806 .map_and_replace(|cfg| cfg.update_from_config(new_config));
807
808 if params_changed {
809 let _ignore_err = self.netdir.mutate(|netdir| {
810 netdir.replace_overridden_parameters(&new_config.override_net_params);
811 Ok(())
812 });
813 {
814 let mut params = self.default_parameters.lock().expect("lock failed");
815 *params = Arc::new(NetParameters::from_map(&new_config.override_net_params));
816 }
817
818 self.events.publish(DirEvent::NewConsensus);
821 }
822
823 Ok(())
824 }
825
    /// Return a new [`DirBootstrapEvents`](event::DirBootstrapEvents) receiver.
    ///
    /// The result is a clone of this manager's stored receiver.
    pub fn bootstrap_events(&self) -> event::DirBootstrapEvents {
        self.receive_status.clone()
    }
834
835 fn update_progress(&self, attempt_id: AttemptId, progress: DirProgress) {
838 let mut sender = self.send_status.lock().expect("poisoned lock");
840 let mut status = sender.borrow_mut();
841
842 status.update_progress(attempt_id, progress);
843 }
844
845 fn note_errors(&self, attempt_id: AttemptId, n_errors: usize) {
848 if n_errors == 0 {
849 return;
850 }
851 let mut sender = self.send_status.lock().expect("poisoned lock");
852 let mut status = sender.borrow_mut();
853
854 status.note_errors(attempt_id, n_errors);
855 }
856
857 fn note_reset(&self, attempt_id: AttemptId) {
859 let mut sender = self.send_status.lock().expect("poisoned lock");
860 let mut status = sender.borrow_mut();
861
862 status.note_reset(attempt_id);
863 }
864
865 fn try_upgrade_to_readwrite(&self) -> Result<bool> {
872 self.store
873 .lock()
874 .expect("Directory storage lock poisoned")
875 .upgrade_to_readwrite()
876 }
877
878 #[cfg(test)]
880 fn store_if_rw(&self) -> Option<&Mutex<DynStore>> {
881 let rw = !self
882 .store
883 .lock()
884 .expect("Directory storage lock poisoned")
885 .is_readonly();
886 if rw { Some(&self.store) } else { None }
888 }
889
890 #[allow(clippy::unnecessary_wraps)] fn from_config(
896 config: DirMgrConfig,
897 runtime: R,
898 store: DirMgrStore<R>,
899 circmgr: Option<Arc<CircMgr<R>>>,
900 offline: bool,
901 ) -> Result<Self> {
902 let netdir = Arc::new(SharedMutArc::new());
903 let events = event::FlagPublisher::new();
904 let default_parameters = NetParameters::from_map(&config.override_net_params);
905 let default_parameters = Mutex::new(Arc::new(default_parameters));
906
907 let (send_status, receive_status) = postage::watch::channel();
908 let send_status = Mutex::new(send_status);
909 let receive_status = DirBootstrapEvents {
910 inner: receive_status,
911 };
912 #[cfg(feature = "dirfilter")]
913 let filter = config.extensions.filter.clone();
914
915 let (task_schedule, task_handle) = TaskSchedule::new(runtime.clone());
917 let task_schedule = Mutex::new(Some(task_schedule));
918
919 let protocols = {
922 let store = store.store.lock().expect("lock poisoned");
923 store
924 .cached_protocol_recommendations()?
925 .map(|(t, p)| (t, Arc::new(p)))
926 };
927
928 Ok(DirMgr {
929 config: config.into(),
930 store: store.store,
931 netdir,
932 protocols: Mutex::new(protocols),
933 default_parameters,
934 events,
935 send_status,
936 receive_status,
937 circmgr,
938 runtime,
939 offline,
940 bootstrap_started: AtomicBool::new(false),
941 #[cfg(feature = "dirfilter")]
942 filter,
943 task_schedule,
944 task_handle,
945 })
946 }
947
948 fn load_directory(self: &Arc<Self>, attempt_id: AttemptId) -> Result<bool> {
953 let state = state::GetConsensusState::new(
954 self.runtime.clone(),
955 self.config.get(),
956 CacheUsage::CacheOnly,
957 None,
958 #[cfg(feature = "dirfilter")]
959 self.filter
960 .clone()
961 .unwrap_or_else(|| Arc::new(crate::filter::NilFilter)),
962 );
963 let _ = bootstrap::load(self, Box::new(state), attempt_id)?;
964
965 Ok(self.netdir.get().is_some())
966 }
967
    /// Return a new stream of [`DirEvent`]s published by this manager.
    pub fn events(&self) -> impl futures::Stream<Item = DirEvent> + use<R> {
        self.events.subscribe()
    }
977
978 pub fn text(&self, doc: &DocId) -> Result<Option<DocumentText>> {
981 use itertools::Itertools;
982 let mut result = HashMap::new();
983 let query: DocQuery = (*doc).into();
984 let store = self.store.lock().expect("store lock poisoned");
985 query.load_from_store_into(&mut result, &**store)?;
986 let item = result.into_iter().at_most_one().map_err(|_| {
987 Error::CacheCorruption("Found more than one entry in storage for given docid")
988 })?;
989 if let Some((docid, doctext)) = item {
990 if &docid != doc {
991 return Err(Error::CacheCorruption(
992 "Item from storage had incorrect docid.",
993 ));
994 }
995 Ok(Some(doctext))
996 } else {
997 Ok(None)
998 }
999 }
1000
1001 pub fn texts<T>(&self, docs: T) -> Result<HashMap<DocId, DocumentText>>
1006 where
1007 T: IntoIterator<Item = DocId>,
1008 {
1009 let partitioned = docid::partition_by_type(docs);
1010 let mut result = HashMap::new();
1011 let store = self.store.lock().expect("store lock poisoned");
1012 for (_, query) in partitioned.into_iter() {
1013 query.load_from_store_into(&mut result, &**store)?;
1014 }
1015 Ok(result)
1016 }
1017
1018 fn expand_response_text(&self, req: &ClientRequest, text: String) -> Result<String> {
1026 if let ClientRequest::Consensus(req) = req {
1027 if tor_consdiff::looks_like_diff(&text) {
1028 if let Some(old_d) = req.old_consensus_digests().next() {
1029 let db_val = {
1030 let s = self.store.lock().expect("Directory storage lock poisoned");
1031 s.consensus_by_sha3_digest_of_signed_part(old_d)?
1032 };
1033 if let Some((old_consensus, meta)) = db_val {
1034 info!("Applying a consensus diff");
1035 let new_consensus = tor_consdiff::apply_diff(
1036 old_consensus.as_str()?,
1037 &text,
1038 Some(*meta.sha3_256_of_signed()),
1039 )?;
1040 new_consensus.check_digest()?;
1041 return Ok(new_consensus.to_string());
1042 }
1043 }
1044 return Err(Error::Unwanted(
1045 "Received a consensus diff we did not ask for",
1046 ));
1047 }
1048 }
1049 Ok(text)
1050 }
1051
    /// Apply the pending change (if any) reported by `state` to this
    /// manager's directory, and persist it to `store` where applicable.
    ///
    /// * `AttemptReplace` installs a new network directory, unless the
    ///   circuit manager considers it insufficient or it is older than the
    ///   current one.
    /// * `AddMicrodescs` adds microdescriptors to the current directory.
    /// * `SetRequiredProtocol` records new protocol recommendations.
    ///
    /// # Errors
    ///
    /// Returns `Error::NetDirOlder` if the replacement directory's
    /// `valid_after` is earlier than that of the current directory, and
    /// propagates errors from the store and from `SharedMutArc::mutate`.
    ///
    /// # Panics
    ///
    /// Panics if an `AttemptReplace` change carries no directory.
    #[allow(clippy::cognitive_complexity)]
    fn apply_netdir_changes(
        self: &Arc<Self>,
        state: &mut Box<dyn DirState>,
        store: &mut dyn Store,
    ) -> Result<()> {
        if let Some(change) = state.get_netdir_change() {
            match change {
                NetDirChange::AttemptReplace {
                    netdir,
                    consensus_meta,
                } => {
                    // When we have a circuit manager, let it veto a
                    // directory it cannot use yet.
                    if let Some(ref cm) = self.circmgr {
                        if !cm
                            .netdir_is_sufficient(netdir.as_ref().expect("AttemptReplace had None"))
                        {
                            debug!("Got a new NetDir, but it doesn't have enough guards yet.");
                            return Ok(());
                        }
                    }
                    // True when our current directory became valid later
                    // than the proposed replacement.
                    let is_stale = {
                        self.netdir
                            .get()
                            .map(|x| {
                                x.lifetime().valid_after()
                                    > netdir
                                        .as_ref()
                                        .expect("AttemptReplace had None")
                                        .lifetime()
                                        .valid_after()
                            })
                            .unwrap_or(false)
                    };
                    if is_stale {
                        warn!("Got a new NetDir, but it's older than the one we currently have!");
                        return Err(Error::NetDirOlder);
                    }
                    let cfg = self.config.get();
                    // Take ownership of the directory out of the change.
                    let mut netdir = netdir.take().expect("AttemptReplace had None");
                    netdir.replace_overridden_parameters(&cfg.override_net_params);
                    self.netdir.replace(netdir);
                    self.events.publish(DirEvent::NewConsensus);
                    self.events.publish(DirEvent::NewDescriptors);

                    info!("Marked consensus usable.");
                    // Persisting and expiring only happen when we own the store.
                    if !store.is_readonly() {
                        store.mark_consensus_usable(consensus_meta)?;
                        store.expire_all(&crate::storage::EXPIRATION_DEFAULTS)?;
                    }
                    Ok(())
                }
                NetDirChange::AddMicrodescs(mds) => {
                    self.netdir.mutate(|netdir| {
                        for md in mds.drain(..) {
                            netdir.add_microdesc(md);
                        }
                        Ok(())
                    })?;
                    self.events.publish(DirEvent::NewDescriptors);
                    Ok(())
                }
                NetDirChange::SetRequiredProtocol { timestamp, protos } => {
                    if !store.is_readonly() {
                        store.update_protocol_recommendations(timestamp, protos.as_ref())?;
                    }
                    let mut pr = self.protocols.lock().expect("Poisoned lock");
                    *pr = Some((timestamp, protos));
                    self.events.publish(DirEvent::NewProtocolRecommendation);
                    Ok(())
                }
            }
        } else {
            Ok(())
        }
    }
1133}
1134
/// A degree of readiness that a download state can be asked about
/// (see the `is_ready` call in `download_forever`).
#[derive(Debug, Copy, Clone)]
enum Readiness {
    /// Presumably: nothing further remains to be downloaded — TODO confirm
    /// against the `DirState` implementations.
    Complete,
    /// The directory is good enough to use; `download_forever` checks this
    /// after a failed download to decide whether to stop retrying.
    Usable,
}
1143
1144fn upgrade_weak_ref<T>(weak: &Weak<T>) -> Result<Arc<T>> {
1147 Weak::upgrade(weak).ok_or(Error::ManagerDropped)
1148}
1149
1150pub(crate) fn default_consensus_cutoff(
1153 now: SystemTime,
1154 tolerance: &DirTolerance,
1155) -> Result<SystemTime> {
1156 const MIN_AGE_TO_ALLOW: Duration = Duration::from_secs(3 * 3600);
1159 let allow_skew = std::cmp::max(MIN_AGE_TO_ALLOW, tolerance.post_valid_tolerance());
1160 let cutoff = time::OffsetDateTime::from(now - allow_skew);
1161 let (h, _m, _s) = cutoff.to_hms();
1168 let cutoff = cutoff.replace_time(
1169 time::Time::from_hms(h, 0, 0)
1170 .map_err(tor_error::into_internal!("Failed clock calculation"))?,
1171 );
1172 let cutoff = cutoff + Duration::from_secs(3600);
1173
1174 Ok(cutoff.into())
1175}
1176
1177pub fn supported_client_protocols() -> tor_protover::Protocols {
1180 use tor_protover::named::*;
1181 [
1184 DIRCACHE_CONSDIFF,
1186 ]
1187 .into_iter()
1188 .collect()
1189}
1190
1191#[cfg(test)]
1192mod test {
1193 #![allow(clippy::bool_assert_comparison)]
1195 #![allow(clippy::clone_on_copy)]
1196 #![allow(clippy::dbg_macro)]
1197 #![allow(clippy::mixed_attributes_style)]
1198 #![allow(clippy::print_stderr)]
1199 #![allow(clippy::print_stdout)]
1200 #![allow(clippy::single_char_pattern)]
1201 #![allow(clippy::unwrap_used)]
1202 #![allow(clippy::unchecked_time_subtraction)]
1203 #![allow(clippy::useless_vec)]
1204 #![allow(clippy::needless_pass_by_value)]
1205 #![allow(clippy::string_slice)] use super::*;
1208 use crate::docmeta::{AuthCertMeta, ConsensusMeta};
1209 use std::time::Duration;
1210 use tempfile::TempDir;
1211 use tor_basic_utils::test_rng::testing_rng;
1212 use tor_netdoc::doc::netstatus::ConsensusFlavor;
1213 use tor_netdoc::doc::{authcert::AuthCertKeyIds, netstatus::Lifetime};
1214 use tor_rtcompat::SleepProvider;
1215
    /// `supported_client_protocols()` must equal the set parsed from "DirCache=2".
    #[test]
    fn protocols() {
        let pr = supported_client_protocols();
        let expected = "DirCache=2".parse().unwrap();
        assert_eq!(pr, expected);
    }
1222
    /// Build a `DirMgr` backed by a fresh temporary cache directory.
    ///
    /// The manager is online (`offline == false`) and has no circuit manager.
    /// The `TempDir` is returned as well so the caller can keep it for as
    /// long as the manager is in use.
    pub(crate) fn new_mgr<R: Runtime>(runtime: R) -> (TempDir, DirMgr<R>) {
        let dir = TempDir::new().unwrap();
        let config = DirMgrConfig {
            cache_dir: dir.path().into(),
            ..Default::default()
        };
        let store = DirMgrStore::new(&config, runtime.clone(), false).unwrap();
        let dirmgr = DirMgr::from_config(config, runtime, store, None, false).unwrap();

        (dir, dirmgr)
    }
1234
    /// A freshly built manager has neither a circuit manager nor a directory,
    /// so both accessors must fail.
    #[test]
    fn failing_accessors() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let (_tempdir, mgr) = new_mgr(rt);

            assert!(mgr.circmgr().is_err());
            assert!(mgr.netdir(Timeliness::Unchecked).is_err());
        });
    }
1244
    /// Store several fake documents directly in the store, then check that
    /// `text()` and `texts()` return them and report missing ones as absent.
    #[test]
    fn load_and_store_internals() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let tomorrow = now + Duration::from_secs(86400);
            let later = tomorrow + Duration::from_secs(86400);

            let (_tempdir, mgr) = new_mgr(rt);

            // Arbitrary digests: 32 bytes for microdescs, 20 bytes for
            // fingerprints and router descriptors.
            let d1 = [5_u8; 32];
            let d2 = [7; 32];
            let d3 = [42; 32];
            let d4 = [99; 20];
            let d5 = [12; 20];
            let certid1 = AuthCertKeyIds {
                id_fingerprint: d4.into(),
                sk_fingerprint: d5.into(),
            };
            let certid2 = AuthCertKeyIds {
                id_fingerprint: d5.into(),
                sk_fingerprint: d4.into(),
            };

            {
                // Populate the store; the lock is released at the end of
                // this block, before `text()`/`texts()` lock it again.
                let mut store = mgr.store.lock().unwrap();

                store
                    .store_microdescs(
                        &[
                            ("Fake micro 1", &d1),
                            ("Fake micro 2", &d2),
                            ("Fake micro 3", &d3),
                        ],
                        now,
                    )
                    .unwrap();

                #[cfg(feature = "routerdesc")]
                store
                    .store_routerdescs(&[("Fake rd1", now, &d4), ("Fake rd2", now, &d5)])
                    .unwrap();

                store
                    .store_authcerts(&[
                        (
                            AuthCertMeta::new(certid1, now, tomorrow),
                            "Fake certificate one",
                        ),
                        (
                            AuthCertMeta::new(certid2, now, tomorrow),
                            "Fake certificate two",
                        ),
                    ])
                    .unwrap();

                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, tomorrow, later).unwrap(),
                    [102; 32],
                    [103; 32],
                );
                store
                    .store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
                    .unwrap();
            }

            // Single-document lookups.
            let t1 = mgr.text(&DocId::Microdesc(d1)).unwrap().unwrap();
            assert_eq!(t1.as_str(), Ok("Fake micro 1"));

            let t2 = mgr
                .text(&DocId::LatestConsensus {
                    flavor: ConsensusFlavor::Microdesc,
                    cache_usage: CacheUsage::CacheOkay,
                })
                .unwrap()
                .unwrap();
            assert_eq!(t2.as_str(), Ok("Fake consensus!"));

            // A document that was never stored is reported as `None`.
            let t3 = mgr.text(&DocId::Microdesc([255; 32])).unwrap();
            assert!(t3.is_none());

            // Multi-document lookup, including one id that does not exist.
            let d_bogus = DocId::Microdesc([255; 32]);
            let res = mgr
                .texts(vec![
                    DocId::Microdesc(d2),
                    DocId::Microdesc(d3),
                    d_bogus,
                    DocId::AuthCert(certid2),
                    #[cfg(feature = "routerdesc")]
                    DocId::RouterDesc(d5),
                ])
                .unwrap();
            assert_eq!(
                res.get(&DocId::Microdesc(d2)).unwrap().as_str(),
                Ok("Fake micro 2")
            );
            assert_eq!(
                res.get(&DocId::Microdesc(d3)).unwrap().as_str(),
                Ok("Fake micro 3")
            );
            assert!(!res.contains_key(&d_bogus));
            assert_eq!(
                res.get(&DocId::AuthCert(certid2)).unwrap().as_str(),
                Ok("Fake certificate two")
            );
            #[cfg(feature = "routerdesc")]
            assert_eq!(
                res.get(&DocId::RouterDesc(d5)).unwrap().as_str(),
                Ok("Fake rd2")
            );
        });
    }
1359
    /// Check the consensus request built by `bootstrap::make_consensus_request`,
    /// first against an empty store and then with one consensus stored.
    #[test]
    fn make_consensus_request() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let tomorrow = now + Duration::from_secs(86400);
            let later = tomorrow + Duration::from_secs(86400);

            let (_tempdir, mgr) = new_mgr(rt);
            let config = DirMgrConfig::default();

            // Case 1: nothing in the store.
            let req = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_consensus_request(
                    now,
                    ConsensusFlavor::Microdesc,
                    &**store,
                    &config,
                )
                .unwrap()
            };
            let tolerance = DirTolerance::default().post_valid_tolerance();
            match req {
                ClientRequest::Consensus(r) => {
                    // No old digests, and a date within one hour after
                    // `now - tolerance`.
                    assert_eq!(r.old_consensus_digests().count(), 0);
                    let date = r.last_consensus_date().unwrap();
                    assert!(date >= now - tolerance);
                    assert!(date <= now - tolerance + Duration::from_secs(3600));
                }
                _ => panic!("Wrong request type"),
            }

            // Case 2: store a consensus, passing `d_prev` as the first of its two digests.
            let d_prev = [42; 32];
            {
                let mut store = mgr.store.lock().unwrap();

                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, tomorrow, later).unwrap(),
                    d_prev,
                    [103; 32],
                );
                store
                    .store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
                    .unwrap();
            }

            let req = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_consensus_request(
                    now,
                    ConsensusFlavor::Microdesc,
                    &**store,
                    &config,
                )
                .unwrap()
            };
            match req {
                ClientRequest::Consensus(r) => {
                    // The request now names the stored consensus and its date.
                    let ds: Vec<_> = r.old_consensus_digests().collect();
                    assert_eq!(ds.len(), 1);
                    assert_eq!(ds[0], &d_prev);
                    assert_eq!(r.last_consensus_date(), Some(now));
                }
                _ => panic!("Wrong request type"),
            }
        });
    }
1429
/// Check the requests that `bootstrap::make_requests_for_documents` builds for
/// an authority certificate ID, for a batch of microdescriptor IDs and (when the
/// `routerdesc` feature is enabled) for a batch of router descriptor IDs.
#[test]
fn make_other_requests() {
    tor_rtcompat::test_with_one_runtime!(|rt| async {
        use rand::RngExt;
        let (_tempdir, mgr) = new_mgr(rt);

        let certid1 = AuthCertKeyIds {
            id_fingerprint: [99; 20].into(),
            sk_fingerprint: [100; 20].into(),
        };
        // Generate the random IDs in the same order as before (router
        // descriptors first) so the values drawn from the test RNG do not change.
        let mut rng = testing_rng();
        #[cfg(feature = "routerdesc")]
        let rd_ids: Vec<DocId> = (0..1000).map(|_| DocId::RouterDesc(rng.random())).collect();
        let md_ids: Vec<DocId> = (0..1000).map(|_| DocId::Microdesc(rng.random())).collect();
        let config = DirMgrConfig::default();

        // The store stays locked for the remainder of the test; nothing below awaits.
        let cert_query = DocId::AuthCert(certid1);
        let store = mgr.store.lock().unwrap();

        // A single certificate ID yields a single request naming that certificate.
        let cert_reqs = bootstrap::make_requests_for_documents(
            &mgr.runtime,
            &[cert_query],
            &**store,
            &config,
        )
        .unwrap();
        assert_eq!(cert_reqs.len(), 1);
        let ClientRequest::AuthCert(cert_req) = &cert_reqs[0] else {
            // Same diagnostic as the consensus-request test, instead of a bare panic.
            panic!("Wrong request type");
        };
        assert_eq!(cert_req.keys().next(), Some(&certid1));

        // 1000 microdescriptor IDs are expected to come back as two requests.
        let md_reqs =
            bootstrap::make_requests_for_documents(&mgr.runtime, &md_ids, &**store, &config)
                .unwrap();
        assert_eq!(md_reqs.len(), 2);
        assert!(matches!(md_reqs[0], ClientRequest::Microdescs(_)));

        // Likewise for 1000 router descriptor IDs.
        #[cfg(feature = "routerdesc")]
        {
            let rd_reqs =
                bootstrap::make_requests_for_documents(&mgr.runtime, &rd_ids, &**store, &config)
                    .unwrap();
            assert_eq!(rd_reqs.len(), 2);
            assert!(matches!(rd_reqs[0], ClientRequest::RouterDescs(_)));
        }
    });
}
1482
/// Exercise `expand_response_text` with plain replies and with
/// `network-status-diff` replies, before and after a consensus is in the store.
#[test]
fn expand_response() {
    tor_rtcompat::test_with_one_runtime!(|rt| async {
        let now = rt.wallclock();
        let day = Duration::from_secs(86400);
        let config = DirMgrConfig::default();

        let (_tempdir, mgr) = new_mgr(rt);

        // Lock the store just long enough to build the requests for `ids`.
        let requests_for = |ids: &[DocId]| {
            let guard = mgr.store.lock().unwrap();
            bootstrap::make_requests_for_documents(&mgr.runtime, ids, &**guard, &config).unwrap()
        };

        // A reply to a microdescriptor request comes back unchanged.
        let md_reqs = requests_for(&[DocId::Microdesc([99; 32])]);
        let text = mgr.expand_response_text(&md_reqs[0], "ABC".to_string());
        assert_eq!(text.unwrap(), "ABC");

        // So does a reply to a latest-consensus request made before this test
        // has stored any consensus.
        let latest_id = DocId::LatestConsensus {
            flavor: ConsensusFlavor::Microdesc,
            cache_usage: CacheUsage::CacheOkay,
        };
        let early_reqs = requests_for(&[latest_id]);
        let text = mgr.expand_response_text(&early_reqs[0], "DEF".to_string());
        assert_eq!(text.unwrap(), "DEF");

        // Store a three-line consensus whose metadata carries the all-0x99 value
        // in both digest positions.
        {
            let mut guard = mgr.store.lock().unwrap();
            let d_in = [0x99; 32];
            let lifetime = Lifetime::new(now, now + day, now + 2 * day).unwrap();
            let cmeta = ConsensusMeta::new(lifetime, d_in, d_in);
            guard
                .store_consensus(
                    &cmeta,
                    ConsensusFlavor::Microdesc,
                    false,
                    "line 1\nline2\nline 3\n",
                )
                .unwrap();
        }

        // Requests built from here on are made with that consensus in the store.
        let reqs = requests_for(&[latest_id]);

        // Text that is not a diff is still passed through untouched.
        let text = mgr.expand_response_text(&reqs[0], "hello".to_string());
        assert_eq!(text.unwrap(), "hello");

        // A diff replacing line 2 is applied to the stored text.
        let diff = "network-status-diff-version 1\n\
                    hash 9999999999999999999999999999999999999999999999999999999999999999 8382374ca766873eb0d2530643191c6eaa2c5e04afa554cbac349b5d0592d300\n\
                    2c\n\
                    replacement line\n\
                    .\n"
            .to_string();
        let text = mgr.expand_response_text(&reqs[0], diff);

        assert_eq!(text.unwrap(), "line 1\nreplacement line\nline 3\n");

        // The same diff with a different second hash value must be rejected.
        let diff = "network-status-diff-version 1\n\
                    hash 9999999999999999999999999999999999999999999999999999999999999999 9999999999999999999999999999999999999999999999999999999999999999\n\
                    2c\n\
                    replacement line\n\
                    .\n"
            .to_string();
        let text = mgr.expand_response_text(&reqs[0], diff);
        assert!(text.is_err());
    });
}
1578}