1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc = include_str!("../README.md")]
3#![allow(renamed_and_removed_lints)] #![allow(unknown_lints)] #![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_time_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] #![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] #![allow(clippy::result_large_err)] #![allow(clippy::needless_raw_string_hashes)] #![allow(clippy::needless_lifetimes)] #![allow(mismatched_lifetime_syntaxes)] #![allow(clippy::collapsible_if)] #![deny(clippy::unused_async)]
47#![allow(clippy::single_component_path_imports)]
53
54mod bootstrap;
55pub mod config;
56mod docid;
57mod docmeta;
58mod err;
59mod event;
60mod shared_ref;
61mod state;
62mod storage;
63
64#[cfg(feature = "bridge-client")]
65pub mod bridgedesc;
66#[cfg(feature = "dirfilter")]
67pub mod filter;
68
69use crate::docid::{CacheUsage, ClientRequest, DocQuery};
70use crate::err::BootstrapAction;
71#[cfg(not(feature = "experimental-api"))]
72use crate::shared_ref::SharedMutArc;
73#[cfg(feature = "experimental-api")]
74pub use crate::shared_ref::SharedMutArc;
75use crate::storage::{DynStore, Store};
76use bootstrap::AttemptId;
77use event::DirProgress;
78use postage::watch;
79use scopeguard::ScopeGuard;
80use tor_circmgr::CircMgr;
81use tor_dirclient::SourceInfo;
82use tor_dircommon::config::DirTolerance;
83use tor_error::{info_report, into_internal, warn_report};
84use tor_netdir::params::NetParameters;
85use tor_netdir::{DirEvent, MdReceiver, NetDir, NetDirProvider};
86
87use async_trait::async_trait;
88use futures::stream::BoxStream;
89use oneshot_fused_workaround as oneshot;
90use tor_netdoc::doc::netstatus::ProtoStatuses;
91use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
92use tor_rtcompat::{Runtime, SpawnExt};
93use tracing::{debug, info, instrument, trace, warn};
94
95use std::marker::PhantomData;
96use std::sync::atomic::{AtomicBool, Ordering};
97use std::sync::{Arc, Mutex};
98use std::time::Duration;
99use std::{collections::HashMap, sync::Weak};
100use std::{fmt::Debug, time::SystemTime};
101
102use crate::state::{DirState, NetDirChange};
103pub use config::DirMgrConfig;
104pub use docid::DocId;
105pub use err::Error;
106pub use event::{DirBlockage, DirBootstrapEvents, DirBootstrapStatus};
107pub use storage::DocumentText;
108pub use tor_dircommon::fallback::{FallbackDir, FallbackDirBuilder};
109pub use tor_netdir::Timeliness;
110
111use strum;
113
/// A Result as returned by this crate.
pub type Result<T> = std::result::Result<T, Error>;
116
/// Storage manager used by [`DirMgr`].
///
/// Clones of this handle share the same underlying store, so multiple
/// directory managers can use one on-disk cache.
#[derive(Clone)]
pub struct DirMgrStore<R: Runtime> {
    /// The underlying on-disk document store, behind a lock.
    pub(crate) store: Arc<Mutex<crate::DynStore>>,

    /// Marker tying this store to a runtime type; no runtime value is kept.
    pub(crate) runtime: PhantomData<R>,
}
131
impl<R: Runtime> DirMgrStore<R> {
    /// Open the directory store described by `config`.
    ///
    /// If `offline` is true, the store is opened for local use only.
    /// The `runtime` argument is consumed but unused; it only fixes the
    /// type parameter `R` for the caller.
    pub fn new(config: &DirMgrConfig, runtime: R, offline: bool) -> Result<Self> {
        let store = Arc::new(Mutex::new(config.open_store(offline)?));
        // We only needed `runtime` for type inference; discard it explicitly.
        drop(runtime);
        let runtime = PhantomData;
        Ok(DirMgrStore { store, runtime })
    }
}
141
/// A directory manager exposed as a trait object: a [`NetDirProvider`] that
/// can additionally be reconfigured, bootstrapped, and observed.
#[async_trait]
pub trait DirProvider: NetDirProvider {
    /// Try to change our configuration to `new_config`.
    ///
    /// `how` controls whether the change is actually applied or only checked.
    fn reconfigure(
        &self,
        new_config: &DirMgrConfig,
        how: tor_config::Reconfigure,
    ) -> std::result::Result<(), tor_config::ReconfigureError>;

    /// Bootstrap this directory provider so that it can be used to build circuits.
    async fn bootstrap(&self) -> Result<()>;

    /// Return a stream of [`DirBootstrapStatus`] events describing bootstrap progress.
    fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus>;

    /// Return a [`TaskHandle`] for controlling the download task, if one exists.
    ///
    /// The default implementation returns `None`.
    fn download_task_handle(&self) -> Option<TaskHandle> {
        None
    }
}
169
impl<R: Runtime> NetDirProvider for DirMgr<R> {
    /// Return the latest network directory, checked against `timeliness`.
    fn netdir(&self, timeliness: Timeliness) -> tor_netdir::Result<Arc<NetDir>> {
        use tor_netdir::Error as NetDirError;
        let netdir = self.netdir.get().ok_or(NetDirError::NoInfo)?;
        // Pick the lifetime window against which we validate the directory.
        let lifetime = match timeliness {
            Timeliness::Strict => netdir.lifetime().clone(),
            // "Timely" stretches the window by our configured tolerance.
            Timeliness::Timely => self
                .config
                .get()
                .tolerance
                .extend_lifetime(netdir.lifetime()),
            // "Unchecked" skips validity checking entirely.
            Timeliness::Unchecked => return Ok(netdir),
        };
        let now = SystemTime::now();
        if lifetime.valid_after() > now {
            Err(NetDirError::DirNotYetValid)
        } else if lifetime.valid_until() < now {
            Err(NetDirError::DirExpired)
        } else {
            Ok(netdir)
        }
    }

    /// Return a stream of directory events.
    fn events(&self) -> BoxStream<'static, DirEvent> {
        Box::pin(self.events.subscribe())
    }

    /// Return the current network parameters, falling back to our defaults
    /// when no directory has been loaded yet.
    fn params(&self) -> Arc<dyn AsRef<tor_netdir::params::NetParameters>> {
        if let Some(netdir) = self.netdir.get() {
            netdir
        } else {
            self.default_parameters
                .lock()
                .expect("Poisoned lock")
                .clone()
        }
    }

    /// Return the latest cached protocol recommendations, with their timestamp.
    fn protocol_statuses(&self) -> Option<(SystemTime, Arc<ProtoStatuses>)> {
        self.protocols.lock().expect("Poisoned lock").clone()
    }
}
227
// NOTE: this implementation is on `Arc<DirMgr>` (not `DirMgr`) because
// `DirMgr::bootstrap` requires `self: &Arc<Self>`.
#[async_trait]
impl<R: Runtime> DirProvider for Arc<DirMgr<R>> {
    fn reconfigure(
        &self,
        new_config: &DirMgrConfig,
        how: tor_config::Reconfigure,
    ) -> std::result::Result<(), tor_config::ReconfigureError> {
        DirMgr::reconfigure(self, new_config, how)
    }

    #[instrument(level = "trace", skip_all)]
    async fn bootstrap(&self) -> Result<()> {
        DirMgr::bootstrap(self).await
    }

    fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus> {
        Box::pin(DirMgr::bootstrap_events(self))
    }

    fn download_task_handle(&self) -> Option<TaskHandle> {
        Some(self.task_handle.clone())
    }
}
251
/// A directory manager: keeps a [`NetDir`] up to date by loading it from a
/// local cache and (unless offline) downloading updates over the network.
pub struct DirMgr<R: Runtime> {
    /// Current configuration; may be replaced at runtime via `reconfigure`.
    config: tor_config::MutCfg<DirMgrConfig>,
    /// Handle to our on-disk document cache.
    store: Arc<Mutex<DynStore>>,
    /// Shared, mutable handle to the latest network directory (if any).
    netdir: Arc<SharedMutArc<NetDir>>,

    /// Latest protocol recommendations we have seen, with their timestamp.
    protocols: Mutex<Option<(SystemTime, Arc<ProtoStatuses>)>>,

    /// Network parameters to report when no directory is available.
    default_parameters: Mutex<Arc<NetParameters>>,

    /// Publisher used to broadcast [`DirEvent`]s to subscribers.
    events: event::FlagPublisher<DirEvent>,

    /// Sender side of the bootstrap-status watch channel.
    send_status: Mutex<watch::Sender<event::DirBootstrapStatus>>,

    /// Cloneable receiver side of the bootstrap-status channel.
    receive_status: DirBootstrapEvents,

    /// Circuit manager used for downloads; `None` means downloads are unsupported.
    circmgr: Option<Arc<CircMgr<R>>>,

    /// Runtime used to spawn tasks and to sleep.
    runtime: R,

    /// If true, we never attempt to bootstrap from the network.
    offline: bool,

    /// Set to true once a bootstrap attempt has begun; cleared if it fails.
    bootstrap_started: AtomicBool,

    /// Optional filter applied to incoming directory documents.
    #[cfg(feature = "dirfilter")]
    filter: crate::filter::FilterConfig,

    /// Schedule driving the download task; `take()`n when bootstrap starts.
    task_schedule: Mutex<Option<TaskSchedule<R>>>,

    /// Handle for controlling the task driven by `task_schedule`.
    task_handle: TaskHandle,
}
330
/// The source from which a directory document was received.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum DocSource {
    /// The document was loaded from our local on-disk cache.
    LocalCache,
    /// The document was downloaded from a directory server.
    DirServer {
        /// Information about the server, if known.
        source: Option<SourceInfo>,
    },
}
346
347impl std::fmt::Display for DocSource {
348 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
349 match self {
350 DocSource::LocalCache => write!(f, "local cache"),
351 DocSource::DirServer { source: None } => write!(f, "directory server"),
352 DocSource::DirServer { source: Some(info) } => write!(f, "directory server {}", info),
353 }
354 }
355}
356
357impl<R: Runtime> DirMgr<R> {
    /// Try to load the directory from the cache exactly once, without
    /// touching the network, and return it if it is timely.
    pub fn load_once(runtime: R, config: DirMgrConfig) -> Result<Arc<NetDir>> {
        // Open the store read-only/offline: we will never download here.
        let store = DirMgrStore::new(&config, runtime.clone(), true)?;
        let dirmgr = Arc::new(Self::from_config(config, runtime, store, None, true)?);

        let attempt = AttemptId::next();
        trace!(%attempt, "Trying to load a full directory from cache");
        let outcome = dirmgr.load_directory(attempt);
        trace!(%attempt, "Load result: {outcome:?}");
        let _success = outcome?;

        dirmgr
            .netdir(Timeliness::Timely)
            .map_err(|_| Error::DirectoryNotPresent)
    }
382
    /// Return a directory, either by loading it from cache or by running a
    /// full bootstrap (downloading as needed).
    pub async fn load_or_bootstrap_once(
        config: DirMgrConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Arc<CircMgr<R>>,
    ) -> Result<Arc<NetDir>> {
        let dirmgr = DirMgr::bootstrap_from_config(config, runtime, store, circmgr).await?;
        dirmgr
            .timely_netdir()
            .map_err(|_| Error::DirectoryNotPresent)
    }
403
    /// Construct a new `DirMgr` that is able to download, but has not yet
    /// begun bootstrapping. Call [`DirMgr::bootstrap`] to start it.
    pub fn create_unbootstrapped(
        config: DirMgrConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Arc<CircMgr<R>>,
    ) -> Result<Arc<Self>> {
        Ok(Arc::new(DirMgr::from_config(
            config,
            runtime,
            store,
            Some(circmgr),
            false,
        )?))
    }
421
    /// Bootstrap the directory: load what we can from cache, then spawn a
    /// background task that keeps downloading updates.
    ///
    /// Returns once we have a usable directory (or immediately, if one was
    /// already in the cache). Calling this twice is a no-op. Returns
    /// [`Error::OfflineMode`] if this manager was built offline.
    #[allow(clippy::cognitive_complexity)] #[instrument(level = "trace", skip_all)]
    pub async fn bootstrap(self: &Arc<Self>) -> Result<()> {
        if self.offline {
            return Err(Error::OfflineMode);
        }

        // Atomically claim the "bootstrap in progress" flag; if it was
        // already set, another bootstrap is (or was) running.
        if self
            .bootstrap_started
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_err()
        {
            debug!("Attempted to bootstrap twice; ignoring.");
            return Ok(());
        }

        // If we return early with an error, clear the flag again so a later
        // bootstrap attempt can proceed.
        let reset_bootstrap_started = scopeguard::guard(&self.bootstrap_started, |v| {
            v.store(false, Ordering::SeqCst);
        });

        // Take the task schedule; if it is already gone, a download task is
        // (or was) already running.
        let schedule = {
            let sched = self.task_schedule.lock().expect("poisoned lock").take();
            match sched {
                Some(sched) => sched,
                None => {
                    debug!("Attempted to bootstrap twice; ignoring.");
                    return Ok(());
                }
            }
        };

        let attempt_id = AttemptId::next();
        trace!(attempt=%attempt_id, "Starting to bootstrap directory");
        let have_directory = self.load_directory(attempt_id)?;

        // If the cache didn't give us a usable directory, set up a oneshot so
        // the background task can tell us when one is ready.
        let (mut sender, receiver) = if have_directory {
            info!("Loaded a good directory from cache.");
            (None, None)
        } else {
            info!("Didn't get usable directory from cache.");
            let (sender, receiver) = oneshot::channel();
            (Some(sender), Some(receiver))
        };

        // Spawn the long-lived download task. It holds only a weak reference
        // so that dropping the DirMgr shuts it down.
        let dirmgr_weak = Arc::downgrade(self);
        self.runtime
            .spawn(async move {
                // Put the schedule back when the task exits, so a future
                // bootstrap could in principle restart it.
                let mut schedule = scopeguard::guard(schedule, |schedule| {
                    if let Some(dm) = Weak::upgrade(&dirmgr_weak) {
                        *dm.task_schedule.lock().expect("poisoned lock") = Some(schedule);
                    }
                });

                // First wait until we own the cache lock; then download forever.
                if let Err(e) =
                    Self::reload_until_owner(&dirmgr_weak, &mut schedule, attempt_id, &mut sender)
                        .await
                {
                    match e {
                        Error::ManagerDropped => {}
                        _ => warn_report!(e, "Unrecovered error while waiting for bootstrap",),
                    }
                } else if let Err(e) =
                    Self::download_forever(dirmgr_weak.clone(), &mut schedule, attempt_id, sender)
                        .await
                {
                    match e {
                        Error::ManagerDropped => {}
                        _ => warn_report!(e, "Unrecovered error while downloading"),
                    }
                }
            })
            .map_err(|e| Error::from_spawn("directory updater task", e))?;

        if let Some(receiver) = receiver {
            match receiver.await {
                Ok(()) => {
                    info!("We have enough information to build circuits.");
                    // Bootstrap succeeded: disarm the reset guard so the
                    // "started" flag stays set.
                    let _ = ScopeGuard::into_inner(reset_bootstrap_started);
                }
                Err(_) => {
                    warn!("Bootstrapping task exited before finishing.");
                    return Err(Error::CantAdvanceState);
                }
            }
        }
        Ok(())
    }
548
    /// Return true if a bootstrap attempt has been started on this `DirMgr`.
    pub fn bootstrap_started(&self) -> bool {
        self.bootstrap_started.load(Ordering::SeqCst)
    }
553
    /// Create a new `DirMgr` from `config` and immediately bootstrap it.
    #[instrument(level = "trace", skip_all)]
    pub async fn bootstrap_from_config(
        config: DirMgrConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Arc<CircMgr<R>>,
    ) -> Result<Arc<Self>> {
        let dirmgr = Self::create_unbootstrapped(config, runtime, store, circmgr)?;

        dirmgr.bootstrap().await?;

        Ok(dirmgr)
    }
569
    /// Loop until we can take read-write ownership of the directory cache,
    /// periodically reloading whatever another process has written to it.
    ///
    /// Returns `Ok(())` once we own the cache lock, or `Error::ManagerDropped`
    /// if the `DirMgr` behind `weak` has gone away. Signals `on_complete`
    /// (at most once) as soon as a usable directory is loaded.
    #[allow(clippy::cognitive_complexity)] async fn reload_until_owner(
        weak: &Weak<Self>,
        schedule: &mut TaskSchedule<R>,
        attempt_id: AttemptId,
        on_complete: &mut Option<oneshot::Sender<()>>,
    ) -> Result<()> {
        let mut logged = false;
        let mut bootstrapped;
        {
            // Scope each strong reference tightly so we never keep the
            // DirMgr alive while sleeping.
            let dirmgr = upgrade_weak_ref(weak)?;
            bootstrapped = dirmgr.netdir.get().is_some();
        }

        loop {
            {
                let dirmgr = upgrade_weak_ref(weak)?;
                trace!("Trying to take ownership of the directory cache lock");
                if dirmgr.try_upgrade_to_readwrite()? {
                    if logged {
                        info!(
                            "The previous owning process has given up the lock. We are now in charge of managing the directory."
                        );
                    }
                    return Ok(());
                }
            }

            // Log our "waiting" status only once, not every iteration.
            if !logged {
                logged = true;
                if bootstrapped {
                    info!("Another process is managing the directory. We'll use its cache.");
                } else {
                    info!(
                        "Another process is bootstrapping the directory. Waiting till it finishes or exits."
                    );
                }
            }

            // Poll slowly once we have a directory; quickly while we don't.
            let pause = if bootstrapped {
                std::time::Duration::new(120, 0)
            } else {
                std::time::Duration::new(5, 0)
            };
            schedule.sleep(pause).await?;
            {
                let dirmgr = upgrade_weak_ref(weak)?;
                trace!("Trying to load from the directory cache");
                if dirmgr.load_directory(attempt_id)? {
                    // Tell the caller (once) that a directory is available.
                    if let Some(send_done) = on_complete.take() {
                        let _ = send_done.send(());
                    }
                    if !bootstrapped {
                        info!("The directory is now bootstrapped.");
                    }
                    bootstrapped = true;
                }
            }
        }
    }
646
    /// Keep the directory up to date indefinitely: repeatedly download,
    /// with retries and resets, sleeping between bootstrap attempts.
    ///
    /// Returns only on fatal errors, when the `DirMgr` is dropped, or when
    /// the current state reports no reset time. Signals `on_complete` (at
    /// most once) when a usable directory has been obtained.
    #[allow(clippy::cognitive_complexity)] #[instrument(level = "trace", skip_all)]
    async fn download_forever(
        weak: Weak<Self>,
        schedule: &mut TaskSchedule<R>,
        mut attempt_id: AttemptId,
        mut on_complete: Option<oneshot::Sender<()>>,
    ) -> Result<()> {
        // Initial download state: fetch a consensus, cache permitted.
        let mut state: Box<dyn DirState> = {
            let dirmgr = upgrade_weak_ref(&weak)?;
            Box::new(state::GetConsensusState::new(
                dirmgr.runtime.clone(),
                dirmgr.config.get(),
                CacheUsage::CacheOkay,
                Some(dirmgr.netdir.clone()),
                #[cfg(feature = "dirfilter")]
                dirmgr
                    .filter
                    .clone()
                    .unwrap_or_else(|| Arc::new(crate::filter::NilFilter)),
            ))
        };

        trace!("Entering download loop.");

        loop {
            let mut usable = false;

            let retry_config = {
                let dirmgr = upgrade_weak_ref(&weak)?;
                dirmgr.config.get().schedule.retry_bootstrap()
            };
            let mut retry_delay = retry_config.schedule();

            'retry_attempt: for try_num in retry_config.attempts() {
                trace!(attempt=%attempt_id, ?try_num, "Trying to download a directory.");
                let outcome = bootstrap::download(
                    Weak::clone(&weak),
                    &mut state,
                    schedule,
                    attempt_id,
                    &mut on_complete,
                )
                .await;
                trace!(attempt=%attempt_id, ?try_num, ?outcome, "Download is over.");

                if let Err(err) = outcome {
                    // Even a failed download may have left us with a usable
                    // (if incomplete) directory.
                    if state.is_ready(Readiness::Usable) {
                        usable = true;
                        info_report!(
                            err,
                            "Unable to completely download a directory. (Nevertheless, the directory is usable, so we'll pause for now)"
                        );
                        break 'retry_attempt;
                    }

                    match err.bootstrap_action() {
                        // Nonfatal errors should have been handled inside
                        // bootstrap::download; seeing one here is a bug.
                        BootstrapAction::Nonfatal => {
                            return Err(into_internal!(
                                "Nonfatal error should not have propagated here"
                            )(err)
                            .into());
                        }
                        BootstrapAction::Reset => {}
                        BootstrapAction::Fatal => return Err(err),
                    }

                    // Back off, note the reset for status reporting, and
                    // restart this attempt from a fresh state.
                    let delay = retry_delay.next_delay(&mut rand::rng());
                    warn_report!(
                        err,
                        "Unable to download a usable directory. (We will restart in {})",
                        humantime::format_duration(delay),
                    );
                    {
                        let dirmgr = upgrade_weak_ref(&weak)?;
                        dirmgr.note_reset(attempt_id);
                    }
                    schedule.sleep(delay).await?;
                    state = state.reset();
                } else {
                    info!(attempt=%attempt_id, "Directory is complete.");
                    usable = true;
                    break 'retry_attempt;
                }
            }

            if !usable {
                // All retry attempts exhausted without a usable directory.
                warn!(
                    "We failed {} times to bootstrap a directory. We're going to give up.",
                    retry_config.n_attempts()
                );
                return Err(Error::CantAdvanceState);
            } else {
                // Notify the caller (once) that the directory is usable.
                if let Some(send_done) = on_complete.take() {
                    let _ = send_done.send(());
                }
            }

            // Sleep until the state says it is time to start over; if it
            // never wants a reset, we are done.
            let reset_at = state.reset_time();
            match reset_at {
                Some(t) => {
                    trace!("Sleeping until {}", time::OffsetDateTime::from(t));
                    schedule.sleep_until_wallclock(t).await?;
                }
                None => return Ok(()),
            }
            attempt_id = bootstrap::AttemptId::next();
            trace!(attempt=%attempt_id, "Beginning new attempt to bootstrap directory");
            state = state.reset();
        }
    }
767
768 fn circmgr(&self) -> Result<Arc<CircMgr<R>>> {
770 self.circmgr.clone().ok_or(Error::NoDownloadSupport)
771 }
772
    /// Try to change our configuration to `new_config`.
    ///
    /// Storage location, permissions, and authorities cannot be changed at
    /// runtime; `how` decides whether such a request is an error or ignored.
    pub fn reconfigure(
        &self,
        new_config: &DirMgrConfig,
        how: tor_config::Reconfigure,
    ) -> std::result::Result<(), tor_config::ReconfigureError> {
        let config = self.config.get();
        // These fields can't be changed on a running DirMgr.
        if new_config.cache_dir != config.cache_dir {
            how.cannot_change("storage.cache_dir")?;
        }
        if new_config.cache_trust != config.cache_trust {
            how.cannot_change("storage.permissions")?;
        }
        if new_config.authorities() != config.authorities() {
            how.cannot_change("network.authorities")?;
        }

        // In check-only mode we stop here without applying anything.
        if how == tor_config::Reconfigure::CheckAllOrNothing {
            return Ok(());
        }

        let params_changed = new_config.override_net_params != config.override_net_params;

        self.config
            .map_and_replace(|cfg| cfg.update_from_config(new_config));

        if params_changed {
            // Apply the new parameter overrides to the live directory (if
            // any) and to our fallback defaults, then notify subscribers.
            let _ignore_err = self.netdir.mutate(|netdir| {
                netdir.replace_overridden_parameters(&new_config.override_net_params);
                Ok(())
            });
            {
                let mut params = self.default_parameters.lock().expect("lock failed");
                *params = Arc::new(NetParameters::from_map(&new_config.override_net_params));
            }

            self.events.publish(DirEvent::NewConsensus);
        }

        Ok(())
    }
822
    /// Return a stream of [`DirBootstrapStatus`] events describing the
    /// progress of our bootstrapping.
    pub fn bootstrap_events(&self) -> event::DirBootstrapEvents {
        self.receive_status.clone()
    }
831
832 fn update_progress(&self, attempt_id: AttemptId, progress: DirProgress) {
835 let mut sender = self.send_status.lock().expect("poisoned lock");
837 let mut status = sender.borrow_mut();
838
839 status.update_progress(attempt_id, progress);
840 }
841
842 fn note_errors(&self, attempt_id: AttemptId, n_errors: usize) {
845 if n_errors == 0 {
846 return;
847 }
848 let mut sender = self.send_status.lock().expect("poisoned lock");
849 let mut status = sender.borrow_mut();
850
851 status.note_errors(attempt_id, n_errors);
852 }
853
854 fn note_reset(&self, attempt_id: AttemptId) {
856 let mut sender = self.send_status.lock().expect("poisoned lock");
857 let mut status = sender.borrow_mut();
858
859 status.note_reset(attempt_id);
860 }
861
    /// Try to upgrade our cache store to read-write access.
    ///
    /// Returns `Ok(true)` if we now hold (or already held) the write lock.
    fn try_upgrade_to_readwrite(&self) -> Result<bool> {
        self.store
            .lock()
            .expect("Directory storage lock poisoned")
            .upgrade_to_readwrite()
    }
874
875 #[cfg(test)]
877 fn store_if_rw(&self) -> Option<&Mutex<DynStore>> {
878 let rw = !self
879 .store
880 .lock()
881 .expect("Directory storage lock poisoned")
882 .is_readonly();
883 if rw { Some(&self.store) } else { None }
885 }
886
    /// Construct a `DirMgr` from its parts, without starting any tasks.
    ///
    /// `circmgr` of `None` disables downloads; `offline` additionally
    /// forbids bootstrapping over the network.
    #[allow(clippy::unnecessary_wraps)] fn from_config(
        config: DirMgrConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Option<Arc<CircMgr<R>>>,
        offline: bool,
    ) -> Result<Self> {
        let netdir = Arc::new(SharedMutArc::new());
        let events = event::FlagPublisher::new();
        // Fallback parameters derived from configured overrides only.
        let default_parameters = NetParameters::from_map(&config.override_net_params);
        let default_parameters = Mutex::new(Arc::new(default_parameters));

        // Watch channel used for bootstrap-status reporting.
        let (send_status, receive_status) = postage::watch::channel();
        let send_status = Mutex::new(send_status);
        let receive_status = DirBootstrapEvents {
            inner: receive_status,
        };
        #[cfg(feature = "dirfilter")]
        let filter = config.extensions.filter.clone();

        let (task_schedule, task_handle) = TaskSchedule::new(runtime.clone());
        let task_schedule = Mutex::new(Some(task_schedule));

        // Preload any protocol recommendations already in the cache.
        let protocols = {
            let store = store.store.lock().expect("lock poisoned");
            store
                .cached_protocol_recommendations()?
                .map(|(t, p)| (t, Arc::new(p)))
        };

        Ok(DirMgr {
            config: config.into(),
            store: store.store,
            netdir,
            protocols: Mutex::new(protocols),
            default_parameters,
            events,
            send_status,
            receive_status,
            circmgr,
            runtime,
            offline,
            bootstrap_started: AtomicBool::new(false),
            #[cfg(feature = "dirfilter")]
            filter,
            task_schedule,
            task_handle,
        })
    }
944
    /// Try to load a directory from the cache alone (no downloads).
    ///
    /// Returns `Ok(true)` if we ended up with a directory afterwards.
    fn load_directory(self: &Arc<Self>, attempt_id: AttemptId) -> Result<bool> {
        let state = state::GetConsensusState::new(
            self.runtime.clone(),
            self.config.get(),
            CacheUsage::CacheOnly,
            None,
            #[cfg(feature = "dirfilter")]
            self.filter
                .clone()
                .unwrap_or_else(|| Arc::new(crate::filter::NilFilter)),
        );
        let _ = bootstrap::load(self, Box::new(state), attempt_id)?;

        Ok(self.netdir.get().is_some())
    }
964
    /// Return a stream of [`DirEvent`]s published by this directory manager.
    pub fn events(&self) -> impl futures::Stream<Item = DirEvent> + use<R> {
        self.events.subscribe()
    }
974
    /// Try to load the text of a single document `doc` from the cache.
    ///
    /// Returns `Ok(None)` if the document is not cached; reports
    /// [`Error::CacheCorruption`] if the store returns inconsistent data.
    pub fn text(&self, doc: &DocId) -> Result<Option<DocumentText>> {
        use itertools::Itertools;
        let mut result = HashMap::new();
        let query: DocQuery = (*doc).into();
        let store = self.store.lock().expect("store lock poisoned");
        query.load_from_store_into(&mut result, &**store)?;
        // A single-DocId query must yield at most one entry.
        let item = result.into_iter().at_most_one().map_err(|_| {
            Error::CacheCorruption("Found more than one entry in storage for given docid")
        })?;
        if let Some((docid, doctext)) = item {
            // The store must hand back the same DocId we asked for.
            if &docid != doc {
                return Err(Error::CacheCorruption(
                    "Item from storage had incorrect docid.",
                ));
            }
            Ok(Some(doctext))
        } else {
            Ok(None)
        }
    }
997
    /// Load the text of every document in `docs` that is present in the
    /// cache; absent documents are simply omitted from the result map.
    pub fn texts<T>(&self, docs: T) -> Result<HashMap<DocId, DocumentText>>
    where
        T: IntoIterator<Item = DocId>,
    {
        // Group the DocIds by type so each group becomes one store query.
        let partitioned = docid::partition_by_type(docs);
        let mut result = HashMap::new();
        let store = self.store.lock().expect("store lock poisoned");
        for (_, query) in partitioned.into_iter() {
            query.load_from_store_into(&mut result, &**store)?;
        }
        Ok(result)
    }
1014
    /// If `text` is a consensus diff in reply to a consensus request `req`,
    /// apply it to the cached consensus it refers to and return the
    /// resulting full consensus; otherwise return `text` unchanged.
    ///
    /// Returns [`Error::Unwanted`] for a diff we have no base consensus for.
    fn expand_response_text(&self, req: &ClientRequest, text: String) -> Result<String> {
        if let ClientRequest::Consensus(req) = req {
            if tor_consdiff::looks_like_diff(&text) {
                if let Some(old_d) = req.old_consensus_digests().next() {
                    // Fetch the base consensus in a tight scope so the store
                    // lock is released before we apply the diff.
                    let db_val = {
                        let s = self.store.lock().expect("Directory storage lock poisoned");
                        s.consensus_by_sha3_digest_of_signed_part(old_d)?
                    };
                    if let Some((old_consensus, meta)) = db_val {
                        info!("Applying a consensus diff");
                        let new_consensus = tor_consdiff::apply_diff(
                            old_consensus.as_str()?,
                            &text,
                            Some(*meta.sha3_256_of_signed()),
                        )?;
                        // Verify the diff output matches its declared digest.
                        new_consensus.check_digest()?;
                        return Ok(new_consensus.to_string());
                    }
                }
                return Err(Error::Unwanted(
                    "Received a consensus diff we did not ask for",
                ));
            }
        }
        Ok(text)
    }
1048
    /// If `state` has pending changes for our network directory, apply them:
    /// replace the directory, add microdescriptors, or record new protocol
    /// recommendations — updating `store` and publishing events as needed.
    #[allow(clippy::cognitive_complexity)]
    fn apply_netdir_changes(
        self: &Arc<Self>,
        state: &mut Box<dyn DirState>,
        store: &mut dyn Store,
    ) -> Result<()> {
        if let Some(change) = state.get_netdir_change() {
            match change {
                NetDirChange::AttemptReplace {
                    netdir,
                    consensus_meta,
                } => {
                    // Don't switch to a directory the circuit manager deems
                    // insufficient (e.g. not enough guards yet).
                    if let Some(ref cm) = self.circmgr {
                        if !cm
                            .netdir_is_sufficient(netdir.as_ref().expect("AttemptReplace had None"))
                        {
                            debug!("Got a new NetDir, but it doesn't have enough guards yet.");
                            return Ok(());
                        }
                    }
                    // Refuse to replace our directory with an older one.
                    let is_stale = {
                        self.netdir
                            .get()
                            .map(|x| {
                                x.lifetime().valid_after()
                                    > netdir
                                        .as_ref()
                                        .expect("AttemptReplace had None")
                                        .lifetime()
                                        .valid_after()
                            })
                            .unwrap_or(false)
                    };
                    if is_stale {
                        warn!("Got a new NetDir, but it's older than the one we currently have!");
                        return Err(Error::NetDirOlder);
                    }
                    let cfg = self.config.get();
                    let mut netdir = netdir.take().expect("AttemptReplace had None");
                    netdir.replace_overridden_parameters(&cfg.override_net_params);
                    self.netdir.replace(netdir);
                    self.events.publish(DirEvent::NewConsensus);
                    self.events.publish(DirEvent::NewDescriptors);

                    info!("Marked consensus usable.");
                    if !store.is_readonly() {
                        store.mark_consensus_usable(consensus_meta)?;
                        // Now that a consensus is usable, expire documents it
                        // has made obsolete.
                        store.expire_all(&crate::storage::EXPIRATION_DEFAULTS)?;
                    }
                    Ok(())
                }
                NetDirChange::AddMicrodescs(mds) => {
                    self.netdir.mutate(|netdir| {
                        for md in mds.drain(..) {
                            netdir.add_microdesc(md);
                        }
                        Ok(())
                    })?;
                    self.events.publish(DirEvent::NewDescriptors);
                    Ok(())
                }
                NetDirChange::SetRequiredProtocol { timestamp, protos } => {
                    if !store.is_readonly() {
                        store.update_protocol_recommendations(timestamp, protos.as_ref())?;
                    }
                    let mut pr = self.protocols.lock().expect("Poisoned lock");
                    *pr = Some((timestamp, protos));
                    self.events.publish(DirEvent::NewProtocolRecommendation);
                    Ok(())
                }
            }
        } else {
            Ok(())
        }
    }
1130}
1131
/// How ready must a directory be before we consider the job done?
#[derive(Debug, Copy, Clone)]
enum Readiness {
    /// The directory must be fully downloaded.
    Complete,
    /// The directory need only be usable (possibly still incomplete).
    Usable,
}
1140
1141fn upgrade_weak_ref<T>(weak: &Weak<T>) -> Result<Arc<T>> {
1144 Weak::upgrade(weak).ok_or(Error::ManagerDropped)
1145}
1146
/// Compute the oldest consensus publication time we are willing to accept,
/// given the current time and our configured tolerance.
///
/// The cutoff is rounded to a whole hour so that clients in the same hour
/// make identical requests (better for anonymity and caching).
pub(crate) fn default_consensus_cutoff(
    now: SystemTime,
    tolerance: &DirTolerance,
) -> Result<SystemTime> {
    /// Never reject consensuses younger than this, regardless of tolerance.
    const MIN_AGE_TO_ALLOW: Duration = Duration::from_secs(3 * 3600);
    let allow_skew = std::cmp::max(MIN_AGE_TO_ALLOW, tolerance.post_valid_tolerance());
    let cutoff = time::OffsetDateTime::from(now - allow_skew);
    // Round down to the start of the hour, then move forward one hour.
    let (h, _m, _s) = cutoff.to_hms();
    let cutoff = cutoff.replace_time(
        time::Time::from_hms(h, 0, 0)
            .map_err(tor_error::into_internal!("Failed clock calculation"))?,
    );
    let cutoff = cutoff + Duration::from_secs(3600);

    Ok(cutoff.into())
}
1173
1174pub fn supported_client_protocols() -> tor_protover::Protocols {
1177 use tor_protover::named::*;
1178 [
1181 DIRCACHE_CONSDIFF,
1183 ]
1184 .into_iter()
1185 .collect()
1186}
1187
1188#[cfg(test)]
1189mod test {
1190 #![allow(clippy::bool_assert_comparison)]
1192 #![allow(clippy::clone_on_copy)]
1193 #![allow(clippy::dbg_macro)]
1194 #![allow(clippy::mixed_attributes_style)]
1195 #![allow(clippy::print_stderr)]
1196 #![allow(clippy::print_stdout)]
1197 #![allow(clippy::single_char_pattern)]
1198 #![allow(clippy::unwrap_used)]
1199 #![allow(clippy::unchecked_time_subtraction)]
1200 #![allow(clippy::useless_vec)]
1201 #![allow(clippy::needless_pass_by_value)]
1202 use super::*;
1204 use crate::docmeta::{AuthCertMeta, ConsensusMeta};
1205 use std::time::Duration;
1206 use tempfile::TempDir;
1207 use tor_basic_utils::test_rng::testing_rng;
1208 use tor_netdoc::doc::netstatus::ConsensusFlavor;
1209 use tor_netdoc::doc::{authcert::AuthCertKeyIds, netstatus::Lifetime};
1210 use tor_rtcompat::SleepProvider;
1211
    /// `supported_client_protocols` should render as "DirCache=2".
    #[test]
    fn protocols() {
        let pr = supported_client_protocols();
        let expected = "DirCache=2".parse().unwrap();
        assert_eq!(pr, expected);
    }
1218
    /// Build a `DirMgr` backed by a fresh temporary cache directory.
    ///
    /// Returns the `TempDir` too, so the caller keeps it alive.
    pub(crate) fn new_mgr<R: Runtime>(runtime: R) -> (TempDir, DirMgr<R>) {
        let dir = TempDir::new().unwrap();
        let config = DirMgrConfig {
            cache_dir: dir.path().into(),
            ..Default::default()
        };
        let store = DirMgrStore::new(&config, runtime.clone(), false).unwrap();
        let dirmgr = DirMgr::from_config(config, runtime, store, None, false).unwrap();

        (dir, dirmgr)
    }
1230
    /// Accessors should fail on a manager with no circmgr and no directory.
    #[test]
    fn failing_accessors() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let (_tempdir, mgr) = new_mgr(rt);

            assert!(mgr.circmgr().is_err());
            assert!(mgr.netdir(Timeliness::Unchecked).is_err());
        });
    }
1240
    /// Round-trip fake documents through the store and read them back via
    /// `text` and `texts`.
    #[test]
    fn load_and_store_internals() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let tomorrow = now + Duration::from_secs(86400);
            let later = tomorrow + Duration::from_secs(86400);

            let (_tempdir, mgr) = new_mgr(rt);

            // Fabricated digests / key ids for the fake documents below.
            let d1 = [5_u8; 32];
            let d2 = [7; 32];
            let d3 = [42; 32];
            let d4 = [99; 20];
            let d5 = [12; 20];
            let certid1 = AuthCertKeyIds {
                id_fingerprint: d4.into(),
                sk_fingerprint: d5.into(),
            };
            let certid2 = AuthCertKeyIds {
                id_fingerprint: d5.into(),
                sk_fingerprint: d4.into(),
            };

            {
                let mut store = mgr.store.lock().unwrap();

                store
                    .store_microdescs(
                        &[
                            ("Fake micro 1", &d1),
                            ("Fake micro 2", &d2),
                            ("Fake micro 3", &d3),
                        ],
                        now,
                    )
                    .unwrap();

                #[cfg(feature = "routerdesc")]
                store
                    .store_routerdescs(&[("Fake rd1", now, &d4), ("Fake rd2", now, &d5)])
                    .unwrap();

                store
                    .store_authcerts(&[
                        (
                            AuthCertMeta::new(certid1, now, tomorrow),
                            "Fake certificate one",
                        ),
                        (
                            AuthCertMeta::new(certid2, now, tomorrow),
                            "Fake certificate two",
                        ),
                    ])
                    .unwrap();

                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, tomorrow, later).unwrap(),
                    [102; 32],
                    [103; 32],
                );
                store
                    .store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
                    .unwrap();
            }

            // Single-document lookups: hit, hit, miss.
            let t1 = mgr.text(&DocId::Microdesc(d1)).unwrap().unwrap();
            assert_eq!(t1.as_str(), Ok("Fake micro 1"));

            let t2 = mgr
                .text(&DocId::LatestConsensus {
                    flavor: ConsensusFlavor::Microdesc,
                    cache_usage: CacheUsage::CacheOkay,
                })
                .unwrap()
                .unwrap();
            assert_eq!(t2.as_str(), Ok("Fake consensus!"));

            let t3 = mgr.text(&DocId::Microdesc([255; 32])).unwrap();
            assert!(t3.is_none());

            // Batch lookup: present documents are returned, the bogus one
            // is silently omitted.
            let d_bogus = DocId::Microdesc([255; 32]);
            let res = mgr
                .texts(vec![
                    DocId::Microdesc(d2),
                    DocId::Microdesc(d3),
                    d_bogus,
                    DocId::AuthCert(certid2),
                    #[cfg(feature = "routerdesc")]
                    DocId::RouterDesc(d5),
                ])
                .unwrap();
            assert_eq!(
                res.get(&DocId::Microdesc(d2)).unwrap().as_str(),
                Ok("Fake micro 2")
            );
            assert_eq!(
                res.get(&DocId::Microdesc(d3)).unwrap().as_str(),
                Ok("Fake micro 3")
            );
            assert!(!res.contains_key(&d_bogus));
            assert_eq!(
                res.get(&DocId::AuthCert(certid2)).unwrap().as_str(),
                Ok("Fake certificate two")
            );
            #[cfg(feature = "routerdesc")]
            assert_eq!(
                res.get(&DocId::RouterDesc(d5)).unwrap().as_str(),
                Ok("Fake rd2")
            );
        });
    }
1355
    /// Consensus requests should reflect the cache contents: no digests on
    /// an empty cache, and the stored digest/date once one is cached.
    #[test]
    fn make_consensus_request() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let tomorrow = now + Duration::from_secs(86400);
            let later = tomorrow + Duration::from_secs(86400);

            let (_tempdir, mgr) = new_mgr(rt);
            let config = DirMgrConfig::default();

            // With an empty cache: no old digests, cutoff-derived date.
            let req = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_consensus_request(
                    now,
                    ConsensusFlavor::Microdesc,
                    &**store,
                    &config,
                )
                .unwrap()
            };
            let tolerance = DirTolerance::default().post_valid_tolerance();
            match req {
                ClientRequest::Consensus(r) => {
                    assert_eq!(r.old_consensus_digests().count(), 0);
                    let date = r.last_consensus_date().unwrap();
                    assert!(date >= now - tolerance);
                    assert!(date <= now - tolerance + Duration::from_secs(3600));
                }
                _ => panic!("Wrong request type"),
            }

            // Store a fake consensus, then expect its digest and date.
            let d_prev = [42; 32];
            {
                let mut store = mgr.store.lock().unwrap();

                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, tomorrow, later).unwrap(),
                    d_prev,
                    [103; 32],
                );
                store
                    .store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
                    .unwrap();
            }

            let req = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_consensus_request(
                    now,
                    ConsensusFlavor::Microdesc,
                    &**store,
                    &config,
                )
                .unwrap()
            };
            match req {
                ClientRequest::Consensus(r) => {
                    let ds: Vec<_> = r.old_consensus_digests().collect();
                    assert_eq!(ds.len(), 1);
                    assert_eq!(ds[0], &d_prev);
                    assert_eq!(r.last_consensus_date(), Some(now));
                }
                _ => panic!("Wrong request type"),
            }
        });
    }
1425
1426 #[test]
1427 fn make_other_requests() {
1428 tor_rtcompat::test_with_one_runtime!(|rt| async {
1429 use rand::Rng;
1430 let (_tempdir, mgr) = new_mgr(rt);
1431
1432 let certid1 = AuthCertKeyIds {
1433 id_fingerprint: [99; 20].into(),
1434 sk_fingerprint: [100; 20].into(),
1435 };
1436 let mut rng = testing_rng();
1437 #[cfg(feature = "routerdesc")]
1438 let rd_ids: Vec<DocId> = (0..1000).map(|_| DocId::RouterDesc(rng.random())).collect();
1439 let md_ids: Vec<DocId> = (0..1000).map(|_| DocId::Microdesc(rng.random())).collect();
1440 let config = DirMgrConfig::default();
1441
1442 let query = DocId::AuthCert(certid1);
1444 let store = mgr.store.lock().unwrap();
1445 let reqs =
1446 bootstrap::make_requests_for_documents(&mgr.runtime, &[query], &**store, &config)
1447 .unwrap();
1448 assert_eq!(reqs.len(), 1);
1449 let req = &reqs[0];
1450 if let ClientRequest::AuthCert(r) = req {
1451 assert_eq!(r.keys().next(), Some(&certid1));
1452 } else {
1453 panic!();
1454 }
1455
1456 let reqs =
1458 bootstrap::make_requests_for_documents(&mgr.runtime, &md_ids, &**store, &config)
1459 .unwrap();
1460 assert_eq!(reqs.len(), 2);
1461 assert!(matches!(reqs[0], ClientRequest::Microdescs(_)));
1462
1463 #[cfg(feature = "routerdesc")]
1465 {
1466 let reqs = bootstrap::make_requests_for_documents(
1467 &mgr.runtime,
1468 &rd_ids,
1469 &**store,
1470 &config,
1471 )
1472 .unwrap();
1473 assert_eq!(reqs.len(), 2);
1474 assert!(matches!(reqs[0], ClientRequest::RouterDescs(_)));
1475 }
1476 });
1477 }
1478
    #[test]
    fn expand_response() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let day = Duration::from_secs(86400);
            let config = DirMgrConfig::default();

            let (_tempdir, mgr) = new_mgr(rt);

            // A non-consensus request: the response text passes through
            // unchanged.
            let q = DocId::Microdesc([99; 32]);
            let r = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_requests_for_documents(&mgr.runtime, &[q], &**store, &config)
                    .unwrap()
            };
            let expanded = mgr.expand_response_text(&r[0], "ABC".to_string());
            assert_eq!(&expanded.unwrap(), "ABC");

            // A consensus request with nothing in the store: no diff base
            // exists, so the response still passes through unchanged.
            let latest_id = DocId::LatestConsensus {
                flavor: ConsensusFlavor::Microdesc,
                cache_usage: CacheUsage::CacheOkay,
            };
            let r = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_requests_for_documents(
                    &mgr.runtime,
                    &[latest_id],
                    &**store,
                    &config,
                )
                .unwrap()
            };
            let expanded = mgr.expand_response_text(&r[0], "DEF".to_string());
            assert_eq!(&expanded.unwrap(), "DEF");

            // Store a fake consensus whose sha3-256 digest is 0x99 repeated
            // (hex "99…99"), to act as the base document for diffs below.
            {
                let mut store = mgr.store.lock().unwrap();
                let d_in = [0x99; 32];
                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, now + day, now + 2 * day).unwrap(),
                    d_in,
                    d_in,
                );
                store
                    .store_consensus(
                        &cmeta,
                        ConsensusFlavor::Microdesc,
                        false,
                        "line 1\nline2\nline 3\n",
                    )
                    .unwrap();
            }

            // Rebuild the request now that a base consensus exists. A
            // non-diff response is still returned as-is.
            let r = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_requests_for_documents(
                    &mgr.runtime,
                    &[latest_id],
                    &**store,
                    &config,
                )
                .unwrap()
            };
            let expanded = mgr.expand_response_text(&r[0], "hello".to_string());
            assert_eq!(&expanded.unwrap(), "hello");

            // A diff whose "from" digest (all 9s) matches the stored
            // consensus: it should be applied, replacing line 2.
            let diff = "network-status-diff-version 1
hash 9999999999999999999999999999999999999999999999999999999999999999 8382374ca766873eb0d2530643191c6eaa2c5e04afa554cbac349b5d0592d300
2c
replacement line
.
".to_string();
            let expanded = mgr.expand_response_text(&r[0], diff);

            assert_eq!(expanded.unwrap(), "line 1\nreplacement line\nline 3\n");

            // A diff whose resulting document does not match the declared
            // "to" digest: expansion must fail.
            let diff = "network-status-diff-version 1
hash 9999999999999999999999999999999999999999999999999999999999999999 9999999999999999999999999999999999999999999999999999999999999999
2c
replacement line
.
".to_string();
            let expanded = mgr.expand_response_text(&r[0], diff);
            assert!(expanded.is_err());
        });
    }
1574}