tor_dirmgr/event.rs
1//! Code for notifying other modules about changes in the directory.
2
3// TODO(nickm): After we have enough experience with this FlagPublisher, we
4// might want to make it a public interface. If we do it should probably move
5// into another crate.
6
7use std::{
8 fmt,
9 marker::PhantomData,
10 pin::Pin,
11 sync::{
12 Arc,
13 atomic::{AtomicUsize, Ordering},
14 },
15 task::Poll,
16 time::SystemTime,
17};
18
19use educe::Educe;
20use futures::{Future, StreamExt, stream::Stream};
21use itertools::chain;
22use paste::paste;
23use time::OffsetDateTime;
24use tor_basic_utils::skip_fmt;
25use tor_netdir::DirEvent;
26use tor_netdoc::doc::netstatus;
27
28#[cfg(feature = "bridge-client")]
29use tor_guardmgr::bridge::BridgeDescEvent;
30
31use crate::bootstrap::AttemptId;
32
/// Something that can be broadcast through a [`FlagPublisher`].
///
/// A `FlagPublisher` keeps one counter for every possible event, so each
/// event must be representable as a small integer index. Implementing this
/// trait means supplying that mapping in both directions.
pub(crate) trait FlagEvent: Sized {
    /// The largest index that [`FlagEvent::to_index()`] may ever return for
    /// this type.
    ///
    /// Kept to a `u16` because [`FlagPublisher`] allocates one slot for every
    /// index up to this value, and some operations walk all of those slots.
    const MAXIMUM: u16;

    /// Map this event to its index.
    ///
    /// Indices should be small and densely packed: every index up to
    /// [`FlagEvent::MAXIMUM`] costs a slot.
    fn to_index(self) -> u16;

    /// Map an index back to its event, or return `None` if `flag` does not
    /// correspond to any event.
    fn from_index(flag: u16) -> Option<Self>;
}
53
/// Implements [`FlagEvent`] for a C-like enum.
///
/// Requirements:
///
/// * `$ty` must implement [`strum::EnumCount`] and [`strum::IntoEnumIterator`].
///
/// * `$ty` must implement [`Into<u16>`] and [`TryFrom<u16>`]
///   (for example using the `num_enum` crate).
///
/// * The discriminants must be densely allocated, starting at zero.
///   This will be done automatically by the compiler
///   if explicit discriminants are not specified.
///   (This property is checked in a test.)
///
/// * The variants may not contain any data.
///   This is required for correctness.
///   We think it is checked if you use `num_enum::TryFromPrimitive`.
///
/// * The enum must have between 1 and 65536 variants, so that its largest
///   index fits in a `u16`.
///   (This is checked at compile time, when `MAXIMUM` is evaluated.)
///
/// # Example
///
// Sadly, it does not appear to be possible to doctest a private macro.
/// ```rust,ignore
/// use num_enum::{IntoPrimitive, TryFromPrimitive};
/// use strum::{EnumCount, EnumIter};
///
/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// #[derive(EnumIter, EnumCount, IntoPrimitive, TryFromPrimitive)]
/// #[non_exhaustive]
/// #[repr(u16)]
/// pub enum DirEvent {
///     NewConsensus,
///     NewDescriptors,
/// }
///
/// impl_FlagEvent!{ DirEvent }
/// ```
macro_rules! impl_FlagEvent { { $ty:ident } => { paste!{
    impl FlagEvent for $ty {
        const MAXIMUM: u16 = {
            let count = <$ty as $crate::strum::EnumCount>::COUNT;
            // Reject, at compile time and with a readable message, any enum
            // that has no variants (where `count - 1` would overflow) or whose
            // largest index would not fit in a u16 (where the `as` cast below
            // would otherwise silently truncate).
            assert!(
                count >= 1 && count <= u16::MAX as usize + 1,
                "impl_FlagEvent requires between 1 and 65536 variants"
            );
            // Lossless: the assertion above guarantees `count - 1 <= u16::MAX`.
            (count - 1) as u16
        };
        fn to_index(self) -> u16 {
            self.into()
        }
        fn from_index(flag: u16) -> Option<Self> {
            flag.try_into().ok()
        }
    }

    #[test]
    #[allow(non_snake_case)]
    fn [< flagevent_test_variant_numbers_ $ty >]() {
        // Every variant's index must lie within `0..=MAXIMUM`; together with
        // `MAXIMUM == COUNT - 1`, this means the discriminants are dense.
        for variant in <$ty as $crate::strum::IntoEnumIterator>::iter() {
            assert!(<$ty as FlagEvent>::to_index(variant) <=
                    <$ty as FlagEvent>::MAXIMUM,
                    "impl_FlagEvent only allowed if discriminators are dense");
        }
    }
} } }
114
115impl_FlagEvent! { DirEvent }
116
117#[cfg(feature = "bridge-client")]
118impl_FlagEvent! { BridgeDescEvent }
119
120/// A publisher that broadcasts flag-level events to multiple subscribers.
121///
122/// Events with the same flag value may be coalesced: that is, if the same event
123/// is published ten times in a row, a subscriber may receive only a single
124/// notification of the event.
125///
126/// FlagPublisher supports an MPMC model: cloning a Publisher creates a new handle
127/// that can also broadcast events to everybody listening on the channel.
128/// Dropping the last handle closes all streams subscribed to it.
129pub(crate) struct FlagPublisher<F> {
130 /// Inner data shared by publishers and streams.
131 inner: Arc<Inner<F>>,
132}
133
134/// Shared structure to implement [`FlagPublisher`] and [`FlagListener`].
135struct Inner<F> {
136 /// An event that we use to broadcast whenever a new [`FlagEvent`] event has occurred.
137 event: event_listener::Event,
138 /// How many times has each event occurred, ever.
139 ///
140 /// (It is safe for this to wrap around.)
141 // TODO(nickm): I wish this could be an array, but const generics don't
142 // quite support that yet.
143 counts: Vec<AtomicUsize>, // I wish this could be an array.
144 /// How many publishers remain?
145 n_publishers: AtomicUsize,
146 /// Phantom member to provide correct covariance.
147 ///
148 /// The `fn` business is a covariance trick to include `F` without affecting
149 /// this object's Send/Sync status.
150 _phantom: PhantomData<fn(F) -> F>,
151}
152
153/// A [`Stream`] that returns a series of event [`FlagEvent`]s broadcast by a
154/// [`FlagPublisher`].
155pub(crate) struct FlagListener<F> {
156 /// What value of each flag's count have we seen most recently?
157 ///
158 /// Note that we count the event as "received" only once for each observed
159 /// change in the flag's count, even if that count has changed by more than
160 /// 1.
161 my_counts: Vec<usize>,
162 /// An `EventListener` that will be notified when events are published,
163 /// or when the final publisher is dropped.
164 ///
165 /// We must always have one of these available _before_ we check any counts
166 /// in self.inner.
167 listener: event_listener::EventListener,
168 /// Reference to shared data.
169 inner: Arc<Inner<F>>,
170}
171
172impl<F: FlagEvent> Default for FlagPublisher<F> {
173 fn default() -> Self {
174 Self::new()
175 }
176}
177
178impl<F: FlagEvent> FlagPublisher<F> {
179 /// Construct a new FlagPublisher.
180 pub(crate) fn new() -> Self {
181 // We can't use vec![AtomicUsize::new(0); F::MAXIMUM+1]: that would
182 // require AtomicUsize to be Clone.
183 let counts = std::iter::repeat_with(AtomicUsize::default)
184 .take(F::MAXIMUM as usize + 1)
185 .collect();
186 FlagPublisher {
187 inner: Arc::new(Inner {
188 event: event_listener::Event::new(),
189 counts,
190 n_publishers: AtomicUsize::new(1),
191 _phantom: PhantomData,
192 }),
193 }
194 }
195
196 /// Create a new subscription to this FlagPublisher.
197 pub(crate) fn subscribe(&self) -> FlagListener<F> {
198 // We need to do this event.listen before we check the counts; otherwise
199 // we could have a sequence where: we check the count, then the
200 // publisher increments the count, then the publisher calls
201 // event.notify(), and we call event.listen(). That would cause us to
202 // miss the increment.
203 let listener = self.inner.event.listen();
204
205 FlagListener {
206 my_counts: self
207 .inner
208 .counts
209 .iter()
210 .map(|a| a.load(Ordering::SeqCst))
211 .collect(),
212 listener,
213 inner: Arc::clone(&self.inner),
214 }
215 }
216
217 /// Tell every listener that the provided flag has been published.
218 pub(crate) fn publish(&self, flag: F) {
219 self.inner.counts[flag.to_index() as usize].fetch_add(1, Ordering::SeqCst);
220 self.inner.event.notify(usize::MAX);
221 }
222}
223
224impl<F> Clone for FlagPublisher<F> {
225 fn clone(&self) -> FlagPublisher<F> {
226 self.inner.n_publishers.fetch_add(1, Ordering::SeqCst);
227 FlagPublisher {
228 inner: Arc::clone(&self.inner),
229 }
230 }
231}
232
233// We must implement Drop to keep count publishers, and so that when the last
234// publisher goes away, we can wake up every listener so that it notices that
235// the stream is now ended.
236impl<F> Drop for FlagPublisher<F> {
237 fn drop(&mut self) {
238 if self.inner.n_publishers.fetch_sub(1, Ordering::SeqCst) == 1 {
239 // That was the last reference; we must notify the listeners.
240 self.inner.event.notify(usize::MAX);
241 }
242 }
243}
244
245impl<F: FlagEvent> Stream for FlagListener<F> {
246 type Item = F;
247
248 fn poll_next(
249 mut self: std::pin::Pin<&mut Self>,
250 cx: &mut std::task::Context<'_>,
251 ) -> std::task::Poll<Option<Self::Item>> {
252 loop {
253 // Notify the caller if any events are ready to fire.
254 for idx in 0..F::MAXIMUM as usize + 1 {
255 let cur = self.inner.counts[idx].load(Ordering::SeqCst);
256 // We don't have to use < here specifically, since any change
257 // indicates that the count has been modified. That lets us
258 // survive usize wraparound.
259 if cur != self.my_counts[idx] {
260 self.my_counts[idx] = cur;
261 return Poll::Ready(Some(F::from_index(idx as u16).expect("Internal error")));
262 }
263 }
264
265 // At this point, notify the caller if there are no more publishers.
266 if self.inner.n_publishers.load(Ordering::SeqCst) == 0 {
267 return Poll::Ready(None);
268 }
269
270 if let Poll::Ready(()) = Pin::new(&mut self.listener).poll(cx) {
271 // Got a new notification; we must create a new event and continue the loop.
272 //
273 // See discussion in `FlagPublisher::subscribe()` for why we must always create
274 // this listener _before_ checking any flags.
275 self.listener = self.inner.event.listen();
276 } else {
277 // Nothing to do yet: put the listener back.
278 return Poll::Pending;
279 }
280 }
281 }
282}
283
284/// Description of the directory manager's current bootstrapping status.
285///
286/// This status does not necessarily increase monotonically: it can go backwards
287/// if (for example) our directory information expires before we're able to get
288/// new information.
289//
290// TODO(nickm): This type has gotten a bit large for being the type we send over
291// a `postage::watch`: perhaps we'd be better off having this information stored
292// in the guardmgr, and having only a summary of it sent over the
293// `postage::watch`. But for now, let's not, unless it shows up in profiles.
294#[derive(Clone, Debug, Default)]
295pub struct DirBootstrapStatus(StatusEnum);
296
297/// The contents of a DirBootstrapStatus.
298///
299/// This is a separate type since we don't want to make these variables public.
300#[derive(Clone, Debug, Default)]
301enum StatusEnum {
302 /// There is no active attempt to load or fetch a directory.
303 #[default]
304 NoActivity,
305 /// We have only one attempt to fetch a directory.
306 Single {
307 /// The currently active directory attempt.
308 ///
309 /// We're either using this directory now, or we plan to use it as soon
310 /// as it's complete enough.
311 current: StatusEntry,
312 },
313 /// We have an existing directory attempt, but it's stale, and we're
314 /// fetching a new one to replace it.
315 ///
316 /// Invariant: `current.id < next.id`
317 Replacing {
318 /// The previous attempt's status. It may still be trying to fetch
319 /// information if it has descriptors left to download.
320 current: StatusEntry,
321 /// The current attempt's status. We are not yet using this directory
322 /// for our activity, since it does not (yet) have enough information.
323 next: StatusEntry,
324 },
325}
326
327/// The status and identifier of a single attempt to download a full directory.
328#[derive(Clone, Debug)]
329struct StatusEntry {
330 /// The identifier for this attempt.
331 id: AttemptId,
332 /// The latest status.
333 status: DirStatus,
334}
335
336/// The status for a single directory.
337#[derive(Clone, Debug, Default, derive_more::Display)]
338#[display("{0}", progress)]
339pub(crate) struct DirStatus {
340 /// How much of the directory do we currently have?
341 progress: DirProgress,
342 /// How many resets have been forced while fetching this directory?
343 n_resets: usize,
344 /// How many errors have we encountered since last we advanced the
345 /// 'progress' on this directory?
346 n_errors: usize,
347 /// How many times has an `update_progress` call not actually moved us
348 /// forward since we last advanced the 'progress' on this directory?
349 n_stalls: usize,
350}
351
352/// How much progress have we made in downloading a given directory?
353///
354/// This is a separate type so that we don't make the variants public.
355#[derive(Clone, Debug, Educe)]
356#[educe(Default)]
357pub(crate) enum DirProgress {
358 /// We don't have any information yet.
359 #[educe(Default)]
360 NoConsensus {
361 /// If present, we are fetching a consensus whose valid-after time
362 /// postdates this time.
363 #[allow(dead_code)]
364 after: Option<SystemTime>,
365 },
366 /// We've downloaded a consensus, but we haven't validated it yet.
367 FetchingCerts {
368 /// The actual declared lifetime of the consensus.
369 lifetime: netstatus::Lifetime,
370 /// The lifetime for which we are willing to use this consensus. (This
371 /// may be broader than `lifetime`.)
372 usable_lifetime: netstatus::Lifetime,
373 /// A fraction (in (numerator,denominator) format) of the certificates
374 /// we have for this consensus.
375 n_certs: (u16, u16),
376 },
377 /// We've validated a consensus and we're fetching (or have fetched) its
378 /// microdescriptors.
379 Validated {
380 /// The actual declared lifetime of the consensus.
381 lifetime: netstatus::Lifetime,
382 /// The lifetime for which we are willing to use this consensus. (This
383 /// may be broader than `lifetime`.)
384 usable_lifetime: netstatus::Lifetime,
385 /// A fraction (in (numerator,denominator) form) of the microdescriptors
386 /// that we have for this consensus.
387 n_mds: (u32, u32),
388 /// True iff we've decided that the consensus is usable.
389 usable: bool,
390 // TODO(nickm) Someday we could add a field about whether any primary
391 // guards are missing microdescriptors, to give a better explanation for
392 // the case where we won't switch our consensus because of that.
393 },
394}
395
396/// A reported diagnostic for what kind of trouble we've seen while trying to
397/// bootstrap a directory.
398///
399/// These blockages types are not yet terribly specific: if you encounter one,
400/// it's probably a good idea to check the logs to see what's really going on.
401///
402/// If you encounter connection blockage _and_ directory blockage at the same
403/// time, the connection blockage is almost certainly the real problem.
404//
405// TODO(nickm): At present these diagnostics aren't very helpful; they say too
406// much about _how we know_ that the process has gone wrong, but not so much
407// about _what the problem is_. In the future, we may wish to look more closely
408// at what _kind_ of errors or resets we've seen, so we can report better
409// information. Probably, however, we should only do that after we get some
410// experience with which problems people encounter in practice, and what
411// diagnostics would be useful for them.
412#[derive(Clone, Debug, derive_more::Display)]
413#[non_exhaustive]
414pub enum DirBlockage {
415 /// We've been downloading information without error, but we haven't
416 /// actually been getting anything that we want.
417 ///
418 /// This might indicate that there's a problem with information propagating
419 /// through the Tor network, or it might indicate that a bogus consensus or
420 /// a bad clock has tricked us into asking for something that nobody has.
421 #[display("Can't make progress.")]
422 Stalled,
423 /// We've gotten a lot of errors without making forward progress on our
424 /// bootstrap attempt.
425 ///
426 /// This might indicate that something's wrong with the Tor network, or that
427 /// there's something buggy with our ability to handle directory responses.
428 /// It might also indicate a malfunction on our directory guards, or a bug
429 /// on our retry logic.
430 #[display("Too many errors without making progress.")]
431 TooManyErrors,
432 /// We've reset our bootstrap attempt a lot of times.
433 ///
434 /// This either indicates that we have been failing a lot for one of the
435 /// other reasons above, or that we keep getting served a consensus which
436 /// turns out, upon trying to fetch certificates, not to be usable. It can
437 /// also indicate a bug in our retry logic.
438 #[display("Had to reset bootstrapping too many times.")]
439 TooManyResets,
440}
441
442impl fmt::Display for DirProgress {
443 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
444 /// Format this time in a format useful for displaying
445 /// lifetime boundaries.
446 fn fmt_time(t: SystemTime) -> String {
447 use std::sync::LazyLock;
448 /// Formatter object for lifetime boundaries.
449 ///
450 /// We use "YYYY-MM-DD HH:MM:SS UTC" here, since we never have
451 /// sub-second times here, and using non-UTC offsets is confusing
452 /// in this context.
453 static FORMAT: LazyLock<Vec<time::format_description::FormatItem>> =
454 LazyLock::new(|| {
455 time::format_description::parse(
456 "[year]-[month]-[day] [hour]:[minute]:[second] UTC",
457 )
458 .expect("Invalid time format")
459 });
460 OffsetDateTime::from(t)
461 .format(&FORMAT)
462 .unwrap_or_else(|_| "(could not format)".into())
463 }
464
465 match &self {
466 DirProgress::NoConsensus { .. } => write!(f, "fetching a consensus"),
467 DirProgress::FetchingCerts { n_certs, .. } => write!(
468 f,
469 "fetching authority certificates ({}/{})",
470 n_certs.0, n_certs.1
471 ),
472 DirProgress::Validated {
473 usable: false,
474 n_mds,
475 ..
476 } => write!(f, "fetching microdescriptors ({}/{})", n_mds.0, n_mds.1),
477 DirProgress::Validated {
478 usable: true,
479 lifetime,
480 ..
481 } => write!(
482 f,
483 "usable, fresh until {}, and valid until {}",
484 fmt_time(lifetime.fresh_until()),
485 fmt_time(lifetime.valid_until())
486 ),
487 }
488 }
489}
490
491impl fmt::Display for DirBootstrapStatus {
492 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
493 match &self.0 {
494 StatusEnum::NoActivity => write!(f, "not downloading")?,
495 StatusEnum::Single { current } => write!(f, "directory is {}", current.status)?,
496 StatusEnum::Replacing { current, next } => write!(
497 f,
498 "directory is {}; next directory is {}",
499 current.status, next.status
500 )?,
501 }
502 Ok(())
503 }
504}
505
506impl DirBootstrapStatus {
507 /// Return the current DirStatus.
508 ///
509 /// This is the _most complete_ status. If we have any usable status, it is
510 /// this one.
511 fn current(&self) -> Option<&DirStatus> {
512 match &self.0 {
513 StatusEnum::NoActivity => None,
514 StatusEnum::Single { current } => Some(¤t.status),
515 StatusEnum::Replacing { current, .. } => Some(¤t.status),
516 }
517 }
518
519 /// Return the next DirStatus, if there is one.
520 fn next(&self) -> Option<&DirStatus> {
521 match &self.0 {
522 StatusEnum::Replacing { next, .. } => Some(&next.status),
523 _ => None,
524 }
525 }
526
527 /// Return the contained `DirStatus`es, in order: `current`, then `next`
528 #[allow(clippy::implied_bounds_in_impls)]
529 fn statuses(&self) -> impl Iterator<Item = &DirStatus> + DoubleEndedIterator {
530 chain!(self.current(), self.next(),)
531 }
532
533 /// Return the contained `StatusEntry`s mutably, in order: `current`, then `next`
534 #[allow(clippy::implied_bounds_in_impls)]
535 fn entries_mut(&mut self) -> impl Iterator<Item = &mut StatusEntry> + DoubleEndedIterator {
536 let (current, next) = match &mut self.0 {
537 StatusEnum::NoActivity => (None, None),
538 StatusEnum::Single { current } => (Some(current), None),
539 StatusEnum::Replacing { current, next } => (Some(current), Some(next)),
540 };
541 chain!(current, next,)
542 }
543
544 /// Return the fraction of completion for directory download, in a form
545 /// suitable for a progress bar at some particular time.
546 ///
547 /// This value is not monotonic, and can go down as one directory is
548 /// replaced with another.
549 ///
550 /// Callers _should not_ depend on the specific meaning of any particular
551 /// fraction; we may change these fractions in the future.
552 pub fn frac_at(&self, when: SystemTime) -> f32 {
553 self.statuses()
554 .filter_map(|st| st.frac_at(when))
555 .next()
556 .unwrap_or(0.0)
557 }
558
559 /// Return true if this status indicates that we have a current usable
560 /// directory.
561 pub fn usable_at(&self, now: SystemTime) -> bool {
562 if let Some(current) = self.current() {
563 current.progress.usable() && current.okay_to_use_at(now)
564 } else {
565 false
566 }
567 }
568
569 /// If there is a problem with our attempts to bootstrap, return a
570 /// corresponding DirBlockage.
571 pub fn blockage(&self, now: SystemTime) -> Option<DirBlockage> {
572 if let Some(current) = self.current() {
573 if current.progress.usable() && current.declared_live_at(now) {
574 // The current directory is sufficient, and not even a little bit
575 // expired. There is no problem.
576 return None;
577 }
578 }
579
580 // Any blockage in "current" is more serious, so return that if there is one
581 self.statuses().filter_map(|st| st.blockage()).next()
582 }
583
584 /// Return the appropriate DirStatus for `AttemptId`, constructing it if
585 /// necessary.
586 ///
587 /// Return None if all relevant attempts are more recent than this Id.
588 #[allow(clippy::search_is_some)] // tpo/core/arti/-/merge_requests/599#note_2816368
589 fn mut_status_for(&mut self, attempt_id: AttemptId) -> Option<&mut DirStatus> {
590 // First, ensure that we have a *recent enough* attempt
591 // Look for the latest attempt, and see if it's new enough; if not, start a new one.
592 if self
593 .entries_mut()
594 .rev()
595 .take(1)
596 .find(|entry| entry.id >= attempt_id)
597 .is_none()
598 {
599 let current = match std::mem::take(&mut self.0) {
600 StatusEnum::NoActivity => None,
601 StatusEnum::Single { current } => Some(current),
602 StatusEnum::Replacing { current, .. } => Some(current),
603 };
604 // If we have a `current` already, we keep it, and restart `next`.
605 let next = StatusEntry::new(attempt_id);
606 self.0 = match current {
607 None => StatusEnum::Single { current: next },
608 Some(current) => StatusEnum::Replacing { current, next },
609 };
610 }
611
612 // Find the entry with `attempt_id` and return it.
613 // (Despite the above, there might not be one: maybe `attempt_id` is old.)
614 self.entries_mut()
615 .find(|entry| entry.id == attempt_id)
616 .map(|entry| &mut entry.status)
617 }
618
619 /// If the "next" status is usable, replace the current status with it.
620 fn advance_status(&mut self) {
621 // TODO: should make sure that the compiler is smart enough to optimize
622 // this mem::take() and replacement away, and turn it into a conditional
623 // replacement?
624 self.0 = match std::mem::take(&mut self.0) {
625 StatusEnum::Replacing { next, .. } if next.status.progress.usable() => {
626 StatusEnum::Single { current: next }
627 }
628 other => other,
629 };
630 }
631
632 /// Update this status by replacing the `DirProgress` in its current status
633 /// (or its next status) with `new_status`, as appropriate.
634 pub(crate) fn update_progress(&mut self, attempt_id: AttemptId, new_progress: DirProgress) {
635 if let Some(status) = self.mut_status_for(attempt_id) {
636 let old_frac = status.frac();
637 status.progress = new_progress;
638 let new_frac = status.frac();
639 if new_frac > old_frac {
640 // This download has made progress: clear our count of errors
641 // and stalls.
642 status.n_errors = 0;
643 status.n_stalls = 0;
644 } else {
645 // This download didn't make progress; increment the stall
646 // count.
647 status.n_stalls += 1;
648 }
649 self.advance_status();
650 }
651 }
652
653 /// Update this status by noting that some errors have occurred in a given
654 /// download attempt.
655 pub(crate) fn note_errors(&mut self, attempt_id: AttemptId, n_errors: usize) {
656 if let Some(status) = self.mut_status_for(attempt_id) {
657 status.n_errors += n_errors;
658 }
659 }
660
661 /// Update this status by noting that we had to reset a given download attempt;
662 pub(crate) fn note_reset(&mut self, attempt_id: AttemptId) {
663 if let Some(status) = self.mut_status_for(attempt_id) {
664 status.n_resets += 1;
665 }
666 }
667}
668
669impl StatusEntry {
670 /// Construct a new StatusEntry with a given attempt id, and no progress
671 /// reported.
672 fn new(id: AttemptId) -> Self {
673 Self {
674 id,
675 status: DirStatus::default(),
676 }
677 }
678}
679
680impl DirStatus {
681 /// Return the declared consensus lifetime for this directory, if we have one.
682 fn declared_lifetime(&self) -> Option<&netstatus::Lifetime> {
683 match &self.progress {
684 DirProgress::NoConsensus { .. } => None,
685 DirProgress::FetchingCerts { lifetime, .. } => Some(lifetime),
686 DirProgress::Validated { lifetime, .. } => Some(lifetime),
687 }
688 }
689
690 /// Return the consensus lifetime for this directory, if we have one, as
691 /// modified by our skew-tolerance settings.
692 fn usable_lifetime(&self) -> Option<&netstatus::Lifetime> {
693 match &self.progress {
694 DirProgress::NoConsensus { .. } => None,
695 DirProgress::FetchingCerts {
696 usable_lifetime, ..
697 } => Some(usable_lifetime),
698 DirProgress::Validated {
699 usable_lifetime, ..
700 } => Some(usable_lifetime),
701 }
702 }
703
704 /// Return true if the directory is valid at the given time, as modified by
705 /// our clock skew settings.
706 fn okay_to_use_at(&self, when: SystemTime) -> bool {
707 self.usable_lifetime()
708 .map(|lt| lt.valid_at(when))
709 .unwrap_or(false)
710 }
711
712 /// Return true if the directory is valid at the given time, _unmodified_ by our
713 /// clock skew settings.
714 fn declared_live_at(&self, when: SystemTime) -> bool {
715 self.declared_lifetime()
716 .map(|lt| lt.valid_at(when))
717 .unwrap_or(false)
718 }
719
720 /// As `frac`, but return None if this consensus is not valid at the given time,
721 /// and down-rate expired consensuses that we're still willing to use.
722 fn frac_at(&self, when: SystemTime) -> Option<f32> {
723 if self
724 .declared_lifetime()
725 .map(|lt| lt.valid_at(when))
726 .unwrap_or(false)
727 {
728 // We're officially okay to use this directory.
729 Some(self.frac())
730 } else if self.okay_to_use_at(when) {
731 // This directory is a little expired, but only a little.
732 Some(self.frac() * 0.9)
733 } else {
734 None
735 }
736 }
737
738 /// Return the fraction of completion for directory download, in a form
739 /// suitable for a progress bar.
740 ///
741 /// This is monotonically increasing for a single directory, but can go down
742 /// as one directory is replaced with another.
743 ///
744 /// Callers _should not_ depend on the specific meaning of any particular
745 /// fraction; we may change these fractions in the future.
746 fn frac(&self) -> f32 {
747 // We arbitrarily decide that 25% is downloading the consensus, 10% is
748 // downloading the certificates, and the remaining 65% is downloading
749 // the microdescriptors until we become usable. We may want to re-tune that in the future, but
750 // the documentation of this function should allow us to do so.
751 match &self.progress {
752 DirProgress::NoConsensus { .. } => 0.0,
753 DirProgress::FetchingCerts { n_certs, .. } => {
754 0.25 + f32::from(n_certs.0) / f32::from(n_certs.1) * 0.10
755 }
756 DirProgress::Validated {
757 usable: false,
758 n_mds,
759 ..
760 } => 0.35 + (n_mds.0 as f32) / (n_mds.1 as f32) * 0.65,
761 DirProgress::Validated { usable: true, .. } => 1.0,
762 }
763 }
764
765 /// If we think there is a problem with our bootstrapping process, return a
766 /// [`DirBlockage`] to describe it.
767 ///
768 /// The caller may want to also check `usable_at` to avoid reporting trouble
769 /// if the directory is currently usable.
770 fn blockage(&self) -> Option<DirBlockage> {
771 /// How many resets are sufficient for us to report a blockage?
772 const RESET_THRESHOLD: usize = 2;
773 /// How many errors are sufficient for us to report a blockage?
774 const ERROR_THRESHOLD: usize = 6;
775 /// How many no-progress download attempts are sufficient for us to
776 /// report a blockage?
777 const STALL_THRESHOLD: usize = 8;
778
779 if self.n_resets >= RESET_THRESHOLD {
780 Some(DirBlockage::TooManyResets)
781 } else if self.n_errors >= ERROR_THRESHOLD {
782 Some(DirBlockage::TooManyErrors)
783 } else if self.n_stalls >= STALL_THRESHOLD {
784 Some(DirBlockage::Stalled)
785 } else {
786 None
787 }
788 }
789}
790
791impl DirProgress {
792 /// Return true if this progress indicates a usable directory.
793 fn usable(&self) -> bool {
794 matches!(self, DirProgress::Validated { usable: true, .. })
795 }
796}
797
798/// A stream of [`DirBootstrapStatus`] events.
799#[derive(Clone, Educe)]
800#[educe(Debug)]
801pub struct DirBootstrapEvents {
802 /// The `postage::watch::Receiver` that we're wrapping.
803 ///
804 /// We wrap this type so that we don't expose its entire API, and so that we
805 /// can migrate to some other implementation in the future if we want.
806 #[educe(Debug(method = "skip_fmt"))]
807 pub(crate) inner: postage::watch::Receiver<DirBootstrapStatus>,
808}
809
810impl Stream for DirBootstrapEvents {
811 type Item = DirBootstrapStatus;
812
813 fn poll_next(
814 mut self: Pin<&mut Self>,
815 cx: &mut std::task::Context<'_>,
816 ) -> Poll<Option<Self::Item>> {
817 self.inner.poll_next_unpin(cx)
818 }
819}
820
821#[cfg(test)]
822mod test {
823 // @@ begin test lint list maintained by maint/add_warning @@
824 #![allow(clippy::bool_assert_comparison)]
825 #![allow(clippy::clone_on_copy)]
826 #![allow(clippy::dbg_macro)]
827 #![allow(clippy::mixed_attributes_style)]
828 #![allow(clippy::print_stderr)]
829 #![allow(clippy::print_stdout)]
830 #![allow(clippy::single_char_pattern)]
831 #![allow(clippy::unwrap_used)]
832 #![allow(clippy::unchecked_time_subtraction)]
833 #![allow(clippy::useless_vec)]
834 #![allow(clippy::needless_pass_by_value)]
835 #![allow(clippy::string_slice)] // See arti#2571
836 //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
837 use std::time::Duration;
838
839 use super::*;
840 use float_eq::assert_float_eq;
841 use futures::stream::StreamExt;
842 use tor_rtcompat::test_with_all_runtimes;
843 use web_time_compat::SystemTimeExt;
844
845 #[test]
846 fn subscribe_and_publish() {
847 test_with_all_runtimes!(|_rt| async {
848 let publish: FlagPublisher<DirEvent> = FlagPublisher::new();
849 let mut sub1 = publish.subscribe();
850 publish.publish(DirEvent::NewConsensus);
851 let mut sub2 = publish.subscribe();
852 let ev = event_listener::Event::new();
853 let lis = ev.listen();
854
855 futures::join!(
856 async {
857 // sub1 was created in time to see this event...
858 let val1 = sub1.next().await;
859 assert_eq!(val1, Some(DirEvent::NewConsensus));
860 ev.notify(1); // Tell the third task below to drop the publisher.
861 let val2 = sub1.next().await;
862 assert_eq!(val2, None);
863 },
864 async {
865 let val = sub2.next().await;
866 assert_eq!(val, None);
867 },
868 async {
869 lis.await;
870 drop(publish);
871 }
872 );
873 });
874 }
875
876 #[test]
877 fn receive_two() {
878 test_with_all_runtimes!(|_rt| async {
879 let publish: FlagPublisher<DirEvent> = FlagPublisher::new();
880
881 let mut sub = publish.subscribe();
882 let ev = event_listener::Event::new();
883 let ev_lis = ev.listen();
884 futures::join!(
885 async {
886 let val1 = sub.next().await;
887 assert_eq!(val1, Some(DirEvent::NewDescriptors));
888 ev.notify(1);
889 let val2 = sub.next().await;
890 assert_eq!(val2, Some(DirEvent::NewConsensus));
891 },
892 async {
893 publish.publish(DirEvent::NewDescriptors);
894 ev_lis.await;
895 publish.publish(DirEvent::NewConsensus);
896 }
897 );
898 });
899 }
900
901 #[test]
902 fn two_publishers() {
903 test_with_all_runtimes!(|_rt| async {
904 let publish1: FlagPublisher<DirEvent> = FlagPublisher::new();
905 let publish2 = publish1.clone();
906
907 let mut sub = publish1.subscribe();
908 let ev1 = event_listener::Event::new();
909 let ev2 = event_listener::Event::new();
910 let ev1_lis = ev1.listen();
911 let ev2_lis = ev2.listen();
912 futures::join!(
913 async {
914 let mut count = [0_usize; 2];
915 // These awaits guarantee that we will see at least one event flag of each
916 // type, before the stream is dropped.
917 ev1_lis.await;
918 ev2_lis.await;
919 while let Some(e) = sub.next().await {
920 count[e.to_index() as usize] += 1;
921 }
922 assert!(count[0] > 0);
923 assert!(count[1] > 0);
924 assert!(count[0] <= 100);
925 assert!(count[1] <= 100);
926 },
927 async {
928 for _ in 0..100 {
929 publish1.publish(DirEvent::NewDescriptors);
930 ev1.notify(1);
931 tor_rtcompat::task::yield_now().await;
932 }
933 drop(publish1);
934 },
935 async {
936 for _ in 0..100 {
937 publish2.publish(DirEvent::NewConsensus);
938 ev2.notify(1);
939 tor_rtcompat::task::yield_now().await;
940 }
941 drop(publish2);
942 }
943 );
944 });
945 }
946
947 #[test]
948 fn receive_after_publishers_are_gone() {
949 test_with_all_runtimes!(|_rt| async {
950 let publish: FlagPublisher<DirEvent> = FlagPublisher::new();
951
952 let mut sub = publish.subscribe();
953
954 publish.publish(DirEvent::NewConsensus);
955 drop(publish);
956 let v = sub.next().await;
957 assert_eq!(v, Some(DirEvent::NewConsensus));
958 let v = sub.next().await;
959 assert!(v.is_none());
960 });
961 }
962
963 #[test]
964 fn failed_conversion() {
965 assert_eq!(DirEvent::from_index(999), None);
966 }
967
968 #[test]
969 fn dir_status_basics() {
970 let now = SystemTime::get();
971 let hour = Duration::new(3600, 0);
972
973 let nothing = DirStatus {
974 progress: DirProgress::NoConsensus { after: None },
975 ..Default::default()
976 };
977 let lifetime = netstatus::Lifetime::new(now, now + hour, now + hour * 2).unwrap();
978 let unval = DirStatus {
979 progress: DirProgress::FetchingCerts {
980 lifetime: lifetime.clone(),
981 usable_lifetime: lifetime,
982 n_certs: (3, 5),
983 },
984 ..Default::default()
985 };
986 let lifetime =
987 netstatus::Lifetime::new(now + hour, now + hour * 2, now + hour * 3).unwrap();
988 let with_c = DirStatus {
989 progress: DirProgress::Validated {
990 lifetime: lifetime.clone(),
991 usable_lifetime: lifetime,
992 n_mds: (30, 40),
993 usable: false,
994 },
995 ..Default::default()
996 };
997
998 // lifetime()
999 assert!(nothing.usable_lifetime().is_none());
1000 assert_eq!(unval.usable_lifetime().unwrap().valid_after(), now);
1001 assert_eq!(
1002 with_c.usable_lifetime().unwrap().valid_until(),
1003 now + hour * 3
1004 );
1005
1006 // frac() (It's okay if we change the actual numbers here later; the
1007 // current ones are more or less arbitrary.)
1008 const TOL: f32 = 0.00001;
1009 assert_float_eq!(nothing.frac(), 0.0, abs <= TOL);
1010 assert_float_eq!(unval.frac(), 0.25 + 0.06, abs <= TOL);
1011 assert_float_eq!(with_c.frac(), 0.35 + 0.65 * 0.75, abs <= TOL);
1012
1013 // frac_at()
1014 let t1 = now + hour / 2;
1015 let t2 = t1 + hour * 2;
1016 assert!(nothing.frac_at(t1).is_none());
1017 assert_float_eq!(unval.frac_at(t1).unwrap(), 0.25 + 0.06, abs <= TOL);
1018 assert!(with_c.frac_at(t1).is_none());
1019 assert!(nothing.frac_at(t2).is_none());
1020 assert!(unval.frac_at(t2).is_none());
1021 assert_float_eq!(with_c.frac_at(t2).unwrap(), 0.35 + 0.65 * 0.75, abs <= TOL);
1022 }
1023
1024 #[test]
1025 fn dir_status_display() {
1026 use time::macros::datetime;
1027 let t1: SystemTime = datetime!(2022-01-17 11:00:00 UTC).into();
1028 let hour = Duration::new(3600, 0);
1029 let lifetime = netstatus::Lifetime::new(t1, t1 + hour, t1 + hour * 3).unwrap();
1030
1031 let ds = DirStatus {
1032 progress: DirProgress::NoConsensus { after: None },
1033 ..Default::default()
1034 };
1035 assert_eq!(ds.to_string(), "fetching a consensus");
1036
1037 let ds = DirStatus {
1038 progress: DirProgress::FetchingCerts {
1039 lifetime: lifetime.clone(),
1040 usable_lifetime: lifetime.clone(),
1041 n_certs: (3, 5),
1042 },
1043 ..Default::default()
1044 };
1045 assert_eq!(ds.to_string(), "fetching authority certificates (3/5)");
1046
1047 let ds = DirStatus {
1048 progress: DirProgress::Validated {
1049 lifetime: lifetime.clone(),
1050 usable_lifetime: lifetime.clone(),
1051 n_mds: (30, 40),
1052 usable: false,
1053 },
1054 ..Default::default()
1055 };
1056 assert_eq!(ds.to_string(), "fetching microdescriptors (30/40)");
1057
1058 let ds = DirStatus {
1059 progress: DirProgress::Validated {
1060 lifetime: lifetime.clone(),
1061 usable_lifetime: lifetime,
1062 n_mds: (30, 40),
1063 usable: true,
1064 },
1065 ..Default::default()
1066 };
1067 assert_eq!(
1068 ds.to_string(),
1069 "usable, fresh until 2022-01-17 12:00:00 UTC, and valid until 2022-01-17 14:00:00 UTC"
1070 );
1071 }
1072
1073 #[test]
1074 fn bootstrap_status() {
1075 use time::macros::datetime;
1076 let t1: SystemTime = datetime!(2022-01-17 11:00:00 UTC).into();
1077 let hour = Duration::new(3600, 0);
1078 let lifetime = netstatus::Lifetime::new(t1, t1 + hour, t1 + hour * 3).unwrap();
1079 let lifetime2 = netstatus::Lifetime::new(t1 + hour, t1 + hour * 2, t1 + hour * 4).unwrap();
1080
1081 let dp1 = DirProgress::Validated {
1082 lifetime: lifetime.clone(),
1083 usable_lifetime: lifetime.clone(),
1084 n_mds: (3, 40),
1085 usable: true,
1086 };
1087 let dp2 = DirProgress::Validated {
1088 lifetime: lifetime2.clone(),
1089 usable_lifetime: lifetime2.clone(),
1090 n_mds: (5, 40),
1091 usable: false,
1092 };
1093 let attempt1 = AttemptId::next();
1094 let attempt2 = AttemptId::next();
1095
1096 let bs = DirBootstrapStatus(StatusEnum::Replacing {
1097 current: StatusEntry {
1098 id: attempt1,
1099 status: DirStatus {
1100 progress: dp1.clone(),
1101 ..Default::default()
1102 },
1103 },
1104 next: StatusEntry {
1105 id: attempt2,
1106 status: DirStatus {
1107 progress: dp2.clone(),
1108 ..Default::default()
1109 },
1110 },
1111 });
1112
1113 assert_eq!(
1114 bs.to_string(),
1115 "directory is usable, fresh until 2022-01-17 12:00:00 UTC, and valid until 2022-01-17 14:00:00 UTC; next directory is fetching microdescriptors (5/40)"
1116 );
1117
1118 const TOL: f32 = 0.00001;
1119 assert_float_eq!(bs.frac_at(t1 + hour / 2), 1.0, abs <= TOL);
1120 assert_float_eq!(
1121 bs.frac_at(t1 + hour * 3 + hour / 2),
1122 0.35 + 0.65 * 0.125,
1123 abs <= TOL
1124 );
1125
1126 // Now try updating.
1127
1128 // Case 1: we have a usable directory and the updated status isn't usable.
1129 let mut bs = bs;
1130 let dp3 = DirProgress::Validated {
1131 lifetime: lifetime2.clone(),
1132 usable_lifetime: lifetime2.clone(),
1133 n_mds: (10, 40),
1134 usable: false,
1135 };
1136
1137 bs.update_progress(attempt2, dp3);
1138 assert!(matches!(
1139 bs.next().unwrap(),
1140 DirStatus {
1141 progress: DirProgress::Validated {
1142 n_mds: (10, 40),
1143 ..
1144 },
1145 ..
1146 }
1147 ));
1148
1149 // Case 2: The new directory _is_ usable and newer. It will replace the old one.
1150 let ds4 = DirStatus {
1151 progress: DirProgress::Validated {
1152 lifetime: lifetime2.clone(),
1153 usable_lifetime: lifetime2.clone(),
1154 n_mds: (20, 40),
1155 usable: true,
1156 },
1157 ..Default::default()
1158 };
1159 bs.update_progress(attempt2, ds4.progress);
1160 assert!(bs.next().is_none());
1161 assert_eq!(
1162 bs.current()
1163 .unwrap()
1164 .usable_lifetime()
1165 .unwrap()
1166 .valid_after(),
1167 lifetime2.valid_after()
1168 );
1169
1170 // Case 3: The new directory is usable but older. Nothing will happen.
1171 bs.update_progress(attempt1, dp1);
1172 assert!(bs.next().as_ref().is_none());
1173 assert_ne!(
1174 bs.current()
1175 .unwrap()
1176 .usable_lifetime()
1177 .unwrap()
1178 .valid_after(),
1179 lifetime.valid_after()
1180 );
1181
1182 // Case 4: starting with an unusable directory, we always replace.
1183 let mut bs = DirBootstrapStatus::default();
1184 assert!(!dp2.usable());
1185 assert!(bs.current().is_none());
1186 bs.update_progress(attempt2, dp2);
1187 assert!(bs.current().unwrap().usable_lifetime().is_some());
1188 }
1189}