//! `tor-dirmgr`: Code to fetch, store, and update Tor directory information.
//!
//! # Overview
//!
//! This crate is part of
//! [Arti](https://gitlab.torproject.org/tpo/core/arti/), a project to
//! implement [Tor](https://www.torproject.org/) in Rust.
//!
//! In its current design, Tor requires a set of up-to-date
//! authenticated directory documents in order to build multi-hop
//! anonymized circuits through the network.
//!
//! This directory manager crate is responsible for figuring out which
//! directory information we lack, downloading what we're missing, and
//! keeping a cache of it on disk.
//!
//! # Compile-time features
//!
//! `mmap` (default) -- Use memory mapping to reduce the memory load for
//! reading large directory objects from disk.
//!
//! `static` -- Try to link with a static copy of sqlite3.
//!
//! `routerdesc` -- (Incomplete) support for downloading and storing
//! router descriptors.
#![deny(missing_docs)]
#![warn(noop_method_call)]
#![deny(unreachable_pub)]
#![deny(clippy::await_holding_lock)]
#![deny(clippy::cargo_common_metadata)]
#![deny(clippy::cast_lossless)]
#![deny(clippy::checked_conversions)]
#![warn(clippy::clone_on_ref_ptr)]
#![warn(clippy::cognitive_complexity)]
#![deny(clippy::debug_assert_with_mut_call)]
#![deny(clippy::exhaustive_enums)]
#![deny(clippy::exhaustive_structs)]
#![deny(clippy::expl_impl_clone_on_copy)]
#![deny(clippy::fallible_impl_from)]
#![deny(clippy::implicit_clone)]
#![deny(clippy::large_stack_arrays)]
#![warn(clippy::manual_ok_or)]
#![deny(clippy::missing_docs_in_private_items)]
#![deny(clippy::missing_panics_doc)]
#![warn(clippy::needless_borrow)]
#![warn(clippy::needless_pass_by_value)]
#![warn(clippy::option_option)]
#![warn(clippy::rc_buffer)]
#![deny(clippy::ref_option_ref)]
#![warn(clippy::semicolon_if_nothing_returned)]
#![warn(clippy::trait_duplication_in_bounds)]
#![deny(clippy::unnecessary_wraps)]
#![warn(clippy::unseparated_literal_suffix)]
#![deny(clippy::unwrap_used)]
pub mod authority;
mod bootstrap;
mod config;
mod docid;
mod docmeta;
mod err;
mod event;
mod retry;
mod shared_ref;
mod state;
mod storage;
use crate::docid::{CacheUsage, ClientRequest, DocQuery};
use crate::shared_ref::SharedMutArc;
use crate::storage::sqlite::SqliteStore;
pub use retry::DownloadSchedule;
use tor_circmgr::CircMgr;
use tor_netdir::NetDir;
use tor_netdoc::doc::netstatus::ConsensusFlavor;
use futures::{channel::oneshot, task::SpawnExt};
use tor_rtcompat::{Runtime, SleepProviderExt};
use tracing::{info, trace, warn};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::{collections::HashMap, sync::Weak};
use std::{fmt::Debug, time::SystemTime};
pub use authority::{Authority, AuthorityBuilder};
pub use config::{
DirMgrConfig, DirMgrConfigBuilder, DownloadScheduleConfig, DownloadScheduleConfigBuilder,
NetworkConfig, NetworkConfigBuilder,
};
pub use docid::DocId;
pub use err::Error;
pub use event::DirEvent;
pub use storage::DocumentText;
pub use tor_netdir::fallback::{FallbackDir, FallbackDirBuilder};
/// A Result as returned by this crate.
pub type Result<T> = std::result::Result<T, Error>;
/// A directory manager to download, fetch, and cache a Tor directory.
///
/// A DirMgr can operate in three modes:
/// * In **offline** mode, it only reads from the cache, and can
/// only read once.
/// * In **read-only** mode, it reads from the cache, but checks
/// whether it can acquire an associated lock file. If it can, then
/// it enters read-write mode. If not, it checks the cache
/// periodically for new information.
/// * In **read-write** mode, it knows that no other process will be
/// writing to the cache, and it takes responsibility for fetching
/// data from the network and updating the directory with new
/// directory information.
pub struct DirMgr<R: Runtime> {
/// Configuration information: where to find directories, how to
/// validate them, and so on.
config: DirMgrConfig,
/// Handle to our sqlite cache.
// XXXX I'd like to use an rwlock, but that's not feasible, since
// rusqlite::Connection isn't Sync.
store: Mutex<SqliteStore>,
/// Our latest sufficiently bootstrapped directory, if we have one.
///
/// We use the RwLock so that we can give this out to a bunch of other
/// users, and replace it once a new directory is bootstrapped.
netdir: SharedMutArc<NetDir>,
/// A flag that gets set whenever the _consensus_ part of `netdir` has
/// changed.
netdir_consensus_changed: AtomicBool,
/// A flag that gets set whenever the _descriptors_ part of `netdir` has
/// changed without adding a new consensus.
netdir_descriptors_changed: AtomicBool,
/// A publisher handle, used to inform others about changes in the
/// status of this directory handle.
publisher: event::Publisher,
/// A circuit manager, if this DirMgr supports downloading.
circmgr: Option<Arc<CircMgr<R>>>,
/// Our asynchronous runtime.
runtime: R,
}
impl<R: Runtime> DirMgr<R> {
/// Try to load the directory from disk, without launching any
/// kind of update process.
///
/// This function runs in **offline** mode: it will give an error
/// if the result is not up-to-date, or not fully downloaded.
///
/// In general, you shouldn't use this function in a long-running
/// program; it's only suitable for command-line or batch tools.
// TODO: I wish this function didn't have to be async or take a runtime.
pub async fn load_once(runtime: R, config: DirMgrConfig) -> Result<Arc<NetDir>> {
let dirmgr = Arc::new(Self::from_config(config, runtime, None, true)?);
// TODO: add some way to return a directory that isn't up-to-date
let _success = dirmgr.load_directory().await?;
dirmgr.opt_netdir().ok_or(Error::DirectoryNotPresent)
}
/// Return a current netdir, either loading it or bootstrapping it
/// as needed.
///
/// Like load_once, but will try to bootstrap (or wait for another
/// process to bootstrap) if we don't have an up-to-date
/// bootstrapped directory.
///
/// In general, you shouldn't use this function in a long-running
/// program; it's only suitable for command-line or batch tools.
pub async fn load_or_bootstrap_once(
config: DirMgrConfig,
runtime: R,
circmgr: Arc<CircMgr<R>>,
) -> Result<Arc<NetDir>> {
let dirmgr = DirMgr::bootstrap_from_config(config, runtime, circmgr).await?;
Ok(dirmgr.netdir())
}
/// Return a new directory manager from a given configuration,
/// bootstrapping from the network as necessary.
///
/// This function will to return until the directory is
/// bootstrapped enough to build circuits. It will also launch a
/// background task that fetches any missing information, and that
/// replaces the directory when a new one is available.
pub async fn bootstrap_from_config(
config: DirMgrConfig,
runtime: R,
circmgr: Arc<CircMgr<R>>,
) -> Result<Arc<Self>> {
let dirmgr = Arc::new(DirMgr::from_config(
config,
runtime.clone(),
Some(circmgr),
false,
)?);
// Try to load from the cache.
let have_directory = dirmgr.load_directory().await?;
let (mut sender, receiver) = if have_directory {
info!("Loaded a good directory from cache.");
(None, None)
} else {
info!("Didn't get usable directory from cache.");
let (sender, receiver) = oneshot::channel();
(Some(sender), Some(receiver))
};
// Whether we loaded or not, we now start downloading.
let dirmgr_weak = Arc::downgrade(&dirmgr);
runtime.spawn(async move {
// NOTE: This is a daemon task. It should eventually get
// treated as one.
// TODO: don't warn when these are Error::ManagerDropped: that
// means that the DirMgr has been shut down.
if let Err(e) = Self::reload_until_owner(&dirmgr_weak, &mut sender).await {
warn!("Unrecovered error while waiting for bootstrap: {}", e);
} else if let Err(e) = Self::download_forever(dirmgr_weak, sender).await {
warn!("Unrecovered error while downloading: {}", e);
}
})?;
if let Some(receiver) = receiver {
match receiver.await {
Ok(()) => {
info!("We have enough information to build circuits.");
}
Err(_) => {
warn!("Bootstrapping task exited before finishing.");
return Err(Error::CantAdvanceState);
}
}
}
Ok(dirmgr)
}
/// Try forever to either lock the storage (and thereby become the
/// owner), or to reload the database.
///
/// If we have begin to have a bootstrapped directory, send a
/// message using `on_complete`.
///
/// If we eventually become the owner, return Ok().
async fn reload_until_owner(
weak: &Weak<Self>,
on_complete: &mut Option<oneshot::Sender<()>>,
) -> Result<()> {
let mut logged = false;
let mut bootstrapped;
let runtime;
{
let dirmgr = upgrade_weak_ref(weak)?;
runtime = dirmgr.runtime.clone();
bootstrapped = dirmgr.netdir.get().is_some();
}
loop {
{
let dirmgr = upgrade_weak_ref(weak)?;
trace!("Trying to take ownership of the directory cache lock");
if dirmgr.try_upgrade_to_readwrite()? {
// We now own the lock! (Maybe we owned it before; the
// upgrade_to_readwrite() function is idempotent.) We can
// do our own bootstrapping.
if logged {
info!("The previous owning process has given up the lock. We are now in charge of managing the directory.");
}
return Ok(());
}
}
if !logged {
logged = true;
if bootstrapped {
info!("Another process is managing the directory. We'll use its cache.");
} else {
info!("Another process is bootstrapping the directory. Waiting till it finishes or exits.");
}
}
// We don't own the lock. Somebody else owns the cache. They
// should be updating it. Wait a bit, then try again.
let pause = if bootstrapped {
std::time::Duration::new(120, 0)
} else {
std::time::Duration::new(5, 0)
};
runtime.sleep(pause).await;
// TODO: instead of loading the whole thing we should have a
// database entry that says when the last update was, or use
// our state functions.
{
let dirmgr = upgrade_weak_ref(weak)?;
trace!("Trying to load from the directory cache");
if dirmgr.load_directory().await? {
// Successfully loaded a bootstrapped directory.
if let Some(send_done) = on_complete.take() {
let _ = send_done.send(());
}
if !bootstrapped {
info!("The directory is now bootstrapped.");
}
bootstrapped = true;
}
}
}
}
/// Try to fetch our directory info and keep it updated, indefinitely.
///
/// If we have begin to have a bootstrapped directory, send a
/// message using `on_complete`.
async fn download_forever(
weak: Weak<Self>,
mut on_complete: Option<oneshot::Sender<()>>,
) -> Result<()> {
let mut state: Box<dyn DirState> = Box::new(state::GetConsensusState::new(
Weak::clone(&weak),
CacheUsage::CacheOkay,
)?);
let (retry_config, runtime) = {
let dirmgr = upgrade_weak_ref(&weak)?;
(
*dirmgr.config.schedule().retry_bootstrap(),
dirmgr.runtime.clone(),
)
};
loop {
let mut usable = false;
let mut retry_delay = retry_config.schedule();
'retry_attempt: for _ in retry_config.attempts() {
let (newstate, recoverable_err) =
bootstrap::download(Weak::clone(&weak), state, &mut on_complete).await?;
state = newstate;
if let Some(err) = recoverable_err {
if state.is_ready(Readiness::Usable) {
usable = true;
info!("Unable to completely download a directory: {}. Nevertheless, the directory is usable, so we'll pause for now.", err);
break 'retry_attempt;
}
let delay = retry_delay.next_delay(&mut rand::thread_rng());
warn!(
"Unable to download a usable directory: {}. We will restart in {:?}.",
err, delay
);
runtime.sleep(delay).await;
state = state.reset()?;
} else {
info!("Directory is complete.");
usable = true;
break 'retry_attempt;
}
}
if !usable {
// we ran out of attempts.
warn!(
"We failed {} times to bootstrap a directory. We're going to give up.",
retry_config.n_attempts()
);
return Err(Error::CantAdvanceState);
} else {
// Report success, if appropriate.
if let Some(send_done) = on_complete.take() {
let _ = send_done.send(());
}
}
let reset_at = state.reset_time();
match reset_at {
Some(t) => runtime.sleep_until_wallclock(t).await,
None => return Ok(()),
}
state = state.reset()?;
}
}
/// Get a reference to the circuit manager, if we have one.
fn circmgr(&self) -> Result<Arc<CircMgr<R>>> {
self.circmgr
.as_ref()
.map(Arc::clone)
.ok_or(Error::NoDownloadSupport)
}
/// Try to make this a directory manager with read-write access to its
/// storage.
///
/// Return true if we got the lock, or if we already had it.
///
/// Return false if another process has the lock
fn try_upgrade_to_readwrite(&self) -> Result<bool> {
self.store
.lock()
.expect("Directory storage lock poisoned")
.upgrade_to_readwrite()
}
/// Return a reference to the store, if it is currently read-write.
fn store_if_rw(&self) -> Option<&Mutex<SqliteStore>> {
let rw = !self
.store
.lock()
.expect("Directory storage lock poisoned")
.is_readonly();
// A race-condition is possible here, but I believe it's harmless.
if rw {
Some(&self.store)
} else {
None
}
}
/// Construct a DirMgr from a DirMgrConfig.
fn from_config(
config: DirMgrConfig,
runtime: R,
circmgr: Option<Arc<CircMgr<R>>>,
readonly: bool,
) -> Result<Self> {
let store = Mutex::new(config.open_sqlite_store(readonly)?);
let netdir = SharedMutArc::new();
let netdir_consensus_changed = AtomicBool::new(false);
let netdir_descriptors_changed = AtomicBool::new(false);
let publisher = event::Publisher::new();
Ok(DirMgr {
config,
store,
netdir,
netdir_consensus_changed,
netdir_descriptors_changed,
publisher,
circmgr,
runtime,
})
}
/// Load the latest non-pending non-expired directory from the
/// cache, if it is newer than the one we have.
///
/// Return false if there is no such consensus.
async fn load_directory(self: &Arc<Self>) -> Result<bool> {
let state = state::GetConsensusState::new(Arc::downgrade(self), CacheUsage::CacheOnly)?;
let _ = bootstrap::load(Arc::clone(self), Box::new(state)).await?;
Ok(self.netdir.get().is_some())
}
/// Return an Arc handle to our latest directory, if we have one.
///
/// This is a private method, since by the time anybody else has a
/// handle to a DirMgr, the NetDir should definitely be
/// bootstrapped.
fn opt_netdir(&self) -> Option<Arc<NetDir>> {
self.netdir.get()
}
/// Return an Arc handle to our latest directory, if we have one.
// TODO: Add variants of this that make sure that it's up-to-date?
pub fn netdir(&self) -> Arc<NetDir> {
self.opt_netdir().expect("DirMgr was not bootstrapped!")
}
/// Return a new asynchronous stream about events taking place with
/// this directory manager.
///
/// The caller must regularly process events from this stream to
/// prevent it from blocking.
pub fn events(&self) -> impl futures::Stream<Item = DirEvent> {
self.publisher.subscribe()
}
/// Try to load the text of a single document described by `doc` from
/// storage.
pub fn text(&self, doc: &DocId) -> Result<Option<DocumentText>> {
use itertools::Itertools;
let mut result = HashMap::new();
let query = (*doc).into();
self.load_documents_into(&query, &mut result)?;
let item = result.into_iter().at_most_one().map_err(|_| {
Error::CacheCorruption("Found more than one entry in storage for given docid")
})?;
if let Some((docid, doctext)) = item {
if &docid != doc {
return Err(Error::CacheCorruption(
"Item from storage had incorrect docid.",
));
}
Ok(Some(doctext))
} else {
Ok(None)
}
}
/// Load the text for a collection of documents.
///
/// If many of the documents have the same type, this can be more
/// efficient than calling [`text`](Self::text).
pub fn texts<T>(&self, docs: T) -> Result<HashMap<DocId, DocumentText>>
where
T: IntoIterator<Item = DocId>,
{
let partitioned = docid::partition_by_type(docs);
let mut result = HashMap::new();
for (_, query) in partitioned.into_iter() {
self.load_documents_into(&query, &mut result)?;
}
Ok(result)
}
/// If the consensus has changed, notify any subscribers.
// TODO: I don't like all the different places in `bootstrap`
// where we have to call this function. Can we simplify it or
// clean it up somehow? Maybe we can build some kind of intelligence into
// shared_ref?
pub(crate) async fn notify(&self) {
if self.netdir_consensus_changed.swap(false, Ordering::SeqCst) {
self.publisher.send(DirEvent::NewConsensus).await;
}
if self
.netdir_descriptors_changed
.swap(false, Ordering::SeqCst)
{
self.publisher.send(DirEvent::NewDescriptors).await;
}
}
/// Load all the documents for a single DocumentQuery from the store.
fn load_documents_into(
&self,
query: &DocQuery,
result: &mut HashMap<DocId, DocumentText>,
) -> Result<()> {
use DocQuery::*;
let store = self.store.lock().expect("Directory storage lock poisoned");
match query {
LatestConsensus {
flavor,
cache_usage,
} => {
if *cache_usage == CacheUsage::MustDownload {
// Do nothing: we don't want a cached consensus.
trace!("MustDownload is set; not checking for cached consensus.");
} else if let Some(c) =
store.latest_consensus(*flavor, cache_usage.pending_requirement())?
{
trace!("Found a reasonable consensus in the cache");
let id = DocId::LatestConsensus {
flavor: *flavor,
cache_usage: *cache_usage,
};
result.insert(id, c.into());
}
}
AuthCert(ids) => result.extend(
store
.authcerts(ids)?
.into_iter()
.map(|(id, c)| (DocId::AuthCert(id), DocumentText::from_string(c))),
),
Microdesc(digests) => {
result.extend(
store
.microdescs(digests)?
.into_iter()
.map(|(id, md)| (DocId::Microdesc(id), DocumentText::from_string(md))),
);
}
#[cfg(feature = "routerdesc")]
RouterDesc(digests) => result.extend(
store
.routerdescs(digests)?
.into_iter()
.map(|(id, rd)| (DocId::RouterDesc(id), DocumentText::from_string(rd))),
),
}
Ok(())
}
/// Convert a DocQuery into a set of ClientRequests, suitable for sending
/// to a directory cache.
///
/// This conversion has to be a function of the dirmgr, since it may
/// require knowledge about our current state.
fn query_into_requests(&self, q: DocQuery) -> Result<Vec<ClientRequest>> {
let mut res = Vec::new();
for q in q.split_for_download() {
match q {
DocQuery::LatestConsensus { flavor, .. } => {
res.push(self.make_consensus_request(flavor)?);
}
DocQuery::AuthCert(ids) => {
res.push(ClientRequest::AuthCert(ids.into_iter().collect()));
}
DocQuery::Microdesc(ids) => {
res.push(ClientRequest::Microdescs(ids.into_iter().collect()));
}
#[cfg(feature = "routerdesc")]
DocQuery::RouterDesc(ids) => {
res.push(ClientRequest::RouterDescs(ids.into_iter().collect()));
}
}
}
Ok(res)
}
/// Construct an appropriate ClientRequest to download a consensus
/// of the given flavor.
fn make_consensus_request(&self, flavor: ConsensusFlavor) -> Result<ClientRequest> {
#![allow(clippy::unnecessary_wraps)]
let mut request = tor_dirclient::request::ConsensusRequest::new(flavor);
let r = self.store.lock().expect("Directory storage lock poisoned");
match r.latest_consensus_meta(flavor) {
Ok(Some(meta)) => {
request.set_last_consensus_date(meta.lifetime().valid_after());
request.push_old_consensus_digest(*meta.sha3_256_of_signed());
}
Ok(None) => {}
Err(e) => {
warn!("Error loading directory metadata: {}", e);
}
}
Ok(ClientRequest::Consensus(request))
}
/// Given a request we sent and the response we got from a
/// directory server, see whether we should expand that response
/// into "something larger".
///
/// Currently, this handles expanding consensus diffs, and nothing
/// else. We do it at this stage of our downloading operation
/// because it requires access to the store.
fn expand_response_text(&self, req: &ClientRequest, text: String) -> Result<String> {
if let ClientRequest::Consensus(req) = req {
if tor_consdiff::looks_like_diff(&text) {
if let Some(old_d) = req.old_consensus_digests().next() {
let db_val = {
let s = self.store.lock().expect("Directory storage lock poisoned");
s.consensus_by_sha3_digest_of_signed_part(old_d)?
};
if let Some((old_consensus, meta)) = db_val {
info!("Applying a consensus diff");
let new_consensus = tor_consdiff::apply_diff(
old_consensus.as_str()?,
&text,
Some(*meta.sha3_256_of_signed()),
)?;
new_consensus.check_digest()?;
return Ok(new_consensus.to_string());
}
}
return Err(Error::Unwanted(
"Received a consensus diff we did not ask for",
));
}
}
Ok(text)
}
}
/// A degree of readiness for a given directory state object.
#[derive(Debug, Copy, Clone)]
enum Readiness {
/// There is no more information to download.
Complete,
/// There is more information to download, but we don't need to
Usable,
}
/// A "state" object used to represent our progress in downloading a
/// directory.
///
/// These state objects are not meant to know about the network, or
/// how to fetch documents at all. Instead, they keep track of what
/// information they are missing, and what to do when they get that
/// information.
///
/// Every state object has two possible transitions: "resetting", and
/// "advancing". Advancing happens when a state has no more work to
/// do, and needs to transform into a different kind of object.
/// Resetting happens when this state needs to go back to an initial
/// state in order to start over -- either because of an error or
/// because the information it has downloaded is no longer timely.
trait DirState: Send {
/// Return a human-readable description of this state.
fn describe(&self) -> String;
/// Return a list of the documents we're missing.
///
/// If every document on this list were to be loaded or downloaded, then
/// the state should either become "ready to advance", or "complete."
///
/// This list should never _grow_ on a given state; only advancing
/// or resetting the state should add new DocIds that weren't
/// there before.
fn missing_docs(&self) -> Vec<DocId>;
/// Describe whether this state has reached `ready` status.
fn is_ready(&self, ready: Readiness) -> bool;
/// Return true if this state can advance to another state via its
/// `advance` method.
fn can_advance(&self) -> bool;
/// Add one or more documents from our cache; returns 'true' if there
/// was any change in this state.
///
/// If `storage` is provided, then we should write any state changes into
/// it. (We don't read from it in this method.)
fn add_from_cache(
&mut self,
docs: HashMap<DocId, DocumentText>,
storage: Option<&Mutex<SqliteStore>>,
) -> Result<bool>;
/// Add information that we have just downloaded to this state; returns
/// 'true' if there as any change in this state.
///
/// This method receives a copy of the original request, and
/// should reject any documents that do not pertain to it.
///
/// If `storage` is provided, then we should write any accepted documents
/// into `storage` so they can be saved in a cache.
// TODO: It might be good to say "there was a change but also an
// error" in this API if possible.
// TODO: It would be better to not have this function be async,
// once the `must_not_suspend` lint is stable.
// TODO: this should take a "DirSource" too.
fn add_from_download(
&mut self,
text: &str,
request: &ClientRequest,
storage: Option<&Mutex<SqliteStore>>,
) -> Result<bool>;
/// Return a configuration for attempting downloads.
fn dl_config(&self) -> Result<DownloadSchedule>;
/// If possible, advance to the next state.
fn advance(self: Box<Self>) -> Result<Box<dyn DirState>>;
/// Return a time (if any) when downloaders should stop attempting to
/// advance this state, and should instead reset it and start over.
fn reset_time(&self) -> Option<SystemTime>;
/// Reset this state and start over.
fn reset(self: Box<Self>) -> Result<Box<dyn DirState>>;
}
/// Try to upgrade a weak reference to a DirMgr, and give an error on
/// failure.
fn upgrade_weak_ref<T>(weak: &Weak<T>) -> Result<Arc<T>> {
Weak::upgrade(weak).ok_or(Error::ManagerDropped)
}
#[cfg(test)]
mod test {
#![allow(clippy::unwrap_used)]
use super::*;
use crate::docmeta::{AuthCertMeta, ConsensusMeta};
use std::time::Duration;
use tempfile::TempDir;
use tor_netdoc::doc::{authcert::AuthCertKeyIds, netstatus::Lifetime};
pub(crate) fn new_mgr<R: Runtime>(runtime: R) -> (TempDir, DirMgr<R>) {
let dir = TempDir::new().unwrap();
let config = DirMgrConfig::builder()
.cache_path(dir.path())
.build()
.unwrap();
let dirmgr = DirMgr::from_config(config, runtime, None, false).unwrap();
(dir, dirmgr)
}
#[test]
fn failing_accessors() {
tor_rtcompat::test_with_one_runtime!(|rt| async {
let (_tempdir, mgr) = new_mgr(rt);
assert!(mgr.circmgr().is_err());
assert!(mgr.opt_netdir().is_none());
});
}
#[test]
fn load_and_store_internals() {
tor_rtcompat::test_with_one_runtime!(|rt| async {
let (_tempdir, mgr) = new_mgr(rt);
let now = SystemTime::now();
let tomorrow = now + Duration::from_secs(86400);
let later = tomorrow + Duration::from_secs(86400);
// Seed the storage with a bunch of junk.
let d1 = [5_u8; 32];
let d2 = [7; 32];
let d3 = [42; 32];
let d4 = [99; 20];
let d5 = [12; 20];
let certid1 = AuthCertKeyIds {
id_fingerprint: d4.into(),
sk_fingerprint: d5.into(),
};
let certid2 = AuthCertKeyIds {
id_fingerprint: d5.into(),
sk_fingerprint: d4.into(),
};
{
let mut store = mgr.store.lock().unwrap();
store
.store_microdescs(
vec![
("Fake micro 1", &d1),
("Fake micro 2", &d2),
("Fake micro 3", &d3),
],
now,
)
.unwrap();
#[cfg(feature = "routerdesc")]
store
.store_routerdescs(vec![("Fake rd1", now, &d4), ("Fake rd2", now, &d5)])
.unwrap();
store
.store_authcerts(&[
(
AuthCertMeta::new(certid1, now, tomorrow),
"Fake certificate one",
),
(
AuthCertMeta::new(certid2, now, tomorrow),
"Fake certificate two",
),
])
.unwrap();
let cmeta = ConsensusMeta::new(
Lifetime::new(now, tomorrow, later).unwrap(),
[102; 32],
[103; 32],
);
store
.store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
.unwrap();
}
// Try to get it with text().
let t1 = mgr.text(&DocId::Microdesc(d1)).unwrap().unwrap();
assert_eq!(t1.as_str(), Ok("Fake micro 1"));
let t2 = mgr
.text(&DocId::LatestConsensus {
flavor: ConsensusFlavor::Microdesc,
cache_usage: CacheUsage::CacheOkay,
})
.unwrap()
.unwrap();
assert_eq!(t2.as_str(), Ok("Fake consensus!"));
let t3 = mgr.text(&DocId::Microdesc([255; 32])).unwrap();
assert!(t3.is_none());
// Now try texts()
let d_bogus = DocId::Microdesc([255; 32]);
let res = mgr
.texts(vec![
DocId::Microdesc(d2),
DocId::Microdesc(d3),
d_bogus,
DocId::AuthCert(certid2),
#[cfg(feature = "routerdesc")]
DocId::RouterDesc(d5),
])
.unwrap();
assert_eq!(
res.get(&DocId::Microdesc(d2)).unwrap().as_str(),
Ok("Fake micro 2")
);
assert_eq!(
res.get(&DocId::Microdesc(d3)).unwrap().as_str(),
Ok("Fake micro 3")
);
assert!(res.get(&d_bogus).is_none());
assert_eq!(
res.get(&DocId::AuthCert(certid2)).unwrap().as_str(),
Ok("Fake certificate two")
);
#[cfg(feature = "routerdesc")]
assert_eq!(
res.get(&DocId::RouterDesc(d5)).unwrap().as_str(),
Ok("Fake rd2")
);
});
}
#[test]
fn make_consensus_request() {
tor_rtcompat::test_with_one_runtime!(|rt| async {
let (_tempdir, mgr) = new_mgr(rt);
let now = SystemTime::now();
let tomorrow = now + Duration::from_secs(86400);
let later = tomorrow + Duration::from_secs(86400);
// Try with an empty store.
let req = mgr
.make_consensus_request(ConsensusFlavor::Microdesc)
.unwrap();
match req {
ClientRequest::Consensus(r) => {
assert_eq!(r.old_consensus_digests().count(), 0);
assert_eq!(r.last_consensus_date(), None);
}
_ => panic!("Wrong request type"),
}
// Add a fake consensus record.
let d_prev = [42; 32];
{
let mut store = mgr.store.lock().unwrap();
let cmeta = ConsensusMeta::new(
Lifetime::new(now, tomorrow, later).unwrap(),
d_prev,
[103; 32],
);
store
.store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
.unwrap();
}
// Now try again.
let req = mgr
.make_consensus_request(ConsensusFlavor::Microdesc)
.unwrap();
match req {
ClientRequest::Consensus(r) => {
let ds: Vec<_> = r.old_consensus_digests().collect();
assert_eq!(ds.len(), 1);
assert_eq!(ds[0], &d_prev);
assert_eq!(r.last_consensus_date(), Some(now));
}
_ => panic!("Wrong request type"),
}
});
}
#[test]
fn make_other_requests() {
tor_rtcompat::test_with_one_runtime!(|rt| async {
use rand::Rng;
let (_tempdir, mgr) = new_mgr(rt);
let certid1 = AuthCertKeyIds {
id_fingerprint: [99; 20].into(),
sk_fingerprint: [100; 20].into(),
};
let mut rng = rand::thread_rng();
#[cfg(feature = "routerdesc")]
let rd_ids: Vec<[u8; 20]> = (0..1000).map(|_| rng.gen()).collect();
let md_ids: Vec<[u8; 32]> = (0..1000).map(|_| rng.gen()).collect();
// Try an authcert.
let query = DocQuery::AuthCert(vec![certid1]);
let reqs = mgr.query_into_requests(query).unwrap();
assert_eq!(reqs.len(), 1);
let req = &reqs[0];
if let ClientRequest::AuthCert(r) = req {
assert_eq!(r.keys().next(), Some(&certid1));
} else {
panic!();
}
// Try a bunch of mds.
let query = DocQuery::Microdesc(md_ids);
let reqs = mgr.query_into_requests(query).unwrap();
assert_eq!(reqs.len(), 2);
assert!(matches!(reqs[0], ClientRequest::Microdescs(_)));
// Try a bunch of rds.
#[cfg(feature = "routerdesc")]
{
let query = DocQuery::RouterDesc(rd_ids);
let reqs = mgr.query_into_requests(query).unwrap();
assert_eq!(reqs.len(), 2);
assert!(matches!(reqs[0], ClientRequest::RouterDescs(_)));
}
});
}
#[test]
fn expand_response() {
tor_rtcompat::test_with_one_runtime!(|rt| async {
let (_tempdir, mgr) = new_mgr(rt);
// Try a simple request: nothing should happen.
let q = DocId::Microdesc([99; 32]).into();
let r = &mgr.query_into_requests(q).unwrap()[0];
let expanded = mgr.expand_response_text(r, "ABC".to_string());
assert_eq!(&expanded.unwrap(), "ABC");
// Try a consensus response that doesn't look like a diff in
// response to a query that doesn't ask for one.
let latest_id = DocId::LatestConsensus {
flavor: ConsensusFlavor::Microdesc,
cache_usage: CacheUsage::CacheOkay,
};
let q: DocQuery = latest_id.into();
let r = &mgr.query_into_requests(q.clone()).unwrap()[0];
let expanded = mgr.expand_response_text(r, "DEF".to_string());
assert_eq!(&expanded.unwrap(), "DEF");
// Now stick some metadata and a string into the storage so that
// we can ask for a diff.
{
let mut store = mgr.store.lock().unwrap();
let now = SystemTime::now();
let day = Duration::from_secs(86400);
let d_in = [0x99; 32]; // This one, we can fake.
let cmeta = ConsensusMeta::new(
Lifetime::new(now, now + day, now + 2 * day).unwrap(),
d_in,
d_in,
);
store
.store_consensus(
&cmeta,
ConsensusFlavor::Microdesc,
false,
"line 1\nline2\nline 3\n",
)
.unwrap();
}
// Try expanding something that isn't a consensus, even if we'd like
// one.
let r = &mgr.query_into_requests(q).unwrap()[0];
let expanded = mgr.expand_response_text(r, "hello".to_string());
assert_eq!(&expanded.unwrap(), "hello");
// Finally, try "expanding" a diff (by applying it and checking the digest.
let diff = "network-status-diff-version 1
hash 9999999999999999999999999999999999999999999999999999999999999999 8382374ca766873eb0d2530643191c6eaa2c5e04afa554cbac349b5d0592d300
2c
replacement line
.
".to_string();
let expanded = mgr.expand_response_text(r, diff);
assert_eq!(expanded.unwrap(), "line 1\nreplacement line\nline 3\n");
// If the digest is wrong, that should get rejected.
let diff = "network-status-diff-version 1
hash 9999999999999999999999999999999999999999999999999999999999999999 9999999999999999999999999999999999999999999999999999999999999999
2c
replacement line
.
".to_string();
let expanded = mgr.expand_response_text(r, diff);
assert!(expanded.is_err());
});
}
}