Skip to main content

tor_dirmgr/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc = include_str!("../README.md")]
3// @@ begin lint list maintained by maint/add_warning @@
4#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6#![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_time_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39#![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43#![allow(clippy::needless_lifetimes)] // See arti#1765
44#![allow(mismatched_lifetime_syntaxes)] // temporary workaround for arti#2060
45#![allow(clippy::collapsible_if)] // See arti#2342
46#![deny(clippy::unused_async)]
47#![deny(clippy::string_slice)] // See arti#2571
48//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
49
50// This clippy lint produces a false positive on `use strum`, below.
51// Attempting to apply the lint to just the use statement fails to suppress
52// this lint and instead produces another lint about a useless clippy attribute.
53#![allow(clippy::single_component_path_imports)]
54
55mod bootstrap;
56pub mod config;
57mod docid;
58mod docmeta;
59mod err;
60mod event;
61mod shared_ref;
62mod state;
63mod storage;
64
65#[cfg(feature = "bridge-client")]
66pub mod bridgedesc;
67#[cfg(feature = "dirfilter")]
68pub mod filter;
69
70use crate::docid::{CacheUsage, ClientRequest, DocQuery};
71use crate::err::BootstrapAction;
72#[cfg(not(feature = "experimental-api"))]
73use crate::shared_ref::SharedMutArc;
74#[cfg(feature = "experimental-api")]
75pub use crate::shared_ref::SharedMutArc;
76use crate::storage::{DynStore, Store};
77use bootstrap::AttemptId;
78use event::DirProgress;
79use postage::watch;
80use scopeguard::ScopeGuard;
81use tor_circmgr::CircMgr;
82use tor_dirclient::SourceInfo;
83use tor_dircommon::config::DirTolerance;
84use tor_error::{info_report, into_internal, warn_report};
85use tor_netdir::params::NetParameters;
86use tor_netdir::{DirEvent, MdReceiver, NetDir, NetDirProvider};
87
88use async_trait::async_trait;
89use futures::stream::BoxStream;
90use oneshot_fused_workaround as oneshot;
91use tor_netdoc::doc::netstatus::ProtoStatuses;
92use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
93use tor_rtcompat::{Runtime, SpawnExt};
94use tracing::{debug, info, instrument, trace, warn};
95use web_time_compat::SystemTimeExt;
96
97use std::marker::PhantomData;
98use std::sync::atomic::{AtomicBool, Ordering};
99use std::sync::{Arc, Mutex};
100use std::time::Duration;
101use std::{collections::HashMap, sync::Weak};
102use std::{fmt::Debug, time::SystemTime};
103
104use crate::state::{DirState, NetDirChange};
105pub use config::DirMgrConfig;
106pub use docid::DocId;
107pub use err::Error;
108pub use event::{DirBlockage, DirBootstrapEvents, DirBootstrapStatus};
109pub use storage::DocumentText;
110pub use tor_dircommon::fallback::{FallbackDir, FallbackDirBuilder};
111pub use tor_netdir::Timeliness;
112
113/// Re-export of `strum` crate for use by an internal macro
114use strum;
115
116/// A Result as returned by this crate.
117pub type Result<T> = std::result::Result<T, Error>;
118
119/// Storage manager used by [`DirMgr`] and
120/// [`BridgeDescMgr`](bridgedesc::BridgeDescMgr)
121///
122/// Internally, this wraps up a sqlite database.
123///
124/// This is a handle, which is cheap to clone; clones share state.
125#[derive(Clone)]
126pub struct DirMgrStore<R: Runtime> {
127    /// The actual store
128    pub(crate) store: Arc<Mutex<crate::DynStore>>,
129
130    /// Be parameterized by Runtime even though we don't use it right now
131    pub(crate) runtime: PhantomData<R>,
132}
133
134impl<R: Runtime> DirMgrStore<R> {
135    /// Open the storage, according to the specified configuration
136    pub fn new(config: &DirMgrConfig, runtime: R, offline: bool) -> Result<Self> {
137        let store = Arc::new(Mutex::new(config.open_store(offline)?));
138        drop(runtime);
139        let runtime = PhantomData;
140        Ok(DirMgrStore { store, runtime })
141    }
142}
143
144/// Trait for DirMgr implementations
145#[async_trait]
146pub trait DirProvider: NetDirProvider {
147    /// Try to change our configuration to `new_config`.
148    ///
149    /// Actual behavior will depend on the value of `how`.
150    fn reconfigure(
151        &self,
152        new_config: &DirMgrConfig,
153        how: tor_config::Reconfigure,
154    ) -> std::result::Result<(), tor_config::ReconfigureError>;
155
156    /// Bootstrap a `DirProvider` that hasn't been bootstrapped yet.
157    async fn bootstrap(&self) -> Result<()>;
158
159    /// Return a stream of [`DirBootstrapStatus`] events to tell us about changes
160    /// in the latest directory's bootstrap status.
161    ///
162    /// Note that this stream can be lossy: the caller will not necessarily
163    /// observe every event on the stream
164    fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus>;
165
166    /// Return a [`TaskHandle`] that can be used to manage the download process.
167    fn download_task_handle(&self) -> Option<TaskHandle> {
168        None
169    }
170}
171
172// NOTE(eta): We can't implement this for Arc<DirMgr<R>> due to trait coherence rules, so instead
173//            there's a blanket impl for Arc<T> in tor-netdir.
174impl<R: Runtime> NetDirProvider for DirMgr<R> {
175    fn netdir(&self, timeliness: Timeliness) -> tor_netdir::Result<Arc<NetDir>> {
176        use tor_netdir::Error as NetDirError;
177        let netdir = self.netdir.get().ok_or(NetDirError::NoInfo)?;
178        let lifetime = match timeliness {
179            Timeliness::Strict => netdir.lifetime().clone(),
180            Timeliness::Timely => self
181                .config
182                .get()
183                .tolerance
184                .extend_lifetime(netdir.lifetime()),
185            Timeliness::Unchecked => return Ok(netdir),
186        };
187        // TODO #2384 -- we have a runtime here; we should use it.
188        let now = SystemTime::get();
189        if lifetime.valid_after() > now {
190            Err(NetDirError::DirNotYetValid)
191        } else if lifetime.valid_until() < now {
192            Err(NetDirError::DirExpired)
193        } else {
194            Ok(netdir)
195        }
196    }
197
198    fn events(&self) -> BoxStream<'static, DirEvent> {
199        Box::pin(self.events.subscribe())
200    }
201
202    fn params(&self) -> Arc<dyn AsRef<tor_netdir::params::NetParameters>> {
203        if let Some(netdir) = self.netdir.get() {
204            // We have a directory, so we'd like to give it out for its
205            // parameters.
206            //
207            // We do this even if the directory is expired, since parameters
208            // don't really expire on any plausible timescale.
209            netdir
210        } else {
211            // We have no directory, so we'll give out the default parameters as
212            // modified by the provided override_net_params configuration.
213            //
214            self.default_parameters
215                .lock()
216                .expect("Poisoned lock")
217                .clone()
218        }
219        // TODO(nickm): If we felt extremely clever, we could add a third case
220        // where, if we have a pending directory with a validated consensus, we
221        // give out that consensus's network parameters even if we _don't_ yet
222        // have a full directory.  That's significant refactoring, though, for
223        // an unclear amount of benefit.
224    }
225
226    fn protocol_statuses(&self) -> Option<(SystemTime, Arc<ProtoStatuses>)> {
227        self.protocols.lock().expect("Poisoned lock").clone()
228    }
229}
230
231#[async_trait]
232impl<R: Runtime> DirProvider for Arc<DirMgr<R>> {
233    fn reconfigure(
234        &self,
235        new_config: &DirMgrConfig,
236        how: tor_config::Reconfigure,
237    ) -> std::result::Result<(), tor_config::ReconfigureError> {
238        DirMgr::reconfigure(self, new_config, how)
239    }
240
241    #[instrument(level = "trace", skip_all)]
242    async fn bootstrap(&self) -> Result<()> {
243        DirMgr::bootstrap(self).await
244    }
245
246    fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus> {
247        Box::pin(DirMgr::bootstrap_events(self))
248    }
249
250    fn download_task_handle(&self) -> Option<TaskHandle> {
251        Some(self.task_handle.clone())
252    }
253}
254
255/// A directory manager to download, fetch, and cache a Tor directory.
256///
257/// A DirMgr can operate in three modes:
258///   * In **offline** mode, it only reads from the cache, and can
259///     only read once.
260///   * In **read-only** mode, it reads from the cache, but checks
261///     whether it can acquire an associated lock file.  If it can, then
262///     it enters read-write mode.  If not, it checks the cache
263///     periodically for new information.
264///   * In **read-write** mode, it knows that no other process will be
265///     writing to the cache, and it takes responsibility for fetching
266///     data from the network and updating the directory with new
267///     directory information.
268pub struct DirMgr<R: Runtime> {
269    /// Configuration information: where to find directories, how to
270    /// validate them, and so on.
271    config: tor_config::MutCfg<DirMgrConfig>,
272    /// Handle to our sqlite cache.
273    // TODO(nickm): I'd like to use an rwlock, but that's not feasible, since
274    // rusqlite::Connection isn't Sync.
275    // TODO is needed?
276    store: Arc<Mutex<DynStore>>,
277    /// Our latest sufficiently bootstrapped directory, if we have one.
278    ///
279    /// We use the RwLock so that we can give this out to a bunch of other
280    /// users, and replace it once a new directory is bootstrapped.
281    // TODO(eta): Eurgh! This is so many Arcs! (especially considering this
282    //            gets wrapped in an Arc)
283    netdir: Arc<SharedMutArc<NetDir>>,
284
285    /// Our latest set of recommended protocols.
286    protocols: Mutex<Option<(SystemTime, Arc<ProtoStatuses>)>>,
287
288    /// A set of network parameters to hand out when we have no directory.
289    default_parameters: Mutex<Arc<NetParameters>>,
290
291    /// A publisher handle that we notify whenever the consensus changes.
292    events: event::FlagPublisher<DirEvent>,
293
294    /// A publisher handle that we notify whenever our bootstrapping status
295    /// changes.
296    send_status: Mutex<watch::Sender<event::DirBootstrapStatus>>,
297
298    /// A receiver handle that gets notified whenever our bootstrapping status
299    /// changes.
300    ///
301    /// We don't need to keep this drained, since `postage::watch` already knows
302    /// to discard unread events.
303    receive_status: DirBootstrapEvents,
304
305    /// A circuit manager, if this DirMgr supports downloading.
306    circmgr: Option<Arc<CircMgr<R>>>,
307
308    /// Our asynchronous runtime.
309    runtime: R,
310
311    /// Whether or not we're operating in offline mode.
312    offline: bool,
313
314    /// If we're not in offline mode, stores whether or not the `DirMgr` has attempted
315    /// to bootstrap yet or not.
316    ///
317    /// This exists in order to prevent starting two concurrent bootstrap tasks.
318    ///
319    /// (In offline mode, this does nothing.)
320    bootstrap_started: AtomicBool,
321
322    /// A filter that gets applied to directory objects before we use them.
323    #[cfg(feature = "dirfilter")]
324    filter: crate::filter::FilterConfig,
325
326    /// A task schedule that can be used if we're bootstrapping.  If this is
327    /// None, then there's currently a scheduled task in progress.
328    task_schedule: Mutex<Option<TaskSchedule<R>>>,
329
330    /// A task handle that we return to anybody who needs to manage our download process.
331    task_handle: TaskHandle,
332}
333
334/// The possible origins of a document.
335///
336/// Used (for example) to report where we got a document from if it fails to
337/// parse.
338#[derive(Debug, Clone)]
339#[non_exhaustive]
340pub enum DocSource {
341    /// We loaded the document from our cache.
342    LocalCache,
343    /// We fetched the document from a server.
344    DirServer {
345        /// Information about the server we fetched the document from.
346        source: Option<SourceInfo>,
347    },
348}
349
350impl std::fmt::Display for DocSource {
351    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
352        match self {
353            DocSource::LocalCache => write!(f, "local cache"),
354            DocSource::DirServer { source: None } => write!(f, "directory server"),
355            DocSource::DirServer { source: Some(info) } => write!(f, "directory server {}", info),
356        }
357    }
358}
359
360impl<R: Runtime> DirMgr<R> {
361    /// Try to load the directory from disk, without launching any
362    /// kind of update process.
363    ///
364    /// This function runs in **offline** mode: it will give an error
365    /// if the result is not up-to-date, or not fully downloaded.
366    ///
367    /// In general, you shouldn't use this function in a long-running
368    /// program; it's only suitable for command-line or batch tools.
369    // TODO: I wish this function didn't have to be async or take a runtime.
370    pub fn load_once(runtime: R, config: DirMgrConfig) -> Result<Arc<NetDir>> {
371        let store = DirMgrStore::new(&config, runtime.clone(), true)?;
372        let dirmgr = Arc::new(Self::from_config(config, runtime, store, None, true)?);
373
374        // TODO: add some way to return a directory that isn't up-to-date
375        let attempt = AttemptId::next();
376        trace!(%attempt, "Trying to load a full directory from cache");
377        let outcome = dirmgr.load_directory(attempt);
378        trace!(%attempt, "Load result: {outcome:?}");
379        let _success = outcome?;
380
381        dirmgr
382            .netdir(Timeliness::Timely)
383            .map_err(|_| Error::DirectoryNotPresent)
384    }
385
386    /// Return a current netdir, either loading it or bootstrapping it
387    /// as needed.
388    ///
389    /// Like load_once, but will try to bootstrap (or wait for another
390    /// process to bootstrap) if we don't have an up-to-date
391    /// bootstrapped directory.
392    ///
393    /// In general, you shouldn't use this function in a long-running
394    /// program; it's only suitable for command-line or batch tools.
395    pub async fn load_or_bootstrap_once(
396        config: DirMgrConfig,
397        runtime: R,
398        store: DirMgrStore<R>,
399        circmgr: Arc<CircMgr<R>>,
400    ) -> Result<Arc<NetDir>> {
401        let dirmgr = DirMgr::bootstrap_from_config(config, runtime, store, circmgr).await?;
402        dirmgr
403            .timely_netdir()
404            .map_err(|_| Error::DirectoryNotPresent)
405    }
406
407    /// Create a new `DirMgr` in online mode, but don't bootstrap it yet.
408    ///
409    /// The `DirMgr` can be bootstrapped later with `bootstrap`.
410    pub fn create_unbootstrapped(
411        config: DirMgrConfig,
412        runtime: R,
413        store: DirMgrStore<R>,
414        circmgr: Arc<CircMgr<R>>,
415    ) -> Result<Arc<Self>> {
416        Ok(Arc::new(DirMgr::from_config(
417            config,
418            runtime,
419            store,
420            Some(circmgr),
421            false,
422        )?))
423    }
424
425    /// Bootstrap a `DirMgr` created in online mode that hasn't been bootstrapped yet.
426    ///
427    /// This function will not return until the directory is bootstrapped enough to build circuits.
428    /// It will also launch a background task that fetches any missing information, and that
429    /// replaces the directory when a new one is available.
430    ///
431    /// This function is intended to be used together with `create_unbootstrapped`. There is no
432    /// need to call this function otherwise.
433    ///
434    /// If bootstrapping has already successfully taken place, returns early with success.
435    ///
436    /// # Errors
437    ///
438    /// Returns an error if bootstrapping fails. If the error is [`Error::CantAdvanceState`],
439    /// it may be possible to successfully bootstrap later on by calling this function again.
440    ///
441    /// # Panics
442    ///
443    /// Panics if the `DirMgr` passed to this function was not created in online mode, such as
444    /// via `load_once`.
445    #[allow(clippy::cognitive_complexity)] // TODO: Refactor
446    #[instrument(level = "trace", skip_all)]
447    pub async fn bootstrap(self: &Arc<Self>) -> Result<()> {
448        if self.offline {
449            return Err(Error::OfflineMode);
450        }
451
452        // The semantics of this are "attempt to replace a 'false' value with 'true'.
453        // If the value in bootstrap_started was not 'false' when the attempt was made, returns
454        // `Err`; this means another bootstrap attempt is in progress or has completed, so we
455        // return early.
456
457        // NOTE(eta): could potentially weaken the `Ordering` here in future
458        if self
459            .bootstrap_started
460            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
461            .is_err()
462        {
463            debug!("Attempted to bootstrap twice; ignoring.");
464            return Ok(());
465        }
466
467        // Use a RAII guard to reset `bootstrap_started` to `false` if we return early without
468        // completing bootstrap.
469        let reset_bootstrap_started = scopeguard::guard(&self.bootstrap_started, |v| {
470            v.store(false, Ordering::SeqCst);
471        });
472
473        let schedule = {
474            let sched = self.task_schedule.lock().expect("poisoned lock").take();
475            match sched {
476                Some(sched) => sched,
477                None => {
478                    debug!("Attempted to bootstrap twice; ignoring.");
479                    return Ok(());
480                }
481            }
482        };
483
484        // Try to load from the cache.
485        let attempt_id = AttemptId::next();
486        trace!(attempt=%attempt_id, "Starting to bootstrap directory");
487        let have_directory = self.load_directory(attempt_id)?;
488
489        let (mut sender, receiver) = if have_directory {
490            info!("Loaded a good directory from cache.");
491            (None, None)
492        } else {
493            info!("Didn't get usable directory from cache.");
494            let (sender, receiver) = oneshot::channel();
495            (Some(sender), Some(receiver))
496        };
497
498        // Whether we loaded or not, we now start downloading.
499        let dirmgr_weak = Arc::downgrade(self);
500        self.runtime
501            .spawn(async move {
502                // Use an RAII guard to make sure that when this task exits, the
503                // TaskSchedule object is put back.
504                //
505                // TODO(nick): Putting the schedule back isn't actually useful
506                // if the task exits _after_ we've bootstrapped for the first
507                // time, because of how bootstrap_started works.
508                let mut schedule = scopeguard::guard(schedule, |schedule| {
509                    if let Some(dm) = Weak::upgrade(&dirmgr_weak) {
510                        *dm.task_schedule.lock().expect("poisoned lock") = Some(schedule);
511                    }
512                });
513
514                // Don't warn when these are Error::ManagerDropped: that
515                // means that the DirMgr has been shut down.
516                if let Err(e) =
517                    Self::reload_until_owner(&dirmgr_weak, &mut schedule, attempt_id, &mut sender)
518                        .await
519                {
520                    match e {
521                        Error::ManagerDropped => {}
522                        _ => warn_report!(e, "Unrecovered error while waiting for bootstrap",),
523                    }
524                } else if let Err(e) =
525                    Self::download_forever(dirmgr_weak.clone(), &mut schedule, attempt_id, sender)
526                        .await
527                {
528                    match e {
529                        Error::ManagerDropped => {}
530                        _ => warn_report!(e, "Unrecovered error while downloading"),
531                    }
532                }
533            })
534            .map_err(|e| Error::from_spawn("directory updater task", e))?;
535
536        if let Some(receiver) = receiver {
537            match receiver.await {
538                Ok(()) => {
539                    info!("We have enough information to build circuits.");
540                    // Disarm the RAII guard, since we succeeded.  Now bootstrap_started will remain true.
541                    let _ = ScopeGuard::into_inner(reset_bootstrap_started);
542                }
543                Err(_) => {
544                    warn!("Bootstrapping task exited before finishing.");
545                    return Err(Error::CantAdvanceState);
546                }
547            }
548        }
549        Ok(())
550    }
551
552    /// Returns `true` if a bootstrap attempt is in progress, or successfully completed.
553    pub fn bootstrap_started(&self) -> bool {
554        self.bootstrap_started.load(Ordering::SeqCst)
555    }
556
557    /// Return a new directory manager from a given configuration,
558    /// bootstrapping from the network as necessary.
559    #[instrument(level = "trace", skip_all)]
560    pub async fn bootstrap_from_config(
561        config: DirMgrConfig,
562        runtime: R,
563        store: DirMgrStore<R>,
564        circmgr: Arc<CircMgr<R>>,
565    ) -> Result<Arc<Self>> {
566        let dirmgr = Self::create_unbootstrapped(config, runtime, store, circmgr)?;
567
568        dirmgr.bootstrap().await?;
569
570        Ok(dirmgr)
571    }
572
573    /// Try forever to either lock the storage (and thereby become the
574    /// owner), or to reload the database.
575    ///
576    /// If we have begin to have a bootstrapped directory, send a
577    /// message using `on_complete`.
578    ///
579    /// If we eventually become the owner, return Ok().
580    #[allow(clippy::cognitive_complexity)] // TODO: Refactor?
581    async fn reload_until_owner(
582        weak: &Weak<Self>,
583        schedule: &mut TaskSchedule<R>,
584        attempt_id: AttemptId,
585        on_complete: &mut Option<oneshot::Sender<()>>,
586    ) -> Result<()> {
587        let mut logged = false;
588        let mut bootstrapped;
589        {
590            let dirmgr = upgrade_weak_ref(weak)?;
591            bootstrapped = dirmgr.netdir.get().is_some();
592        }
593
594        loop {
595            {
596                let dirmgr = upgrade_weak_ref(weak)?;
597                trace!("Trying to take ownership of the directory cache lock");
598                if dirmgr.try_upgrade_to_readwrite()? {
599                    // We now own the lock!  (Maybe we owned it before; the
600                    // upgrade_to_readwrite() function is idempotent.)  We can
601                    // do our own bootstrapping.
602                    if logged {
603                        info!(
604                            "The previous owning process has given up the lock. We are now in charge of managing the directory."
605                        );
606                    }
607                    return Ok(());
608                }
609            }
610
611            if !logged {
612                logged = true;
613                if bootstrapped {
614                    info!("Another process is managing the directory. We'll use its cache.");
615                } else {
616                    info!(
617                        "Another process is bootstrapping the directory. Waiting till it finishes or exits."
618                    );
619                }
620            }
621
622            // We don't own the lock.  Somebody else owns the cache.  They
623            // should be updating it.  Wait a bit, then try again.
624            let pause = if bootstrapped {
625                std::time::Duration::new(120, 0)
626            } else {
627                std::time::Duration::new(5, 0)
628            };
629            schedule.sleep(pause).await?;
630            // TODO: instead of loading the whole thing we should have a
631            // database entry that says when the last update was, or use
632            // our state functions.
633            {
634                let dirmgr = upgrade_weak_ref(weak)?;
635                trace!("Trying to load from the directory cache");
636                if dirmgr.load_directory(attempt_id)? {
637                    // Successfully loaded a bootstrapped directory.
638                    if let Some(send_done) = on_complete.take() {
639                        let _ = send_done.send(());
640                    }
641                    if !bootstrapped {
642                        info!("The directory is now bootstrapped.");
643                    }
644                    bootstrapped = true;
645                }
646            }
647        }
648    }
649
650    /// Try to fetch our directory info and keep it updated, indefinitely.
651    ///
652    /// If we have begin to have a bootstrapped directory, send a
653    /// message using `on_complete`.
654    #[allow(clippy::cognitive_complexity)] // TODO: Refactor?
655    #[instrument(level = "trace", skip_all)]
656    async fn download_forever(
657        weak: Weak<Self>,
658        schedule: &mut TaskSchedule<R>,
659        mut attempt_id: AttemptId,
660        mut on_complete: Option<oneshot::Sender<()>>,
661    ) -> Result<()> {
662        let mut state: Box<dyn DirState> = {
663            let dirmgr = upgrade_weak_ref(&weak)?;
664            Box::new(state::GetConsensusState::new(
665                dirmgr.runtime.clone(),
666                dirmgr.config.get(),
667                CacheUsage::CacheOkay,
668                Some(dirmgr.netdir.clone()),
669                #[cfg(feature = "dirfilter")]
670                dirmgr
671                    .filter
672                    .clone()
673                    .unwrap_or_else(|| Arc::new(crate::filter::NilFilter)),
674            ))
675        };
676
677        trace!("Entering download loop.");
678
679        loop {
680            let mut usable = false;
681
682            let retry_config = {
683                let dirmgr = upgrade_weak_ref(&weak)?;
684                // TODO(nickm): instead of getting this every time we loop, it
685                // might be a good idea to refresh it with each attempt, at
686                // least at the point of checking the number of attempts.
687                dirmgr.config.get().schedule.retry_bootstrap()
688            };
689            let mut retry_delay = retry_config.schedule();
690
691            'retry_attempt: for try_num in retry_config.attempts() {
692                trace!(attempt=%attempt_id, ?try_num, "Trying to download a directory.");
693                let outcome = bootstrap::download(
694                    Weak::clone(&weak),
695                    &mut state,
696                    schedule,
697                    attempt_id,
698                    &mut on_complete,
699                )
700                .await;
701                trace!(attempt=%attempt_id, ?try_num, ?outcome, "Download is over.");
702
703                if let Err(err) = outcome {
704                    if state.is_ready(Readiness::Usable) {
705                        usable = true;
706                        info_report!(
707                            err,
708                            "Unable to completely download a directory. (Nevertheless, the directory is usable, so we'll pause for now)"
709                        );
710                        break 'retry_attempt;
711                    }
712
713                    match err.bootstrap_action() {
714                        BootstrapAction::Nonfatal => {
715                            return Err(into_internal!(
716                                "Nonfatal error should not have propagated here"
717                            )(err)
718                            .into());
719                        }
720                        BootstrapAction::Reset => {}
721                        BootstrapAction::Fatal => return Err(err),
722                    }
723
724                    let delay = retry_delay.next_delay(&mut rand::rng());
725                    warn_report!(
726                        err,
727                        "Unable to download a usable directory. (We will restart in {})",
728                        humantime::format_duration(delay),
729                    );
730                    {
731                        let dirmgr = upgrade_weak_ref(&weak)?;
732                        dirmgr.note_reset(attempt_id);
733                    }
734                    schedule.sleep(delay).await?;
735                    state = state.reset();
736                } else {
737                    info!(attempt=%attempt_id, "Directory is complete.");
738                    usable = true;
739                    break 'retry_attempt;
740                }
741            }
742
743            if !usable {
744                // we ran out of attempts.
745                warn!(
746                    "We failed {} times to bootstrap a directory. We're going to give up.",
747                    retry_config.n_attempts()
748                );
749                return Err(Error::CantAdvanceState);
750            } else {
751                // Report success, if appropriate.
752                if let Some(send_done) = on_complete.take() {
753                    let _ = send_done.send(());
754                }
755            }
756
757            let reset_at = state.reset_time();
758            match reset_at {
759                Some(t) => {
760                    trace!("Sleeping until {}", time::OffsetDateTime::from(t));
761                    schedule.sleep_until_wallclock(t).await?;
762                }
763                None => return Ok(()),
764            }
765            attempt_id = bootstrap::AttemptId::next();
766            trace!(attempt=%attempt_id, "Beginning new attempt to bootstrap directory");
767            state = state.reset();
768        }
769    }
770
771    /// Get a reference to the circuit manager, if we have one.
772    fn circmgr(&self) -> Result<Arc<CircMgr<R>>> {
773        self.circmgr.clone().ok_or(Error::NoDownloadSupport)
774    }
775
776    /// Try to change our configuration to `new_config`.
777    ///
778    /// Actual behavior will depend on the value of `how`.
779    pub fn reconfigure(
780        &self,
781        new_config: &DirMgrConfig,
782        how: tor_config::Reconfigure,
783    ) -> std::result::Result<(), tor_config::ReconfigureError> {
784        let config = self.config.get();
785        // We don't support changing these: doing so basically would require us
786        // to abort all our in-progress downloads, since they might be based on
787        // no-longer-viable information.
788        // NOTE: keep this in sync with the behaviour of `DirMgrConfig::update_from_config`
789        if new_config.cache_dir != config.cache_dir {
790            how.cannot_change("storage.cache_dir")?;
791        }
792        if new_config.cache_trust != config.cache_trust {
793            how.cannot_change("storage.permissions")?;
794        }
795        if new_config.authorities() != config.authorities() {
796            how.cannot_change("network.authorities")?;
797        }
798
799        if how == tor_config::Reconfigure::CheckAllOrNothing {
800            return Ok(());
801        }
802
803        let params_changed = new_config.override_net_params != config.override_net_params;
804
805        self.config
806            .map_and_replace(|cfg| cfg.update_from_config(new_config));
807
808        if params_changed {
809            let _ignore_err = self.netdir.mutate(|netdir| {
810                netdir.replace_overridden_parameters(&new_config.override_net_params);
811                Ok(())
812            });
813            {
814                let mut params = self.default_parameters.lock().expect("lock failed");
815                *params = Arc::new(NetParameters::from_map(&new_config.override_net_params));
816            }
817
818            // (It's okay to ignore the error, since it just means that there
819            // was no current netdir.)
820            self.events.publish(DirEvent::NewConsensus);
821        }
822
823        Ok(())
824    }
825
826    /// Return a stream of [`DirBootstrapStatus`] events to tell us about changes
827    /// in the latest directory's bootstrap status.
828    ///
829    /// Note that this stream can be lossy: the caller will not necessarily
830    /// observe every event on the stream
831    pub fn bootstrap_events(&self) -> event::DirBootstrapEvents {
832        self.receive_status.clone()
833    }
834
835    /// Replace the latest status with `progress` and broadcast to anybody
836    /// watching via a [`DirBootstrapEvents`] stream.
837    fn update_progress(&self, attempt_id: AttemptId, progress: DirProgress) {
838        // TODO(nickm): can I kill off this lock by having something else own the sender?
839        let mut sender = self.send_status.lock().expect("poisoned lock");
840        let mut status = sender.borrow_mut();
841
842        status.update_progress(attempt_id, progress);
843    }
844
845    /// Update our status tracker to note that some number of errors has
846    /// occurred.
847    fn note_errors(&self, attempt_id: AttemptId, n_errors: usize) {
848        if n_errors == 0 {
849            return;
850        }
851        let mut sender = self.send_status.lock().expect("poisoned lock");
852        let mut status = sender.borrow_mut();
853
854        status.note_errors(attempt_id, n_errors);
855    }
856
857    /// Update our status tracker to note that we've needed to reset our download attempt.
858    fn note_reset(&self, attempt_id: AttemptId) {
859        let mut sender = self.send_status.lock().expect("poisoned lock");
860        let mut status = sender.borrow_mut();
861
862        status.note_reset(attempt_id);
863    }
864
865    /// Try to make this a directory manager with read-write access to its
866    /// storage.
867    ///
868    /// Return true if we got the lock, or if we already had it.
869    ///
870    /// Return false if another process has the lock
871    fn try_upgrade_to_readwrite(&self) -> Result<bool> {
872        self.store
873            .lock()
874            .expect("Directory storage lock poisoned")
875            .upgrade_to_readwrite()
876    }
877
878    /// Return a reference to the store, if it is currently read-write.
879    #[cfg(test)]
880    fn store_if_rw(&self) -> Option<&Mutex<DynStore>> {
881        let rw = !self
882            .store
883            .lock()
884            .expect("Directory storage lock poisoned")
885            .is_readonly();
886        // A race-condition is possible here, but I believe it's harmless.
887        if rw { Some(&self.store) } else { None }
888    }
889
890    /// Construct a DirMgr from a DirMgrConfig.
891    ///
892    /// If `offline` is set, opens the SQLite store read-only and sets the offline flag in the
893    /// returned manager.
894    #[allow(clippy::unnecessary_wraps)] // API compat and future-proofing
895    fn from_config(
896        config: DirMgrConfig,
897        runtime: R,
898        store: DirMgrStore<R>,
899        circmgr: Option<Arc<CircMgr<R>>>,
900        offline: bool,
901    ) -> Result<Self> {
902        let netdir = Arc::new(SharedMutArc::new());
903        let events = event::FlagPublisher::new();
904        let default_parameters = NetParameters::from_map(&config.override_net_params);
905        let default_parameters = Mutex::new(Arc::new(default_parameters));
906
907        let (send_status, receive_status) = postage::watch::channel();
908        let send_status = Mutex::new(send_status);
909        let receive_status = DirBootstrapEvents {
910            inner: receive_status,
911        };
912        #[cfg(feature = "dirfilter")]
913        let filter = config.extensions.filter.clone();
914
915        // We create these early so the client code can access task_handle before bootstrap() returns.
916        let (task_schedule, task_handle) = TaskSchedule::new(runtime.clone());
917        let task_schedule = Mutex::new(Some(task_schedule));
918
919        // We load the cached protocol recommendations unconditionally: the caller needs them even
920        // if it does not try to load the reset of the cache.
921        let protocols = {
922            let store = store.store.lock().expect("lock poisoned");
923            store
924                .cached_protocol_recommendations()?
925                .map(|(t, p)| (t, Arc::new(p)))
926        };
927
928        Ok(DirMgr {
929            config: config.into(),
930            store: store.store,
931            netdir,
932            protocols: Mutex::new(protocols),
933            default_parameters,
934            events,
935            send_status,
936            receive_status,
937            circmgr,
938            runtime,
939            offline,
940            bootstrap_started: AtomicBool::new(false),
941            #[cfg(feature = "dirfilter")]
942            filter,
943            task_schedule,
944            task_handle,
945        })
946    }
947
948    /// Load the latest non-pending non-expired directory from the
949    /// cache, if it is newer than the one we have.
950    ///
951    /// Return false if there is no such consensus.
952    fn load_directory(self: &Arc<Self>, attempt_id: AttemptId) -> Result<bool> {
953        let state = state::GetConsensusState::new(
954            self.runtime.clone(),
955            self.config.get(),
956            CacheUsage::CacheOnly,
957            None,
958            #[cfg(feature = "dirfilter")]
959            self.filter
960                .clone()
961                .unwrap_or_else(|| Arc::new(crate::filter::NilFilter)),
962        );
963        let _ = bootstrap::load(self, Box::new(state), attempt_id)?;
964
965        Ok(self.netdir.get().is_some())
966    }
967
    /// Return a new asynchronous stream that will receive notification
    /// whenever a [`DirEvent`] is published by this directory manager
    /// (for example, when the consensus has changed).
    ///
    /// Multiple events may be batched up into a single item: each time
    /// this stream yields an event, all you can assume is that the event has
    /// occurred at least once.
    pub fn events(&self) -> impl futures::Stream<Item = DirEvent> + use<R> {
        // Each call asks our event publisher for a fresh subscription.
        self.events.subscribe()
    }
977
978    /// Try to load the text of a single document described by `doc` from
979    /// storage.
980    pub fn text(&self, doc: &DocId) -> Result<Option<DocumentText>> {
981        use itertools::Itertools;
982        let mut result = HashMap::new();
983        let query: DocQuery = (*doc).into();
984        let store = self.store.lock().expect("store lock poisoned");
985        query.load_from_store_into(&mut result, &**store)?;
986        let item = result.into_iter().at_most_one().map_err(|_| {
987            Error::CacheCorruption("Found more than one entry in storage for given docid")
988        })?;
989        if let Some((docid, doctext)) = item {
990            if &docid != doc {
991                return Err(Error::CacheCorruption(
992                    "Item from storage had incorrect docid.",
993                ));
994            }
995            Ok(Some(doctext))
996        } else {
997            Ok(None)
998        }
999    }
1000
1001    /// Load the text for a collection of documents.
1002    ///
1003    /// If many of the documents have the same type, this can be more
1004    /// efficient than calling [`text`](Self::text).
1005    pub fn texts<T>(&self, docs: T) -> Result<HashMap<DocId, DocumentText>>
1006    where
1007        T: IntoIterator<Item = DocId>,
1008    {
1009        let partitioned = docid::partition_by_type(docs);
1010        let mut result = HashMap::new();
1011        let store = self.store.lock().expect("store lock poisoned");
1012        for (_, query) in partitioned.into_iter() {
1013            query.load_from_store_into(&mut result, &**store)?;
1014        }
1015        Ok(result)
1016    }
1017
1018    /// Given a request we sent and the response we got from a
1019    /// directory server, see whether we should expand that response
1020    /// into "something larger".
1021    ///
1022    /// Currently, this handles expanding consensus diffs, and nothing
1023    /// else.  We do it at this stage of our downloading operation
1024    /// because it requires access to the store.
1025    fn expand_response_text(&self, req: &ClientRequest, text: String) -> Result<String> {
1026        if let ClientRequest::Consensus(req) = req {
1027            if tor_consdiff::looks_like_diff(&text) {
1028                if let Some(old_d) = req.old_consensus_digests().next() {
1029                    let db_val = {
1030                        let s = self.store.lock().expect("Directory storage lock poisoned");
1031                        s.consensus_by_sha3_digest_of_signed_part(old_d)?
1032                    };
1033                    if let Some((old_consensus, meta)) = db_val {
1034                        info!("Applying a consensus diff");
1035                        let new_consensus = tor_consdiff::apply_diff(
1036                            old_consensus.as_str()?,
1037                            &text,
1038                            Some(*meta.sha3_256_of_signed()),
1039                        )?;
1040                        new_consensus.check_digest()?;
1041                        return Ok(new_consensus.to_string());
1042                    }
1043                }
1044                return Err(Error::Unwanted(
1045                    "Received a consensus diff we did not ask for",
1046                ));
1047            }
1048        }
1049        Ok(text)
1050    }
1051
1052    /// If `state` has netdir changes to apply, apply them to our netdir.
1053    #[allow(clippy::cognitive_complexity)]
1054    fn apply_netdir_changes(
1055        self: &Arc<Self>,
1056        state: &mut Box<dyn DirState>,
1057        store: &mut dyn Store,
1058    ) -> Result<()> {
1059        if let Some(change) = state.get_netdir_change() {
1060            match change {
1061                NetDirChange::AttemptReplace {
1062                    netdir,
1063                    consensus_meta,
1064                } => {
1065                    // Check the new netdir is sufficient, if we have a circmgr.
1066                    // (Unwraps are fine because the `Option` is `Some` until we take it.)
1067                    if let Some(ref cm) = self.circmgr {
1068                        if !cm
1069                            .netdir_is_sufficient(netdir.as_ref().expect("AttemptReplace had None"))
1070                        {
1071                            debug!("Got a new NetDir, but it doesn't have enough guards yet.");
1072                            return Ok(());
1073                        }
1074                    }
1075                    let is_stale = {
1076                        // Done inside a block to not hold a long-lived copy of the NetDir.
1077                        self.netdir
1078                            .get()
1079                            .map(|x| {
1080                                x.lifetime().valid_after()
1081                                    > netdir
1082                                        .as_ref()
1083                                        .expect("AttemptReplace had None")
1084                                        .lifetime()
1085                                        .valid_after()
1086                            })
1087                            .unwrap_or(false)
1088                    };
1089                    if is_stale {
1090                        warn!("Got a new NetDir, but it's older than the one we currently have!");
1091                        return Err(Error::NetDirOlder);
1092                    }
1093                    let cfg = self.config.get();
1094                    let mut netdir = netdir.take().expect("AttemptReplace had None");
1095                    netdir.replace_overridden_parameters(&cfg.override_net_params);
1096                    self.netdir.replace(netdir);
1097                    self.events.publish(DirEvent::NewConsensus);
1098                    self.events.publish(DirEvent::NewDescriptors);
1099
1100                    info!("Marked consensus usable.");
1101                    if !store.is_readonly() {
1102                        store.mark_consensus_usable(consensus_meta)?;
1103                        // Now that a consensus is usable, older consensuses may
1104                        // need to expire.
1105                        store.expire_all(&crate::storage::EXPIRATION_DEFAULTS)?;
1106                    }
1107                    Ok(())
1108                }
1109                NetDirChange::AddMicrodescs(mds) => {
1110                    self.netdir.mutate(|netdir| {
1111                        for md in mds.drain(..) {
1112                            netdir.add_microdesc(md);
1113                        }
1114                        Ok(())
1115                    })?;
1116                    self.events.publish(DirEvent::NewDescriptors);
1117                    Ok(())
1118                }
1119                NetDirChange::SetRequiredProtocol { timestamp, protos } => {
1120                    if !store.is_readonly() {
1121                        store.update_protocol_recommendations(timestamp, protos.as_ref())?;
1122                    }
1123                    let mut pr = self.protocols.lock().expect("Poisoned lock");
1124                    *pr = Some((timestamp, protos));
1125                    self.events.publish(DirEvent::NewProtocolRecommendation);
1126                    Ok(())
1127                }
1128            }
1129        } else {
1130            Ok(())
1131        }
1132    }
1133}
1134
/// A degree of readiness for a given directory state object.
#[derive(Debug, Copy, Clone)]
enum Readiness {
    /// There is no more information to download.
    Complete,
    /// There is more information to download, but we don't need to wait for
    /// it: going by the variant name, what we have is already usable.
    // NOTE(review): the original sentence was cut off after "need to";
    // confirm the intended wording against the code that uses `Readiness`.
    Usable,
}
1143
1144/// Try to upgrade a weak reference to a DirMgr, and give an error on
1145/// failure.
1146fn upgrade_weak_ref<T>(weak: &Weak<T>) -> Result<Arc<T>> {
1147    Weak::upgrade(weak).ok_or(Error::ManagerDropped)
1148}
1149
1150/// Given a time `now`, and an amount of tolerated clock skew `tolerance`,
1151/// return the age of the oldest consensus that we should request at that time.
1152pub(crate) fn default_consensus_cutoff(
1153    now: SystemTime,
1154    tolerance: &DirTolerance,
1155) -> Result<SystemTime> {
1156    /// We _always_ allow at least this much age in our consensuses, to account
1157    /// for the fact that consensuses have some lifetime.
1158    const MIN_AGE_TO_ALLOW: Duration = Duration::from_secs(3 * 3600);
1159    let allow_skew = std::cmp::max(MIN_AGE_TO_ALLOW, tolerance.post_valid_tolerance());
1160    let cutoff = time::OffsetDateTime::from(now - allow_skew);
1161    // We now round cutoff to the next hour, so that we aren't leaking our exact
1162    // time to the directory cache.
1163    //
1164    // With the time crate, it's easier to calculate the "next hour" by rounding
1165    // _down_ then adding an hour; rounding up would sometimes require changing
1166    // the date too.
1167    let (h, _m, _s) = cutoff.to_hms();
1168    let cutoff = cutoff.replace_time(
1169        time::Time::from_hms(h, 0, 0)
1170            .map_err(tor_error::into_internal!("Failed clock calculation"))?,
1171    );
1172    let cutoff = cutoff + Duration::from_secs(3600);
1173
1174    Ok(cutoff.into())
1175}
1176
1177/// Return a list of the protocols [supported](tor_protover::doc_supported) by this crate
1178/// when running as a client.
1179pub fn supported_client_protocols() -> tor_protover::Protocols {
1180    use tor_protover::named::*;
1181    // WARNING: REMOVING ELEMENTS FROM THIS LIST CAN BE DANGEROUS!
1182    // SEE [`tor_protover::doc_changing`]
1183    [
1184        //
1185        DIRCACHE_CONSDIFF,
1186    ]
1187    .into_iter()
1188    .collect()
1189}
1190
1191#[cfg(test)]
1192mod test {
1193    // @@ begin test lint list maintained by maint/add_warning @@
1194    #![allow(clippy::bool_assert_comparison)]
1195    #![allow(clippy::clone_on_copy)]
1196    #![allow(clippy::dbg_macro)]
1197    #![allow(clippy::mixed_attributes_style)]
1198    #![allow(clippy::print_stderr)]
1199    #![allow(clippy::print_stdout)]
1200    #![allow(clippy::single_char_pattern)]
1201    #![allow(clippy::unwrap_used)]
1202    #![allow(clippy::unchecked_time_subtraction)]
1203    #![allow(clippy::useless_vec)]
1204    #![allow(clippy::needless_pass_by_value)]
1205    #![allow(clippy::string_slice)] // See arti#2571
1206    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1207    use super::*;
1208    use crate::docmeta::{AuthCertMeta, ConsensusMeta};
1209    use std::time::Duration;
1210    use tempfile::TempDir;
1211    use tor_basic_utils::test_rng::testing_rng;
1212    use tor_netdoc::doc::netstatus::ConsensusFlavor;
1213    use tor_netdoc::doc::{authcert::AuthCertKeyIds, netstatus::Lifetime};
1214    use tor_rtcompat::SleepProvider;
1215
1216    #[test]
1217    fn protocols() {
1218        let pr = supported_client_protocols();
1219        let expected = "DirCache=2".parse().unwrap();
1220        assert_eq!(pr, expected);
1221    }
1222
1223    pub(crate) fn new_mgr<R: Runtime>(runtime: R) -> (TempDir, DirMgr<R>) {
1224        let dir = TempDir::new().unwrap();
1225        let config = DirMgrConfig {
1226            cache_dir: dir.path().into(),
1227            ..Default::default()
1228        };
1229        let store = DirMgrStore::new(&config, runtime.clone(), false).unwrap();
1230        let dirmgr = DirMgr::from_config(config, runtime, store, None, false).unwrap();
1231
1232        (dir, dirmgr)
1233    }
1234
1235    #[test]
1236    fn failing_accessors() {
1237        tor_rtcompat::test_with_one_runtime!(|rt| async {
1238            let (_tempdir, mgr) = new_mgr(rt);
1239
1240            assert!(mgr.circmgr().is_err());
1241            assert!(mgr.netdir(Timeliness::Unchecked).is_err());
1242        });
1243    }
1244
1245    #[test]
1246    fn load_and_store_internals() {
1247        tor_rtcompat::test_with_one_runtime!(|rt| async {
1248            let now = rt.wallclock();
1249            let tomorrow = now + Duration::from_secs(86400);
1250            let later = tomorrow + Duration::from_secs(86400);
1251
1252            let (_tempdir, mgr) = new_mgr(rt);
1253
1254            // Seed the storage with a bunch of junk.
1255            let d1 = [5_u8; 32];
1256            let d2 = [7; 32];
1257            let d3 = [42; 32];
1258            let d4 = [99; 20];
1259            let d5 = [12; 20];
1260            let certid1 = AuthCertKeyIds {
1261                id_fingerprint: d4.into(),
1262                sk_fingerprint: d5.into(),
1263            };
1264            let certid2 = AuthCertKeyIds {
1265                id_fingerprint: d5.into(),
1266                sk_fingerprint: d4.into(),
1267            };
1268
1269            {
1270                let mut store = mgr.store.lock().unwrap();
1271
1272                store
1273                    .store_microdescs(
1274                        &[
1275                            ("Fake micro 1", &d1),
1276                            ("Fake micro 2", &d2),
1277                            ("Fake micro 3", &d3),
1278                        ],
1279                        now,
1280                    )
1281                    .unwrap();
1282
1283                #[cfg(feature = "routerdesc")]
1284                store
1285                    .store_routerdescs(&[("Fake rd1", now, &d4), ("Fake rd2", now, &d5)])
1286                    .unwrap();
1287
1288                store
1289                    .store_authcerts(&[
1290                        (
1291                            AuthCertMeta::new(certid1, now, tomorrow),
1292                            "Fake certificate one",
1293                        ),
1294                        (
1295                            AuthCertMeta::new(certid2, now, tomorrow),
1296                            "Fake certificate two",
1297                        ),
1298                    ])
1299                    .unwrap();
1300
1301                let cmeta = ConsensusMeta::new(
1302                    Lifetime::new(now, tomorrow, later).unwrap(),
1303                    [102; 32],
1304                    [103; 32],
1305                );
1306                store
1307                    .store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
1308                    .unwrap();
1309            }
1310
1311            // Try to get it with text().
1312            let t1 = mgr.text(&DocId::Microdesc(d1)).unwrap().unwrap();
1313            assert_eq!(t1.as_str(), Ok("Fake micro 1"));
1314
1315            let t2 = mgr
1316                .text(&DocId::LatestConsensus {
1317                    flavor: ConsensusFlavor::Microdesc,
1318                    cache_usage: CacheUsage::CacheOkay,
1319                })
1320                .unwrap()
1321                .unwrap();
1322            assert_eq!(t2.as_str(), Ok("Fake consensus!"));
1323
1324            let t3 = mgr.text(&DocId::Microdesc([255; 32])).unwrap();
1325            assert!(t3.is_none());
1326
1327            // Now try texts()
1328            let d_bogus = DocId::Microdesc([255; 32]);
1329            let res = mgr
1330                .texts(vec![
1331                    DocId::Microdesc(d2),
1332                    DocId::Microdesc(d3),
1333                    d_bogus,
1334                    DocId::AuthCert(certid2),
1335                    #[cfg(feature = "routerdesc")]
1336                    DocId::RouterDesc(d5),
1337                ])
1338                .unwrap();
1339            assert_eq!(
1340                res.get(&DocId::Microdesc(d2)).unwrap().as_str(),
1341                Ok("Fake micro 2")
1342            );
1343            assert_eq!(
1344                res.get(&DocId::Microdesc(d3)).unwrap().as_str(),
1345                Ok("Fake micro 3")
1346            );
1347            assert!(!res.contains_key(&d_bogus));
1348            assert_eq!(
1349                res.get(&DocId::AuthCert(certid2)).unwrap().as_str(),
1350                Ok("Fake certificate two")
1351            );
1352            #[cfg(feature = "routerdesc")]
1353            assert_eq!(
1354                res.get(&DocId::RouterDesc(d5)).unwrap().as_str(),
1355                Ok("Fake rd2")
1356            );
1357        });
1358    }
1359
1360    #[test]
1361    fn make_consensus_request() {
1362        tor_rtcompat::test_with_one_runtime!(|rt| async {
1363            let now = rt.wallclock();
1364            let tomorrow = now + Duration::from_secs(86400);
1365            let later = tomorrow + Duration::from_secs(86400);
1366
1367            let (_tempdir, mgr) = new_mgr(rt);
1368            let config = DirMgrConfig::default();
1369
1370            // Try with an empty store.
1371            let req = {
1372                let store = mgr.store.lock().unwrap();
1373                bootstrap::make_consensus_request(
1374                    now,
1375                    ConsensusFlavor::Microdesc,
1376                    &**store,
1377                    &config,
1378                )
1379                .unwrap()
1380            };
1381            let tolerance = DirTolerance::default().post_valid_tolerance();
1382            match req {
1383                ClientRequest::Consensus(r) => {
1384                    assert_eq!(r.old_consensus_digests().count(), 0);
1385                    let date = r.last_consensus_date().unwrap();
1386                    assert!(date >= now - tolerance);
1387                    assert!(date <= now - tolerance + Duration::from_secs(3600));
1388                }
1389                _ => panic!("Wrong request type"),
1390            }
1391
1392            // Add a fake consensus record.
1393            let d_prev = [42; 32];
1394            {
1395                let mut store = mgr.store.lock().unwrap();
1396
1397                let cmeta = ConsensusMeta::new(
1398                    Lifetime::new(now, tomorrow, later).unwrap(),
1399                    d_prev,
1400                    [103; 32],
1401                );
1402                store
1403                    .store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
1404                    .unwrap();
1405            }
1406
1407            // Now try again.
1408            let req = {
1409                let store = mgr.store.lock().unwrap();
1410                bootstrap::make_consensus_request(
1411                    now,
1412                    ConsensusFlavor::Microdesc,
1413                    &**store,
1414                    &config,
1415                )
1416                .unwrap()
1417            };
1418            match req {
1419                ClientRequest::Consensus(r) => {
1420                    let ds: Vec<_> = r.old_consensus_digests().collect();
1421                    assert_eq!(ds.len(), 1);
1422                    assert_eq!(ds[0], &d_prev);
1423                    assert_eq!(r.last_consensus_date(), Some(now));
1424                }
1425                _ => panic!("Wrong request type"),
1426            }
1427        });
1428    }
1429
1430    #[test]
1431    fn make_other_requests() {
1432        tor_rtcompat::test_with_one_runtime!(|rt| async {
1433            use rand::RngExt;
1434            let (_tempdir, mgr) = new_mgr(rt);
1435
1436            let certid1 = AuthCertKeyIds {
1437                id_fingerprint: [99; 20].into(),
1438                sk_fingerprint: [100; 20].into(),
1439            };
1440            let mut rng = testing_rng();
1441            #[cfg(feature = "routerdesc")]
1442            let rd_ids: Vec<DocId> = (0..1000).map(|_| DocId::RouterDesc(rng.random())).collect();
1443            let md_ids: Vec<DocId> = (0..1000).map(|_| DocId::Microdesc(rng.random())).collect();
1444            let config = DirMgrConfig::default();
1445
1446            // Try an authcert.
1447            let query = DocId::AuthCert(certid1);
1448            let store = mgr.store.lock().unwrap();
1449            let reqs =
1450                bootstrap::make_requests_for_documents(&mgr.runtime, &[query], &**store, &config)
1451                    .unwrap();
1452            assert_eq!(reqs.len(), 1);
1453            let req = &reqs[0];
1454            if let ClientRequest::AuthCert(r) = req {
1455                assert_eq!(r.keys().next(), Some(&certid1));
1456            } else {
1457                panic!();
1458            }
1459
1460            // Try a bunch of mds.
1461            let reqs =
1462                bootstrap::make_requests_for_documents(&mgr.runtime, &md_ids, &**store, &config)
1463                    .unwrap();
1464            assert_eq!(reqs.len(), 2);
1465            assert!(matches!(reqs[0], ClientRequest::Microdescs(_)));
1466
1467            // Try a bunch of rds.
1468            #[cfg(feature = "routerdesc")]
1469            {
1470                let reqs = bootstrap::make_requests_for_documents(
1471                    &mgr.runtime,
1472                    &rd_ids,
1473                    &**store,
1474                    &config,
1475                )
1476                .unwrap();
1477                assert_eq!(reqs.len(), 2);
1478                assert!(matches!(reqs[0], ClientRequest::RouterDescs(_)));
1479            }
1480        });
1481    }
1482
    /// Exercise `expand_response_text`: responses that are not consensus
    /// diffs must come back unchanged, a valid diff against a stored
    /// consensus must be applied, and a diff whose result digest does not
    /// match must be rejected.
    #[test]
    fn expand_response() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let day = Duration::from_secs(86400);
            let config = DirMgrConfig::default();

            let (_tempdir, mgr) = new_mgr(rt);

            // Try a simple request: nothing should happen.
            let q = DocId::Microdesc([99; 32]);
            let r = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_requests_for_documents(&mgr.runtime, &[q], &**store, &config)
                    .unwrap()
            };
            let expanded = mgr.expand_response_text(&r[0], "ABC".to_string());
            assert_eq!(&expanded.unwrap(), "ABC");

            // Try a consensus response that doesn't look like a diff in
            // response to a query that doesn't ask for one.
            let latest_id = DocId::LatestConsensus {
                flavor: ConsensusFlavor::Microdesc,
                cache_usage: CacheUsage::CacheOkay,
            };
            let r = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_requests_for_documents(
                    &mgr.runtime,
                    &[latest_id],
                    &**store,
                    &config,
                )
                .unwrap()
            };
            let expanded = mgr.expand_response_text(&r[0], "DEF".to_string());
            assert_eq!(&expanded.unwrap(), "DEF");

            // Now stick some metadata and a string into the storage so that
            // we can ask for a diff.
            {
                let mut store = mgr.store.lock().unwrap();
                let d_in = [0x99; 32]; // This one, we can fake.
                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, now + day, now + 2 * day).unwrap(),
                    d_in,
                    d_in,
                );
                store
                    .store_consensus(
                        &cmeta,
                        ConsensusFlavor::Microdesc,
                        false,
                        "line 1\nline2\nline 3\n",
                    )
                    .unwrap();
            }

            // Try expanding a response that isn't a diff, even though a
            // consensus is now stored: it should come back unchanged.
            // (The request is rebuilt here, after the consensus was stored.)
            let r = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_requests_for_documents(
                    &mgr.runtime,
                    &[latest_id],
                    &**store,
                    &config,
                )
                .unwrap()
            };
            let expanded = mgr.expand_response_text(&r[0], "hello".to_string());
            assert_eq!(&expanded.unwrap(), "hello");

            // Finally, try "expanding" a diff (by applying it and checking the digest).
            // The diff replaces line 2 of the stored text.
            let diff = "network-status-diff-version 1
hash 9999999999999999999999999999999999999999999999999999999999999999 8382374ca766873eb0d2530643191c6eaa2c5e04afa554cbac349b5d0592d300
2c
replacement line
.
".to_string();
            let expanded = mgr.expand_response_text(&r[0], diff);

            assert_eq!(expanded.unwrap(), "line 1\nreplacement line\nline 3\n");

            // If the digest is wrong, that should get rejected.
            // (Same diff as above, but with a different second hash.)
            let diff = "network-status-diff-version 1
hash 9999999999999999999999999999999999999999999999999999999999999999 9999999999999999999999999999999999999999999999999999999999999999
2c
replacement line
.
".to_string();
            let expanded = mgr.expand_response_text(&r[0], diff);
            assert!(expanded.is_err());
        });
    }
1578}