use rand::Rng;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::mem;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
use time::OffsetDateTime;
use tor_error::internal;
use tor_netdir::{MdReceiver, NetDir, PartialNetDir};
use tor_netdoc::doc::authcert::UncheckedAuthCert;
use tor_netdoc::doc::netstatus::Lifetime;
use tracing::{info, warn};
use crate::event::DirProgress;
use crate::storage::DynStore;
use crate::{
docmeta::{AuthCertMeta, ConsensusMeta},
event,
retry::DownloadSchedule,
CacheUsage, ClientRequest, DirMgrConfig, DocId, DocumentText, Error, Readiness, Result,
};
use crate::{DocSource, SharedMutArc};
use tor_checkable::{ExternallySigned, SelfSigned, Timebound};
use tor_llcrypto::pk::rsa::RsaIdentity;
use tor_netdoc::doc::{
microdesc::{MdDigest, Microdesc},
netstatus::MdConsensus,
};
use tor_netdoc::{
doc::{
authcert::{AuthCert, AuthCertKeyIds},
microdesc::MicrodescReader,
netstatus::{ConsensusFlavor, UnvalidatedMdConsensus},
},
AllowAnnotations,
};
use tor_rtcompat::Runtime;
/// A change the bootstrap state machine wants to make to the caller's view
/// of the network directory.
#[derive(Debug)]
pub(crate) enum NetDirChange<'a> {
    /// Ask the caller to try installing this directory as its current one.
    ///
    /// On success the caller is expected to take the `NetDir` out of the
    /// `Option` (elsewhere in this file, `netdir.is_none()` is treated as
    /// "the directory has been delivered").
    AttemptReplace {
        /// Slot holding the candidate directory to install.
        netdir: &'a mut Option<NetDir>,
        /// Metadata for the consensus the candidate was built from.
        consensus_meta: &'a ConsensusMeta,
    },
    /// Ask the caller to add these microdescriptors to its current directory.
    AddMicrodescs(&'a mut Vec<Microdesc>),
}
/// One stage in the process of bootstrapping a network directory.
///
/// States are driven by a caller that feeds them documents (from cache or
/// download) and advances them when `can_advance` becomes true.
pub(crate) trait DirState: Send {
    /// Return a human-readable description of this state.
    fn describe(&self) -> String;
    /// Return a list of the documents we're currently missing.
    fn missing_docs(&self) -> Vec<DocId>;
    /// Report whether we have reached the given `ready` status.
    fn is_ready(&self, ready: Readiness) -> bool;
    /// If this state wants the caller to change its view of the network
    /// directory, return the requested change.  Default: no change.
    fn get_netdir_change(&mut self) -> Option<NetDirChange<'_>> {
        None
    }
    /// Return true if `advance` would move to a new state.
    fn can_advance(&self) -> bool;
    /// Absorb documents loaded from the cache.  Sets `changed` to true if
    /// any document was actually used.
    fn add_from_cache(
        &mut self,
        docs: HashMap<DocId, DocumentText>,
        changed: &mut bool,
    ) -> Result<()>;
    /// Absorb a downloaded document (which was fetched to satisfy
    /// `request`, from `source`), optionally persisting it to `storage`.
    /// Sets `changed` to true if the document was actually used.
    fn add_from_download(
        &mut self,
        text: &str,
        request: &ClientRequest,
        source: DocSource,
        storage: Option<&Mutex<DynStore>>,
        changed: &mut bool,
    ) -> Result<()>;
    /// Return a progress summary for this state.
    fn bootstrap_progress(&self) -> event::DirProgress;
    /// Return the retry/download schedule appropriate for this state.
    fn dl_config(&self) -> DownloadSchedule;
    /// Advance to the next state, if possible; otherwise return self.
    fn advance(self: Box<Self>) -> Box<dyn DirState>;
    /// Return a time (if any) after which the caller should give up on this
    /// attempt and `reset` instead.
    fn reset_time(&self) -> Option<SystemTime>;
    /// Start this bootstrap attempt over from the beginning.
    fn reset(self: Box<Self>) -> Box<dyn DirState>;
}
/// A source from which a previously built network directory can be fetched,
/// used to bootstrap a new directory faster.
pub(crate) trait PreviousNetDir: Send + Sync + 'static + Debug {
    /// Return the most recent directory, if one is available.
    fn get_netdir(&self) -> Option<Arc<NetDir>>;
}
impl PreviousNetDir for SharedMutArc<NetDir> {
    fn get_netdir(&self) -> Option<Arc<NetDir>> {
        // Just hand out the shared handle's current value.
        self.get()
    }
}
/// Initial state: fetching or loading a consensus directory.
#[derive(Clone, Debug)]
pub(crate) struct GetConsensusState<R: Runtime> {
    /// How should we use the cache when looking for the consensus?
    cache_usage: CacheUsage,
    /// Valid-after time of the consensus from our previous directory, if we
    /// had one.  Reported in `bootstrap_progress` while we have no consensus.
    after: Option<SystemTime>,
    /// Once we have accepted a consensus, the fully constructed next state
    /// (certificate fetching) is stored here until `advance` is called.
    next: Option<GetCertsState<R>>,
    /// Identities of the directory authorities we recognize; a consensus
    /// must claim exactly these signers to be accepted.
    authority_ids: Vec<RsaIdentity>,
    /// Runtime, used here for wall-clock time.
    rt: R,
    /// Directory-manager configuration in effect.
    config: Arc<DirMgrConfig>,
    /// If present, a source of a previous directory used to speed up
    /// building the next one.
    prev_netdir: Option<Arc<dyn PreviousNetDir>>,
    /// Optional filter applied to incoming directory documents.
    #[cfg(feature = "dirfilter")]
    filter: Arc<dyn crate::filter::DirFilter>,
}
impl<R: Runtime> GetConsensusState<R> {
pub(crate) fn new(
rt: R,
config: Arc<DirMgrConfig>,
cache_usage: CacheUsage,
prev_netdir: Option<Arc<dyn PreviousNetDir>>,
#[cfg(feature = "dirfilter")] filter: Arc<dyn crate::filter::DirFilter>,
) -> Self {
let authority_ids = config
.authorities()
.iter()
.map(|auth| auth.v3ident)
.collect();
let after = prev_netdir
.as_ref()
.and_then(|x| x.get_netdir())
.map(|nd| nd.lifetime().valid_after());
GetConsensusState {
cache_usage,
after,
next: None,
authority_ids,
rt,
config,
prev_netdir,
#[cfg(feature = "dirfilter")]
filter,
}
}
}
impl<R: Runtime> DirState for GetConsensusState<R> {
    fn describe(&self) -> String {
        if self.next.is_some() {
            // We already accepted a consensus; we're only waiting for the
            // caller to call `advance`.
            "About to fetch certificates."
        } else {
            match self.cache_usage {
                CacheUsage::CacheOnly => "Looking for a cached consensus.",
                CacheUsage::CacheOkay => "Looking for a consensus.",
                CacheUsage::MustDownload => "Downloading a consensus.",
            }
        }
        .to_string()
    }
    fn missing_docs(&self) -> Vec<DocId> {
        // Once we have a consensus, nothing more is needed in this state.
        if self.can_advance() {
            return Vec::new();
        }
        let flavor = ConsensusFlavor::Microdesc;
        vec![DocId::LatestConsensus {
            flavor,
            cache_usage: self.cache_usage,
        }]
    }
    fn is_ready(&self, _ready: Readiness) -> bool {
        // The directory is never usable or complete while we still lack a
        // consensus.
        false
    }
    fn can_advance(&self) -> bool {
        // We can advance as soon as `add_consensus_text` has built the
        // certificate-fetching state.
        self.next.is_some()
    }
    fn bootstrap_progress(&self) -> DirProgress {
        if let Some(next) = &self.next {
            // Defer to the already-built next state for a more accurate
            // progress report.
            next.bootstrap_progress()
        } else {
            DirProgress::NoConsensus { after: self.after }
        }
    }
    fn dl_config(&self) -> DownloadSchedule {
        self.config.schedule.retry_consensus
    }
    fn add_from_cache(
        &mut self,
        docs: HashMap<DocId, DocumentText>,
        changed: &mut bool,
    ) -> Result<()> {
        // We expect at most one document here: the latest md consensus.
        let text = match docs.into_iter().next() {
            None => return Ok(()),
            Some((
                DocId::LatestConsensus {
                    flavor: ConsensusFlavor::Microdesc,
                    ..
                },
                text,
            )) => text,
            // Any other document kind in the cache reply is a cache bug.
            _ => return Err(Error::CacheCorruption("Not an md consensus")),
        };
        let source = DocSource::LocalCache;
        self.add_consensus_text(
            source,
            text.as_str().map_err(Error::BadUtf8InCache)?,
            None,
            changed,
        )?;
        Ok(())
    }
    fn add_from_download(
        &mut self,
        text: &str,
        request: &ClientRequest,
        source: DocSource,
        storage: Option<&Mutex<DynStore>>,
        changed: &mut bool,
    ) -> Result<()> {
        // If the request told the server "only newer than X", reject any
        // consensus older than X that the server sends anyway.
        let requested_newer_than = match request {
            ClientRequest::Consensus(r) => r.last_consensus_date(),
            _ => None,
        };
        let meta = self.add_consensus_text(source, text, requested_newer_than, changed)?;
        if let Some(store) = storage {
            let mut w = store.lock().expect("Directory storage lock poisoned");
            // `true` here marks the consensus as not-yet-validated in the
            // store (signatures haven't been checked at this point).
            w.store_consensus(meta, ConsensusFlavor::Microdesc, true, text)?;
        }
        Ok(())
    }
    fn advance(self: Box<Self>) -> Box<dyn DirState> {
        match self.next {
            Some(next) => Box::new(next),
            // No consensus yet: stay in this state.
            None => self,
        }
    }
    fn reset_time(&self) -> Option<SystemTime> {
        // There is nothing to time out: we haven't committed to anything.
        None
    }
    fn reset(self: Box<Self>) -> Box<dyn DirState> {
        // Resetting the initial state is a no-op.
        self
    }
}
impl<R: Runtime> GetConsensusState<R> {
    /// Helper: try to accept the consensus in `text` (which came from
    /// `source`).
    ///
    /// Rejects the consensus if it is ill-formed, not timely, older than
    /// `cutoff` (when given), or signed by a set of authorities other than
    /// the ones we recognize.  On success, builds the next
    /// (certificate-fetching) state, sets `changed`, and returns the
    /// accepted consensus's metadata.
    fn add_consensus_text(
        &mut self,
        source: DocSource,
        text: &str,
        cutoff: Option<SystemTime>,
        changed: &mut bool,
    ) -> Result<&ConsensusMeta> {
        let (consensus_meta, unvalidated) = {
            let (signedval, remainder, parsed) =
                MdConsensus::parse(text).map_err(|e| Error::from_netdoc(source.clone(), e))?;
            #[cfg(feature = "dirfilter")]
            let parsed = self.filter.filter_consensus(parsed)?;
            // Widen the validity interval by the configured tolerance
            // before checking timeliness.
            let parsed = self.config.tolerance.extend_tolerance(parsed);
            let now = self.rt.wallclock();
            let timely = parsed.check_valid_at(&now)?;
            if let Some(cutoff) = cutoff {
                // The server ignored our "newer than" request.
                if timely.peek_lifetime().valid_after() < cutoff {
                    return Err(Error::Unwanted("consensus was older than requested"));
                }
            }
            let meta = ConsensusMeta::from_unvalidated(signedval, remainder, &timely);
            (meta, timely)
        };
        // The consensus must claim exactly our recognized authorities.
        let n_authorities = self.authority_ids.len() as u16;
        let unvalidated = unvalidated.set_n_authorities(n_authorities);
        let id_refs: Vec<_> = self.authority_ids.iter().collect();
        if !unvalidated.authorities_are_correct(&id_refs[..]) {
            return Err(Error::UnrecognizedAuthorities);
        }
        // Only now do we consider the input to have been "used".
        *changed = true;
        // We only care about certificates from authorities we recognize.
        let desired_certs = unvalidated
            .signing_cert_ids()
            .filter(|m| self.recognizes_authority(&m.id_fingerprint))
            .collect();
        self.next = Some(GetCertsState {
            cache_usage: self.cache_usage,
            consensus_source: source,
            consensus: GetCertsConsensus::Unvalidated(unvalidated),
            consensus_meta,
            missing_certs: desired_certs,
            certs: Vec::new(),
            rt: self.rt.clone(),
            config: self.config.clone(),
            prev_netdir: self.prev_netdir.take(),
            #[cfg(feature = "dirfilter")]
            filter: self.filter.clone(),
        });
        // `next` was just set to Some above, so this unwrap cannot fail.
        #[allow(clippy::unwrap_used)]
        Ok(&self.next.as_ref().unwrap().consensus_meta)
    }
    /// Return true if `id` is one of the authorities we trust.
    fn recognizes_authority(&self, id: &RsaIdentity) -> bool {
        self.authority_ids.iter().any(|auth| auth == id)
    }
}
/// The consensus under validation, at its current stage of signature
/// checking.
#[derive(Clone, Debug)]
enum GetCertsConsensus {
    /// Signatures not yet checked; we may still be missing certificates.
    Unvalidated(UnvalidatedMdConsensus),
    /// Signature check succeeded; ready to advance.
    Validated(MdConsensus),
    /// Signature check failed; this consensus is unusable.
    Failed,
}
/// Second state: fetching or loading authority certificates so the
/// consensus's signatures can be checked.
#[derive(Clone, Debug)]
struct GetCertsState<R: Runtime> {
    /// The cache usage we started out with (propagated on advance/reset).
    cache_usage: CacheUsage,
    /// Where the consensus came from (used in error reports).
    consensus_source: DocSource,
    /// The consensus we are trying to validate, or the outcome of trying.
    consensus: GetCertsConsensus,
    /// Metadata for that consensus.
    consensus_meta: ConsensusMeta,
    /// Key identities of the certificates we still need.
    missing_certs: HashSet<AuthCertKeyIds>,
    /// The valid certificates received so far.
    certs: Vec<AuthCert>,
    /// Runtime, used here for wall-clock time.
    rt: R,
    /// Directory-manager configuration in effect.
    config: Arc<DirMgrConfig>,
    /// If present, a source of a previous directory, passed along to the
    /// next state.
    prev_netdir: Option<Arc<dyn PreviousNetDir>>,
    /// Optional filter applied to incoming directory documents.
    #[cfg(feature = "dirfilter")]
    filter: Arc<dyn crate::filter::DirFilter>,
}
impl<R: Runtime> GetCertsState<R> {
fn check_parsed_certificate<'s>(
&self,
parsed: tor_netdoc::Result<UncheckedAuthCert>,
source: &DocSource,
within: &'s str,
) -> Result<(AuthCert, &'s str)> {
let parsed = parsed.map_err(|e| Error::from_netdoc(source.clone(), e))?;
let cert_text = parsed
.within(within)
.expect("Certificate was not in input as expected");
let wellsigned = parsed.check_signature()?;
let now = self.rt.wallclock();
let timely_cert = self
.config
.tolerance
.extend_tolerance(wellsigned)
.check_valid_at(&now)?;
Ok((timely_cert, cert_text))
}
fn try_checking_sigs(&mut self) -> Result<()> {
use GetCertsConsensus as C;
let mut consensus = C::Failed;
std::mem::swap(&mut consensus, &mut self.consensus);
let unvalidated = match consensus {
C::Unvalidated(uv) if uv.key_is_correct(&self.certs[..]).is_ok() => uv,
_ => {
self.consensus = consensus;
return Ok(());
}
};
let (new_consensus, outcome) = match unvalidated.check_signature(&self.certs[..]) {
Ok(validated) => (C::Validated(validated), Ok(())),
Err(cause) => (
C::Failed,
Err(Error::ConsensusInvalid {
source: self.consensus_source.clone(),
cause,
}),
),
};
self.consensus = new_consensus;
outcome
}
}
impl<R: Runtime> DirState for GetCertsState<R> {
    fn describe(&self) -> String {
        use GetCertsConsensus as C;
        match &self.consensus {
            C::Unvalidated(_) => {
                let total = self.certs.len() + self.missing_certs.len();
                format!(
                    "Downloading certificates for consensus (we are missing {}/{}).",
                    self.missing_certs.len(),
                    total
                )
            }
            C::Validated(_) => "Validated consensus; about to get microdescriptors".to_string(),
            C::Failed => "Failed to validate consensus".to_string(),
        }
    }
    fn missing_docs(&self) -> Vec<DocId> {
        self.missing_certs
            .iter()
            .map(|id| DocId::AuthCert(*id))
            .collect()
    }
    fn is_ready(&self, _ready: Readiness) -> bool {
        // The directory is never usable while we are still validating the
        // consensus.
        false
    }
    fn can_advance(&self) -> bool {
        matches!(self.consensus, GetCertsConsensus::Validated(_))
    }
    fn bootstrap_progress(&self) -> DirProgress {
        let n_certs = self.certs.len();
        let n_missing_certs = self.missing_certs.len();
        let total_certs = n_missing_certs + n_certs;
        DirProgress::FetchingCerts {
            lifetime: self.consensus_meta.lifetime().clone(),
            usable_lifetime: self
                .config
                .tolerance
                .extend_lifetime(self.consensus_meta.lifetime()),
            n_certs: (n_certs as u16, total_certs as u16),
        }
    }
    fn dl_config(&self) -> DownloadSchedule {
        self.config.schedule.retry_certs
    }
    fn add_from_cache(
        &mut self,
        docs: HashMap<DocId, DocumentText>,
        changed: &mut bool,
    ) -> Result<()> {
        let source = DocSource::LocalCache;
        // A bad cached cert shouldn't abort the whole load; remember the
        // first problem and keep going.
        let mut nonfatal_error = None;
        for id in &self.missing_docs() {
            if let Some(cert) = docs.get(id) {
                let text = cert.as_str().map_err(Error::BadUtf8InCache)?;
                let parsed = AuthCert::parse(text);
                match self.check_parsed_certificate(parsed, &source, text) {
                    Ok((cert, _text)) => {
                        self.missing_certs.remove(cert.key_ids());
                        self.certs.push(cert);
                        *changed = true;
                    }
                    Err(e) => {
                        nonfatal_error.get_or_insert(e);
                    }
                }
            }
        }
        // New certificates may let us finish checking the consensus.
        if *changed {
            self.try_checking_sigs()?;
        }
        opt_err_to_result(nonfatal_error)
    }
    fn add_from_download(
        &mut self,
        text: &str,
        request: &ClientRequest,
        source: DocSource,
        storage: Option<&Mutex<DynStore>>,
        changed: &mut bool,
    ) -> Result<()> {
        // Which certificates did we actually ask this server for?
        let asked_for: HashSet<_> = match request {
            ClientRequest::AuthCert(a) => a.keys().collect(),
            _ => return Err(internal!("expected an AuthCert request").into()),
        };
        let mut nonfatal_error = None;
        let mut newcerts = Vec::new();
        for cert in AuthCert::parse_multiple(text) {
            match self.check_parsed_certificate(cert, &source, text) {
                Ok((cert, cert_text)) => {
                    newcerts.push((cert, cert_text));
                }
                Err(e) => {
                    warn!("Problem with certificate received from {}: {}", &source, &e);
                    nonfatal_error.get_or_insert(e);
                }
            }
        }
        // Discard any certificates the server sent that we didn't request.
        let len_orig = newcerts.len();
        newcerts.retain(|(cert, _)| asked_for.contains(cert.key_ids()));
        if newcerts.len() != len_orig {
            warn!(
                "Discarding certificates from {} that we didn't ask for.",
                source
            );
            nonfatal_error.get_or_insert(Error::Unwanted("Certificate we didn't request"));
        }
        if newcerts.is_empty() {
            return opt_err_to_result(nonfatal_error);
        }
        // Persist the validated certificates before absorbing them.
        if let Some(store) = storage {
            let v: Vec<_> = newcerts[..]
                .iter()
                .map(|(cert, s)| (AuthCertMeta::from_authcert(cert), *s))
                .collect();
            let mut w = store.lock().expect("Directory storage lock poisoned");
            w.store_authcerts(&v[..])?;
        }
        // Only absorb certificates that are still missing (duplicates from
        // an earlier round are ignored).
        for (cert, _) in newcerts {
            let ids = cert.key_ids();
            if self.missing_certs.contains(ids) {
                self.missing_certs.remove(ids);
                self.certs.push(cert);
                *changed = true;
            }
        }
        if *changed {
            self.try_checking_sigs()?;
        }
        opt_err_to_result(nonfatal_error)
    }
    fn advance(self: Box<Self>) -> Box<dyn DirState> {
        use GetCertsConsensus::*;
        match self.consensus {
            // A validated consensus lets us move on to microdescriptors.
            Validated(validated) => Box::new(GetMicrodescsState::new(
                self.cache_usage,
                validated,
                self.consensus_meta,
                self.rt,
                self.config,
                self.prev_netdir,
                #[cfg(feature = "dirfilter")]
                self.filter,
            )),
            _ => self,
        }
    }
    fn reset_time(&self) -> Option<SystemTime> {
        // Give up on this consensus once it (plus tolerance) has expired.
        Some(
            self.consensus_meta.lifetime().valid_until()
                + self.config.tolerance.post_valid_tolerance,
        )
    }
    fn reset(self: Box<Self>) -> Box<dyn DirState> {
        // If we can't validate what the cache gave us, a fresh download is
        // required (unless we're restricted to the cache).
        let cache_usage = if self.cache_usage == CacheUsage::CacheOnly {
            CacheUsage::CacheOnly
        } else {
            CacheUsage::MustDownload
        };
        Box::new(GetConsensusState::new(
            self.rt,
            self.config,
            cache_usage,
            self.prev_netdir,
            #[cfg(feature = "dirfilter")]
            self.filter,
        ))
    }
}
/// Final state: fetching or loading microdescriptors for a validated
/// consensus until the directory is usable.
#[derive(Debug, Clone)]
struct GetMicrodescsState<R: Runtime> {
    /// The cache usage we started out with (consulted on reset).
    cache_usage: CacheUsage,
    /// Total number of microdescriptors listed in the consensus.
    n_microdescs: usize,
    /// The directory being built, at its current stage of completion.
    partial: PendingNetDir,
    /// Metadata for the consensus this directory is built from.
    meta: ConsensusMeta,
    /// Digests of microdescriptors added while still partial, pending being
    /// recorded as "listed" in the store.
    newly_listed: Vec<MdDigest>,
    /// Default time at which to give up on this directory and start over.
    reset_time: SystemTime,
    /// Runtime (carried along for reset/advance).
    rt: R,
    /// Directory-manager configuration in effect.
    config: Arc<DirMgrConfig>,
    /// If present, a source of a previous directory used to seed this one.
    prev_netdir: Option<Arc<dyn PreviousNetDir>>,
    /// Optional filter applied to incoming microdescriptors.
    #[cfg(feature = "dirfilter")]
    filter: Arc<dyn crate::filter::DirFilter>,
}
/// A network directory under construction.
#[derive(Debug, Clone)]
enum PendingNetDir {
    /// Not yet enough microdescriptors to build a usable directory.
    Partial(PartialNetDir),
    /// A usable directory exists: either still held here, or already handed
    /// to the caller, in which case we keep collecting microdescs for it.
    Yielding {
        /// The directory, until the caller takes it (then `None`).
        netdir: Option<NetDir>,
        /// Microdescriptors received after the directory was taken, waiting
        /// to be delivered via `NetDirChange::AddMicrodescs`.
        collected_microdescs: Vec<Microdesc>,
        /// Digests still missing; mirrors the directory's own missing set
        /// (see the `debug_assert`s in the `MdReceiver` impl).
        missing_microdescs: HashSet<MdDigest>,
        /// Time after which a fresh consensus should replace this one.
        replace_dir_time: SystemTime,
    },
    /// Transient placeholder used only inside `upgrade_if_necessary`;
    /// observing it anywhere else is a bug (`unreachable!()`).
    Dummy,
}
impl MdReceiver for PendingNetDir {
    fn missing_microdescs(&self) -> Box<dyn Iterator<Item = &MdDigest> + '_> {
        match self {
            PendingNetDir::Partial(partial) => partial.missing_microdescs(),
            PendingNetDir::Yielding {
                netdir,
                missing_microdescs,
                ..
            } => {
                // Prefer asking the directory itself while we still hold it;
                // after hand-off, fall back to our mirrored set.
                if let Some(nd) = netdir.as_ref() {
                    nd.missing_microdescs()
                } else {
                    Box::new(missing_microdescs.iter())
                }
            }
            PendingNetDir::Dummy => unreachable!(),
        }
    }
    fn add_microdesc(&mut self, md: Microdesc) -> bool {
        match self {
            PendingNetDir::Partial(partial) => partial.add_microdesc(md),
            PendingNetDir::Yielding {
                netdir,
                missing_microdescs,
                collected_microdescs,
                ..
            } => {
                // Keep the mirrored missing-set in sync regardless of where
                // the microdesc ends up.
                let wanted = missing_microdescs.remove(md.digest());
                if let Some(nd) = netdir.as_mut() {
                    let nd_wanted = nd.add_microdesc(md);
                    // The mirrored set and the directory must agree.
                    debug_assert_eq!(wanted, nd_wanted);
                    nd_wanted
                } else {
                    // Directory already handed off: queue the microdesc for
                    // later delivery.
                    collected_microdescs.push(md);
                    wanted
                }
            }
            PendingNetDir::Dummy => unreachable!(),
        }
    }
    fn n_missing(&self) -> usize {
        match self {
            PendingNetDir::Partial(partial) => partial.n_missing(),
            PendingNetDir::Yielding {
                netdir,
                missing_microdescs,
                ..
            } => {
                if let Some(nd) = netdir.as_ref() {
                    // Sanity-check that the mirror hasn't drifted.
                    debug_assert_eq!(nd.n_missing(), missing_microdescs.len());
                    nd.n_missing()
                } else {
                    missing_microdescs.len()
                }
            }
            PendingNetDir::Dummy => unreachable!(),
        }
    }
}
impl PendingNetDir {
    /// If we are `Partial` and now have enough microdescriptors to build a
    /// usable directory, transition to `Yielding`.  Otherwise do nothing.
    fn upgrade_if_necessary(&mut self) {
        if matches!(self, PendingNetDir::Partial(..)) {
            // Park `Dummy` in our slot so the partial dir can be consumed
            // by value.
            match mem::replace(self, PendingNetDir::Dummy) {
                PendingNetDir::Partial(p) => match p.unwrap_if_sufficient() {
                    Ok(nd) => {
                        // Sufficient: snapshot what's still missing and pick
                        // when we'll want a replacement consensus.
                        let missing = nd.missing_microdescs().copied().collect();
                        let replace_dir_time = pick_download_time(nd.lifetime());
                        *self = PendingNetDir::Yielding {
                            netdir: Some(nd),
                            collected_microdescs: vec![],
                            missing_microdescs: missing,
                            replace_dir_time,
                        };
                    }
                    Err(p) => {
                        // Not yet sufficient: put the partial dir back.
                        *self = PendingNetDir::Partial(p);
                    }
                },
                // We just checked that we hold `Partial`.
                _ => unreachable!(),
            }
        }
        // The `Dummy` placeholder must never escape this function.
        assert!(!matches!(self, PendingNetDir::Dummy));
    }
}
impl<R: Runtime> GetMicrodescsState<R> {
    /// Create a new state for fetching microdescriptors listed in
    /// `consensus`, optionally seeding it with microdescriptors from a
    /// previous directory.
    fn new(
        cache_usage: CacheUsage,
        consensus: MdConsensus,
        meta: ConsensusMeta,
        rt: R,
        config: Arc<DirMgrConfig>,
        prev_netdir: Option<Arc<dyn PreviousNetDir>>,
        #[cfg(feature = "dirfilter")] filter: Arc<dyn crate::filter::DirFilter>,
    ) -> Self {
        // Give up on this consensus once it (plus tolerance) has expired.
        let reset_time = consensus.lifetime().valid_until() + config.tolerance.post_valid_tolerance;
        let n_microdescs = consensus.relays().len();
        let params = &config.override_net_params;
        let mut partial_dir = PartialNetDir::new(consensus, Some(params));
        if let Some(old_dir) = prev_netdir.as_ref().and_then(|x| x.get_netdir()) {
            // Reuse still-listed microdescriptors from the previous
            // directory so we have less to download.
            partial_dir.fill_from_previous_netdir(&old_dir);
        }
        GetMicrodescsState {
            cache_usage,
            n_microdescs,
            partial: PendingNetDir::Partial(partial_dir),
            meta,
            newly_listed: Vec::new(),
            reset_time,
            rt,
            config,
            prev_netdir,
            #[cfg(feature = "dirfilter")]
            filter,
        }
    }
    /// Add a bunch of microdescriptors to the pending directory, upgrading
    /// it to a full directory if it becomes sufficient.  Sets `changed` for
    /// each microdesc absorbed.
    fn register_microdescs<I>(&mut self, mds: I, _source: &DocSource, changed: &mut bool)
    where
        I: IntoIterator<Item = Microdesc>,
    {
        #[cfg(feature = "dirfilter")]
        let mds: Vec<Microdesc> = mds
            .into_iter()
            .filter_map(|m| self.filter.filter_md(m).ok())
            .collect();
        // Record digests for "mark as listed" bookkeeping only while the
        // directory is still partial.
        let is_partial = matches!(self.partial, PendingNetDir::Partial(..));
        for md in mds {
            if is_partial {
                self.newly_listed.push(*md.digest());
            }
            self.partial.add_microdesc(md);
            *changed = true;
        }
        self.partial.upgrade_if_necessary();
    }
}
impl<R: Runtime> DirState for GetMicrodescsState<R> {
    fn describe(&self) -> String {
        format!(
            "Downloading microdescriptors (we are missing {}).",
            self.partial.n_missing()
        )
    }
    fn missing_docs(&self) -> Vec<DocId> {
        self.partial
            .missing_microdescs()
            .map(|d| DocId::Microdesc(*d))
            .collect()
    }
    fn get_netdir_change(&mut self) -> Option<NetDirChange<'_>> {
        match self.partial {
            PendingNetDir::Yielding {
                ref mut netdir,
                ref mut collected_microdescs,
                ..
            } => {
                if netdir.is_some() {
                    // We hold a freshly built directory: offer it to the
                    // caller for installation.
                    Some(NetDirChange::AttemptReplace {
                        netdir,
                        consensus_meta: &self.meta,
                    })
                } else {
                    // The directory has already been handed off; deliver any
                    // microdescriptors collected since then.
                    //
                    // BUGFIX: this previously tested `.is_empty()` (without
                    // negation), which offered an empty `AddMicrodescs`
                    // exactly when there was nothing to add, and never
                    // delivered the collected microdescriptors.
                    (!collected_microdescs.is_empty())
                        .then(move || NetDirChange::AddMicrodescs(collected_microdescs))
                }
            }
            _ => None,
        }
    }
    fn is_ready(&self, ready: Readiness) -> bool {
        match ready {
            Readiness::Complete => self.partial.n_missing() == 0,
            Readiness::Usable => {
                // `netdir: None` in `Yielding` means the directory has been
                // taken via `AttemptReplace`, so the caller has a usable one.
                matches!(self.partial, PendingNetDir::Yielding { ref netdir, .. } if netdir.is_none())
            }
        }
    }
    fn can_advance(&self) -> bool {
        // This is the last state; there is nothing to advance to.
        false
    }
    fn bootstrap_progress(&self) -> DirProgress {
        let n_present = self.n_microdescs - self.partial.n_missing();
        DirProgress::Validated {
            lifetime: self.meta.lifetime().clone(),
            usable_lifetime: self.config.tolerance.extend_lifetime(self.meta.lifetime()),
            n_mds: (n_present as u32, self.n_microdescs as u32),
            usable: self.is_ready(Readiness::Usable),
        }
    }
    fn dl_config(&self) -> DownloadSchedule {
        self.config.schedule.retry_microdescs
    }
    fn add_from_cache(
        &mut self,
        docs: HashMap<DocId, DocumentText>,
        changed: &mut bool,
    ) -> Result<()> {
        let mut microdescs = Vec::new();
        for (id, text) in docs {
            if let DocId::Microdesc(digest) = id {
                // Only accept a cached microdesc whose content matches the
                // digest it was stored under.
                if let Ok(md) = Microdesc::parse(text.as_str().map_err(Error::BadUtf8InCache)?) {
                    if md.digest() == &digest {
                        microdescs.push(md);
                        continue;
                    }
                }
                warn!("Found a mismatched microdescriptor in cache; ignoring");
            }
        }
        self.register_microdescs(microdescs, &DocSource::LocalCache, changed);
        Ok(())
    }
    fn add_from_download(
        &mut self,
        text: &str,
        request: &ClientRequest,
        source: DocSource,
        storage: Option<&Mutex<DynStore>>,
        changed: &mut bool,
    ) -> Result<()> {
        // Which digests did we actually ask this server for?
        let requested: HashSet<_> = if let ClientRequest::Microdescs(req) = request {
            req.digests().collect()
        } else {
            return Err(internal!("expected a microdesc request").into());
        };
        let mut new_mds = Vec::new();
        // Individual bad microdescs shouldn't abort the whole batch.
        let mut nonfatal_err = None;
        for anno in MicrodescReader::new(text, &AllowAnnotations::AnnotationsNotAllowed) {
            let anno = match anno {
                Err(e) => {
                    nonfatal_err.get_or_insert_with(|| Error::from_netdoc(source.clone(), e));
                    continue;
                }
                Ok(a) => a,
            };
            let txt = anno
                .within(text)
                .expect("microdesc not from within text as expected");
            let md = anno.into_microdesc();
            // Discard anything the server sent that we didn't request.
            if !requested.contains(md.digest()) {
                warn!(
                    "Received microdescriptor from {} we did not ask for: {:?}",
                    source,
                    md.digest()
                );
                nonfatal_err.get_or_insert(Error::Unwanted("un-requested microdescriptor"));
                continue;
            }
            new_mds.push((txt, md));
        }
        // Mark stored microdescs as listed as of this consensus.
        let mark_listed = self.meta.lifetime().valid_after();
        if let Some(store) = storage {
            let mut s = store
                .lock()
                .expect("Directory storage lock poisoned");
            if !self.newly_listed.is_empty() {
                s.update_microdescs_listed(&self.newly_listed, mark_listed)?;
                self.newly_listed.clear();
            }
            if !new_mds.is_empty() {
                s.store_microdescs(
                    &new_mds
                        .iter()
                        .map(|(text, md)| (*text, md.digest()))
                        .collect::<Vec<_>>(),
                    mark_listed,
                )?;
            }
        }
        self.register_microdescs(new_mds.into_iter().map(|(_, md)| md), &source, changed);
        opt_err_to_result(nonfatal_err)
    }
    fn advance(self: Box<Self>) -> Box<dyn DirState> {
        // Terminal state: advancing is a no-op.
        self
    }
    fn reset_time(&self) -> Option<SystemTime> {
        Some(match self.partial {
            // Once the directory has been delivered, reset when it's time
            // to download a replacement consensus.
            PendingNetDir::Yielding {
                replace_dir_time,
                netdir: None,
                ..
            } => replace_dir_time,
            // Otherwise, give up when the consensus (plus tolerance) expires.
            _ => self.reset_time,
        })
    }
    fn reset(self: Box<Self>) -> Box<dyn DirState> {
        let cache_usage = if self.cache_usage == CacheUsage::CacheOnly {
            // Cache-only bootstraps stay cache-only.
            CacheUsage::CacheOnly
        } else if self.is_ready(Readiness::Usable) {
            // We already delivered a usable directory: the reset is to fetch
            // a *newer* one, so the cache can't help.
            CacheUsage::MustDownload
        } else {
            // We failed somehow; the cache may still be useful.
            CacheUsage::CacheOkay
        };
        Box::new(GetConsensusState::new(
            self.rt,
            self.config,
            cache_usage,
            self.prev_netdir,
            #[cfg(feature = "dirfilter")]
            self.filter,
        ))
    }
}
/// Choose a random time within the client download window at which to try
/// replacing a consensus with lifetime `lifetime`.
fn pick_download_time(lifetime: &Lifetime) -> SystemTime {
    let (lowbound, uncertainty) = client_download_range(lifetime);
    let zero = Duration::new(0, 0);
    // `gen_range` panics on an empty range, so only randomize when there is
    // actual uncertainty to spread the download over.  (An all-but-degenerate
    // lifetime can make `uncertainty` zero.)
    let t = if uncertainty > zero {
        lowbound + rand::thread_rng().gen_range(zero..uncertainty)
    } else {
        lowbound
    };
    info!("The current consensus is fresh until {}, and valid until {}. I've picked {} as the earliest time to replace it.",
        OffsetDateTime::from(lifetime.fresh_until()),
        OffsetDateTime::from(lifetime.valid_until()),
        OffsetDateTime::from(t));
    t
}
/// Compute the window during which a client should try to replace a
/// consensus with lifetime `lt`: returns the earliest replacement time and
/// the duration of the random interval after it.
fn client_download_range(lt: &Lifetime) -> (SystemTime, Duration) {
    let valid_after = lt.valid_after();
    // One voting interval: from valid-after to fresh-until.
    let voting_interval = lt
        .fresh_until()
        .duration_since(valid_after)
        .expect("valid-after must precede fresh-until");
    // The consensus's full lifespan: from valid-after to valid-until.
    let whole_lifetime = lt
        .valid_until()
        .duration_since(valid_after)
        .expect("valid-after must precede valid-until");
    // Start 1 + 3/4 voting intervals after valid-after, then spread the
    // download over 7/8 of the remaining lifetime.
    let lowbound = voting_interval + (voting_interval * 3) / 4;
    let uncertainty = ((whole_lifetime - lowbound) * 7) / 8;
    (valid_after + lowbound, uncertainty)
}
/// Turn an optional stashed error into a `Result`: `Err` if one was
/// recorded, `Ok(())` otherwise.
fn opt_err_to_result(e: Option<Error>) -> Result<()> {
    e.map_or(Ok(()), Err)
}
/// A placeholder `DirState` whose every method panics.
///
/// NOTE(review): presumably swapped into a state slot while the real state
/// is temporarily owned elsewhere — confirm against callers; nothing in this
/// file constructs it.
#[derive(Clone, Debug)]
pub(crate) struct PoisonedState;
// Every method panics on purpose: a `PoisonedState` must never actually be
// used as a directory state.
impl DirState for PoisonedState {
    fn describe(&self) -> String {
        unimplemented!()
    }
    fn missing_docs(&self) -> Vec<DocId> {
        unimplemented!()
    }
    fn is_ready(&self, _ready: Readiness) -> bool {
        unimplemented!()
    }
    fn can_advance(&self) -> bool {
        unimplemented!()
    }
    fn add_from_cache(
        &mut self,
        _docs: HashMap<DocId, DocumentText>,
        _changed: &mut bool,
    ) -> Result<()> {
        unimplemented!()
    }
    fn add_from_download(
        &mut self,
        _text: &str,
        _request: &ClientRequest,
        _source: DocSource,
        _storage: Option<&Mutex<DynStore>>,
        _changed: &mut bool,
    ) -> Result<()> {
        unimplemented!()
    }
    fn bootstrap_progress(&self) -> event::DirProgress {
        unimplemented!()
    }
    fn dl_config(&self) -> DownloadSchedule {
        unimplemented!()
    }
    fn advance(self: Box<Self>) -> Box<dyn DirState> {
        unimplemented!()
    }
    fn reset_time(&self) -> Option<SystemTime> {
        unimplemented!()
    }
    fn reset(self: Box<Self>) -> Box<dyn DirState> {
        unimplemented!()
    }
}
#[cfg(test)]
mod test {
#![allow(clippy::unwrap_used)]
#![allow(clippy::cognitive_complexity)]
use super::*;
use crate::{Authority, AuthorityBuilder, DownloadScheduleConfig};
use std::convert::TryInto;
use std::sync::Arc;
use tempfile::TempDir;
use time::macros::datetime;
use tor_netdoc::doc::authcert::AuthCertKeyIds;
use tor_rtcompat::CompoundRuntime;
use tor_rtmock::time::MockSleepProvider;
#[test]
fn download_schedule() {
let va = datetime!(2008-08-02 20:00 UTC).into();
let fu = datetime!(2008-08-02 21:00 UTC).into();
let vu = datetime!(2008-08-02 23:00 UTC).into();
let lifetime = Lifetime::new(va, fu, vu).unwrap();
let expected_start: SystemTime = datetime!(2008-08-02 21:45 UTC).into();
let expected_range = Duration::from_millis((75 * 60 * 1000) * 7 / 8);
let (start, range) = client_download_range(&lifetime);
assert_eq!(start, expected_start);
assert_eq!(range, expected_range);
for _ in 0..100 {
let when = pick_download_time(&lifetime);
assert!(when > va);
assert!(when >= expected_start);
assert!(when < vu);
assert!(when <= expected_start + range);
}
}
fn temp_store() -> (TempDir, Mutex<DynStore>) {
let tempdir = TempDir::new().unwrap();
let store = crate::storage::SqliteStore::from_path_and_mistrust(
tempdir.path(),
&fs_mistrust::Mistrust::new_dangerously_trust_everyone(),
false,
)
.unwrap();
(tempdir, Mutex::new(Box::new(store)))
}
fn make_time_shifted_runtime(now: SystemTime, rt: impl Runtime) -> impl Runtime {
let msp = MockSleepProvider::new(now);
CompoundRuntime::new(rt.clone(), msp, rt.clone(), rt.clone(), rt)
}
fn make_dirmgr_config(authorities: Option<Vec<AuthorityBuilder>>) -> Arc<DirMgrConfig> {
let mut netcfg = crate::NetworkConfig::builder();
netcfg.set_fallback_caches(vec![]);
if let Some(a) = authorities {
netcfg.set_authorities(a);
}
let cfg = DirMgrConfig {
cache_path: "/we_will_never_use_this/".into(),
network: netcfg.build().unwrap(),
..Default::default()
};
Arc::new(cfg)
}
const CONSENSUS: &str = include_str!("../testdata/mdconsensus1.txt");
const CONSENSUS2: &str = include_str!("../testdata/mdconsensus2.txt");
const AUTHCERT_5696: &str = include_str!("../testdata/cert-5696.txt");
const AUTHCERT_5A23: &str = include_str!("../testdata/cert-5A23.txt");
#[allow(unused)]
const AUTHCERT_7C47: &str = include_str!("../testdata/cert-7C47.txt");
fn test_time() -> SystemTime {
datetime!(2020-08-07 12:42:45 UTC).into()
}
fn rsa(s: &str) -> RsaIdentity {
RsaIdentity::from_hex(s).unwrap()
}
fn test_authorities() -> Vec<AuthorityBuilder> {
fn a(s: &str) -> AuthorityBuilder {
Authority::builder().name("ignore").v3ident(rsa(s)).clone()
}
vec![
a("5696AB38CB3852AFA476A5C07B2D4788963D5567"),
a("5A23BA701776C9C1AB1C06E734E92AB3D5350D64"),
]
}
fn authcert_id_5696() -> AuthCertKeyIds {
AuthCertKeyIds {
id_fingerprint: rsa("5696ab38cb3852afa476a5c07b2d4788963d5567"),
sk_fingerprint: rsa("f6ed4aa64d83caede34e19693a7fcf331aae8a6a"),
}
}
fn authcert_id_5a23() -> AuthCertKeyIds {
AuthCertKeyIds {
id_fingerprint: rsa("5a23ba701776c9c1ab1c06e734e92ab3d5350d64"),
sk_fingerprint: rsa("d08e965cc6dcb6cb6ed776db43e616e93af61177"),
}
}
fn authcert_id_7c47() -> AuthCertKeyIds {
AuthCertKeyIds {
id_fingerprint: rsa("7C47DCB4A90E2C2B7C7AD27BD641D038CF5D7EBE"),
sk_fingerprint: rsa("D3C013E0E6C82E246090D1C0798B75FCB7ACF120"),
}
}
fn microdescs() -> HashMap<MdDigest, String> {
const MICRODESCS: &str = include_str!("../testdata/microdescs.txt");
let text = MICRODESCS;
MicrodescReader::new(text, &AllowAnnotations::AnnotationsNotAllowed)
.map(|res| {
let anno = res.unwrap();
let text = anno.within(text).unwrap();
let md = anno.into_microdesc();
(*md.digest(), text.to_owned())
})
.collect()
}
#[test]
fn get_consensus_state() {
tor_rtcompat::test_with_one_runtime!(|rt| async move {
let rt = make_time_shifted_runtime(test_time(), rt);
let cfg = make_dirmgr_config(None);
let (_tempdir, store) = temp_store();
let mut state = GetConsensusState::new(
rt.clone(),
cfg,
CacheUsage::CacheOkay,
None,
#[cfg(feature = "dirfilter")]
Arc::new(crate::filter::NilFilter),
);
assert_eq!(&state.describe(), "Looking for a consensus.");
assert!(!state.can_advance());
assert!(!state.is_ready(Readiness::Complete));
assert!(!state.is_ready(Readiness::Usable));
assert!(state.reset_time().is_none());
assert_eq!(
state.bootstrap_progress().to_string(),
"fetching a consensus"
);
let retry = state.dl_config();
assert_eq!(retry, DownloadScheduleConfig::default().retry_consensus);
let docs = state.missing_docs();
assert_eq!(docs.len(), 1);
let docid = docs[0];
assert!(matches!(
docid,
DocId::LatestConsensus {
flavor: ConsensusFlavor::Microdesc,
cache_usage: CacheUsage::CacheOkay,
}
));
let source = DocSource::DirServer { source: None };
let req = tor_dirclient::request::ConsensusRequest::new(ConsensusFlavor::Microdesc);
let req = crate::docid::ClientRequest::Consensus(req);
let mut changed = false;
let outcome = state.add_from_download(
"this isn't a consensus",
&req,
source.clone(),
Some(&store),
&mut changed,
);
assert!(matches!(outcome, Err(Error::NetDocError { .. })));
assert!(!changed);
assert!(store
.lock()
.unwrap()
.latest_consensus(ConsensusFlavor::Microdesc, None)
.unwrap()
.is_none());
let mut changed = false;
let outcome = state.add_from_download(
CONSENSUS,
&req,
source.clone(),
Some(&store),
&mut changed,
);
assert!(matches!(outcome, Err(Error::UnrecognizedAuthorities)));
assert!(!changed);
assert!(store
.lock()
.unwrap()
.latest_consensus(ConsensusFlavor::Microdesc, None)
.unwrap()
.is_none());
let cfg = make_dirmgr_config(Some(test_authorities()));
let mut state = GetConsensusState::new(
rt.clone(),
cfg,
CacheUsage::CacheOkay,
None,
#[cfg(feature = "dirfilter")]
Arc::new(crate::filter::NilFilter),
);
let mut changed = false;
let outcome =
state.add_from_download(CONSENSUS, &req, source, Some(&store), &mut changed);
assert!(outcome.is_ok());
assert!(changed);
assert!(store
.lock()
.unwrap()
.latest_consensus(ConsensusFlavor::Microdesc, None)
.unwrap()
.is_some());
assert!(state.can_advance());
assert_eq!(&state.describe(), "About to fetch certificates.");
assert_eq!(state.missing_docs(), Vec::new());
let next = Box::new(state).advance();
assert_eq!(
&next.describe(),
"Downloading certificates for consensus (we are missing 2/2)."
);
let cfg = make_dirmgr_config(Some(test_authorities()));
let mut state = GetConsensusState::new(
rt,
cfg,
CacheUsage::CacheOkay,
None,
#[cfg(feature = "dirfilter")]
Arc::new(crate::filter::NilFilter),
);
let text: crate::storage::InputString = CONSENSUS.to_owned().into();
let map = vec![(docid, text.into())].into_iter().collect();
let mut changed = false;
let outcome = state.add_from_cache(map, &mut changed);
assert!(outcome.is_ok());
assert!(changed);
assert!(state.can_advance());
});
}
#[test]
fn get_certs_state() {
tor_rtcompat::test_with_one_runtime!(|rt| async move {
fn new_getcerts_state(rt: impl Runtime) -> Box<dyn DirState> {
let rt = make_time_shifted_runtime(test_time(), rt);
let cfg = make_dirmgr_config(Some(test_authorities()));
let mut state = GetConsensusState::new(
rt,
cfg,
CacheUsage::CacheOkay,
None,
#[cfg(feature = "dirfilter")]
Arc::new(crate::filter::NilFilter),
);
let source = DocSource::DirServer { source: None };
let req = tor_dirclient::request::ConsensusRequest::new(ConsensusFlavor::Microdesc);
let req = crate::docid::ClientRequest::Consensus(req);
let mut changed = false;
let outcome = state.add_from_download(CONSENSUS, &req, source, None, &mut changed);
assert!(outcome.is_ok());
Box::new(state).advance()
}
let (_tempdir, store) = temp_store();
let mut state = new_getcerts_state(rt.clone());
assert_eq!(
&state.describe(),
"Downloading certificates for consensus (we are missing 2/2)."
);
assert!(!state.can_advance());
assert!(!state.is_ready(Readiness::Complete));
assert!(!state.is_ready(Readiness::Usable));
let consensus_expires: SystemTime = datetime!(2020-08-07 12:43:20 UTC).into();
let post_valid_tolerance = crate::DirTolerance::default().post_valid_tolerance;
assert_eq!(
state.reset_time(),
Some(consensus_expires + post_valid_tolerance)
);
let retry = state.dl_config();
assert_eq!(retry, DownloadScheduleConfig::default().retry_certs);
assert_eq!(
state.bootstrap_progress().to_string(),
"fetching authority certificates (0/2)"
);
let missing = state.missing_docs();
assert_eq!(missing.len(), 2); assert!(missing.contains(&DocId::AuthCert(authcert_id_5696())));
assert!(missing.contains(&DocId::AuthCert(authcert_id_5a23())));
assert!(!missing.contains(&DocId::AuthCert(authcert_id_7c47())));
let text1: crate::storage::InputString = AUTHCERT_5696.to_owned().into();
let docs = vec![(DocId::AuthCert(authcert_id_5696()), text1.into())]
.into_iter()
.collect();
let mut changed = false;
let outcome = state.add_from_cache(docs, &mut changed);
assert!(changed);
assert!(outcome.is_ok()); assert!(!state.can_advance()); let missing = state.missing_docs();
assert_eq!(missing.len(), 1); assert!(missing.contains(&DocId::AuthCert(authcert_id_5a23())));
assert_eq!(
state.bootstrap_progress().to_string(),
"fetching authority certificates (1/2)"
);
let source = DocSource::DirServer { source: None };
let mut req = tor_dirclient::request::AuthCertRequest::new();
req.push(authcert_id_5696()); let req = ClientRequest::AuthCert(req);
let mut changed = false;
let outcome = state.add_from_download(
AUTHCERT_5A23,
&req,
source.clone(),
Some(&store),
&mut changed,
);
assert!(matches!(outcome, Err(Error::Unwanted(_))));
assert!(!changed);
let missing2 = state.missing_docs();
assert_eq!(missing, missing2); assert!(store
.lock()
.unwrap()
.authcerts(&[authcert_id_5a23()])
.unwrap()
.is_empty());
let mut req = tor_dirclient::request::AuthCertRequest::new();
req.push(authcert_id_5a23()); let req = ClientRequest::AuthCert(req);
let mut changed = false;
let outcome =
state.add_from_download(AUTHCERT_5A23, &req, source, Some(&store), &mut changed);
assert!(outcome.is_ok()); assert!(changed);
let missing3 = state.missing_docs();
assert!(missing3.is_empty());
assert!(state.can_advance());
assert!(!store
.lock()
.unwrap()
.authcerts(&[authcert_id_5a23()])
.unwrap()
.is_empty());
let next = state.advance();
assert_eq!(
&next.describe(),
"Downloading microdescriptors (we are missing 6)."
);
let state = new_getcerts_state(rt);
let state = state.reset();
assert_eq!(&state.describe(), "Downloading a consensus.");
});
}
#[test]
fn get_microdescs_state() {
    tor_rtcompat::test_with_one_runtime!(|rt| async move {
        // Construct a fresh GetMicrodescsState from the CONSENSUS2 fixture,
        // running on a runtime shifted to the test's reference time.
        fn fresh_state(rt: impl Runtime) -> GetMicrodescsState<impl Runtime> {
            let rt = make_time_shifted_runtime(test_time(), rt);
            let cfg = make_dirmgr_config(Some(test_authorities()));
            let (signed, rest, consensus) = MdConsensus::parse(CONSENSUS2).unwrap();
            // Test fixture: deliberately skip timeliness and signature checks.
            let consensus = consensus
                .dangerously_assume_timely()
                .dangerously_assume_wellsigned();
            let meta = ConsensusMeta::from_consensus(signed, rest, &consensus);
            GetMicrodescsState::new(
                CacheUsage::CacheOkay,
                consensus,
                meta,
                rt,
                cfg,
                None,
                #[cfg(feature = "dirfilter")]
                Arc::new(crate::filter::NilFilter),
            )
        }
        // Decode a base64-encoded microdescriptor digest.
        fn digest_b64(b64: &str) -> MdDigest {
            base64::decode(b64).unwrap().try_into().unwrap()
        }

        // Resetting this state should send us back to looking for a consensus.
        let state = fresh_state(rt.clone());
        let state = Box::new(state).reset();
        assert_eq!(&state.describe(), "Looking for a consensus.");

        // A fresh state wants four microdescriptors and can't advance yet.
        let mut state = fresh_state(rt.clone());
        assert_eq!(
            &state.describe(),
            "Downloading microdescriptors (we are missing 4)."
        );
        assert!(!state.can_advance());
        assert!(!state.is_ready(Readiness::Complete));
        assert!(!state.is_ready(Readiness::Usable));

        // The reset time should fall between the consensus's fresh-until and
        // its valid-until plus the configured tolerance.
        {
            let when = state.reset_time().unwrap();
            let fresh_until: SystemTime = datetime!(2021-10-27 21:27:00 UTC).into();
            let valid_until: SystemTime = datetime!(2021-10-27 21:27:20 UTC).into();
            assert!(when >= fresh_until);
            assert!(when <= valid_until + state.config.tolerance.post_valid_tolerance);
        }

        // Download schedule and progress reporting for the initial state.
        let sched = state.dl_config();
        assert_eq!(sched, DownloadScheduleConfig::default().retry_microdescs);
        assert_eq!(
            state.bootstrap_progress().to_string(),
            "fetching microdescriptors (0/4)"
        );

        // The four digests we expect to be missing; each must also have a
        // body available in the fixture map.
        let wanted = state.missing_docs();
        let bodies = microdescs();
        assert_eq!(wanted.len(), 4);
        assert_eq!(bodies.len(), 4);
        let md1 = digest_b64("LOXRj8YZP0kwpEAsYOvBZWZWGoWv5b/Bp2Mz2Us8d8g");
        let md2 = digest_b64("iOhVp33NyZxMRDMHsVNq575rkpRViIJ9LN9yn++nPG0");
        let md3 = digest_b64("/Cd07b3Bl0K0jX2/1cAvsYXJJMi5d8UBU+oWKaLxoGo");
        let md4 = digest_b64("z+oOlR7Ga6cg9OoC/A3D3Ey9Rtc4OldhKlpQblMfQKo");
        for d in [md1, md2, md3, md4] {
            assert!(wanted.contains(&DocId::Microdesc(d)));
            assert!(bodies.contains_key(&d));
        }

        // Feed one microdescriptor in from the cache; three remain missing.
        let (_tempdir, store) = temp_store();
        let cached: crate::storage::InputString = bodies.get(&md1).unwrap().clone().into();
        let mut docs = HashMap::new();
        docs.insert(DocId::Microdesc(md1), cached.into());
        let mut changed = false;
        let outcome = state.add_from_cache(docs, &mut changed);
        assert!(outcome.is_ok());
        assert!(changed);
        assert!(!state.can_advance());
        assert!(!state.is_ready(Readiness::Complete));
        assert!(!state.is_ready(Readiness::Usable));
        let wanted = state.missing_docs();
        assert_eq!(wanted.len(), 3);
        assert!(!wanted.contains(&DocId::Microdesc(md1)));
        assert_eq!(
            state.bootstrap_progress().to_string(),
            "fetching microdescriptors (1/4)"
        );

        // Deliver the remaining three microdescriptors in one download.
        let mut req = tor_dirclient::request::MicrodescRequest::new();
        let mut response = String::new();
        for d in [md2, md3, md4] {
            response.push_str(bodies.get(&d).unwrap());
            req.push(d);
        }
        let req = ClientRequest::Microdescs(req);
        let source = DocSource::DirServer { source: None };
        let mut changed = false;
        let outcome = state.add_from_download(
            response.as_str(),
            &req,
            source,
            Some(&store),
            &mut changed,
        );
        assert!(outcome.is_ok());
        assert!(changed);

        // With everything present, the state should offer a netdir for
        // installation...
        match state.get_netdir_change().unwrap() {
            NetDirChange::AttemptReplace { netdir, .. } => {
                assert!(netdir.take().is_some());
            }
            x => panic!("wrong netdir change: {:?}", x),
        }
        // ...report itself usable, and have persisted the downloaded
        // microdescriptors to the store.
        assert!(state.is_ready(Readiness::Complete));
        assert!(state.is_ready(Readiness::Usable));
        assert_eq!(
            store
                .lock()
                .unwrap()
                .microdescs(&[md2, md3, md4])
                .unwrap()
                .len(),
            3
        );
        let wanted = state.missing_docs();
        assert!(wanted.is_empty());
    });
}
}