// tor_dirmgr/lib.rs
1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc = include_str!("../README.md")]
3// @@ begin lint list maintained by maint/add_warning @@
4#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6#![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_time_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39#![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43#![allow(clippy::needless_lifetimes)] // See arti#1765
44#![allow(mismatched_lifetime_syntaxes)] // temporary workaround for arti#2060
45#![allow(clippy::collapsible_if)] // See arti#2342
46#![deny(clippy::unused_async)]
47//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
48
49// This clippy lint produces a false positive on `use strum`, below.
50// Attempting to apply the lint to just the use statement fails to suppress
51// this lint and instead produces another lint about a useless clippy attribute.
52#![allow(clippy::single_component_path_imports)]
53
54mod bootstrap;
55pub mod config;
56mod docid;
57mod docmeta;
58mod err;
59mod event;
60mod shared_ref;
61mod state;
62mod storage;
63
64#[cfg(feature = "bridge-client")]
65pub mod bridgedesc;
66#[cfg(feature = "dirfilter")]
67pub mod filter;
68
69use crate::docid::{CacheUsage, ClientRequest, DocQuery};
70use crate::err::BootstrapAction;
71#[cfg(not(feature = "experimental-api"))]
72use crate::shared_ref::SharedMutArc;
73#[cfg(feature = "experimental-api")]
74pub use crate::shared_ref::SharedMutArc;
75use crate::storage::{DynStore, Store};
76use bootstrap::AttemptId;
77use event::DirProgress;
78use postage::watch;
79use scopeguard::ScopeGuard;
80use tor_circmgr::CircMgr;
81use tor_dirclient::SourceInfo;
82use tor_dircommon::config::DirTolerance;
83use tor_error::{info_report, into_internal, warn_report};
84use tor_netdir::params::NetParameters;
85use tor_netdir::{DirEvent, MdReceiver, NetDir, NetDirProvider};
86
87use async_trait::async_trait;
88use futures::stream::BoxStream;
89use oneshot_fused_workaround as oneshot;
90use tor_netdoc::doc::netstatus::ProtoStatuses;
91use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
92use tor_rtcompat::{Runtime, SpawnExt};
93use tracing::{debug, info, instrument, trace, warn};
94
95use std::marker::PhantomData;
96use std::sync::atomic::{AtomicBool, Ordering};
97use std::sync::{Arc, Mutex};
98use std::time::Duration;
99use std::{collections::HashMap, sync::Weak};
100use std::{fmt::Debug, time::SystemTime};
101
102use crate::state::{DirState, NetDirChange};
103pub use config::DirMgrConfig;
104pub use docid::DocId;
105pub use err::Error;
106pub use event::{DirBlockage, DirBootstrapEvents, DirBootstrapStatus};
107pub use storage::DocumentText;
108pub use tor_dircommon::fallback::{FallbackDir, FallbackDirBuilder};
109pub use tor_netdir::Timeliness;
110
111/// Re-export of `strum` crate for use by an internal macro
112use strum;
113
/// A Result as returned by this crate.
///
/// All fallible APIs in this crate use [`Error`] as their error type.
pub type Result<T> = std::result::Result<T, Error>;
116
/// Storage manager used by [`DirMgr`] and
/// [`BridgeDescMgr`](bridgedesc::BridgeDescMgr)
///
/// Internally, this wraps up a sqlite database.
///
/// This is a handle, which is cheap to clone; clones share state.
#[derive(Clone)]
pub struct DirMgrStore<R: Runtime> {
    /// The actual store, behind a mutex since `rusqlite::Connection`
    /// is not `Sync`.
    pub(crate) store: Arc<Mutex<crate::DynStore>>,

    /// Be parameterized by Runtime even though we don't use it right now
    pub(crate) runtime: PhantomData<R>,
}
131
132impl<R: Runtime> DirMgrStore<R> {
133    /// Open the storage, according to the specified configuration
134    pub fn new(config: &DirMgrConfig, runtime: R, offline: bool) -> Result<Self> {
135        let store = Arc::new(Mutex::new(config.open_store(offline)?));
136        drop(runtime);
137        let runtime = PhantomData;
138        Ok(DirMgrStore { store, runtime })
139    }
140}
141
/// Trait for DirMgr implementations
#[async_trait]
pub trait DirProvider: NetDirProvider {
    /// Try to change our configuration to `new_config`.
    ///
    /// Actual behavior will depend on the value of `how`.
    fn reconfigure(
        &self,
        new_config: &DirMgrConfig,
        how: tor_config::Reconfigure,
    ) -> std::result::Result<(), tor_config::ReconfigureError>;

    /// Bootstrap a `DirProvider` that hasn't been bootstrapped yet.
    async fn bootstrap(&self) -> Result<()>;

    /// Return a stream of [`DirBootstrapStatus`] events to tell us about changes
    /// in the latest directory's bootstrap status.
    ///
    /// Note that this stream can be lossy: the caller will not necessarily
    /// observe every event on the stream
    fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus>;

    /// Return a [`TaskHandle`] that can be used to manage the download process.
    ///
    /// The default implementation returns `None`; implementations that
    /// support managing their download task override this.
    fn download_task_handle(&self) -> Option<TaskHandle> {
        None
    }
}
169
170// NOTE(eta): We can't implement this for Arc<DirMgr<R>> due to trait coherence rules, so instead
171//            there's a blanket impl for Arc<T> in tor-netdir.
172impl<R: Runtime> NetDirProvider for DirMgr<R> {
173    fn netdir(&self, timeliness: Timeliness) -> tor_netdir::Result<Arc<NetDir>> {
174        use tor_netdir::Error as NetDirError;
175        let netdir = self.netdir.get().ok_or(NetDirError::NoInfo)?;
176        let lifetime = match timeliness {
177            Timeliness::Strict => netdir.lifetime().clone(),
178            Timeliness::Timely => self
179                .config
180                .get()
181                .tolerance
182                .extend_lifetime(netdir.lifetime()),
183            Timeliness::Unchecked => return Ok(netdir),
184        };
185        let now = SystemTime::now();
186        if lifetime.valid_after() > now {
187            Err(NetDirError::DirNotYetValid)
188        } else if lifetime.valid_until() < now {
189            Err(NetDirError::DirExpired)
190        } else {
191            Ok(netdir)
192        }
193    }
194
195    fn events(&self) -> BoxStream<'static, DirEvent> {
196        Box::pin(self.events.subscribe())
197    }
198
199    fn params(&self) -> Arc<dyn AsRef<tor_netdir::params::NetParameters>> {
200        if let Some(netdir) = self.netdir.get() {
201            // We have a directory, so we'd like to give it out for its
202            // parameters.
203            //
204            // We do this even if the directory is expired, since parameters
205            // don't really expire on any plausible timescale.
206            netdir
207        } else {
208            // We have no directory, so we'll give out the default parameters as
209            // modified by the provided override_net_params configuration.
210            //
211            self.default_parameters
212                .lock()
213                .expect("Poisoned lock")
214                .clone()
215        }
216        // TODO(nickm): If we felt extremely clever, we could add a third case
217        // where, if we have a pending directory with a validated consensus, we
218        // give out that consensus's network parameters even if we _don't_ yet
219        // have a full directory.  That's significant refactoring, though, for
220        // an unclear amount of benefit.
221    }
222
223    fn protocol_statuses(&self) -> Option<(SystemTime, Arc<ProtoStatuses>)> {
224        self.protocols.lock().expect("Poisoned lock").clone()
225    }
226}
227
#[async_trait]
impl<R: Runtime> DirProvider for Arc<DirMgr<R>> {
    // Every method below simply forwards to the corresponding inherent
    // method on `DirMgr`.
    fn reconfigure(
        &self,
        new_config: &DirMgrConfig,
        how: tor_config::Reconfigure,
    ) -> std::result::Result<(), tor_config::ReconfigureError> {
        DirMgr::reconfigure(self, new_config, how)
    }

    #[instrument(level = "trace", skip_all)]
    async fn bootstrap(&self) -> Result<()> {
        DirMgr::bootstrap(self).await
    }

    fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus> {
        Box::pin(DirMgr::bootstrap_events(self))
    }

    // Unlike the trait's default (which returns None), a `DirMgr` always
    // has a handle for its download task.
    fn download_task_handle(&self) -> Option<TaskHandle> {
        Some(self.task_handle.clone())
    }
}
251
/// A directory manager to download, fetch, and cache a Tor directory.
///
/// A DirMgr can operate in three modes:
///   * In **offline** mode, it only reads from the cache, and can
///     only read once.
///   * In **read-only** mode, it reads from the cache, but checks
///     whether it can acquire an associated lock file.  If it can, then
///     it enters read-write mode.  If not, it checks the cache
///     periodically for new information.
///   * In **read-write** mode, it knows that no other process will be
///     writing to the cache, and it takes responsibility for fetching
///     data from the network and updating the directory with new
///     directory information.
pub struct DirMgr<R: Runtime> {
    /// Configuration information: where to find directories, how to
    /// validate them, and so on.
    config: tor_config::MutCfg<DirMgrConfig>,
    /// Handle to our sqlite cache.
    // TODO(nickm): I'd like to use an rwlock, but that's not feasible, since
    // rusqlite::Connection isn't Sync.
    // TODO(review): is this field still needed here now that `DirMgrStore`
    // exists? — confirm before removing.
    store: Arc<Mutex<DynStore>>,
    /// Our latest sufficiently bootstrapped directory, if we have one.
    ///
    /// We use the RwLock so that we can give this out to a bunch of other
    /// users, and replace it once a new directory is bootstrapped.
    // TODO(eta): Eurgh! This is so many Arcs! (especially considering this
    //            gets wrapped in an Arc)
    netdir: Arc<SharedMutArc<NetDir>>,

    /// Our latest set of recommended protocols.
    protocols: Mutex<Option<(SystemTime, Arc<ProtoStatuses>)>>,

    /// A set of network parameters to hand out when we have no directory.
    default_parameters: Mutex<Arc<NetParameters>>,

    /// A publisher handle that we notify whenever the consensus changes.
    events: event::FlagPublisher<DirEvent>,

    /// A publisher handle that we notify whenever our bootstrapping status
    /// changes.
    send_status: Mutex<watch::Sender<event::DirBootstrapStatus>>,

    /// A receiver handle that gets notified whenever our bootstrapping status
    /// changes.
    ///
    /// We don't need to keep this drained, since `postage::watch` already knows
    /// to discard unread events.
    receive_status: DirBootstrapEvents,

    /// A circuit manager, if this DirMgr supports downloading.
    circmgr: Option<Arc<CircMgr<R>>>,

    /// Our asynchronous runtime.
    runtime: R,

    /// Whether or not we're operating in offline mode.
    offline: bool,

    /// If we're not in offline mode, stores whether or not the `DirMgr` has attempted
    /// to bootstrap yet or not.
    ///
    /// This exists in order to prevent starting two concurrent bootstrap tasks.
    ///
    /// (In offline mode, this does nothing.)
    bootstrap_started: AtomicBool,

    /// A filter that gets applied to directory objects before we use them.
    #[cfg(feature = "dirfilter")]
    filter: crate::filter::FilterConfig,

    /// A task schedule that can be used if we're bootstrapping.  If this is
    /// None, then there's currently a scheduled task in progress.
    task_schedule: Mutex<Option<TaskSchedule<R>>>,

    /// A task handle that we return to anybody who needs to manage our download process.
    task_handle: TaskHandle,
}
330
/// The possible origins of a document.
///
/// Used (for example) to report where we got a document from if it fails to
/// parse.
///
/// Marked `#[non_exhaustive]`: more sources may be added in the future.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum DocSource {
    /// We loaded the document from our cache.
    LocalCache,
    /// We fetched the document from a server.
    DirServer {
        /// Information about the server we fetched the document from.
        /// `None` if no source information was recorded.
        source: Option<SourceInfo>,
    },
}
346
347impl std::fmt::Display for DocSource {
348    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
349        match self {
350            DocSource::LocalCache => write!(f, "local cache"),
351            DocSource::DirServer { source: None } => write!(f, "directory server"),
352            DocSource::DirServer { source: Some(info) } => write!(f, "directory server {}", info),
353        }
354    }
355}
356
357impl<R: Runtime> DirMgr<R> {
358    /// Try to load the directory from disk, without launching any
359    /// kind of update process.
360    ///
361    /// This function runs in **offline** mode: it will give an error
362    /// if the result is not up-to-date, or not fully downloaded.
363    ///
364    /// In general, you shouldn't use this function in a long-running
365    /// program; it's only suitable for command-line or batch tools.
366    // TODO: I wish this function didn't have to be async or take a runtime.
367    pub fn load_once(runtime: R, config: DirMgrConfig) -> Result<Arc<NetDir>> {
368        let store = DirMgrStore::new(&config, runtime.clone(), true)?;
369        let dirmgr = Arc::new(Self::from_config(config, runtime, store, None, true)?);
370
371        // TODO: add some way to return a directory that isn't up-to-date
372        let attempt = AttemptId::next();
373        trace!(%attempt, "Trying to load a full directory from cache");
374        let outcome = dirmgr.load_directory(attempt);
375        trace!(%attempt, "Load result: {outcome:?}");
376        let _success = outcome?;
377
378        dirmgr
379            .netdir(Timeliness::Timely)
380            .map_err(|_| Error::DirectoryNotPresent)
381    }
382
383    /// Return a current netdir, either loading it or bootstrapping it
384    /// as needed.
385    ///
386    /// Like load_once, but will try to bootstrap (or wait for another
387    /// process to bootstrap) if we don't have an up-to-date
388    /// bootstrapped directory.
389    ///
390    /// In general, you shouldn't use this function in a long-running
391    /// program; it's only suitable for command-line or batch tools.
392    pub async fn load_or_bootstrap_once(
393        config: DirMgrConfig,
394        runtime: R,
395        store: DirMgrStore<R>,
396        circmgr: Arc<CircMgr<R>>,
397    ) -> Result<Arc<NetDir>> {
398        let dirmgr = DirMgr::bootstrap_from_config(config, runtime, store, circmgr).await?;
399        dirmgr
400            .timely_netdir()
401            .map_err(|_| Error::DirectoryNotPresent)
402    }
403
404    /// Create a new `DirMgr` in online mode, but don't bootstrap it yet.
405    ///
406    /// The `DirMgr` can be bootstrapped later with `bootstrap`.
407    pub fn create_unbootstrapped(
408        config: DirMgrConfig,
409        runtime: R,
410        store: DirMgrStore<R>,
411        circmgr: Arc<CircMgr<R>>,
412    ) -> Result<Arc<Self>> {
413        Ok(Arc::new(DirMgr::from_config(
414            config,
415            runtime,
416            store,
417            Some(circmgr),
418            false,
419        )?))
420    }
421
    /// Bootstrap a `DirMgr` created in online mode that hasn't been bootstrapped yet.
    ///
    /// This function will not return until the directory is bootstrapped enough to build circuits.
    /// It will also launch a background task that fetches any missing information, and that
    /// replaces the directory when a new one is available.
    ///
    /// This function is intended to be used together with `create_unbootstrapped`. There is no
    /// need to call this function otherwise.
    ///
    /// If bootstrapping has already successfully taken place, returns early with success.
    ///
    /// # Errors
    ///
    /// Returns [`Error::OfflineMode`] if the `DirMgr` was not created in online mode, such as
    /// via `load_once`.
    ///
    /// Returns an error if bootstrapping fails. If the error is [`Error::CantAdvanceState`],
    /// it may be possible to successfully bootstrap later on by calling this function again.
    #[allow(clippy::cognitive_complexity)] // TODO: Refactor
    #[instrument(level = "trace", skip_all)]
    pub async fn bootstrap(self: &Arc<Self>) -> Result<()> {
        if self.offline {
            return Err(Error::OfflineMode);
        }

        // The semantics of this are "attempt to replace a 'false' value with 'true'.
        // If the value in bootstrap_started was not 'false' when the attempt was made, returns
        // `Err`; this means another bootstrap attempt is in progress or has completed, so we
        // return early.

        // NOTE(eta): could potentially weaken the `Ordering` here in future
        if self
            .bootstrap_started
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_err()
        {
            debug!("Attempted to bootstrap twice; ignoring.");
            return Ok(());
        }

        // Use a RAII guard to reset `bootstrap_started` to `false` if we return early without
        // completing bootstrap.
        let reset_bootstrap_started = scopeguard::guard(&self.bootstrap_started, |v| {
            v.store(false, Ordering::SeqCst);
        });

        // Take the task schedule out of its slot.  If it's already gone,
        // some other bootstrap task is (or was) using it, so we bail out.
        let schedule = {
            let sched = self.task_schedule.lock().expect("poisoned lock").take();
            match sched {
                Some(sched) => sched,
                None => {
                    debug!("Attempted to bootstrap twice; ignoring.");
                    return Ok(());
                }
            }
        };

        // Try to load from the cache.
        let attempt_id = AttemptId::next();
        trace!(attempt=%attempt_id, "Starting to bootstrap directory");
        let have_directory = self.load_directory(attempt_id)?;

        // If the cache didn't give us a usable directory, set up a oneshot
        // channel so the background task can tell us when one is ready.
        let (mut sender, receiver) = if have_directory {
            info!("Loaded a good directory from cache.");
            (None, None)
        } else {
            info!("Didn't get usable directory from cache.");
            let (sender, receiver) = oneshot::channel();
            (Some(sender), Some(receiver))
        };

        // Whether we loaded or not, we now start downloading.
        let dirmgr_weak = Arc::downgrade(self);
        self.runtime
            .spawn(async move {
                // Use an RAII guard to make sure that when this task exits, the
                // TaskSchedule object is put back.
                //
                // TODO(nick): Putting the schedule back isn't actually useful
                // if the task exits _after_ we've bootstrapped for the first
                // time, because of how bootstrap_started works.
                let mut schedule = scopeguard::guard(schedule, |schedule| {
                    if let Some(dm) = Weak::upgrade(&dirmgr_weak) {
                        *dm.task_schedule.lock().expect("poisoned lock") = Some(schedule);
                    }
                });

                // Don't warn when these are Error::ManagerDropped: that
                // means that the DirMgr has been shut down.
                if let Err(e) =
                    Self::reload_until_owner(&dirmgr_weak, &mut schedule, attempt_id, &mut sender)
                        .await
                {
                    match e {
                        Error::ManagerDropped => {}
                        _ => warn_report!(e, "Unrecovered error while waiting for bootstrap",),
                    }
                } else if let Err(e) =
                    Self::download_forever(dirmgr_weak.clone(), &mut schedule, attempt_id, sender)
                        .await
                {
                    match e {
                        Error::ManagerDropped => {}
                        _ => warn_report!(e, "Unrecovered error while downloading"),
                    }
                }
            })
            .map_err(|e| Error::from_spawn("directory updater task", e))?;

        // If we didn't already have a directory, wait here until the
        // background task reports that one is usable (or exits).
        if let Some(receiver) = receiver {
            match receiver.await {
                Ok(()) => {
                    info!("We have enough information to build circuits.");
                    // Disarm the RAII guard, since we succeeded.  Now bootstrap_started will remain true.
                    let _ = ScopeGuard::into_inner(reset_bootstrap_started);
                }
                Err(_) => {
                    warn!("Bootstrapping task exited before finishing.");
                    return Err(Error::CantAdvanceState);
                }
            }
        }
        Ok(())
    }
548
    /// Returns `true` if a bootstrap attempt is in progress, or successfully completed.
    ///
    /// (This reads the same flag that `bootstrap` sets; it is reset only if
    /// a bootstrap attempt exits early without success.)
    pub fn bootstrap_started(&self) -> bool {
        self.bootstrap_started.load(Ordering::SeqCst)
    }
553
554    /// Return a new directory manager from a given configuration,
555    /// bootstrapping from the network as necessary.
556    #[instrument(level = "trace", skip_all)]
557    pub async fn bootstrap_from_config(
558        config: DirMgrConfig,
559        runtime: R,
560        store: DirMgrStore<R>,
561        circmgr: Arc<CircMgr<R>>,
562    ) -> Result<Arc<Self>> {
563        let dirmgr = Self::create_unbootstrapped(config, runtime, store, circmgr)?;
564
565        dirmgr.bootstrap().await?;
566
567        Ok(dirmgr)
568    }
569
    /// Try forever to either lock the storage (and thereby become the
    /// owner), or to reload the database.
    ///
    /// If we begin to have a bootstrapped directory, send a
    /// message using `on_complete`.
    ///
    /// If we eventually become the owner, return Ok().
    #[allow(clippy::cognitive_complexity)] // TODO: Refactor?
    async fn reload_until_owner(
        weak: &Weak<Self>,
        schedule: &mut TaskSchedule<R>,
        attempt_id: AttemptId,
        on_complete: &mut Option<oneshot::Sender<()>>,
    ) -> Result<()> {
        // Whether we have logged the "another process owns the cache" message yet.
        let mut logged = false;
        // Whether we currently have a usable directory loaded.
        let mut bootstrapped;
        {
            let dirmgr = upgrade_weak_ref(weak)?;
            bootstrapped = dirmgr.netdir.get().is_some();
        }

        loop {
            {
                let dirmgr = upgrade_weak_ref(weak)?;
                trace!("Trying to take ownership of the directory cache lock");
                if dirmgr.try_upgrade_to_readwrite()? {
                    // We now own the lock!  (Maybe we owned it before; the
                    // upgrade_to_readwrite() function is idempotent.)  We can
                    // do our own bootstrapping.
                    if logged {
                        info!(
                            "The previous owning process has given up the lock. We are now in charge of managing the directory."
                        );
                    }
                    return Ok(());
                }
            }

            // Log (once) that another process holds the lock.
            if !logged {
                logged = true;
                if bootstrapped {
                    info!("Another process is managing the directory. We'll use its cache.");
                } else {
                    info!(
                        "Another process is bootstrapping the directory. Waiting till it finishes or exits."
                    );
                }
            }

            // We don't own the lock.  Somebody else owns the cache.  They
            // should be updating it.  Wait a bit, then try again.
            // (Poll less often once we already have a usable directory.)
            let pause = if bootstrapped {
                std::time::Duration::new(120, 0)
            } else {
                std::time::Duration::new(5, 0)
            };
            schedule.sleep(pause).await?;
            // TODO: instead of loading the whole thing we should have a
            // database entry that says when the last update was, or use
            // our state functions.
            {
                let dirmgr = upgrade_weak_ref(weak)?;
                trace!("Trying to load from the directory cache");
                if dirmgr.load_directory(attempt_id)? {
                    // Successfully loaded a bootstrapped directory.
                    if let Some(send_done) = on_complete.take() {
                        let _ = send_done.send(());
                    }
                    if !bootstrapped {
                        info!("The directory is now bootstrapped.");
                    }
                    bootstrapped = true;
                }
            }
        }
    }
646
    /// Try to fetch our directory info and keep it updated, indefinitely.
    ///
    /// If we begin to have a bootstrapped directory, send a
    /// message using `on_complete`.
    #[allow(clippy::cognitive_complexity)] // TODO: Refactor?
    #[instrument(level = "trace", skip_all)]
    async fn download_forever(
        weak: Weak<Self>,
        schedule: &mut TaskSchedule<R>,
        mut attempt_id: AttemptId,
        mut on_complete: Option<oneshot::Sender<()>>,
    ) -> Result<()> {
        // Build the initial download state machine: begin by fetching a
        // consensus, using the cache if it has one.
        let mut state: Box<dyn DirState> = {
            let dirmgr = upgrade_weak_ref(&weak)?;
            Box::new(state::GetConsensusState::new(
                dirmgr.runtime.clone(),
                dirmgr.config.get(),
                CacheUsage::CacheOkay,
                Some(dirmgr.netdir.clone()),
                #[cfg(feature = "dirfilter")]
                dirmgr
                    .filter
                    .clone()
                    .unwrap_or_else(|| Arc::new(crate::filter::NilFilter)),
            ))
        };

        trace!("Entering download loop.");

        // Each pass of this outer loop is one complete bootstrap attempt
        // (with its own retry schedule); the loop body sleeps until the
        // directory needs to be re-fetched before starting the next pass.
        loop {
            let mut usable = false;

            let retry_config = {
                let dirmgr = upgrade_weak_ref(&weak)?;
                // TODO(nickm): instead of getting this every time we loop, it
                // might be a good idea to refresh it with each attempt, at
                // least at the point of checking the number of attempts.
                dirmgr.config.get().schedule.retry_bootstrap()
            };
            let mut retry_delay = retry_config.schedule();

            'retry_attempt: for try_num in retry_config.attempts() {
                trace!(attempt=%attempt_id, ?try_num, "Trying to download a directory.");
                let outcome = bootstrap::download(
                    Weak::clone(&weak),
                    &mut state,
                    schedule,
                    attempt_id,
                    &mut on_complete,
                )
                .await;
                trace!(attempt=%attempt_id, ?try_num, ?outcome, "Download is over.");

                if let Err(err) = outcome {
                    // A partial download can still leave us with a usable
                    // directory; if so, stop retrying for now.
                    if state.is_ready(Readiness::Usable) {
                        usable = true;
                        info_report!(
                            err,
                            "Unable to completely download a directory. (Nevertheless, the directory is usable, so we'll pause for now)"
                        );
                        break 'retry_attempt;
                    }

                    // Decide how to proceed based on the error's severity.
                    match err.bootstrap_action() {
                        BootstrapAction::Nonfatal => {
                            return Err(into_internal!(
                                "Nonfatal error should not have propagated here"
                            )(err)
                            .into());
                        }
                        BootstrapAction::Reset => {}
                        BootstrapAction::Fatal => return Err(err),
                    }

                    // Back off (with jitter) before resetting and retrying.
                    let delay = retry_delay.next_delay(&mut rand::rng());
                    warn_report!(
                        err,
                        "Unable to download a usable directory. (We will restart in {})",
                        humantime::format_duration(delay),
                    );
                    {
                        let dirmgr = upgrade_weak_ref(&weak)?;
                        dirmgr.note_reset(attempt_id);
                    }
                    schedule.sleep(delay).await?;
                    state = state.reset();
                } else {
                    info!(attempt=%attempt_id, "Directory is complete.");
                    usable = true;
                    break 'retry_attempt;
                }
            }

            if !usable {
                // we ran out of attempts.
                warn!(
                    "We failed {} times to bootstrap a directory. We're going to give up.",
                    retry_config.n_attempts()
                );
                return Err(Error::CantAdvanceState);
            } else {
                // Report success, if appropriate.
                if let Some(send_done) = on_complete.take() {
                    let _ = send_done.send(());
                }
            }

            // Sleep until the state machine says it's time to start over;
            // if it never needs resetting, we're done.
            let reset_at = state.reset_time();
            match reset_at {
                Some(t) => {
                    trace!("Sleeping until {}", time::OffsetDateTime::from(t));
                    schedule.sleep_until_wallclock(t).await?;
                }
                None => return Ok(()),
            }
            attempt_id = bootstrap::AttemptId::next();
            trace!(attempt=%attempt_id, "Beginning new attempt to bootstrap directory");
            state = state.reset();
        }
    }
767
768    /// Get a reference to the circuit manager, if we have one.
769    fn circmgr(&self) -> Result<Arc<CircMgr<R>>> {
770        self.circmgr.clone().ok_or(Error::NoDownloadSupport)
771    }
772
    /// Try to change our configuration to `new_config`.
    ///
    /// Actual behavior will depend on the value of `how`.
    ///
    /// All "cannot change" validation runs before any state is modified, so a
    /// rejected reconfiguration leaves this manager untouched.
    pub fn reconfigure(
        &self,
        new_config: &DirMgrConfig,
        how: tor_config::Reconfigure,
    ) -> std::result::Result<(), tor_config::ReconfigureError> {
        let config = self.config.get();
        // We don't support changing these: doing so basically would require us
        // to abort all our in-progress downloads, since they might be based on
        // no-longer-viable information.
        // NOTE: keep this in sync with the behaviour of `DirMgrConfig::update_from_config`
        if new_config.cache_dir != config.cache_dir {
            how.cannot_change("storage.cache_dir")?;
        }
        if new_config.cache_trust != config.cache_trust {
            how.cannot_change("storage.permissions")?;
        }
        if new_config.authorities() != config.authorities() {
            how.cannot_change("network.authorities")?;
        }

        // In check-only mode we stop here: validation passed, apply nothing.
        if how == tor_config::Reconfigure::CheckAllOrNothing {
            return Ok(());
        }

        // Record whether the parameter overrides changed *before* we replace
        // the stored configuration below.
        let params_changed = new_config.override_net_params != config.override_net_params;

        self.config
            .map_and_replace(|cfg| cfg.update_from_config(new_config));

        if params_changed {
            // (It's okay to ignore the error from `mutate`, since it just
            // means that there was no current netdir to update.)
            let _ignore_err = self.netdir.mutate(|netdir| {
                netdir.replace_overridden_parameters(&new_config.override_net_params);
                Ok(())
            });
            {
                let mut params = self.default_parameters.lock().expect("lock failed");
                *params = Arc::new(NetParameters::from_map(&new_config.override_net_params));
            }

            // Tell subscribers that the consensus-derived parameters changed.
            self.events.publish(DirEvent::NewConsensus);
        }

        Ok(())
    }
822
823    /// Return a stream of [`DirBootstrapStatus`] events to tell us about changes
824    /// in the latest directory's bootstrap status.
825    ///
826    /// Note that this stream can be lossy: the caller will not necessarily
827    /// observe every event on the stream
828    pub fn bootstrap_events(&self) -> event::DirBootstrapEvents {
829        self.receive_status.clone()
830    }
831
832    /// Replace the latest status with `progress` and broadcast to anybody
833    /// watching via a [`DirBootstrapEvents`] stream.
834    fn update_progress(&self, attempt_id: AttemptId, progress: DirProgress) {
835        // TODO(nickm): can I kill off this lock by having something else own the sender?
836        let mut sender = self.send_status.lock().expect("poisoned lock");
837        let mut status = sender.borrow_mut();
838
839        status.update_progress(attempt_id, progress);
840    }
841
842    /// Update our status tracker to note that some number of errors has
843    /// occurred.
844    fn note_errors(&self, attempt_id: AttemptId, n_errors: usize) {
845        if n_errors == 0 {
846            return;
847        }
848        let mut sender = self.send_status.lock().expect("poisoned lock");
849        let mut status = sender.borrow_mut();
850
851        status.note_errors(attempt_id, n_errors);
852    }
853
854    /// Update our status tracker to note that we've needed to reset our download attempt.
855    fn note_reset(&self, attempt_id: AttemptId) {
856        let mut sender = self.send_status.lock().expect("poisoned lock");
857        let mut status = sender.borrow_mut();
858
859        status.note_reset(attempt_id);
860    }
861
862    /// Try to make this a directory manager with read-write access to its
863    /// storage.
864    ///
865    /// Return true if we got the lock, or if we already had it.
866    ///
867    /// Return false if another process has the lock
868    fn try_upgrade_to_readwrite(&self) -> Result<bool> {
869        self.store
870            .lock()
871            .expect("Directory storage lock poisoned")
872            .upgrade_to_readwrite()
873    }
874
875    /// Return a reference to the store, if it is currently read-write.
876    #[cfg(test)]
877    fn store_if_rw(&self) -> Option<&Mutex<DynStore>> {
878        let rw = !self
879            .store
880            .lock()
881            .expect("Directory storage lock poisoned")
882            .is_readonly();
883        // A race-condition is possible here, but I believe it's harmless.
884        if rw { Some(&self.store) } else { None }
885    }
886
    /// Construct a DirMgr from a DirMgrConfig.
    ///
    /// If `offline` is set, opens the SQLite store read-only and sets the offline flag in the
    /// returned manager.
    #[allow(clippy::unnecessary_wraps)] // API compat and future-proofing
    fn from_config(
        config: DirMgrConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Option<Arc<CircMgr<R>>>,
        offline: bool,
    ) -> Result<Self> {
        // Start with an empty shared netdir slot: no directory is loaded yet.
        let netdir = Arc::new(SharedMutArc::new());
        let events = event::FlagPublisher::new();
        // Seed the default parameters from any configured overrides.
        let default_parameters = NetParameters::from_map(&config.override_net_params);
        let default_parameters = Mutex::new(Arc::new(default_parameters));

        // Watch channel used to broadcast bootstrap-status updates.
        let (send_status, receive_status) = postage::watch::channel();
        let send_status = Mutex::new(send_status);
        let receive_status = DirBootstrapEvents {
            inner: receive_status,
        };
        #[cfg(feature = "dirfilter")]
        let filter = config.extensions.filter.clone();

        // We create these early so the client code can access task_handle before bootstrap() returns.
        let (task_schedule, task_handle) = TaskSchedule::new(runtime.clone());
        let task_schedule = Mutex::new(Some(task_schedule));

        // We load the cached protocol recommendations unconditionally: the caller needs them even
        // if it does not try to load the rest of the cache.
        let protocols = {
            let store = store.store.lock().expect("lock poisoned");
            store
                .cached_protocol_recommendations()?
                .map(|(t, p)| (t, Arc::new(p)))
        };

        Ok(DirMgr {
            config: config.into(),
            store: store.store,
            netdir,
            protocols: Mutex::new(protocols),
            default_parameters,
            events,
            send_status,
            receive_status,
            circmgr,
            runtime,
            offline,
            bootstrap_started: AtomicBool::new(false),
            #[cfg(feature = "dirfilter")]
            filter,
            task_schedule,
            task_handle,
        })
    }
944
    /// Load the latest non-pending non-expired directory from the
    /// cache, if it is newer than the one we have.
    ///
    /// Return false if there is no such consensus.
    fn load_directory(self: &Arc<Self>, attempt_id: AttemptId) -> Result<bool> {
        // Build a fresh bootstrap state machine restricted to `CacheOnly`:
        // we only want to read the cache here, never to download.
        let state = state::GetConsensusState::new(
            self.runtime.clone(),
            self.config.get(),
            CacheUsage::CacheOnly,
            None,
            #[cfg(feature = "dirfilter")]
            self.filter
                .clone()
                .unwrap_or_else(|| Arc::new(crate::filter::NilFilter)),
        );
        // Drive the load; the returned state is not needed here, since we
        // answer based on whether a netdir is now present.
        let _ = bootstrap::load(self, Box::new(state), attempt_id)?;

        Ok(self.netdir.get().is_some())
    }
964
    /// Return a new asynchronous stream that will receive notification
    /// whenever the consensus has changed.
    ///
    /// Multiple events may be batched up into a single item: each time
    /// this stream yields an event, all you can assume is that the event has
    /// occurred at least once.
    pub fn events(&self) -> impl futures::Stream<Item = DirEvent> + use<R> {
        // `use<R>` makes the opaque return type's generic captures explicit.
        self.events.subscribe()
    }
974
975    /// Try to load the text of a single document described by `doc` from
976    /// storage.
977    pub fn text(&self, doc: &DocId) -> Result<Option<DocumentText>> {
978        use itertools::Itertools;
979        let mut result = HashMap::new();
980        let query: DocQuery = (*doc).into();
981        let store = self.store.lock().expect("store lock poisoned");
982        query.load_from_store_into(&mut result, &**store)?;
983        let item = result.into_iter().at_most_one().map_err(|_| {
984            Error::CacheCorruption("Found more than one entry in storage for given docid")
985        })?;
986        if let Some((docid, doctext)) = item {
987            if &docid != doc {
988                return Err(Error::CacheCorruption(
989                    "Item from storage had incorrect docid.",
990                ));
991            }
992            Ok(Some(doctext))
993        } else {
994            Ok(None)
995        }
996    }
997
998    /// Load the text for a collection of documents.
999    ///
1000    /// If many of the documents have the same type, this can be more
1001    /// efficient than calling [`text`](Self::text).
1002    pub fn texts<T>(&self, docs: T) -> Result<HashMap<DocId, DocumentText>>
1003    where
1004        T: IntoIterator<Item = DocId>,
1005    {
1006        let partitioned = docid::partition_by_type(docs);
1007        let mut result = HashMap::new();
1008        let store = self.store.lock().expect("store lock poisoned");
1009        for (_, query) in partitioned.into_iter() {
1010            query.load_from_store_into(&mut result, &**store)?;
1011        }
1012        Ok(result)
1013    }
1014
    /// Given a request we sent and the response we got from a
    /// directory server, see whether we should expand that response
    /// into "something larger".
    ///
    /// Currently, this handles expanding consensus diffs, and nothing
    /// else.  We do it at this stage of our downloading operation
    /// because it requires access to the store.
    fn expand_response_text(&self, req: &ClientRequest, text: String) -> Result<String> {
        if let ClientRequest::Consensus(req) = req {
            if tor_consdiff::looks_like_diff(&text) {
                if let Some(old_d) = req.old_consensus_digests().next() {
                    // Fetch the old consensus (if we still have it) inside a
                    // short block, so the storage lock is released promptly.
                    let db_val = {
                        let s = self.store.lock().expect("Directory storage lock poisoned");
                        s.consensus_by_sha3_digest_of_signed_part(old_d)?
                    };
                    if let Some((old_consensus, meta)) = db_val {
                        info!("Applying a consensus diff");
                        let new_consensus = tor_consdiff::apply_diff(
                            old_consensus.as_str()?,
                            &text,
                            Some(*meta.sha3_256_of_signed()),
                        )?;
                        // Verify the patched text against its expected digest
                        // before accepting it.
                        new_consensus.check_digest()?;
                        return Ok(new_consensus.to_string());
                    }
                }
                // The response looked like a diff, but we either listed no old
                // consensus digests in our request, or no matching consensus
                // was found in storage to apply it to.
                return Err(Error::Unwanted(
                    "Received a consensus diff we did not ask for",
                ));
            }
        }
        // Not a consensus diff: pass the response text through unchanged.
        Ok(text)
    }
1048
    /// If `state` has netdir changes to apply, apply them to our netdir.
    #[allow(clippy::cognitive_complexity)]
    fn apply_netdir_changes(
        self: &Arc<Self>,
        state: &mut Box<dyn DirState>,
        store: &mut dyn Store,
    ) -> Result<()> {
        if let Some(change) = state.get_netdir_change() {
            match change {
                NetDirChange::AttemptReplace {
                    netdir,
                    consensus_meta,
                } => {
                    // Check the new netdir is sufficient, if we have a circmgr.
                    // (Unwraps are fine because the `Option` is `Some` until we take it.)
                    if let Some(ref cm) = self.circmgr {
                        if !cm
                            .netdir_is_sufficient(netdir.as_ref().expect("AttemptReplace had None"))
                        {
                            // Not enough guards yet: keep the old netdir for now.
                            debug!("Got a new NetDir, but it doesn't have enough guards yet.");
                            return Ok(());
                        }
                    }
                    // Refuse to replace our netdir with one whose consensus
                    // lifetime starts earlier than the current one's.
                    let is_stale = {
                        // Done inside a block to not hold a long-lived copy of the NetDir.
                        self.netdir
                            .get()
                            .map(|x| {
                                x.lifetime().valid_after()
                                    > netdir
                                        .as_ref()
                                        .expect("AttemptReplace had None")
                                        .lifetime()
                                        .valid_after()
                            })
                            .unwrap_or(false)
                    };
                    if is_stale {
                        warn!("Got a new NetDir, but it's older than the one we currently have!");
                        return Err(Error::NetDirOlder);
                    }
                    // Install the new netdir, applying any configured
                    // parameter overrides first, then notify observers.
                    let cfg = self.config.get();
                    let mut netdir = netdir.take().expect("AttemptReplace had None");
                    netdir.replace_overridden_parameters(&cfg.override_net_params);
                    self.netdir.replace(netdir);
                    self.events.publish(DirEvent::NewConsensus);
                    self.events.publish(DirEvent::NewDescriptors);

                    info!("Marked consensus usable.");
                    if !store.is_readonly() {
                        store.mark_consensus_usable(consensus_meta)?;
                        // Now that a consensus is usable, older consensuses may
                        // need to expire.
                        store.expire_all(&crate::storage::EXPIRATION_DEFAULTS)?;
                    }
                    Ok(())
                }
                NetDirChange::AddMicrodescs(mds) => {
                    // Add the newly fetched microdescriptors to the current
                    // netdir (a no-op error if there is no netdir; `?` keeps
                    // the original error behavior).
                    self.netdir.mutate(|netdir| {
                        for md in mds.drain(..) {
                            netdir.add_microdesc(md);
                        }
                        Ok(())
                    })?;
                    self.events.publish(DirEvent::NewDescriptors);
                    Ok(())
                }
                NetDirChange::SetRequiredProtocol { timestamp, protos } => {
                    // Persist the recommendation when the store is writable,
                    // then cache it in memory and notify observers.
                    if !store.is_readonly() {
                        store.update_protocol_recommendations(timestamp, protos.as_ref())?;
                    }
                    let mut pr = self.protocols.lock().expect("Poisoned lock");
                    *pr = Some((timestamp, protos));
                    self.events.publish(DirEvent::NewProtocolRecommendation);
                    Ok(())
                }
            }
        } else {
            // No pending netdir change: nothing to do.
            Ok(())
        }
    }
1130}
1131
/// A degree of readiness for a given directory state object.
#[derive(Debug, Copy, Clone)]
enum Readiness {
    /// There is no more information to download.
    Complete,
    /// There is more information to download, but we don't need to download
    /// it in order to have a usable directory.
    Usable,
}
1140
1141/// Try to upgrade a weak reference to a DirMgr, and give an error on
1142/// failure.
1143fn upgrade_weak_ref<T>(weak: &Weak<T>) -> Result<Arc<T>> {
1144    Weak::upgrade(weak).ok_or(Error::ManagerDropped)
1145}
1146
1147/// Given a time `now`, and an amount of tolerated clock skew `tolerance`,
1148/// return the age of the oldest consensus that we should request at that time.
1149pub(crate) fn default_consensus_cutoff(
1150    now: SystemTime,
1151    tolerance: &DirTolerance,
1152) -> Result<SystemTime> {
1153    /// We _always_ allow at least this much age in our consensuses, to account
1154    /// for the fact that consensuses have some lifetime.
1155    const MIN_AGE_TO_ALLOW: Duration = Duration::from_secs(3 * 3600);
1156    let allow_skew = std::cmp::max(MIN_AGE_TO_ALLOW, tolerance.post_valid_tolerance());
1157    let cutoff = time::OffsetDateTime::from(now - allow_skew);
1158    // We now round cutoff to the next hour, so that we aren't leaking our exact
1159    // time to the directory cache.
1160    //
1161    // With the time crate, it's easier to calculate the "next hour" by rounding
1162    // _down_ then adding an hour; rounding up would sometimes require changing
1163    // the date too.
1164    let (h, _m, _s) = cutoff.to_hms();
1165    let cutoff = cutoff.replace_time(
1166        time::Time::from_hms(h, 0, 0)
1167            .map_err(tor_error::into_internal!("Failed clock calculation"))?,
1168    );
1169    let cutoff = cutoff + Duration::from_secs(3600);
1170
1171    Ok(cutoff.into())
1172}
1173
1174/// Return a list of the protocols [supported](tor_protover::doc_supported) by this crate
1175/// when running as a client.
1176pub fn supported_client_protocols() -> tor_protover::Protocols {
1177    use tor_protover::named::*;
1178    // WARNING: REMOVING ELEMENTS FROM THIS LIST CAN BE DANGEROUS!
1179    // SEE [`tor_protover::doc_changing`]
1180    [
1181        //
1182        DIRCACHE_CONSDIFF,
1183    ]
1184    .into_iter()
1185    .collect()
1186}
1187
1188#[cfg(test)]
1189mod test {
1190    // @@ begin test lint list maintained by maint/add_warning @@
1191    #![allow(clippy::bool_assert_comparison)]
1192    #![allow(clippy::clone_on_copy)]
1193    #![allow(clippy::dbg_macro)]
1194    #![allow(clippy::mixed_attributes_style)]
1195    #![allow(clippy::print_stderr)]
1196    #![allow(clippy::print_stdout)]
1197    #![allow(clippy::single_char_pattern)]
1198    #![allow(clippy::unwrap_used)]
1199    #![allow(clippy::unchecked_time_subtraction)]
1200    #![allow(clippy::useless_vec)]
1201    #![allow(clippy::needless_pass_by_value)]
1202    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1203    use super::*;
1204    use crate::docmeta::{AuthCertMeta, ConsensusMeta};
1205    use std::time::Duration;
1206    use tempfile::TempDir;
1207    use tor_basic_utils::test_rng::testing_rng;
1208    use tor_netdoc::doc::netstatus::ConsensusFlavor;
1209    use tor_netdoc::doc::{authcert::AuthCertKeyIds, netstatus::Lifetime};
1210    use tor_rtcompat::SleepProvider;
1211
1212    #[test]
1213    fn protocols() {
1214        let pr = supported_client_protocols();
1215        let expected = "DirCache=2".parse().unwrap();
1216        assert_eq!(pr, expected);
1217    }
1218
1219    pub(crate) fn new_mgr<R: Runtime>(runtime: R) -> (TempDir, DirMgr<R>) {
1220        let dir = TempDir::new().unwrap();
1221        let config = DirMgrConfig {
1222            cache_dir: dir.path().into(),
1223            ..Default::default()
1224        };
1225        let store = DirMgrStore::new(&config, runtime.clone(), false).unwrap();
1226        let dirmgr = DirMgr::from_config(config, runtime, store, None, false).unwrap();
1227
1228        (dir, dirmgr)
1229    }
1230
    // A freshly built manager with no circuit manager and no loaded directory
    // should fail both accessors.
    #[test]
    fn failing_accessors() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let (_tempdir, mgr) = new_mgr(rt);

            // `new_mgr` passes `None` for the circmgr, so this must fail.
            assert!(mgr.circmgr().is_err());
            // No netdir has been loaded or bootstrapped yet.
            assert!(mgr.netdir(Timeliness::Unchecked).is_err());
        });
    }
1240
    // Exercise `DirMgr::text()` and `DirMgr::texts()` against a store seeded
    // with fake microdescs, router descs, authcerts, and a consensus.
    #[test]
    fn load_and_store_internals() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let tomorrow = now + Duration::from_secs(86400);
            let later = tomorrow + Duration::from_secs(86400);

            let (_tempdir, mgr) = new_mgr(rt);

            // Seed the storage with a bunch of junk.
            let d1 = [5_u8; 32];
            let d2 = [7; 32];
            let d3 = [42; 32];
            let d4 = [99; 20];
            let d5 = [12; 20];
            let certid1 = AuthCertKeyIds {
                id_fingerprint: d4.into(),
                sk_fingerprint: d5.into(),
            };
            let certid2 = AuthCertKeyIds {
                id_fingerprint: d5.into(),
                sk_fingerprint: d4.into(),
            };

            {
                let mut store = mgr.store.lock().unwrap();

                store
                    .store_microdescs(
                        &[
                            ("Fake micro 1", &d1),
                            ("Fake micro 2", &d2),
                            ("Fake micro 3", &d3),
                        ],
                        now,
                    )
                    .unwrap();

                #[cfg(feature = "routerdesc")]
                store
                    .store_routerdescs(&[("Fake rd1", now, &d4), ("Fake rd2", now, &d5)])
                    .unwrap();

                store
                    .store_authcerts(&[
                        (
                            AuthCertMeta::new(certid1, now, tomorrow),
                            "Fake certificate one",
                        ),
                        (
                            AuthCertMeta::new(certid2, now, tomorrow),
                            "Fake certificate two",
                        ),
                    ])
                    .unwrap();

                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, tomorrow, later).unwrap(),
                    [102; 32],
                    [103; 32],
                );
                store
                    .store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
                    .unwrap();
            }

            // Try to get it with text().
            let t1 = mgr.text(&DocId::Microdesc(d1)).unwrap().unwrap();
            assert_eq!(t1.as_str(), Ok("Fake micro 1"));

            let t2 = mgr
                .text(&DocId::LatestConsensus {
                    flavor: ConsensusFlavor::Microdesc,
                    cache_usage: CacheUsage::CacheOkay,
                })
                .unwrap()
                .unwrap();
            assert_eq!(t2.as_str(), Ok("Fake consensus!"));

            // An unknown digest should yield Ok(None), not an error.
            let t3 = mgr.text(&DocId::Microdesc([255; 32])).unwrap();
            assert!(t3.is_none());

            // Now try texts()
            let d_bogus = DocId::Microdesc([255; 32]);
            let res = mgr
                .texts(vec![
                    DocId::Microdesc(d2),
                    DocId::Microdesc(d3),
                    d_bogus,
                    DocId::AuthCert(certid2),
                    #[cfg(feature = "routerdesc")]
                    DocId::RouterDesc(d5),
                ])
                .unwrap();
            assert_eq!(
                res.get(&DocId::Microdesc(d2)).unwrap().as_str(),
                Ok("Fake micro 2")
            );
            assert_eq!(
                res.get(&DocId::Microdesc(d3)).unwrap().as_str(),
                Ok("Fake micro 3")
            );
            // Unknown ids are simply absent from the result map.
            assert!(!res.contains_key(&d_bogus));
            assert_eq!(
                res.get(&DocId::AuthCert(certid2)).unwrap().as_str(),
                Ok("Fake certificate two")
            );
            #[cfg(feature = "routerdesc")]
            assert_eq!(
                res.get(&DocId::RouterDesc(d5)).unwrap().as_str(),
                Ok("Fake rd2")
            );
        });
    }
1355
    // Check the consensus request we build both with an empty cache and with
    // a cached consensus whose digest should be advertised.
    #[test]
    fn make_consensus_request() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let tomorrow = now + Duration::from_secs(86400);
            let later = tomorrow + Duration::from_secs(86400);

            let (_tempdir, mgr) = new_mgr(rt);
            let config = DirMgrConfig::default();

            // Try with an empty store.
            let req = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_consensus_request(
                    now,
                    ConsensusFlavor::Microdesc,
                    &**store,
                    &config,
                )
                .unwrap()
            };
            let tolerance = DirTolerance::default().post_valid_tolerance();
            match req {
                ClientRequest::Consensus(r) => {
                    // With nothing cached, the request lists no old digests,
                    // and its "last consensus" date falls within an hour-wide
                    // window starting at `now - tolerance`.
                    assert_eq!(r.old_consensus_digests().count(), 0);
                    let date = r.last_consensus_date().unwrap();
                    assert!(date >= now - tolerance);
                    assert!(date <= now - tolerance + Duration::from_secs(3600));
                }
                _ => panic!("Wrong request type"),
            }

            // Add a fake consensus record.
            let d_prev = [42; 32];
            {
                let mut store = mgr.store.lock().unwrap();

                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, tomorrow, later).unwrap(),
                    d_prev,
                    [103; 32],
                );
                store
                    .store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
                    .unwrap();
            }

            // Now try again.
            let req = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_consensus_request(
                    now,
                    ConsensusFlavor::Microdesc,
                    &**store,
                    &config,
                )
                .unwrap()
            };
            match req {
                ClientRequest::Consensus(r) => {
                    // The cached consensus should now be advertised by digest,
                    // and its valid-after time used as the last-consensus date.
                    let ds: Vec<_> = r.old_consensus_digests().collect();
                    assert_eq!(ds.len(), 1);
                    assert_eq!(ds[0], &d_prev);
                    assert_eq!(r.last_consensus_date(), Some(now));
                }
                _ => panic!("Wrong request type"),
            }
        });
    }
1425
    // Check request construction for non-consensus documents: authcerts,
    // microdescs, and (when enabled) router descriptors.
    #[test]
    fn make_other_requests() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            use rand::Rng;
            let (_tempdir, mgr) = new_mgr(rt);

            let certid1 = AuthCertKeyIds {
                id_fingerprint: [99; 20].into(),
                sk_fingerprint: [100; 20].into(),
            };
            let mut rng = testing_rng();
            #[cfg(feature = "routerdesc")]
            let rd_ids: Vec<DocId> = (0..1000).map(|_| DocId::RouterDesc(rng.random())).collect();
            let md_ids: Vec<DocId> = (0..1000).map(|_| DocId::Microdesc(rng.random())).collect();
            let config = DirMgrConfig::default();

            // Try an authcert.
            let query = DocId::AuthCert(certid1);
            let store = mgr.store.lock().unwrap();
            let reqs =
                bootstrap::make_requests_for_documents(&mgr.runtime, &[query], &**store, &config)
                    .unwrap();
            assert_eq!(reqs.len(), 1);
            let req = &reqs[0];
            if let ClientRequest::AuthCert(r) = req {
                assert_eq!(r.keys().next(), Some(&certid1));
            } else {
                panic!();
            }

            // Try a bunch of mds: 1000 ids should get split into 2 requests.
            let reqs =
                bootstrap::make_requests_for_documents(&mgr.runtime, &md_ids, &**store, &config)
                    .unwrap();
            assert_eq!(reqs.len(), 2);
            assert!(matches!(reqs[0], ClientRequest::Microdescs(_)));

            // Try a bunch of rds: likewise split into 2 requests.
            #[cfg(feature = "routerdesc")]
            {
                let reqs = bootstrap::make_requests_for_documents(
                    &mgr.runtime,
                    &rd_ids,
                    &**store,
                    &config,
                )
                .unwrap();
                assert_eq!(reqs.len(), 2);
                assert!(matches!(reqs[0], ClientRequest::RouterDescs(_)));
            }
        });
    }
1478
    // Exercise `expand_response_text`: plain responses pass through untouched,
    // a valid consensus diff is applied, and a bad digest is rejected.
    #[test]
    fn expand_response() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let day = Duration::from_secs(86400);
            let config = DirMgrConfig::default();

            let (_tempdir, mgr) = new_mgr(rt);

            // Try a simple request: nothing should happen.
            let q = DocId::Microdesc([99; 32]);
            let r = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_requests_for_documents(&mgr.runtime, &[q], &**store, &config)
                    .unwrap()
            };
            let expanded = mgr.expand_response_text(&r[0], "ABC".to_string());
            assert_eq!(&expanded.unwrap(), "ABC");

            // Try a consensus response that doesn't look like a diff in
            // response to a query that doesn't ask for one.
            let latest_id = DocId::LatestConsensus {
                flavor: ConsensusFlavor::Microdesc,
                cache_usage: CacheUsage::CacheOkay,
            };
            let r = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_requests_for_documents(
                    &mgr.runtime,
                    &[latest_id],
                    &**store,
                    &config,
                )
                .unwrap()
            };
            let expanded = mgr.expand_response_text(&r[0], "DEF".to_string());
            assert_eq!(&expanded.unwrap(), "DEF");

            // Now stick some metadata and a string into the storage so that
            // we can ask for a diff.
            {
                let mut store = mgr.store.lock().unwrap();
                let d_in = [0x99; 32]; // This one, we can fake.
                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, now + day, now + 2 * day).unwrap(),
                    d_in,
                    d_in,
                );
                store
                    .store_consensus(
                        &cmeta,
                        ConsensusFlavor::Microdesc,
                        false,
                        "line 1\nline2\nline 3\n",
                    )
                    .unwrap();
            }

            // Try expanding something that isn't a consensus, even if we'd like
            // one.
            let r = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_requests_for_documents(
                    &mgr.runtime,
                    &[latest_id],
                    &**store,
                    &config,
                )
                .unwrap()
            };
            let expanded = mgr.expand_response_text(&r[0], "hello".to_string());
            assert_eq!(&expanded.unwrap(), "hello");

            // Finally, try "expanding" a diff (by applying it and checking the
            // digest).
            let diff = "network-status-diff-version 1
hash 9999999999999999999999999999999999999999999999999999999999999999 8382374ca766873eb0d2530643191c6eaa2c5e04afa554cbac349b5d0592d300
2c
replacement line
.
".to_string();
            let expanded = mgr.expand_response_text(&r[0], diff);

            assert_eq!(expanded.unwrap(), "line 1\nreplacement line\nline 3\n");

            // If the digest is wrong, that should get rejected.
            let diff = "network-status-diff-version 1
hash 9999999999999999999999999999999999999999999999999999999999999999 9999999999999999999999999999999999999999999999999999999999999999
2c
replacement line
.
".to_string();
            let expanded = mgr.expand_response_text(&r[0], diff);
            assert!(expanded.is_err());
        });
    }
1574}