use std::{
fmt,
marker::PhantomData,
pin::Pin,
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
task::Poll,
time::SystemTime,
};
use educe::Educe;
use futures::{Future, StreamExt, stream::Stream};
use itertools::chain;
use paste::paste;
use time::OffsetDateTime;
use tor_basic_utils::skip_fmt;
use tor_netdir::DirEvent;
use tor_netdoc::doc::netstatus;
#[cfg(feature = "bridge-client")]
use tor_guardmgr::bridge::BridgeDescEvent;
use crate::bootstrap::AttemptId;
pub(crate) trait FlagEvent: Sized {
const MAXIMUM: u16;
fn to_index(self) -> u16;
fn from_index(flag: u16) -> Option<Self>;
}
macro_rules! impl_FlagEvent { { $ty:ident } => { paste!{
impl FlagEvent for $ty {
const MAXIMUM: u16 = {
let count = <$ty as $crate::strum::EnumCount>::COUNT;
(count - 1) as u16
};
fn to_index(self) -> u16 {
self.into()
}
fn from_index(flag: u16) -> Option<Self> {
flag.try_into().ok()
}
}
#[test]
#[allow(non_snake_case)]
fn [< flagevent_test_variant_numbers_ $ty >]() {
for variant in <$ty as $crate::strum::IntoEnumIterator>::iter() {
assert!(<$ty as FlagEvent>::to_index(variant) <=
<$ty as FlagEvent>::MAXIMUM,
"impl_FlagEvent only allowed if discriminators are dense");
}
}
} } }
impl_FlagEvent! { DirEvent }
#[cfg(feature = "bridge-client")]
impl_FlagEvent! { BridgeDescEvent }
pub(crate) struct FlagPublisher<F> {
inner: Arc<Inner<F>>,
}
struct Inner<F> {
event: event_listener::Event,
counts: Vec<AtomicUsize>, n_publishers: AtomicUsize,
_phantom: PhantomData<fn(F) -> F>,
}
pub(crate) struct FlagListener<F> {
my_counts: Vec<usize>,
listener: event_listener::EventListener,
inner: Arc<Inner<F>>,
}
impl<F: FlagEvent> Default for FlagPublisher<F> {
fn default() -> Self {
Self::new()
}
}
impl<F: FlagEvent> FlagPublisher<F> {
pub(crate) fn new() -> Self {
let counts = std::iter::repeat_with(AtomicUsize::default)
.take(F::MAXIMUM as usize + 1)
.collect();
FlagPublisher {
inner: Arc::new(Inner {
event: event_listener::Event::new(),
counts,
n_publishers: AtomicUsize::new(1),
_phantom: PhantomData,
}),
}
}
pub(crate) fn subscribe(&self) -> FlagListener<F> {
let listener = self.inner.event.listen();
FlagListener {
my_counts: self
.inner
.counts
.iter()
.map(|a| a.load(Ordering::SeqCst))
.collect(),
listener,
inner: Arc::clone(&self.inner),
}
}
pub(crate) fn publish(&self, flag: F) {
self.inner.counts[flag.to_index() as usize].fetch_add(1, Ordering::SeqCst);
self.inner.event.notify(usize::MAX);
}
}
impl<F> Clone for FlagPublisher<F> {
fn clone(&self) -> FlagPublisher<F> {
self.inner.n_publishers.fetch_add(1, Ordering::SeqCst);
FlagPublisher {
inner: Arc::clone(&self.inner),
}
}
}
impl<F> Drop for FlagPublisher<F> {
fn drop(&mut self) {
if self.inner.n_publishers.fetch_sub(1, Ordering::SeqCst) == 1 {
self.inner.event.notify(usize::MAX);
}
}
}
impl<F: FlagEvent> Stream for FlagListener<F> {
type Item = F;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
loop {
for idx in 0..F::MAXIMUM as usize + 1 {
let cur = self.inner.counts[idx].load(Ordering::SeqCst);
if cur != self.my_counts[idx] {
self.my_counts[idx] = cur;
return Poll::Ready(Some(F::from_index(idx as u16).expect("Internal error")));
}
}
if self.inner.n_publishers.load(Ordering::SeqCst) == 0 {
return Poll::Ready(None);
}
if let Poll::Ready(()) = Pin::new(&mut self.listener).poll(cx) {
self.listener = self.inner.event.listen();
} else {
return Poll::Pending;
}
}
}
}
#[derive(Clone, Debug, Default)]
pub struct DirBootstrapStatus(StatusEnum);
#[derive(Clone, Debug, Default)]
enum StatusEnum {
#[default]
NoActivity,
Single {
current: StatusEntry,
},
Replacing {
current: StatusEntry,
next: StatusEntry,
},
}
#[derive(Clone, Debug)]
struct StatusEntry {
id: AttemptId,
status: DirStatus,
}
#[derive(Clone, Debug, Default, derive_more::Display)]
#[display("{0}", progress)]
pub(crate) struct DirStatus {
progress: DirProgress,
n_resets: usize,
n_errors: usize,
n_stalls: usize,
}
#[derive(Clone, Debug, Educe)]
#[educe(Default)]
pub(crate) enum DirProgress {
#[educe(Default)]
NoConsensus {
#[allow(dead_code)]
after: Option<SystemTime>,
},
FetchingCerts {
lifetime: netstatus::Lifetime,
usable_lifetime: netstatus::Lifetime,
n_certs: (u16, u16),
},
Validated {
lifetime: netstatus::Lifetime,
usable_lifetime: netstatus::Lifetime,
n_mds: (u32, u32),
usable: bool,
},
}
#[derive(Clone, Debug, derive_more::Display)]
#[non_exhaustive]
pub enum DirBlockage {
#[display("Can't make progress.")]
Stalled,
#[display("Too many errors without making progress.")]
TooManyErrors,
#[display("Had to reset bootstrapping too many times.")]
TooManyResets,
}
impl fmt::Display for DirProgress {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fn fmt_time(t: SystemTime) -> String {
use std::sync::LazyLock;
static FORMAT: LazyLock<Vec<time::format_description::FormatItem>> =
LazyLock::new(|| {
time::format_description::parse(
"[year]-[month]-[day] [hour]:[minute]:[second] UTC",
)
.expect("Invalid time format")
});
OffsetDateTime::from(t)
.format(&FORMAT)
.unwrap_or_else(|_| "(could not format)".into())
}
match &self {
DirProgress::NoConsensus { .. } => write!(f, "fetching a consensus"),
DirProgress::FetchingCerts { n_certs, .. } => write!(
f,
"fetching authority certificates ({}/{})",
n_certs.0, n_certs.1
),
DirProgress::Validated {
usable: false,
n_mds,
..
} => write!(f, "fetching microdescriptors ({}/{})", n_mds.0, n_mds.1),
DirProgress::Validated {
usable: true,
lifetime,
..
} => write!(
f,
"usable, fresh until {}, and valid until {}",
fmt_time(lifetime.fresh_until()),
fmt_time(lifetime.valid_until())
),
}
}
}
impl fmt::Display for DirBootstrapStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.0 {
StatusEnum::NoActivity => write!(f, "not downloading")?,
StatusEnum::Single { current } => write!(f, "directory is {}", current.status)?,
StatusEnum::Replacing { current, next } => write!(
f,
"directory is {}; next directory is {}",
current.status, next.status
)?,
}
Ok(())
}
}
impl DirBootstrapStatus {
fn current(&self) -> Option<&DirStatus> {
match &self.0 {
StatusEnum::NoActivity => None,
StatusEnum::Single { current } => Some(¤t.status),
StatusEnum::Replacing { current, .. } => Some(¤t.status),
}
}
fn next(&self) -> Option<&DirStatus> {
match &self.0 {
StatusEnum::Replacing { next, .. } => Some(&next.status),
_ => None,
}
}
#[allow(clippy::implied_bounds_in_impls)]
fn statuses(&self) -> impl Iterator<Item = &DirStatus> + DoubleEndedIterator {
chain!(self.current(), self.next(),)
}
#[allow(clippy::implied_bounds_in_impls)]
fn entries_mut(&mut self) -> impl Iterator<Item = &mut StatusEntry> + DoubleEndedIterator {
let (current, next) = match &mut self.0 {
StatusEnum::NoActivity => (None, None),
StatusEnum::Single { current } => (Some(current), None),
StatusEnum::Replacing { current, next } => (Some(current), Some(next)),
};
chain!(current, next,)
}
pub fn frac_at(&self, when: SystemTime) -> f32 {
self.statuses()
.filter_map(|st| st.frac_at(when))
.next()
.unwrap_or(0.0)
}
pub fn usable_at(&self, now: SystemTime) -> bool {
if let Some(current) = self.current() {
current.progress.usable() && current.okay_to_use_at(now)
} else {
false
}
}
pub fn blockage(&self, now: SystemTime) -> Option<DirBlockage> {
if let Some(current) = self.current() {
if current.progress.usable() && current.declared_live_at(now) {
return None;
}
}
self.statuses().filter_map(|st| st.blockage()).next()
}
#[allow(clippy::search_is_some)] fn mut_status_for(&mut self, attempt_id: AttemptId) -> Option<&mut DirStatus> {
if self
.entries_mut()
.rev()
.take(1)
.find(|entry| entry.id >= attempt_id)
.is_none()
{
let current = match std::mem::take(&mut self.0) {
StatusEnum::NoActivity => None,
StatusEnum::Single { current } => Some(current),
StatusEnum::Replacing { current, .. } => Some(current),
};
let next = StatusEntry::new(attempt_id);
self.0 = match current {
None => StatusEnum::Single { current: next },
Some(current) => StatusEnum::Replacing { current, next },
};
}
self.entries_mut()
.find(|entry| entry.id == attempt_id)
.map(|entry| &mut entry.status)
}
fn advance_status(&mut self) {
self.0 = match std::mem::take(&mut self.0) {
StatusEnum::Replacing { next, .. } if next.status.progress.usable() => {
StatusEnum::Single { current: next }
}
other => other,
};
}
pub(crate) fn update_progress(&mut self, attempt_id: AttemptId, new_progress: DirProgress) {
if let Some(status) = self.mut_status_for(attempt_id) {
let old_frac = status.frac();
status.progress = new_progress;
let new_frac = status.frac();
if new_frac > old_frac {
status.n_errors = 0;
status.n_stalls = 0;
} else {
status.n_stalls += 1;
}
self.advance_status();
}
}
pub(crate) fn note_errors(&mut self, attempt_id: AttemptId, n_errors: usize) {
if let Some(status) = self.mut_status_for(attempt_id) {
status.n_errors += n_errors;
}
}
pub(crate) fn note_reset(&mut self, attempt_id: AttemptId) {
if let Some(status) = self.mut_status_for(attempt_id) {
status.n_resets += 1;
}
}
}
impl StatusEntry {
fn new(id: AttemptId) -> Self {
Self {
id,
status: DirStatus::default(),
}
}
}
impl DirStatus {
fn declared_lifetime(&self) -> Option<&netstatus::Lifetime> {
match &self.progress {
DirProgress::NoConsensus { .. } => None,
DirProgress::FetchingCerts { lifetime, .. } => Some(lifetime),
DirProgress::Validated { lifetime, .. } => Some(lifetime),
}
}
fn usable_lifetime(&self) -> Option<&netstatus::Lifetime> {
match &self.progress {
DirProgress::NoConsensus { .. } => None,
DirProgress::FetchingCerts {
usable_lifetime, ..
} => Some(usable_lifetime),
DirProgress::Validated {
usable_lifetime, ..
} => Some(usable_lifetime),
}
}
fn okay_to_use_at(&self, when: SystemTime) -> bool {
self.usable_lifetime()
.map(|lt| lt.valid_at(when))
.unwrap_or(false)
}
fn declared_live_at(&self, when: SystemTime) -> bool {
self.declared_lifetime()
.map(|lt| lt.valid_at(when))
.unwrap_or(false)
}
fn frac_at(&self, when: SystemTime) -> Option<f32> {
if self
.declared_lifetime()
.map(|lt| lt.valid_at(when))
.unwrap_or(false)
{
Some(self.frac())
} else if self.okay_to_use_at(when) {
Some(self.frac() * 0.9)
} else {
None
}
}
fn frac(&self) -> f32 {
match &self.progress {
DirProgress::NoConsensus { .. } => 0.0,
DirProgress::FetchingCerts { n_certs, .. } => {
0.25 + f32::from(n_certs.0) / f32::from(n_certs.1) * 0.10
}
DirProgress::Validated {
usable: false,
n_mds,
..
} => 0.35 + (n_mds.0 as f32) / (n_mds.1 as f32) * 0.65,
DirProgress::Validated { usable: true, .. } => 1.0,
}
}
fn blockage(&self) -> Option<DirBlockage> {
const RESET_THRESHOLD: usize = 2;
const ERROR_THRESHOLD: usize = 6;
const STALL_THRESHOLD: usize = 8;
if self.n_resets >= RESET_THRESHOLD {
Some(DirBlockage::TooManyResets)
} else if self.n_errors >= ERROR_THRESHOLD {
Some(DirBlockage::TooManyErrors)
} else if self.n_stalls >= STALL_THRESHOLD {
Some(DirBlockage::Stalled)
} else {
None
}
}
}
impl DirProgress {
fn usable(&self) -> bool {
matches!(self, DirProgress::Validated { usable: true, .. })
}
}
#[derive(Clone, Educe)]
#[educe(Debug)]
pub struct DirBootstrapEvents {
#[educe(Debug(method = "skip_fmt"))]
pub(crate) inner: postage::watch::Receiver<DirBootstrapStatus>,
}
impl Stream for DirBootstrapEvents {
type Item = DirBootstrapStatus;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
self.inner.poll_next_unpin(cx)
}
}
#[cfg(test)]
mod test {
#![allow(clippy::bool_assert_comparison)]
#![allow(clippy::clone_on_copy)]
#![allow(clippy::dbg_macro)]
#![allow(clippy::mixed_attributes_style)]
#![allow(clippy::print_stderr)]
#![allow(clippy::print_stdout)]
#![allow(clippy::single_char_pattern)]
#![allow(clippy::unwrap_used)]
#![allow(clippy::unchecked_time_subtraction)]
#![allow(clippy::useless_vec)]
#![allow(clippy::needless_pass_by_value)]
use std::time::Duration;
use super::*;
use float_eq::assert_float_eq;
use futures::stream::StreamExt;
use tor_rtcompat::test_with_all_runtimes;
use web_time_compat::SystemTimeExt;
#[test]
fn subscribe_and_publish() {
test_with_all_runtimes!(|_rt| async {
let publish: FlagPublisher<DirEvent> = FlagPublisher::new();
let mut sub1 = publish.subscribe();
publish.publish(DirEvent::NewConsensus);
let mut sub2 = publish.subscribe();
let ev = event_listener::Event::new();
let lis = ev.listen();
futures::join!(
async {
let val1 = sub1.next().await;
assert_eq!(val1, Some(DirEvent::NewConsensus));
ev.notify(1); let val2 = sub1.next().await;
assert_eq!(val2, None);
},
async {
let val = sub2.next().await;
assert_eq!(val, None);
},
async {
lis.await;
drop(publish);
}
);
});
}
#[test]
fn receive_two() {
test_with_all_runtimes!(|_rt| async {
let publish: FlagPublisher<DirEvent> = FlagPublisher::new();
let mut sub = publish.subscribe();
let ev = event_listener::Event::new();
let ev_lis = ev.listen();
futures::join!(
async {
let val1 = sub.next().await;
assert_eq!(val1, Some(DirEvent::NewDescriptors));
ev.notify(1);
let val2 = sub.next().await;
assert_eq!(val2, Some(DirEvent::NewConsensus));
},
async {
publish.publish(DirEvent::NewDescriptors);
ev_lis.await;
publish.publish(DirEvent::NewConsensus);
}
);
});
}
#[test]
fn two_publishers() {
test_with_all_runtimes!(|_rt| async {
let publish1: FlagPublisher<DirEvent> = FlagPublisher::new();
let publish2 = publish1.clone();
let mut sub = publish1.subscribe();
let ev1 = event_listener::Event::new();
let ev2 = event_listener::Event::new();
let ev1_lis = ev1.listen();
let ev2_lis = ev2.listen();
futures::join!(
async {
let mut count = [0_usize; 2];
ev1_lis.await;
ev2_lis.await;
while let Some(e) = sub.next().await {
count[e.to_index() as usize] += 1;
}
assert!(count[0] > 0);
assert!(count[1] > 0);
assert!(count[0] <= 100);
assert!(count[1] <= 100);
},
async {
for _ in 0..100 {
publish1.publish(DirEvent::NewDescriptors);
ev1.notify(1);
tor_rtcompat::task::yield_now().await;
}
drop(publish1);
},
async {
for _ in 0..100 {
publish2.publish(DirEvent::NewConsensus);
ev2.notify(1);
tor_rtcompat::task::yield_now().await;
}
drop(publish2);
}
);
});
}
#[test]
fn receive_after_publishers_are_gone() {
test_with_all_runtimes!(|_rt| async {
let publish: FlagPublisher<DirEvent> = FlagPublisher::new();
let mut sub = publish.subscribe();
publish.publish(DirEvent::NewConsensus);
drop(publish);
let v = sub.next().await;
assert_eq!(v, Some(DirEvent::NewConsensus));
let v = sub.next().await;
assert!(v.is_none());
});
}
#[test]
fn failed_conversion() {
assert_eq!(DirEvent::from_index(999), None);
}
#[test]
fn dir_status_basics() {
let now = SystemTime::get();
let hour = Duration::new(3600, 0);
let nothing = DirStatus {
progress: DirProgress::NoConsensus { after: None },
..Default::default()
};
let lifetime = netstatus::Lifetime::new(now, now + hour, now + hour * 2).unwrap();
let unval = DirStatus {
progress: DirProgress::FetchingCerts {
lifetime: lifetime.clone(),
usable_lifetime: lifetime,
n_certs: (3, 5),
},
..Default::default()
};
let lifetime =
netstatus::Lifetime::new(now + hour, now + hour * 2, now + hour * 3).unwrap();
let with_c = DirStatus {
progress: DirProgress::Validated {
lifetime: lifetime.clone(),
usable_lifetime: lifetime,
n_mds: (30, 40),
usable: false,
},
..Default::default()
};
assert!(nothing.usable_lifetime().is_none());
assert_eq!(unval.usable_lifetime().unwrap().valid_after(), now);
assert_eq!(
with_c.usable_lifetime().unwrap().valid_until(),
now + hour * 3
);
const TOL: f32 = 0.00001;
assert_float_eq!(nothing.frac(), 0.0, abs <= TOL);
assert_float_eq!(unval.frac(), 0.25 + 0.06, abs <= TOL);
assert_float_eq!(with_c.frac(), 0.35 + 0.65 * 0.75, abs <= TOL);
let t1 = now + hour / 2;
let t2 = t1 + hour * 2;
assert!(nothing.frac_at(t1).is_none());
assert_float_eq!(unval.frac_at(t1).unwrap(), 0.25 + 0.06, abs <= TOL);
assert!(with_c.frac_at(t1).is_none());
assert!(nothing.frac_at(t2).is_none());
assert!(unval.frac_at(t2).is_none());
assert_float_eq!(with_c.frac_at(t2).unwrap(), 0.35 + 0.65 * 0.75, abs <= TOL);
}
#[test]
fn dir_status_display() {
use time::macros::datetime;
let t1: SystemTime = datetime!(2022-01-17 11:00:00 UTC).into();
let hour = Duration::new(3600, 0);
let lifetime = netstatus::Lifetime::new(t1, t1 + hour, t1 + hour * 3).unwrap();
let ds = DirStatus {
progress: DirProgress::NoConsensus { after: None },
..Default::default()
};
assert_eq!(ds.to_string(), "fetching a consensus");
let ds = DirStatus {
progress: DirProgress::FetchingCerts {
lifetime: lifetime.clone(),
usable_lifetime: lifetime.clone(),
n_certs: (3, 5),
},
..Default::default()
};
assert_eq!(ds.to_string(), "fetching authority certificates (3/5)");
let ds = DirStatus {
progress: DirProgress::Validated {
lifetime: lifetime.clone(),
usable_lifetime: lifetime.clone(),
n_mds: (30, 40),
usable: false,
},
..Default::default()
};
assert_eq!(ds.to_string(), "fetching microdescriptors (30/40)");
let ds = DirStatus {
progress: DirProgress::Validated {
lifetime: lifetime.clone(),
usable_lifetime: lifetime,
n_mds: (30, 40),
usable: true,
},
..Default::default()
};
assert_eq!(
ds.to_string(),
"usable, fresh until 2022-01-17 12:00:00 UTC, and valid until 2022-01-17 14:00:00 UTC"
);
}
#[test]
fn bootstrap_status() {
use time::macros::datetime;
let t1: SystemTime = datetime!(2022-01-17 11:00:00 UTC).into();
let hour = Duration::new(3600, 0);
let lifetime = netstatus::Lifetime::new(t1, t1 + hour, t1 + hour * 3).unwrap();
let lifetime2 = netstatus::Lifetime::new(t1 + hour, t1 + hour * 2, t1 + hour * 4).unwrap();
let dp1 = DirProgress::Validated {
lifetime: lifetime.clone(),
usable_lifetime: lifetime.clone(),
n_mds: (3, 40),
usable: true,
};
let dp2 = DirProgress::Validated {
lifetime: lifetime2.clone(),
usable_lifetime: lifetime2.clone(),
n_mds: (5, 40),
usable: false,
};
let attempt1 = AttemptId::next();
let attempt2 = AttemptId::next();
let bs = DirBootstrapStatus(StatusEnum::Replacing {
current: StatusEntry {
id: attempt1,
status: DirStatus {
progress: dp1.clone(),
..Default::default()
},
},
next: StatusEntry {
id: attempt2,
status: DirStatus {
progress: dp2.clone(),
..Default::default()
},
},
});
assert_eq!(
bs.to_string(),
"directory is usable, fresh until 2022-01-17 12:00:00 UTC, and valid until 2022-01-17 14:00:00 UTC; next directory is fetching microdescriptors (5/40)"
);
const TOL: f32 = 0.00001;
assert_float_eq!(bs.frac_at(t1 + hour / 2), 1.0, abs <= TOL);
assert_float_eq!(
bs.frac_at(t1 + hour * 3 + hour / 2),
0.35 + 0.65 * 0.125,
abs <= TOL
);
let mut bs = bs;
let dp3 = DirProgress::Validated {
lifetime: lifetime2.clone(),
usable_lifetime: lifetime2.clone(),
n_mds: (10, 40),
usable: false,
};
bs.update_progress(attempt2, dp3);
assert!(matches!(
bs.next().unwrap(),
DirStatus {
progress: DirProgress::Validated {
n_mds: (10, 40),
..
},
..
}
));
let ds4 = DirStatus {
progress: DirProgress::Validated {
lifetime: lifetime2.clone(),
usable_lifetime: lifetime2.clone(),
n_mds: (20, 40),
usable: true,
},
..Default::default()
};
bs.update_progress(attempt2, ds4.progress);
assert!(bs.next().is_none());
assert_eq!(
bs.current()
.unwrap()
.usable_lifetime()
.unwrap()
.valid_after(),
lifetime2.valid_after()
);
bs.update_progress(attempt1, dp1);
assert!(bs.next().as_ref().is_none());
assert_ne!(
bs.current()
.unwrap()
.usable_lifetime()
.unwrap()
.valid_after(),
lifetime.valid_after()
);
let mut bs = DirBootstrapStatus::default();
assert!(!dp2.usable());
assert!(bs.current().is_none());
bs.update_progress(attempt2, dp2);
assert!(bs.current().unwrap().usable_lifetime().is_some());
}
}