#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
#![doc = include_str!("../README.md")]
#![cfg_attr(not(ci_arti_stable), allow(renamed_and_removed_lints))]
#![cfg_attr(not(ci_arti_nightly), allow(unknown_lints))]
#![deny(missing_docs)]
#![warn(noop_method_call)]
#![deny(unreachable_pub)]
#![warn(clippy::all)]
#![deny(clippy::await_holding_lock)]
#![deny(clippy::cargo_common_metadata)]
#![deny(clippy::cast_lossless)]
#![deny(clippy::checked_conversions)]
#![warn(clippy::cognitive_complexity)]
#![deny(clippy::debug_assert_with_mut_call)]
#![deny(clippy::exhaustive_enums)]
#![deny(clippy::exhaustive_structs)]
#![deny(clippy::expl_impl_clone_on_copy)]
#![deny(clippy::fallible_impl_from)]
#![deny(clippy::implicit_clone)]
#![deny(clippy::large_stack_arrays)]
#![warn(clippy::manual_ok_or)]
#![deny(clippy::missing_docs_in_private_items)]
#![deny(clippy::missing_panics_doc)]
#![warn(clippy::needless_borrow)]
#![warn(clippy::needless_pass_by_value)]
#![warn(clippy::option_option)]
#![warn(clippy::rc_buffer)]
#![deny(clippy::ref_option_ref)]
#![warn(clippy::semicolon_if_nothing_returned)]
#![warn(clippy::trait_duplication_in_bounds)]
#![deny(clippy::unnecessary_wraps)]
#![warn(clippy::unseparated_literal_suffix)]
#![deny(clippy::unwrap_used)]
#![allow(clippy::let_unit_value)] #![allow(clippy::significant_drop_in_scrutinee)] #![allow(clippy::result_large_err)]
#![allow(clippy::single_component_path_imports)]
pub mod authority;
mod bootstrap;
pub mod config;
mod docid;
mod docmeta;
mod err;
mod event;
mod retry;
mod shared_ref;
mod state;
mod storage;
#[cfg(feature = "bridge-client")]
pub mod bridgedesc;
#[cfg(feature = "dirfilter")]
pub mod filter;
use crate::docid::{CacheUsage, ClientRequest, DocQuery};
use crate::err::BootstrapAction;
#[cfg(not(feature = "experimental-api"))]
use crate::shared_ref::SharedMutArc;
#[cfg(feature = "experimental-api")]
pub use crate::shared_ref::SharedMutArc;
use crate::storage::{DynStore, Store};
use bootstrap::AttemptId;
use event::DirProgress;
use postage::watch;
pub use retry::{DownloadSchedule, DownloadScheduleBuilder};
use scopeguard::ScopeGuard;
use tor_circmgr::CircMgr;
use tor_dirclient::SourceInfo;
use tor_error::into_internal;
use tor_netdir::params::NetParameters;
use tor_netdir::{DirEvent, MdReceiver, NetDir, NetDirProvider};
use async_trait::async_trait;
use futures::{channel::oneshot, stream::BoxStream, task::SpawnExt};
use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
use tor_rtcompat::Runtime;
use tracing::{debug, info, trace, warn};
use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::{collections::HashMap, sync::Weak};
use std::{fmt::Debug, time::SystemTime};
use crate::state::{DirState, NetDirChange};
pub use authority::{Authority, AuthorityBuilder};
pub use config::{
DirMgrConfig, DirTolerance, DirToleranceBuilder, DownloadScheduleConfig,
DownloadScheduleConfigBuilder, NetworkConfig, NetworkConfigBuilder,
};
pub use docid::DocId;
pub use err::Error;
pub use event::{DirBlockage, DirBootstrapEvents, DirBootstrapStatus};
pub use storage::DocumentText;
pub use tor_guardmgr::fallback::{FallbackDir, FallbackDirBuilder};
pub use tor_netdir::Timeliness;
use strum;
/// A Result as returned by this crate: shorthand over our local [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
/// Cheaply-cloneable handle to the directory storage used by a `DirMgr`.
#[derive(Clone)]
pub struct DirMgrStore<R: Runtime> {
    /// The underlying persistent store, shared behind a mutex.
    pub(crate) store: Arc<Mutex<crate::DynStore>>,
    /// Marker tying this store to a `Runtime` type; no runtime value is held.
    pub(crate) runtime: PhantomData<R>,
}
impl<R: Runtime> DirMgrStore<R> {
    /// Open the directory storage described by `config`.
    ///
    /// If `offline` is true, the store is opened without requiring write
    /// access.  The `runtime` argument is consumed but otherwise unused; it
    /// only pins the `R` type parameter.
    pub fn new(config: &DirMgrConfig, runtime: R, offline: bool) -> Result<Self> {
        let opened = config.open_store(offline)?;
        // The runtime is not needed after this point; discard it explicitly.
        drop(runtime);
        Ok(Self {
            store: Arc::new(Mutex::new(opened)),
            runtime: PhantomData,
        })
    }
}
/// A directory provider: a [`NetDirProvider`] that can additionally be
/// reconfigured and bootstrapped, and that reports bootstrap progress.
#[async_trait]
pub trait DirProvider: NetDirProvider {
    /// Try to change our configuration to `new_config`.
    ///
    /// How to react to unchangeable options is controlled by `how`.
    fn reconfigure(
        &self,
        new_config: &DirMgrConfig,
        how: tor_config::Reconfigure,
    ) -> std::result::Result<(), tor_config::ReconfigureError>;
    /// Bootstrap a working directory, downloading over the network as needed.
    async fn bootstrap(&self) -> Result<()>;
    /// Return a stream of [`DirBootstrapStatus`] events describing our
    /// bootstrapping progress.
    fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus>;
    /// Return a [`TaskHandle`] for the download task, if there is one.
    ///
    /// By default there is no task to control.
    fn download_task_handle(&self) -> Option<TaskHandle> {
        None
    }
}
impl<R: Runtime> NetDirProvider for DirMgr<R> {
    fn netdir(&self, timeliness: Timeliness) -> tor_netdir::Result<Arc<NetDir>> {
        use tor_netdir::Error as NetDirError;
        // No directory at all?  That's `NoInfo`, regardless of timeliness.
        let netdir = self.netdir.get().ok_or(NetDirError::NoInfo)?;
        // Pick the validity interval to enforce for the requested timeliness.
        let lifetime = match timeliness {
            // Strict: use the consensus lifetime exactly as published.
            Timeliness::Strict => netdir.lifetime().clone(),
            // Timely: stretch the lifetime by our configured tolerance.
            Timeliness::Timely => self
                .config
                .get()
                .tolerance
                .extend_lifetime(netdir.lifetime()),
            // Unchecked: skip validity checking entirely.
            Timeliness::Unchecked => return Ok(netdir),
        };
        let now = SystemTime::now();
        if lifetime.valid_after() > now {
            Err(NetDirError::DirNotYetValid)
        } else if lifetime.valid_until() < now {
            Err(NetDirError::DirExpired)
        } else {
            Ok(netdir)
        }
    }
    fn events(&self) -> BoxStream<'static, DirEvent> {
        Box::pin(self.events.subscribe())
    }
    fn params(&self) -> Arc<dyn AsRef<tor_netdir::params::NetParameters>> {
        // Prefer the live netdir's parameters; before we have one, fall back
        // to the defaults built from our configured overrides.
        if let Some(netdir) = self.netdir.get() {
            netdir
        } else {
            self.default_parameters
                .lock()
                .expect("Poisoned lock")
                .clone()
        }
    }
}
// `DirProvider` is implemented on `Arc<DirMgr<R>>` (not on `DirMgr<R>`
// directly) because `DirMgr::bootstrap` requires `self: &Arc<Self>`.
// Every method simply forwards to the corresponding inherent method.
#[async_trait]
impl<R: Runtime> DirProvider for Arc<DirMgr<R>> {
    fn reconfigure(
        &self,
        new_config: &DirMgrConfig,
        how: tor_config::Reconfigure,
    ) -> std::result::Result<(), tor_config::ReconfigureError> {
        DirMgr::reconfigure(self, new_config, how)
    }
    async fn bootstrap(&self) -> Result<()> {
        DirMgr::bootstrap(self).await
    }
    fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus> {
        Box::pin(DirMgr::bootstrap_events(self))
    }
    fn download_task_handle(&self) -> Option<TaskHandle> {
        // Unlike the trait default, a DirMgr always has a download task handle.
        Some(self.task_handle.clone())
    }
}
/// A directory manager: keeps a network directory on disk and in memory, and
/// (unless offline) downloads fresh directory information as needed.
///
/// Most callers use this behind an `Arc`, via the [`DirProvider`] /
/// [`NetDirProvider`] traits.
pub struct DirMgr<R: Runtime> {
    /// Our current configuration; replaceable at runtime via `reconfigure`.
    config: tor_config::MutCfg<DirMgrConfig>,
    /// Handle to our on-disk document cache.
    store: Arc<Mutex<DynStore>>,
    /// Our latest network directory, if we have one; shared with the
    /// bootstrap state machine.
    netdir: Arc<SharedMutArc<NetDir>>,
    /// Network parameters to report before we have a netdir; built from the
    /// configured overrides.
    default_parameters: Mutex<Arc<NetParameters>>,
    /// Publisher for [`DirEvent`]s about changes to the directory.
    events: event::FlagPublisher<DirEvent>,
    /// Sender side of the bootstrap-status watch channel.
    send_status: Mutex<watch::Sender<event::DirBootstrapStatus>>,
    /// Receiver side, cloned and handed out by `bootstrap_events`.
    receive_status: DirBootstrapEvents,
    /// Circuit manager used for downloads; `None` means this manager cannot
    /// download (see `circmgr()`).
    circmgr: Option<Arc<CircMgr<R>>>,
    /// Our asynchronous runtime.
    runtime: R,
    /// If true, `bootstrap()` always fails with [`Error::OfflineMode`].
    offline: bool,
    /// Flag set while a bootstrap attempt is in progress; see `bootstrap()`.
    bootstrap_started: AtomicBool,
    /// Optional filter applied to incoming directory documents.
    #[cfg(feature = "dirfilter")]
    filter: crate::filter::FilterConfig,
    /// Schedule for the background download task.  Taken (left as `None`)
    /// while that task is running, and restored when it exits.
    task_schedule: Mutex<Option<TaskSchedule<R>>>,
    /// Handle to the scheduled background download task.
    task_handle: TaskHandle,
}
/// Where a directory document was obtained from.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum DocSource {
    /// The document was loaded from our local cache.
    LocalCache,
    /// The document was downloaded from a directory server.
    DirServer {
        /// Information about the server we downloaded from, if known.
        source: Option<SourceInfo>,
    },
}
impl std::fmt::Display for DocSource {
    /// Render a human-readable description of this document source.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DocSource::LocalCache => f.write_str("local cache"),
            DocSource::DirServer { source } => match source {
                // Include server details when we have them.
                Some(info) => write!(f, "directory server {}", info),
                None => f.write_str("directory server"),
            },
        }
    }
}
impl<R: Runtime> DirMgr<R> {
    /// Try to load the directory from the cache in `config` exactly once,
    /// without any network activity.
    ///
    /// Fails with [`Error::DirectoryNotPresent`] if the cached directory is
    /// absent or not timely.
    pub async fn load_once(runtime: R, config: DirMgrConfig) -> Result<Arc<NetDir>> {
        // `offline = true` and no circmgr: this manager can never download.
        let store = DirMgrStore::new(&config, runtime.clone(), true)?;
        let dirmgr = Arc::new(Self::from_config(config, runtime, store, None, true)?);
        // A single load attempt; usability is re-checked below.
        let _success = dirmgr.load_directory(AttemptId::next()).await?;
        dirmgr
            .netdir(Timeliness::Timely)
            .map_err(|_| Error::DirectoryNotPresent)
    }
    /// Return a current directory, either loading it from `store` or
    /// bootstrapping it over the network via `circmgr` as needed.
    pub async fn load_or_bootstrap_once(
        config: DirMgrConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Arc<CircMgr<R>>,
    ) -> Result<Arc<NetDir>> {
        let dirmgr = DirMgr::bootstrap_from_config(config, runtime, store, circmgr).await?;
        dirmgr
            .timely_netdir()
            .map_err(|_| Error::DirectoryNotPresent)
    }
    /// Construct a `DirMgr` that can download via `circmgr`, without starting
    /// to bootstrap yet.  Call [`DirMgr::bootstrap`] to begin.
    pub fn create_unbootstrapped(
        config: DirMgrConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Arc<CircMgr<R>>,
    ) -> Result<Arc<Self>> {
        Ok(Arc::new(DirMgr::from_config(
            config,
            runtime,
            store,
            Some(circmgr),
            false,
        )?))
    }
    /// Launch the background tasks that load and keep the directory up to
    /// date, returning once there is enough directory information to build
    /// circuits.
    ///
    /// Calling this while a bootstrap is already running is a no-op that
    /// returns `Ok(())`.  Fails with [`Error::OfflineMode`] if this manager
    /// was created in offline mode.
    pub async fn bootstrap(self: &Arc<Self>) -> Result<()> {
        if self.offline {
            return Err(Error::OfflineMode);
        }
        // Atomically flip `bootstrap_started`; if it was already set,
        // another bootstrap is in progress and we bail out.
        if self
            .bootstrap_started
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_err()
        {
            debug!("Attempted to bootstrap twice; ignoring.");
            return Ok(());
        }
        // Unless disarmed below, clear `bootstrap_started` again on exit so
        // that a failed bootstrap can be retried.
        //
        // NOTE(review): on the cache-hit path (`have_directory == true`)
        // `receiver` is `None`, so this guard is never disarmed and the flag
        // is reset even on success.  Re-invocations are still no-ops because
        // the schedule slot below is left empty — confirm this is intended.
        let reset_bootstrap_started = scopeguard::guard(&self.bootstrap_started, |v| {
            v.store(false, Ordering::SeqCst);
        });
        // Take the schedule out of its slot; its absence also marks that a
        // bootstrap task is (or was) running.
        let schedule = {
            let sched = self.task_schedule.lock().expect("poisoned lock").take();
            match sched {
                Some(sched) => sched,
                None => {
                    debug!("Attempted to bootstrap twice; ignoring.");
                    return Ok(());
                }
            }
        };
        let attempt_id = AttemptId::next();
        // First, try to get a usable directory from the cache alone.
        let have_directory = self.load_directory(attempt_id).await?;
        // If the cache was not enough, set up a oneshot so the spawned task
        // can tell us when the directory becomes usable.
        let (mut sender, receiver) = if have_directory {
            info!("Loaded a good directory from cache.");
            (None, None)
        } else {
            info!("Didn't get usable directory from cache.");
            let (sender, receiver) = oneshot::channel();
            (Some(sender), Some(receiver))
        };
        // The updater task holds only a weak reference, so dropping the last
        // strong Arc shuts it down (via `upgrade_weak_ref` failing).
        let dirmgr_weak = Arc::downgrade(self);
        self.runtime
            .spawn(async move {
                // Put the schedule back when this task exits, so that a later
                // `bootstrap()` call can run again.
                let mut schedule = scopeguard::guard(schedule, |schedule| {
                    if let Some(dm) = Weak::upgrade(&dirmgr_weak) {
                        *dm.task_schedule.lock().expect("poisoned lock") = Some(schedule);
                    }
                });
                // First wait until we own the cache lock, then keep the
                // directory downloaded and fresh forever.
                if let Err(e) =
                    Self::reload_until_owner(&dirmgr_weak, &mut schedule, attempt_id, &mut sender)
                        .await
                {
                    match e {
                        Error::ManagerDropped => {}
                        _ => warn!("Unrecovered error while waiting for bootstrap: {}", e),
                    }
                } else if let Err(e) =
                    Self::download_forever(dirmgr_weak.clone(), &mut schedule, attempt_id, sender)
                        .await
                {
                    match e {
                        Error::ManagerDropped => {}
                        _ => warn!("Unrecovered error while downloading: {}", e),
                    }
                }
            })
            .map_err(|e| Error::from_spawn("directory updater task", e))?;
        if let Some(receiver) = receiver {
            match receiver.await {
                Ok(()) => {
                    info!("We have enough information to build circuits.");
                    // Success: disarm the guard so `bootstrap_started` stays set.
                    let _ = ScopeGuard::into_inner(reset_bootstrap_started);
                }
                Err(_) => {
                    // The sender was dropped without signaling completion.
                    warn!("Bootstrapping task exited before finishing.");
                    return Err(Error::CantAdvanceState);
                }
            }
        }
        Ok(())
    }
    /// Return the current value of the `bootstrap_started` flag (set while a
    /// network bootstrap attempt is in progress).
    pub fn bootstrap_started(&self) -> bool {
        self.bootstrap_started.load(Ordering::SeqCst)
    }
    /// Convenience wrapper: [`create_unbootstrapped`](DirMgr::create_unbootstrapped)
    /// followed immediately by [`bootstrap`](DirMgr::bootstrap).
    pub async fn bootstrap_from_config(
        config: DirMgrConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Arc<CircMgr<R>>,
    ) -> Result<Arc<Self>> {
        let dirmgr = Self::create_unbootstrapped(config, runtime, store, circmgr)?;
        dirmgr.bootstrap().await?;
        Ok(dirmgr)
    }
    /// Loop until we manage to take read-write ownership of the cache, then
    /// return.
    ///
    /// While another process owns the cache, periodically reload from it so
    /// we can use whatever that process downloads.  The first time a reload
    /// yields a usable directory, signal `on_complete`.
    async fn reload_until_owner(
        weak: &Weak<Self>,
        schedule: &mut TaskSchedule<R>,
        attempt_id: AttemptId,
        on_complete: &mut Option<oneshot::Sender<()>>,
    ) -> Result<()> {
        let mut logged = false;
        let mut bootstrapped;
        {
            let dirmgr = upgrade_weak_ref(weak)?;
            bootstrapped = dirmgr.netdir.get().is_some();
        }
        loop {
            {
                let dirmgr = upgrade_weak_ref(weak)?;
                trace!("Trying to take ownership of the directory cache lock");
                if dirmgr.try_upgrade_to_readwrite()? {
                    if logged {
                        info!("The previous owning process has given up the lock. We are now in charge of managing the directory.");
                    }
                    return Ok(());
                }
            }
            // Log the "somebody else owns the cache" situation only once.
            if !logged {
                logged = true;
                if bootstrapped {
                    info!("Another process is managing the directory. We'll use its cache.");
                } else {
                    info!("Another process is bootstrapping the directory. Waiting till it finishes or exits.");
                }
            }
            // Poll far less often once we already have a directory.
            let pause = if bootstrapped {
                std::time::Duration::new(120, 0)
            } else {
                std::time::Duration::new(5, 0)
            };
            schedule.sleep(pause).await?;
            {
                let dirmgr = upgrade_weak_ref(weak)?;
                trace!("Trying to load from the directory cache");
                if dirmgr.load_directory(attempt_id).await? {
                    // Got a usable directory: notify the waiter (at most once).
                    if let Some(send_done) = on_complete.take() {
                        let _ = send_done.send(());
                    }
                    if !bootstrapped {
                        info!("The directory is now bootstrapped.");
                    }
                    bootstrapped = true;
                }
            }
        }
    }
    /// Run forever, downloading directory information whenever it becomes due.
    ///
    /// Each bootstrap pass is retried up to the configured number of times;
    /// if every attempt fails we give up with [`Error::CantAdvanceState`].
    /// Signals `on_complete` once the directory becomes usable.
    async fn download_forever(
        weak: Weak<Self>,
        schedule: &mut TaskSchedule<R>,
        mut attempt_id: AttemptId,
        mut on_complete: Option<oneshot::Sender<()>>,
    ) -> Result<()> {
        // Start from a fresh fetch-a-consensus state.
        let mut state: Box<dyn DirState> = {
            let dirmgr = upgrade_weak_ref(&weak)?;
            Box::new(state::GetConsensusState::new(
                dirmgr.runtime.clone(),
                dirmgr.config.get(),
                CacheUsage::CacheOkay,
                Some(dirmgr.netdir.clone()),
                #[cfg(feature = "dirfilter")]
                dirmgr
                    .filter
                    .clone()
                    .unwrap_or_else(|| Arc::new(crate::filter::NilFilter)),
            ))
        };
        loop {
            let mut usable = false;
            // Re-read the retry configuration each pass, in case of reconfigure.
            let retry_config = {
                let dirmgr = upgrade_weak_ref(&weak)?;
                dirmgr.config.get().schedule.retry_bootstrap
            };
            let mut retry_delay = retry_config.schedule();
            'retry_attempt: for _ in retry_config.attempts() {
                let outcome = bootstrap::download(
                    Weak::clone(&weak),
                    &mut state,
                    schedule,
                    attempt_id,
                    &mut on_complete,
                )
                .await;
                if let Err(err) = outcome {
                    // Partial failure, but the directory is usable: stop retrying.
                    if state.is_ready(Readiness::Usable) {
                        usable = true;
                        info!("Unable to completely download a directory: {}. Nevertheless, the directory is usable, so we'll pause for now.", err);
                        break 'retry_attempt;
                    }
                    match err.bootstrap_action() {
                        BootstrapAction::Nonfatal => {
                            // Nonfatal errors are supposed to be handled inside
                            // `download`; one escaping here is an internal bug.
                            return Err(into_internal!(
                                "Nonfatal error should not have propagated here"
                            )(err)
                            .into());
                        }
                        BootstrapAction::Reset => {}
                        BootstrapAction::Fatal => return Err(err),
                    }
                    // Back off with a randomized delay, record the reset, and
                    // start the attempt over from a fresh state.
                    let delay = retry_delay.next_delay(&mut rand::thread_rng());
                    warn!(
                        "Unable to download a usable directory: {}. We will restart in {:?}.",
                        err, delay
                    );
                    {
                        let dirmgr = upgrade_weak_ref(&weak)?;
                        dirmgr.note_reset(attempt_id);
                    }
                    schedule.sleep(delay).await?;
                    state = state.reset();
                } else {
                    info!("Directory is complete.");
                    usable = true;
                    break 'retry_attempt;
                }
            }
            if !usable {
                warn!(
                    "We failed {} times to bootstrap a directory. We're going to give up.",
                    retry_config.n_attempts()
                );
                return Err(Error::CantAdvanceState);
            } else {
                // Tell anybody still waiting that the directory is ready.
                if let Some(send_done) = on_complete.take() {
                    let _ = send_done.send(());
                }
            }
            // Sleep until the state says it's time to refetch; if it never
            // will be, this task is done.
            let reset_at = state.reset_time();
            match reset_at {
                Some(t) => schedule.sleep_until_wallclock(t).await?,
                None => return Ok(()),
            }
            attempt_id = bootstrap::AttemptId::next();
            state = state.reset();
        }
    }
    /// Return our circuit manager, or [`Error::NoDownloadSupport`] if this
    /// manager was created without one.
    fn circmgr(&self) -> Result<Arc<CircMgr<R>>> {
        self.circmgr
            .as_ref()
            .map(Arc::clone)
            .ok_or(Error::NoDownloadSupport)
    }
    /// Try to change our configuration to `new_config`.
    ///
    /// The cache path and the authority list cannot be changed at runtime;
    /// `how` controls whether attempting to do so is an error.  Changed
    /// parameter overrides are applied to the live netdir immediately.
    pub fn reconfigure(
        &self,
        new_config: &DirMgrConfig,
        how: tor_config::Reconfigure,
    ) -> std::result::Result<(), tor_config::ReconfigureError> {
        let config = self.config.get();
        // Options we do not support changing at runtime:
        if new_config.cache_path != config.cache_path {
            how.cannot_change("storage.cache_path")?;
        }
        if new_config.authorities() != config.authorities() {
            how.cannot_change("network.authorities")?;
        }
        if how == tor_config::Reconfigure::CheckAllOrNothing {
            // Dry run only: all remaining options are changeable.
            return Ok(());
        }
        let params_changed = new_config.override_net_params != config.override_net_params;
        self.config
            .map_and_replace(|cfg| cfg.update_from_config(new_config));
        if params_changed {
            // Apply the new overrides to the live netdir (ignore failure if
            // there is no netdir yet) and to our fallback parameters.
            let _ignore_err = self.netdir.mutate(|netdir| {
                netdir.replace_overridden_parameters(&new_config.override_net_params);
                Ok(())
            });
            {
                let mut params = self.default_parameters.lock().expect("lock failed");
                *params = Arc::new(NetParameters::from_map(&new_config.override_net_params));
            }
            // Wake consumers so they notice the parameter change.
            self.events.publish(DirEvent::NewConsensus);
        }
        Ok(())
    }
    /// Return a stream of [`DirBootstrapStatus`] events describing our
    /// bootstrapping progress.
    pub fn bootstrap_events(&self) -> event::DirBootstrapEvents {
        self.receive_status.clone()
    }
    /// Record the latest [`DirProgress`] for `attempt_id` on the status channel.
    fn update_progress(&self, attempt_id: AttemptId, progress: DirProgress) {
        let mut sender = self.send_status.lock().expect("poisoned lock");
        let mut status = sender.borrow_mut();
        status.update_progress(attempt_id, progress);
    }
    /// Record that `n_errors` download errors occurred during `attempt_id`.
    /// A zero count is not recorded.
    fn note_errors(&self, attempt_id: AttemptId, n_errors: usize) {
        if n_errors == 0 {
            return;
        }
        let mut sender = self.send_status.lock().expect("poisoned lock");
        let mut status = sender.borrow_mut();
        status.note_errors(attempt_id, n_errors);
    }
    /// Record that bootstrap attempt `attempt_id` was reset.
    fn note_reset(&self, attempt_id: AttemptId) {
        let mut sender = self.send_status.lock().expect("poisoned lock");
        let mut status = sender.borrow_mut();
        status.note_reset(attempt_id);
    }
    /// Try to make our store read-write; return true on success.
    fn try_upgrade_to_readwrite(&self) -> Result<bool> {
        self.store
            .lock()
            .expect("Directory storage lock poisoned")
            .upgrade_to_readwrite()
    }
    /// Return a reference to our store, but only if it is currently read-write.
    #[cfg(test)]
    fn store_if_rw(&self) -> Option<&Mutex<DynStore>> {
        // Compute read-write-ness in its own statement so the lock guard is
        // released before we hand out a reference to the mutex itself.
        let rw = !self
            .store
            .lock()
            .expect("Directory storage lock poisoned")
            .is_readonly();
        if rw {
            Some(&self.store)
        } else {
            None
        }
    }
    /// Construct a `DirMgr` from its parts.  This is the common constructor
    /// behind `load_once`, `create_unbootstrapped`, and the tests.
    ///
    /// If `offline` is true, the resulting manager refuses to bootstrap.
    // `Result` is kept for signature uniformity with the other constructors.
    #[allow(clippy::unnecessary_wraps)]
    fn from_config(
        config: DirMgrConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Option<Arc<CircMgr<R>>>,
        offline: bool,
    ) -> Result<Self> {
        let netdir = Arc::new(SharedMutArc::new());
        let events = event::FlagPublisher::new();
        // Parameters to report until we have a consensus of our own.
        let default_parameters = NetParameters::from_map(&config.override_net_params);
        let default_parameters = Mutex::new(Arc::new(default_parameters));
        let (send_status, receive_status) = postage::watch::channel();
        let send_status = Mutex::new(send_status);
        let receive_status = DirBootstrapEvents {
            inner: receive_status,
        };
        #[cfg(feature = "dirfilter")]
        let filter = config.extensions.filter.clone();
        let (task_schedule, task_handle) = TaskSchedule::new(runtime.clone());
        let task_schedule = Mutex::new(Some(task_schedule));
        Ok(DirMgr {
            config: config.into(),
            store: store.store,
            netdir,
            default_parameters,
            events,
            send_status,
            receive_status,
            circmgr,
            runtime,
            offline,
            bootstrap_started: AtomicBool::new(false),
            #[cfg(feature = "dirfilter")]
            filter,
            task_schedule,
            task_handle,
        })
    }
    /// Try to load a directory from the cache only (no downloads),
    /// installing the result as our current netdir on success.
    ///
    /// Return true if we have a netdir afterwards.
    async fn load_directory(self: &Arc<Self>, attempt_id: AttemptId) -> Result<bool> {
        let state = state::GetConsensusState::new(
            self.runtime.clone(),
            self.config.get(),
            CacheUsage::CacheOnly,
            None,
            #[cfg(feature = "dirfilter")]
            self.filter
                .clone()
                .unwrap_or_else(|| Arc::new(crate::filter::NilFilter)),
        );
        let _ = bootstrap::load(Arc::clone(self), Box::new(state), attempt_id).await?;
        Ok(self.netdir.get().is_some())
    }
    /// Return a stream of [`DirEvent`]s about changes to our directory.
    pub fn events(&self) -> impl futures::Stream<Item = DirEvent> {
        self.events.subscribe()
    }
    /// Try to load the text of a single document `doc` from storage.
    ///
    /// Returns `Ok(None)` if the document is simply absent.
    pub fn text(&self, doc: &DocId) -> Result<Option<DocumentText>> {
        use itertools::Itertools;
        let mut result = HashMap::new();
        let query: DocQuery = (*doc).into();
        let store = self.store.lock().expect("store lock poisoned");
        query.load_from_store_into(&mut result, &**store)?;
        // A single DocId should map to at most one stored document.
        let item = result.into_iter().at_most_one().map_err(|_| {
            Error::CacheCorruption("Found more than one entry in storage for given docid")
        })?;
        if let Some((docid, doctext)) = item {
            if &docid != doc {
                return Err(Error::CacheCorruption(
                    "Item from storage had incorrect docid.",
                ));
            }
            Ok(Some(doctext))
        } else {
            Ok(None)
        }
    }
    /// Load the text of every document in `docs` that is present in storage.
    ///
    /// Documents not found are simply omitted from the result map.
    pub fn texts<T>(&self, docs: T) -> Result<HashMap<DocId, DocumentText>>
    where
        T: IntoIterator<Item = DocId>,
    {
        // Group the requested ids by document type so each type is loaded
        // with a single store query.
        let partitioned = docid::partition_by_type(docs);
        let mut result = HashMap::new();
        let store = self.store.lock().expect("store lock poisoned");
        for (_, query) in partitioned.into_iter() {
            query.load_from_store_into(&mut result, &**store)?;
        }
        Ok(result)
    }
    /// If `text` is a consensus diff in reply to the consensus request `req`,
    /// apply it to our cached base consensus and return the patched
    /// consensus.  Any other `text` is returned unchanged.
    fn expand_response_text(&self, req: &ClientRequest, text: String) -> Result<String> {
        if let ClientRequest::Consensus(req) = req {
            if tor_consdiff::looks_like_diff(&text) {
                // Only the first digest we asked about is consulted here.
                if let Some(old_d) = req.old_consensus_digests().next() {
                    let db_val = {
                        let s = self.store.lock().expect("Directory storage lock poisoned");
                        s.consensus_by_sha3_digest_of_signed_part(old_d)?
                    };
                    if let Some((old_consensus, meta)) = db_val {
                        info!("Applying a consensus diff");
                        let new_consensus = tor_consdiff::apply_diff(
                            old_consensus.as_str()?,
                            &text,
                            Some(*meta.sha3_256_of_signed()),
                        )?;
                        // Verify that the diff produced the consensus it claimed.
                        new_consensus.check_digest()?;
                        return Ok(new_consensus.to_string());
                    }
                }
                // We got a diff we didn't ask for, or we no longer have the
                // base consensus it applies to: reject it.
                return Err(Error::Unwanted(
                    "Received a consensus diff we did not ask for",
                ));
            }
        }
        Ok(text)
    }
    /// If `state` has directory changes pending, apply them to our shared
    /// netdir and record them in `store`.
    fn apply_netdir_changes(
        self: &Arc<Self>,
        state: &mut Box<dyn DirState>,
        store: &mut dyn Store,
    ) -> Result<()> {
        if let Some(change) = state.get_netdir_change() {
            match change {
                NetDirChange::AttemptReplace {
                    netdir,
                    consensus_meta,
                } => {
                    // Don't install a netdir the circuit manager considers
                    // insufficient (not enough guard information yet).
                    if let Some(ref cm) = self.circmgr {
                        if !cm
                            .netdir_is_sufficient(netdir.as_ref().expect("AttemptReplace had None"))
                        {
                            debug!("Got a new NetDir, but it doesn't have enough guards yet.");
                            return Ok(());
                        }
                    }
                    // Refuse to replace our netdir with an older one.
                    let is_stale = {
                        self.netdir
                            .get()
                            .map(|x| {
                                x.lifetime().valid_after()
                                    > netdir
                                        .as_ref()
                                        .expect("AttemptReplace had None")
                                        .lifetime()
                                        .valid_after()
                            })
                            .unwrap_or(false)
                    };
                    if is_stale {
                        warn!("Got a new NetDir, but it's older than the one we currently have!");
                        return Err(Error::NetDirOlder);
                    }
                    let cfg = self.config.get();
                    let mut netdir = netdir.take().expect("AttemptReplace had None");
                    // Re-apply our configured parameter overrides, install
                    // the netdir, and wake up consumers.
                    netdir.replace_overridden_parameters(&cfg.override_net_params);
                    self.netdir.replace(netdir);
                    self.events.publish(DirEvent::NewConsensus);
                    self.events.publish(DirEvent::NewDescriptors);
                    info!("Marked consensus usable.");
                    // Persist the new status and clean out expired documents.
                    store.mark_consensus_usable(consensus_meta)?;
                    store.expire_all(&crate::storage::EXPIRATION_DEFAULTS)?;
                    Ok(())
                }
                NetDirChange::AddMicrodescs(mds) => {
                    // Merge newly-arrived microdescriptors into the netdir.
                    self.netdir.mutate(|netdir| {
                        for md in mds.drain(..) {
                            netdir.add_microdesc(md);
                        }
                        Ok(())
                    })?;
                    self.events.publish(DirEvent::NewDescriptors);
                    Ok(())
                }
            }
        } else {
            Ok(())
        }
    }
}
/// A degree of readiness for a directory (used when asking a `DirState`
/// whether it can stop downloading).
#[derive(Debug, Copy, Clone)]
enum Readiness {
    /// The directory is fully downloaded.
    Complete,
    /// The directory is usable, though possibly not fully downloaded.
    Usable,
}
fn upgrade_weak_ref<T>(weak: &Weak<T>) -> Result<Arc<T>> {
Weak::upgrade(weak).ok_or(Error::ManagerDropped)
}
/// Given the current time `now` and our tolerance settings, return the
/// oldest consensus publication time that a directory cache should bother
/// sending us.
///
/// We subtract the allowed skew (at least three hours), then round forward
/// to the top of the next hour.
pub(crate) fn default_consensus_cutoff(
    now: SystemTime,
    tolerance: &DirTolerance,
) -> Result<SystemTime> {
    /// Minimum skew to subtract, even if the configured tolerance is smaller.
    const MIN_AGE_TO_ALLOW: Duration = Duration::from_secs(3 * 3600);
    let allow_skew = std::cmp::max(MIN_AGE_TO_ALLOW, tolerance.post_valid_tolerance);
    let cutoff = time::OffsetDateTime::from(now - allow_skew);
    // Truncate minutes and seconds, then add one hour: net effect is
    // rounding the cutoff up to the next whole hour.
    let (h, _m, _s) = cutoff.to_hms();
    let cutoff = cutoff.replace_time(
        time::Time::from_hms(h, 0, 0)
            .map_err(tor_error::into_internal!("Failed clock calculation"))?,
    );
    let cutoff = cutoff + Duration::from_secs(3600);
    Ok(cutoff.into())
}
#[cfg(test)]
mod test {
    // Tests may unwrap freely.
    #![allow(clippy::unwrap_used)]
    use super::*;
    use crate::docmeta::{AuthCertMeta, ConsensusMeta};
    use std::time::Duration;
    use tempfile::TempDir;
    use tor_basic_utils::test_rng::testing_rng;
    use tor_netdoc::doc::netstatus::ConsensusFlavor;
    use tor_netdoc::doc::{authcert::AuthCertKeyIds, netstatus::Lifetime};
    use tor_rtcompat::SleepProvider;

    /// Build a test `DirMgr` (online, no circuit manager) backed by a fresh
    /// temporary directory.  The `TempDir` must outlive the manager.
    pub(crate) fn new_mgr<R: Runtime>(runtime: R) -> (TempDir, DirMgr<R>) {
        let dir = TempDir::new().unwrap();
        let config = DirMgrConfig {
            cache_path: dir.path().into(),
            ..Default::default()
        };
        let store = DirMgrStore::new(&config, runtime.clone(), false).unwrap();
        let dirmgr = DirMgr::from_config(config, runtime, store, None, false).unwrap();
        (dir, dirmgr)
    }

    // A fresh DirMgr with no circmgr and no cached directory should fail
    // both of these accessors.
    #[test]
    fn failing_accessors() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let (_tempdir, mgr) = new_mgr(rt);
            assert!(mgr.circmgr().is_err());
            assert!(mgr.netdir(Timeliness::Unchecked).is_err());
        });
    }

    // Store microdescs, (optionally) router descs, authcerts, and a
    // consensus directly into the cache, then read them back via `text()`
    // and `texts()`.
    #[test]
    fn load_and_store_internals() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let tomorrow = now + Duration::from_secs(86400);
            let later = tomorrow + Duration::from_secs(86400);
            let (_tempdir, mgr) = new_mgr(rt);
            // Fake digests / key ids used to index the stored documents.
            let d1 = [5_u8; 32];
            let d2 = [7; 32];
            let d3 = [42; 32];
            let d4 = [99; 20];
            let d5 = [12; 20];
            let certid1 = AuthCertKeyIds {
                id_fingerprint: d4.into(),
                sk_fingerprint: d5.into(),
            };
            let certid2 = AuthCertKeyIds {
                id_fingerprint: d5.into(),
                sk_fingerprint: d4.into(),
            };
            {
                // Populate the store directly.
                let mut store = mgr.store.lock().unwrap();
                store
                    .store_microdescs(
                        &[
                            ("Fake micro 1", &d1),
                            ("Fake micro 2", &d2),
                            ("Fake micro 3", &d3),
                        ],
                        now,
                    )
                    .unwrap();
                #[cfg(feature = "routerdesc")]
                store
                    .store_routerdescs(&[("Fake rd1", now, &d4), ("Fake rd2", now, &d5)])
                    .unwrap();
                store
                    .store_authcerts(&[
                        (
                            AuthCertMeta::new(certid1, now, tomorrow),
                            "Fake certificate one",
                        ),
                        (
                            AuthCertMeta::new(certid2, now, tomorrow),
                            "Fake certificate two",
                        ),
                    ])
                    .unwrap();
                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, tomorrow, later).unwrap(),
                    [102; 32],
                    [103; 32],
                );
                store
                    .store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
                    .unwrap();
            }
            // Single-document lookups.
            let t1 = mgr.text(&DocId::Microdesc(d1)).unwrap().unwrap();
            assert_eq!(t1.as_str(), Ok("Fake micro 1"));
            let t2 = mgr
                .text(&DocId::LatestConsensus {
                    flavor: ConsensusFlavor::Microdesc,
                    cache_usage: CacheUsage::CacheOkay,
                })
                .unwrap()
                .unwrap();
            assert_eq!(t2.as_str(), Ok("Fake consensus!"));
            // An unknown digest is "absent", not an error.
            let t3 = mgr.text(&DocId::Microdesc([255; 32])).unwrap();
            assert!(t3.is_none());
            // Bulk lookup: found documents present, unknown ones omitted.
            let d_bogus = DocId::Microdesc([255; 32]);
            let res = mgr
                .texts(vec![
                    DocId::Microdesc(d2),
                    DocId::Microdesc(d3),
                    d_bogus,
                    DocId::AuthCert(certid2),
                    #[cfg(feature = "routerdesc")]
                    DocId::RouterDesc(d5),
                ])
                .unwrap();
            assert_eq!(
                res.get(&DocId::Microdesc(d2)).unwrap().as_str(),
                Ok("Fake micro 2")
            );
            assert_eq!(
                res.get(&DocId::Microdesc(d3)).unwrap().as_str(),
                Ok("Fake micro 3")
            );
            assert!(res.get(&d_bogus).is_none());
            assert_eq!(
                res.get(&DocId::AuthCert(certid2)).unwrap().as_str(),
                Ok("Fake certificate two")
            );
            #[cfg(feature = "routerdesc")]
            assert_eq!(
                res.get(&DocId::RouterDesc(d5)).unwrap().as_str(),
                Ok("Fake rd2")
            );
        });
    }

    // Consensus requests: against an empty cache the request carries a
    // default cutoff date; after storing a consensus it carries that
    // consensus's digest and date.
    #[test]
    fn make_consensus_request() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let tomorrow = now + Duration::from_secs(86400);
            let later = tomorrow + Duration::from_secs(86400);
            let (_tempdir, mgr) = new_mgr(rt);
            let config = DirMgrConfig::default();
            // Request built against an empty store.
            let req = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_consensus_request(
                    now,
                    ConsensusFlavor::Microdesc,
                    &**store,
                    &config,
                )
                .unwrap()
            };
            let tolerance = DirTolerance::default().post_valid_tolerance;
            match req {
                ClientRequest::Consensus(r) => {
                    assert_eq!(r.old_consensus_digests().count(), 0);
                    // The default cutoff is within an hour of (now - tolerance);
                    // see `default_consensus_cutoff`.
                    let date = r.last_consensus_date().unwrap();
                    assert!(date >= now - tolerance);
                    assert!(date <= now - tolerance + Duration::from_secs(3600));
                }
                _ => panic!("Wrong request type"),
            }
            // Store a consensus and re-build the request.
            let d_prev = [42; 32];
            {
                let mut store = mgr.store.lock().unwrap();
                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, tomorrow, later).unwrap(),
                    d_prev,
                    [103; 32],
                );
                store
                    .store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
                    .unwrap();
            }
            let req = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_consensus_request(
                    now,
                    ConsensusFlavor::Microdesc,
                    &**store,
                    &config,
                )
                .unwrap()
            };
            match req {
                ClientRequest::Consensus(r) => {
                    let ds: Vec<_> = r.old_consensus_digests().collect();
                    assert_eq!(ds.len(), 1);
                    assert_eq!(ds[0], &d_prev);
                    assert_eq!(r.last_consensus_date(), Some(now));
                }
                _ => panic!("Wrong request type"),
            }
        });
    }

    // Authcert, microdesc and (optionally) routerdesc requests, including
    // splitting a large id list across multiple requests.
    #[test]
    fn make_other_requests() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            use rand::Rng;
            let (_tempdir, mgr) = new_mgr(rt);
            let certid1 = AuthCertKeyIds {
                id_fingerprint: [99; 20].into(),
                sk_fingerprint: [100; 20].into(),
            };
            let mut rng = testing_rng();
            #[cfg(feature = "routerdesc")]
            let rd_ids: Vec<DocId> = (0..1000).map(|_| DocId::RouterDesc(rng.gen())).collect();
            let md_ids: Vec<DocId> = (0..1000).map(|_| DocId::Microdesc(rng.gen())).collect();
            let config = DirMgrConfig::default();
            // Auth certs: one request containing the requested key ids.
            let query = DocId::AuthCert(certid1);
            let store = mgr.store.lock().unwrap();
            let reqs =
                bootstrap::make_requests_for_documents(&mgr.runtime, &[query], &**store, &config)
                    .unwrap();
            assert_eq!(reqs.len(), 1);
            let req = &reqs[0];
            if let ClientRequest::AuthCert(r) = req {
                assert_eq!(r.keys().next(), Some(&certid1));
            } else {
                panic!();
            }
            // 1000 microdesc ids are split into two requests.
            let reqs =
                bootstrap::make_requests_for_documents(&mgr.runtime, &md_ids, &**store, &config)
                    .unwrap();
            assert_eq!(reqs.len(), 2);
            assert!(matches!(reqs[0], ClientRequest::Microdescs(_)));
            // Same split for router descriptors, when enabled.
            #[cfg(feature = "routerdesc")]
            {
                let reqs = bootstrap::make_requests_for_documents(
                    &mgr.runtime,
                    &rd_ids,
                    &**store,
                    &config,
                )
                .unwrap();
                assert_eq!(reqs.len(), 2);
                assert!(matches!(reqs[0], ClientRequest::RouterDescs(_)));
            }
        });
    }

    // `expand_response_text`: passthrough for non-diff bodies, diff
    // application against the stored consensus, and rejection of a diff
    // whose output digest does not match.
    #[test]
    fn expand_response() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let day = Duration::from_secs(86400);
            let config = DirMgrConfig::default();
            let (_tempdir, mgr) = new_mgr(rt);
            // Non-consensus requests pass text through unchanged.
            let q = DocId::Microdesc([99; 32]);
            let r = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_requests_for_documents(&mgr.runtime, &[q], &**store, &config)
                    .unwrap()
            };
            let expanded = mgr.expand_response_text(&r[0], "ABC".to_string());
            assert_eq!(&expanded.unwrap(), "ABC");
            // Consensus requests with a non-diff body also pass through.
            let latest_id = DocId::LatestConsensus {
                flavor: ConsensusFlavor::Microdesc,
                cache_usage: CacheUsage::CacheOkay,
            };
            let r = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_requests_for_documents(
                    &mgr.runtime,
                    &[latest_id],
                    &**store,
                    &config,
                )
                .unwrap()
            };
            let expanded = mgr.expand_response_text(&r[0], "DEF".to_string());
            assert_eq!(&expanded.unwrap(), "DEF");
            // Store a base consensus so that a diff can be applied.
            {
                let mut store = mgr.store.lock().unwrap();
                let d_in = [0x99; 32];
                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, now + day, now + 2 * day).unwrap(),
                    d_in,
                    d_in,
                );
                store
                    .store_consensus(
                        &cmeta,
                        ConsensusFlavor::Microdesc,
                        false,
                        "line 1\nline2\nline 3\n",
                    )
                    .unwrap();
            }
            let r = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_requests_for_documents(
                    &mgr.runtime,
                    &[latest_id],
                    &**store,
                    &config,
                )
                .unwrap()
            };
            // Still passes through a body that doesn't look like a diff.
            let expanded = mgr.expand_response_text(&r[0], "hello".to_string());
            assert_eq!(&expanded.unwrap(), "hello");
            // A valid diff against the stored consensus gets applied.
            let diff = "network-status-diff-version 1
hash 9999999999999999999999999999999999999999999999999999999999999999 8382374ca766873eb0d2530643191c6eaa2c5e04afa554cbac349b5d0592d300
2c
replacement line
.
".to_string();
            let expanded = mgr.expand_response_text(&r[0], diff);
            assert_eq!(expanded.unwrap(), "line 1\nreplacement line\nline 3\n");
            // A diff whose claimed output digest doesn't match is rejected.
            let diff = "network-status-diff-version 1
hash 9999999999999999999999999999999999999999999999999999999999999999 9999999999999999999999999999999999999999999999999999999999999999
2c
replacement line
.
".to_string();
            let expanded = mgr.expand_response_text(&r[0], diff);
            assert!(expanded.is_err());
        });
    }
}