// tor-dirmgr/src/lib.rs
#![cfg_attr(docsrs, feature(doc_cfg))]
#![doc = include_str!("../README.md")]
// @@ begin lint list maintained by maint/add_warning @@
#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
#![warn(missing_docs)]
#![warn(noop_method_call)]
#![warn(unreachable_pub)]
#![warn(clippy::all)]
#![deny(clippy::await_holding_lock)]
#![deny(clippy::cargo_common_metadata)]
#![deny(clippy::cast_lossless)]
#![deny(clippy::checked_conversions)]
#![warn(clippy::cognitive_complexity)]
#![deny(clippy::debug_assert_with_mut_call)]
#![deny(clippy::exhaustive_enums)]
#![deny(clippy::exhaustive_structs)]
#![deny(clippy::expl_impl_clone_on_copy)]
#![deny(clippy::fallible_impl_from)]
#![deny(clippy::implicit_clone)]
#![deny(clippy::large_stack_arrays)]
#![warn(clippy::manual_ok_or)]
#![deny(clippy::missing_docs_in_private_items)]
#![warn(clippy::needless_borrow)]
#![warn(clippy::needless_pass_by_value)]
#![warn(clippy::option_option)]
#![deny(clippy::print_stderr)]
#![deny(clippy::print_stdout)]
#![warn(clippy::rc_buffer)]
#![deny(clippy::ref_option_ref)]
#![warn(clippy::semicolon_if_nothing_returned)]
#![warn(clippy::trait_duplication_in_bounds)]
#![deny(clippy::unchecked_time_subtraction)]
#![deny(clippy::unnecessary_wraps)]
#![warn(clippy::unseparated_literal_suffix)]
#![deny(clippy::unwrap_used)]
#![deny(clippy::mod_module_files)]
#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
#![allow(clippy::uninlined_format_args)]
#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
#![allow(clippy::result_large_err)] // temporary workaround for arti#587
#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
#![allow(clippy::needless_lifetimes)] // See arti#1765
#![allow(mismatched_lifetime_syntaxes)] // temporary workaround for arti#2060
#![allow(clippy::collapsible_if)] // See arti#2342
#![deny(clippy::unused_async)]
//! <!-- @@ end lint list maintained by maint/add_warning @@ -->

// This clippy lint produces a false positive on `use strum`, below.
// Attempting to apply the lint to just the use statement fails to suppress
// this lint and instead produces another lint about a useless clippy attribute.
#![allow(clippy::single_component_path_imports)]
53
mod bootstrap;
pub mod config;
mod docid;
mod docmeta;
mod err;
mod event;
mod shared_ref;
mod state;
mod storage;

#[cfg(feature = "bridge-client")]
pub mod bridgedesc;
#[cfg(feature = "dirfilter")]
pub mod filter;

use crate::docid::{CacheUsage, ClientRequest, DocQuery};
use crate::err::BootstrapAction;
#[cfg(not(feature = "experimental-api"))]
use crate::shared_ref::SharedMutArc;
#[cfg(feature = "experimental-api")]
pub use crate::shared_ref::SharedMutArc;
use crate::storage::{DynStore, Store};
use bootstrap::AttemptId;
use event::DirProgress;
use postage::watch;
use scopeguard::ScopeGuard;
use tor_circmgr::CircMgr;
use tor_dirclient::SourceInfo;
use tor_dircommon::config::DirTolerance;
use tor_error::{info_report, into_internal, warn_report};
use tor_netdir::params::NetParameters;
use tor_netdir::{DirEvent, MdReceiver, NetDir, NetDirProvider};

use async_trait::async_trait;
use futures::stream::BoxStream;
use oneshot_fused_workaround as oneshot;
use tor_netdoc::doc::netstatus::ProtoStatuses;
use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
use tor_rtcompat::{Runtime, SpawnExt};
use tracing::{debug, info, instrument, trace, warn};
use web_time_compat::SystemTimeExt;

use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::{collections::HashMap, sync::Weak};
use std::{fmt::Debug, time::SystemTime};

use crate::state::{DirState, NetDirChange};
pub use config::DirMgrConfig;
pub use docid::DocId;
pub use err::Error;
pub use event::{DirBlockage, DirBootstrapEvents, DirBootstrapStatus};
pub use storage::DocumentText;
pub use tor_dircommon::fallback::{FallbackDir, FallbackDirBuilder};
pub use tor_netdir::Timeliness;

/// Re-export of `strum` crate for use by an internal macro
use strum;
114
/// A Result as returned by this crate.
///
/// The error type is always this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
117
/// Storage manager used by [`DirMgr`] and
/// [`BridgeDescMgr`](bridgedesc::BridgeDescMgr)
///
/// Internally, this wraps up a sqlite database.
///
/// This is a handle, which is cheap to clone; clones share state.
#[derive(Clone)]
pub struct DirMgrStore<R: Runtime> {
    /// The actual store.
    ///
    /// Shared (via the `Arc`) with every clone of this handle; the `Mutex`
    /// serializes access to the underlying database connection.
    pub(crate) store: Arc<Mutex<crate::DynStore>>,

    /// Be parameterized by Runtime even though we don't use it right now
    pub(crate) runtime: PhantomData<R>,
}
132
133impl<R: Runtime> DirMgrStore<R> {
134    /// Open the storage, according to the specified configuration
135    pub fn new(config: &DirMgrConfig, runtime: R, offline: bool) -> Result<Self> {
136        let store = Arc::new(Mutex::new(config.open_store(offline)?));
137        drop(runtime);
138        let runtime = PhantomData;
139        Ok(DirMgrStore { store, runtime })
140    }
141}
142
/// Trait for DirMgr implementations
#[async_trait]
pub trait DirProvider: NetDirProvider {
    /// Try to change our configuration to `new_config`.
    ///
    /// Actual behavior will depend on the value of `how`.
    fn reconfigure(
        &self,
        new_config: &DirMgrConfig,
        how: tor_config::Reconfigure,
    ) -> std::result::Result<(), tor_config::ReconfigureError>;

    /// Bootstrap a `DirProvider` that hasn't been bootstrapped yet.
    async fn bootstrap(&self) -> Result<()>;

    /// Return a stream of [`DirBootstrapStatus`] events to tell us about changes
    /// in the latest directory's bootstrap status.
    ///
    /// Note that this stream can be lossy: the caller will not necessarily
    /// observe every event on the stream
    fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus>;

    /// Return a [`TaskHandle`] that can be used to manage the download process.
    ///
    /// By default there is no such handle; implementations that actually run a
    /// download task (such as `DirMgr`) override this.
    fn download_task_handle(&self) -> Option<TaskHandle> {
        None
    }
}
170
// NOTE(eta): We can't implement this for Arc<DirMgr<R>> due to trait coherence rules, so instead
//            there's a blanket impl for Arc<T> in tor-netdir.
impl<R: Runtime> NetDirProvider for DirMgr<R> {
    fn netdir(&self, timeliness: Timeliness) -> tor_netdir::Result<Arc<NetDir>> {
        use tor_netdir::Error as NetDirError;
        // No directory at all yet: nothing to hand out.
        let netdir = self.netdir.get().ok_or(NetDirError::NoInfo)?;
        // Decide which validity window to enforce, based on `timeliness`.
        let lifetime = match timeliness {
            // Use the consensus lifetime exactly as published.
            Timeliness::Strict => netdir.lifetime().clone(),
            // Widen the published lifetime by our configured tolerance.
            Timeliness::Timely => self
                .config
                .get()
                .tolerance
                .extend_lifetime(netdir.lifetime()),
            // Skip validity checking entirely.
            Timeliness::Unchecked => return Ok(netdir),
        };
        // TODO #2384 -- we have a runtime here; we should use it.
        let now = SystemTime::get();
        if lifetime.valid_after() > now {
            Err(NetDirError::DirNotYetValid)
        } else if lifetime.valid_until() < now {
            Err(NetDirError::DirExpired)
        } else {
            Ok(netdir)
        }
    }

    fn events(&self) -> BoxStream<'static, DirEvent> {
        Box::pin(self.events.subscribe())
    }

    fn params(&self) -> Arc<dyn AsRef<tor_netdir::params::NetParameters>> {
        if let Some(netdir) = self.netdir.get() {
            // We have a directory, so we'd like to give it out for its
            // parameters.
            //
            // We do this even if the directory is expired, since parameters
            // don't really expire on any plausible timescale.
            netdir
        } else {
            // We have no directory, so we'll give out the default parameters as
            // modified by the provided override_net_params configuration.
            //
            self.default_parameters
                .lock()
                .expect("Poisoned lock")
                .clone()
        }
        // TODO(nickm): If we felt extremely clever, we could add a third case
        // where, if we have a pending directory with a validated consensus, we
        // give out that consensus's network parameters even if we _don't_ yet
        // have a full directory.  That's significant refactoring, though, for
        // an unclear amount of benefit.
    }

    fn protocol_statuses(&self) -> Option<(SystemTime, Arc<ProtoStatuses>)> {
        self.protocols.lock().expect("Poisoned lock").clone()
    }
}
229
// All of these methods simply delegate to the corresponding inherent methods
// on `DirMgr`; see those for documentation.
#[async_trait]
impl<R: Runtime> DirProvider for Arc<DirMgr<R>> {
    fn reconfigure(
        &self,
        new_config: &DirMgrConfig,
        how: tor_config::Reconfigure,
    ) -> std::result::Result<(), tor_config::ReconfigureError> {
        DirMgr::reconfigure(self, new_config, how)
    }

    #[instrument(level = "trace", skip_all)]
    async fn bootstrap(&self) -> Result<()> {
        DirMgr::bootstrap(self).await
    }

    fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus> {
        Box::pin(DirMgr::bootstrap_events(self))
    }

    fn download_task_handle(&self) -> Option<TaskHandle> {
        // Unlike the trait's default, a DirMgr always has a download handle.
        Some(self.task_handle.clone())
    }
}
253
/// A directory manager to download, fetch, and cache a Tor directory.
///
/// A DirMgr can operate in three modes:
///   * In **offline** mode, it only reads from the cache, and can
///     only read once.
///   * In **read-only** mode, it reads from the cache, but checks
///     whether it can acquire an associated lock file.  If it can, then
///     it enters read-write mode.  If not, it checks the cache
///     periodically for new information.
///   * In **read-write** mode, it knows that no other process will be
///     writing to the cache, and it takes responsibility for fetching
///     data from the network and updating the directory with new
///     directory information.
pub struct DirMgr<R: Runtime> {
    /// Configuration information: where to find directories, how to
    /// validate them, and so on.
    config: tor_config::MutCfg<DirMgrConfig>,
    /// Handle to our sqlite cache.
    // TODO(nickm): I'd like to use an rwlock, but that's not feasible, since
    // rusqlite::Connection isn't Sync.
    // TODO: is this field still needed here, given that `DirMgrStore` also
    // holds a handle to the same store?
    store: Arc<Mutex<DynStore>>,
    /// Our latest sufficiently bootstrapped directory, if we have one.
    ///
    /// We use the RwLock so that we can give this out to a bunch of other
    /// users, and replace it once a new directory is bootstrapped.
    // TODO(eta): Eurgh! This is so many Arcs! (especially considering this
    //            gets wrapped in an Arc)
    netdir: Arc<SharedMutArc<NetDir>>,

    /// Our latest set of recommended protocols.
    protocols: Mutex<Option<(SystemTime, Arc<ProtoStatuses>)>>,

    /// A set of network parameters to hand out when we have no directory.
    default_parameters: Mutex<Arc<NetParameters>>,

    /// A publisher handle that we notify whenever the consensus changes.
    events: event::FlagPublisher<DirEvent>,

    /// A publisher handle that we notify whenever our bootstrapping status
    /// changes.
    send_status: Mutex<watch::Sender<event::DirBootstrapStatus>>,

    /// A receiver handle that gets notified whenever our bootstrapping status
    /// changes.
    ///
    /// We don't need to keep this drained, since `postage::watch` already knows
    /// to discard unread events.
    receive_status: DirBootstrapEvents,

    /// A circuit manager, if this DirMgr supports downloading.
    ///
    /// `None` means this DirMgr cannot download at all; see `circmgr()`.
    circmgr: Option<Arc<CircMgr<R>>>,

    /// Our asynchronous runtime.
    runtime: R,

    /// Whether or not we're operating in offline mode.
    offline: bool,

    /// If we're not in offline mode, stores whether or not the `DirMgr` has attempted
    /// to bootstrap yet or not.
    ///
    /// This exists in order to prevent starting two concurrent bootstrap tasks.
    ///
    /// (In offline mode, this does nothing.)
    bootstrap_started: AtomicBool,

    /// A filter that gets applied to directory objects before we use them.
    #[cfg(feature = "dirfilter")]
    filter: crate::filter::FilterConfig,

    /// A task schedule that can be used if we're bootstrapping.  If this is
    /// None, then there's currently a scheduled task in progress.
    task_schedule: Mutex<Option<TaskSchedule<R>>>,

    /// A task handle that we return to anybody who needs to manage our download process.
    task_handle: TaskHandle,
}
332
/// The possible origins of a document.
///
/// Used (for example) to report where we got a document from if it fails to
/// parse.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum DocSource {
    /// We loaded the document from our cache.
    LocalCache,
    /// We fetched the document from a server.
    DirServer {
        /// Information about the server we fetched the document from.
        ///
        /// `None` when no per-server information was recorded for the fetch.
        source: Option<SourceInfo>,
    },
}
348
349impl std::fmt::Display for DocSource {
350    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
351        match self {
352            DocSource::LocalCache => write!(f, "local cache"),
353            DocSource::DirServer { source: None } => write!(f, "directory server"),
354            DocSource::DirServer { source: Some(info) } => write!(f, "directory server {}", info),
355        }
356    }
357}
358
359impl<R: Runtime> DirMgr<R> {
360    /// Try to load the directory from disk, without launching any
361    /// kind of update process.
362    ///
363    /// This function runs in **offline** mode: it will give an error
364    /// if the result is not up-to-date, or not fully downloaded.
365    ///
366    /// In general, you shouldn't use this function in a long-running
367    /// program; it's only suitable for command-line or batch tools.
368    // TODO: I wish this function didn't have to be async or take a runtime.
369    pub fn load_once(runtime: R, config: DirMgrConfig) -> Result<Arc<NetDir>> {
370        let store = DirMgrStore::new(&config, runtime.clone(), true)?;
371        let dirmgr = Arc::new(Self::from_config(config, runtime, store, None, true)?);
372
373        // TODO: add some way to return a directory that isn't up-to-date
374        let attempt = AttemptId::next();
375        trace!(%attempt, "Trying to load a full directory from cache");
376        let outcome = dirmgr.load_directory(attempt);
377        trace!(%attempt, "Load result: {outcome:?}");
378        let _success = outcome?;
379
380        dirmgr
381            .netdir(Timeliness::Timely)
382            .map_err(|_| Error::DirectoryNotPresent)
383    }
384
385    /// Return a current netdir, either loading it or bootstrapping it
386    /// as needed.
387    ///
388    /// Like load_once, but will try to bootstrap (or wait for another
389    /// process to bootstrap) if we don't have an up-to-date
390    /// bootstrapped directory.
391    ///
392    /// In general, you shouldn't use this function in a long-running
393    /// program; it's only suitable for command-line or batch tools.
394    pub async fn load_or_bootstrap_once(
395        config: DirMgrConfig,
396        runtime: R,
397        store: DirMgrStore<R>,
398        circmgr: Arc<CircMgr<R>>,
399    ) -> Result<Arc<NetDir>> {
400        let dirmgr = DirMgr::bootstrap_from_config(config, runtime, store, circmgr).await?;
401        dirmgr
402            .timely_netdir()
403            .map_err(|_| Error::DirectoryNotPresent)
404    }
405
406    /// Create a new `DirMgr` in online mode, but don't bootstrap it yet.
407    ///
408    /// The `DirMgr` can be bootstrapped later with `bootstrap`.
409    pub fn create_unbootstrapped(
410        config: DirMgrConfig,
411        runtime: R,
412        store: DirMgrStore<R>,
413        circmgr: Arc<CircMgr<R>>,
414    ) -> Result<Arc<Self>> {
415        Ok(Arc::new(DirMgr::from_config(
416            config,
417            runtime,
418            store,
419            Some(circmgr),
420            false,
421        )?))
422    }
423
    /// Bootstrap a `DirMgr` created in online mode that hasn't been bootstrapped yet.
    ///
    /// This function will not return until the directory is bootstrapped enough to build circuits.
    /// It will also launch a background task that fetches any missing information, and that
    /// replaces the directory when a new one is available.
    ///
    /// This function is intended to be used together with `create_unbootstrapped`. There is no
    /// need to call this function otherwise.
    ///
    /// If bootstrapping has already successfully taken place, returns early with success.
    ///
    /// # Errors
    ///
    /// Returns an error if bootstrapping fails. If the error is [`Error::CantAdvanceState`],
    /// it may be possible to successfully bootstrap later on by calling this function again.
    ///
    /// # Panics
    ///
    /// Panics if the `DirMgr` passed to this function was not created in online mode, such as
    /// via `load_once`.
    // NOTE(review): an offline `DirMgr` actually yields `Error::OfflineMode`
    // below rather than panicking — the "Panics" section above may be stale;
    // confirm against callees.
    #[allow(clippy::cognitive_complexity)] // TODO: Refactor
    #[instrument(level = "trace", skip_all)]
    pub async fn bootstrap(self: &Arc<Self>) -> Result<()> {
        // Offline managers cannot download anything.
        if self.offline {
            return Err(Error::OfflineMode);
        }

        // The semantics of this are "attempt to replace a 'false' value with 'true'.
        // If the value in bootstrap_started was not 'false' when the attempt was made, returns
        // `Err`; this means another bootstrap attempt is in progress or has completed, so we
        // return early.

        // NOTE(eta): could potentially weaken the `Ordering` here in future
        if self
            .bootstrap_started
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_err()
        {
            debug!("Attempted to bootstrap twice; ignoring.");
            return Ok(());
        }

        // Use a RAII guard to reset `bootstrap_started` to `false` if we return early without
        // completing bootstrap.
        let reset_bootstrap_started = scopeguard::guard(&self.bootstrap_started, |v| {
            v.store(false, Ordering::SeqCst);
        });

        // Take the schedule out of its slot; `None` means a scheduled task is
        // already running (see the `task_schedule` field docs), so we bail out.
        let schedule = {
            let sched = self.task_schedule.lock().expect("poisoned lock").take();
            match sched {
                Some(sched) => sched,
                None => {
                    debug!("Attempted to bootstrap twice; ignoring.");
                    return Ok(());
                }
            }
        };

        // Try to load from the cache.
        let attempt_id = AttemptId::next();
        trace!(attempt=%attempt_id, "Starting to bootstrap directory");
        let have_directory = self.load_directory(attempt_id)?;

        // Only set up a completion channel when the cache didn't already give
        // us a usable directory; otherwise there is nothing to wait for.
        let (mut sender, receiver) = if have_directory {
            info!("Loaded a good directory from cache.");
            (None, None)
        } else {
            info!("Didn't get usable directory from cache.");
            let (sender, receiver) = oneshot::channel();
            (Some(sender), Some(receiver))
        };

        // Whether we loaded or not, we now start downloading.
        // The task holds only a Weak reference, so it cannot keep the DirMgr alive.
        let dirmgr_weak = Arc::downgrade(self);
        self.runtime
            .spawn(async move {
                // Use an RAII guard to make sure that when this task exits, the
                // TaskSchedule object is put back.
                //
                // TODO(nick): Putting the schedule back isn't actually useful
                // if the task exits _after_ we've bootstrapped for the first
                // time, because of how bootstrap_started works.
                let mut schedule = scopeguard::guard(schedule, |schedule| {
                    if let Some(dm) = Weak::upgrade(&dirmgr_weak) {
                        *dm.task_schedule.lock().expect("poisoned lock") = Some(schedule);
                    }
                });

                // Don't warn when these are Error::ManagerDropped: that
                // means that the DirMgr has been shut down.
                if let Err(e) =
                    Self::reload_until_owner(&dirmgr_weak, &mut schedule, attempt_id, &mut sender)
                        .await
                {
                    match e {
                        Error::ManagerDropped => {}
                        _ => warn_report!(e, "Unrecovered error while waiting for bootstrap",),
                    }
                } else if let Err(e) =
                    Self::download_forever(dirmgr_weak.clone(), &mut schedule, attempt_id, sender)
                        .await
                {
                    match e {
                        Error::ManagerDropped => {}
                        _ => warn_report!(e, "Unrecovered error while downloading"),
                    }
                }
            })
            .map_err(|e| Error::from_spawn("directory updater task", e))?;

        // If we had no cached directory, wait here until the background task
        // signals that enough of the directory has arrived to build circuits.
        if let Some(receiver) = receiver {
            match receiver.await {
                Ok(()) => {
                    info!("We have enough information to build circuits.");
                    // Disarm the RAII guard, since we succeeded.  Now bootstrap_started will remain true.
                    let _ = ScopeGuard::into_inner(reset_bootstrap_started);
                }
                Err(_) => {
                    // The sender was dropped without sending: the task died.
                    warn!("Bootstrapping task exited before finishing.");
                    return Err(Error::CantAdvanceState);
                }
            }
        }
        Ok(())
    }
550
    /// Returns `true` if a bootstrap attempt is in progress, or successfully completed.
    ///
    /// (Simply reads the `bootstrap_started` flag; see `bootstrap` for how the
    /// flag is set, and reset on early failure.)
    pub fn bootstrap_started(&self) -> bool {
        self.bootstrap_started.load(Ordering::SeqCst)
    }
555
556    /// Return a new directory manager from a given configuration,
557    /// bootstrapping from the network as necessary.
558    #[instrument(level = "trace", skip_all)]
559    pub async fn bootstrap_from_config(
560        config: DirMgrConfig,
561        runtime: R,
562        store: DirMgrStore<R>,
563        circmgr: Arc<CircMgr<R>>,
564    ) -> Result<Arc<Self>> {
565        let dirmgr = Self::create_unbootstrapped(config, runtime, store, circmgr)?;
566
567        dirmgr.bootstrap().await?;
568
569        Ok(dirmgr)
570    }
571
    /// Try forever to either lock the storage (and thereby become the
    /// owner), or to reload the database.
    ///
    /// Once we begin to have a bootstrapped directory, send a
    /// message using `on_complete`.
    ///
    /// If we eventually become the owner, return Ok().
    #[allow(clippy::cognitive_complexity)] // TODO: Refactor?
    async fn reload_until_owner(
        weak: &Weak<Self>,
        schedule: &mut TaskSchedule<R>,
        attempt_id: AttemptId,
        on_complete: &mut Option<oneshot::Sender<()>>,
    ) -> Result<()> {
        // `logged` ensures we only announce the "another process owns the
        // cache" situation once, rather than on every loop iteration.
        let mut logged = false;
        let mut bootstrapped;
        {
            // (Scoped so we don't hold the strong reference across awaits.)
            let dirmgr = upgrade_weak_ref(weak)?;
            bootstrapped = dirmgr.netdir.get().is_some();
        }

        loop {
            {
                let dirmgr = upgrade_weak_ref(weak)?;
                trace!("Trying to take ownership of the directory cache lock");
                if dirmgr.try_upgrade_to_readwrite()? {
                    // We now own the lock!  (Maybe we owned it before; the
                    // upgrade_to_readwrite() function is idempotent.)  We can
                    // do our own bootstrapping.
                    if logged {
                        info!(
                            "The previous owning process has given up the lock. We are now in charge of managing the directory."
                        );
                    }
                    return Ok(());
                }
            }

            if !logged {
                logged = true;
                if bootstrapped {
                    info!("Another process is managing the directory. We'll use its cache.");
                } else {
                    info!(
                        "Another process is bootstrapping the directory. Waiting till it finishes or exits."
                    );
                }
            }

            // We don't own the lock.  Somebody else owns the cache.  They
            // should be updating it.  Wait a bit, then try again.
            //
            // Poll more eagerly (5s vs 120s) while we still lack a usable
            // directory, since we're blocked until one appears.
            let pause = if bootstrapped {
                std::time::Duration::new(120, 0)
            } else {
                std::time::Duration::new(5, 0)
            };
            schedule.sleep(pause).await?;
            // TODO: instead of loading the whole thing we should have a
            // database entry that says when the last update was, or use
            // our state functions.
            {
                let dirmgr = upgrade_weak_ref(weak)?;
                trace!("Trying to load from the directory cache");
                if dirmgr.load_directory(attempt_id)? {
                    // Successfully loaded a bootstrapped directory.
                    if let Some(send_done) = on_complete.take() {
                        let _ = send_done.send(());
                    }
                    if !bootstrapped {
                        info!("The directory is now bootstrapped.");
                    }
                    bootstrapped = true;
                }
            }
        }
    }
648
    /// Try to fetch our directory info and keep it updated, indefinitely.
    ///
    /// Once we begin to have a bootstrapped directory, send a
    /// message using `on_complete`.
    #[allow(clippy::cognitive_complexity)] // TODO: Refactor?
    #[instrument(level = "trace", skip_all)]
    async fn download_forever(
        weak: Weak<Self>,
        schedule: &mut TaskSchedule<R>,
        mut attempt_id: AttemptId,
        mut on_complete: Option<oneshot::Sender<()>>,
    ) -> Result<()> {
        // Build the initial download state machine: start by getting a
        // consensus, with cached documents allowed to satisfy requests.
        let mut state: Box<dyn DirState> = {
            let dirmgr = upgrade_weak_ref(&weak)?;
            Box::new(state::GetConsensusState::new(
                dirmgr.runtime.clone(),
                dirmgr.config.get(),
                CacheUsage::CacheOkay,
                Some(dirmgr.netdir.clone()),
                #[cfg(feature = "dirfilter")]
                dirmgr
                    .filter
                    .clone()
                    .unwrap_or_else(|| Arc::new(crate::filter::NilFilter)),
            ))
        };

        trace!("Entering download loop.");

        // Outer loop: one iteration per full bootstrap attempt (plus the sleep
        // until the state's reset time before the next attempt).
        loop {
            let mut usable = false;

            let retry_config = {
                let dirmgr = upgrade_weak_ref(&weak)?;
                // TODO(nickm): instead of getting this every time we loop, it
                // might be a good idea to refresh it with each attempt, at
                // least at the point of checking the number of attempts.
                dirmgr.config.get().schedule.retry_bootstrap()
            };
            let mut retry_delay = retry_config.schedule();

            // Inner loop: bounded retries within this attempt.
            'retry_attempt: for try_num in retry_config.attempts() {
                trace!(attempt=%attempt_id, ?try_num, "Trying to download a directory.");
                let outcome = bootstrap::download(
                    Weak::clone(&weak),
                    &mut state,
                    schedule,
                    attempt_id,
                    &mut on_complete,
                )
                .await;
                trace!(attempt=%attempt_id, ?try_num, ?outcome, "Download is over.");

                if let Err(err) = outcome {
                    // Even a failed download may have left us with a usable
                    // (if incomplete) directory; if so, stop retrying.
                    if state.is_ready(Readiness::Usable) {
                        usable = true;
                        info_report!(
                            err,
                            "Unable to completely download a directory. (Nevertheless, the directory is usable, so we'll pause for now)"
                        );
                        break 'retry_attempt;
                    }

                    // Decide how to react to the failure.
                    match err.bootstrap_action() {
                        BootstrapAction::Nonfatal => {
                            // Nonfatal errors should have been absorbed inside
                            // `download`; seeing one here is an internal bug.
                            return Err(into_internal!(
                                "Nonfatal error should not have propagated here"
                            )(err)
                            .into());
                        }
                        BootstrapAction::Reset => {}
                        BootstrapAction::Fatal => return Err(err),
                    }

                    // Back off (with jitter from `retry_delay`), then reset the
                    // state machine and try again.
                    let delay = retry_delay.next_delay(&mut rand::rng());
                    warn_report!(
                        err,
                        "Unable to download a usable directory. (We will restart in {})",
                        humantime::format_duration(delay),
                    );
                    {
                        let dirmgr = upgrade_weak_ref(&weak)?;
                        dirmgr.note_reset(attempt_id);
                    }
                    schedule.sleep(delay).await?;
                    state = state.reset();
                } else {
                    info!(attempt=%attempt_id, "Directory is complete.");
                    usable = true;
                    break 'retry_attempt;
                }
            }

            if !usable {
                // we ran out of attempts.
                warn!(
                    "We failed {} times to bootstrap a directory. We're going to give up.",
                    retry_config.n_attempts()
                );
                return Err(Error::CantAdvanceState);
            } else {
                // Report success, if appropriate.
                if let Some(send_done) = on_complete.take() {
                    let _ = send_done.send(());
                }
            }

            // Sleep until the state says it's time to start over (e.g. to
            // fetch a fresh consensus); if it never will be, we're done.
            let reset_at = state.reset_time();
            match reset_at {
                Some(t) => {
                    trace!("Sleeping until {}", time::OffsetDateTime::from(t));
                    schedule.sleep_until_wallclock(t).await?;
                }
                None => return Ok(()),
            }
            attempt_id = bootstrap::AttemptId::next();
            trace!(attempt=%attempt_id, "Beginning new attempt to bootstrap directory");
            state = state.reset();
        }
    }
769
770    /// Get a reference to the circuit manager, if we have one.
771    fn circmgr(&self) -> Result<Arc<CircMgr<R>>> {
772        self.circmgr.clone().ok_or(Error::NoDownloadSupport)
773    }
774
    /// Try to change our configuration to `new_config`.
    ///
    /// Actual behavior will depend on the value of `how`.
    pub fn reconfigure(
        &self,
        new_config: &DirMgrConfig,
        how: tor_config::Reconfigure,
    ) -> std::result::Result<(), tor_config::ReconfigureError> {
        let config = self.config.get();
        // We don't support changing these: doing so basically would require us
        // to abort all our in-progress downloads, since they might be based on
        // no-longer-viable information.
        // NOTE: keep this in sync with the behaviour of `DirMgrConfig::update_from_config`
        if new_config.cache_dir != config.cache_dir {
            how.cannot_change("storage.cache_dir")?;
        }
        if new_config.cache_trust != config.cache_trust {
            how.cannot_change("storage.permissions")?;
        }
        if new_config.authorities() != config.authorities() {
            how.cannot_change("network.authorities")?;
        }

        // If the caller only wanted to know whether the change is possible,
        // we're done: everything above this point was a check.
        if how == tor_config::Reconfigure::CheckAllOrNothing {
            return Ok(());
        }

        let params_changed = new_config.override_net_params != config.override_net_params;

        self.config
            .map_and_replace(|cfg| cfg.update_from_config(new_config));

        if params_changed {
            // Apply the new parameter overrides to the current netdir, if any.
            // (It's okay to ignore the error, since it just means that there
            // was no current netdir.)
            let _ignore_err = self.netdir.mutate(|netdir| {
                netdir.replace_overridden_parameters(&new_config.override_net_params);
                Ok(())
            });
            // Also rebuild the defaults used when no consensus is available.
            {
                let mut params = self.default_parameters.lock().expect("lock failed");
                *params = Arc::new(NetParameters::from_map(&new_config.override_net_params));
            }

            // Tell subscribers that the effective consensus parameters may
            // have changed.
            self.events.publish(DirEvent::NewConsensus);
        }

        Ok(())
    }
824
    /// Return a stream of [`DirBootstrapStatus`] events to tell us about changes
    /// in the latest directory's bootstrap status.
    ///
    /// Note that this stream can be lossy: the caller will not necessarily
    /// observe every event on the stream.
    //
    // Each caller gets its own clone of the receiving end of the status
    // watch channel.
    pub fn bootstrap_events(&self) -> event::DirBootstrapEvents {
        self.receive_status.clone()
    }
833
834    /// Replace the latest status with `progress` and broadcast to anybody
835    /// watching via a [`DirBootstrapEvents`] stream.
836    fn update_progress(&self, attempt_id: AttemptId, progress: DirProgress) {
837        // TODO(nickm): can I kill off this lock by having something else own the sender?
838        let mut sender = self.send_status.lock().expect("poisoned lock");
839        let mut status = sender.borrow_mut();
840
841        status.update_progress(attempt_id, progress);
842    }
843
844    /// Update our status tracker to note that some number of errors has
845    /// occurred.
846    fn note_errors(&self, attempt_id: AttemptId, n_errors: usize) {
847        if n_errors == 0 {
848            return;
849        }
850        let mut sender = self.send_status.lock().expect("poisoned lock");
851        let mut status = sender.borrow_mut();
852
853        status.note_errors(attempt_id, n_errors);
854    }
855
856    /// Update our status tracker to note that we've needed to reset our download attempt.
857    fn note_reset(&self, attempt_id: AttemptId) {
858        let mut sender = self.send_status.lock().expect("poisoned lock");
859        let mut status = sender.borrow_mut();
860
861        status.note_reset(attempt_id);
862    }
863
864    /// Try to make this a directory manager with read-write access to its
865    /// storage.
866    ///
867    /// Return true if we got the lock, or if we already had it.
868    ///
869    /// Return false if another process has the lock
870    fn try_upgrade_to_readwrite(&self) -> Result<bool> {
871        self.store
872            .lock()
873            .expect("Directory storage lock poisoned")
874            .upgrade_to_readwrite()
875    }
876
877    /// Return a reference to the store, if it is currently read-write.
878    #[cfg(test)]
879    fn store_if_rw(&self) -> Option<&Mutex<DynStore>> {
880        let rw = !self
881            .store
882            .lock()
883            .expect("Directory storage lock poisoned")
884            .is_readonly();
885        // A race-condition is possible here, but I believe it's harmless.
886        if rw { Some(&self.store) } else { None }
887    }
888
    /// Construct a DirMgr from a DirMgrConfig.
    ///
    /// If `offline` is set, opens the SQLite store read-only and sets the offline flag in the
    /// returned manager.
    #[allow(clippy::unnecessary_wraps)] // API compat and future-proofing
    fn from_config(
        config: DirMgrConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Option<Arc<CircMgr<R>>>,
        offline: bool,
    ) -> Result<Self> {
        // Shared, mutable handle to the current network directory (empty at first).
        let netdir = Arc::new(SharedMutArc::new());
        let events = event::FlagPublisher::new();
        // Parameter defaults derived from the configured overrides; used when
        // no consensus is available.
        let default_parameters = NetParameters::from_map(&config.override_net_params);
        let default_parameters = Mutex::new(Arc::new(default_parameters));

        // Watch channel used to broadcast bootstrap-status updates.
        let (send_status, receive_status) = postage::watch::channel();
        let send_status = Mutex::new(send_status);
        let receive_status = DirBootstrapEvents {
            inner: receive_status,
        };
        #[cfg(feature = "dirfilter")]
        let filter = config.extensions.filter.clone();

        // We create these early so the client code can access task_handle before bootstrap() returns.
        let (task_schedule, task_handle) = TaskSchedule::new(runtime.clone());
        let task_schedule = Mutex::new(Some(task_schedule));

        // We load the cached protocol recommendations unconditionally: the caller needs them even
        // if it does not try to load the rest of the cache.
        let protocols = {
            let store = store.store.lock().expect("lock poisoned");
            store
                .cached_protocol_recommendations()?
                .map(|(t, p)| (t, Arc::new(p)))
        };

        Ok(DirMgr {
            config: config.into(),
            store: store.store,
            netdir,
            protocols: Mutex::new(protocols),
            default_parameters,
            events,
            send_status,
            receive_status,
            circmgr,
            runtime,
            offline,
            bootstrap_started: AtomicBool::new(false),
            #[cfg(feature = "dirfilter")]
            filter,
            task_schedule,
            task_handle,
        })
    }
946
947    /// Load the latest non-pending non-expired directory from the
948    /// cache, if it is newer than the one we have.
949    ///
950    /// Return false if there is no such consensus.
951    fn load_directory(self: &Arc<Self>, attempt_id: AttemptId) -> Result<bool> {
952        let state = state::GetConsensusState::new(
953            self.runtime.clone(),
954            self.config.get(),
955            CacheUsage::CacheOnly,
956            None,
957            #[cfg(feature = "dirfilter")]
958            self.filter
959                .clone()
960                .unwrap_or_else(|| Arc::new(crate::filter::NilFilter)),
961        );
962        let _ = bootstrap::load(self, Box::new(state), attempt_id)?;
963
964        Ok(self.netdir.get().is_some())
965    }
966
    /// Return a new asynchronous stream that will receive notification
    /// whenever the consensus has changed.
    ///
    /// Multiple events may be batched up into a single item: each time
    /// this stream yields an event, all you can assume is that the event has
    /// occurred at least once.
    //
    // (The `use<R>` bound means the opaque return type captures only the `R`
    // type parameter, not any lifetime from `&self`.)
    pub fn events(&self) -> impl futures::Stream<Item = DirEvent> + use<R> {
        self.events.subscribe()
    }
976
977    /// Try to load the text of a single document described by `doc` from
978    /// storage.
979    pub fn text(&self, doc: &DocId) -> Result<Option<DocumentText>> {
980        use itertools::Itertools;
981        let mut result = HashMap::new();
982        let query: DocQuery = (*doc).into();
983        let store = self.store.lock().expect("store lock poisoned");
984        query.load_from_store_into(&mut result, &**store)?;
985        let item = result.into_iter().at_most_one().map_err(|_| {
986            Error::CacheCorruption("Found more than one entry in storage for given docid")
987        })?;
988        if let Some((docid, doctext)) = item {
989            if &docid != doc {
990                return Err(Error::CacheCorruption(
991                    "Item from storage had incorrect docid.",
992                ));
993            }
994            Ok(Some(doctext))
995        } else {
996            Ok(None)
997        }
998    }
999
1000    /// Load the text for a collection of documents.
1001    ///
1002    /// If many of the documents have the same type, this can be more
1003    /// efficient than calling [`text`](Self::text).
1004    pub fn texts<T>(&self, docs: T) -> Result<HashMap<DocId, DocumentText>>
1005    where
1006        T: IntoIterator<Item = DocId>,
1007    {
1008        let partitioned = docid::partition_by_type(docs);
1009        let mut result = HashMap::new();
1010        let store = self.store.lock().expect("store lock poisoned");
1011        for (_, query) in partitioned.into_iter() {
1012            query.load_from_store_into(&mut result, &**store)?;
1013        }
1014        Ok(result)
1015    }
1016
    /// Given a request we sent and the response we got from a
    /// directory server, see whether we should expand that response
    /// into "something larger".
    ///
    /// Currently, this handles expanding consensus diffs, and nothing
    /// else.  We do it at this stage of our downloading operation
    /// because it requires access to the store.
    ///
    /// Returns the (possibly expanded) text on success; any response that is
    /// not a consensus diff is passed through unchanged.
    fn expand_response_text(&self, req: &ClientRequest, text: String) -> Result<String> {
        if let ClientRequest::Consensus(req) = req {
            if tor_consdiff::looks_like_diff(&text) {
                if let Some(old_d) = req.old_consensus_digests().next() {
                    // Fetch the base consensus the diff applies to, inside a
                    // block so we don't hold the store lock any longer than
                    // necessary.
                    let db_val = {
                        let s = self.store.lock().expect("Directory storage lock poisoned");
                        s.consensus_by_sha3_digest_of_signed_part(old_d)?
                    };
                    if let Some((old_consensus, meta)) = db_val {
                        info!("Applying a consensus diff");
                        let new_consensus = tor_consdiff::apply_diff(
                            old_consensus.as_str()?,
                            &text,
                            Some(*meta.sha3_256_of_signed()),
                        )?;
                        // Make sure the reconstructed consensus matches the
                        // digest that the diff promised.
                        new_consensus.check_digest()?;
                        return Ok(new_consensus.to_string());
                    }
                }
                // The response looked like a diff, but either we advertised no
                // old consensus digests, or we no longer have the base
                // consensus in storage.
                return Err(Error::Unwanted(
                    "Received a consensus diff we did not ask for",
                ));
            }
        }
        Ok(text)
    }
1050
    /// If `state` has netdir changes to apply, apply them to our netdir.
    ///
    /// Depending on the change, this may replace the current netdir wholesale,
    /// add microdescriptors to it, or record a new protocol recommendation.
    /// Publishes the corresponding [`DirEvent`]s, and updates `store` when it
    /// is writable.
    #[allow(clippy::cognitive_complexity)]
    fn apply_netdir_changes(
        self: &Arc<Self>,
        state: &mut Box<dyn DirState>,
        store: &mut dyn Store,
    ) -> Result<()> {
        if let Some(change) = state.get_netdir_change() {
            match change {
                // Replace the entire current netdir with a new one.
                NetDirChange::AttemptReplace {
                    netdir,
                    consensus_meta,
                } => {
                    // Check the new netdir is sufficient, if we have a circmgr.
                    // (Unwraps are fine because the `Option` is `Some` until we take it.)
                    if let Some(ref cm) = self.circmgr {
                        if !cm
                            .netdir_is_sufficient(netdir.as_ref().expect("AttemptReplace had None"))
                        {
                            debug!("Got a new NetDir, but it doesn't have enough guards yet.");
                            return Ok(());
                        }
                    }
                    // Refuse to replace a newer netdir with an older one.
                    let is_stale = {
                        // Done inside a block to not hold a long-lived copy of the NetDir.
                        self.netdir
                            .get()
                            .map(|x| {
                                x.lifetime().valid_after()
                                    > netdir
                                        .as_ref()
                                        .expect("AttemptReplace had None")
                                        .lifetime()
                                        .valid_after()
                            })
                            .unwrap_or(false)
                    };
                    if is_stale {
                        warn!("Got a new NetDir, but it's older than the one we currently have!");
                        return Err(Error::NetDirOlder);
                    }
                    // Install the new netdir (with configured parameter
                    // overrides applied) and notify subscribers.
                    let cfg = self.config.get();
                    let mut netdir = netdir.take().expect("AttemptReplace had None");
                    netdir.replace_overridden_parameters(&cfg.override_net_params);
                    self.netdir.replace(netdir);
                    self.events.publish(DirEvent::NewConsensus);
                    self.events.publish(DirEvent::NewDescriptors);

                    info!("Marked consensus usable.");
                    if !store.is_readonly() {
                        store.mark_consensus_usable(consensus_meta)?;
                        // Now that a consensus is usable, older consensuses may
                        // need to expire.
                        store.expire_all(&crate::storage::EXPIRATION_DEFAULTS)?;
                    }
                    Ok(())
                }
                // Add newly downloaded microdescriptors to the current netdir.
                NetDirChange::AddMicrodescs(mds) => {
                    self.netdir.mutate(|netdir| {
                        for md in mds.drain(..) {
                            netdir.add_microdesc(md);
                        }
                        Ok(())
                    })?;
                    self.events.publish(DirEvent::NewDescriptors);
                    Ok(())
                }
                // Record a new set of required protocols, persisting it when
                // the store is writable.
                NetDirChange::SetRequiredProtocol { timestamp, protos } => {
                    if !store.is_readonly() {
                        store.update_protocol_recommendations(timestamp, protos.as_ref())?;
                    }
                    let mut pr = self.protocols.lock().expect("Poisoned lock");
                    *pr = Some((timestamp, protos));
                    self.events.publish(DirEvent::NewProtocolRecommendation);
                    Ok(())
                }
            }
        } else {
            // Nothing to apply.
            Ok(())
        }
    }
1132}
1133
/// A degree of readiness for a given directory state object.
#[derive(Debug, Copy, Clone)]
enum Readiness {
    /// There is no more information to download.
    Complete,
    /// There is more information to download, but we don't need it in order
    /// to use this directory.
    Usable,
}
1142
1143/// Try to upgrade a weak reference to a DirMgr, and give an error on
1144/// failure.
1145fn upgrade_weak_ref<T>(weak: &Weak<T>) -> Result<Arc<T>> {
1146    Weak::upgrade(weak).ok_or(Error::ManagerDropped)
1147}
1148
/// Given a time `now`, and an amount of tolerated clock skew `tolerance`,
/// return the age of the oldest consensus that we should request at that time.
pub(crate) fn default_consensus_cutoff(
    now: SystemTime,
    tolerance: &DirTolerance,
) -> Result<SystemTime> {
    /// We _always_ allow at least this much age in our consensuses, to account
    /// for the fact that consensuses have some lifetime.
    const MIN_AGE_TO_ALLOW: Duration = Duration::from_secs(3 * 3600);
    let allow_skew = std::cmp::max(MIN_AGE_TO_ALLOW, tolerance.post_valid_tolerance());
    let cutoff = time::OffsetDateTime::from(now - allow_skew);
    // We now round cutoff to the next hour, so that we aren't leaking our exact
    // time to the directory cache.
    //
    // With the time crate, it's easier to calculate the "next hour" by rounding
    // _down_ then adding an hour; rounding up would sometimes require changing
    // the date too.
    let (h, _m, _s) = cutoff.to_hms();
    // Round down to the top of the hour...
    let cutoff = cutoff.replace_time(
        time::Time::from_hms(h, 0, 0)
            .map_err(tor_error::into_internal!("Failed clock calculation"))?,
    );
    // ...then add an hour to complete the round-up.
    let cutoff = cutoff + Duration::from_secs(3600);

    Ok(cutoff.into())
}
1175
1176/// Return a list of the protocols [supported](tor_protover::doc_supported) by this crate
1177/// when running as a client.
1178pub fn supported_client_protocols() -> tor_protover::Protocols {
1179    use tor_protover::named::*;
1180    // WARNING: REMOVING ELEMENTS FROM THIS LIST CAN BE DANGEROUS!
1181    // SEE [`tor_protover::doc_changing`]
1182    [
1183        //
1184        DIRCACHE_CONSDIFF,
1185    ]
1186    .into_iter()
1187    .collect()
1188}
1189
1190#[cfg(test)]
1191mod test {
1192    // @@ begin test lint list maintained by maint/add_warning @@
1193    #![allow(clippy::bool_assert_comparison)]
1194    #![allow(clippy::clone_on_copy)]
1195    #![allow(clippy::dbg_macro)]
1196    #![allow(clippy::mixed_attributes_style)]
1197    #![allow(clippy::print_stderr)]
1198    #![allow(clippy::print_stdout)]
1199    #![allow(clippy::single_char_pattern)]
1200    #![allow(clippy::unwrap_used)]
1201    #![allow(clippy::unchecked_time_subtraction)]
1202    #![allow(clippy::useless_vec)]
1203    #![allow(clippy::needless_pass_by_value)]
1204    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1205    use super::*;
1206    use crate::docmeta::{AuthCertMeta, ConsensusMeta};
1207    use std::time::Duration;
1208    use tempfile::TempDir;
1209    use tor_basic_utils::test_rng::testing_rng;
1210    use tor_netdoc::doc::netstatus::ConsensusFlavor;
1211    use tor_netdoc::doc::{authcert::AuthCertKeyIds, netstatus::Lifetime};
1212    use tor_rtcompat::SleepProvider;
1213
1214    #[test]
1215    fn protocols() {
1216        let pr = supported_client_protocols();
1217        let expected = "DirCache=2".parse().unwrap();
1218        assert_eq!(pr, expected);
1219    }
1220
    /// Construct a `DirMgr` for testing, backed by a fresh temporary cache
    /// directory.
    ///
    /// The returned `TempDir` owns the on-disk cache; keep it alive for as
    /// long as the manager is in use.
    pub(crate) fn new_mgr<R: Runtime>(runtime: R) -> (TempDir, DirMgr<R>) {
        let dir = TempDir::new().unwrap();
        let config = DirMgrConfig {
            cache_dir: dir.path().into(),
            ..Default::default()
        };
        let store = DirMgrStore::new(&config, runtime.clone(), false).unwrap();
        // No circuit manager, not offline.
        let dirmgr = DirMgr::from_config(config, runtime, store, None, false).unwrap();

        (dir, dirmgr)
    }
1232
    #[test]
    fn failing_accessors() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let (_tempdir, mgr) = new_mgr(rt);

            // With no circuit manager and no bootstrapped directory, both
            // accessors should report an error.
            assert!(mgr.circmgr().is_err());
            assert!(mgr.netdir(Timeliness::Unchecked).is_err());
        });
    }
1242
    #[test]
    fn load_and_store_internals() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let tomorrow = now + Duration::from_secs(86400);
            let later = tomorrow + Duration::from_secs(86400);

            let (_tempdir, mgr) = new_mgr(rt);

            // Seed the storage with a bunch of junk.
            // (d1..d3 are fake microdesc digests; d4, d5 are fake
            // authority-certificate fingerprints.)
            let d1 = [5_u8; 32];
            let d2 = [7; 32];
            let d3 = [42; 32];
            let d4 = [99; 20];
            let d5 = [12; 20];
            let certid1 = AuthCertKeyIds {
                id_fingerprint: d4.into(),
                sk_fingerprint: d5.into(),
            };
            let certid2 = AuthCertKeyIds {
                id_fingerprint: d5.into(),
                sk_fingerprint: d4.into(),
            };

            {
                let mut store = mgr.store.lock().unwrap();

                store
                    .store_microdescs(
                        &[
                            ("Fake micro 1", &d1),
                            ("Fake micro 2", &d2),
                            ("Fake micro 3", &d3),
                        ],
                        now,
                    )
                    .unwrap();

                #[cfg(feature = "routerdesc")]
                store
                    .store_routerdescs(&[("Fake rd1", now, &d4), ("Fake rd2", now, &d5)])
                    .unwrap();

                store
                    .store_authcerts(&[
                        (
                            AuthCertMeta::new(certid1, now, tomorrow),
                            "Fake certificate one",
                        ),
                        (
                            AuthCertMeta::new(certid2, now, tomorrow),
                            "Fake certificate two",
                        ),
                    ])
                    .unwrap();

                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, tomorrow, later).unwrap(),
                    [102; 32],
                    [103; 32],
                );
                store
                    .store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
                    .unwrap();
            }

            // Try to get it with text().
            let t1 = mgr.text(&DocId::Microdesc(d1)).unwrap().unwrap();
            assert_eq!(t1.as_str(), Ok("Fake micro 1"));

            let t2 = mgr
                .text(&DocId::LatestConsensus {
                    flavor: ConsensusFlavor::Microdesc,
                    cache_usage: CacheUsage::CacheOkay,
                })
                .unwrap()
                .unwrap();
            assert_eq!(t2.as_str(), Ok("Fake consensus!"));

            // An unknown digest yields Ok(None), not an error.
            let t3 = mgr.text(&DocId::Microdesc([255; 32])).unwrap();
            assert!(t3.is_none());

            // Now try texts(): found documents appear in the map, missing
            // ones are simply absent.
            let d_bogus = DocId::Microdesc([255; 32]);
            let res = mgr
                .texts(vec![
                    DocId::Microdesc(d2),
                    DocId::Microdesc(d3),
                    d_bogus,
                    DocId::AuthCert(certid2),
                    #[cfg(feature = "routerdesc")]
                    DocId::RouterDesc(d5),
                ])
                .unwrap();
            assert_eq!(
                res.get(&DocId::Microdesc(d2)).unwrap().as_str(),
                Ok("Fake micro 2")
            );
            assert_eq!(
                res.get(&DocId::Microdesc(d3)).unwrap().as_str(),
                Ok("Fake micro 3")
            );
            assert!(!res.contains_key(&d_bogus));
            assert_eq!(
                res.get(&DocId::AuthCert(certid2)).unwrap().as_str(),
                Ok("Fake certificate two")
            );
            #[cfg(feature = "routerdesc")]
            assert_eq!(
                res.get(&DocId::RouterDesc(d5)).unwrap().as_str(),
                Ok("Fake rd2")
            );
        });
    }
1357
    #[test]
    fn make_consensus_request() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let tomorrow = now + Duration::from_secs(86400);
            let later = tomorrow + Duration::from_secs(86400);

            let (_tempdir, mgr) = new_mgr(rt);
            let config = DirMgrConfig::default();

            // Try with an empty store.
            let req = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_consensus_request(
                    now,
                    ConsensusFlavor::Microdesc,
                    &**store,
                    &config,
                )
                .unwrap()
            };
            let tolerance = DirTolerance::default().post_valid_tolerance();
            match req {
                ClientRequest::Consensus(r) => {
                    // With nothing cached, there are no old digests, and the
                    // last-consensus date falls inside the hour-rounded
                    // cutoff window (see `default_consensus_cutoff`).
                    assert_eq!(r.old_consensus_digests().count(), 0);
                    let date = r.last_consensus_date().unwrap();
                    assert!(date >= now - tolerance);
                    assert!(date <= now - tolerance + Duration::from_secs(3600));
                }
                _ => panic!("Wrong request type"),
            }

            // Add a fake consensus record.
            let d_prev = [42; 32];
            {
                let mut store = mgr.store.lock().unwrap();

                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, tomorrow, later).unwrap(),
                    d_prev,
                    [103; 32],
                );
                store
                    .store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
                    .unwrap();
            }

            // Now try again: the stored consensus should be advertised so the
            // cache can send us a diff against it.
            let req = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_consensus_request(
                    now,
                    ConsensusFlavor::Microdesc,
                    &**store,
                    &config,
                )
                .unwrap()
            };
            match req {
                ClientRequest::Consensus(r) => {
                    let ds: Vec<_> = r.old_consensus_digests().collect();
                    assert_eq!(ds.len(), 1);
                    assert_eq!(ds[0], &d_prev);
                    assert_eq!(r.last_consensus_date(), Some(now));
                }
                _ => panic!("Wrong request type"),
            }
        });
    }
1427
    #[test]
    fn make_other_requests() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            use rand::Rng;
            let (_tempdir, mgr) = new_mgr(rt);

            let certid1 = AuthCertKeyIds {
                id_fingerprint: [99; 20].into(),
                sk_fingerprint: [100; 20].into(),
            };
            let mut rng = testing_rng();
            #[cfg(feature = "routerdesc")]
            let rd_ids: Vec<DocId> = (0..1000).map(|_| DocId::RouterDesc(rng.random())).collect();
            let md_ids: Vec<DocId> = (0..1000).map(|_| DocId::Microdesc(rng.random())).collect();
            let config = DirMgrConfig::default();

            // Try an authcert.
            let query = DocId::AuthCert(certid1);
            let store = mgr.store.lock().unwrap();
            let reqs =
                bootstrap::make_requests_for_documents(&mgr.runtime, &[query], &**store, &config)
                    .unwrap();
            assert_eq!(reqs.len(), 1);
            let req = &reqs[0];
            if let ClientRequest::AuthCert(r) = req {
                assert_eq!(r.keys().next(), Some(&certid1));
            } else {
                panic!();
            }

            // Try a bunch of mds.
            // (1000 ids end up split across 2 requests — presumably due to a
            // per-request limit inside make_requests_for_documents; confirm
            // against bootstrap.rs if this changes.)
            let reqs =
                bootstrap::make_requests_for_documents(&mgr.runtime, &md_ids, &**store, &config)
                    .unwrap();
            assert_eq!(reqs.len(), 2);
            assert!(matches!(reqs[0], ClientRequest::Microdescs(_)));

            // Try a bunch of rds.
            #[cfg(feature = "routerdesc")]
            {
                let reqs = bootstrap::make_requests_for_documents(
                    &mgr.runtime,
                    &rd_ids,
                    &**store,
                    &config,
                )
                .unwrap();
                assert_eq!(reqs.len(), 2);
                assert!(matches!(reqs[0], ClientRequest::RouterDescs(_)));
            }
        });
    }
1480
    #[test]
    fn expand_response() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let day = Duration::from_secs(86400);
            let config = DirMgrConfig::default();

            let (_tempdir, mgr) = new_mgr(rt);

            // Try a simple request: nothing should happen.
            let q = DocId::Microdesc([99; 32]);
            let r = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_requests_for_documents(&mgr.runtime, &[q], &**store, &config)
                    .unwrap()
            };
            let expanded = mgr.expand_response_text(&r[0], "ABC".to_string());
            assert_eq!(&expanded.unwrap(), "ABC");

            // Try a consensus response that doesn't look like a diff in
            // response to a query that doesn't ask for one.
            let latest_id = DocId::LatestConsensus {
                flavor: ConsensusFlavor::Microdesc,
                cache_usage: CacheUsage::CacheOkay,
            };
            let r = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_requests_for_documents(
                    &mgr.runtime,
                    &[latest_id],
                    &**store,
                    &config,
                )
                .unwrap()
            };
            let expanded = mgr.expand_response_text(&r[0], "DEF".to_string());
            assert_eq!(&expanded.unwrap(), "DEF");

            // Now stick some metadata and a string into the storage so that
            // we can ask for a diff.
            {
                let mut store = mgr.store.lock().unwrap();
                let d_in = [0x99; 32]; // This one, we can fake.
                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, now + day, now + 2 * day).unwrap(),
                    d_in,
                    d_in,
                );
                store
                    .store_consensus(
                        &cmeta,
                        ConsensusFlavor::Microdesc,
                        false,
                        "line 1\nline2\nline 3\n",
                    )
                    .unwrap();
            }

            // Try expanding something that isn't a consensus, even if we'd like
            // one.
            let r = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_requests_for_documents(
                    &mgr.runtime,
                    &[latest_id],
                    &**store,
                    &config,
                )
                .unwrap()
            };
            let expanded = mgr.expand_response_text(&r[0], "hello".to_string());
            assert_eq!(&expanded.unwrap(), "hello");

            // Finally, try "expanding" a diff (by applying it and checking
            // the digest).
            let diff = "network-status-diff-version 1
hash 9999999999999999999999999999999999999999999999999999999999999999 8382374ca766873eb0d2530643191c6eaa2c5e04afa554cbac349b5d0592d300
2c
replacement line
.
".to_string();
            let expanded = mgr.expand_response_text(&r[0], diff);

            assert_eq!(expanded.unwrap(), "line 1\nreplacement line\nline 3\n");

            // If the digest is wrong, that should get rejected.
            let diff = "network-status-diff-version 1
hash 9999999999999999999999999999999999999999999999999999999999999999 9999999999999999999999999999999999999999999999999999999999999999
2c
replacement line
.
".to_string();
            let expanded = mgr.expand_response_text(&r[0], diff);
            assert!(expanded.is_err());
        });
    }
1576}