//! `BridgeDescMgr` - downloads and caches bridges' router descriptors
use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
use std::fmt::{self, Debug, Display};
use std::num::NonZeroU8;
use std::ops;
use std::panic::AssertUnwindSafe;
use std::sync::{Arc, Mutex, MutexGuard, Weak};
use std::time::{Duration, Instant, SystemTime};
use async_trait::async_trait;
use derive_more::{Deref, DerefMut};
use educe::Educe;
use futures::future;
use futures::select;
use futures::stream::{BoxStream, StreamExt};
use futures::task::{SpawnError, SpawnExt as _};
use futures::FutureExt;
use tracing::{debug, error, info, trace};
use safelog::sensitive;
use tor_basic_utils::retry::RetryDelay;
use tor_basic_utils::BinaryHeapExt as _;
use tor_checkable::{SelfSigned, Timebound};
use tor_circmgr::CircMgr;
use tor_error::{internal, ErrorKind, HasKind};
use tor_error::{AbsRetryTime, HasRetryTime, RetryTime};
use tor_guardmgr::bridge::{BridgeConfig, BridgeDesc};
use tor_guardmgr::bridge::{BridgeDescError, BridgeDescEvent, BridgeDescList, BridgeDescProvider};
use tor_netdoc::doc::routerdesc::RouterDesc;
use tor_rtcompat::Runtime;
use crate::event::FlagPublisher;
use crate::storage::CachedBridgeDescriptor;
use crate::{DirMgrStore, DynStore};
#[cfg(test)]
mod bdtest;
/// The key we use in all our data structures
///
/// This type saves typing and would make it easier to change the bridge descriptor manager
/// to take and handle another way of identifying the bridges it is working with.
type BridgeKey = BridgeConfig;
/// Active vs dormant state, as far as the bridge descriptor manager is concerned
///
/// This is usually derived in higher layers from `arti_client::DormantMode`,
/// whether `TorClient::bootstrap()` has been called, etc.
#[non_exhaustive]
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
// TODO: These proliferating `Dormancy` enums should be centralised and unified with `TaskHandle`
// https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/845#note_2853190
pub enum Dormancy {
/// Dormant (inactive)
///
/// Bridge descriptor downloads, or refreshes, will not be started.
///
/// In-progress downloads will be stopped if possible,
/// but they may continue until they complete (or fail).
// TODO async task cancellation: actually cancel these in this case
///
/// So a dormant BridgeDescManager may still continue to
/// change the return value from [`bridges()`](BridgeDescProvider::bridges)
/// and continue to report [`BridgeDescEvent`]s.
///
/// When the BridgeDescManager is dormant,
/// `bridges()` may return stale descriptors
/// (that is, descriptors which ought to have been refetched and may no longer be valid),
/// or stale errors
/// (that is, errors which occurred some time ago,
/// and which would normally have been retried by now).
Dormant,
/// Active
///
/// Bridge descriptors will be downloaded as requested.
///
/// When a bridge descriptor manager has been `Dormant`,
/// it may continue to provide stale data (as described)
/// for a while after it is made `Active`,
/// until the required refreshes and retries have taken place (or failed).
Active,
}
/// **Downloader and cache for bridges' router descriptors**
///
/// This is a handle which is cheap to clone and has internal mutability.
#[derive(Clone)]
pub struct BridgeDescMgr<R: Runtime, M = ()>
where
M: Mockable<R>,
{
/// The actual manager
///
/// We have the `Arc` in here, rather than in our callers, because this
/// makes the API nicer for them, and also because some of our tasks
/// want a handle they can use to relock and modify the state.
mgr: Arc<Manager<R, M>>,
}
/// Configuration for the `BridgeDescMgr`
///
/// Currently, the only way to make this is via its `Default` impl.
// TODO: there should be some way to override the defaults. See #629 for considerations.
#[derive(Debug, Clone)]
pub struct BridgeDescDownloadConfig {
/// How many bridge descriptor downloads to attempt in parallel?
parallelism: NonZeroU8,
/// Default/initial time to retry a failure to download a descriptor
///
/// (This has the semantics of an initial delay for [`RetryDelay`],
/// and is used unless there is more specific retry information for the particular failure.)
retry: Duration,
/// When a downloaded descriptor is going to expire, how soon in advance to refetch it?
prefetch: Duration,
/// Minimum interval between successive refetches of the descriptor for the same bridge
///
/// This limits the download activity which can be caused by an errant bridge.
///
/// If the descriptor's validity information is shorter than this, we will use
/// it after it has expired (rather than treating the bridge as broken).
min_refetch: Duration,
/// Maximum interval between successive refetches of the descriptor for the same bridge
///
/// This sets an upper bound on how old a descriptor we are willing to use.
/// When this time expires, a refetch attempt will be started even if the
/// descriptor is not going to expire soon.
//
// TODO: When this is configurable, we need to make sure we reject
// configurations with max_refresh < min_refresh, or we may panic.
max_refetch: Duration,
}
impl Default for BridgeDescDownloadConfig {
fn default() -> Self {
let secs = Duration::from_secs;
BridgeDescDownloadConfig {
parallelism: 4.try_into().expect("parallelism is zero"),
retry: secs(30),
prefetch: secs(1000),
min_refetch: secs(3600),
max_refetch: secs(3600 * 3), // matches C Tor behaviour
}
}
}
/// Mockable internal methods for within the `BridgeDescMgr`
///
/// Implemented for `()`, meaning "do not use mocks: use the real versions of everything".
///
/// This (`()`) is the default for the type parameter in
/// [`BridgeDescMgr`],
/// and it is the only publicly available implementation,
/// since this trait is sealed.
pub trait Mockable<R>: mockable::MockableAPI<R> {}
impl<R: Runtime> Mockable<R> for () {}
/// Private module which seals [`Mockable`]
/// by containing [`MockableAPI`](mockable::MockableAPI)
mod mockable {
use super::*;
/// Defines the actual mockable APIs
///
/// Not nameable (and therefore not implementable)
/// outside the `bridgedesc` module,
#[async_trait]
pub trait MockableAPI<R>: Clone + Send + Sync + 'static {
/// Circuit manager
type CircMgr: Send + Sync + 'static;
/// Download this bridge's descriptor, and return it as a string
///
/// Runs in a task.
/// Called by `Manager::download_descriptor`, which handles parsing and validation.
///
/// If `if_modified_since` is `Some`,
/// should tolerate an HTTP 304 Not Modified and return `None` in that case.
/// If `if_modified_since` is `None`, returning `Ok(None,)` is forbidden.
async fn download(
self,
runtime: &R,
circmgr: &Self::CircMgr,
bridge: &BridgeConfig,
if_modified_since: Option<SystemTime>,
) -> Result<Option<String>, Error>;
}
}
#[async_trait]
impl<R: Runtime> mockable::MockableAPI<R> for () {
type CircMgr = Arc<CircMgr<R>>;
/// Actual code for downloading a descriptor document
async fn download(
self,
runtime: &R,
circmgr: &Self::CircMgr,
bridge: &BridgeConfig,
_if_modified_since: Option<SystemTime>,
) -> Result<Option<String>, Error> {
// TODO actually support _if_modified_since
let circuit = circmgr.get_or_launch_dir_specific(bridge).await?;
let mut stream = circuit
.begin_dir_stream()
.await
.map_err(Error::StreamFailed)?;
let request = tor_dirclient::request::RoutersOwnDescRequest::new();
let response = tor_dirclient::download(runtime, &request, &mut stream, None)
.await
.map_err(|dce| match dce {
tor_dirclient::Error::RequestFailed(re) => Error::RequestFailed(re),
_ => internal!("tor_dirclient::download gave non-RequestFailed {:?}", dce).into(),
})?;
let output = response.into_output_string()?;
Ok(Some(output))
}
}
/// The actual manager.
struct Manager<R: Runtime, M: Mockable<R>> {
/// The mutable state
state: Mutex<State>,
/// Runtime, used for tasks and sleeping
runtime: R,
/// Circuit manager, used for creating circuits
circmgr: M::CircMgr,
/// Persistent state store
store: Arc<Mutex<DynStore>>,
/// Mock for testing, usually `()`
mockable: M,
}
/// State: our downloaded descriptors (cache), and records of what we're doing
///
/// Various functions (both tasks and public entrypoints),
/// which generally start with a `Manager`,
/// lock the mutex and modify this.
///
/// Generally, the flow is:
///
/// * A public entrypoint, or task, obtains a [`StateGuard`].
/// It modifies the state to represent the callers' new requirements,
/// or things it has done, by updating the state,
/// preserving the invariants but disturbing the "liveness" (see below).
///
/// * [`StateGuard::drop`] calls [`State::process`].
/// This restores the liveness properties.
///
/// ### Possible states of a bridge:
///
/// A bridge can be in one of the following states,
/// represented by its presence in these particular data structures inside `State`:
///
/// * `running`/`queued`: newly added, no outcome yet.
/// * `current` + `running`/`queued`: we are fetching (or going to)
/// * `current = OK` + `refetch_schedule`: fetched OK, will refetch before expiry
/// * `current = Err` + `retry_schedule`: failed, will retry at some point
///
/// ### Invariants:
///
/// Can be disrupted in the middle of a principal function,
/// but should be restored on return.
///
/// * **Tracked**:
/// Each bridge appears at most once in
/// `running`, `queued`, `refetch_schedule` and `retry_schedule`.
/// We call such a bridge Tracked.
///
/// * **Current**
/// Every bridge in `current` is Tracked.
/// (But not every Tracked bridge is necessarily in `current`, yet.)
///
/// * **Schedules**
/// Every bridge in `refetch_schedule` or `retry_schedule` is also in `current`.
///
/// * **Input**:
/// Exactly each bridge that was passed to
/// the last call to [`set_bridges()`](BridgeDescMgr::set_bridges) is Tracked.
/// (If we encountered spawn failures, we treat this as trying to shut down,
/// so we cease attempts to get bridges, and discard the relevant state, violating this.)
///
/// * **Limit**:
/// `running` is capped at the effective parallelism: zero if we are dormant,
/// the configured parallelism otherwise.
///
/// ### Liveness properties:
///
/// These can be disrupted by any function which holds a [`StateGuard`].
/// Will be restored by [`process()`](State::process),
/// which is called when `StateGuard` is dropped.
///
/// Functions that take a `StateGuard` may disturb these invariants
/// and rely on someone else to restore them.
///
/// * **Running**:
/// If `queued` is nonempty, `running` is full.
///
/// * **Timeout**:
/// `earliest_timeout` is the earliest timeout in
/// either `retry_schedule` or `refetch_schedule`.
/// (Disturbances of this property which occur due to system time warps
/// are not necessarily detected and remedied in a timely way,
/// but will be remedied no later than after `max_refetch`.)
struct State {
/// Our configuration
config: Arc<BridgeDescDownloadConfig>,
/// People who will be told when `current` changes.
subscribers: FlagPublisher<BridgeDescEvent>,
/// Our current idea of our output, which we give out handles onto.
current: Arc<BridgeDescList>,
/// Bridges whose descriptors we are currently downloading.
running: HashMap<BridgeKey, RunningInfo>,
/// Bridges which we want to download,
/// but we're waiting for `running` to be less than `effective_parallelism()`.
queued: VecDeque<QueuedEntry>,
/// Are we dormant?
dormancy: Dormancy,
/// Bridges that we have a descriptor for,
/// and when they should be refetched due to validity expiry.
///
/// This is indexed by `SystemTime` because that helps avoids undesirable behaviours
/// when the system clock changes.
refetch_schedule: BinaryHeap<RefetchEntry<SystemTime, ()>>,
/// Bridges that failed earlier, and when they should be retried.
retry_schedule: BinaryHeap<RefetchEntry<Instant, RetryDelay>>,
/// Earliest time from either `retry_schedule` or `refetch_schedule`
///
/// `None` means "wait indefinitely".
earliest_timeout: postage::watch::Sender<Option<Instant>>,
}
impl Debug for State {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
/// Helper to format one bridge entry somewhere
fn fmt_bridge(
f: &mut fmt::Formatter,
b: &BridgeConfig,
info: &(dyn Display + '_),
) -> fmt::Result {
let info = info.to_string(); // fmt::Formatter doesn't enforce precision, so do this
writeln!(f, " {:80.80} | {}", info, b)
}
/// Helper to format one of the schedules
fn fmt_schedule<TT: Ord + Copy + Debug, RD>(
f: &mut fmt::Formatter,
summary: &str,
name: &str,
schedule: &BinaryHeap<RefetchEntry<TT, RD>>,
) -> fmt::Result {
writeln!(f, " {}:", name)?;
for b in schedule {
fmt_bridge(f, &b.bridge, &format_args!("{} {:?}", summary, &b.when))?;
}
Ok(())
}
// We are going to have to go multi-line because of the bridge lines,
// so do completely bespoke formatting rather than `std::fmt::DebugStruct`
// or a derive.
writeln!(f, "State {{")?;
// We'd like to print earliest_timeout but watch::Sender::borrow takes &mut
writeln!(f, " earliest_timeout: ???, ..,")?;
writeln!(f, " current:")?;
for (b, v) in &*self.current {
fmt_bridge(
f,
b,
&match v {
Err(e) => Cow::from(format!("C Err {}", e)),
Ok(_) => "C Ok".into(),
},
)?;
}
writeln!(f, " running:")?;
for b in self.running.keys() {
fmt_bridge(f, b, &"R")?;
}
writeln!(f, " queued:")?;
for qe in &self.queued {
fmt_bridge(f, &qe.bridge, &"Q")?;
}
fmt_schedule(f, "FS", "refetch_schedule", &self.refetch_schedule)?;
fmt_schedule(f, "TS", "retry_schedule", &self.retry_schedule)?;
write!(f, "}}")?;
Ok(())
}
}
/// Value of the entry in `running`
#[derive(Debug)]
struct RunningInfo {
/// For cancelling downloads no longer wanted
join: JoinHandle,
/// If this previously failed, the persistent retry delay.
retry_delay: Option<RetryDelay>,
}
/// Entry in `queued`
#[derive(Debug)]
struct QueuedEntry {
/// The bridge to fetch
bridge: BridgeKey,
/// If this previously failed, the persistent retry delay.
retry_delay: Option<RetryDelay>,
}
/// Entry in one of the `*_schedule`s
///
/// Implements `Ord` and `Eq` but *only looking at the refetch time*.
/// So don't dedupe by `[Partial]Eq`, or use as a key in a map.
#[derive(Debug)]
struct RefetchEntry<TT, RD> {
/// When should we requeued this bridge for fetching
///
/// Either [`Instant`] (in `retry_schedule`) or [`SystemTime`] (in `refetch_schedule`).
when: TT,
/// The bridge to refetch
bridge: BridgeKey,
/// Retry delay
///
/// `RetryDelay` if we previously failed (ie, if this is a retry entry);
/// otherwise `()`.
retry_delay: RD,
}
impl<TT: Ord, RD> Ord for RefetchEntry<TT, RD> {
fn cmp(&self, other: &Self) -> Ordering {
self.when.cmp(&other.when).reverse()
// We don't care about the ordering of BridgeConfig or retry_delay.
// Different BridgeConfig with the same fetch time will be fetched in "some order".
}
}
impl<TT: Ord, RD> PartialOrd for RefetchEntry<TT, RD> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<TT: Ord, RD> PartialEq for RefetchEntry<TT, RD> {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == Ordering::Equal
}
}
impl<TT: Ord, RD> Eq for RefetchEntry<TT, RD> {}
/// Dummy task join handle
///
/// We would like to be able to cancel now-redundant downloads
/// using something like `tokio::task::JoinHandle::abort()`.
/// tor-rtcompat doesn't support that so we stub it for now.
///
/// Providing this stub means the place where the cancellation needs to take place
/// already has the appropriate call to our [`JoinHandle::abort`].
#[derive(Debug)]
struct JoinHandle;
impl JoinHandle {
/// Would abort this async task, if we could do that.
fn abort(&self) {}
}
impl<R: Runtime> BridgeDescMgr<R> {
/// Create a new `BridgeDescMgr`
///
/// This is the public constructor.
//
// TODO: That this constructor requires a DirMgr is rather odd.
// In principle there is little reason why you need a DirMgr to make a BridgeDescMgr.
// However, BridgeDescMgr needs a Store, and currently that is a private trait, and the
// implementation is constructible only from the dirmgr's config. This should probably be
// tidied up somehow, at some point, perhaps by exposing `Store` and its configuration.
pub fn new(
config: &BridgeDescDownloadConfig,
runtime: R,
store: DirMgrStore<R>,
circmgr: Arc<tor_circmgr::CircMgr<R>>,
dormancy: Dormancy,
) -> Result<Self, StartupError> {
Self::new_internal(runtime, circmgr, store.store, config, dormancy, ())
}
}
/// If download was successful, what we obtained
///
/// Generated by `process_document`, from a downloaded (or cached) textual descriptor.
#[derive(Debug)]
struct Downloaded {
/// The bridge descriptor, fully parsed and verified
desc: BridgeDesc,
/// When we should start a refresh for this descriptor
///
/// This is derived from the expiry time,
/// and clamped according to limits in the configuration).
refetch: SystemTime,
}
impl<R: Runtime, M: Mockable<R>> BridgeDescMgr<R, M> {
/// Actual constructor, which takes a mockable
//
// Allow passing `runtime` by value, which is usual API for this kind of setup function.
#[allow(clippy::needless_pass_by_value)]
fn new_internal(
runtime: R,
circmgr: M::CircMgr,
store: Arc<Mutex<DynStore>>,
config: &BridgeDescDownloadConfig,
dormancy: Dormancy,
mockable: M,
) -> Result<Self, StartupError> {
/// Convenience alias
fn default<T: Default>() -> T {
Default::default()
}
let config = config.clone().into();
let (earliest_timeout, timeout_update) = postage::watch::channel();
let state = Mutex::new(State {
config,
subscribers: default(),
current: default(),
running: default(),
queued: default(),
dormancy,
retry_schedule: default(),
refetch_schedule: default(),
earliest_timeout,
});
let mgr = Arc::new(Manager {
state,
runtime: runtime.clone(),
circmgr,
store,
mockable,
});
runtime
.spawn(timeout_task(
runtime.clone(),
Arc::downgrade(&mgr),
timeout_update,
))
.map_err(|cause| StartupError::Spawn {
spawning: "timeout task",
cause: cause.into(),
})?;
Ok(BridgeDescMgr { mgr })
}
/// Consistency check convenience wrapper
#[cfg(test)]
fn check_consistency<'i, I>(&self, input_bridges: Option<I>)
where
I: IntoIterator<Item = &'i BridgeKey>,
{
self.mgr
.lock_only()
.check_consistency(&self.mgr.runtime, input_bridges);
}
/// Set whether this `BridgeDescMgr` is active
// TODO this should instead be handled by a central mechanism; see TODO on Dormancy
pub fn set_dormancy(&self, dormancy: Dormancy) {
self.mgr.lock_then_process().dormancy = dormancy;
}
}
impl<R: Runtime, M: Mockable<R>> BridgeDescProvider for BridgeDescMgr<R, M> {
fn bridges(&self) -> Arc<BridgeDescList> {
self.mgr.lock_only().current.clone()
}
fn events(&self) -> BoxStream<'static, BridgeDescEvent> {
let stream = self.mgr.lock_only().subscribers.subscribe();
Box::pin(stream) as _
}
fn set_bridges(&self, new_bridges: &[BridgeConfig]) {
/// Helper: Called for each bridge that is currently Tracked.
///
/// Checks if `new_bridges` has `bridge`. If so, removes it from `new_bridges`,
/// and returns `true`, indicating that this bridge should be kept.
///
/// If not, returns `false`, indicating that this bridge should be removed,
/// and logs a message.
fn note_found_keep_p(
new_bridges: &mut HashSet<BridgeKey>,
bridge: &BridgeKey,
was_state: &str,
) -> bool {
let keep = new_bridges.remove(bridge);
if !keep {
debug!(r#"forgetting bridge ({}) "{}""#, was_state, bridge);
}
keep
}
/// Helper: filters `*_schedule` so that it contains only things in `new_bridges`,
/// removing them as we go.
fn filter_schedule<TT: Ord + Copy, RD>(
new_bridges: &mut HashSet<BridgeKey>,
schedule: &mut BinaryHeap<RefetchEntry<TT, RD>>,
was_state: &str,
) {
schedule.retain_ext(|b| note_found_keep_p(new_bridges, &b.bridge, was_state));
}
let mut state = self.mgr.lock_then_process();
let state = &mut **state;
// We go through our own data structures, comparing them with `new_bridges`.
// Entries in our own structures that aren't in `new_bridges` are removed.
// Entries that *are* are removed from `new_bridges`.
// Eventually `new_bridges` is just the list of new bridges to *add*.
let mut new_bridges: HashSet<_> = new_bridges.iter().cloned().collect();
// Is there anything in `current` that ought to be deleted?
if state.current.keys().any(|b| !new_bridges.contains(b)) {
// Found a brridge In `current` but not `new`
// We need to remove it (and any others like it) from `current`.
//
// Disturbs the invariant *Schedules*:
// After this maybe the schedules have entries they shouldn't.
let current: BridgeDescList = state
.current
.iter()
.filter(|(b, _)| new_bridges.contains(&**b))
.map(|(b, v)| (b.clone(), v.clone()))
.collect();
state.set_current_and_notify(current);
} else {
// Nothing is being removed, so we can keep `current`.
}
// Bridges being newly requested will be added to `current`
// later, after they have been fetched.
// Is there anything in running we should abort?
state.running.retain(|b, ri| {
let keep = note_found_keep_p(&mut new_bridges, b, "was downloading");
if !keep {
ri.join.abort();
}
keep
});
// Is there anything in queued we should forget about?
state
.queued
.retain(|qe| note_found_keep_p(&mut new_bridges, &qe.bridge, "was queued"));
// Restore the invariant *Schedules*, that the schedules contain only things in current,
// by removing the same things from the schedules that we earlier removed from current.
filter_schedule(
&mut new_bridges,
&mut state.retry_schedule,
"previously failed",
);
filter_schedule(
&mut new_bridges,
&mut state.refetch_schedule,
"previously downloaded",
);
// OK now we have the list of bridges to add (if any).
state.queued.extend(new_bridges.into_iter().map(|bridge| {
debug!(r#" added bridge, queueing for download "{}""#, &bridge);
QueuedEntry {
bridge,
retry_delay: None,
}
}));
// `StateGuard`, from `lock_then_process`, gets dropped here, and runs `process`,
// to make further progress and restore the liveness properties.
}
}
impl<R: Runtime, M: Mockable<R>> Manager<R, M> {
/// Obtain a lock on state, for functions that want to disrupt liveness properties
///
/// When `StateGuard` is dropped, the liveness properties will be restored
/// by making whatever progress is required.
///
/// See [`State`].
fn lock_then_process<'s>(self: &'s Arc<Self>) -> StateGuard<'s, R, M> {
StateGuard {
state: self.lock_only(),
mgr: self,
}
}
/// Obtains the lock on state.
///
/// Caller ought not to modify state
/// so as to invalidate invariants or liveness properties.
/// Callers which are part of the algorithms in this crate
/// ought to consider [`lock_then_process`](Manager::lock_then_process) instead.
fn lock_only(&self) -> MutexGuard<State> {
self.state.lock().expect("bridge desc manager poisoned")
}
}
/// Writeable reference to [`State`], entitling the holder to disrupt liveness properties.
///
/// The holder must still maintain the invariants.
///
/// Obtained from [`Manager::lock_then_process`]. See [`State`].
#[derive(Educe, Deref, DerefMut)]
#[educe(Debug)]
struct StateGuard<'s, R: Runtime, M: Mockable<R>> {
/// Reference to the mutable state
#[deref]
#[deref_mut]
state: MutexGuard<'s, State>,
/// Reference to the outer container
///
/// Allows the holder to obtain a `'static` (owned) handle `Arc<Manager>`,
/// for use by spawned tasks.
#[educe(Debug(ignore))]
mgr: &'s Arc<Manager<R, M>>,
}
impl<R: Runtime, M: Mockable<R>> Drop for StateGuard<'_, R, M> {
fn drop(&mut self) {
self.state.process(self.mgr);
}
}
impl State {
/// Ensure progress is made, by restoring all the liveness invariants
///
/// This includes launching circuits as needed.
fn process<R: Runtime, M: Mockable<R>>(&mut self, mgr: &Arc<Manager<R, M>>) {
// Restore liveness property *Running*
self.consider_launching(mgr);
let now_wall = mgr.runtime.wallclock();
// Mitigate clock warping
//
// If the earliest `SystemTime` is more than `max_refetch` away,
// the clock must have warped. If that happens we clamp
// them all to `max_refetch`.
//
// (This is not perfect but will mitigate the worst effects by ensuring
// that we do *something* at least every `max_refetch`, in the worst case,
// other than just getting completely stuck.)
let max_refetch_wall = now_wall + self.config.max_refetch;
if self
.refetch_schedule
.peek()
.map(|re| re.when > max_refetch_wall)
== Some(true)
{
info!("bridge descriptor manager: clock warped, clamping refetch times");
self.refetch_schedule = self
.refetch_schedule
.drain()
.map(|mut re| {
re.when = max_refetch_wall;
re
})
.collect();
}
// Restore liveness property *Timeout**
// postage::watch will tell up the timeout task about the new wakeup time.
let new_earliest_timeout = [
// First retry. These are std Instant.
self.retry_schedule.peek().map(|re| re.when),
// First refetch. These are SystemTime, so we must convert them.
self.refetch_schedule.peek().map(|re| {
// If duration_since gives Err, that means when is before now,
// ie we should not be waiting: the wait duration should be 0.
let wait = re.when.duration_since(now_wall).unwrap_or_default();
mgr.runtime.now() + wait
}),
]
.into_iter()
.flatten()
.min();
*self.earliest_timeout.borrow_mut() = new_earliest_timeout;
}
/// Launch download attempts if we can
///
/// Specifically: if we have things in `queued`, and `running` is shorter than
/// `effective_parallelism()`, we launch task(s) to attempt download(s).
///
/// Restores liveness invariant *Running*.
///
/// Idempotent. Forms part of `process`.
fn consider_launching<R: Runtime, M: Mockable<R>>(&mut self, mgr: &Arc<Manager<R, M>>) {
let mut to_remove = vec![];
while self.running.len() < self.effective_parallelism() {
let QueuedEntry {
bridge,
retry_delay,
} = match self.queued.pop_front() {
Some(qe) => qe,
None => break,
};
match mgr
.runtime
.spawn({
let config = self.config.clone();
let bridge = bridge.clone();
let inner = mgr.clone();
let mockable = inner.mockable.clone();
// The task which actually downloads a descriptor.
async move {
let got =
AssertUnwindSafe(inner.download_descriptor(mockable, &bridge, &config))
.catch_unwind()
.await
.unwrap_or_else(|_| {
Err(internal!("download descriptor task paniced!").into())
});
match &got {
Ok(_) => debug!(r#"download succeeded for "{}""#, bridge),
Err(err) => debug!(r#"download failed for "{}": {}"#, bridge, err),
};
let mut state = inner.lock_then_process();
state.record_download_outcome(bridge, got);
// `StateGuard`, from `lock_then_process`, gets dropped here, and runs `process`,
// to make further progress and restore the liveness properties.
}
})
.map(|()| JoinHandle)
{
Ok(join) => {
self.running
.insert(bridge, RunningInfo { join, retry_delay });
}
Err(_) => {
// Spawn failed.
//
// We are going to forget about this bridge.
// And we're going to do that without notifying anyone.
// We *do* want to remove it from `current` because simply forgetting
// about a refetch could leave expired data there.
// We amortise this, so we don't do a lot of O(n^2) work on shutdown.
to_remove.push(bridge);
}
}
}
if !to_remove.is_empty() {
self.modify_current(|current| {
for bridge in to_remove {
current.remove(&bridge);
}
});
}
}
/// Modify `current` and notify subscribers
///
/// Helper function which modifies only `current`, not any of the rest of the state.
/// it is the caller's responsibility to ensure that the invariants are upheld.
///
/// The implementation actually involves cloning `current`,
/// so it is best to amortise calls to this function.
fn modify_current<T, F: FnOnce(&mut BridgeDescList) -> T>(&mut self, f: F) -> T {
let mut current = (*self.current).clone();
let r = f(&mut current);
self.set_current_and_notify(current);
r
}
/// Set `current` to a value and notify
///
/// Helper function which modifies only `current`, not any of the rest of the state.
/// it is the caller's responsibility to ensure that the invariants are upheld.
fn set_current_and_notify<BDL: Into<Arc<BridgeDescList>>>(&mut self, new: BDL) {
self.current = new.into();
self.subscribers.publish(BridgeDescEvent::SomethingChanged);
}
/// Obtain the currently-desired level of parallelism
///
/// Helper function. The return value depends the mutable state and also the `config`.
///
/// This is how we implement dormancy.
fn effective_parallelism(&self) -> usize {
match self.dormancy {
Dormancy::Active => usize::from(u8::from(self.config.parallelism)),
Dormancy::Dormant => 0,
}
}
}
impl<R: Runtime, M: Mockable<R>> StateGuard<'_, R, M> {
/// Record a download outcome.
///
/// Final act of the the descriptor download task.
/// `got` is from [`download_descriptor`](Manager::download_descriptor).
fn record_download_outcome(&mut self, bridge: BridgeKey, got: Result<Downloaded, Error>) {
let RunningInfo { retry_delay, .. } = match self.running.remove(&bridge) {
Some(ri) => ri,
None => {
debug!("bridge descriptor download completed for deconfigured bridge");
return;
}
};
let insert = match got {
Ok(Downloaded { desc, refetch }) => {
// Successful download. Schedule the refetch, and we'll insert Ok.
self.refetch_schedule.push(RefetchEntry {
when: refetch,
bridge: bridge.clone(),
retry_delay: (),
});
Ok(desc)
}
Err(err) => {
// Failed. Schedule the retry, and we'll insert Err.
let mut retry_delay =
retry_delay.unwrap_or_else(|| RetryDelay::from_duration(self.config.retry));
let retry = err.retry_time();
// We retry at least as early as
let now = self.mgr.runtime.now();
let retry = retry.absolute(now, || retry_delay.next_delay(&mut rand::thread_rng()));
// Retry at least as early as max_refetch. That way if a bridge is
// misconfigured we will see it be fixed eventually.
let retry = {
let earliest = now;
let latest = || now + self.config.max_refetch;
match retry {
AbsRetryTime::Immediate => earliest,
AbsRetryTime::Never => latest(),
AbsRetryTime::At(i) => i.clamp(earliest, latest()),
}
};
self.retry_schedule.push(RefetchEntry {
when: retry,
bridge: bridge.clone(),
retry_delay,
});
Err(Box::new(err) as _)
}
};
self.modify_current(|current| current.insert(bridge, insert));
}
}
impl<R: Runtime, M: Mockable<R>> Manager<R, M> {
/// Downloads a descriptor.
///
/// The core of the descriptor download task
/// launched by `State::consider_launching`.
///
/// Uses Mockable::download to actually get the document.
/// So most of this function is parsing and checking.
///
/// The returned value is precisely the `got` input to
/// [`record_download_outcome`](StateGuard::record_download_outcome).
async fn download_descriptor(
&self,
mockable: M,
bridge: &BridgeConfig,
config: &BridgeDescDownloadConfig,
) -> Result<Downloaded, Error> {
// convenience alias, capturing the usual parameters from our variables.
let process_document = |text| process_document(&self.runtime, config, text);
let store = || {
self.store
.lock()
.map_err(|_| internal!("bridge descriptor store poisoned"))
};
let cache_entry: Option<CachedBridgeDescriptor> = (|| store()?.lookup_bridgedesc(bridge))()
.unwrap_or_else(|err| {
error!(
r#"bridge descriptor cache lookup failed, for "{}": {}"#,
sensitive(bridge),
tor_error::Report(&err)
);
None
});
let now = self.runtime.wallclock();
let cached_good: Option<Downloaded> = if let Some(cached) = &cache_entry {
if cached.fetched > now {
// was fetched "in the future"
None
} else {
// let's see if it's any use
match process_document(&cached.document) {
Err(err) => {
// We had a doc in the cache but our attempt to use it failed
// We wouldn't have written a bad cache entry.
// So one of the following must be true:
// * We were buggy or are stricter now or something
// * The document was valid but its validity time has expired
// In any case we can't reuse it.
// (This happens in normal operation, when a document expires.)
trace!(r#"cached document for "{}" invalid: {}"#, &bridge, err);
None
}
Ok(got) => {
// The cached document looks valid.
// But how long ago did we fetch it?
// We need to enforce max_refresh even for still-valid documents.
if now.duration_since(cached.fetched).ok() <= Some(config.max_refetch) {
// Was fetchd recently, too. We can just reuse it.
return Ok(got);
}
Some(got)
}
}
}
} else {
None
};
// If cached_good is Some, we found a plausible cache entry; if we got here, it was
// past its max_refresh. So in that case we want to send a request with
// if-modified-since. If we get Not Modified, we can reuse it (and update the fetched time).
let if_modified_since = cached_good
.as_ref()
.map(|got| got.desc.as_ref().published());
debug!(
r#"starting download for "{}"{}"#,
bridge,
match if_modified_since {
Some(ims) => format!(
" if-modified-since {}",
humantime::format_rfc3339_seconds(ims),
),
None => "".into(),
}
);
let text = mockable
.clone()
.download(&self.runtime, &self.circmgr, bridge, if_modified_since)
.await?;
let (document, got) = if let Some(text) = text {
let got = process_document(&text)?;
(text, got)
} else if let Some(cached) = cached_good {
(
cache_entry
.expect("cached_good but not cache_entry")
.document,
cached,
)
} else {
return Err(internal!("download gave None but no if-modified-since").into());
};
// IEFI catches cache store errors, which we log but don't do anything else with
(|| {
let cached = CachedBridgeDescriptor {
document,
fetched: now, // this is from before we started the fetch, which is correct
};
// Calculate when the cache should forget about this.
// We want to add a bit of slop for the purposes of mild clock skew handling,
// etc., and the prefetch time is a good proxy for that.
let until = got
.refetch
.checked_add(config.prefetch)
.unwrap_or(got.refetch /*uh*/);
store()?.store_bridgedesc(bridge, cached, until)?;
Ok(())
})()
.unwrap_or_else(|err: crate::Error| {
error!(
"failed to cache downloaded bridge descriptor: {}",
tor_error::Report(err),
);
});
Ok(got)
}
}
/// Processes and analyses a textual descriptor document into a `Downloaded`
///
/// Parses it, checks the signature, checks the document validity times,
/// and if that's all good, calculates when will want to refetch it.
fn process_document<R: Runtime>(
runtime: &R,
config: &BridgeDescDownloadConfig,
text: &str,
) -> Result<Downloaded, Error> {
let desc = RouterDesc::parse(text)?;
// We *could* just trust this because we have trustworthy provenance
// we know that the channel machinery authenticated the identity keys in `bridge`.
// But let's do some cross-checking anyway.
// `check_signature` checks the self-signature.
let desc = desc.check_signature().map_err(Arc::new)?;
let now = runtime.wallclock();
desc.is_valid_at(&now)?;
// Justification that use of "dangerously" is correct:
// 1. We have checked this just above, so it is valid now.
// 2. We are extracting the timeout and implement our own refetch logic using expires.
let (desc, (_, expires)) = desc.dangerously_into_parts();
// Our refetch schedule, and enforcement of descriptor expiry, is somewhat approximate.
// The following situations can result in a nominally-expired descriptor being used:
//
// 1. We primarily enforce the timeout by looking at the expiry time,
// subtracting a configured constant, and scheduling the start of a refetch then.
// If it takes us longer to do the retry, than the prefetch constant,
// we'll still be providing the old descriptor to consumers in the meantime.
//
// 2. We apply a minimum time before we will refetch a descriptor.
// So if the validity time is unreasonably short, we'll use it beyond that time.
//
// 3. Clock warping could confuse this algorithm. This is inevitable because we
// are relying on calendar times (SystemTime) in the descriptor, and because
// we don't have a mechanism for being told about clock warps rather than the
// passage of time.
//
// We think this is all OK given that a bridge descriptor is used for trying to
// connect to the bridge itself. In particular, we don't want to completely trust
// bridges to control our retry logic.
let refetch = match expires {
ops::Bound::Included(expires) | ops::Bound::Excluded(expires) => expires
.checked_sub(config.prefetch)
.ok_or(Error::ExtremeValidityTime)?,
ops::Bound::Unbounded => now
.checked_add(config.max_refetch)
.ok_or(Error::ExtremeValidityTime)?,
};
let refetch = refetch.clamp(now + config.min_refetch, now + config.max_refetch);
let desc = BridgeDesc::new(Arc::new(desc));
Ok(Downloaded { desc, refetch })
}
/// Task which waits for the timeout, and requeues bridges that need to be refetched
///
/// This task's job is to execute the wakeup instructions provided via `updates`.
///
/// `updates` is the receiving end of [`State`]'s `earliest_timeout`,
/// which is maintained to be the earliest time any of the schedules says we should wake up
/// (liveness property *Timeout*).
async fn timeout_task<R: Runtime, M: Mockable<R>>(
runtime: R,
inner: Weak<Manager<R, M>>,
update: postage::watch::Receiver<Option<Instant>>,
) {
/// Requeue things in `*_schedule` whose time for action has arrived
///
/// `retry_delay_map` converts `retry_delay` from the schedule (`RetryDelay` or `()`)
/// into the `Option` which appears in [`QueuedEntry`].
///
/// Helper function. Idempotent.
fn requeue_as_required<TT: Ord + Copy + Debug, RD, RDM: Fn(RD) -> Option<RetryDelay>>(
queued: &mut VecDeque<QueuedEntry>,
schedule: &mut BinaryHeap<RefetchEntry<TT, RD>>,
now: TT,
retry_delay_map: RDM,
) {
while let Some(ent) = schedule.peek() {
if ent.when > now {
break;
}
let re = schedule.pop().expect("schedule became empty!");
let bridge = re.bridge;
let retry_delay = retry_delay_map(re.retry_delay);
queued.push_back(QueuedEntry {
bridge,
retry_delay,
});
}
}
let mut next_wakeup = Some(runtime.now());
let mut update = update.fuse();
loop {
select! {
// Someone modified the schedules, and sent us a new earliest timeout
changed = update.next() => {
// changed is Option<Option< >>.
// The outer Option is from the Stream impl for watch::Receiver - None means EOF.
// The inner Option is Some(wakeup_time), or None meaning "wait indefinitely"
next_wakeup = if let Some(changed) = changed {
changed
} else {
// Oh, actually, the watch::Receiver is EOF - we're to shut down
break
}
},
// Wait until the specified earliest wakeup time
() = async {
if let Some(next_wakeup) = next_wakeup {
let now = runtime.now();
if next_wakeup > now {
let duration = next_wakeup - now;
runtime.sleep(duration).await;
}
} else {
#[allow(clippy::semicolon_if_nothing_returned)] // rust-clippy/issues/9729
{ future::pending().await }
}
}.fuse() => {
// We have reached the pre-programmed time. Check what needs doing.
let inner = if let Some(i) = inner.upgrade() { i } else { break; };
let mut state = inner.lock_then_process();
let state = &mut **state; // Do the DerefMut once so we can borrow fields
requeue_as_required(
&mut state.queued,
&mut state.refetch_schedule,
runtime.wallclock(),
|()| None,
);
requeue_as_required(
&mut state.queued,
&mut state.retry_schedule,
runtime.now(),
Some,
);
// `StateGuard`, from `lock_then_process`, gets dropped here, and runs `process`,
// to make further progress and restore the liveness properties.
}
}
}
}
/// Error which occurs during bridge descriptor manager startup
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum StartupError {
/// No circuit manager in the directory manager
#[error(
"tried to create bridge descriptor manager from directory manager with no circuit manager"
)]
MissingCircMgr,
/// Unable to spawn task
//
// TODO lots of our Errors have a variant exactly like this.
// Maybe we should make a struct tor_error::SpawnError.
#[error("Unable to spawn {spawning}")]
Spawn {
/// What we were trying to spawn.
spawning: &'static str,
/// What happened when we tried to spawn it.
#[source]
cause: Arc<SpawnError>,
},
}
impl HasKind for StartupError {
fn kind(&self) -> ErrorKind {
use ErrorKind as EK;
use StartupError as SE;
match self {
SE::MissingCircMgr => EK::Internal,
SE::Spawn { cause, .. } => cause.kind(),
}
}
}
/// An error which occurred trying to obtain the descriptor for a particular bridge
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum Error {
/// Couldn't establish a circuit to the bridge
#[error("Failed to establish circuit")]
CircuitFailed(#[from] tor_circmgr::Error),
/// Couldn't establish a directory stream to the bridge
#[error("Failed to establish directory stream")]
StreamFailed(#[source] tor_proto::Error),
/// Directory request failed
#[error("Directory request failed")]
RequestFailed(#[from] tor_dirclient::RequestFailedError),
/// Failed to parse descriptor in response
#[error("Failed to parse descriptor in response")]
ParseFailed(#[from] tor_netdoc::Error),
/// Signature check failed
#[error("Signature check failed")]
SignatureCheckFailed(#[from] Arc<signature::Error>),
/// Obtained descriptor but it is outside its validity time
#[error("Descriptor is outside its validity time, as supplied")]
BadValidityTime(#[from] tor_checkable::TimeValidityError),
/// A bridge descriptor has very extreme validity times
/// such that our refetch time calculations overflow.
#[error("Descriptor validity time range is too extreme for us to cope with")]
ExtremeValidityTime,
/// There was a programming error somewhere in our code, or the calling code.
#[error("Programming error")]
Bug(#[from] tor_error::Bug),
/// Error used for testing
#[cfg(test)]
#[error("Error for testing, {0:?}, retry at {1:?}")]
TestError(&'static str, RetryTime),
}
impl HasKind for Error {
fn kind(&self) -> ErrorKind {
use Error as E;
use ErrorKind as EK;
let bridge_protocol_violation = EK::TorAccessFailed;
match self {
// We trust that tor_circmgr returns TorAccessFailed when it ought to.
E::CircuitFailed(e) => e.kind(),
E::StreamFailed(e) => e.kind(),
E::RequestFailed(e) => e.kind(),
E::ParseFailed(..) => bridge_protocol_violation,
E::SignatureCheckFailed(..) => bridge_protocol_violation,
E::ExtremeValidityTime => bridge_protocol_violation,
E::BadValidityTime(..) => EK::ClockSkew,
E::Bug(e) => e.kind(),
#[cfg(test)]
E::TestError(..) => EK::Internal,
}
}
}
impl HasRetryTime for Error {
fn retry_time(&self) -> RetryTime {
use Error as E;
use RetryTime as R;
match self {
// Errors with their own retry times
E::CircuitFailed(e) => e.retry_time(),
// Remote misbehaviour, maybe the network is being strange?
E::StreamFailed(..) => R::AfterWaiting,
E::RequestFailed(..) => R::AfterWaiting,
// Remote misconfiguration, detected *after* we successfully made the channel
// (so not a network problem). We'll say "never" for RetryTime,
// even though actually we will in fact retry in at most `max_refetch`.
E::ParseFailed(..) => R::Never,
E::SignatureCheckFailed(..) => R::Never,
E::BadValidityTime(..) => R::Never,
E::ExtremeValidityTime => R::Never,
// Probably, things are broken here, rather than remotely.
E::Bug(..) => R::Never,
#[cfg(test)]
E::TestError(_, retry) => *retry,
}
}
}
impl BridgeDescError for Error {}
impl State {
/// Consistency check (for testing)
///
/// `input` should be what was passed to `set_bridges` (or `None` if not known).
///
/// Does not make any changes.
/// Only takes `&mut` because postage::watch::Sender::borrow` wants it.
#[cfg(test)]
fn check_consistency<'i, R, I>(&mut self, runtime: &R, input: Option<I>)
where
R: Runtime,
I: IntoIterator<Item = &'i BridgeKey>,
{
/// Where we found a thing was Tracked
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum Where {
/// Found in `running`
Running,
/// Found in `queued`
Queued,
/// Found in the schedule `sch`
Schedule {
sch_name: &'static str,
/// Starts out as `false`, set to `true` when we find this in `current`
found_in_current: bool,
},
}
/// Records the expected input from `input`, and what we have found so far
struct Tracked {
/// Were we told what the last `set_bridges` call got as input?
known_input: bool,
/// `Some` means we have seen this bridge in one our records (other than `current`)
tracked: HashMap<BridgeKey, Option<Where>>,
/// Earliest instant found in any schedule
earliest: Option<Instant>,
}
let mut tracked = if let Some(input) = input {
let tracked = input.into_iter().map(|b| (b.clone(), None)).collect();
Tracked {
tracked,
known_input: true,
earliest: None,
}
} else {
Tracked {
tracked: HashMap::new(),
known_input: false,
earliest: None,
}
};
impl Tracked {
/// Note that `bridge` is Tracked
fn note(&mut self, where_: Where, b: &BridgeKey) {
match self.tracked.get(b) {
// Invariant *Tracked* - ie appears at most once
Some(Some(prev_where)) => {
panic!("duplicate {:?} {:?} {:?}", prev_where, where_, b);
}
// Invariant *Input (every tracked bridge is was in input)*
None if self.known_input => {
panic!("unexpected {:?} {:?}", where_, b);
}
// OK, we've not seen it before, note it as being here
_ => {
self.tracked.insert(b.clone(), Some(where_));
}
}
}
}
/// Walk `schedule` and update `tracked` (including `tracked.earliest`)
///
/// Check invariant *Tracked* and *Schedule* wrt this schedule.
#[cfg(test)]
fn walk_sch<TT: Ord + Copy + Debug, RD, CT: Fn(TT) -> Instant>(
tracked: &mut Tracked,
sch_name: &'static str,
schedule: &BinaryHeap<RefetchEntry<TT, RD>>,
conv_time: CT,
) {
let where_ = Where::Schedule {
sch_name,
found_in_current: false,
};
if let Some(first) = schedule.peek() {
// Of course this is a heap, so this ought to be a wasteful scan,
// but, indirectly,this tests our implementation of `Ord` for `RefetchEntry`.
for re in schedule {
tracked.note(where_, &re.bridge);
}
let scanned = schedule
.iter()
.map(|re| re.when)
.min()
.expect("schedule empty!");
assert_eq!(scanned, first.when);
tracked.earliest = Some(
[tracked.earliest, Some(conv_time(scanned))]
.into_iter()
.flatten()
.min()
.expect("flatten of chain Some was empty"),
);
}
}
// *Timeout* (prep)
//
// This will fail if there is clock skew, but won't mind if
// the earliest refetch time is in the past.
let now_wall = runtime.wallclock();
let now_mono = runtime.now();
let adj_wall = |wallclock: SystemTime| {
// Good grief what a palaver!
if let Ok(ahead) = wallclock.duration_since(now_wall) {
now_mono + ahead
} else if let Ok(behind) = now_wall.duration_since(wallclock) {
now_mono - behind
} else {
panic!("times should be totally ordered!")
}
};
// *Tracked*
//
// We walk our data structures in turn
for b in self.running.keys() {
tracked.note(Where::Running, b);
}
for qe in &self.queued {
tracked.note(Where::Queued, &qe.bridge);
}
walk_sch(&mut tracked, "refetch", &self.refetch_schedule, adj_wall);
walk_sch(&mut tracked, "retry", &self.retry_schedule, |t| t);
// *Current*
for b in self.current.keys() {
let found = tracked
.tracked
.get_mut(b)
.and_then(Option::as_mut)
.unwrap_or_else(|| panic!("current but untracked {:?}", b));
if let Where::Schedule {
found_in_current, ..
} = found
{
*found_in_current = true;
}
}
// *Input (sense: every input bridge is tracked)*
//
// (Will not cope if spawn ever failed, since that violates the invariant.)
for (b, where_) in &tracked.tracked {
match where_ {
None => panic!("missing {}", &b),
Some(Where::Schedule {
sch_name,
found_in_current,
}) => {
assert!(found_in_current, "not-Schedule {} {}", &b, sch_name);
}
_ => {}
}
}
// *Limit*
let parallelism = self.effective_parallelism();
assert!(self.running.len() <= parallelism);
// *Running*
assert!(self.running.len() == parallelism || self.queued.is_empty());
// *Timeout* (final)
assert_eq!(tracked.earliest, *self.earliest_timeout.borrow());
}
}