use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
use std::fmt::{self, Debug, Display};
use std::num::NonZeroU8;
use std::panic::AssertUnwindSafe;
use std::sync::{Arc, Mutex, MutexGuard, Weak};
use async_trait::async_trait;
use derive_more::{Deref, DerefMut};
use educe::Educe;
use futures::FutureExt;
use futures::future;
use futures::select;
use futures::stream::{BoxStream, StreamExt};
use futures::task::SpawnError;
use tracing::{debug, info, trace};
use safelog::sensitive;
use tor_basic_utils::retry::RetryDelay;
use tor_checkable::{SelfSigned, Timebound};
use tor_circmgr::CircMgr;
use tor_error::{AbsRetryTime, HasRetryTime, RetryTime};
use tor_error::{ErrorKind, HasKind, error_report, internal};
use tor_guardmgr::bridge::{BridgeConfig, BridgeDesc};
use tor_guardmgr::bridge::{BridgeDescError, BridgeDescEvent, BridgeDescList, BridgeDescProvider};
use tor_netdoc::doc::routerdesc::RouterDesc;
use tor_rtcompat::{Runtime, SpawnExt as _};
use web_time_compat::{Duration, Instant, SystemTime};
use crate::event::FlagPublisher;
use crate::storage::CachedBridgeDescriptor;
use crate::{DirMgrStore, DynStore};
#[cfg(test)]
mod bdtest;
/// Key for identifying a bridge throughout this module: its full configuration.
type BridgeKey = BridgeConfig;
/// Whether the bridge descriptor manager should be downloading descriptors.
#[non_exhaustive]
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub enum Dormancy {
    /// Dormant: no downloads are launched (effective parallelism is zero).
    Dormant,
    /// Active: downloads run with the configured parallelism.
    Active,
}
/// Manager which downloads and keeps track of router descriptors for bridges.
///
/// Cheaply cloneable handle; all real state lives behind the shared `Arc`.
/// `M` lets tests mock out the download machinery (production uses `()`).
#[derive(Clone)]
pub struct BridgeDescMgr<R: Runtime, M = ()>
where
    M: Mockable<R>,
{
    /// Shared inner manager (mutex-protected `State`, runtime, circmgr, store).
    mgr: Arc<Manager<R, M>>,
}
/// Tuning parameters for bridge descriptor downloads.
#[derive(Debug, Clone)]
pub struct BridgeDescDownloadConfig {
    /// Maximum number of simultaneous downloads.
    parallelism: NonZeroU8,
    /// Base delay used to seed the retry backoff after a failure.
    retry: Duration,
    /// How far before a descriptor's refetch time we prefer to refetch
    /// (also extends how long a cached entry is kept).
    prefetch: Duration,
    /// Lower bound on time between refetches of the same descriptor.
    min_refetch: Duration,
    /// Upper bound on time between refetches; also used to clamp retry
    /// times and to detect clock warp.
    max_refetch: Duration,
}
impl Default for BridgeDescDownloadConfig {
    /// Reasonable defaults: 4 parallel downloads, 30s base retry delay,
    /// refetch between 1 and 3 hours, prefetching ~17 minutes early.
    fn default() -> Self {
        let secs = Duration::from_secs;
        BridgeDescDownloadConfig {
            // The expect message states the invariant that makes this
            // conversion infallible (the old message, "parallelism is
            // zero", asserted the failure condition as a fact, which would
            // be confusing in a panic report).
            parallelism: 4.try_into().expect("4 is nonzero"),
            retry: secs(30),
            prefetch: secs(1000),
            min_refetch: secs(3600),
            max_refetch: secs(3600 * 3),
        }
    }
}
/// Convenience marker trait naming the pluggable download implementation.
///
/// The actual API lives in [`mockable::MockableAPI`]; this trait exists so
/// bounds elsewhere can name the combination without exposing that module.
pub trait Mockable<R>: mockable::MockableAPI<R> {}
/// The unit type is the production (non-mocked) implementation.
impl<R: Runtime> Mockable<R> for () {}
/// Private module holding the real mockable API, keeping it out of the
/// crate's public namespace.
mod mockable {
    use super::*;
    /// Interface through which bridge descriptors are actually fetched
    /// (replaced by a mock in tests).
    #[async_trait]
    pub trait MockableAPI<R>: Clone + Send + Sync + 'static {
        /// Circuit-manager type used to reach the bridge.
        type CircMgr: Send + Sync + 'static;
        /// Fetch the descriptor for `bridge`, returning the document text.
        ///
        /// `Ok(None)` means "not modified since `if_modified_since`"; the
        /// caller then reuses its cached copy (see `download_descriptor`).
        async fn download(
            self,
            runtime: &R,
            circmgr: &Self::CircMgr,
            bridge: &BridgeConfig,
            if_modified_since: Option<SystemTime>,
        ) -> Result<Option<String>, Error>;
    }
}
#[async_trait]
impl<R: Runtime> mockable::MockableAPI<R> for () {
    type CircMgr = Arc<CircMgr<R>>;
    /// Production download: obtain a directory-usable circuit to `bridge`,
    /// open a directory stream on it, and request the bridge's own
    /// router descriptor.
    ///
    /// Note: `_if_modified_since` is currently ignored; we always fetch.
    async fn download(
        self,
        runtime: &R,
        circmgr: &Self::CircMgr,
        bridge: &BridgeConfig,
        _if_modified_since: Option<SystemTime>,
    ) -> Result<Option<String>, Error> {
        let tunnel = circmgr.get_or_launch_dir_specific(bridge).await?;
        let mut stream = tunnel
            .begin_dir_stream()
            .await
            .map_err(Error::StreamFailed)?;
        let request = tor_dirclient::request::RoutersOwnDescRequest::new();
        let response = tor_dirclient::send_request(runtime, &request, &mut stream, None)
            .await
            .map_err(|dce| match dce {
                tor_dirclient::Error::RequestFailed(re) => Error::RequestFailed(re),
                // Anything other than RequestFailed coming out of
                // send_request is unexpected here, so treat it as a bug.
                _ => internal!(
                    "tor_dirclient::send_request gave non-RequestFailed {:?}",
                    dce
                )
                .into(),
            })?;
        let output = response.into_output_string()?;
        Ok(Some(output))
    }
}
/// Inner, shared state of the manager, held via `Arc` by handles and tasks.
struct Manager<R: Runtime, M: Mockable<R>> {
    /// All the mutable download bookkeeping; see [`State`].
    state: Mutex<State>,
    /// Runtime, for spawning, sleeping, and clocks.
    runtime: R,
    /// Circuit manager used to reach bridges (type chosen by `M`).
    circmgr: M::CircMgr,
    /// Persistent descriptor cache.
    store: Arc<Mutex<DynStore>>,
    /// The download implementation (unit, in production).
    mockable: M,
}
/// Mutable download state.
///
/// Invariant (checked by `check_consistency`): each configured bridge is in
/// exactly one of `running`, `queued`, `refetch_schedule`, or
/// `retry_schedule`.
struct State {
    /// Current download/timing parameters.
    config: Arc<BridgeDescDownloadConfig>,
    /// Who to notify when `current` changes.
    subscribers: FlagPublisher<BridgeDescEvent>,
    /// The published list of descriptors (and errors), handed out by
    /// `bridges()`.
    current: Arc<BridgeDescList>,
    /// Bridges with an in-flight download task.
    running: HashMap<BridgeKey, RunningInfo>,
    /// Bridges waiting for a free download slot.
    queued: VecDeque<QueuedEntry>,
    /// When `Dormant`, no new downloads are launched.
    dormancy: Dormancy,
    /// Successfully downloaded bridges, ordered by wallclock refetch time.
    refetch_schedule: BinaryHeap<RefetchEntry<SystemTime, ()>>,
    /// Failed bridges, ordered by monotonic retry time, with backoff state.
    retry_schedule: BinaryHeap<RefetchEntry<Instant, RetryDelay>>,
    /// Tells the timeout task when next to wake up (`None` = never).
    earliest_timeout: postage::watch::Sender<Option<Instant>>,
}
impl Debug for State {
    /// Multi-line, human-oriented dump of the whole download state.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        /// Print one bridge line: a fixed-width info column, then the bridge.
        fn fmt_bridge(
            f: &mut fmt::Formatter,
            b: &BridgeConfig,
            info: &(dyn Display + '_),
        ) -> fmt::Result {
            // `{:80.80}` pads and truncates the info column to 80 chars.
            let info = info.to_string();
            writeln!(f, " {:80.80} | {}", info, b)
        }
        /// Print one schedule (refetch or retry), one bridge per line,
        /// tagged with `summary` and the scheduled time.
        fn fmt_schedule<TT: Ord + Copy + Debug, RD>(
            f: &mut fmt::Formatter,
            summary: &str,
            name: &str,
            schedule: &BinaryHeap<RefetchEntry<TT, RD>>,
        ) -> fmt::Result {
            writeln!(f, " {}:", name)?;
            for b in schedule {
                fmt_bridge(f, &b.bridge, &format_args!("{} {:?}", summary, &b.when))?;
            }
            Ok(())
        }
        writeln!(f, "State {{")?;
        // The watch::Sender has no useful Debug representation.
        writeln!(f, " earliest_timeout: ???, ..,")?;
        writeln!(f, " current:")?;
        for (b, v) in &*self.current {
            fmt_bridge(
                f,
                b,
                &match v {
                    Err(e) => Cow::from(format!("C Err {}", e)),
                    Ok(_) => "C Ok".into(),
                },
            )?;
        }
        writeln!(f, " running:")?;
        for b in self.running.keys() {
            fmt_bridge(f, b, &"R")?;
        }
        writeln!(f, " queued:")?;
        for qe in &self.queued {
            fmt_bridge(f, &qe.bridge, &"Q")?;
        }
        fmt_schedule(f, "FS", "refetch_schedule", &self.refetch_schedule)?;
        fmt_schedule(f, "TS", "retry_schedule", &self.retry_schedule)?;
        write!(f, "}}")?;
        Ok(())
    }
}
/// Per-bridge bookkeeping while its download task is running.
#[derive(Debug)]
struct RunningInfo {
    /// Handle used to abort the task when the bridge is deconfigured.
    join: JoinHandle,
    /// Backoff state carried over from previous failed attempts, if any.
    retry_delay: Option<RetryDelay>,
}
/// Per-bridge bookkeeping while it waits in the download queue.
#[derive(Debug)]
struct QueuedEntry {
    bridge: BridgeKey,
    /// Backoff state carried over from previous failed attempts, if any.
    retry_delay: Option<RetryDelay>,
}
/// Entry in one of the two schedules.
///
/// `TT` is the time type: `SystemTime` (wallclock) for refetches,
/// `Instant` (monotonic) for retries.
#[derive(Debug)]
struct RefetchEntry<TT, RD> {
    /// When to act on this bridge again.
    when: TT,
    bridge: BridgeKey,
    /// `RetryDelay` in the retry schedule; `()` in the refetch schedule.
    retry_delay: RD,
}
impl<TT: Ord, RD> Ord for RefetchEntry<TT, RD> {
    /// Order by `when`, reversed: `BinaryHeap` is a max-heap, so reversing
    /// makes `peek`/`pop` yield the *soonest* entry first.
    fn cmp(&self, other: &Self) -> Ordering {
        self.when.cmp(&other.when).reverse()
    }
}
impl<TT: Ord, RD> PartialOrd for RefetchEntry<TT, RD> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
/// Equality delegates to `cmp`, i.e. only `when` is compared —
/// required for consistency with the `Ord` impl above.
impl<TT: Ord, RD> PartialEq for RefetchEntry<TT, RD> {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}
impl<TT: Ord, RD> Eq for RefetchEntry<TT, RD> {}
/// Handle to a spawned download task.
///
/// Placeholder: `abort` is a no-op, since the `spawn` used here returns no
/// cancellation handle (see `consider_launching`).  A logically-aborted
/// task instead exits harmlessly when it finds its bridge is no longer in
/// `running` (see `record_download_outcome`).
#[derive(Debug)]
struct JoinHandle;
impl JoinHandle {
    fn abort(&self) {}
}
impl<R: Runtime> BridgeDescMgr<R> {
    /// Create a new bridge descriptor manager using the real (non-mocked)
    /// download implementation.
    ///
    /// Spawns the background timeout task; fails with `StartupError::Spawn`
    /// if that cannot be started.
    pub fn new(
        config: &BridgeDescDownloadConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Arc<tor_circmgr::CircMgr<R>>,
        dormancy: Dormancy,
    ) -> Result<Self, StartupError> {
        Self::new_internal(runtime, circmgr, store.store, config, dormancy, ())
    }
}
/// A successfully obtained descriptor, plus when we should fetch it again.
#[derive(Debug)]
struct Downloaded {
    /// The verified descriptor.
    desc: BridgeDesc,
    /// Wallclock time at which to refetch (computed by `process_document`).
    refetch: SystemTime,
}
impl<R: Runtime, M: Mockable<R>> BridgeDescMgr<R, M> {
    /// Construct the manager and spawn its timeout task.
    ///
    /// Split out from `new` so that tests can supply a mock `M`.
    #[allow(clippy::needless_pass_by_value)]
    fn new_internal(
        runtime: R,
        circmgr: M::CircMgr,
        store: Arc<Mutex<DynStore>>,
        config: &BridgeDescDownloadConfig,
        dormancy: Dormancy,
        mockable: M,
    ) -> Result<Self, StartupError> {
        /// Local shorthand so the State initializer below reads cleanly.
        fn default<T: Default>() -> T {
            Default::default()
        }
        let config = config.clone().into();
        // Watch channel linking State (sender) to the timeout task (receiver).
        let (earliest_timeout, timeout_update) = postage::watch::channel();
        let state = Mutex::new(State {
            config,
            subscribers: default(),
            current: default(),
            running: default(),
            queued: default(),
            dormancy,
            retry_schedule: default(),
            refetch_schedule: default(),
            earliest_timeout,
        });
        let mgr = Arc::new(Manager {
            state,
            runtime: runtime.clone(),
            circmgr,
            store,
            mockable,
        });
        // The timeout task holds only a Weak reference, so it does not keep
        // the manager alive; it exits when the manager is dropped.
        runtime
            .spawn(timeout_task(
                runtime.clone(),
                Arc::downgrade(&mgr),
                timeout_update,
            ))
            .map_err(|cause| StartupError::Spawn {
                spawning: "timeout task",
                cause: cause.into(),
            })?;
        Ok(BridgeDescMgr { mgr })
    }
    /// Test helper: assert internal invariants
    /// (see `State::check_consistency`).
    #[cfg(test)]
    fn check_consistency<'i, I>(&self, input_bridges: Option<I>)
    where
        I: IntoIterator<Item = &'i BridgeKey>,
    {
        self.mgr
            .lock_only()
            .check_consistency(&self.mgr.runtime, input_bridges);
    }
    /// Set dormancy.  Takes effect via `process` when the guard drops,
    /// i.e. immediately.
    pub fn set_dormancy(&self, dormancy: Dormancy) {
        self.mgr.lock_then_process().dormancy = dormancy;
    }
}
impl<R: Runtime, M: Mockable<R>> BridgeDescProvider for BridgeDescMgr<R, M> {
    /// Return the current set of descriptors (and download errors).
    fn bridges(&self) -> Arc<BridgeDescList> {
        self.mgr.lock_only().current.clone()
    }
    /// Subscribe to change notifications.
    fn events(&self) -> BoxStream<'static, BridgeDescEvent> {
        let stream = self.mgr.lock_only().subscribers.subscribe();
        Box::pin(stream) as _
    }
    /// Replace the configured bridge set with `new_bridges`.
    ///
    /// Bridges no longer configured are dropped from every data structure
    /// (aborting any in-flight download); newly added bridges are queued
    /// for download.
    fn set_bridges(&self, new_bridges: &[BridgeConfig]) {
        /// Remove `bridge` from `new_bridges`, reporting whether it was
        /// there.  Returns `true` ("keep") iff the bridge is still
        /// configured; logs a "forgetting" message otherwise.
        fn note_found_keep_p(
            new_bridges: &mut HashSet<BridgeKey>,
            bridge: &BridgeKey,
            was_state: &str,
        ) -> bool {
            let keep = new_bridges.remove(bridge);
            if !keep {
                debug!(r#"forgetting bridge ({}) "{}""#, was_state, bridge);
            }
            keep
        }
        /// Drop schedule entries for bridges no longer configured.
        fn filter_schedule<TT: Ord + Copy, RD>(
            new_bridges: &mut HashSet<BridgeKey>,
            schedule: &mut BinaryHeap<RefetchEntry<TT, RD>>,
            was_state: &str,
        ) {
            schedule.retain(|b| note_found_keep_p(new_bridges, &b.bridge, was_state));
        }
        let mut state = self.mgr.lock_then_process();
        let state = &mut **state;
        // Bridges we have not yet matched against an existing data
        // structure; whatever remains at the end is genuinely new.
        let mut new_bridges: HashSet<_> = new_bridges.iter().cloned().collect();
        if state.current.keys().any(|b| !new_bridges.contains(b)) {
            // Some entries in `current` are for removed bridges: rebuild it
            // with only the still-configured ones, and notify subscribers.
            let current: BridgeDescList = state
                .current
                .iter()
                .filter(|(b, _)| new_bridges.contains(&**b))
                .map(|(b, v)| (b.clone(), v.clone()))
                .collect();
            state.set_current_and_notify(current);
        } else {
            // Nothing was removed from `current`; no notification needed.
        }
        state.running.retain(|b, ri| {
            let keep = note_found_keep_p(&mut new_bridges, b, "was downloading");
            if !keep {
                // Cancel the in-flight download for the removed bridge.
                ri.join.abort();
            }
            keep
        });
        state
            .queued
            .retain(|qe| note_found_keep_p(&mut new_bridges, &qe.bridge, "was queued"));
        filter_schedule(
            &mut new_bridges,
            &mut state.retry_schedule,
            "previously failed",
        );
        filter_schedule(
            &mut new_bridges,
            &mut state.refetch_schedule,
            "previously downloaded",
        );
        // Whatever remains is new: queue it.  Downloads actually start when
        // the StateGuard drops and `State::process` runs.
        state.queued.extend(new_bridges.into_iter().map(|bridge| {
            debug!(r#" added bridge, queueing for download "{}""#, &bridge);
            QueuedEntry {
                bridge,
                retry_delay: None,
            }
        }));
    }
}
impl<R: Runtime, M: Mockable<R>> Manager<R, M> {
    /// Lock the state, returning a guard that runs `State::process` on drop.
    ///
    /// Use this for any mutation that may require launching downloads or
    /// rescheduling timeouts.
    fn lock_then_process<'s>(self: &'s Arc<Self>) -> StateGuard<'s, R, M> {
        StateGuard {
            state: self.lock_only(),
            mgr: self,
        }
    }
    /// Lock the state without triggering reprocessing on unlock.
    fn lock_only(&self) -> MutexGuard<State> {
        self.state.lock().expect("bridge desc manager poisoned")
    }
}
/// Lock guard over `State` which automatically reconciles (launching
/// downloads, updating timeouts) when dropped.
#[derive(Educe, Deref, DerefMut)]
#[educe(Debug)]
struct StateGuard<'s, R: Runtime, M: Mockable<R>> {
    #[deref]
    #[deref_mut]
    state: MutexGuard<'s, State>,
    /// Needed by `process` (for the runtime and for spawning tasks).
    #[educe(Debug(ignore))]
    mgr: &'s Arc<Manager<R, M>>,
}
impl<R: Runtime, M: Mockable<R>> Drop for StateGuard<'_, R, M> {
    fn drop(&mut self) {
        // Reconcile after whatever mutation the guard holder performed.
        self.state.process(self.mgr);
    }
}
impl State {
    /// Reconcile state after any mutation: launch downloads, clamp refetch
    /// times if the wall clock warped, and recompute the earliest timeout.
    ///
    /// Runs automatically when a `StateGuard` is dropped.
    fn process<R: Runtime, M: Mockable<R>>(&mut self, mgr: &Arc<Manager<R, M>>) {
        self.consider_launching(mgr);
        let now_wall = mgr.runtime.wallclock();
        // No refetch should ever be scheduled further out than max_refetch
        // from now; if the soonest entry is, the wall clock must have
        // warped, so clamp every entry back into range.
        let max_refetch_wall = now_wall + self.config.max_refetch;
        if self
            .refetch_schedule
            .peek()
            .map(|re| re.when > max_refetch_wall)
            == Some(true)
        {
            info!("bridge descriptor manager: clock warped, clamping refetch times");
            self.refetch_schedule = self
                .refetch_schedule
                .drain()
                .map(|mut re| {
                    re.when = max_refetch_wall;
                    re
                })
                .collect();
        }
        // Earliest of: the next retry (already a monotonic Instant), and the
        // next refetch (wallclock, converted to a monotonic deadline).
        let new_earliest_timeout = [
            self.retry_schedule.peek().map(|re| re.when),
            self.refetch_schedule.peek().map(|re| {
                let wait = re.when.duration_since(now_wall).unwrap_or_default();
                mgr.runtime.now() + wait
            }),
        ]
        .into_iter()
        .flatten()
        .min();
        // Publishing the new deadline wakes the timeout task if needed.
        *self.earliest_timeout.borrow_mut() = new_earliest_timeout;
    }
    /// Start downloads for queued bridges, up to the effective parallelism.
    #[allow(clippy::blocks_in_conditions)]
    fn consider_launching<R: Runtime, M: Mockable<R>>(&mut self, mgr: &Arc<Manager<R, M>>) {
        // Bridges whose download task failed to spawn; removed from
        // `current` at the end so we don't publish stale data for them.
        let mut to_remove = vec![];
        while self.running.len() < self.effective_parallelism() {
            let QueuedEntry {
                bridge,
                retry_delay,
            } = match self.queued.pop_front() {
                Some(qe) => qe,
                None => break,
            };
            match mgr
                .runtime
                .spawn({
                    let config = self.config.clone();
                    let bridge = bridge.clone();
                    let inner = mgr.clone();
                    let mockable = inner.mockable.clone();
                    async move {
                        // Catch panics so one bad download cannot take down
                        // the executor; report them as an internal error.
                        let got =
                            AssertUnwindSafe(inner.download_descriptor(mockable, &bridge, &config))
                                .catch_unwind()
                                .await
                                .unwrap_or_else(|_| {
                                    Err(internal!("download descriptor task panicked!").into())
                                });
                        match &got {
                            Ok(_) => debug!(r#"download succeeded for "{}""#, bridge),
                            Err(err) => debug!(r#"download failed for "{}": {}"#, bridge, err),
                        };
                        let mut state = inner.lock_then_process();
                        state.record_download_outcome(bridge, got);
                    }
                })
                .map(|()| JoinHandle)
            {
                Ok(join) => {
                    self.running
                        .insert(bridge, RunningInfo { join, retry_delay });
                }
                Err(_) => {
                    // Spawn failed (executor shutting down, presumably —
                    // TODO confirm): drop the bridge from `current`.
                    to_remove.push(bridge);
                }
            }
        }
        if !to_remove.is_empty() {
            self.modify_current(|current| {
                for bridge in to_remove {
                    current.remove(&bridge);
                }
            });
        }
    }
    /// Mutate a copy of `current` via `f`, then install it and notify.
    fn modify_current<T, F: FnOnce(&mut BridgeDescList) -> T>(&mut self, f: F) -> T {
        let mut current = (*self.current).clone();
        let r = f(&mut current);
        self.set_current_and_notify(current);
        r
    }
    /// Replace `current` and tell subscribers something changed.
    fn set_current_and_notify<BDL: Into<Arc<BridgeDescList>>>(&mut self, new: BDL) {
        self.current = new.into();
        self.subscribers.publish(BridgeDescEvent::SomethingChanged);
    }
    /// How many downloads may run at once (zero when dormant).
    fn effective_parallelism(&self) -> usize {
        match self.dormancy {
            Dormancy::Active => usize::from(u8::from(self.config.parallelism)),
            Dormancy::Dormant => 0,
        }
    }
}
impl<R: Runtime, M: Mockable<R>> StateGuard<'_, R, M> {
    /// Record the result of a completed download attempt for `bridge`.
    ///
    /// On success, schedules a refetch; on failure, schedules a retry using
    /// backoff state carried across attempts.  Either way, `current` is
    /// updated and subscribers are notified (via `modify_current`).
    fn record_download_outcome(&mut self, bridge: BridgeKey, got: Result<Downloaded, Error>) {
        let RunningInfo { retry_delay, .. } = match self.running.remove(&bridge) {
            Some(ri) => ri,
            None => {
                // set_bridges() dropped this bridge while we were fetching.
                debug!("bridge descriptor download completed for no-longer-configured bridge");
                return;
            }
        };
        let insert = match got {
            Ok(Downloaded { desc, refetch }) => {
                // Plan to refetch at the time process_document computed.
                self.refetch_schedule.push(RefetchEntry {
                    when: refetch,
                    bridge: bridge.clone(),
                    retry_delay: (),
                });
                Ok(desc)
            }
            Err(err) => {
                // Continue any backoff begun by previous failures.
                let mut retry_delay =
                    retry_delay.unwrap_or_else(|| RetryDelay::from_duration(self.config.retry));
                let retry = err.retry_time();
                let now = self.mgr.runtime.now();
                // Turn the error's retry advice into an absolute time, then
                // clamp to [now, now + max_refetch] so even "Never" errors
                // are retried eventually and nothing lands in the past.
                let retry = retry.absolute(now, || retry_delay.next_delay(&mut rand::rng()));
                let retry = {
                    let earliest = now;
                    let latest = || now + self.config.max_refetch;
                    match retry {
                        AbsRetryTime::Immediate => earliest,
                        AbsRetryTime::Never => latest(),
                        AbsRetryTime::At(i) => i.clamp(earliest, latest()),
                    }
                };
                self.retry_schedule.push(RefetchEntry {
                    when: retry,
                    bridge: bridge.clone(),
                    retry_delay,
                });
                Err(Box::new(err) as _)
            }
        };
        // Publish the descriptor (or the error) for this bridge.
        self.modify_current(|current| current.insert(bridge, insert));
    }
}
impl<R: Runtime, M: Mockable<R>> Manager<R, M> {
    /// Obtain a descriptor for `bridge`: from the cache if fresh enough,
    /// otherwise by downloading (with if-modified-since when we have a
    /// stale-but-valid cached copy).
    ///
    /// On success the document is written back to the cache; cache I/O
    /// failures are logged but never fatal.
    async fn download_descriptor(
        &self,
        mockable: M,
        bridge: &BridgeConfig,
        config: &BridgeDescDownloadConfig,
    ) -> Result<Downloaded, Error> {
        let process_document = |text| process_document(&self.runtime, config, text);
        // Lock the store, mapping mutex poison to an internal error.
        let store = || {
            self.store
                .lock()
                .map_err(|_| internal!("bridge descriptor store poisoned"))
        };
        // Cache lookup is best-effort: on error, log and treat as a miss.
        let cache_entry: Option<CachedBridgeDescriptor> = (|| store()?.lookup_bridgedesc(bridge))()
            .unwrap_or_else(|err| {
                error_report!(
                    err,
                    r#"bridge descriptor cache lookup failed, for "{}""#,
                    sensitive(bridge),
                );
                None
            });
        let now = self.runtime.wallclock();
        let cached_good: Option<Downloaded> = if let Some(cached) = &cache_entry {
            if cached.fetched > now {
                // Cache entry fetched "in the future": the clock has warped;
                // distrust the cache entirely.
                None
            } else {
                match process_document(&cached.document) {
                    Err(err) => {
                        trace!(r#"cached document for "{}" invalid: {}"#, &bridge, err);
                        None
                    }
                    Ok(got) => {
                        if now.duration_since(cached.fetched).ok() <= Some(config.max_refetch) {
                            // Fresh enough: skip the network entirely.
                            return Ok(got);
                        }
                        // Valid but stale: keep for if-modified-since below.
                        Some(got)
                    }
                }
            }
        } else {
            None
        };
        let if_modified_since = cached_good
            .as_ref()
            .map(|got| got.desc.as_ref().published());
        debug!(
            r#"starting download for "{}"{}"#,
            bridge,
            match if_modified_since {
                Some(ims) => format!(
                    " if-modified-since {}",
                    humantime::format_rfc3339_seconds(ims),
                ),
                None => "".into(),
            }
        );
        let text = mockable
            .clone()
            .download(&self.runtime, &self.circmgr, bridge, if_modified_since)
            .await?;
        let (document, got) = if let Some(text) = text {
            let got = process_document(&text)?;
            (text, got)
        } else if let Some(cached) = cached_good {
            // "Not modified": reuse the cached document we already validated.
            (
                cache_entry
                    .expect("cached_good but not cache_entry")
                    .document,
                cached,
            )
        } else {
            // `None` is only legal as a not-modified answer, which requires
            // us to have sent if-modified-since in the first place.
            return Err(internal!("download gave None but no if-modified-since").into());
        };
        // Write back to the cache, keeping the entry `prefetch` past its
        // refetch time; failures here are logged, not propagated.
        (|| {
            let cached = CachedBridgeDescriptor {
                document,
                fetched: now,
            };
            let until = got
                .refetch
                .checked_add(config.prefetch)
                .unwrap_or(got.refetch);
            store()?.store_bridgedesc(bridge, cached, until)?;
            Ok(())
        })()
        .unwrap_or_else(|err: crate::Error| {
            error_report!(err, "failed to cache downloaded bridge descriptor",);
        });
        Ok(got)
    }
}
/// Parse, signature-check, and time-check a descriptor document, and compute
/// when it should next be refetched.
///
/// The refetch time is the expiry minus `prefetch` (or `now + max_refetch`
/// when there is no expiry), clamped into
/// `[now + min_refetch, now + max_refetch]`.
fn process_document<R: Runtime>(
    runtime: &R,
    config: &BridgeDescDownloadConfig,
    text: &str,
) -> Result<Downloaded, Error> {
    let desc = RouterDesc::parse(text)?;
    let desc = desc.check_signature().map_err(Arc::new)?;
    let now = runtime.wallclock();
    desc.is_valid_at(&now)?;
    // "dangerously": we discard the timeliness guarantee, but we have just
    // checked validity ourselves on the line above.
    let (desc, (_, expires)) = desc.dangerously_into_parts();
    let refetch = match expires {
        Some(expires) => expires
            .checked_sub(config.prefetch)
            .ok_or(Error::ExtremeValidityTime)?,
        None => now
            .checked_add(config.max_refetch)
            .ok_or(Error::ExtremeValidityTime)?,
    };
    let refetch = refetch.clamp(now + config.min_refetch, now + config.max_refetch);
    let desc = BridgeDesc::new(Arc::new(desc));
    Ok(Downloaded { desc, refetch })
}
/// Background task: sleep until the earliest scheduled retry/refetch time,
/// then move due schedule entries back onto the download queue.
///
/// Holds only a `Weak` reference to the manager, so the loop exits once the
/// manager is dropped (upgrade fails) or the watch channel closes.
async fn timeout_task<R: Runtime, M: Mockable<R>>(
    runtime: R,
    inner: Weak<Manager<R, M>>,
    update: postage::watch::Receiver<Option<Instant>>,
) {
    /// Pop every entry that is due (`when <= now`) and requeue its bridge.
    ///
    /// `retry_delay_map` converts the schedule's backoff payload into the
    /// queue's `Option<RetryDelay>` (identity-ish for retries, `None` for
    /// refetches).
    fn requeue_as_required<TT: Ord + Copy + Debug, RD, RDM: Fn(RD) -> Option<RetryDelay>>(
        queued: &mut VecDeque<QueuedEntry>,
        schedule: &mut BinaryHeap<RefetchEntry<TT, RD>>,
        now: TT,
        retry_delay_map: RDM,
    ) {
        while let Some(ent) = schedule.peek() {
            if ent.when > now {
                break;
            }
            let re = schedule.pop().expect("schedule became empty!");
            let bridge = re.bridge;
            let retry_delay = retry_delay_map(re.retry_delay);
            queued.push_back(QueuedEntry {
                bridge,
                retry_delay,
            });
        }
    }
    // Fire once immediately at startup, then follow the watch channel.
    let mut next_wakeup = Some(runtime.now());
    let mut update = update.fuse();
    loop {
        select! {
            // The manager recomputed the earliest deadline: adopt it.
            changed = update.next() => {
                next_wakeup = if let Some(changed) = changed {
                    changed
                } else {
                    // Channel closed: the manager is gone; shut down.
                    break
                }
            },
            // Sleep until the deadline (forever, if there is none).
            () = async {
                if let Some(next_wakeup) = next_wakeup {
                    let now = runtime.now();
                    if next_wakeup > now {
                        let duration = next_wakeup - now;
                        runtime.sleep(duration).await;
                    }
                } else {
                    #[allow(clippy::semicolon_if_nothing_returned)] { future::pending().await }
                }
            }.fuse() => {
                let inner = if let Some(i) = inner.upgrade() { i } else { break; };
                let mut state = inner.lock_then_process();
                let state = &mut **state;
                // Refetches are keyed by wallclock time...
                requeue_as_required(
                    &mut state.queued,
                    &mut state.refetch_schedule,
                    runtime.wallclock(),
                    |()| None,
                );
                // ...retries by monotonic time, keeping their backoff state.
                requeue_as_required(
                    &mut state.queued,
                    &mut state.retry_schedule,
                    runtime.now(),
                    Some,
                );
            }
        }
    }
}
/// Errors that can occur while creating the bridge descriptor manager.
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum StartupError {
    /// The directory manager we were built from had no circuit manager.
    #[error(
        "tried to create bridge descriptor manager from directory manager with no circuit manager"
    )]
    MissingCircMgr,
    /// Failed to spawn a background task onto the runtime.
    #[error("Unable to spawn {spawning}")]
    Spawn {
        /// Which task we were trying to spawn.
        spawning: &'static str,
        // Arc-wrapped so this error type can remain Clone.
        #[source]
        cause: Arc<SpawnError>,
    },
}
impl HasKind for StartupError {
    fn kind(&self) -> ErrorKind {
        use ErrorKind as EK;
        use StartupError as SE;
        match self {
            // The caller should have supplied a circuit manager, so this
            // is classified as our own internal error.
            SE::MissingCircMgr => EK::Internal,
            // Defer to the spawn error's own classification.
            SE::Spawn { cause, .. } => cause.kind(),
        }
    }
}
/// Errors from an individual bridge descriptor download attempt.
///
/// On failure these end up boxed in the published `BridgeDescList`
/// (see `record_download_outcome`).
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum Error {
    /// Could not build a circuit to the bridge.
    #[error("Failed to establish circuit")]
    CircuitFailed(#[from] tor_circmgr::Error),
    /// Circuit succeeded, but opening a directory stream on it failed.
    #[error("Failed to establish directory stream")]
    StreamFailed(#[source] tor_circmgr::Error),
    #[error("Directory request failed")]
    RequestFailed(#[from] tor_dirclient::RequestFailedError),
    #[error("Failed to parse descriptor in response")]
    ParseFailed(#[from] tor_netdoc::Error),
    // Arc-wrapped, presumably so this enum can stay Clone — TODO confirm.
    #[error("Signature check failed")]
    SignatureCheckFailed(#[from] Arc<signature::Error>),
    #[error("Descriptor is outside its validity time, as supplied")]
    BadValidityTime(#[from] tor_checkable::TimeValidityError),
    /// Refetch-time arithmetic over/underflowed (see `process_document`).
    #[error("Descriptor validity time range is too extreme for us to cope with")]
    ExtremeValidityTime,
    #[error("Programming error")]
    Bug(#[from] tor_error::Bug),
    /// Only constructed by tests, with an explicit retry time.
    #[cfg(test)]
    #[error("Error for testing, {0:?}, retry at {1:?}")]
    TestError(&'static str, RetryTime),
}
impl HasKind for Error {
    /// Classify this download error for tor_error reporting.
    ///
    /// A malformed or mis-signed descriptor, or one with absurd validity
    /// times, means the bridge is not behaving as a directory server
    /// should, so those cases are reported as `TorAccessFailed`.
    fn kind(&self) -> ErrorKind {
        use Error as E;
        use ErrorKind as EK;
        match self {
            // These wrap errors that carry their own classification.
            E::CircuitFailed(e) => e.kind(),
            E::StreamFailed(e) => e.kind(),
            E::RequestFailed(e) => e.kind(),
            E::Bug(e) => e.kind(),
            // Protocol violations by the bridge.
            E::ParseFailed(..) | E::SignatureCheckFailed(..) | E::ExtremeValidityTime => {
                EK::TorAccessFailed
            }
            // A descriptor outside its validity window suggests our clock
            // (or the bridge's) is wrong.
            E::BadValidityTime(..) => EK::ClockSkew,
            #[cfg(test)]
            E::TestError(..) => EK::Internal,
        }
    }
}
impl HasRetryTime for Error {
    /// Advise when (if ever) retrying the same bridge could help.
    fn retry_time(&self) -> RetryTime {
        use Error as E;
        use RetryTime as R;
        match self {
            // Circuit errors carry their own retry semantics.
            E::CircuitFailed(e) => e.retry_time(),
            // Transient transport-level trouble: worth retrying after a wait.
            E::StreamFailed(..) | E::RequestFailed(..) => R::AfterWaiting,
            // The bridge handed us a bad document (or we hit a bug);
            // asking again will not produce a different answer.
            E::ParseFailed(..)
            | E::SignatureCheckFailed(..)
            | E::BadValidityTime(..)
            | E::ExtremeValidityTime
            | E::Bug(..) => R::Never,
            #[cfg(test)]
            E::TestError(_, retry) => *retry,
        }
    }
}
/// Marks our `Error` as suitable for storing in a `BridgeDescList` entry.
impl BridgeDescError for Error {}
impl State {
    /// Test-only invariant checker: panics if the state is inconsistent.
    ///
    /// Checks that every bridge appears in exactly one of
    /// running/queued/refetch/retry, that scheduled bridges also appear in
    /// `current`, that the parallelism limit is respected, and that
    /// `earliest_timeout` matches the heads of the schedules.  When `input`
    /// (the configured bridge set) is supplied, also checks that no
    /// unexpected bridges are tracked and none are missing.
    #[cfg(test)]
    fn check_consistency<'i, R, I>(&mut self, runtime: &R, input: Option<I>)
    where
        R: Runtime,
        I: IntoIterator<Item = &'i BridgeKey>,
    {
        /// Which data structure a bridge was found in.
        #[derive(Debug, Clone, Copy, Eq, PartialEq)]
        enum Where {
            Running,
            Queued,
            Schedule {
                sch_name: &'static str,
                /// Filled in later: scheduled bridges must be in `current`.
                found_in_current: bool,
            },
        }
        /// Accumulator: where each bridge was seen, plus the earliest
        /// (monotonic) schedule time encountered.
        struct Tracked {
            /// Whether `input` was supplied (enables "unexpected" checks).
            known_input: bool,
            tracked: HashMap<BridgeKey, Option<Where>>,
            earliest: Option<Instant>,
        }
        let mut tracked = if let Some(input) = input {
            let tracked = input.into_iter().map(|b| (b.clone(), None)).collect();
            Tracked {
                tracked,
                known_input: true,
                earliest: None,
            }
        } else {
            Tracked {
                tracked: HashMap::new(),
                known_input: false,
                earliest: None,
            }
        };
        impl Tracked {
            /// Record that `b` was seen in `where_`; panic on duplicates,
            /// or (when the input set is known) on unexpected bridges.
            fn note(&mut self, where_: Where, b: &BridgeKey) {
                match self.tracked.get(b) {
                    Some(Some(prev_where)) => {
                        panic!("duplicate {:?} {:?} {:?}", prev_where, where_, b);
                    }
                    None if self.known_input => {
                        panic!("unexpected {:?} {:?}", where_, b);
                    }
                    _ => {
                        self.tracked.insert(b.clone(), Some(where_));
                    }
                }
            }
        }
        /// Walk one schedule: note its bridges, and fold its earliest
        /// entry (converted to monotonic time) into `tracked.earliest`.
        #[cfg(test)]
        fn walk_sch<TT: Ord + Copy + Debug, RD, CT: Fn(TT) -> Instant>(
            tracked: &mut Tracked,
            sch_name: &'static str,
            schedule: &BinaryHeap<RefetchEntry<TT, RD>>,
            conv_time: CT,
        ) {
            let where_ = Where::Schedule {
                sch_name,
                found_in_current: false,
            };
            if let Some(first) = schedule.peek() {
                for re in schedule {
                    tracked.note(where_, &re.bridge);
                }
                // Cross-check: the heap's peek must really be the minimum
                // time (our Ord impl reverses, making it a min-heap).
                let scanned = schedule
                    .iter()
                    .map(|re| re.when)
                    .min()
                    .expect("schedule empty!");
                assert_eq!(scanned, first.when);
                tracked.earliest = Some(
                    [tracked.earliest, Some(conv_time(scanned))]
                        .into_iter()
                        .flatten()
                        .min()
                        .expect("flatten of chain Some was empty"),
                );
            }
        }
        let now_wall = runtime.wallclock();
        let now_mono = runtime.now();
        // Convert a wallclock time to the equivalent monotonic instant,
        // using the current offset between the two clocks.
        let adj_wall = |wallclock: SystemTime| {
            if let Ok(ahead) = wallclock.duration_since(now_wall) {
                now_mono + ahead
            } else if let Ok(behind) = now_wall.duration_since(wallclock) {
                now_mono
                    .checked_sub(behind)
                    .expect("time subtraction underflow")
            } else {
                panic!("times should be totally ordered!")
            }
        };
        for b in self.running.keys() {
            tracked.note(Where::Running, b);
        }
        for qe in &self.queued {
            tracked.note(Where::Queued, &qe.bridge);
        }
        walk_sch(&mut tracked, "refetch", &self.refetch_schedule, adj_wall);
        walk_sch(&mut tracked, "retry", &self.retry_schedule, |t| t);
        // Every bridge in `current` must already be tracked somewhere;
        // mark scheduled ones as found in current.
        for b in self.current.keys() {
            let found = tracked
                .tracked
                .get_mut(b)
                .and_then(Option::as_mut)
                .unwrap_or_else(|| panic!("current but untracked {:?}", b));
            if let Where::Schedule {
                found_in_current, ..
            } = found
            {
                *found_in_current = true;
            }
        }
        for (b, where_) in &tracked.tracked {
            match where_ {
                // Known-configured bridge seen nowhere at all.
                None => panic!("missing {}", &b),
                // Scheduled bridges must have been matched in `current`.
                Some(Where::Schedule {
                    sch_name,
                    found_in_current,
                }) => {
                    assert!(found_in_current, "not-Schedule {} {}", &b, sch_name);
                }
                _ => {}
            }
        }
        let parallelism = self.effective_parallelism();
        assert!(self.running.len() <= parallelism);
        // The queue may only be nonempty while every download slot is full.
        assert!(self.running.len() == parallelism || self.queued.is_empty());
        assert_eq!(tracked.earliest, *self.earliest_timeout.borrow());
    }
}