symsrv/
lib.rs

1//! # symsrv
2//!
3//! This crate lets you download and cache symbol files from symbol servers,
4//! according to the rules from the `_NT_SYMBOL_PATH` environment variable.
5//!
6//! It exposes an async API. Internally it uses `reqwest` and `tokio`.
7//!
8//! The downloaded symbols are stored on the file system. No automatic expiration
9//! or eviction is performed. If you want to enforce a cache size limit or expire
10//! old files, you can observe cache file creations and accesses with the
11//! [`SymsrvObserver`] trait, and then write your own implementation for automatic
12//! file cleanup.
13//!
14//! ## Microsoft Documentation
15//!
16//! - [Advanced SymSrv Use](https://docs.microsoft.com/en-us/windows-hardware/drivers/debugger/advanced-symsrv-use)
17//!
18//! ## Example
19//!
20//! ```
21//! use symsrv::SymsrvDownloader;
22//!
23//! # fn open_pdb_at_path(p: &std::path::Path) {}
24//! #
25//! # async fn wrapper() -> Result<(), symsrv::Error> {
26//! // Parse the _NT_SYMBOL_PATH environment variable.
27//! let symbol_path_env = symsrv::get_symbol_path_from_environment();
28//! let symbol_path = symbol_path_env.as_deref().unwrap_or("srv**https://msdl.microsoft.com/download/symbols");
29//! let parsed_symbol_path = symsrv::parse_nt_symbol_path(symbol_path);
30//!
31//! // Create a downloader which follows the _NT_SYMBOL_PATH recipe.
32//! let mut downloader = SymsrvDownloader::new(parsed_symbol_path);
33//! downloader.set_default_downstream_store(symsrv::get_home_sym_dir());
34//!
35//! // Download and cache a PDB file.
36//! let local_path = downloader.get_file("dcomp.pdb", "648B8DD0780A4E22FA7FA89B84633C231").await?;
37//!
38//! // Use the PDB file.
39//! open_pdb_at_path(&local_path);
40//! # Ok(())
41//! # }
42//! ```
43
44mod computation_coalescing;
45mod download;
46mod file_creation;
47mod poll_all;
48mod remotely_fed_cursor;
49
50use std::future::Future;
51use std::io::{BufReader, Read, Seek, Write};
52use std::ops::Deref;
53use std::path::{Path, PathBuf};
54use std::pin::Pin;
55use std::sync::atomic::AtomicU64;
56use std::sync::Arc;
57use std::time::Duration;
58
59use futures_util::{future, AsyncReadExt};
60use tokio::io::AsyncWriteExt;
61use tokio::time::Instant;
62
63use computation_coalescing::ComputationCoalescer;
64use download::response_to_uncompressed_stream_with_progress;
65use file_creation::{create_file_cleanly, CleanFileCreationError};
66use poll_all::PollAllPreservingOrder;
67use remotely_fed_cursor::{RemotelyFedCursor, RemotelyFedCursorFeeder};
68
69/// The parsed representation of one entry in the (semicolon-separated list of entries in the) `_NT_SYMBOL_PATH` environment variable.
70/// The syntax of this string is documented at <https://docs.microsoft.com/en-us/windows-hardware/drivers/debugger/advanced-symsrv-use>.
71#[derive(Clone, Debug, PartialEq, Eq)]
72pub enum NtSymbolPathEntry {
73    /// Sets a cache path that will be used for subsequent entries, and for any symbol paths that get added at runtime.
74    /// Created for `cache*` entries.
75    Cache(PathBuf),
76    /// A fallback-and-cache chain with optional http / https symbol servers at the end.
77    /// Created for `srv*` and `symsrv*` entries.
78    Chain {
79        /// Usually `symsrv.dll`. (`srv*...` is shorthand for `symsrv*symsrv.dll*...`.)
80        dll: String,
81        /// Any cache directories. The first directory is the "bottom-most" cache. The bottom cache
82        /// is always checked first, and always stores uncompressed files.
83        ///
84        /// Any remaining directories are mid-level cache directories. These can store compressed files.
85        cache_paths: Vec<CachePath>,
86        /// Symbol server URLs. Can serve compressed or uncompressed files. Not used as a cache target.
87        /// These are checked last.
88        urls: Vec<String>,
89    },
90    /// A path where symbols can be found but which is not used as a cache target.
91    /// Created for entries which are just a path.
92    LocalOrShare(PathBuf),
93}
94
/// Either an ordinary cache directory or a stand-in for the "default downstream store".
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CachePath {
    /// Stands in for the directory of the "default downstream store". Empty cache items
    /// in the `_NT_SYMBOL_PATH` produce this variant, e.g. the empty item between the two
    /// adjacent asterisks of `srv**URL`.
    DefaultDownstreamStore,

    /// The directory in which this cache lives.
    Path(PathBuf),
}

impl CachePath {
    /// Resolves this cache path to a concrete directory.
    ///
    /// Returns the stored directory for [`CachePath::Path`], and
    /// `default_downstream_store` for [`CachePath::DefaultDownstreamStore`].
    pub fn to_path<'a>(&'a self, default_downstream_store: &'a Path) -> &'a Path {
        if let CachePath::Path(dir) = self {
            dir.as_path()
        } else {
            default_downstream_store
        }
    }
}
115
116/// Returns the absolute path to the `~/sym` directory. This is a reasonable default for the "default downstream store".
117/// The return value can be directly passed to [`SymsrvDownloader::set_default_downstream_store`].
118///
119/// This function returns `None` if the home directory cannot be determined.
120pub fn get_home_sym_dir() -> Option<PathBuf> {
121    let home_dir = dirs::home_dir()?;
122    Some(home_dir.join("sym"))
123}
124
/// Returns the value of the `_NT_SYMBOL_PATH` environment variable as a string.
///
/// Returns `None` if the variable cannot be read, i.e. whenever `std::env::var`
/// reports an error (variable not set, or its value is not valid Unicode).
pub fn get_symbol_path_from_environment() -> Option<String> {
    match std::env::var("_NT_SYMBOL_PATH") {
        Ok(value) => Some(value),
        Err(_) => None,
    }
}
129
130/// Parse the value of the `_NT_SYMBOL_PATH` variable. The format of this variable
131/// is a semicolon-separated list of entries, where each entry is an asterisk-separated
132/// hierarchy of symbol locations which can be either directories or server URLs.
133pub fn parse_nt_symbol_path(symbol_path: &str) -> Vec<NtSymbolPathEntry> {
134    fn chain<'a>(dll_name: &str, parts: impl Iterator<Item = &'a str>) -> NtSymbolPathEntry {
135        let mut cache_paths = Vec::new();
136        let mut urls = Vec::new();
137        for part in parts {
138            if part.is_empty() {
139                cache_paths.push(CachePath::DefaultDownstreamStore);
140            } else if part.starts_with("http://") || part.starts_with("https://") {
141                urls.push(part.into());
142            } else {
143                cache_paths.push(CachePath::Path(part.into()));
144            }
145        }
146        NtSymbolPathEntry::Chain {
147            dll: dll_name.to_string(),
148            cache_paths,
149            urls,
150        }
151    }
152
153    symbol_path
154        .split(';')
155        .filter_map(|segment| {
156            let mut parts = segment.split('*');
157            let first = parts.next().unwrap();
158            match first.to_ascii_lowercase().as_str() {
159                "cache" => parts
160                    .next()
161                    .map(|path| NtSymbolPathEntry::Cache(path.into())),
162                "srv" => Some(chain("symsrv.dll", parts)),
163                "symsrv" => parts.next().map(|dll_name| chain(dll_name, parts)),
164                _ => Some(NtSymbolPathEntry::LocalOrShare(first.into())),
165            }
166        })
167        .collect()
168}
169
170/// The error type used for results returned from [`SymsrvDownloader::get_file`].
171#[derive(thiserror::Error, Debug, Clone)]
172#[non_exhaustive]
173pub enum Error {
174    /// There was an error when interacting with the file system.
175    #[error("IO error: {0}")]
176    IoError(String),
177
178    /// The requested file was not found.
179    #[error("The file was not found in the SymsrvDownloader.")]
180    NotFound,
181
182    /// No default downstream store was specified, but it was needed.
183    #[error("No default downstream store was specified, but it was needed.")]
184    NoDefaultDownstreamStore,
185
186    /// The requested path does not have a file extension.
187    #[error("The requested path does not have a file extension.")]
188    NoExtension,
189
190    /// The requested path does not have a recognized file extension.
191    #[error("The requested path does not have a recognized file extension (exe/dll/pdb/dbg).")]
192    UnrecognizedExtension,
193
194    /// An internal error occurred: Couldn't join task
195    #[error("An internal error occurred: Couldn't join task")]
196    JoinError(String),
197
198    /// Generic error from `reqwest`.
199    #[error("ReqwestError: {0}")]
200    ReqwestError(String),
201
202    /// Unexpected Content-Encoding header.
203    #[error("Unexpected Content-Encoding header: {0}")]
204    UnexpectedContentEncoding(String),
205
206    /// An error occurred while extracting a CAB archive.
207    #[error("Error while extracting a CAB archive: {0}")]
208    CabExtraction(String),
209}
210
211impl From<std::io::Error> for Error {
212    fn from(err: std::io::Error) -> Error {
213        Error::IoError(err.to_string())
214    }
215}
216
217impl From<CleanFileCreationError<Error>> for Error {
218    fn from(e: CleanFileCreationError<Error>) -> Error {
219        match e {
220            CleanFileCreationError::CallbackIndicatedError(e) => e,
221            e => Error::IoError(e.to_string()),
222        }
223    }
224}
225
226/// The error type used in the observer notification [`SymsrvObserver::on_download_failed`].
227#[derive(thiserror::Error, Debug)]
228pub enum DownloadError {
229    /// Creating the reqwest Client failed.
230    #[error("Creating the client failed: {0}")]
231    ClientCreationFailed(String),
232
233    /// Opening the request failed.
234    #[error("Opening the request failed: {0}")]
235    OpenFailed(Box<dyn std::error::Error + Send + Sync>),
236
237    /// The download timed out.
238    #[error("The download timed out")]
239    Timeout,
240
241    /// The server returned a non-success status code.
242    #[error("The server returned status code {0}")]
243    StatusError(http::StatusCode),
244
245    /// The destination directory could not be created.
246    #[error("The destination directory could not be created")]
247    CouldNotCreateDestinationDirectory,
248
249    /// The response used an unexpected Content-Encoding.
250    #[error("The response used an unexpected Content-Encoding: {0}")]
251    UnexpectedContentEncoding(String),
252
253    /// An I/O error occurred in the middle of downloading.
254    #[error("Error during downloading: {0}")]
255    ErrorDuringDownloading(std::io::Error),
256
257    /// Error while writing the downloaded file.
258    #[error("Error while writing the downloaded file: {0}")]
259    ErrorWhileWritingDownloadedFile(std::io::Error),
260
261    /// Redirect-related error.
262    #[error("Redirect-related error")]
263    Redirect(Box<dyn std::error::Error + Send + Sync>),
264
265    /// Other error.
266    #[error("Other error: {0}")]
267    Other(Box<dyn std::error::Error + Send + Sync>),
268}
269
270/// The error type used in the observer notification [`SymsrvObserver::on_cab_extraction_failed`].
271#[derive(thiserror::Error, Debug)]
272pub enum CabExtractionError {
273    /// The CAB archive did not contain any files.
274    #[error("Empty CAB archive")]
275    EmptyCab,
276
277    /// The CAB archive could not be opened.
278    #[error("Could not open CAB file: {0}")]
279    CouldNotOpenCabFile(std::io::Error),
280
281    /// The CAB archive could not be parsed.
282    #[error("Error while parsing the CAB file: {0}")]
283    CabParsing(std::io::Error),
284
285    /// There was an error while reading the CAB archive.
286    #[error("Error while reading the CAB file: {0}")]
287    CabReading(std::io::Error),
288
289    /// There was an error while writing the extracted file.
290    #[error("Error while writing the file: {0}")]
291    FileWriting(std::io::Error),
292
293    /// Redirect-related error.
294    #[error("Redirect-related error")]
295    Redirect(Box<dyn std::error::Error + Send + Sync>),
296
297    /// Other error.
298    #[error("Other error: {0}")]
299    Other(Box<dyn std::error::Error + Send + Sync>),
300}
301
// Compile-time check: this test only builds if `DownloadError` implements `Sync`.
#[cfg(test)]
#[test]
fn test_download_error_is_sync() {
    fn require_sync<T>()
    where
        T: Sync,
    {
    }

    require_sync::<DownloadError>();
}
308
309impl From<reqwest::Error> for DownloadError {
310    fn from(e: reqwest::Error) -> Self {
311        if e.is_status() {
312            DownloadError::StatusError(e.status().unwrap())
313        } else if e.is_request() {
314            DownloadError::OpenFailed(e.into())
315        } else if e.is_redirect() {
316            DownloadError::Redirect(e.into())
317        } else if e.is_timeout() {
318            DownloadError::Timeout
319        } else {
320            DownloadError::Other(e.into())
321        }
322    }
323}
324
325/// A trait for observing the behavior of a `SymsrvDownloader`.
326/// This can be used for logging, displaying progress bars, expiring cached files, etc.
327pub trait SymsrvObserver: Send + Sync + 'static {
328    /// Called when a new download is about to start, before the connection is established.
329    ///
330    /// The download ID is unique for each download.
331    ///
332    /// For each download ID, we guarantee that exactly one of the following methods
333    /// will be called at the end of the download: `on_download_completed`,
334    /// `on_download_failed`, or `on_download_canceled`.
335    fn on_new_download_before_connect(&self, download_id: u64, url: &str);
336
337    /// Called once the connection has been established and HTTP headers
338    /// with a success status have arrived.
339    fn on_download_started(&self, download_id: u64);
340
341    /// Called frequently during the download, whenever a new chunk has been read.
342    ///
343    /// If the HTTP response is gzip-compressed, the number of bytes can refer to
344    /// either the compressed or the uncompressed bytes - but it'll be consistent:
345    /// Either both `bytes_so_far` and `total_bytes` refer to the compressed sizes,
346    /// or both refer to the uncompressed sizes.
347    ///
348    /// If `total_bytes` is `None`, the total size is unknown.
349    fn on_download_progress(&self, download_id: u64, bytes_so_far: u64, total_bytes: Option<u64>);
350
351    /// Called when the download has completed successfully.
352    ///
353    /// Mutually exclusive with `on_download_failed` and `on_download_canceled` for a
354    /// given download ID.
355    fn on_download_completed(
356        &self,
357        download_id: u64,
358        uncompressed_size_in_bytes: u64,
359        time_until_headers: Duration,
360        time_until_completed: Duration,
361    );
362
363    /// Called when the download has failed.
364    ///
365    /// This is quite common; the most common reason is [`DownloadError::StatusError`]
366    /// with [`StatusCode::NOT_FOUND`](http::StatusCode::NOT_FOUND), for files which
367    /// are not available on the server.
368    ///
369    /// Mutually exclusive with `on_download_completed` and `on_download_canceled` for a
370    /// given download ID.
371    fn on_download_failed(&self, download_id: u64, reason: DownloadError);
372
373    /// Called when the download has been canceled.
374    ///
375    /// This does not indicate an error. We commonly attempt to download a file from
376    /// multiple sources simultaneously, and cancel other downloads once one has succeeded.
377    ///
378    /// This function is also called if the user cancels the download by dropping the future
379    /// returned from [`SymsrvDownloader::get_file`].
380    ///
381    /// Mutually exclusive with `on_download_completed` and `on_download_failed` for a
382    /// given download ID.
383    fn on_download_canceled(&self, download_id: u64);
384
385    /// Called when a new CAB extraction is about to start.
386    fn on_new_cab_extraction(&self, extraction_id: u64, dest_path: &Path);
387
388    /// Called periodically during a CAB extraction. The byte counts refer to the uncompressed size.
389    fn on_cab_extraction_progress(&self, extraction_id: u64, bytes_so_far: u64, total_bytes: u64);
390
391    /// Called when a CAB extraction has completed successfully.
392    fn on_cab_extraction_completed(
393        &self,
394        extraction_id: u64,
395        uncompressed_size_in_bytes: u64,
396        time_until_completed: Duration,
397    );
398
399    /// Called when a CAB extraction has failed.
400    fn on_cab_extraction_failed(&self, extraction_id: u64, reason: CabExtractionError);
401
402    /// Called when a CAB extraction has been canceled.
403    fn on_cab_extraction_canceled(&self, extraction_id: u64);
404
405    /// Called when a file has been created, for example because it was downloaded from
406    /// a server, copied from a different cache directory, or extracted from a compressed
407    /// file.
408    fn on_file_created(&self, path: &Path, size_in_bytes: u64);
409
410    /// Called when a file from the cache has been used to service a [`SymsrvDownloader::get_file`] call.
411    ///
412    /// This is only called for pre-existing files and not for newly-created files - newly-created
413    /// files only trigger a call to `on_file_created`.
414    ///
415    /// Useful to guide expiration decisions.
416    fn on_file_accessed(&self, path: &Path);
417
418    /// Called when we were looking for a file in the cache, and it wasn't there. Used for
419    /// debug logging.
420    ///
421    /// Also called if checking for file existence fails for any other reason.
422    fn on_file_missed(&self, path: &Path);
423}
424
/// Process-wide counter which starts at zero.
///
/// NOTE(review): judging by the name, this hands out the `download_id` /
/// `extraction_id` values that are passed to [`SymsrvObserver`] - confirm at the use sites.
static NEXT_DOWNLOAD_OR_EXTRACTION_ID: AtomicU64 = AtomicU64::new(0);
426
427/// Obtains symbol files (PDBs + binary files) according to the instructions in the symbol path.
428///
429/// Create a new instance with [`SymsrvDownloader::new`], and then use the
430/// [`get_file`](SymsrvDownloader::get_file) method to obtain files.
431pub struct SymsrvDownloader {
432    inner: Arc<SymsrvDownloaderInner>,
433    inflight_request_cache:
434        ComputationCoalescer<(String, String, bool), PinBoxDynFuture<Result<PathBuf, Error>>>,
435}
436
/// A pinned, boxed, type-erased future with output `T` that is both `Send` and `Sync`.
type PinBoxDynFuture<T> = Pin<Box<dyn Future<Output = T> + Send + Sync>>;
438
439struct SymsrvDownloaderInner {
440    symbol_path: Vec<NtSymbolPathEntry>,
441    default_downstream_store: Option<PathBuf>,
442    observer: Option<Arc<dyn SymsrvObserver>>,
443    reqwest_client: Result<reqwest::Client, reqwest::Error>,
444}
445
// Compile-time check: this test only builds if `SymsrvDownloader` is `Send` and `Sync`.
#[cfg(test)]
#[test]
fn test_symsrv_downloader_error_is_send_and_sync() {
    fn require_send_and_sync<T>()
    where
        T: Send + Sync,
    {
    }

    require_send_and_sync::<SymsrvDownloader>();
}
454
455impl SymsrvDownloader {
456    /// Create a new `SymsrvDownloader`.
457    ///
458    /// `symbol_path` describes the behavior of the downloader, including which servers to
459    /// download from and which cache directories to use. The symbol path is commonly created
460    /// by parsing the `_NT_SYMBOL_PATH` environment variable with [`parse_nt_symbol_path`].
461    ///
462    /// # Example
463    ///
464    /// ```
465    /// use std::path::Path;
466    /// use symsrv::SymsrvDownloader;
467    ///
468    /// let symbol_path_env = symsrv::get_symbol_path_from_environment();
469    /// let symbol_path = symbol_path_env.as_deref().unwrap_or("srv**https://msdl.microsoft.com/download/symbols");
470    /// let parsed_symbol_path = symsrv::parse_nt_symbol_path(symbol_path);
471    ///
472    /// let mut downloader = SymsrvDownloader::new(parsed_symbol_path);
473    /// downloader.set_default_downstream_store(symsrv::get_home_sym_dir());
474    /// ```
475    pub fn new(symbol_path: Vec<NtSymbolPathEntry>) -> Self {
476        Self {
477            inner: Arc::new(SymsrvDownloaderInner::new(symbol_path)),
478            inflight_request_cache: ComputationCoalescer::new(),
479        }
480    }
481
482    /// Set the observer for this downloader.
483    ///
484    /// The observer can be used for logging, displaying progress bars, informing
485    /// automatic expiration of cached files, and so on.
486    ///
487    /// See the [`SymsrvObserver`] trait for more information.
488    pub fn set_observer(&mut self, observer: Option<Arc<dyn SymsrvObserver>>) {
489        Arc::get_mut(&mut self.inner).unwrap().observer = observer;
490    }
491
492    /// Set the default downstream store. In the `srv*DOWNSTREAM_STORE*URL` syntax for `_NT_SYMBOL_PATH`,
493    /// leaving the `DOWNSTREAM_STORE` part empty (i.e. having to asterisks in a row, as in `srv**URL`)
494    /// causes this default directory to be used.
495    ///
496    /// You can set this to `symsrv::get_home_sym_dir()` to use the `~/sym` directory.
497    ///
498    /// You can also leave this at the default `None` to disable the default downstream store;
499    /// this means that `srv**URL` entries will not work because the downloads have nowhere to go.
500    ///
501    /// The Windows Debugger [chooses the default downstream store as follows](https://docs.microsoft.com/en-us/windows-hardware/drivers/debugger/advanced-symsrv-use):
502    /// > If you include two asterisks in a row where a downstream store would normally be specified,
503    /// > then the default downstream store is used. This store will be located in the sym subdirectory
504    /// > of the home directory. The home directory defaults to the debugger installation directory;
505    /// > this can be changed by using the [!homedir](https://learn.microsoft.com/en-us/windows-hardware/drivers/debuggercmds/-homedir)
506    /// > extension or by setting the `DBGHELP_HOMEDIR` environment variable.
507    pub fn set_default_downstream_store<P: Into<PathBuf>>(
508        &mut self,
509        default_downstream_store: Option<P>,
510    ) {
511        Arc::get_mut(&mut self.inner)
512            .unwrap()
513            .default_downstream_store = default_downstream_store.map(Into::into);
514    }
515
516    /// This is the primary entry point to fetch files. It looks up the
517    /// file according to the recipe in the symbol path, by searching
518    /// cache directories, downloading files from servers, and uncompressing files
519    /// as needed.
520    ///
521    /// If a matching file is found, a [`PathBuf`] with the path to the uncompressed
522    /// file on the local file system is returned.
523    ///
524    /// The file name can be the name of a PDB file or of a binary file (i.e. .exe or .dll).
525    ///
526    /// The syntax of the hash depends on the file type:
527    ///
528    ///  - For PDBs: The hash is `<GUID><age>`, with `<GUID>` in uppercase hex (no dashes)
529    ///    and `<age>` in lowercase hex.
530    ///  - For binaries: The hash is `<TIMESTAMP><imageSize>`, with `<TIMESTAMP>`
531    ///    printed as eight uppercase hex digits (with leading zeros added as needed)
532    ///    and `<imageSize>` in lowercase hex digits (no leading zeros).
533    ///
534    /// Examples:
535    ///
536    /// - `xul.pdb`, `B2A2B092E45739B84C4C44205044422E1`
537    /// - `renderdoc.dll`, `61015E74442b000`
538    ///
539    /// The PDB hash is commonly created with the help of the `debugid` crate,
540    /// using [`DebugId::breakpad()`](https://docs.rs/debugid/latest/debugid/struct.DebugId.html#method.breakpad).
541    ///
542    /// The binary hash (the "code ID") can be created using
543    /// [`wholesym::PeCodeId`](https://docs.rs/wholesym/latest/wholesym/struct.PeCodeId.html).
544    pub async fn get_file(&self, filename: &str, hash: &str) -> Result<PathBuf, Error> {
545        self.get_file_impl(filename, hash, true).await
546    }
547
548    /// Same as [`get_file`](Self::get_file), but only checks cache directories.
549    /// No downloads are attempted.
550    pub async fn get_file_no_download(&self, filename: &str, hash: &str) -> Result<PathBuf, Error> {
551        self.get_file_impl(filename, hash, false).await
552    }
553
554    async fn get_file_impl(
555        &self,
556        filename: &str,
557        hash: &str,
558        allow_downloads: bool,
559    ) -> Result<PathBuf, Error> {
560        let inner = self.inner.clone();
561        let filename = filename.to_owned();
562        let hash = hash.to_owned();
563
564        self.inflight_request_cache
565            .subscribe_or_compute(
566                &(filename.clone(), hash.clone(), allow_downloads),
567                move || {
568                    let f =
569                        async move { inner.get_file_impl(&filename, &hash, allow_downloads).await };
570                    Box::pin(f)
571                },
572            )
573            .await
574    }
575}
576
577impl SymsrvDownloaderInner {
578    pub fn new(symbol_path: Vec<NtSymbolPathEntry>) -> Self {
579        let builder = reqwest::Client::builder();
580
581        // Turn off HTTP 2, in order to work around https://github.com/seanmonstar/reqwest/issues/1761 .
582        let builder = builder.http1_only();
583
584        // Turn off automatic decompression because it doesn't allow us to compute
585        // download progress percentages: we'd only know the decompressed current
586        // size and the compressed total size.
587        // Instead, we do the streaming decompression manually, see download.rs.
588        let builder = builder.no_gzip().no_brotli().no_deflate();
589
590        // Create the client.
591        // TODO: Add timeouts, user agent, maybe other settings
592        let client = builder.build();
593
594        Self {
595            symbol_path,
596            default_downstream_store: None,
597            observer: None,
598            reqwest_client: client,
599        }
600    }
601
602    pub async fn get_file_impl(
603        &self,
604        filename: &str,
605        hash: &str,
606        allow_downloads: bool,
607    ) -> Result<PathBuf, Error> {
608        let path: PathBuf = [filename, hash, filename].iter().collect();
609        let rel_path_uncompressed = &path;
610        let rel_path_compressed = create_compressed_path(rel_path_uncompressed)?;
611
612        // This array will contain cache paths from `cache*` entries. These get added
613        // once they are encountered. Once encountered, they apply to all subsequent
614        // entries.
615        let mut persisted_cache_paths: Vec<CachePath> = Vec::new();
616
617        // Iterate all entries in the symbol path, checking them for matches one by one.
618        for entry in &self.symbol_path {
619            match entry {
620                NtSymbolPathEntry::Cache(cache_dir) => {
621                    let cache_path = CachePath::Path(cache_dir.into());
622                    if persisted_cache_paths.contains(&cache_path) {
623                        continue;
624                    }
625
626                    // Check if the symbol file is present in this cache. If found, also persist
627                    // it to the previous cache paths.
628                    if let Some(found_path) = self
629                        .check_directory(
630                            cache_dir,
631                            &persisted_cache_paths,
632                            rel_path_uncompressed,
633                            &rel_path_compressed,
634                        )
635                        .await?
636                    {
637                        return Ok(found_path);
638                    }
639
640                    // Add this path to `persisted_cache_paths` so that any matches in the
641                    // upcoming entries can be persisted to this cache.
642                    persisted_cache_paths.push(cache_path);
643                }
644                NtSymbolPathEntry::Chain {
645                    cache_paths, urls, ..
646                } => {
647                    // If the symbol file is found, it should also be persisted (copied) to all
648                    // of these paths.
649                    let mut parent_cache_paths = persisted_cache_paths.clone();
650
651                    for cache_path in cache_paths {
652                        if parent_cache_paths.contains(cache_path) {
653                            continue;
654                        }
655                        parent_cache_paths.push(cache_path.clone());
656
657                        // Check if the symbol file is present at this path. If found, also persist
658                        // it to the previous cache paths.
659                        let (_, parent_cache_paths) = parent_cache_paths.split_last().unwrap();
660                        if let Some(cache_dir) = self.resolve_cache_path(cache_path) {
661                            if let Some(found_path) = self
662                                .check_directory(
663                                    cache_dir,
664                                    parent_cache_paths,
665                                    rel_path_uncompressed,
666                                    &rel_path_compressed,
667                                )
668                                .await?
669                            {
670                                return Ok(found_path);
671                            }
672                        }
673                    }
674
675                    // The symbol file was not found in any of the cache paths. Try to download it
676                    // from the server URLs in this entry.
677                    if !allow_downloads {
678                        // We're not allowed to download anything. Go to the next entry.
679                        continue;
680                    }
681
682                    // First, make sure we have a place to download to.
683                    let (download_dest_cache, remaining_caches) = parent_cache_paths
684                        .split_last()
685                        .unwrap_or((&CachePath::DefaultDownstreamStore, &[]));
686                    let download_dest_cache_dir = self
687                        .resolve_cache_path(download_dest_cache)
688                        .ok_or(Error::NoDefaultDownstreamStore)?;
689                    let bottom_cache = parent_cache_paths
690                        .first()
691                        .unwrap_or(&CachePath::DefaultDownstreamStore);
692
693                    // Make a list of URLs to try. For each URL, we try both the uncompressed and
694                    // compressed file, in that order.
695                    let mut file_urls = Vec::with_capacity(urls.len() * 2);
696                    for server_url in urls {
697                        file_urls.push((
698                            url_join(server_url, rel_path_uncompressed.components()),
699                            false,
700                        ));
701                        file_urls
702                            .push((url_join(server_url, rel_path_compressed.components()), true));
703                    }
704
705                    // Prepare requests to all candidate URLs.
706                    let response_futures: Vec<_> = file_urls
707                        .into_iter()
708                        .map(|(file_url, is_compressed)| async move {
709                            (
710                                self.prepare_download_of_file(&file_url).await,
711                                is_compressed,
712                            )
713                        })
714                        .map(Box::pin)
715                        .collect();
716
717                    // Start all requests and wait for the first successful response, then cancel
718                    // all other requests by dropping the array of futures.
719                    let Some((notifier, response, is_compressed)) = async {
720                        let mut response_futures = PollAllPreservingOrder::new(response_futures);
721                        while let Some(next_response) = response_futures.next().await {
722                            let (prepared_response, is_compressed) = next_response;
723                            if let Some((notifier, response)) = prepared_response {
724                                // This request returned a success status from the server.
725                                return Some((notifier, response, is_compressed));
726                            };
727                        }
728                        None
729                    }
730                    .await
731                    else {
732                        // All requests failed.
733                        // We are done with this `NtSymbolPathEntry::Chain`. Go to the next entry.
734                        continue;
735                    };
736
737                    // If we get here, we have a response with a success HTTP status.
738                    // Download the file. If successful, also persist the file to the previous cache paths.
739
740                    let uncompressed_dest_path = if is_compressed {
741                        let (rx, tx) = remotely_fed_cursor::create_cursor_channel();
742                        let download_dest_path_future = self.download_file_to_cache(
743                            notifier,
744                            response,
745                            &rel_path_compressed,
746                            download_dest_cache_dir,
747                            Some(tx),
748                        );
749                        let extraction_result_future = self.extract_to_file_in_cache(
750                            CabDataSource::Cursor(rx),
751                            rel_path_uncompressed,
752                            bottom_cache,
753                        );
754                        let (download_dest_path, extraction_result) =
755                            future::join(download_dest_path_future, extraction_result_future).await;
756                        let Some(dest_path) = download_dest_path else {
757                            continue;
758                        };
759
760                        // We have a file!
761                        if let Some((_remaining_bottom_cache, remaining_mid_level_caches)) =
762                            remaining_caches.split_first()
763                        {
764                            // Copy the compressed file to the mid-level caches.
765                            self.copy_file_to_caches(
766                                &rel_path_compressed,
767                                &dest_path,
768                                remaining_mid_level_caches,
769                            )
770                            .await;
771                        }
772
773                        // Return the path to the uncompressed file in the bottom cache.
774                        extraction_result?
775                    } else {
776                        let dest_path = self
777                            .download_file_to_cache(
778                                notifier,
779                                response,
780                                rel_path_uncompressed,
781                                download_dest_cache_dir,
782                                None,
783                            )
784                            .await;
785                        let Some(dest_path) = dest_path else { continue };
786
787                        // The file is not compressed. Just copy to the other caches.
788                        self.copy_file_to_caches(
789                            rel_path_uncompressed,
790                            &dest_path,
791                            remaining_caches,
792                        )
793                        .await;
794                        dest_path
795                    };
796                    return Ok(uncompressed_dest_path);
797                }
798                NtSymbolPathEntry::LocalOrShare(dir_path) => {
799                    if persisted_cache_paths.contains(&CachePath::Path(dir_path.into())) {
800                        continue;
801                    }
802
803                    // Check if the symbol file is present at this path. If found, also persist
804                    // it to the previous cache paths.
805                    if let Some(found_path) = self
806                        .check_directory(
807                            dir_path,
808                            &persisted_cache_paths,
809                            rel_path_uncompressed,
810                            &rel_path_compressed,
811                        )
812                        .await?
813                    {
814                        return Ok(found_path);
815                    };
816                }
817            }
818        }
819        Err(Error::NotFound)
820    }
821
822    /// Return whether a file is found at `path`, and notify the observer if not.
823    async fn check_file_exists(&self, path: &Path) -> bool {
824        let file_exists = matches!(tokio::fs::metadata(path).await, Ok(meta) if meta.is_file());
825        if !file_exists {
826            if let Some(observer) = self.observer.as_deref() {
827                observer.on_file_missed(path);
828            }
829        }
830        file_exists
831    }
832
833    fn resolve_cache_path<'a>(&'a self, cache_path: &'a CachePath) -> Option<&'a Path> {
834        match cache_path {
835            CachePath::Path(path) => Some(path),
836            CachePath::DefaultDownstreamStore => self.default_downstream_store.as_deref(),
837        }
838    }
839
840    /// Attempt to find the file on the local file system. This is done first, before any downloading
841    /// is attempted. If a file is found, it is copied into the caches given by `parent_cache_paths`
842    /// and uncompressed if needed. On success, the bottom-most cache in `parent_cache_paths` (i.e.
843    /// the first entry) will always have the uncompressed file, and the other caches with have
844    /// whichever file was found in `dir`.
845    async fn check_directory(
846        &self,
847        dir: &Path,
848        parent_cache_paths: &[CachePath],
849        rel_path_uncompressed: &Path,
850        rel_path_compressed: &Path,
851    ) -> Result<Option<PathBuf>, Error> {
852        let full_candidate_path = dir.join(rel_path_uncompressed);
853        let full_candidate_path_compr = dir.join(rel_path_compressed);
854
855        let (abs_path, is_compressed) = if self.check_file_exists(&full_candidate_path).await {
856            (full_candidate_path, false)
857        } else if self.check_file_exists(&full_candidate_path_compr).await {
858            (full_candidate_path_compr, true)
859        } else {
860            return Ok(None);
861        };
862
863        // We found a file. Yay!
864
865        if let Some(observer) = self.observer.as_deref() {
866            observer.on_file_accessed(&abs_path);
867        }
868
869        let uncompressed_path = if is_compressed {
870            if let Some((bottom_most_cache, mid_level_caches)) = parent_cache_paths.split_first() {
871                // We have at least one cache, and the file is compressed.
872                // Copy the compressed file to the mid-level caches, and uncompress the file
873                // into the bottom-most cache.
874                self.copy_file_to_caches(rel_path_compressed, &abs_path, mid_level_caches)
875                    .await;
876                self.extract_to_file_in_cache(
877                    CabDataSource::Filename(abs_path.clone()),
878                    rel_path_uncompressed,
879                    bottom_most_cache,
880                )
881                .await?
882            } else {
883                // We have no cache. Extract it into the default downstream cache.
884                self.extract_to_file_in_cache(
885                    CabDataSource::Filename(abs_path.clone()),
886                    rel_path_uncompressed,
887                    &CachePath::DefaultDownstreamStore,
888                )
889                .await?
890            }
891        } else {
892            abs_path
893        };
894
895        Ok(Some(uncompressed_path))
896    }
897
898    /// Copy the file at `abs_path` to the cache directories given by `caches`, using
899    /// `rel_path` to create the correct destination path for each cache.
900    async fn copy_file_to_caches(&self, rel_path: &Path, abs_path: &Path, caches: &[CachePath]) {
901        for cache_path in caches {
902            if let Some(cache_dir) = self.resolve_cache_path(cache_path) {
903                if let Ok(dest_path) = self
904                    .make_dest_path_and_ensure_parent_dirs(rel_path, cache_dir)
905                    .await
906                {
907                    // TODO: Check what happens if this process dies in the middle of copying
908                    // - do we leave a half-copied file behind? Should we use `create_file_cleanly`?
909                    if let Ok(copied_bytes) = tokio::fs::copy(&abs_path, &dest_path).await {
910                        if let Some(observer) = self.observer.as_deref() {
911                            observer.on_file_created(&dest_path, copied_bytes);
912                        }
913                    }
914                }
915            }
916        }
917    }
918
919    /// Given a relative file path and a cache directory path, concatenate the two to make
920    /// a destination path, and create the necessary directories so that a file can be stored
921    /// at the destination path.
922    async fn make_dest_path_and_ensure_parent_dirs(
923        &self,
924        rel_path: &Path,
925        cache_path: &Path,
926    ) -> Result<PathBuf, std::io::Error> {
927        let dest_path = cache_path.join(rel_path);
928        if let Some(dir) = dest_path.parent() {
929            tokio::fs::create_dir_all(dir).await?;
930        }
931        Ok(dest_path)
932    }
933
934    /// Uncompress the cab-compressed `bytes` and store the result in a cache
935    /// directory.
936    async fn extract_to_file_in_cache(
937        &self,
938        cab_data_source: CabDataSource,
939        rel_path: &Path,
940        cache_path: &CachePath,
941    ) -> Result<PathBuf, Error> {
942        let cache_path = self
943            .resolve_cache_path(cache_path)
944            .ok_or(Error::NoDefaultDownstreamStore)?;
945        let dest_path = self
946            .make_dest_path_and_ensure_parent_dirs(rel_path, cache_path)
947            .await?;
948
949        let notifier = {
950            let observer = self.observer.clone();
951            let extraction_id =
952                NEXT_DOWNLOAD_OR_EXTRACTION_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
953            if let Some(observer) = observer.as_deref() {
954                observer.on_new_cab_extraction(extraction_id, &dest_path);
955            }
956            ExtractionStatusReporter::new(extraction_id, observer)
957        };
958        let extraction_id = notifier.extraction_id();
959
960        let observer = self.observer.clone();
961        let extracted_size_result = create_file_cleanly(
962            &dest_path,
963            |mut dest_file: std::fs::File| async {
964                tokio::task::spawn_blocking(move || match cab_data_source {
965                    CabDataSource::Filename(compressed_input_path) => {
966                        let file = std::fs::File::open(compressed_input_path)
967                            .map_err(CabExtractionError::CouldNotOpenCabFile)?;
968                        let buf_read = BufReader::new(file);
969                        extract_cab_to_file(extraction_id, buf_read, &mut dest_file, observer)
970                    }
971                    CabDataSource::Cursor(cursor) => {
972                        extract_cab_to_file(extraction_id, cursor, &mut dest_file, observer)
973                    }
974                })
975                .await
976                .expect("task panicked")
977            },
978            || async {
979                let size = std::fs::metadata(&dest_path)
980                    .map_err(|_| {
981                        CabExtractionError::Other(
982                            "Could not get size of existing extracted file".into(),
983                        )
984                    })?
985                    .len();
986                Ok(size)
987            },
988        )
989        .await;
990
991        let extracted_size = match extracted_size_result {
992            Ok(size) => size,
993            Err(e) => {
994                let error = Error::CabExtraction(format!("{}", e));
995                match e {
996                    CleanFileCreationError::CallbackIndicatedError(e) => {
997                        notifier.extraction_failed(e);
998                    }
999                    _ => {
1000                        notifier.extraction_failed(CabExtractionError::FileWriting(e.into()));
1001                    }
1002                }
1003                return Err(error);
1004            }
1005        };
1006
1007        notifier.extraction_completed(extracted_size, Instant::now());
1008
1009        if let Some(observer) = self.observer.as_deref() {
1010            observer.on_file_created(&dest_path, extracted_size);
1011        }
1012        Ok(dest_path)
1013    }
1014
1015    async fn prepare_download_of_file(
1016        &self,
1017        url: &str,
1018    ) -> Option<(DownloadStatusReporter, reqwest::Response)> {
1019        let download_id =
1020            NEXT_DOWNLOAD_OR_EXTRACTION_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1021        if let Some(observer) = self.observer.as_deref() {
1022            observer.on_new_download_before_connect(download_id, url);
1023        }
1024
1025        let reporter = DownloadStatusReporter::new(download_id, self.observer.clone());
1026
1027        let reqwest_client = match self.reqwest_client.as_ref() {
1028            Ok(client) => client,
1029            Err(e) => {
1030                reporter.download_failed(DownloadError::ClientCreationFailed(e.to_string()));
1031                return None;
1032            }
1033        };
1034
1035        let request_builder = reqwest_client.get(url);
1036
1037        // Manually specify the Accept-Encoding header.
1038        // This would happen automatically if we hadn't turned off automatic
1039        // decompression for this reqwest client.
1040        let request_builder = request_builder.header("Accept-Encoding", "gzip");
1041
1042        // Send the request and wait for the headers.
1043        let response_result = request_builder.send().await;
1044
1045        // Check the HTTP status code.
1046        let response_result = response_result.and_then(|response| response.error_for_status());
1047
1048        let response = match response_result {
1049            Ok(response) => response,
1050            Err(e) => {
1051                // The request failed, most commonly due to a 404 status code.
1052                reporter.download_failed(DownloadError::from(e));
1053                return None;
1054            }
1055        };
1056
1057        Some((reporter, response))
1058    }
1059
1060    /// Download the file at `url` to a file in `cache_dir``.
1061    async fn download_file_to_cache(
1062        &self,
1063        reporter: DownloadStatusReporter,
1064        response: reqwest::Response,
1065        rel_path: &Path,
1066        cache_dir: &Path,
1067        mut chunk_consumer: Option<RemotelyFedCursorFeeder>,
1068    ) -> Option<PathBuf> {
1069        // We have a response with a success status code.
1070        let ts_after_status = Instant::now();
1071        let download_id = reporter.download_id();
1072        if let Some(observer) = self.observer.as_deref() {
1073            observer.on_download_started(download_id);
1074        }
1075
1076        let dest_path = match self
1077            .make_dest_path_and_ensure_parent_dirs(rel_path, cache_dir)
1078            .await
1079        {
1080            Ok(dest_path) => dest_path,
1081            Err(_e) => {
1082                reporter.download_failed(DownloadError::CouldNotCreateDestinationDirectory);
1083                return None;
1084            }
1085        };
1086
1087        let observer = self.observer.clone();
1088        let mut stream = match response_to_uncompressed_stream_with_progress(
1089            response,
1090            move |bytes_so_far, total_bytes| {
1091                if let Some(observer) = observer.as_deref() {
1092                    observer.on_download_progress(download_id, bytes_so_far, total_bytes)
1093                }
1094            },
1095        ) {
1096            Ok(stream) => stream,
1097            Err(download::Error::UnexpectedContentEncoding(encoding)) => {
1098                reporter.download_failed(DownloadError::UnexpectedContentEncoding(encoding));
1099                return None;
1100            }
1101        };
1102
1103        let download_result: Result<u64, CleanFileCreationError<std::io::Error>> =
1104            create_file_cleanly(
1105                &dest_path,
1106                |dest_file: std::fs::File| async move {
1107                    let mut dest_file = tokio::fs::File::from_std(dest_file);
1108                    let mut buf = vec![0u8; 2 * 1024 * 1024];
1109                    let mut uncompressed_size_in_bytes = 0;
1110                    loop {
1111                        let count = stream.read(&mut buf).await?;
1112                        if count == 0 {
1113                            break;
1114                        }
1115                        uncompressed_size_in_bytes += count as u64;
1116                        dest_file.write_all(&buf[..count]).await?;
1117
1118                        if let Some(chunk_consumer) = &mut chunk_consumer {
1119                            chunk_consumer.feed(&buf[..count]);
1120                        }
1121                    }
1122                    if let Some(chunk_consumer) = &mut chunk_consumer {
1123                        chunk_consumer.mark_complete();
1124                    }
1125                    dest_file.flush().await?;
1126                    Ok(uncompressed_size_in_bytes)
1127                },
1128                || async {
1129                    let size = std::fs::metadata(&dest_path)?.len();
1130                    Ok(size)
1131                },
1132            )
1133            .await;
1134
1135        let uncompressed_size_in_bytes = match download_result {
1136            Ok(size) => size,
1137            Err(CleanFileCreationError::CallbackIndicatedError(e)) => {
1138                reporter.download_failed(DownloadError::ErrorDuringDownloading(e));
1139                return None;
1140            }
1141            Err(e) => {
1142                reporter.download_failed(DownloadError::ErrorWhileWritingDownloadedFile(e.into()));
1143                return None;
1144            }
1145        };
1146
1147        let ts_after_download = Instant::now();
1148        reporter.download_completed(
1149            uncompressed_size_in_bytes,
1150            ts_after_status,
1151            ts_after_download,
1152        );
1153
1154        if let Some(observer) = self.observer.as_deref() {
1155            observer.on_file_created(&dest_path, uncompressed_size_in_bytes);
1156        }
1157
1158        Some(dest_path)
1159    }
1160}
1161
/// The source from which the bytes of a cab archive are read during extraction.
enum CabDataSource {
    /// The archive is a file on disk at this path; it is opened and read when extracting.
    Filename(PathBuf),
    /// The archive is read from a `RemotelyFedCursor`. This is used when extracting a file
    /// while it is being downloaded: the download feeds its chunks into the cursor.
    Cursor(RemotelyFedCursor),
}
1166
1167fn get_first_file_entry<R: Read + Seek>(cabinet: &mut cab::Cabinet<R>) -> Option<(String, u64)> {
1168    for folder in cabinet.folder_entries() {
1169        if let Some(file) = folder.file_entries().next() {
1170            return Some((file.name().to_owned(), file.uncompressed_size().into()));
1171        }
1172    }
1173    None
1174}
1175
1176fn extract_cab_to_file<R: Read + Seek>(
1177    extraction_id: u64,
1178    source_data: R,
1179    dest_file: &mut std::fs::File,
1180    observer: Option<Arc<dyn SymsrvObserver>>,
1181) -> Result<u64, CabExtractionError> {
1182    use CabExtractionError::*;
1183    let mut cabinet = cab::Cabinet::new(source_data).map_err(CabParsing)?;
1184    let (file_entry_name, file_extracted_size) =
1185        get_first_file_entry(&mut cabinet).ok_or(EmptyCab)?;
1186    let mut reader = cabinet.read_file(&file_entry_name).map_err(CabParsing)?;
1187
1188    let mut bytes_written = 0;
1189    loop {
1190        let mut buf = [0; 4096];
1191        let bytes_read = reader.read(&mut buf).map_err(CabReading)?;
1192        if bytes_read == 0 {
1193            break;
1194        }
1195        dest_file
1196            .write_all(&buf[..bytes_read])
1197            .map_err(FileWriting)?;
1198        bytes_written += bytes_read as u64;
1199
1200        if let Some(observer) = observer.as_deref() {
1201            observer.on_cab_extraction_progress(extraction_id, bytes_written, file_extracted_size);
1202        }
1203    }
1204
1205    Ok(bytes_written)
1206}
1207
/// Build a URL from `base_url` and the components of a relative path.
///
/// Trailing slashes are stripped from `base_url`, then a single `/` and the path
/// components, separated by `/`, are appended. Components that are not valid UTF-8 are
/// converted lossily.
fn url_join(base_url: &str, components: std::path::Components) -> String {
    let mut url = String::from(base_url.trim_end_matches('/'));
    url.push('/');
    for (index, component) in components.enumerate() {
        if index > 0 {
            url.push('/');
        }
        url.push_str(&component.as_os_str().to_string_lossy());
    }
    url
}
1220
1221/// From a path to the uncompressed exe/dll/pdb file, create the path to the
1222/// compressed file, by replacing the last char of the file extension with
1223/// an underscore. These files are cab-compressed.
1224fn create_compressed_path(uncompressed_path: &Path) -> Result<PathBuf, Error> {
1225    let uncompressed_ext = match uncompressed_path.extension() {
1226        Some(ext) => match ext.to_string_lossy().deref() {
1227            "exe" => "ex_",
1228            "dll" => "dl_",
1229            "pdb" => "pd_",
1230            "dbg" => "db_",
1231            _ => return Err(Error::UnrecognizedExtension),
1232        },
1233        None => return Err(Error::NoExtension),
1234    };
1235
1236    let mut compressed_path = uncompressed_path.to_owned();
1237    compressed_path.set_extension(uncompressed_ext);
1238    Ok(compressed_path)
1239}
1240
/// A helper struct with a drop handler. This lets us detect when a download
/// is cancelled by dropping the future.
///
/// Exactly one outcome is reported to the observer: a failure, a completion, or, if the
/// reporter is dropped without either having been reported, a cancellation.
struct DownloadStatusReporter {
    /// Set to `None` when `download_failed()` or `download_completed()` is called.
    download_id: Option<u64>,
    /// The observer to notify about the outcome, if there is one.
    observer: Option<Arc<dyn SymsrvObserver>>,
    /// The time at which this reporter was created. The durations passed to
    /// `on_download_completed` are measured from this instant.
    ts_before_connect: Instant,
}
1249
1250impl DownloadStatusReporter {
1251    pub fn new(download_id: u64, observer: Option<Arc<dyn SymsrvObserver>>) -> Self {
1252        Self {
1253            download_id: Some(download_id),
1254            observer,
1255            ts_before_connect: Instant::now(),
1256        }
1257    }
1258
1259    pub fn download_id(&self) -> u64 {
1260        self.download_id.unwrap()
1261    }
1262
1263    pub fn download_failed(mut self, e: DownloadError) {
1264        if let (Some(download_id), Some(observer)) = (self.download_id, self.observer.as_deref()) {
1265            observer.on_download_failed(download_id, e);
1266        }
1267        self.download_id = None;
1268        // Drop self. Now the Drop handler won't do anything.
1269    }
1270
1271    pub fn download_completed(
1272        mut self,
1273        uncompressed_size_in_bytes: u64,
1274        ts_after_headers: Instant,
1275        ts_after_completed: Instant,
1276    ) {
1277        if let (Some(download_id), Some(observer)) = (self.download_id, self.observer.as_deref()) {
1278            let time_until_headers = ts_after_headers.duration_since(self.ts_before_connect);
1279            let time_until_completed = ts_after_completed.duration_since(self.ts_before_connect);
1280            observer.on_download_completed(
1281                download_id,
1282                uncompressed_size_in_bytes,
1283                time_until_headers,
1284                time_until_completed,
1285            );
1286        }
1287        self.download_id = None;
1288        // Drop self. Now the Drop handler won't do anything.
1289    }
1290}
1291
1292impl Drop for DownloadStatusReporter {
1293    fn drop(&mut self) {
1294        if let (Some(download_id), Some(observer)) = (self.download_id, self.observer.as_deref()) {
1295            // We were dropped before a call to `download_failed` or `download_completed`.
1296            // This was most likely because the future we were stored in was dropped.
1297            // Tell the observer.
1298            observer.on_download_canceled(download_id);
1299        }
1300    }
1301}
1302
/// A helper struct with a drop handler. This lets us detect when an extraction
/// is cancelled by dropping the future.
///
/// Exactly one outcome is reported to the observer: a failure, a completion, or, if the
/// reporter is dropped without either having been reported, a cancellation.
struct ExtractionStatusReporter {
    /// Set to `None` when `extraction_failed()` or `extraction_completed()` is called.
    extraction_id: Option<u64>,
    /// The observer to notify about the outcome, if there is one.
    observer: Option<Arc<dyn SymsrvObserver>>,
    /// The time at which this reporter was created. The duration passed to
    /// `on_cab_extraction_completed` is measured from this instant.
    ts_before_start: Instant,
}
1311
1312impl ExtractionStatusReporter {
1313    pub fn new(extraction_id: u64, observer: Option<Arc<dyn SymsrvObserver>>) -> Self {
1314        Self {
1315            extraction_id: Some(extraction_id),
1316            observer,
1317            ts_before_start: Instant::now(),
1318        }
1319    }
1320
1321    pub fn extraction_id(&self) -> u64 {
1322        self.extraction_id.unwrap()
1323    }
1324
1325    pub fn extraction_failed(mut self, e: CabExtractionError) {
1326        if let (Some(extraction_id), Some(observer)) =
1327            (self.extraction_id, self.observer.as_deref())
1328        {
1329            observer.on_cab_extraction_failed(extraction_id, e);
1330        }
1331        self.extraction_id = None;
1332        // Drop self. Now the Drop handler won't do anything.
1333    }
1334
1335    pub fn extraction_completed(
1336        mut self,
1337        uncompressed_size_in_bytes: u64,
1338        ts_after_completed: Instant,
1339    ) {
1340        if let (Some(extraction_id), Some(observer)) =
1341            (self.extraction_id, self.observer.as_deref())
1342        {
1343            let time_until_completed = ts_after_completed.duration_since(self.ts_before_start);
1344            observer.on_cab_extraction_completed(
1345                extraction_id,
1346                uncompressed_size_in_bytes,
1347                time_until_completed,
1348            );
1349        }
1350        self.extraction_id = None;
1351        // Drop self. Now the Drop handler won't do anything.
1352    }
1353}
1354
1355impl Drop for ExtractionStatusReporter {
1356    fn drop(&mut self) {
1357        if let (Some(extraction_id), Some(observer)) =
1358            (self.extraction_id, self.observer.as_deref())
1359        {
1360            // We were dropped before a call to `extraction_failed` or `extraction_completed`.
1361            // This was most likely because the future we were stored in was dropped.
1362            // Tell the observer.
1363            observer.on_cab_extraction_canceled(extraction_id);
1364        }
1365    }
1366}